Parakeet is a Rust-based Bluesky AppView that aims to implement most of the functionality required to support the Bluesky client.

fix(consumer): a few backfill fixes

- move temp file creation to after the getRepo request succeeds
- record more granular failure statuses in backfill_jobs
- demote the noisy unknown-record error to debug

Changed files: +25 -12

consumer/src/backfill/downloader.rs (+8 -8)
```diff
···
 Ok(Some(did_doc)) => {
     let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
         tracing::warn!("bad DID doc for {did}");
-        db::backfill_job_write(&mut conn, &did, "failed.resolve")
+        db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc")
             .await
             .unwrap();
         continue;
···
     }
 }
 Ok(None) => {
-    tracing::warn!(did, "bad DID doc");
+    tracing::warn!(did, "bad/missing DID doc");
     db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
         .await
         .unwrap();
-    db::backfill_job_write(&mut conn, &did, "failed.resolve")
+    db::backfill_job_write(&mut conn, &did, "failed.resolve.did_doc")
         .await
         .unwrap();
 }
···
     db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
         .await
         .unwrap();
-    db::backfill_job_write(&mut conn, &did, "failed.resolve")
+    db::backfill_job_write(&mut conn, &did, "failed.resolve.did")
         .await
         .unwrap();
 }
···
 Ok(false) => continue,
 Err(e) => {
     tracing::error!(pds, did, "failed to check repo status: {e}");
-    db::backfill_job_write(&mut conn, &did, "failed.resolve")
+    db::backfill_job_write(&mut conn, &did, "failed.resolve.status")
         .await
         .unwrap();
     continue;
···
 if let Some(handle) = maybe_handle {
     if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
         tracing::error!(pds, did, "failed to resolve handle: {e}");
-        db::backfill_job_write(&mut conn, &did, "failed.resolve")
+        db::backfill_job_write(&mut conn, &did, "failed.resolve.handle")
             .await
             .unwrap();
     }
···
     pds: &str,
     did: &str,
 ) -> eyre::Result<Option<(i32, i32)>> {
-    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
-
     let res = http
         .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .send()
         .await?
         .error_for_status()?;
+
+    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;

     let headers = res.headers();
     let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
```
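
With the resolve failures split into distinct statuses (failed.resolve.did_svc, .did_doc, .did, .status, .handle), backfill_jobs can show which stage of resolution is actually failing, and creating the temp file only after error_for_status() means a failed getRepo request no longer leaves an empty file behind. As a minimal sketch of how the granular statuses could be used, assuming a tokio_postgres-style client like the one wrapped in consumer/src/db (the helper name and query below are illustrative, not part of this change):

```rust
use tokio_postgres::Client;

/// Hypothetical helper: count backfill jobs by their granular failure status,
/// e.g. failed.resolve.did_svc vs failed.resolve.handle.
async fn backfill_failure_breakdown(
    conn: &Client,
) -> Result<Vec<(String, i64)>, tokio_postgres::Error> {
    let rows = conn
        .query(
            "SELECT status, COUNT(*) AS n FROM backfill_jobs \
             WHERE status LIKE 'failed.resolve.%' \
             GROUP BY status ORDER BY n DESC",
            &[],
        )
        .await?;
    Ok(rows
        .iter()
        .map(|row| (row.get::<_, String>(0), row.get::<_, i64>(1)))
        .collect())
}
```
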
consumer/src/backfill/repo.rs (+5 -2)
```diff
···
 use super::{
-    types::{CarCommitEntry, CarEntry},
+    types::{CarCommitEntry, CarEntry, CarRecordEntry},
     CopyStore,
 };
 use crate::indexer::records;
···
 CarEntry::Commit(_) => {
     tracing::warn!("got commit entry that was not in root")
 }
-CarEntry::Record(record) => {
+CarEntry::Record(CarRecordEntry::Known(record)) => {
     if let Some(path) = mst_nodes.remove(&cid) {
         record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
     } else {
         records.insert(cid, record);
     }
+}
+CarEntry::Record(CarRecordEntry::Other { ty }) => {
+    tracing::debug!("repo contains unknown record type: {ty} ({cid})");
 }
 CarEntry::Mst(mst) => {
     let mut out = Vec::with_capacity(mst.e.len());
```
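
Logging the unknown-record case with debug! rather than warn! means it disappears under a typical info-level filter and only shows up when debug logging is explicitly enabled. A minimal sketch of that filtering, assuming a tracing-subscriber fmt/EnvFilter setup (this initialisation is illustrative and not taken from the consumer crate):

```rust
use tracing_subscriber::EnvFilter;

fn init_tracing() {
    // Under the default "info" filter the unknown-record line is dropped;
    // something like RUST_LOG=consumer=debug opts back into it.
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
        )
        .init();
}
```
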
consumer/src/backfill/types.rs (+11 -1)
```diff
···
 pub enum CarEntry {
     Mst(CarMstEntry),
     Commit(CarCommitEntry),
-    Record(RecordTypes),
+    Record(CarRecordEntry),
 }

 #[derive(Debug, Deserialize)]
···
     pub rev: String,
     pub prev: Option<Cid>,
     pub sig: ByteBuf,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+pub enum CarRecordEntry {
+    Known(RecordTypes),
+    Other {
+        #[serde(rename = "$type")]
+        ty: String,
+    },
 }

 #[derive(Debug, Deserialize)]
```
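
The #[serde(untagged)] wrapper is what lets unknown record types degrade gracefully: serde tries the Known variant first, and any record whose $type it cannot match falls through to Other, keeping only the type string for the debug log above. A small self-contained sketch of that behaviour, using serde_json and a stand-in RecordTypes (the real RecordTypes enum and the DAG-CBOR decoding live elsewhere in the crate):

```rust
use serde::Deserialize;

// Stand-in for the consumer crate's RecordTypes: an internally tagged enum
// keyed on $type, with one known record kind for illustration.
#[derive(Debug, Deserialize)]
#[serde(tag = "$type")]
enum RecordTypes {
    #[serde(rename = "app.bsky.feed.post")]
    Post { text: String },
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum CarRecordEntry {
    Known(RecordTypes),
    Other {
        #[serde(rename = "$type")]
        ty: String,
    },
}

fn main() {
    let known: CarRecordEntry =
        serde_json::from_str(r#"{"$type":"app.bsky.feed.post","text":"hi"}"#).unwrap();
    let unknown: CarRecordEntry =
        serde_json::from_str(r#"{"$type":"com.example.custom","foo":1}"#).unwrap();
    println!("{known:?}");   // Known(Post { text: "hi" })
    println!("{unknown:?}"); // Other { ty: "com.example.custom" }
}
```
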
consumer/src/db/backfill.rs (+1 -1)
```diff
···
     status: &str,
 ) -> PgExecResult {
     conn.execute(
-        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2)",
+        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET status = $2, updated_at = NOW()",
         &[&did, &status],
     )
     .await
```
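
The ON CONFLICT clause turns backfill_job_write into an upsert, so writing a new status for a DID that already has a job row updates it in place instead of failing on a duplicate key. That presumes a unique constraint on did and an updated_at column; a hypothetical schema consistent with the query (not taken from the repository's migrations) might look like:

```rust
use tokio_postgres::Client;

/// Hypothetical table definition matching the upsert above; the real
/// migration in the repo may differ.
async fn ensure_backfill_jobs_table(conn: &Client) -> Result<(), tokio_postgres::Error> {
    conn.batch_execute(
        "CREATE TABLE IF NOT EXISTS backfill_jobs (
            did        TEXT PRIMARY KEY,
            status     TEXT NOT NULL,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )",
    )
    .await
}
```
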