//! Per-DID advisory locking for deadlock-free actor ensuring //! //! This module provides fine-grained, table-scoped advisory locks that prevent deadlocks //! when multiple workers concurrently ensure actors exist in the database. //! //! ## Lock Namespacing //! //! Locks are table-scoped using PostgreSQL's two-key advisory lock form: //! `pg_advisory_xact_lock(table_id, key_id)` where both are int4 (32-bit). //! //! This ensures that operations on different tables don't block each other: //! - Updating a profile won't block updating a status (different tables) //! - Inserting a post won't block inserting a like (different tables) //! - But two workers trying to update the same profile WILL block (same table + key) use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; /// Hash a string to a 32-bit signed integer for PostgreSQL advisory locks /// /// Uses DefaultHasher for fast, stable hashing within a process. /// Hash collisions are acceptable - they just cause false contention /// (performance impact only, not a correctness issue). fn hash_to_i32(s: &str) -> i32 { let mut hasher = DefaultHasher::new(); s.hash(&mut hasher); // Take lower 32 bits and cast to i32 // This is safe - we just reinterpret the bits hasher.finish() as i32 } /// Generate a table-scoped advisory lock ID pair /// /// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form: /// `pg_advisory_xact_lock(table_id, key_id)` /// /// # Arguments /// * `table_name` - The name of the table being locked (e.g., "profiles", "posts", "actors") /// * `key` - The key identifying the specific record (e.g., DID, URI) /// /// # Example /// ```ignore /// let (table_id, key_id) = table_record_lock("profiles", did); /// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; /// ``` pub fn table_record_lock(table_name: &str, key: &str) -> (i32, i32) { let table_id = hash_to_i32(table_name); let key_id = hash_to_i32(key); (table_id, key_id) } /// Generate a table-scoped advisory lock using actor_id and rkey (i64 TID) /// /// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form. /// This is more efficient than constructing AT URIs for locking. /// /// # Arguments /// * `table_name` - The name of the table being locked (e.g., "likes", "reposts", "follows") /// * `actor_id` - The actor's database ID /// * `rkey` - The record key (TID already converted to i64) /// /// # Example /// ```ignore /// let (table_id, key_id) = actor_record_lock("likes", actor_id, rkey_i64); /// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; /// ``` pub fn actor_record_lock(table_name: &str, actor_id: i32, rkey: i64) -> (i32, i32) { let table_id = hash_to_i32(table_name); // XOR actor_id with lower 32 bits of rkey for a unique lock key // This is safe and gives good distribution without string allocation let key_id = actor_id ^ (rkey as i32); (table_id, key_id) } /// Generate a table-scoped advisory lock using actor_id and rkey (string form) /// /// Use this for collections with arbitrary string rkeys (e.g., feedgens). /// For TID-based collections, use `actor_record_lock` instead. /// /// # Arguments /// * `table_name` - The name of the table being locked (e.g., "feedgens") /// * `actor_id` - The actor's database ID /// * `rkey` - The record key as string /// /// # Example /// ```ignore /// let (table_id, key_id) = actor_record_lock_str("feedgens", actor_id, rkey); /// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; /// ``` pub fn actor_record_lock_str(table_name: &str, actor_id: i32, rkey: &str) -> (i32, i32) { let table_id = hash_to_i32(table_name); // Combine actor_id and rkey for a unique lock key let combined_key = format!("{}:{}", actor_id, rkey); let key_id = hash_to_i32(&combined_key); (table_id, key_id) } /// Acquire a single advisory lock /// /// This is a simple wrapper around the common pattern of: /// ```ignore /// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?; /// ``` /// /// The lock is automatically released when the transaction commits/rolls back. /// /// # Example /// ```ignore /// let (table_id, key_id) = table_record_lock("actors", did); /// acquire_lock(conn, table_id, key_id).await?; /// ``` pub async fn acquire_lock( conn: &C, table_id: i32, key_id: i32, ) -> eyre::Result<()> { conn.execute( "SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id], ) .await?; Ok(()) } /// Acquire transaction-level advisory locks for a set of DIDs /// /// Locks are acquired in sorted DID order to prevent deadlocks. /// All locks are automatically released when the transaction commits/rolls back. /// /// # Deadlock Prevention /// /// By sorting DIDs before acquiring locks, we ensure all workers acquire /// locks in the same order, preventing circular wait conditions: /// /// ```text /// Worker A: lock(alice), lock(bob) ✓ No deadlock /// Worker B: lock(alice), lock(bob) ✓ B waits for A to release alice /// ``` /// /// Without sorting: /// ```text /// Worker A: lock(alice), lock(bob) ✗ Deadlock! /// Worker B: lock(bob), lock(alice) ✗ A waits for B, B waits for A /// ``` /// /// # Performance /// /// All locks are acquired in a single PostgreSQL query, minimizing round-trips. /// Empty DID lists short-circuit without a database call. pub async fn acquire_did_locks( conn: &(impl tokio_postgres::GenericClient + Sync), dids: &[String], ) -> eyre::Result<()> { if dids.is_empty() { return Ok(()); } // Sort DIDs to ensure consistent lock ordering across all workers let mut sorted_dids: Vec<&str> = dids.iter().map(|d| d.as_str()).collect(); sorted_dids.sort_unstable(); // Deduplicate DIDs (same DID shouldn't be locked twice) sorted_dids.dedup(); // Convert to table-scoped lock IDs (actors table, DID key) let lock_pairs: Vec<(i32, i32)> = sorted_dids .iter() .map(|did| table_record_lock("actors", did)) .collect(); // Split into separate arrays for PostgreSQL let table_ids: Vec = lock_pairs.iter().map(|(t, _)| *t).collect(); let key_ids: Vec = lock_pairs.iter().map(|(_, k)| *k).collect(); // Acquire all locks in a single query (transaction-scoped) // pg_advisory_xact_lock automatically releases on commit/rollback // Use two-key form for table-scoped locking conn.execute( "SELECT pg_advisory_xact_lock(t, k) FROM unnest($1::int[], $2::int[]) AS locks(t, k)", &[&table_ids, &key_ids], ) .await?; tracing::trace!( "Acquired {} advisory locks for {} DIDs", lock_pairs.len(), sorted_dids.len() ); Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn test_table_record_lock_stability() { let did = "did:plc:abcdef123456"; let (t1, k1) = table_record_lock("profiles", did); let (t2, k2) = table_record_lock("profiles", did); assert_eq!(t1, t2, "Table ID should be stable for same table"); assert_eq!(k1, k2, "Key ID should be stable for same key"); } #[test] fn test_table_record_lock_different_tables() { let did = "did:plc:abcdef123456"; let (t1, k1) = table_record_lock("profiles", did); let (t2, k2) = table_record_lock("statuses", did); // Same DID, different tables should have different table IDs assert_ne!(t1, t2, "Different tables should have different table IDs"); // But same key ID (same DID) assert_eq!(k1, k2, "Same DID should have same key ID across tables"); } #[test] fn test_table_record_lock_different_keys() { let (t1, k1) = table_record_lock("profiles", "did:plc:alice"); let (t2, k2) = table_record_lock("profiles", "did:plc:bob"); // Same table, different keys should have same table ID assert_eq!(t1, t2, "Same table should have same table ID"); // But different key IDs assert_ne!(k1, k2, "Different DIDs should have different key IDs"); } #[test] fn test_table_record_lock_namespacing() { // The key insight: same DID on different tables = different locks let did = "did:plc:test123"; let profile_lock = table_record_lock("profiles", did); let status_lock = table_record_lock("statuses", did); let actor_lock = table_record_lock("actors", did); // All should be unique lock pairs assert_ne!(profile_lock, status_lock, "Profile and status locks should differ"); assert_ne!(profile_lock, actor_lock, "Profile and actor locks should differ"); assert_ne!(status_lock, actor_lock, "Status and actor locks should differ"); } }