Rust AppView - highly experimental!

feat: denormalization strategy; test driven development

+116 -100
+15 -6
consumer/src/db/gates/queries.rs
··· 26 26 SELECT id FROM actors WHERE did = $2 27 27 ) 28 28 SELECT 29 - EXISTS(SELECT 1 FROM follows WHERE actor_id = (SELECT id FROM root_actor) AND subject_actor_id = (SELECT id FROM post_actor)) as following, 30 - EXISTS(SELECT 1 FROM follows WHERE actor_id = (SELECT id FROM post_actor) AND subject_actor_id = (SELECT id FROM root_actor)) as followed", 29 + EXISTS( 30 + SELECT 1 FROM actors a, unnest(COALESCE(a.following, ARRAY[]::follow_record[])) AS f 31 + WHERE a.id = (SELECT id FROM root_actor) 32 + AND (f).subject_actor_id = (SELECT id FROM post_actor) 33 + ) as following, 34 + EXISTS( 35 + SELECT 1 FROM actors a, unnest(COALESCE(a.following, ARRAY[]::follow_record[])) AS f 36 + WHERE a.id = (SELECT id FROM post_actor) 37 + AND (f).subject_actor_id = (SELECT id FROM root_actor) 38 + ) as followed", 31 39 &[&root_author, &post_author], 32 40 ) 33 41 .await?; ··· 86 94 SPLIT_PART(SUBSTRING(uri FROM 6), '/', 3) as rkey 87 95 FROM list_uris 88 96 ), 89 - list_ids AS ( 90 - SELECT l.id 97 + list_keys AS ( 98 + SELECT a.id as list_owner_actor_id, lp.rkey as list_rkey 91 99 FROM list_parts lp 92 100 INNER JOIN actors a ON a.did = lp.did 93 101 INNER JOIN lists l ON l.actor_id = a.id AND l.rkey = lp.rkey ··· 95 103 SELECT count(*) 96 104 FROM list_items li 97 105 INNER JOIN actors a ON a.did = $2 98 - WHERE li.list_id IN (SELECT id FROM list_ids) 99 - AND li.subject_actor_id = a.id", 106 + INNER JOIN list_keys lk ON li.list_owner_actor_id = lk.list_owner_actor_id 107 + AND li.list_rkey = lk.list_rkey 108 + WHERE li.subject_actor_id = a.id", 100 109 &[&allow_lists, &post_author], 101 110 ) 102 111 .await?;
+4 -4
consumer/src/db/mod.rs
··· 134 134 ) -> Result<bool> { 135 135 let row = conn 136 136 .query_opt( 137 - "SELECT 1 FROM thread_mutes 138 - WHERE actor_id = $1 139 - AND root_post_actor_id = $2 140 - AND root_post_rkey = $3", 137 + "SELECT 1 FROM actors, unnest(COALESCE(thread_mutes, ARRAY[]::thread_mute_record[])) AS tm 138 + WHERE id = $1 139 + AND (tm).root_post_actor_id = $2 140 + AND (tm).root_post_rkey = $3", 141 141 &[&recipient_actor_id, &root_post_actor_id, &root_post_rkey], 142 142 ) 143 143 .await?;
+36 -28
consumer/src/db/operations/graph.rs
··· 22 22 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("follows", actor_id, rkey); 23 23 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 24 24 25 + // Check if exact same follow already exists (same subject_actor_id and rkey) 26 + let already_exists = conn 27 + .query_opt( 28 + "SELECT 1 FROM actors, unnest(COALESCE(following, ARRAY[]::follow_record[])) AS f 29 + WHERE id = $1 AND (f).subject_actor_id = $2 AND (f).rkey = $3", 30 + &[&actor_id, &subject_actor_id, &rkey], 31 + ) 32 + .await? 33 + .is_some(); 34 + 35 + if already_exists { 36 + return Ok(0); // Duplicate, no change needed 37 + } 38 + 25 39 // BIDIRECTIONAL UPDATE: Update both following[] and followers[] arrays 26 - // 1. Add to actor's following array 27 - // 2. Add to subject_actor's followers array 40 + // 1. Add to actor's following array - stores (subject_actor_id, rkey) 41 + // 2. Add to subject_actor's followers array - stores (follower_actor_id, rkey) using same follow_record type 28 42 // Returns number of rows updated (should be 2 if both actors exist) 29 43 let rows = conn 30 44 .execute( 31 - "WITH follow_record AS ( 32 - SELECT ROW($2, $3)::follow_record as rec 45 + "WITH follow_rec AS ( 46 + SELECT ROW($3, $2)::follow_record as rec -- (subject_actor_id, rkey) 33 47 ), 34 48 update_following AS ( 35 49 UPDATE actors 36 50 SET following = COALESCE(following, ARRAY[]::follow_record[]) || 37 - (SELECT ARRAY[rec] FROM follow_record) 51 + (SELECT ARRAY[rec] FROM follow_rec) 38 52 WHERE id = $1 39 53 AND NOT EXISTS ( 40 54 SELECT 1 FROM unnest(following) f WHERE (f).rkey = $2 ··· 44 58 update_followers AS ( 45 59 UPDATE actors 46 60 SET followers = COALESCE(followers, ARRAY[]::follow_record[]) || 47 - ARRAY[ROW($1, $2)::follow_record] 61 + ARRAY[ROW($1, $2)::follow_record] -- (follower_actor_id, rkey) 48 62 WHERE id = $3 49 63 AND NOT EXISTS ( 50 - SELECT 1 FROM unnest(followers) f WHERE (f).actor_id = $1 AND (f).rkey = $2 64 + SELECT 1 FROM unnest(followers) f WHERE (f).subject_actor_id = $1 AND (f).rkey = $2 51 65 ) 52 66 RETURNING 1 53 67 ) ··· 58 72 .await?; 59 73 60 74 // Return 1 if at least one array was updated (follow was created) 61 - // Return 0 if neither was updated (duplicate follow) 75 + // Return 0 if neither was updated (shouldn't happen since we checked already_exists) 62 76 Ok(if rows > 0 { 1 } else { 0 }) 63 77 } 64 78 ··· 94 108 UPDATE actors 95 109 SET followers = ARRAY( 96 110 SELECT f FROM unnest(followers) AS f 97 - WHERE NOT ((f).actor_id = $2 AND (f).rkey = $1) 111 + WHERE NOT ((f).subject_actor_id = $2 AND (f).rkey = $1) 98 112 ) 99 113 WHERE id = (SELECT subject_actor_id FROM follow_to_delete) 100 114 AND EXISTS ( 101 115 SELECT 1 FROM unnest(followers) f 102 - WHERE (f).actor_id = $2 AND (f).rkey = $1 116 + WHERE (f).subject_actor_id = $2 AND (f).rkey = $1 103 117 ) 104 118 RETURNING 1 105 119 ) ··· 243 257 244 258 let (list_did, _collection, list_rkey) = (parts[0], parts[1], parts[2]); 245 259 246 - // Resolve list owner DID to actor_id 247 - let list_actor_id: i32 = conn.query_one( 248 - "INSERT INTO actors (did, handle, created_at) 249 - VALUES ($1, NULL, NOW()) 250 - ON CONFLICT (did) DO UPDATE SET did = EXCLUDED.did 251 - RETURNING id", 252 - &[&list_did], 253 - ) 254 - .await? 255 - .get(0); 260 + // Resolve list owner DID to actor_id (creates stub actor if needed) 261 + let (list_actor_id, _, _) = super::feed::get_actor_id(conn, list_did).await?; 256 262 257 263 // Acquire advisory lock on record to prevent deadlocks 258 264 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("list_blocks", actor_id, rkey); 259 265 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 260 266 261 267 // Append to list_blocks array with natural key deduplication 268 + // list_block_record has fields: (list_actor_id, list_rkey, rkey, cid) 262 269 // Returns number of rows updated (1 if inserted, 0 if duplicate) 263 270 let rows = conn.execute( 264 271 "UPDATE actors ··· 267 274 WHERE id = $1 268 275 AND NOT EXISTS ( 269 276 SELECT 1 FROM unnest(list_blocks) lb 270 - WHERE (lb).rkey = $2 277 + WHERE (lb).rkey = $4 271 278 )", 272 279 &[ 273 280 &actor_id, 274 - &rkey, 275 - &cid_digest, 276 - &list_actor_id, 277 - &list_rkey, 281 + &list_actor_id, // $2: list_actor_id 282 + &list_rkey, // $3: list_rkey 283 + &rkey, // $4: rkey 284 + &cid_digest, // $5: cid 278 285 ], 279 286 ) 280 287 .await ··· 320 327 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("list_items", actor_id, rkey); 321 328 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 322 329 323 - // Resolve list_id by creating stub list if needed 324 - let list_id = super::feed::ensure_list_id(conn, &rec.list).await?; 330 + // Resolve list natural keys by creating stub list if needed 331 + let (list_owner_actor_id, list_rkey) = super::feed::ensure_list_natural_key(conn, &rec.list).await?; 325 332 326 333 conn.execute( 327 334 include_str!("../sql/list_item_upsert.sql"), ··· 331 338 &cid_digest, 332 339 // Note: created_at (param $4) removed - derived from TID rkey 333 340 &rkey, 334 - &list_id, // Pass resolved list_id directly 341 + &list_owner_actor_id, // Pass resolved list_owner_actor_id (may be NULL) 342 + &list_rkey, // Pass resolved list_rkey (may be NULL) 335 343 ], 336 344 ) 337 345 .await
+13 -15
consumer/src/db/record_exists/queries.rs
··· 53 53 Ok(row.is_some()) 54 54 } 55 55 56 - /// Check if a follow exists 56 + /// Check if a follow exists in the actor's following[] array 57 57 pub async fn follow_exists<C: GenericClient>(conn: &C, did: &str, rkey: i64) -> QueryResult<bool> { 58 58 let row = conn 59 59 .query_opt( 60 - "SELECT 1 FROM follows f 61 - INNER JOIN actors a ON f.actor_id = a.id 62 - WHERE a.did = $1 AND f.rkey = $2", 60 + "SELECT 1 FROM actors a, unnest(COALESCE(a.following, ARRAY[]::follow_record[])) AS f 61 + WHERE a.did = $1 AND (f).rkey = $2", 63 62 &[&did, &rkey], 64 63 ) 65 64 .await?; 66 65 Ok(row.is_some()) 67 66 } 68 67 69 - /// Check if a block exists 68 + /// Check if a block exists in the actor's blocks[] array 70 69 pub async fn block_exists<C: GenericClient>(conn: &C, did: &str, rkey: i64) -> QueryResult<bool> { 71 70 let row = conn 72 71 .query_opt( 73 - "SELECT 1 FROM blocks b 74 - INNER JOIN actors a ON b.actor_id = a.id 75 - WHERE a.did = $1 AND b.rkey = $2", 72 + "SELECT 1 FROM actors a, unnest(COALESCE(a.blocks, ARRAY[]::block_record[])) AS b 73 + WHERE a.did = $1 AND (b).rkey = $2", 76 74 &[&did, &rkey], 77 75 ) 78 76 .await?; ··· 250 248 251 249 /// Check if a labeler exists and should skip fetching 252 250 /// 251 + /// DENORMALIZED: Labelers are now stored directly on actors table (labeler_status, labeler_cid, etc) 252 + /// 253 253 /// Returns true if the labeler exists with any status except 'stub'. 254 254 /// - stub: Return false (needs fetching) 255 255 /// - missing: Return true (permanently unfetchable, skip) ··· 258 258 pub async fn labeler_exists<C: GenericClient>(conn: &C, did: &str) -> QueryResult<bool> { 259 259 let row = conn 260 260 .query_opt( 261 - "SELECT 1 FROM labelers l 262 - INNER JOIN actors a ON l.actor_id = a.id 263 - WHERE a.did = $1 AND l.status != 'stub'::labeler_status", 261 + "SELECT 1 FROM actors 262 + WHERE did = $1 AND labeler_status IS NOT NULL AND labeler_status != 'stub'::labeler_status", 264 263 &[&did], 265 264 ) 266 265 .await?; ··· 279 278 Ok(row.is_some()) 280 279 } 281 280 282 - /// Check if a bookmark exists 281 + /// Check if a bookmark exists in the actor's bookmarks[] array 283 282 pub async fn bookmark_exists<C: GenericClient>( 284 283 conn: &C, 285 284 did: &str, ··· 287 286 ) -> QueryResult<bool> { 288 287 let row = conn 289 288 .query_opt( 290 - "SELECT 1 FROM bookmarks b 291 - INNER JOIN actors a ON b.actor_id = a.id 292 - WHERE a.did = $1 AND b.rkey = $2", 289 + "SELECT 1 FROM actors a, unnest(COALESCE(a.bookmarks, ARRAY[]::bookmark_record[])) AS b 290 + WHERE a.did = $1 AND (b).rkey = $2", 293 291 &[&did, &rkey], 294 292 ) 295 293 .await?;
+8 -5
consumer/src/db/sql/list_item_upsert.sql
··· 1 - -- Insert list_item with self-contained schema (no records table) 1 + -- Insert list_item with natural keys (no synthetic list_id) 2 2 -- This prevents foreign key violations when indexing list items 3 - -- Parameters: $1=actor_id(INT4), $2=subject_actor_id(i32), $3=cid(bytea), $4=rkey, $5=list_id(INT8) 3 + -- Parameters: $1=actor_id(INT4), $2=subject_actor_id(i32), $3=cid(bytea), $4=rkey, 4 + -- $5=list_owner_actor_id(INT4), $6=list_rkey(TEXT) 4 5 -- Note: created_at is derived from TID rkey 5 6 -- Note: actor_id is provided by caller after calling get_actor_id (ensures zero sequence waste) 6 - INSERT INTO list_items (actor_id, rkey, cid, list_id, subject_actor_id) 7 + INSERT INTO list_items (actor_id, rkey, cid, list_owner_actor_id, list_rkey, subject_actor_id) 7 8 SELECT 8 9 $1, -- actor_id (provided by caller) 9 10 $4, -- rkey (INT8) 10 11 $3, -- cid (embedded, already bytea) 11 - $5, -- list_id (already resolved, may be stub) 12 + $5, -- list_owner_actor_id (already resolved, may be NULL) 13 + $6, -- list_rkey (already resolved, may be NULL) 12 14 $2::int4 -- subject_actor_id (already resolved, cast to int4) 13 15 WHERE $1::int4 IS NOT NULL -- Only insert if owner exists 14 16 AND $2::int4 IS NOT NULL -- Only insert if subject exists 15 17 ON CONFLICT (actor_id, rkey) DO UPDATE SET 16 18 cid=EXCLUDED.cid, 17 - list_id=EXCLUDED.list_id, 19 + list_owner_actor_id=EXCLUDED.list_owner_actor_id, 20 + list_rkey=EXCLUDED.list_rkey, 18 21 subject_actor_id=EXCLUDED.subject_actor_id
+6 -5
consumer/src/workers/stub_resolution/queries.rs
··· 75 75 76 76 /// Find stub labelers and return their AT URIs for fetching 77 77 /// 78 + /// DENORMALIZED: Labelers are now stored directly on actors table (labeler_status, labeler_cid, etc) 79 + /// 78 80 /// This query finds labelers with status='stub', constructs their AT URIs, 79 81 /// and returns them for enqueuing to the fetch queue. 80 82 /// Limits to 100 records per batch to avoid overwhelming the fetch queue. 81 83 pub async fn find_stub_labelers<C: GenericClient>(conn: &C) -> QueryResult<Vec<String>> { 82 84 let rows = conn 83 85 .query( 84 - "SELECT 'at://' || a.did || '/app.bsky.labeler.service/self' as uri 85 - FROM labelers l 86 - INNER JOIN actors a ON l.actor_id = a.id 87 - WHERE l.status = 'stub'::labeler_status 88 - ORDER BY l.created_at ASC 86 + "SELECT 'at://' || did || '/app.bsky.labeler.service/self' as uri 87 + FROM actors 88 + WHERE labeler_status = 'stub'::labeler_status 89 + ORDER BY labeler_created_at ASC 89 90 LIMIT 100", 90 91 &[], 91 92 )
+28 -34
consumer/tests/graph_operations_test.rs
··· 85 85 .get(0); 86 86 assert_eq!(target_count, 1, "Target actor should be created"); 87 87 88 - // Verify follow relationship exists 88 + // Verify follow relationship exists in following[] array 89 89 let follow_count: i64 = tx 90 90 .query_one( 91 - "SELECT COUNT(*) FROM follows f 92 - INNER JOIN actors a ON f.actor_id = a.id 93 - WHERE a.did = $1 AND f.rkey = tid_to_i64($2)", 91 + "SELECT COUNT(*) FROM actors a, unnest(a.following) AS f 92 + WHERE a.did = $1 AND (f).rkey = tid_to_i64($2)", 94 93 &[&"did:plc:user123", &"3l7mkz4lmk245"], 95 94 ) 96 95 .await ··· 214 213 "Should return deleted target DID" 215 214 ); 216 215 217 - // Verify follow was deleted 216 + // Verify follow was deleted from following[] array 218 217 let follow_count: i64 = tx 219 218 .query_one( 220 - "SELECT COUNT(*) FROM follows f 221 - INNER JOIN actors a ON f.actor_id = a.id 222 - WHERE a.did = $1 AND f.rkey = tid_to_i64($2)", 219 + "SELECT COUNT(*) FROM actors a, unnest(COALESCE(a.following, ARRAY[]::follow_record[])) AS f 220 + WHERE a.did = $1 AND (f).rkey = tid_to_i64($2)", 223 221 &[&"did:plc:user789", &"3l7mkz4lmk247"], 224 222 ) 225 223 .await ··· 294 292 .get(0); 295 293 assert!(blocked_exists, "Blocked actor should exist"); 296 294 297 - // Verify block relationship 295 + // Verify block relationship in blocks[] array 298 296 let block_count: i64 = tx 299 297 .query_one( 300 - "SELECT COUNT(*) FROM blocks b 301 - INNER JOIN actors a ON b.actor_id = a.id 298 + "SELECT COUNT(*) FROM actors a, unnest(a.blocks) AS b 302 299 WHERE a.did = $1", 303 300 &[&"did:plc:blocker123"], 304 301 ) ··· 710 707 .get(0); 711 708 assert!(item_exists, "List item should exist"); 712 709 713 - // Verify list_id is set (not pending) 714 - let list_id_is_set: bool = tx 710 + // Verify list natural keys are set (not NULL) 711 + let list_keys_are_set: bool = tx 715 712 .query_one( 716 - "SELECT li.list_id IS NOT NULL 713 + "SELECT li.list_owner_actor_id IS NOT NULL AND li.list_rkey IS NOT NULL 717 714 FROM list_items li 718 715 INNER JOIN actors a ON li.actor_id = a.id 719 716 WHERE a.did = $1 AND li.rkey = tid_to_i64($2)", ··· 722 719 .await 723 720 .wrap_err("Failed to query list_items")? 724 721 .get(0); 725 - assert!(list_id_is_set, "List ID should be set (not pending)"); 722 + assert!(list_keys_are_set, "List natural keys should be set (not NULL)"); 726 723 Ok(()) 727 724 } 728 725 ··· 766 763 ); 767 764 assert_eq!(result.wrap_err("Operation failed")?, 1, "Should insert 1 row"); 768 765 769 - // Verify list_id is set (not NULL) - stub list was created 770 - let list_id_is_set: bool = tx 766 + // Verify list natural keys are set (not NULL) - stub list was created 767 + let list_keys_are_set: bool = tx 771 768 .query_one( 772 - "SELECT li.list_id IS NOT NULL 769 + "SELECT li.list_owner_actor_id IS NOT NULL AND li.list_rkey IS NOT NULL 773 770 FROM list_items li 774 771 INNER JOIN actors a ON li.actor_id = a.id 775 772 WHERE a.did = $1 AND li.rkey = tid_to_i64($2)", ··· 778 775 .await 779 776 .wrap_err("Failed to query list_items")? 780 777 .get(0); 781 - assert!(list_id_is_set, "List ID should be set (stub list created)"); 778 + assert!(list_keys_are_set, "List natural keys should be set (stub list created)"); 782 779 783 780 // Verify the stub list was created with correct status 784 781 let stub_list_row = tx ··· 938 935 ); 939 936 assert_eq!(result.wrap_err("Operation failed")?, 1, "Should insert 1 row"); 940 937 941 - // Verify list block exists 938 + // Verify list block exists in list_blocks[] array 942 939 let block_exists: bool = tx 943 940 .query_one( 944 941 "SELECT EXISTS( 945 - SELECT 1 FROM list_blocks lb 946 - INNER JOIN actors a ON lb.actor_id = a.id 947 - WHERE a.did = $1 AND lb.rkey = tid_to_i64($2) 942 + SELECT 1 FROM actors a, unnest(a.list_blocks) AS lb 943 + WHERE a.did = $1 AND (lb).rkey = tid_to_i64($2) 948 944 )", 949 945 &[&"did:plc:blocker7", &"3l7mkz4lmk24m"], 950 946 ) ··· 953 949 .get(0); 954 950 assert!(block_exists, "List block should exist"); 955 951 956 - // Verify list_id is set 957 - let list_id_is_set: bool = tx 952 + // Verify list natural keys are set 953 + let list_keys_are_set: bool = tx 958 954 .query_one( 959 - "SELECT lb.list_id IS NOT NULL 960 - FROM list_blocks lb 961 - INNER JOIN actors a ON lb.actor_id = a.id 962 - WHERE a.did = $1 AND lb.rkey = tid_to_i64($2)", 955 + "SELECT (lb).list_actor_id IS NOT NULL AND (lb).list_rkey IS NOT NULL 956 + FROM actors a, unnest(a.list_blocks) AS lb 957 + WHERE a.did = $1 AND (lb).rkey = tid_to_i64($2)", 963 958 &[&"did:plc:blocker7", &"3l7mkz4lmk24m"], 964 959 ) 965 960 .await 966 961 .wrap_err("Failed to query list_blocks")? 967 962 .get(0); 968 - assert!(list_id_is_set, "List ID should be set"); 963 + assert!(list_keys_are_set, "List natural keys should be set"); 969 964 Ok(()) 970 965 } 971 966 ··· 1028 1023 ); 1029 1024 assert_eq!(result.unwrap(), 1, "Should delete 1 row"); 1030 1025 1031 - // Verify list block was deleted 1026 + // Verify list block was deleted from list_blocks[] array 1032 1027 let block_exists: bool = tx 1033 1028 .query_one( 1034 1029 "SELECT EXISTS( 1035 - SELECT 1 FROM list_blocks lb 1036 - INNER JOIN actors a ON lb.actor_id = a.id 1037 - WHERE a.did = $1 AND lb.rkey = tid_to_i64($2) 1030 + SELECT 1 FROM actors a, unnest(COALESCE(a.list_blocks, ARRAY[]::list_block_record[])) AS lb 1031 + WHERE a.did = $1 AND (lb).rkey = tid_to_i64($2) 1038 1032 )", 1039 1033 &[&"did:plc:blocker8", &"3l7mkz4lmk24o"], 1040 1034 )
+6 -3
consumer/tests/notification_test.rs
··· 101 101 .wrap_err("Failed to get actor ID")? 102 102 .get(0); 103 103 104 - // Insert a thread mute using natural keys (root_post_actor_id, root_post_rkey) 105 - // This is the only place we manually INSERT as there's no function for it 104 + // Insert a thread mute into the actors.thread_mutes[] array 105 + // thread_mute_record has fields: (root_post_actor_id, root_post_rkey, created_at) 106 106 tx.execute( 107 - "INSERT INTO thread_mutes (actor_id, root_post_actor_id, root_post_rkey, created_at) VALUES ($1, $2, $3, NOW())", 107 + "UPDATE actors 108 + SET thread_mutes = COALESCE(thread_mutes, ARRAY[]::thread_mute_record[]) || 109 + ARRAY[ROW($2, $3, NOW())::thread_mute_record] 110 + WHERE id = $1", 108 111 &[&actor_id, &rootpostauthor_id, &rkey_i64], 109 112 ) 110 113 .await