Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client.
1use deadpool_postgres::Runtime;
2use did_resolver::{Resolver, ResolverOpts};
3use eyre::OptionExt;
4use metrics_exporter_prometheus::PrometheusBuilder;
5use std::sync::Arc;
6use tokio::signal::ctrl_c;
7use tokio_postgres::NoTls;
8
9mod backfill;
10mod cmd;
11mod config;
12mod db;
13mod firehose;
14mod indexer;
15mod instrumentation;
16mod label_indexer;
17mod utils;
18
19#[tokio::main]
20async fn main() -> eyre::Result<()> {
21 PrometheusBuilder::new().install()?;
22
23 let cli = cmd::parse();
24 let conf = config::load_config()?;
25
26 instrumentation::init_instruments(&conf.instruments);
27 let user_agent = build_ua(&conf.ua_contact);
28
29 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
30
31 let (redis_conn, redis_fut) = redis::Client::open(conf.redis_uri)?
32 .create_multiplexed_tokio_connection()
33 .await?;
34 tokio::spawn(async {
35 redis_fut.await;
36 Ok::<_, eyre::Report>(())
37 });
38
39 let resolver = Arc::new(Resolver::new(ResolverOpts {
40 plc_directory: conf.plc_directory,
41 user_agent: Some(user_agent.clone()),
42 ..Default::default()
43 })?);
44
45 let index_client = parakeet_index::Client::connect(conf.index_uri).await?;
46
47 let tracker = tokio_util::task::TaskTracker::new();
48 let (stop_tx, stop) = tokio::sync::watch::channel(false);
49
50 let resume = (cli.labels || cli.indexer)
51 .then::<Result<_, eyre::Report>, _>(|| {
52 let resume_path = conf.resume_path.ok_or_eyre(
53 "Config item resume_path must be specified when using --indexer or --labels",
54 )?;
55
56 let db = sled::open(resume_path)?;
57
58 Ok(db)
59 })
60 .transpose()?;
61
62 if cli.labels {
63 let resume = resume.clone().unwrap();
64
65 let label_mgr =
66 label_indexer::LabelServiceManager::new(pool.clone(), resume, user_agent.clone())
67 .await?;
68
69 if let Some(label_source) = conf.label_source {
70 tracker.spawn(label_mgr.run(label_source, stop.clone()));
71 }
72 }
73
74 if cli.backfill {
75 let bf_cfg = conf
76 .backfill
77 .ok_or_eyre("Config item [backfill] must be specified when using --backfill")?;
78
79 let backfiller = backfill::BackfillManager::new(
80 pool.clone(),
81 redis_conn.clone(),
82 resolver.clone(),
83 (!bf_cfg.skip_aggregation).then_some(index_client.clone()),
84 bf_cfg,
85 )
86 .await?;
87
88 tracker.spawn(backfiller.run(stop.clone()));
89 }
90
91 if cli.indexer {
92 let resume = resume.clone().unwrap();
93
94 let indexer_cfg = conf
95 .indexer
96 .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?;
97
98 let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128);
99
100 let start_seq = resume
101 .get("firehose")?
102 .and_then(utils::u64_from_ivec)
103 .or(indexer_cfg.start_commit_seq);
104
105 if let Some(start_seq) = start_seq {
106 tracing::info!("starting firehose consumer from {start_seq}");
107 }
108
109 let relay_firehose = firehose::FirehoseConsumer::new_relay(
110 &indexer_cfg.relay_source,
111 start_seq,
112 &user_agent,
113 )
114 .await?;
115
116 let indexer_opts = indexer::RelayIndexerOpts {
117 history_mode: indexer_cfg.history_mode,
118 skip_handle_validation: indexer_cfg.skip_handle_validation,
119 request_backfill: indexer_cfg.request_backfill,
120 };
121
122 let relay_indexer = indexer::RelayIndexer::new(
123 pool.clone(),
124 redis_conn.clone(),
125 idxc_tx,
126 resolver.clone(),
127 relay_firehose,
128 resume,
129 indexer_opts,
130 )
131 .await?;
132
133 tracker.spawn(relay_indexer.run(indexer_cfg.workers, stop));
134 tracker.spawn(index_transport(index_client, idxc_rx));
135 }
136
137 tokio::spawn(async move {
138 let _ = ctrl_c().await;
139 tracing::info!("stopping consumer");
140 stop_tx.send(true).unwrap();
141 });
142
143 tracker.close();
144 tracker.wait().await;
145
146 Ok(())
147}
148
149async fn index_transport(
150 mut idxc: parakeet_index::Client,
151 rx: tokio::sync::mpsc::Receiver<parakeet_index::AggregateDeltaReq>,
152) -> eyre::Result<()> {
153 use tokio_stream::wrappers::ReceiverStream;
154
155 idxc.submit_aggregate_delta_stream(ReceiverStream::new(rx))
156 .await?;
157
158 Ok(())
159}
160
161fn build_ua(contact: &Option<String>) -> String {
162 let mut ua = format!("Parakeet {}", env!("CARGO_PKG_VERSION"));
163
164 if let Some(contact) = contact {
165 ua += &format!(" ({contact})");
166 }
167
168 ua
169}