Rust AppView - highly experimental!
at experiments 8.9 kB view raw
//! Per-DID advisory locking for deadlock-free actor ensuring
//!
//! This module provides fine-grained, table-scoped advisory locks that prevent deadlocks
//! when multiple workers concurrently ensure actors exist in the database.
//!
//! ## Lock Namespacing
//!
//! Locks are table-scoped using PostgreSQL's two-key advisory lock form:
//! `pg_advisory_xact_lock(table_id, key_id)` where both are int4 (32-bit).
//!
//! This ensures that operations on different tables don't block each other:
//! - Updating a profile won't block updating a status (different tables)
//! - Inserting a post won't block inserting a like (different tables)
//! - But two workers trying to update the same profile WILL block (same table + key)

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash a string to a 32-bit signed integer for PostgreSQL advisory locks
///
/// Uses DefaultHasher for fast, stable hashing within a process.
/// Hash collisions are acceptable - they just cause false contention
/// (performance impact only, not a correctness issue).
24fn hash_to_i32(s: &str) -> i32 { 25 let mut hasher = DefaultHasher::new(); 26 s.hash(&mut hasher); 27 28 // Take lower 32 bits and cast to i32 29 // This is safe - we just reinterpret the bits 30 hasher.finish() as i32 31} 32 33/// Generate a table-scoped advisory lock ID pair 34/// 35/// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form: 36/// `pg_advisory_xact_lock(table_id, key_id)` 37/// 38/// # Arguments 39/// * `table_name` - The name of the table being locked (e.g., "profiles", "posts", "actors") 40/// * `key` - The key identifying the specific record (e.g., DID, URI) 41/// 42/// # Example 43/// ```ignore 44/// let (table_id, key_id) = table_record_lock("profiles", did); 45/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; 46/// ``` 47pub fn table_record_lock(table_name: &str, key: &str) -> (i32, i32) { 48 let table_id = hash_to_i32(table_name); 49 let key_id = hash_to_i32(key); 50 (table_id, key_id) 51} 52 53/// Generate a table-scoped advisory lock using actor_id and rkey (i64 TID) 54/// 55/// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form. 56/// This is more efficient than constructing AT URIs for locking. 
57/// 58/// # Arguments 59/// * `table_name` - The name of the table being locked (e.g., "likes", "reposts", "follows") 60/// * `actor_id` - The actor's database ID 61/// * `rkey` - The record key (TID already converted to i64) 62/// 63/// # Example 64/// ```ignore 65/// let (table_id, key_id) = actor_record_lock("likes", actor_id, rkey_i64); 66/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; 67/// ``` 68pub fn actor_record_lock(table_name: &str, actor_id: i32, rkey: i64) -> (i32, i32) { 69 let table_id = hash_to_i32(table_name); 70 // XOR actor_id with lower 32 bits of rkey for a unique lock key 71 // This is safe and gives good distribution without string allocation 72 let key_id = actor_id ^ (rkey as i32); 73 (table_id, key_id) 74} 75 76/// Generate a table-scoped advisory lock using actor_id and rkey (string form) 77/// 78/// Use this for collections with arbitrary string rkeys (e.g., feedgens). 79/// For TID-based collections, use `actor_record_lock` instead. 
80/// 81/// # Arguments 82/// * `table_name` - The name of the table being locked (e.g., "feedgens") 83/// * `actor_id` - The actor's database ID 84/// * `rkey` - The record key as string 85/// 86/// # Example 87/// ```ignore 88/// let (table_id, key_id) = actor_record_lock_str("feedgens", actor_id, rkey); 89/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; 90/// ``` 91pub fn actor_record_lock_str(table_name: &str, actor_id: i32, rkey: &str) -> (i32, i32) { 92 let table_id = hash_to_i32(table_name); 93 // Combine actor_id and rkey for a unique lock key 94 let combined_key = format!("{}:{}", actor_id, rkey); 95 let key_id = hash_to_i32(&combined_key); 96 (table_id, key_id) 97} 98 99/// Acquire a single advisory lock 100/// 101/// This is a simple wrapper around the common pattern of: 102/// ```ignore 103/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; 104/// ``` 105/// 106/// The lock is automatically released when the transaction commits/rolls back. 107/// 108/// # Example 109/// ```ignore 110/// let (table_id, key_id) = table_record_lock("actors", did); 111/// acquire_lock(conn, table_id, key_id).await?; 112/// ``` 113pub async fn acquire_lock<C: deadpool_postgres::GenericClient>( 114 conn: &C, 115 table_id: i32, 116 key_id: i32, 117) -> eyre::Result<()> { 118 conn.execute( 119 "SELECT pg_advisory_xact_lock($1, $2)", 120 &[&table_id, &key_id], 121 ) 122 .await?; 123 Ok(()) 124} 125 126/// Acquire transaction-level advisory locks for a set of DIDs 127/// 128/// Locks are acquired in sorted DID order to prevent deadlocks. 129/// All locks are automatically released when the transaction commits/rolls back. 
130/// 131/// # Deadlock Prevention 132/// 133/// By sorting DIDs before acquiring locks, we ensure all workers acquire 134/// locks in the same order, preventing circular wait conditions: 135/// 136/// ```text 137/// Worker A: lock(alice), lock(bob) ✓ No deadlock 138/// Worker B: lock(alice), lock(bob) ✓ B waits for A to release alice 139/// ``` 140/// 141/// Without sorting: 142/// ```text 143/// Worker A: lock(alice), lock(bob) ✗ Deadlock! 144/// Worker B: lock(bob), lock(alice) ✗ A waits for B, B waits for A 145/// ``` 146/// 147/// # Performance 148/// 149/// All locks are acquired in a single PostgreSQL query, minimizing round-trips. 150/// Empty DID lists short-circuit without a database call. 151pub async fn acquire_did_locks( 152 conn: &(impl tokio_postgres::GenericClient + Sync), 153 dids: &[String], 154) -> eyre::Result<()> { 155 if dids.is_empty() { 156 return Ok(()); 157 } 158 159 // Sort DIDs to ensure consistent lock ordering across all workers 160 let mut sorted_dids: Vec<&str> = dids.iter().map(|d| d.as_str()).collect(); 161 sorted_dids.sort_unstable(); 162 163 // Deduplicate DIDs (same DID shouldn't be locked twice) 164 sorted_dids.dedup(); 165 166 // Convert to table-scoped lock IDs (actors table, DID key) 167 let lock_pairs: Vec<(i32, i32)> = sorted_dids 168 .iter() 169 .map(|did| table_record_lock("actors", did)) 170 .collect(); 171 172 // Split into separate arrays for PostgreSQL 173 let table_ids: Vec<i32> = lock_pairs.iter().map(|(t, _)| *t).collect(); 174 let key_ids: Vec<i32> = lock_pairs.iter().map(|(_, k)| *k).collect(); 175 176 // Acquire all locks in a single query (transaction-scoped) 177 // pg_advisory_xact_lock automatically releases on commit/rollback 178 // Use two-key form for table-scoped locking 179 conn.execute( 180 "SELECT pg_advisory_xact_lock(t, k) FROM unnest($1::int[], $2::int[]) AS locks(t, k)", 181 &[&table_ids, &key_ids], 182 ) 183 .await?; 184 185 tracing::trace!( 186 "Acquired {} advisory locks for {} DIDs", 187 
lock_pairs.len(), 188 sorted_dids.len() 189 ); 190 191 Ok(()) 192} 193 194#[cfg(test)] 195mod tests { 196 use super::*; 197 198 #[test] 199 fn test_table_record_lock_stability() { 200 let did = "did:plc:abcdef123456"; 201 let (t1, k1) = table_record_lock("profiles", did); 202 let (t2, k2) = table_record_lock("profiles", did); 203 assert_eq!(t1, t2, "Table ID should be stable for same table"); 204 assert_eq!(k1, k2, "Key ID should be stable for same key"); 205 } 206 207 #[test] 208 fn test_table_record_lock_different_tables() { 209 let did = "did:plc:abcdef123456"; 210 let (t1, k1) = table_record_lock("profiles", did); 211 let (t2, k2) = table_record_lock("statuses", did); 212 213 // Same DID, different tables should have different table IDs 214 assert_ne!(t1, t2, "Different tables should have different table IDs"); 215 // But same key ID (same DID) 216 assert_eq!(k1, k2, "Same DID should have same key ID across tables"); 217 } 218 219 #[test] 220 fn test_table_record_lock_different_keys() { 221 let (t1, k1) = table_record_lock("profiles", "did:plc:alice"); 222 let (t2, k2) = table_record_lock("profiles", "did:plc:bob"); 223 224 // Same table, different keys should have same table ID 225 assert_eq!(t1, t2, "Same table should have same table ID"); 226 // But different key IDs 227 assert_ne!(k1, k2, "Different DIDs should have different key IDs"); 228 } 229 230 #[test] 231 fn test_table_record_lock_namespacing() { 232 // The key insight: same DID on different tables = different locks 233 let did = "did:plc:test123"; 234 let profile_lock = table_record_lock("profiles", did); 235 let status_lock = table_record_lock("statuses", did); 236 let actor_lock = table_record_lock("actors", did); 237 238 // All should be unique lock pairs 239 assert_ne!(profile_lock, status_lock, "Profile and status locks should differ"); 240 assert_ne!(profile_lock, actor_lock, "Profile and actor locks should differ"); 241 assert_ne!(status_lock, actor_lock, "Status and actor locks should 
differ"); 242 } 243}