I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go

misc genesis fixes

+15
.sqlx/query-3b791fdb8e29043c980963d4d18e1e492c73c39818a8648a7af70555418fb5d1.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Left": [ 8 + "TextArray", 9 + "Int8" 10 + ] 11 + }, 12 + "nullable": [] 13 + }, 14 + "hash": "3b791fdb8e29043c980963d4d18e1e492c73c39818a8648a7af70555418fb5d1" 15 + }
-22
.sqlx/query-3fae97c8a2551c1ef8db06c4cde5480e44c5f771397e01574d0026e5bac6af55.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT r.repo_root_cid\n FROM repos r\n JOIN users u ON u.id = r.user_id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "repo_root_cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "3fae97c8a2551c1ef8db06c4cde5480e44c5f771397e01574d0026e5bac6af55" 22 - }
-22
.sqlx/query-5b692e8f6d32dcbdcb45a3fff152a2be5672aadd807a4abab6914f80d57cba02.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "\n SELECT r.repo_root_cid\n FROM repos r\n JOIN users u ON r.user_id = u.id\n WHERE u.did = $1\n ", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "repo_root_cid", 9 - "type_info": "Text" 10 - } 11 - ], 12 - "parameters": { 13 - "Left": [ 14 - "Text" 15 - ] 16 - }, 17 - "nullable": [ 18 - false 19 - ] 20 - }, 21 - "hash": "5b692e8f6d32dcbdcb45a3fff152a2be5672aadd807a4abab6914f80d57cba02" 22 - }
-28
.sqlx/query-933f6585efdafedc82a8b6ac3c1513f25459bd9ab08e385ebc929469666d7747.json
··· 1 - { 2 - "db_name": "PostgreSQL", 3 - "query": "SELECT id, deactivated_at FROM users WHERE did = $1", 4 - "describe": { 5 - "columns": [ 6 - { 7 - "ordinal": 0, 8 - "name": "id", 9 - "type_info": "Uuid" 10 - }, 11 - { 12 - "ordinal": 1, 13 - "name": "deactivated_at", 14 - "type_info": "Timestamptz" 15 - } 16 - ], 17 - "parameters": { 18 - "Left": [ 19 - "Text" 20 - ] 21 - }, 22 - "nullable": [ 23 - false, 24 - true 25 - ] 26 - }, 27 - "hash": "933f6585efdafedc82a8b6ac3c1513f25459bd9ab08e385ebc929469666d7747" 28 - }
+32
.sqlx/query-d65ebbc09a5756438063cb6eaf8284f17beeedde25d4f41dd6788d9c60d162f7.json
··· 1 + { 2 + "db_name": "PostgreSQL", 3 + "query": "\n SELECT seq, did, commit_cid\n FROM repo_seq\n WHERE event_type = 'commit'\n AND prev_cid IS NULL\n AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0)\n ", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "ordinal": 0, 8 + "name": "seq", 9 + "type_info": "Int8" 10 + }, 11 + { 12 + "ordinal": 1, 13 + "name": "did", 14 + "type_info": "Text" 15 + }, 16 + { 17 + "ordinal": 2, 18 + "name": "commit_cid", 19 + "type_info": "Text" 20 + } 21 + ], 22 + "parameters": { 23 + "Left": [] 24 + }, 25 + "nullable": [ 26 + false, 27 + false, 28 + true 29 + ] 30 + }, 31 + "hash": "d65ebbc09a5756438063cb6eaf8284f17beeedde25d4f41dd6788d9c60d162f7" 32 + }
+1 -1
src/api/identity/account.rs
··· 996 996 warn!("Failed to sequence account event for {}: {}", did, e); 997 997 } 998 998 if let Err(e) = 999 - crate::api::repo::record::sequence_empty_commit_event(&state, &did).await 999 + crate::api::repo::record::sequence_genesis_commit(&state, &did, &commit_cid, &mst_root, &rev_str).await 1000 1000 { 1001 1001 warn!("Failed to sequence commit event for {}: {}", did, e); 1002 1002 }
+12 -13
src/api/repo/record/utils.rs
··· 512 512 Ok(seq_row.seq) 513 513 } 514 514 515 - pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> { 516 - let repo_info = sqlx::query!( 517 - "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", 518 - did 519 - ) 520 - .fetch_optional(&state.db) 521 - .await 522 - .map_err(|e| format!("DB Error fetching repo root: {}", e))? 523 - .ok_or_else(|| "Repo not found".to_string())?; 515 + pub async fn sequence_genesis_commit( 516 + state: &AppState, 517 + did: &str, 518 + commit_cid: &Cid, 519 + mst_root_cid: &Cid, 520 + rev: &str, 521 + ) -> Result<i64, String> { 524 522 let ops = serde_json::json!([]); 525 523 let blobs: Vec<String> = vec![]; 526 - let blocks_cids: Vec<String> = vec![]; 524 + let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 527 525 let prev_cid: Option<&str> = None; 526 + let commit_cid_str = commit_cid.to_string(); 528 527 let seq_row = sqlx::query!( 529 528 r#" 530 529 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) ··· 532 531 RETURNING seq 533 532 "#, 534 533 did, 535 - repo_info.repo_root_cid, 534 + commit_cid_str, 536 535 prev_cid, 537 536 ops, 538 537 &blobs, 539 538 &blocks_cids, 540 - repo_info.repo_rev 539 + rev 541 540 ) 542 541 .fetch_one(&state.db) 543 542 .await 544 - .map_err(|e| format!("DB Error (repo_seq empty commit): {}", e))?; 543 + .map_err(|e| format!("DB Error (repo_seq genesis commit): {}", e))?; 545 544 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 546 545 .execute(&state.db) 547 546 .await
+2 -1
src/main.rs
··· 5 5 use tracing::{error, info, warn}; 6 6 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7 7 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8 - use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 8 + use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; 9 9 use tranquil_pds::state::AppState; 10 10 11 11 #[tokio::main] ··· 32 32 let backfill_db = state.db.clone(); 33 33 let backfill_block_store = state.block_store.clone(); 34 34 tokio::spawn(async move { 35 + backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await; 35 36 backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; 36 37 backfill_user_blocks(&backfill_db, backfill_block_store).await; 37 38 });
+97
src/scheduled.rs
··· 13 13 use crate::repo::PostgresBlockStore; 14 14 use crate::storage::BlobStorage; 15 15 16 + pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { 17 + let broken_genesis_commits = match sqlx::query!( 18 + r#" 19 + SELECT seq, did, commit_cid 20 + FROM repo_seq 21 + WHERE event_type = 'commit' 22 + AND prev_cid IS NULL 23 + AND (blocks_cids IS NULL OR array_length(blocks_cids, 1) IS NULL OR array_length(blocks_cids, 1) = 0) 24 + "# 25 + ) 26 + .fetch_all(db) 27 + .await 28 + { 29 + Ok(rows) => rows, 30 + Err(e) => { 31 + error!("Failed to query repo_seq for genesis commit backfill: {}", e); 32 + return; 33 + } 34 + }; 35 + 36 + if broken_genesis_commits.is_empty() { 37 + debug!("No genesis commits need blocks_cids backfill"); 38 + return; 39 + } 40 + 41 + info!( 42 + count = broken_genesis_commits.len(), 43 + "Backfilling blocks_cids for genesis commits" 44 + ); 45 + 46 + let mut success = 0; 47 + let mut failed = 0; 48 + 49 + for commit_row in broken_genesis_commits { 50 + let commit_cid_str = match &commit_row.commit_cid { 51 + Some(c) => c.clone(), 52 + None => { 53 + warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); 54 + failed += 1; 55 + continue; 56 + } 57 + }; 58 + 59 + let commit_cid = match Cid::from_str(&commit_cid_str) { 60 + Ok(c) => c, 61 + Err(_) => { 62 + warn!(seq = commit_row.seq, "Invalid commit CID"); 63 + failed += 1; 64 + continue; 65 + } 66 + }; 67 + 68 + let block = match block_store.get(&commit_cid).await { 69 + Ok(Some(b)) => b, 70 + Ok(None) => { 71 + warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); 72 + failed += 1; 73 + continue; 74 + } 75 + Err(e) => { 76 + warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); 77 + failed += 1; 78 + continue; 79 + } 80 + }; 81 + 82 + let commit = match Commit::from_cbor(&block) { 83 + Ok(c) => c, 84 + Err(e) => { 85 + warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); 86 + failed += 1; 87 + continue; 88 + } 89 + }; 90 + 91 + let mst_root_cid = commit.data; 92 + let blocks_cids: Vec<String> = vec![mst_root_cid.to_string(), commit_cid.to_string()]; 93 + 94 + if let Err(e) = sqlx::query!( 95 + "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", 96 + &blocks_cids, 97 + commit_row.seq 98 + ) 99 + .execute(db) 100 + .await 101 + { 102 + warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); 103 + failed += 1; 104 + } else { 105 + info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); 106 + success += 1; 107 + } 108 + } 109 + 110 + info!(success, failed, "Completed genesis commit blocks_cids backfill"); 111 + } 112 + 16 113 pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { 17 114 let repos_missing_rev = match sqlx::query!( 18 115 "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"