1//! Per-DID advisory locking for deadlock-free actor ensuring
2//!
3//! This module provides fine-grained, table-scoped advisory locks that prevent deadlocks
4//! when multiple workers concurrently ensure actors exist in the database.
5//!
6//! ## Lock Namespacing
7//!
8//! Locks are table-scoped using PostgreSQL's two-key advisory lock form:
9//! `pg_advisory_xact_lock(table_id, key_id)` where both are int4 (32-bit).
10//!
11//! This ensures that operations on different tables don't block each other:
12//! - Updating a profile won't block updating a status (different tables)
13//! - Inserting a post won't block inserting a like (different tables)
14//! - But two workers trying to update the same profile WILL block (same table + key)
15
16use std::collections::hash_map::DefaultHasher;
17use std::hash::{Hash, Hasher};
18
/// Reduce a string to the 32-bit signed integer form that PostgreSQL's
/// two-key advisory lock functions take.
///
/// The string is fed through a freshly constructed [`DefaultHasher`] and the
/// low 32 bits of the 64-bit digest are kept.
///
/// Two different strings may map to the same value; callers then share one
/// lock ID for unrelated records.
///
/// NOTE(review): the standard library leaves `DefaultHasher`'s algorithm
/// unspecified and allows it to change between Rust releases. Lock IDs only
/// line up across workers that were built with the same toolchain.
fn hash_to_i32(s: &str) -> i32 {
    let digest: u64 = {
        let mut state = DefaultHasher::new();
        s.hash(&mut state);
        state.finish()
    };

    // Deliberate truncation: keep the low 32 bits and reinterpret them as a
    // signed value.
    digest as i32
}
32
33/// Generate a table-scoped advisory lock ID pair
34///
35/// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form:
36/// `pg_advisory_xact_lock(table_id, key_id)`
37///
38/// # Arguments
39/// * `table_name` - The name of the table being locked (e.g., "profiles", "posts", "actors")
40/// * `key` - The key identifying the specific record (e.g., DID, URI)
41///
42/// # Example
43/// ```ignore
44/// let (table_id, key_id) = table_record_lock("profiles", did);
45/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?;
46/// ```
47pub fn table_record_lock(table_name: &str, key: &str) -> (i32, i32) {
48 let table_id = hash_to_i32(table_name);
49 let key_id = hash_to_i32(key);
50 (table_id, key_id)
51}
52
53/// Generate a table-scoped advisory lock using actor_id and rkey (i64 TID)
54///
55/// Returns `(table_id, key_id)` for use with PostgreSQL's two-key advisory lock form.
56/// This is more efficient than constructing AT URIs for locking.
57///
58/// # Arguments
59/// * `table_name` - The name of the table being locked (e.g., "likes", "reposts", "follows")
60/// * `actor_id` - The actor's database ID
61/// * `rkey` - The record key (TID already converted to i64)
62///
63/// # Example
64/// ```ignore
65/// let (table_id, key_id) = actor_record_lock("likes", actor_id, rkey_i64);
66/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?;
67/// ```
68pub fn actor_record_lock(table_name: &str, actor_id: i32, rkey: i64) -> (i32, i32) {
69 let table_id = hash_to_i32(table_name);
70 // XOR actor_id with lower 32 bits of rkey for a unique lock key
71 // This is safe and gives good distribution without string allocation
72 let key_id = actor_id ^ (rkey as i32);
73 (table_id, key_id)
74}
75
76/// Generate a table-scoped advisory lock using actor_id and rkey (string form)
77///
78/// Use this for collections with arbitrary string rkeys (e.g., feedgens).
79/// For TID-based collections, use `actor_record_lock` instead.
80///
81/// # Arguments
82/// * `table_name` - The name of the table being locked (e.g., "feedgens")
83/// * `actor_id` - The actor's database ID
84/// * `rkey` - The record key as string
85///
86/// # Example
87/// ```ignore
88/// let (table_id, key_id) = actor_record_lock_str("feedgens", actor_id, rkey);
89/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?;
90/// ```
91pub fn actor_record_lock_str(table_name: &str, actor_id: i32, rkey: &str) -> (i32, i32) {
92 let table_id = hash_to_i32(table_name);
93 // Combine actor_id and rkey for a unique lock key
94 let combined_key = format!("{}:{}", actor_id, rkey);
95 let key_id = hash_to_i32(&combined_key);
96 (table_id, key_id)
97}
98
99/// Acquire a single advisory lock
100///
101/// This is a simple wrapper around the common pattern of:
102/// ```ignore
103/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?;
104/// ```
105///
106/// The lock is automatically released when the transaction commits/rolls back.
107///
108/// # Example
109/// ```ignore
110/// let (table_id, key_id) = table_record_lock("actors", did);
111/// acquire_lock(conn, table_id, key_id).await?;
112/// ```
113pub async fn acquire_lock<C: deadpool_postgres::GenericClient>(
114 conn: &C,
115 table_id: i32,
116 key_id: i32,
117) -> eyre::Result<()> {
118 conn.execute(
119 "SELECT pg_advisory_xact_lock($1, $2)",
120 &[&table_id, &key_id],
121 )
122 .await?;
123 Ok(())
124}
125
126/// Acquire transaction-level advisory locks for a set of DIDs
127///
128/// Locks are acquired in sorted DID order to prevent deadlocks.
129/// All locks are automatically released when the transaction commits/rolls back.
130///
131/// # Deadlock Prevention
132///
133/// By sorting DIDs before acquiring locks, we ensure all workers acquire
134/// locks in the same order, preventing circular wait conditions:
135///
136/// ```text
137/// Worker A: lock(alice), lock(bob) ✓ No deadlock
138/// Worker B: lock(alice), lock(bob) ✓ B waits for A to release alice
139/// ```
140///
141/// Without sorting:
142/// ```text
143/// Worker A: lock(alice), lock(bob) ✗ Deadlock!
144/// Worker B: lock(bob), lock(alice) ✗ A waits for B, B waits for A
145/// ```
146///
147/// # Performance
148///
149/// All locks are acquired in a single PostgreSQL query, minimizing round-trips.
150/// Empty DID lists short-circuit without a database call.
151pub async fn acquire_did_locks(
152 conn: &(impl tokio_postgres::GenericClient + Sync),
153 dids: &[String],
154) -> eyre::Result<()> {
155 if dids.is_empty() {
156 return Ok(());
157 }
158
159 // Sort DIDs to ensure consistent lock ordering across all workers
160 let mut sorted_dids: Vec<&str> = dids.iter().map(|d| d.as_str()).collect();
161 sorted_dids.sort_unstable();
162
163 // Deduplicate DIDs (same DID shouldn't be locked twice)
164 sorted_dids.dedup();
165
166 // Convert to table-scoped lock IDs (actors table, DID key)
167 let lock_pairs: Vec<(i32, i32)> = sorted_dids
168 .iter()
169 .map(|did| table_record_lock("actors", did))
170 .collect();
171
172 // Split into separate arrays for PostgreSQL
173 let table_ids: Vec<i32> = lock_pairs.iter().map(|(t, _)| *t).collect();
174 let key_ids: Vec<i32> = lock_pairs.iter().map(|(_, k)| *k).collect();
175
176 // Acquire all locks in a single query (transaction-scoped)
177 // pg_advisory_xact_lock automatically releases on commit/rollback
178 // Use two-key form for table-scoped locking
179 conn.execute(
180 "SELECT pg_advisory_xact_lock(t, k) FROM unnest($1::int[], $2::int[]) AS locks(t, k)",
181 &[&table_ids, &key_ids],
182 )
183 .await?;
184
185 tracing::trace!(
186 "Acquired {} advisory locks for {} DIDs",
187 lock_pairs.len(),
188 sorted_dids.len()
189 );
190
191 Ok(())
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// Calling twice with identical arguments yields identical lock pairs.
    #[test]
    fn test_table_record_lock_stability() {
        let did = "did:plc:abcdef123456";
        let first = table_record_lock("profiles", did);
        let second = table_record_lock("profiles", did);

        assert_eq!(first.0, second.0, "Table ID should be stable for same table");
        assert_eq!(first.1, second.1, "Key ID should be stable for same key");
    }

    /// The table part changes with the table name; the key part does not.
    #[test]
    fn test_table_record_lock_different_tables() {
        let did = "did:plc:abcdef123456";
        let (profiles_table, profiles_key) = table_record_lock("profiles", did);
        let (statuses_table, statuses_key) = table_record_lock("statuses", did);

        assert_ne!(
            profiles_table, statuses_table,
            "Different tables should have different table IDs"
        );
        assert_eq!(
            profiles_key, statuses_key,
            "Same DID should have same key ID across tables"
        );
    }

    /// The key part changes with the key; the table part does not.
    #[test]
    fn test_table_record_lock_different_keys() {
        let (alice_table, alice_key) = table_record_lock("profiles", "did:plc:alice");
        let (bob_table, bob_key) = table_record_lock("profiles", "did:plc:bob");

        assert_eq!(alice_table, bob_table, "Same table should have same table ID");
        assert_ne!(alice_key, bob_key, "Different DIDs should have different key IDs");
    }

    /// One DID locked under three tables gives three distinct lock pairs.
    #[test]
    fn test_table_record_lock_namespacing() {
        let did = "did:plc:test123";
        let profiles = table_record_lock("profiles", did);
        let statuses = table_record_lock("statuses", did);
        let actors = table_record_lock("actors", did);

        assert_ne!(profiles, statuses, "Profile and status locks should differ");
        assert_ne!(profiles, actors, "Profile and actor locks should differ");
        assert_ne!(statuses, actors, "Status and actor locks should differ");
    }
}