//! forked from parakeet.at/parakeet
//! Rust AppView - highly experimental!
1use super::{DL_DONE_KEY, PDS_SERVICE_ID};
2use crate::db;
3use chrono::prelude::*;
4use deadpool_postgres::{Client as PgClient, Pool};
5use did_resolver::Resolver;
6use futures::TryStreamExt;
7use metrics::{counter, histogram};
8use parakeet_db::types::{ActorStatus, ActorSyncState};
9use redis::aio::MultiplexedConnection;
10use redis::AsyncTypedCommands;
11use reqwest::header::HeaderMap;
12use reqwest::Client as HttpClient;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use tokio::sync::watch::Receiver as WatchReceiver;
16use tokio::time::{Duration, Instant};
17use tokio_postgres::types::Type;
18use tokio_util::io::StreamReader;
19use tokio_util::task::TaskTracker;
20use tracing::instrument;
21
22const BF_RESET_KEY: &str = "bf_download_ratelimit_reset";
23const BF_REM_KEY: &str = "bf_download_ratelimit_rem";
24const DL_DUP_KEY: &str = "bf_downloaded";
25
/// Producer half of the backfill pipeline.
///
/// Pops DIDs off the Redis `backfill_queue` list, skips repos that were
/// already downloaded (Redis dedup set) or are already synced/processing in
/// Postgres, resolves each remaining DID document to find its PDS service
/// endpoint, upserts the actor row to `processing`, and hands
/// `(pds_endpoint, did, handle)` to one of `concurrency` `download_thread`
/// workers over a bounded channel.
///
/// * `buffer` — once the `DL_DONE_KEY` list exceeds this length, the producer
///   pauses so the downstream consumer can catch up.
/// * `stop` — watch channel used purely as a shutdown signal.
pub async fn downloader(
    mut rc: MultiplexedConnection,
    pool: Pool,
    resolver: Arc<Resolver>,
    tmp_dir: PathBuf,
    concurrency: usize,
    buffer: usize,
    tracker: TaskTracker,
    stop: WatchReceiver<bool>,
) {
    // Bounded job channel: blocks the producer when all workers are busy.
    let (tx, rx) = flume::bounded(64);
    let mut conn = pool.get().await.unwrap();

    let http = HttpClient::new();

    // Worker pool; each worker clones the shared receiver.
    for _ in 0..concurrency {
        tracker.spawn(download_thread(
            rc.clone(),
            pool.clone(),
            resolver.clone(),
            http.clone(),
            rx.clone(),
            tmp_dir.clone(),
        ));
    }

    // Upsert marking an actor as mid-backfill before its job is dispatched.
    let status_stmt = conn.prepare_typed_cached(
        "INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW()) ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()",
        &[Type::TEXT]
    ).await.unwrap();

    loop {
        // NOTE(review): the watch payload is never read — any publish on
        // `stop` trips `has_changed` (the seen-flag is never cleared via
        // `borrow_and_update`), and a dropped sender (`unwrap_or(true)`)
        // also ends the loop. Presumably any signal means "shut down".
        if stop.has_changed().unwrap_or(true) {
            tracing::info!("stopping downloader");
            break;
        }

        // Backpressure: pause while the done-list is longer than `buffer`.
        if let Ok(count) = rc.llen(DL_DONE_KEY).await {
            if count > buffer {
                tracing::info!("waiting due to full buffer");
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }
        }

        // Pull the next DID; idle-poll every 250ms when the queue is empty.
        let did: String = match rc.lpop("backfill_queue", None).await {
            Ok(Some(did)) => did,
            Ok(None) => {
                tokio::time::sleep(Duration::from_millis(250)).await;
                continue;
            }
            Err(e) => {
                tracing::error!("failed to get item from backfill queue: {e}");
                continue;
            }
        };

        tracing::trace!("resolving repo {did}");

        // has the repo already been downloaded?
        if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
            tracing::warn!("skipping duplicate repo {did}");
            continue;
        }

        // check if they're already synced in DB too
        match db::actor_get_statuses(&mut conn, &did).await {
            Ok(Some((_, state))) => {
                if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
                    tracing::warn!("skipping duplicate repo {did}");
                    continue;
                }
            }
            Ok(None) => {}
            Err(e) => {
                // NOTE(review): unlike the resolve-failure arms below, this
                // records a failed job but does NOT `continue` — the DID still
                // falls through to resolution. Confirm that's intentional.
                tracing::error!(did, "failed to check current repo status: {e}");
                db::backfill_job_write(&mut conn, &did, "failed.resolve")
                    .await
                    .unwrap();
            }
        }

        match resolver.resolve_did(&did).await {
            Ok(Some(did_doc)) => {
                // The DID doc must advertise a PDS service entry to be usable.
                let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
                    tracing::warn!("bad DID doc for {did}");
                    db::backfill_job_write(&mut conn, &did, "failed.resolve")
                        .await
                        .unwrap();
                    continue;
                };
                let service = service.service_endpoint.clone();

                // set the repo to processing
                if let Err(e) = conn.execute(&status_stmt, &[&did]).await {
                    tracing::error!("failed to update repo status for {did}: {e}");
                    continue;
                }

                // Handle = first alias with its 5-byte scheme prefix sliced
                // off. NOTE(review): `v[5..]` panics if an alias is shorter
                // than 5 bytes — assumes aliases always look like `at://…`;
                // confirm upstream guarantees this.
                let handle = did_doc
                    .also_known_as
                    .and_then(|akas| akas.first().map(|v| v[5..].to_owned()));

                tracing::trace!("resolved repo {did} {service}");
                if let Err(e) = tx.send_async((service, did, handle)).await {
                    tracing::error!("failed to send: {e}");
                }
            }
            Ok(None) => {
                // DID resolved to nothing: mark dirty and record the failure.
                tracing::warn!(did, "bad DID doc");
                db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                    .await
                    .unwrap();
                db::backfill_job_write(&mut conn, &did, "failed.resolve")
                    .await
                    .unwrap();
            }
            Err(e) => {
                // Resolver/network error: same dirty-plus-failed-job handling.
                tracing::error!(did, "failed to resolve DID doc: {e}");
                db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                    .await
                    .unwrap();
                db::backfill_job_write(&mut conn, &did, "failed.resolve")
                    .await
                    .unwrap();
            }
        }
    }
}
155
/// Worker: downloads repos for jobs produced by `downloader`.
///
/// For each `(pds, did, handle)` job: waits out the PDS's recorded rate
/// limit, re-checks the repo's hosting status (marking it dirty if gone or
/// inactive), verifies/stores the handle if one was supplied, streams the
/// CAR file into `tmp_dir`, writes the fresh rate-limit headers back to
/// Redis, and pushes the DID onto `DL_DONE_KEY` for the next pipeline stage.
async fn download_thread(
    mut rc: MultiplexedConnection,
    pool: Pool,
    resolver: Arc<Resolver>,
    http: reqwest::Client,
    rx: flume::Receiver<(String, String, Option<String>)>,
    tmp_dir: PathBuf,
) {
    tracing::debug!("spawning thread");

    // this will return Err(_) and exit when all senders (only held above) are dropped
    while let Ok((pds, did, maybe_handle)) = rx.recv_async().await {
        if let Err(e) = enforce_ratelimit(&mut rc, &pds).await {
            tracing::error!("ratelimiter error: {e}");
            continue;
        };

        // Scope the pooled DB connection so it's returned to the pool before
        // the (potentially slow) download below.
        {
            tracing::trace!("getting DB conn...");
            let mut conn = pool.get().await.unwrap();
            tracing::trace!("got DB conn...");
            // Ok(false) = repo gone/inactive; already marked dirty, skip it.
            match check_and_update_repo_status(&http, &mut conn, &pds, &did).await {
                Ok(true) => {}
                Ok(false) => continue,
                Err(e) => {
                    tracing::error!(pds, did, "failed to check repo status: {e}");
                    db::backfill_job_write(&mut conn, &did, "failed.resolve")
                        .await
                        .unwrap();
                    continue;
                }
            }

            tracing::debug!("trying to resolve handle...");
            // Handle failures are recorded but do not abort the download.
            if let Some(handle) = maybe_handle {
                if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
                    tracing::error!(pds, did, "failed to resolve handle: {e}");
                    db::backfill_job_write(&mut conn, &did, "failed.resolve")
                        .await
                        .unwrap();
                }
            }
        }

        let start = Instant::now();

        tracing::trace!("downloading repo {did}");

        match download_car(&http, &tmp_dir, &pds, &did).await {
            // Persist the PDS's latest rate-limit state for enforce_ratelimit.
            Ok(Some((rem, reset))) => {
                let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
                let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
            }
            Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."),
            Err(e) => {
                tracing::error!(pds, did, "failed to download repo: {e}");
                continue;
            }
        }

        histogram!("backfill_download_dur", "pds" => pds).record(start.elapsed().as_secs_f64());

        // Best-effort dedup-set add first, then the done-list push that the
        // downstream consumer actually waits on.
        let _ = rc.sadd(DL_DUP_KEY, &did).await;
        if let Err(e) = rc.rpush(DL_DONE_KEY, &did).await {
            tracing::error!(did, "failed to mark download complete: {e}");
        } else {
            counter!("backfill_downloaded").increment(1);
        }
    }

    tracing::debug!("thread exiting");
}
228
229async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> {
230 let score = rc.zscore(BF_REM_KEY, pds).await?;
231
232 if let Some(rem) = score {
233 if (rem as i32) < 100 {
234 // if we've got None for some reason, just hope that the next req will contain the reset header.
235 if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? {
236 tracing::debug!("rate limit for {pds} resets at {at}");
237 let time = chrono::DateTime::from_timestamp(at as i64, 0).unwrap();
238 let delta = (time - Utc::now()).num_milliseconds().max(0);
239
240 tokio::time::sleep(Duration::from_millis(delta as u64)).await;
241 };
242 }
243 }
244
245 Ok(())
246}
247
248// you wouldn't...
249#[instrument(skip(http, tmp_dir, pds))]
250async fn download_car(
251 http: &HttpClient,
252 tmp_dir: &Path,
253 pds: &str,
254 did: &str,
255) -> eyre::Result<Option<(i32, i32)>> {
256 let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
257
258 let res = http
259 .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}"))
260 .send()
261 .await?
262 .error_for_status()?;
263
264 let headers = res.headers();
265 let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
266 let ratelimit_reset = header_to_int(headers, "ratelimit-reset");
267
268 let strm = res.bytes_stream().map_err(std::io::Error::other);
269 let mut reader = StreamReader::new(strm);
270
271 tokio::io::copy(&mut reader, &mut file).await?;
272
273 Ok(ratelimit_rem.zip(ratelimit_reset))
274}
275
276// there's no ratelimit handling here because we pretty much always call download_car after.
277#[instrument(skip(http, conn, pds))]
278async fn check_and_update_repo_status(
279 http: &HttpClient,
280 conn: &mut PgClient,
281 pds: &str,
282 repo: &str,
283) -> eyre::Result<bool> {
284 match super::check_pds_repo_status(http, pds, repo).await? {
285 Some(status) => {
286 if !status.active {
287 tracing::debug!("repo is inactive");
288
289 let status = status
290 .status
291 .unwrap_or(crate::firehose::AtpAccountStatus::Deleted);
292 conn.execute(
293 "UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1",
294 &[&repo, &ActorStatus::from(status)],
295 )
296 .await?;
297
298 Ok(false)
299 } else {
300 Ok(true)
301 }
302 }
303 None => {
304 // this repo can't be found - set dirty and assume deleted.
305 tracing::debug!("repo was deleted");
306 conn.execute(
307 "UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1",
308 &[&repo],
309 )
310 .await?;
311
312 Ok(false)
313 }
314 }
315}
316
317async fn resolve_and_set_handle(
318 conn: &PgClient,
319 resolver: &Resolver,
320 did: &str,
321 handle: &str,
322) -> eyre::Result<()> {
323 if let Some(handle_did) = resolver.resolve_handle(handle).await? {
324 if handle_did == did {
325 conn.execute("UPDATE actors SET handle=$2 WHERE did=$1", &[&did, &handle])
326 .await?;
327 } else {
328 tracing::warn!("requested DID ({did}) doesn't match handle");
329 }
330 }
331
332 Ok(())
333}
334
335fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> {
336 headers
337 .get(name)
338 .and_then(|v| v.to_str().ok())
339 .and_then(|v| v.parse().ok())
340}