Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
69
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: json log option

mia.omg.lol 7d7ac22f 006eb4a3

verified
+115 -28
+16 -3
Cargo.lock
··· 3141 3141 "quinn-udp", 3142 3142 "rustc-hash 2.1.1", 3143 3143 "rustls", 3144 - "socket2 0.5.8", 3144 + "socket2 0.6.0", 3145 3145 "thiserror 2.0.12", 3146 3146 "tokio", 3147 3147 "tracing", ··· 3178 3178 "cfg_aliases", 3179 3179 "libc", 3180 3180 "once_cell", 3181 - "socket2 0.5.8", 3181 + "socket2 0.6.0", 3182 3182 "tracing", 3183 - "windows-sys 0.52.0", 3183 + "windows-sys 0.59.0", 3184 3184 ] 3185 3185 3186 3186 [[package]] ··· 4638 4638 ] 4639 4639 4640 4640 [[package]] 4641 + name = "tracing-serde" 4642 + version = "0.2.0" 4643 + source = "registry+https://github.com/rust-lang/crates.io-index" 4644 + checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" 4645 + dependencies = [ 4646 + "serde", 4647 + "tracing-core", 4648 + ] 4649 + 4650 + [[package]] 4641 4651 name = "tracing-subscriber" 4642 4652 version = "0.3.19" 4643 4653 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4647 4657 "nu-ansi-term", 4648 4658 "once_cell", 4649 4659 "regex", 4660 + "serde", 4661 + "serde_json", 4650 4662 "sharded-slab", 4651 4663 "smallvec", 4652 4664 "thread_local", 4653 4665 "tracing", 4654 4666 "tracing-core", 4655 4667 "tracing-log", 4668 + "tracing-serde", 4656 4669 ] 4657 4670 4658 4671 [[package]]
+1 -1
consumer/Cargo.toml
··· 36 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 38 tracing = "0.1.40" 39 - tracing-subscriber = "0.3.18" 39 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
+8
consumer/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 16 18 pub index_uri: String, 17 19 pub database: deadpool_postgres::Config, 18 20 pub redis_uri: String, ··· 27 29 pub indexer: Option<IndexerConfig>, 28 30 /// Configuration items specific to backfill 29 31 pub backfill: Option<BackfillConfig>, 32 + } 33 + 34 + #[derive(Debug, Deserialize)] 35 + pub struct ConfigInstruments { 36 + #[serde(default)] 37 + pub log_json: bool, 30 38 } 31 39 32 40 #[derive(Debug, Deserialize)]
+25
consumer/src/instrumentation.rs
··· 1 + use tracing::Subscriber; 2 + use tracing_subscriber::filter::Filtered; 3 + use tracing_subscriber::layer::SubscriberExt; 4 + use tracing_subscriber::registry::LookupSpan; 5 + use tracing_subscriber::util::SubscriberInitExt; 6 + use tracing_subscriber::{EnvFilter, Layer}; 7 + 8 + pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 9 + let log_layer = init_log(cfg.log_json); 10 + 11 + tracing_subscriber::registry().with(log_layer).init(); 12 + } 13 + 14 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 15 + where 16 + S: Subscriber + for<'span> LookupSpan<'span>, 17 + { 18 + let stdout_filter = EnvFilter::from_default_env(); 19 + 20 + match json { 21 + true => tracing_subscriber::fmt::layer().json().boxed(), 22 + false => tracing_subscriber::fmt::layer().boxed(), 23 + } 24 + .with_filter(stdout_filter) 25 + }
+2 -1
consumer/src/main.rs
··· 12 12 mod db; 13 13 mod firehose; 14 14 mod indexer; 15 + mod instrumentation; 15 16 mod label_indexer; 16 17 mod utils; 17 18 18 19 #[tokio::main] 19 20 async fn main() -> eyre::Result<()> { 20 - tracing_subscriber::fmt::init(); 21 21 PrometheusBuilder::new().install()?; 22 22 23 23 let cli = cmd::parse(); 24 24 let conf = config::load_config()?; 25 25 26 + instrumentation::init_instruments(&conf.instruments); 26 27 let user_agent = build_ua(&conf.ua_contact); 27 28 28 29 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+1 -1
parakeet-index/Cargo.toml
··· 24 24 tokio = { version = "1.42.0", features = ["full"], optional = true } 25 25 tonic-health = { version = "0.13.0", optional = true } 26 26 tracing = { version = "0.1.40", optional = true } 27 - tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true } 27 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } 28 28 tracing-opentelemetry = { version = "0.32", optional = true } 29 29 30 30 [build-dependencies]
+2 -2
parakeet-index/src/main.rs
··· 9 9 async fn main() -> eyre::Result<()> { 10 10 let conf = config::load_config()?; 11 11 12 - instrumentation::init_instruments(conf.otel_enable); 12 + instrumentation::init_instruments(&conf.instruments); 13 13 14 14 let db_root = conf.index_db_path.parse()?; 15 15 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); ··· 21 21 let service = Service::new(state.clone()); 22 22 23 23 let mw = tower::ServiceBuilder::new() 24 - .option_layer(conf.otel_enable.then(OtelGrpcLayer::default)); 24 + .option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default)); 25 25 26 26 Server::builder() 27 27 .layer(mw)
+10 -2
parakeet-index/src/server/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(default)] 17 - pub otel_enable: bool, 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 18 18 pub database_url: String, 19 19 pub index_db_path: String, 20 20 #[serde(default)] 21 21 pub server: ConfigServer, 22 + } 23 + 24 + #[derive(Debug, Deserialize)] 25 + pub struct ConfigInstruments { 26 + #[serde(default)] 27 + pub otel_enable: bool, 28 + #[serde(default)] 29 + pub log_json: bool, 22 30 } 23 31 24 32 #[derive(Debug, Deserialize)]
+18 -6
parakeet-index/src/server/instrumentation.rs
··· 9 9 use tracing_subscriber::util::SubscriberInitExt; 10 10 use tracing_subscriber::{EnvFilter, Layer}; 11 11 12 - pub fn init_instruments(otel_enable: bool) { 13 - let otel_layer = otel_enable.then(init_otel); 14 - 15 - let stdout_filter = 16 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 12 + pub fn init_instruments(cfg: &super::config::ConfigInstruments) { 13 + let otel_layer = cfg.otel_enable.then(init_otel); 14 + let log_layer = init_log(cfg.log_json); 17 15 18 16 tracing_subscriber::registry() 19 - .with(tracing_subscriber::fmt::layer().with_filter(stdout_filter)) 17 + .with(log_layer) 20 18 .with(otel_layer) 21 19 .init(); 22 20 } ··· 43 41 44 42 OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 45 43 } 44 + 45 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 + where 47 + S: Subscriber + for<'span> LookupSpan<'span>, 48 + { 49 + let stdout_filter = 50 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 + 52 + match json { 53 + true => tracing_subscriber::fmt::layer().json().boxed(), 54 + false => tracing_subscriber::fmt::layer().boxed(), 55 + } 56 + .with_filter(stdout_filter) 57 + }
+1 -1
parakeet/Cargo.toml
··· 36 36 tower = "0.5" 37 37 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 38 38 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } 39 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 40 40 tracing-opentelemetry = "0.32"
+10 -2
parakeet/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(default)] 17 - pub otel_enable: bool, 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 18 18 pub index_uri: String, 19 19 pub database_url: String, 20 20 pub redis_uri: String, ··· 29 29 pub did_allowlist: Option<Vec<String>>, 30 30 #[serde(default)] 31 31 pub migrate: bool, 32 + } 33 + 34 + #[derive(Debug, Deserialize)] 35 + pub struct ConfigInstruments { 36 + #[serde(default)] 37 + pub otel_enable: bool, 38 + #[serde(default)] 39 + pub log_json: bool, 32 40 } 33 41 34 42 #[derive(Debug, Deserialize)]
+18 -6
parakeet/src/instrumentation.rs
··· 9 9 use tracing_subscriber::util::SubscriberInitExt; 10 10 use tracing_subscriber::{EnvFilter, Layer}; 11 11 12 - pub fn init_instruments(otel_enable: bool) { 13 - let otel_layer = otel_enable.then(init_otel); 14 - 15 - let stdout_filter = 16 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 12 + pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 13 + let otel_layer = cfg.otel_enable.then(init_otel); 14 + let log_layer = init_log(cfg.log_json); 17 15 18 16 tracing_subscriber::registry() 19 - .with(tracing_subscriber::fmt::layer().with_filter(stdout_filter)) 17 + .with(log_layer) 20 18 .with(otel_layer) 21 19 .init(); 22 20 } ··· 43 41 44 42 OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 45 43 } 44 + 45 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 + where 47 + S: Subscriber + for<'span> LookupSpan<'span>, 48 + { 49 + let stdout_filter = 50 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 + 52 + match json { 53 + true => tracing_subscriber::fmt::layer().json().boxed(), 54 + false => tracing_subscriber::fmt::layer().boxed(), 55 + } 56 + .with_filter(stdout_filter) 57 + }
+3 -3
parakeet/src/main.rs
··· 35 35 async fn main() -> eyre::Result<()> { 36 36 let conf = config::load_config()?; 37 37 38 - instrumentation::init_instruments(conf.otel_enable); 38 + instrumentation::init_instruments(&conf.instruments); 39 39 40 40 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 41 41 let pool = Pool::builder(db_mgr).build()?; ··· 84 84 let did_doc = did_web_doc(&conf.service); 85 85 86 86 let mw = tower::ServiceBuilder::new() 87 - .option_layer(conf.otel_enable.then(OtelInResponseLayer::default)) 88 - .option_layer(conf.otel_enable.then(OtelAxumLayer::default)) 87 + .option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default)) 88 + .option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default)) 89 89 .layer(TraceLayer::new_for_http()) 90 90 .layer(cors); 91 91