Opentelemetry tracing #1

merged
opened by bad-example.com targeting main from otel-tracing

Add tracing middleware and optionally configure an http-binary opentelemetry exporter (eg. for honeycomb.io)

Changed files
+269 -22
src
+162
Cargo.lock
··· 39 39 "http-body-util", 40 40 "log", 41 41 "native-tls", 42 + "opentelemetry", 43 + "opentelemetry-otlp", 44 + "opentelemetry_sdk", 42 45 "poem", 43 46 "postgres-native-tls", 44 47 "reqwest", ··· 52 55 "tokio-postgres", 53 56 "tokio-stream", 54 57 "tokio-util", 58 + "tracing", 59 + "tracing-opentelemetry", 55 60 "tracing-subscriber", 56 61 ] 57 62 ··· 1583 1588 "vcpkg", 1584 1589 ] 1585 1590 1591 + [[package]] 1592 + name = "opentelemetry" 1593 + version = "0.30.0" 1594 + source = "registry+https://github.com/rust-lang/crates.io-index" 1595 + checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" 1596 + dependencies = [ 1597 + "futures-core", 1598 + "futures-sink", 1599 + "js-sys", 1600 + "pin-project-lite", 1601 + "thiserror 2.0.16", 1602 + "tracing", 1603 + ] 1604 + 1605 + [[package]] 1606 + name = "opentelemetry-http" 1607 + version = "0.30.0" 1608 + source = "registry+https://github.com/rust-lang/crates.io-index" 1609 + checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" 1610 + dependencies = [ 1611 + "async-trait", 1612 + "bytes", 1613 + "http", 1614 + "opentelemetry", 1615 + "reqwest", 1616 + ] 1617 + 1618 + [[package]] 1619 + name = "opentelemetry-otlp" 1620 + version = "0.30.0" 1621 + source = "registry+https://github.com/rust-lang/crates.io-index" 1622 + checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" 1623 + dependencies = [ 1624 + "http", 1625 + "opentelemetry", 1626 + "opentelemetry-http", 1627 + "opentelemetry-proto", 1628 + "opentelemetry_sdk", 1629 + "prost", 1630 + "reqwest", 1631 + "thiserror 2.0.16", 1632 + "tracing", 1633 + ] 1634 + 1635 + [[package]] 1636 + name = "opentelemetry-proto" 1637 + version = "0.30.0" 1638 + source = "registry+https://github.com/rust-lang/crates.io-index" 1639 + checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" 1640 + dependencies = [ 1641 + "opentelemetry", 1642 + "opentelemetry_sdk", 1643 + "prost", 1644 + "tonic", 1645 + ] 1646 + 1647 + [[package]] 1648 + name = "opentelemetry_sdk" 1649 + version = "0.30.0" 1650 + source = "registry+https://github.com/rust-lang/crates.io-index" 1651 + checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" 1652 + dependencies = [ 1653 + "futures-channel", 1654 + "futures-executor", 1655 + "futures-util", 1656 + "opentelemetry", 1657 + "percent-encoding", 1658 + "rand 0.9.2", 1659 + "serde_json", 1660 + "thiserror 2.0.16", 1661 + "tokio", 1662 + "tokio-stream", 1663 + ] 1664 + 1586 1665 [[package]] 1587 1666 name = "parking_lot" 1588 1667 version = "0.11.2" ··· 1665 1744 "siphasher", 1666 1745 ] 1667 1746 1747 + [[package]] 1748 + name = "pin-project" 1749 + version = "1.1.10" 1750 + source = "registry+https://github.com/rust-lang/crates.io-index" 1751 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1752 + dependencies = [ 1753 + "pin-project-internal", 1754 + ] 1755 + 1756 + [[package]] 1757 + name = "pin-project-internal" 1758 + version = "1.1.10" 1759 + source = "registry+https://github.com/rust-lang/crates.io-index" 1760 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1761 + dependencies = [ 1762 + "proc-macro2", 1763 + "quote", 1764 + "syn", 1765 + ] 1766 + 1668 1767 [[package]] 1669 1768 name = "pin-project-lite" 1670 1769 version = "0.2.16" ··· 1839 1938 "unicode-ident", 1840 1939 ] 1841 1940 1941 + [[package]] 1942 + name = "prost" 1943 + version = "0.13.5" 1944 + source = "registry+https://github.com/rust-lang/crates.io-index" 1945 + checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 1946 + dependencies = [ 1947 + "bytes", 1948 + "prost-derive", 1949 + ] 1950 + 1951 + [[package]] 1952 + name = "prost-derive" 1953 + version = "0.13.5" 1954 + source = "registry+https://github.com/rust-lang/crates.io-index" 1955 + checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" 1956 + dependencies = [ 1957 + "anyhow", 1958 + "itertools", 1959 + "proc-macro2", 1960 + "quote", 1961 + "syn", 1962 + ] 1963 + 1842 1964 [[package]] 1843 1965 name = "quanta" 1844 1966 version = "0.12.6" ··· 2061 2183 "base64", 2062 2184 "bytes", 2063 2185 "encoding_rs", 2186 + "futures-channel", 2064 2187 "futures-core", 2065 2188 "futures-util", 2066 2189 "h2", ··· 2802 2925 "winnow", 2803 2926 ] 2804 2927 2928 + [[package]] 2929 + name = "tonic" 2930 + version = "0.13.1" 2931 + source = "registry+https://github.com/rust-lang/crates.io-index" 2932 + checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" 2933 + dependencies = [ 2934 + "async-trait", 2935 + "base64", 2936 + "bytes", 2937 + "http", 2938 + "http-body", 2939 + "http-body-util", 2940 + "percent-encoding", 2941 + "pin-project", 2942 + "prost", 2943 + "tokio-stream", 2944 + "tower-layer", 2945 + "tower-service", 2946 + "tracing", 2947 + ] 2948 + 2805 2949 [[package]] 2806 2950 name = "tower" 2807 2951 version = "0.5.2" ··· 2890 3034 "tracing-core", 2891 3035 ] 2892 3036 3037 + [[package]] 3038 + name = "tracing-opentelemetry" 3039 + version = "0.31.0" 3040 + source = "registry+https://github.com/rust-lang/crates.io-index" 3041 + checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" 3042 + dependencies = [ 3043 + "js-sys", 3044 + "once_cell", 3045 + "opentelemetry", 3046 + "opentelemetry_sdk", 3047 + "smallvec", 3048 + "tracing", 3049 + "tracing-core", 3050 + "tracing-log", 3051 + "tracing-subscriber", 3052 + "web-time", 3053 + ] 3054 + 2893 3055 [[package]] 2894 3056 name = "tracing-subscriber" 2895 3057 version = "0.3.20"
+5
Cargo.toml
··· 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 18 native-tls = "0.2.14" 19 + opentelemetry = "0.30.0" 20 + opentelemetry-otlp = { version = "0.30.0" } 21 + opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 19 22 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 23 postgres-native-tls = "0.5.1" 21 24 reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } ··· 29 32 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 30 33 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 34 tokio-util = { version = "0.7.16", features = ["compat"] } 35 + tracing = "0.1.41" 36 + tracing-opentelemetry = "0.31.0" 32 37 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+24 -4
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 1 + use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2 + use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 3 use clap::{CommandFactory, Parser, Subcommand}; 3 4 use std::{path::PathBuf, time::Duration, time::Instant}; 4 5 use tokio::fs::create_dir_all; ··· 48 49 Mirror { 49 50 #[command(flatten)] 50 51 args: mirror::Args, 52 + #[command(flatten)] 53 + instrumentation: InstrumentationArgs, 51 54 }, 52 55 /// Wrap any did-method-plc server, without syncing upstream (read-only) 53 56 Wrap { 54 57 #[command(flatten)] 55 58 args: mirror::Args, 59 + #[command(flatten)] 60 + instrumentation: InstrumentationArgs, 56 61 }, 57 62 /// Poll an upstream PLC server and log new ops to stdout 58 63 Tail { ··· 62 67 }, 63 68 } 64 69 70 + impl Commands { 71 + fn enable_otel(&self) -> bool { 72 + match self { 73 + Commands::Mirror { 74 + instrumentation, .. 75 + } 76 + | Commands::Wrap { 77 + instrumentation, .. 78 + } => instrumentation.enable_opentelemetry, 79 + _ => false, 80 + } 81 + } 82 + } 83 + 65 84 #[tokio::main] 66 85 async fn main() -> anyhow::Result<()> { 67 86 let args = Cli::parse(); 68 87 let matches = Cli::command().get_matches(); 69 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 70 - bin_init(name); 89 + bin_init(args.command.enable_otel()); 90 + log::info!("{}", logo(name)); 71 91 72 92 let globals = args.globals.clone(); 73 93 ··· 96 116 .await 97 117 .expect("to write bundles to output files"); 98 118 } 99 - Commands::Mirror { args } => mirror::run(globals, args, true).await?, 100 - Commands::Wrap { args } => mirror::run(globals, args, false).await?, 119 + Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 + Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 101 121 Commands::Tail { after } => { 102 122 let mut url = globals.upstream; 103 123 url.set_path("/export");
+5 -3
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 + bin::{GlobalArgs, bin_init}, 4 + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 4 5 }; 5 6 use clap::Parser; 6 7 use reqwest::Url; ··· 199 200 #[tokio::main] 200 201 async fn main() -> anyhow::Result<()> { 201 202 let args = CliArgs::parse(); 202 - bin_init("backfill"); 203 + bin_init(false); 204 + log::info!("{}", logo("backfill")); 203 205 run(args.globals, args.args).await?; 204 206 Ok(()) 205 207 }
+28
src/bin/instrumentation/mod.rs
··· 1 + use opentelemetry::trace::TracerProvider as _; 2 + use opentelemetry_otlp::{Protocol, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::registry::LookupSpan; 7 + 8 + pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer> 9 + where 10 + S: Subscriber + for<'span> LookupSpan<'span>, 11 + { 12 + let exporter = opentelemetry_otlp::SpanExporter::builder() 13 + .with_http() 14 + .with_protocol(Protocol::HttpBinary) 15 + .build() 16 + .expect("to build otel otlp exporter"); 17 + 18 + let provider = SdkTracerProvider::builder() 19 + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( 20 + 1.0, 21 + )))) 22 + .with_id_generator(RandomIdGenerator::default()) 23 + .with_batch_exporter(exporter) 24 + .build(); 25 + 26 + let tracer = provider.tracer("tracing-otel-subscriber"); 27 + tracing_opentelemetry::layer().with_tracer(tracer) 28 + }
+7 -2
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, 2 + Db, ExperimentalConf, ListenConf, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + logo, pages_to_pg, poll_upstream, serve, 3 5 }; 4 6 use clap::Parser; 5 7 use reqwest::Url; ··· 166 168 #[command(flatten)] 167 169 globals: GlobalArgs, 168 170 #[command(flatten)] 171 + instrumentation: InstrumentationArgs, 172 + #[command(flatten)] 169 173 args: Args, 170 174 /// Run the mirror in wrap mode, no upstream synchronization (read-only) 171 175 #[arg(long, action)] ··· 176 180 #[tokio::main] 177 181 async fn main() -> anyhow::Result<()> { 178 182 let args = CliArgs::parse(); 179 - bin_init("mirror"); 183 + bin_init(args.instrumentation.enable_opentelemetry); 184 + log::info!("{}", logo("mirror")); 180 185 run(args.globals, args.args, !args.wrap_mode).await?; 181 186 Ok(()) 182 187 }
+38
src/bin/mod.rs
··· 1 + mod instrumentation; 2 + 1 3 use reqwest::Url; 4 + use tracing_subscriber::layer::SubscriberExt; 2 5 3 6 #[derive(Debug, Clone, clap::Args)] 4 7 pub struct GlobalArgs { ··· 14 17 pub upstream_throttle_ms: u64, 15 18 } 16 19 20 + #[derive(Debug, Default, Clone, clap::Args)] 21 + pub struct InstrumentationArgs { 22 + /// Export traces to an OTLP collector 23 + /// 24 + /// Configure the colletctor via standard env vars: 25 + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" 26 + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" 27 + /// - `OTEL_SERVICE_NAME` eg "my-app" 28 + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] 29 + pub enable_opentelemetry: bool, 30 + } 31 + 32 + pub fn bin_init(enable_otlp: bool) { 33 + let filter = tracing_subscriber::EnvFilter::builder() 34 + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) 35 + .from_env_lossy(); 36 + 37 + let stderr_log = tracing_subscriber::fmt::layer() 38 + .with_writer(std::io::stderr) 39 + .pretty(); 40 + 41 + let otel = if enable_otlp { 42 + Some(instrumentation::otel_layer()) 43 + } else { 44 + None 45 + }; 46 + 47 + let subscriber = tracing_subscriber::Registry::default() 48 + .with(filter) 49 + .with(stderr_log) 50 + .with(otel); 51 + 52 + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); 53 + } 54 + 17 55 #[allow(dead_code)] 18 56 fn main() { 19 57 panic!("this is not actually a module")
-13
src/lib.rs
··· 145 145 env!("CARGO_PKG_VERSION"), 146 146 ) 147 147 } 148 - 149 - pub fn bin_init(name: &str) { 150 - if std::env::var_os("RUST_LOG").is_none() { 151 - unsafe { std::env::set_var("RUST_LOG", "info") }; 152 - } 153 - let filter = tracing_subscriber::EnvFilter::from_default_env(); 154 - tracing_subscriber::fmt() 155 - .with_writer(std::io::stderr) 156 - .with_env_filter(filter) 157 - .init(); 158 - 159 - log::info!("{}", logo(name)); 160 - }