Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat-post-thread-v2 340 lines 12 kB view raw
1use super::{DL_DONE_KEY, PDS_SERVICE_ID}; 2use crate::db; 3use chrono::prelude::*; 4use deadpool_postgres::{Client as PgClient, Pool}; 5use did_resolver::Resolver; 6use futures::TryStreamExt; 7use metrics::{counter, histogram}; 8use parakeet_db::types::{ActorStatus, ActorSyncState}; 9use redis::aio::MultiplexedConnection; 10use redis::AsyncTypedCommands; 11use reqwest::header::HeaderMap; 12use reqwest::Client as HttpClient; 13use std::path::{Path, PathBuf}; 14use std::sync::Arc; 15use tokio::sync::watch::Receiver as WatchReceiver; 16use tokio::time::{Duration, Instant}; 17use tokio_postgres::types::Type; 18use tokio_util::io::StreamReader; 19use tokio_util::task::TaskTracker; 20use tracing::instrument; 21 22const BF_RESET_KEY: &str = "bf_download_ratelimit_reset"; 23const BF_REM_KEY: &str = "bf_download_ratelimit_rem"; 24const DL_DUP_KEY: &str = "bf_downloaded"; 25 26pub async fn downloader( 27 mut rc: MultiplexedConnection, 28 pool: Pool, 29 resolver: Arc<Resolver>, 30 tmp_dir: PathBuf, 31 concurrency: usize, 32 buffer: usize, 33 tracker: TaskTracker, 34 stop: WatchReceiver<bool>, 35) { 36 let (tx, rx) = flume::bounded(64); 37 let mut conn = pool.get().await.unwrap(); 38 39 let http = HttpClient::new(); 40 41 for _ in 0..concurrency { 42 tracker.spawn(download_thread( 43 rc.clone(), 44 pool.clone(), 45 resolver.clone(), 46 http.clone(), 47 rx.clone(), 48 tmp_dir.clone(), 49 )); 50 } 51 52 let status_stmt = conn.prepare_typed_cached( 53 "INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, 'processing', NOW()) ON CONFLICT (did) DO UPDATE SET sync_state = 'processing', last_indexed=NOW()", 54 &[Type::TEXT] 55 ).await.unwrap(); 56 57 loop { 58 if stop.has_changed().unwrap_or(true) { 59 tracing::info!("stopping downloader"); 60 break; 61 } 62 63 if let Ok(count) = rc.llen(DL_DONE_KEY).await { 64 if count > buffer { 65 tracing::info!("waiting due to full buffer"); 66 tokio::time::sleep(Duration::from_secs(5)).await; 67 continue; 68 } 69 } 70 71 let did: String = match rc.lpop("backfill_queue", None).await { 72 Ok(Some(did)) => did, 73 Ok(None) => { 74 tokio::time::sleep(Duration::from_millis(250)).await; 75 continue; 76 } 77 Err(e) => { 78 tracing::error!("failed to get item from backfill queue: {e}"); 79 continue; 80 } 81 }; 82 83 tracing::trace!("resolving repo {did}"); 84 85 // has the repo already been downloaded? 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 tracing::warn!("skipping duplicate repo {did}"); 88 continue; 89 } 90 91 // check if they're already synced in DB too 92 match db::actor_get_statuses(&mut conn, &did).await { 93 Ok(Some((_, state))) => { 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 tracing::warn!("skipping duplicate repo {did}"); 96 continue; 97 } 98 } 99 Ok(None) => {} 100 Err(e) => { 101 tracing::error!(did, "failed to check current repo status: {e}"); 102 db::backfill_job_write(&mut conn, &did, "failed.resolve") 103 .await 104 .unwrap(); 105 } 106 } 107 108 match resolver.resolve_did(&did).await { 109 Ok(Some(did_doc)) => { 110 let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 111 tracing::warn!("bad DID doc for {did}"); 112 db::backfill_job_write(&mut conn, &did, "failed.resolve") 113 .await 114 .unwrap(); 115 continue; 116 }; 117 let service = service.service_endpoint.clone(); 118 119 // set the repo to processing 120 if let Err(e) = conn.execute(&status_stmt, &[&did]).await { 121 tracing::error!("failed to update repo status for {did}: {e}"); 122 continue; 123 } 124 125 let handle = did_doc 126 .also_known_as 127 .and_then(|akas| akas.first().map(|v| v[5..].to_owned())); 128 129 tracing::trace!("resolved repo {did} {service}"); 130 if let Err(e) = tx.send_async((service, did, handle)).await { 131 tracing::error!("failed to send: {e}"); 132 } 133 } 134 Ok(None) => { 135 tracing::warn!(did, "bad DID doc"); 136 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 137 .await 138 .unwrap(); 139 db::backfill_job_write(&mut conn, &did, "failed.resolve") 140 .await 141 .unwrap(); 142 } 143 Err(e) => { 144 tracing::error!(did, "failed to resolve DID doc: {e}"); 145 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 146 .await 147 .unwrap(); 148 db::backfill_job_write(&mut conn, &did, "failed.resolve") 149 .await 150 .unwrap(); 151 } 152 } 153 } 154} 155 156async fn download_thread( 157 mut rc: MultiplexedConnection, 158 pool: Pool, 159 resolver: Arc<Resolver>, 160 http: reqwest::Client, 161 rx: flume::Receiver<(String, String, Option<String>)>, 162 tmp_dir: PathBuf, 163) { 164 tracing::debug!("spawning thread"); 165 166 // this will return Err(_) and exit when all senders (only held above) are dropped 167 while let Ok((pds, did, maybe_handle)) = rx.recv_async().await { 168 if let Err(e) = enforce_ratelimit(&mut rc, &pds).await { 169 tracing::error!("ratelimiter error: {e}"); 170 continue; 171 }; 172 173 { 174 tracing::trace!("getting DB conn..."); 175 let mut conn = pool.get().await.unwrap(); 176 tracing::trace!("got DB conn..."); 177 match check_and_update_repo_status(&http, &mut conn, &pds, &did).await { 178 Ok(true) => {} 179 Ok(false) => continue, 180 Err(e) => { 181 tracing::error!(pds, did, "failed to check repo status: {e}"); 182 db::backfill_job_write(&mut conn, &did, "failed.resolve") 183 .await 184 .unwrap(); 185 continue; 186 } 187 } 188 189 tracing::debug!("trying to resolve handle..."); 190 if let Some(handle) = maybe_handle { 191 if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await { 192 tracing::error!(pds, did, "failed to resolve handle: {e}"); 193 db::backfill_job_write(&mut conn, &did, "failed.resolve") 194 .await 195 .unwrap(); 196 } 197 } 198 } 199 200 let start = Instant::now(); 201 202 tracing::trace!("downloading repo {did}"); 203 204 match download_car(&http, &tmp_dir, &pds, &did).await { 205 Ok(Some((rem, reset))) => { 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 } 209 Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 210 Err(e) => { 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 continue; 213 } 214 } 215 216 histogram!("backfill_download_dur", "pds" => pds).record(start.elapsed().as_secs_f64()); 217 218 let _ = rc.sadd(DL_DUP_KEY, &did).await; 219 if let Err(e) = rc.rpush(DL_DONE_KEY, &did).await { 220 tracing::error!(did, "failed to mark download complete: {e}"); 221 } else { 222 counter!("backfill_downloaded").increment(1); 223 } 224 } 225 226 tracing::debug!("thread exiting"); 227} 228 229async fn enforce_ratelimit(rc: &mut MultiplexedConnection, pds: &str) -> eyre::Result<()> { 230 let score = rc.zscore(BF_REM_KEY, pds).await?; 231 232 if let Some(rem) = score { 233 if (rem as i32) < 100 { 234 // if we've got None for some reason, just hope that the next req will contain the reset header. 235 if let Some(at) = rc.zscore(BF_RESET_KEY, pds).await? { 236 tracing::debug!("rate limit for {pds} resets at {at}"); 237 let time = chrono::DateTime::from_timestamp(at as i64, 0).unwrap(); 238 let delta = (time - Utc::now()).num_milliseconds().max(0); 239 240 tokio::time::sleep(Duration::from_millis(delta as u64)).await; 241 }; 242 } 243 } 244 245 Ok(()) 246} 247 248// you wouldn't... 249#[instrument(skip(http, tmp_dir, pds))] 250async fn download_car( 251 http: &HttpClient, 252 tmp_dir: &Path, 253 pds: &str, 254 did: &str, 255) -> eyre::Result<Option<(i32, i32)>> { 256 let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 257 258 let res = http 259 .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}")) 260 .send() 261 .await? 262 .error_for_status()?; 263 264 let headers = res.headers(); 265 let ratelimit_rem = header_to_int(headers, "ratelimit-remaining"); 266 let ratelimit_reset = header_to_int(headers, "ratelimit-reset"); 267 268 let strm = res.bytes_stream().map_err(std::io::Error::other); 269 let mut reader = StreamReader::new(strm); 270 271 tokio::io::copy(&mut reader, &mut file).await?; 272 273 Ok(ratelimit_rem.zip(ratelimit_reset)) 274} 275 276// there's no ratelimit handling here because we pretty much always call download_car after. 277#[instrument(skip(http, conn, pds))] 278async fn check_and_update_repo_status( 279 http: &HttpClient, 280 conn: &mut PgClient, 281 pds: &str, 282 repo: &str, 283) -> eyre::Result<bool> { 284 match super::check_pds_repo_status(http, pds, repo).await? { 285 Some(status) => { 286 if !status.active { 287 tracing::debug!("repo is inactive"); 288 289 let status = status 290 .status 291 .unwrap_or(crate::firehose::AtpAccountStatus::Deleted); 292 conn.execute( 293 "UPDATE actors SET sync_state='dirty', status=$2 WHERE did=$1", 294 &[&repo, &ActorStatus::from(status)], 295 ) 296 .await?; 297 298 Ok(false) 299 } else { 300 Ok(true) 301 } 302 } 303 None => { 304 // this repo can't be found - set dirty and assume deleted. 305 tracing::debug!("repo was deleted"); 306 conn.execute( 307 "UPDATE actors SET sync_state='dirty', status='deleted' WHERE did=$1", 308 &[&repo], 309 ) 310 .await?; 311 312 Ok(false) 313 } 314 } 315} 316 317async fn resolve_and_set_handle( 318 conn: &PgClient, 319 resolver: &Resolver, 320 did: &str, 321 handle: &str, 322) -> eyre::Result<()> { 323 if let Some(handle_did) = resolver.resolve_handle(handle).await? { 324 if handle_did == did { 325 conn.execute("UPDATE actors SET handle=$2 WHERE did=$1", &[&did, &handle]) 326 .await?; 327 } else { 328 tracing::warn!("requested DID ({did}) doesn't match handle"); 329 } 330 } 331 332 Ok(()) 333} 334 335fn header_to_int(headers: &HeaderMap, name: &str) -> Option<i32> { 336 headers 337 .get(name) 338 .and_then(|v| v.to_str().ok()) 339 .and_then(|v| v.parse().ok()) 340}