Rust AppView - highly experimental!

fix: tests

+43 -37
+3
consumer/src/database_writer/operations/executor.rs
··· 13 13 //! This makes stats idempotent - duplicate events won't double-count. 14 14 15 15 use super::{cache, ActorStatsDeltas, DatabaseOperation, PostStatsDeltas}; 16 + use eyre::WrapErr as _; 16 17 use ipld_core::cid::Cid; 17 18 18 19 /// Helper to create a single post stats delta ··· 773 774 cid, 774 775 record, 775 776 } => { 777 + // Lists use arbitrary string rkeys (not TID-based like posts) 776 778 let inserted = db::list_upsert(conn, actor_id, &rkey, cid, record).await?; 777 779 if inserted { 778 780 let actor_deltas = actor_list_delta(actor_id, 1); ··· 1093 1095 // Lists use arbitrary string rkeys - extract from at_uri 1094 1096 let rkey_str = parakeet_db::at_uri_util::extract_rkey(&at_uri) 1095 1097 .ok_or_else(|| eyre::eyre!("Invalid at_uri: missing rkey"))?; 1098 + 1096 1099 let rows = db::list_delete(conn, actor_id, rkey_str).await?; 1097 1100 if rows > 0 { 1098 1101 actor_deltas.extend(actor_list_delta(actor_id, -1));
+12 -8
consumer/src/db/operations/actor_update.rs
··· 264 264 } 265 265 ActorUpdateTarget::ByDid(did) => { 266 266 // Resolve DID to ID first 267 - let actor_id = super::super::actor_id_from_did(conn, did) 268 - .await? 269 - .ok_or_else(|| eyre::eyre!("Actor not found for DID: {}", did))?; 267 + let actor_id_opt = super::super::actor_id_from_did(conn, did).await?; 270 268 271 - let clause = "id = $1".to_string(); 272 - let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = 273 - vec![Box::new(actor_id)]; 274 - Ok((clause, params)) 269 + if let Some(actor_id) = actor_id_opt { 270 + let clause = "id = $1".to_string(); 271 + let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = 272 + vec![Box::new(actor_id)]; 273 + Ok((clause, params)) 274 + } else { 275 + // Actor doesn't exist - return WHERE FALSE so UPDATE returns 0 rows 276 + // This allows the caller to handle the not-found case (e.g., actor_upsert does INSERT) 277 + Ok(("FALSE".to_string(), vec![])) 278 + } 275 279 } 276 280 ActorUpdateTarget::Batch { ids } => { 277 281 if ids.is_empty() { ··· 397 401 params.push(Box::new(*embed_post_rkey)); 398 402 param_idx += 1; 399 403 400 - clauses.push(format!("status_thumb_mime_type = ${}", param_idx)); 404 + clauses.push(format!("status_thumb_mime_type = ${}::text::image_mime_type", param_idx)); 401 405 params.push(Box::new(thumb_mime_type.clone())); 402 406 param_idx += 1; 403 407
+1 -1
consumer/src/db/operations/graph.rs
··· 98 98 pub async fn list_upsert<C: GenericClient>( 99 99 conn: &C, 100 100 actor_id: i32, 101 - rkey: &str, 101 + rkey: &str, // Lists use arbitrary string rkeys (not TID-based like posts) 102 102 cid: Cid, 103 103 rec: AppBskyGraphList, 104 104 ) -> Result<bool> {
+4 -4
consumer/tests/community_starterpack_operations_test.rs
··· 287 287 graph::list_upsert( 288 288 &tx, 289 289 list_owner_id, 290 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 290 + &extract_rkey(list_uri), 291 291 test_cid(), 292 292 list, 293 293 ) ··· 384 384 graph::list_upsert( 385 385 &tx, 386 386 list_owner_id, 387 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 387 + &extract_rkey(list_uri), 388 388 test_cid(), 389 389 list, 390 390 ) ··· 492 492 graph::list_upsert( 493 493 &tx, 494 494 list_owner_id, 495 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 495 + &extract_rkey(list_uri), 496 496 test_cid(), 497 497 list, 498 498 ) ··· 653 653 graph::list_upsert( 654 654 &tx, 655 655 list_owner_id, 656 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 656 + &extract_rkey(list_uri), 657 657 test_cid(), 658 658 list, 659 659 )
+13 -13
consumer/tests/graph_operations_test.rs
··· 342 342 let (actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:listowner").await?; 343 343 let rkey = "3l7mkz4lmk24a"; 344 344 let rkey_i64 = parakeet_db::models::tid_to_i64(rkey).unwrap(); 345 - let result = graph::list_upsert(&tx, actor_id, rkey_i64, test_cid(), list).await; 345 + let result = graph::list_upsert(&tx, actor_id, rkey, test_cid(), list).await; 346 346 347 347 assert!( 348 348 result.is_ok(), ··· 357 357 "SELECT EXISTS( 358 358 SELECT 1 FROM lists l 359 359 INNER JOIN actors a ON l.actor_id = a.id 360 - WHERE a.did = $1 AND l.rkey = tid_to_i64($2) 360 + WHERE a.did = $1 AND l.rkey = $2 361 361 )", 362 362 &[&"did:plc:listowner", &"3l7mkz4lmk24a"], 363 363 ) ··· 398 398 let (actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:listowner2").await?; 399 399 let rkey = "3l7mkz4lmk24b"; 400 400 let rkey_i64 = parakeet_db::models::tid_to_i64(rkey).unwrap(); 401 - let result1 = graph::list_upsert(&tx, actor_id, rkey_i64, test_cid(), list1).await; 401 + let result1 = graph::list_upsert(&tx, actor_id, rkey, test_cid(), list1).await; 402 402 403 403 assert!(result1.unwrap(), "First insert should return true"); 404 404 ··· 413 413 created_at: Utc::now(), 414 414 }; 415 415 416 - let result2 = graph::list_upsert(&tx, actor_id, rkey_i64, test_cid(), list2).await; 416 + let result2 = graph::list_upsert(&tx, actor_id, rkey, test_cid(), list2).await; 417 417 418 418 assert!(!result2.unwrap(), "Update should return false"); 419 419 ··· 422 422 .query_one( 423 423 "SELECT l.name FROM lists l 424 424 INNER JOIN actors a ON l.actor_id = a.id 425 - WHERE a.did = $1 AND l.rkey = tid_to_i64($2)", 425 + WHERE a.did = $1 AND l.rkey = $2", 426 426 &[&"did:plc:listowner2", &"3l7mkz4lmk24b"], 427 427 ) 428 428 .await ··· 462 462 let (actor_id, _, _) = consumer::db::operations::feed::get_actor_id(&tx, "did:plc:listowner3").await?; 463 463 let rkey = "3l7mkz4lmk24c"; 464 464 let rkey_i64 = parakeet_db::models::tid_to_i64(rkey).unwrap(); 465 - graph::list_upsert(&tx, actor_id, rkey_i64, test_cid(), list) 465 + graph::list_upsert(&tx, actor_id, rkey, test_cid(), list) 466 466 .await 467 467 .wrap_err("Failed to insert list")?; 468 468 469 469 // Delete the list 470 - let result = graph::list_delete(&tx, actor_id, rkey_i64).await; 470 + let result = graph::list_delete(&tx, actor_id,rkey).await; 471 471 472 472 assert!( 473 473 result.is_ok(), ··· 482 482 "SELECT EXISTS( 483 483 SELECT 1 FROM lists l 484 484 INNER JOIN actors a ON l.actor_id = a.id 485 - WHERE a.did = $1 AND l.rkey = tid_to_i64($2) 485 + WHERE a.did = $1 AND l.rkey = $2 486 486 )", 487 487 &[&"did:plc:listowner3", &"3l7mkz4lmk24c"], 488 488 ) ··· 673 673 let list_uri = "at://did:plc:listowner4/app.bsky.graph.list/3l7mkz4lmk24d"; 674 674 let list_rkey = "3l7mkz4lmk24d"; 675 675 let list_rkey_i64 = parakeet_db::models::tid_to_i64(list_rkey).unwrap(); 676 - graph::list_upsert(&tx, list_owner_actor_id, list_rkey_i64, test_cid(), list) 676 + graph::list_upsert(&tx, list_owner_actor_id, list_rkey, test_cid(), list) 677 677 .await 678 678 .wrap_err("Failed to create list")?; 679 679 ··· 786 786 "SELECT l.status::text, l.list_type IS NULL, l.name IS NULL 787 787 FROM lists l 788 788 INNER JOIN actors a ON l.actor_id = a.id 789 - WHERE a.did = $1 AND l.rkey = tid_to_i64($2)", 789 + WHERE a.did = $1 AND l.rkey = $2", 790 790 &[&"did:plc:listowner5", &"3l7mkz4lmk24h"], 791 791 ) 792 792 .await ··· 835 835 let list_uri = "at://did:plc:listowner6/app.bsky.graph.list/3l7mkz4lmk24j"; 836 836 let list_rkey = "3l7mkz4lmk24j"; 837 837 let list_rkey_i64 = parakeet_db::models::tid_to_i64(list_rkey).unwrap(); 838 - graph::list_upsert(&tx, list_owner_actor_id, list_rkey_i64, test_cid(), list) 838 + graph::list_upsert(&tx, list_owner_actor_id, list_rkey, test_cid(), list) 839 839 .await 840 840 .wrap_err("Failed to create list")?; 841 841 ··· 916 916 let list_uri = "at://did:plc:listowner7/app.bsky.graph.list/3l7mkz4lmk24l"; 917 917 let list_rkey = "3l7mkz4lmk24l"; 918 918 let list_rkey_i64 = parakeet_db::models::tid_to_i64(list_rkey).unwrap(); 919 - graph::list_upsert(&tx, list_owner_actor_id, list_rkey_i64, test_cid(), list) 919 + graph::list_upsert(&tx, list_owner_actor_id, list_rkey, test_cid(), list) 920 920 .await 921 921 .wrap_err("Failed to create list")?; 922 922 ··· 1002 1002 let list_uri = "at://did:plc:listowner8/app.bsky.graph.list/3l7mkz4lmk24n"; 1003 1003 let list_rkey = "3l7mkz4lmk24n"; 1004 1004 let list_rkey_i64 = parakeet_db::models::tid_to_i64(list_rkey).unwrap(); 1005 - graph::list_upsert(&tx, list_owner_actor_id, list_rkey_i64, test_cid(), list) 1005 + graph::list_upsert(&tx, list_owner_actor_id, list_rkey, test_cid(), list) 1006 1006 .await 1007 1007 .wrap_err("Failed to create list")?; 1008 1008
+1 -3
consumer/tests/record_exists_queries_test.rs
··· 130 130 let pool = test_pool(); 131 131 let conn = pool.get().await.wrap_err("Failed to get connection")?; 132 132 133 - let rkey_i64 = decode_tid(TEST_TID).unwrap(); 134 - 135 - let result = queries::list_exists(&**conn, "did:plc:test", rkey_i64).await; 133 + let result = queries::list_exists(&**conn, "did:plc:test", TEST_TID).await; 136 134 137 135 assert!( 138 136 result.is_ok(),
+2 -2
consumer/tests/threadgate_enforcement_test.rs
··· 799 799 graph::list_upsert( 800 800 &tx, 801 801 listowner_id, 802 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 802 + &extract_rkey(list_uri), 803 803 test_cid(), 804 804 list, 805 805 ) ··· 921 921 graph::list_upsert( 922 922 &tx, 923 923 listowner2_id, 924 - parakeet_db::models::tid_to_i64(&extract_rkey(list_uri)).unwrap(), 924 + &extract_rkey(list_uri), 925 925 test_cid(), 926 926 list, 927 927 )
+7 -6
consumer/tests/workers_test.rs
··· 27 27 let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 28 28 let tx = conn.transaction().await.wrap_err("Failed to start transaction")?; 29 29 30 - let dids = vec!["did:plc:alice", "did:plc:bob", "did:plc:charlie"]; 30 + // Use unique DIDs to avoid cache pollution from other tests 31 + let dids = vec!["did:plc:alice_new1", "did:plc:bob_new1", "did:plc:charlie_new1"]; 31 32 let rows_inserted = workers::bulk_ensure_actors(&tx, &dids) 32 33 .await 33 34 .wrap_err("Failed to bulk ensure actors")?; ··· 43 44 let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 44 45 let tx = conn.transaction().await.wrap_err("Failed to start transaction")?; 45 46 46 - // First insert 47 - let dids = vec!["did:plc:alice", "did:plc:bob"]; 47 + // First insert - use unique DIDs to avoid cache pollution 48 + let dids = vec!["did:plc:alice_existing", "did:plc:bob_existing"]; 48 49 workers::bulk_ensure_actors(&tx, &dids) 49 50 .await 50 51 .wrap_err("Failed to bulk ensure actors (first insert)")?; ··· 65 66 let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 66 67 let tx = conn.transaction().await.wrap_err("Failed to start transaction")?; 67 68 68 - // First insert 69 - let dids1 = vec!["did:plc:alice", "did:plc:bob"]; 69 + // First insert - use unique DIDs to avoid cache pollution 70 + let dids1 = vec!["did:plc:alice_mixed", "did:plc:bob_mixed"]; 70 71 workers::bulk_ensure_actors(&tx, &dids1).await.wrap_err("Failed to bulk ensure actors")?; 71 72 72 73 // Second insert with overlap 73 - let dids2 = vec!["did:plc:bob", "did:plc:charlie"]; // bob exists, charlie is new 74 + let dids2 = vec!["did:plc:bob_mixed", "did:plc:charlie_mixed"]; // bob exists, charlie is new 74 75 let result = workers::bulk_ensure_actors(&tx, &dids2).await; 75 76 76 77 assert!(