Parakeet is a Rust-based Bluesky AppView that aims to implement most of the functionality required to support the Bluesky client.
at main 4.6 kB view raw
1use deadpool_postgres::Runtime; 2use did_resolver::{Resolver, ResolverOpts}; 3use eyre::OptionExt; 4use metrics_exporter_prometheus::PrometheusBuilder; 5use std::sync::Arc; 6use tokio::signal::ctrl_c; 7use tokio_postgres::NoTls; 8 9mod backfill; 10mod cmd; 11mod config; 12mod db; 13mod firehose; 14mod indexer; 15mod instrumentation; 16mod label_indexer; 17mod utils; 18 19#[tokio::main] 20async fn main() -> eyre::Result<()> { 21 PrometheusBuilder::new().install()?; 22 23 let cli = cmd::parse(); 24 let conf = config::load_config()?; 25 26 instrumentation::init_instruments(&conf.instruments); 27 let user_agent = build_ua(&conf.ua_contact); 28 29 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?; 30 31 let (redis_conn, redis_fut) = redis::Client::open(conf.redis_uri)? 32 .create_multiplexed_tokio_connection() 33 .await?; 34 tokio::spawn(async { 35 redis_fut.await; 36 Ok::<_, eyre::Report>(()) 37 }); 38 39 let resolver = Arc::new(Resolver::new(ResolverOpts { 40 plc_directory: conf.plc_directory, 41 user_agent: Some(user_agent.clone()), 42 ..Default::default() 43 })?); 44 45 let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 46 47 let tracker = tokio_util::task::TaskTracker::new(); 48 let (stop_tx, stop) = tokio::sync::watch::channel(false); 49 50 let resume = (cli.labels || cli.indexer) 51 .then::<Result<_, eyre::Report>, _>(|| { 52 let resume_path = conf.resume_path.ok_or_eyre( 53 "Config item resume_path must be specified when using --indexer or --labels", 54 )?; 55 56 let db = sled::open(resume_path)?; 57 58 Ok(db) 59 }) 60 .transpose()?; 61 62 if cli.labels { 63 let resume = resume.clone().unwrap(); 64 65 let label_mgr = 66 label_indexer::LabelServiceManager::new(pool.clone(), resume, user_agent.clone()) 67 .await?; 68 69 if let Some(label_source) = conf.label_source { 70 tracker.spawn(label_mgr.run(label_source, stop.clone())); 71 } 72 } 73 74 if cli.backfill { 75 let bf_cfg = conf 76 .backfill 77 .ok_or_eyre("Config item 
[backfill] must be specified when using --backfill")?; 78 79 let backfiller = backfill::BackfillManager::new( 80 pool.clone(), 81 redis_conn.clone(), 82 resolver.clone(), 83 (!bf_cfg.skip_aggregation).then_some(index_client.clone()), 84 bf_cfg, 85 ) 86 .await?; 87 88 tracker.spawn(backfiller.run(stop.clone())); 89 } 90 91 if cli.indexer { 92 let resume = resume.clone().unwrap(); 93 94 let indexer_cfg = conf 95 .indexer 96 .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?; 97 98 let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128); 99 100 let start_seq = resume 101 .get("firehose")? 102 .and_then(utils::u64_from_ivec) 103 .or(indexer_cfg.start_commit_seq); 104 105 if let Some(start_seq) = start_seq { 106 tracing::info!("starting firehose consumer from {start_seq}"); 107 } 108 109 let relay_firehose = firehose::FirehoseConsumer::new_relay( 110 &indexer_cfg.relay_source, 111 start_seq, 112 &user_agent, 113 ) 114 .await?; 115 116 let indexer_opts = indexer::RelayIndexerOpts { 117 history_mode: indexer_cfg.history_mode, 118 skip_handle_validation: indexer_cfg.skip_handle_validation, 119 request_backfill: indexer_cfg.request_backfill, 120 }; 121 122 let relay_indexer = indexer::RelayIndexer::new( 123 pool.clone(), 124 redis_conn.clone(), 125 idxc_tx, 126 resolver.clone(), 127 relay_firehose, 128 resume, 129 indexer_opts, 130 ) 131 .await?; 132 133 tracker.spawn(relay_indexer.run(indexer_cfg.workers, stop)); 134 tracker.spawn(index_transport(index_client, idxc_rx)); 135 } 136 137 tokio::spawn(async move { 138 let _ = ctrl_c().await; 139 tracing::info!("stopping consumer"); 140 stop_tx.send(true).unwrap(); 141 }); 142 143 tracker.close(); 144 tracker.wait().await; 145 146 Ok(()) 147} 148 149async fn index_transport( 150 mut idxc: parakeet_index::Client, 151 rx: tokio::sync::mpsc::Receiver<parakeet_index::AggregateDeltaReq>, 152) -> eyre::Result<()> { 153 use tokio_stream::wrappers::ReceiverStream; 154 155 
idxc.submit_aggregate_delta_stream(ReceiverStream::new(rx)) 156 .await?; 157 158 Ok(()) 159} 160 161fn build_ua(contact: &Option<String>) -> String { 162 let mut ua = format!("Parakeet {}", env!("CARGO_PKG_VERSION")); 163 164 if let Some(contact) = contact { 165 ua += &format!(" ({contact})"); 166 } 167 168 ua 169}