Add tracing middleware and optionally configure an http-binary opentelemetry exporter (eg. for honeycomb.io)
+162
Cargo.lock
+162
Cargo.lock
···
39
39
"http-body-util",
40
40
"log",
41
41
"native-tls",
42
+
"opentelemetry",
43
+
"opentelemetry-otlp",
44
+
"opentelemetry_sdk",
42
45
"poem",
43
46
"postgres-native-tls",
44
47
"reqwest",
···
52
55
"tokio-postgres",
53
56
"tokio-stream",
54
57
"tokio-util",
58
+
"tracing",
59
+
"tracing-opentelemetry",
55
60
"tracing-subscriber",
56
61
]
57
62
···
1583
1588
"vcpkg",
1584
1589
]
1585
1590
1591
+
[[package]]
1592
+
name = "opentelemetry"
1593
+
version = "0.30.0"
1594
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1595
+
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
1596
+
dependencies = [
1597
+
"futures-core",
1598
+
"futures-sink",
1599
+
"js-sys",
1600
+
"pin-project-lite",
1601
+
"thiserror 2.0.16",
1602
+
"tracing",
1603
+
]
1604
+
1605
+
[[package]]
1606
+
name = "opentelemetry-http"
1607
+
version = "0.30.0"
1608
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1609
+
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
1610
+
dependencies = [
1611
+
"async-trait",
1612
+
"bytes",
1613
+
"http",
1614
+
"opentelemetry",
1615
+
"reqwest",
1616
+
]
1617
+
1618
+
[[package]]
1619
+
name = "opentelemetry-otlp"
1620
+
version = "0.30.0"
1621
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1622
+
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
1623
+
dependencies = [
1624
+
"http",
1625
+
"opentelemetry",
1626
+
"opentelemetry-http",
1627
+
"opentelemetry-proto",
1628
+
"opentelemetry_sdk",
1629
+
"prost",
1630
+
"reqwest",
1631
+
"thiserror 2.0.16",
1632
+
"tracing",
1633
+
]
1634
+
1635
+
[[package]]
1636
+
name = "opentelemetry-proto"
1637
+
version = "0.30.0"
1638
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1639
+
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
1640
+
dependencies = [
1641
+
"opentelemetry",
1642
+
"opentelemetry_sdk",
1643
+
"prost",
1644
+
"tonic",
1645
+
]
1646
+
1647
+
[[package]]
1648
+
name = "opentelemetry_sdk"
1649
+
version = "0.30.0"
1650
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1651
+
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
1652
+
dependencies = [
1653
+
"futures-channel",
1654
+
"futures-executor",
1655
+
"futures-util",
1656
+
"opentelemetry",
1657
+
"percent-encoding",
1658
+
"rand 0.9.2",
1659
+
"serde_json",
1660
+
"thiserror 2.0.16",
1661
+
"tokio",
1662
+
"tokio-stream",
1663
+
]
1664
+
1586
1665
[[package]]
1587
1666
name = "parking_lot"
1588
1667
version = "0.11.2"
···
1665
1744
"siphasher",
1666
1745
]
1667
1746
1747
+
[[package]]
1748
+
name = "pin-project"
1749
+
version = "1.1.10"
1750
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1751
+
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
1752
+
dependencies = [
1753
+
"pin-project-internal",
1754
+
]
1755
+
1756
+
[[package]]
1757
+
name = "pin-project-internal"
1758
+
version = "1.1.10"
1759
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1760
+
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
1761
+
dependencies = [
1762
+
"proc-macro2",
1763
+
"quote",
1764
+
"syn",
1765
+
]
1766
+
1668
1767
[[package]]
1669
1768
name = "pin-project-lite"
1670
1769
version = "0.2.16"
···
1839
1938
"unicode-ident",
1840
1939
]
1841
1940
1941
+
[[package]]
1942
+
name = "prost"
1943
+
version = "0.13.5"
1944
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1945
+
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
1946
+
dependencies = [
1947
+
"bytes",
1948
+
"prost-derive",
1949
+
]
1950
+
1951
+
[[package]]
1952
+
name = "prost-derive"
1953
+
version = "0.13.5"
1954
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1955
+
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
1956
+
dependencies = [
1957
+
"anyhow",
1958
+
"itertools",
1959
+
"proc-macro2",
1960
+
"quote",
1961
+
"syn",
1962
+
]
1963
+
1842
1964
[[package]]
1843
1965
name = "quanta"
1844
1966
version = "0.12.6"
···
2061
2183
"base64",
2062
2184
"bytes",
2063
2185
"encoding_rs",
2186
+
"futures-channel",
2064
2187
"futures-core",
2065
2188
"futures-util",
2066
2189
"h2",
···
2802
2925
"winnow",
2803
2926
]
2804
2927
2928
+
[[package]]
2929
+
name = "tonic"
2930
+
version = "0.13.1"
2931
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2932
+
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
2933
+
dependencies = [
2934
+
"async-trait",
2935
+
"base64",
2936
+
"bytes",
2937
+
"http",
2938
+
"http-body",
2939
+
"http-body-util",
2940
+
"percent-encoding",
2941
+
"pin-project",
2942
+
"prost",
2943
+
"tokio-stream",
2944
+
"tower-layer",
2945
+
"tower-service",
2946
+
"tracing",
2947
+
]
2948
+
2805
2949
[[package]]
2806
2950
name = "tower"
2807
2951
version = "0.5.2"
···
2890
3034
"tracing-core",
2891
3035
]
2892
3036
3037
+
[[package]]
3038
+
name = "tracing-opentelemetry"
3039
+
version = "0.31.0"
3040
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3041
+
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
3042
+
dependencies = [
3043
+
"js-sys",
3044
+
"once_cell",
3045
+
"opentelemetry",
3046
+
"opentelemetry_sdk",
3047
+
"smallvec",
3048
+
"tracing",
3049
+
"tracing-core",
3050
+
"tracing-log",
3051
+
"tracing-subscriber",
3052
+
"web-time",
3053
+
]
3054
+
2893
3055
[[package]]
2894
3056
name = "tracing-subscriber"
2895
3057
version = "0.3.20"
+5
Cargo.toml
+5
Cargo.toml
···
16
16
http-body-util = "0.1.3"
17
17
log = "0.4.28"
18
18
native-tls = "0.2.14"
19
+
opentelemetry = "0.30.0"
20
+
opentelemetry-otlp = { version = "0.30.0" }
21
+
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
19
22
poem = { version = "3.1.12", features = ["acme", "compression"] }
20
23
postgres-native-tls = "0.5.1"
21
24
reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
···
29
32
tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
30
33
tokio-stream = { version = "0.1.17", features = ["io-util"] }
31
34
tokio-util = { version = "0.7.16", features = ["compat"] }
35
+
tracing = "0.1.41"
36
+
tracing-opentelemetry = "0.31.0"
32
37
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+24
-4
src/bin/allegedly.rs
+24
-4
src/bin/allegedly.rs
···
1
-
use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
1
+
use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init};
2
+
use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream};
2
3
use clap::{CommandFactory, Parser, Subcommand};
3
4
use std::{path::PathBuf, time::Duration, time::Instant};
4
5
use tokio::fs::create_dir_all;
···
48
49
Mirror {
49
50
#[command(flatten)]
50
51
args: mirror::Args,
52
+
#[command(flatten)]
53
+
instrumentation: InstrumentationArgs,
51
54
},
52
55
/// Wrap any did-method-plc server, without syncing upstream (read-only)
53
56
Wrap {
54
57
#[command(flatten)]
55
58
args: mirror::Args,
59
+
#[command(flatten)]
60
+
instrumentation: InstrumentationArgs,
56
61
},
57
62
/// Poll an upstream PLC server and log new ops to stdout
58
63
Tail {
···
62
67
},
63
68
}
64
69
70
+
impl Commands {
71
+
fn enable_otel(&self) -> bool {
72
+
match self {
73
+
Commands::Mirror {
74
+
instrumentation, ..
75
+
}
76
+
| Commands::Wrap {
77
+
instrumentation, ..
78
+
} => instrumentation.enable_opentelemetry,
79
+
_ => false,
80
+
}
81
+
}
82
+
}
83
+
65
84
#[tokio::main]
66
85
async fn main() -> anyhow::Result<()> {
67
86
let args = Cli::parse();
68
87
let matches = Cli::command().get_matches();
69
88
let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
70
-
bin_init(name);
89
+
bin_init(args.command.enable_otel());
90
+
log::info!("{}", logo(name));
71
91
72
92
let globals = args.globals.clone();
73
93
···
96
116
.await
97
117
.expect("to write bundles to output files");
98
118
}
99
-
Commands::Mirror { args } => mirror::run(globals, args, true).await?,
100
-
Commands::Wrap { args } => mirror::run(globals, args, false).await?,
119
+
Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?,
120
+
Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?,
101
121
Commands::Tail { after } => {
102
122
let mut url = globals.upstream;
103
123
url.set_path("/export");
+5
-3
src/bin/backfill.rs
+5
-3
src/bin/backfill.rs
···
1
1
use allegedly::{
2
-
Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
3
-
bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
2
+
Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg,
3
+
bin::{GlobalArgs, bin_init},
4
+
full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream,
4
5
};
5
6
use clap::Parser;
6
7
use reqwest::Url;
···
199
200
#[tokio::main]
200
201
async fn main() -> anyhow::Result<()> {
201
202
let args = CliArgs::parse();
202
-
bin_init("backfill");
203
+
bin_init(false);
204
+
log::info!("{}", logo("backfill"));
203
205
run(args.globals, args.args).await?;
204
206
Ok(())
205
207
}
+28
src/bin/instrumentation/mod.rs
+28
src/bin/instrumentation/mod.rs
···
1
+
use opentelemetry::trace::TracerProvider as _;
2
+
use opentelemetry_otlp::{Protocol, WithExportConfig};
3
+
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider};
4
+
use tracing::Subscriber;
5
+
use tracing_opentelemetry::OpenTelemetryLayer;
6
+
use tracing_subscriber::registry::LookupSpan;
7
+
8
+
pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer>
9
+
where
10
+
S: Subscriber + for<'span> LookupSpan<'span>,
11
+
{
12
+
let exporter = opentelemetry_otlp::SpanExporter::builder()
13
+
.with_http()
14
+
.with_protocol(Protocol::HttpBinary)
15
+
.build()
16
+
.expect("to build otel otlp exporter");
17
+
18
+
let provider = SdkTracerProvider::builder()
19
+
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
20
+
1.0,
21
+
))))
22
+
.with_id_generator(RandomIdGenerator::default())
23
+
.with_batch_exporter(exporter)
24
+
.build();
25
+
26
+
let tracer = provider.tracer("tracing-otel-subscriber");
27
+
tracing_opentelemetry::layer().with_tracer(tracer)
28
+
}
+7
-2
src/bin/mirror.rs
+7
-2
src/bin/mirror.rs
···
1
1
use allegedly::{
2
-
Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve,
2
+
Db, ExperimentalConf, ListenConf,
3
+
bin::{GlobalArgs, InstrumentationArgs, bin_init},
4
+
logo, pages_to_pg, poll_upstream, serve,
3
5
};
4
6
use clap::Parser;
5
7
use reqwest::Url;
···
166
168
#[command(flatten)]
167
169
globals: GlobalArgs,
168
170
#[command(flatten)]
171
+
instrumentation: InstrumentationArgs,
172
+
#[command(flatten)]
169
173
args: Args,
170
174
/// Run the mirror in wrap mode, no upstream synchronization (read-only)
171
175
#[arg(long, action)]
···
176
180
#[tokio::main]
177
181
async fn main() -> anyhow::Result<()> {
178
182
let args = CliArgs::parse();
179
-
bin_init("mirror");
183
+
bin_init(args.instrumentation.enable_opentelemetry);
184
+
log::info!("{}", logo("mirror"));
180
185
run(args.globals, args.args, !args.wrap_mode).await?;
181
186
Ok(())
182
187
}
+38
src/bin/mod.rs
+38
src/bin/mod.rs
···
1
+
mod instrumentation;
2
+
1
3
use reqwest::Url;
4
+
use tracing_subscriber::layer::SubscriberExt;
2
5
3
6
#[derive(Debug, Clone, clap::Args)]
4
7
pub struct GlobalArgs {
···
14
17
pub upstream_throttle_ms: u64,
15
18
}
16
19
20
+
#[derive(Debug, Default, Clone, clap::Args)]
21
+
pub struct InstrumentationArgs {
22
+
/// Export traces to an OTLP collector
23
+
///
24
+
/// Configure the colletctor via standard env vars:
25
+
/// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/"
26
+
/// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret"
27
+
/// - `OTEL_SERVICE_NAME` eg "my-app"
28
+
#[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")]
29
+
pub enable_opentelemetry: bool,
30
+
}
31
+
32
+
pub fn bin_init(enable_otlp: bool) {
33
+
let filter = tracing_subscriber::EnvFilter::builder()
34
+
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
35
+
.from_env_lossy();
36
+
37
+
let stderr_log = tracing_subscriber::fmt::layer()
38
+
.with_writer(std::io::stderr)
39
+
.pretty();
40
+
41
+
let otel = if enable_otlp {
42
+
Some(instrumentation::otel_layer())
43
+
} else {
44
+
None
45
+
};
46
+
47
+
let subscriber = tracing_subscriber::Registry::default()
48
+
.with(filter)
49
+
.with(stderr_log)
50
+
.with(otel);
51
+
52
+
tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber");
53
+
}
54
+
17
55
#[allow(dead_code)]
18
56
fn main() {
19
57
panic!("this is not actually a module")
-13
src/lib.rs
-13
src/lib.rs
···
145
145
env!("CARGO_PKG_VERSION"),
146
146
)
147
147
}
148
-
149
-
pub fn bin_init(name: &str) {
150
-
if std::env::var_os("RUST_LOG").is_none() {
151
-
unsafe { std::env::set_var("RUST_LOG", "info") };
152
-
}
153
-
let filter = tracing_subscriber::EnvFilter::from_default_env();
154
-
tracing_subscriber::fmt()
155
-
.with_writer(std::io::stderr)
156
-
.with_env_filter(filter)
157
-
.init();
158
-
159
-
log::info!("{}", logo(name));
160
-
}