···9696 (table_id, key_id)
9797}
98989999+/// Acquire a single advisory lock
100100+///
101101+/// This is a simple wrapper around the common pattern of:
102102+/// ```ignore
103103+/// conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id]).await?;
104104+/// ```
105105+///
106106+/// The lock is automatically released when the transaction commits/rolls back.
107107+///
108108+/// # Example
109109+/// ```ignore
110110+/// let (table_id, key_id) = table_record_lock("actors", did);
111111+/// acquire_lock(conn, table_id, key_id).await?;
112112+/// ```
113113+pub async fn acquire_lock<C: deadpool_postgres::GenericClient>(
114114+ conn: &C,
115115+ table_id: i32,
116116+ key_id: i32,
117117+) -> eyre::Result<()> {
118118+ conn.execute(
119119+ "SELECT pg_advisory_xact_lock($1, $2)",
120120+ &[&table_id, &key_id],
121121+ )
122122+ .await?;
123123+ Ok(())
124124+}
125125+99126/// Acquire transaction-level advisory locks for a set of DIDs
100127///
101128/// Locks are acquired in sorted DID order to prevent deadlocks.
+3-6
consumer/src/db/actor.rs
···1616) -> Result<u64> {
1717 // Acquire advisory lock on DID to prevent races
1818 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
1919- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
2020- .await?;
1919+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
21202221 // Allow allowlist states (synced, dirty, processing) to flow freely
2322 // Allow upgrading from partial to allowlist states
···327326 // This ensures only one transaction at a time can create/access this specific actor
328327 // Prevents sequence consumption from concurrent inserts or rollback scenarios
329328 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
330330- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
331331- .await?;
329329+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
332330333331 // Use CTE to SELECT first, then conditionally INSERT only if not found
334332 // This prevents unnecessary sequence consumption when actor already exists
···449447 // Slow path: Not in cache, do database operation
450448 // Acquire per-DID advisory lock to prevent race conditions
451449 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
452452- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
453453- .await?;
450450+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
454451455452 // Modified CTE that also returns sync_state for allowlist check and was_created flag
456453 let row = match (status, handle) {
+2-4
consumer/src/db/operations/feed/helpers.rs
···9393) -> Result<(i64, bool)> {
9494 // Acquire advisory lock on feedgen URI to prevent concurrent access races
9595 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("feedgens", at_uri);
9696- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
9797- .await?;
9696+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
98979998 // Parse the CID string to get the digest
10099 let cid = ipld_core::cid::Cid::try_from(cid_str)
···163162pub(super) async fn get_list_id<C: GenericClient>(conn: &C, at_uri: &str) -> Result<(i64, bool)> {
164163 // Acquire advisory lock on list URI to prevent concurrent access races
165164 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("lists", at_uri);
166166- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
167167- .await?;
165165+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
168166169167 // Extract owner DID and rkey from AT URI
170168 let did = parakeet_db::at_uri_util::extract_did(at_uri)
+1-2
consumer/src/db/operations/feed/like.rs
···5151 // Acquire advisory lock on record to prevent deadlocks
5252 let lock_start = std::time::Instant::now();
5353 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("likes", actor_id, rkey);
5454- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
5555- .await?;
5454+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
5655 let lock_ms = lock_start.elapsed().as_millis();
57565857 // Note: via_repost_id is already resolved in the reference extraction phase (workers.rs)
+1-2
consumer/src/db/operations/feed/post.rs
···76767777 // Acquire advisory lock using actor_id and rkey to prevent deadlocks
7878 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("posts", actor_id, rkey);
7979- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
8080- .await?;
7979+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
81808281 // Get parent and root post natural keys (create stubs if necessary)
8382 // First get the actor IDs, then get the post natural keys
+1-2
consumer/src/db/operations/feed/repost.rs
···44444545 // Acquire advisory lock on record to prevent deadlocks
4646 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("reposts", actor_id, rkey);
4747- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
4848- .await?;
4747+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
49485049 // Note: via_repost_id is already resolved in the reference extraction phase (workers.rs)
5150 // This ensures the FK constraint is satisfied before we reach this point
+6-12
consumer/src/db/operations/graph.rs
···2020) -> Result<u64> {
2121 // Acquire advisory lock on record to prevent deadlocks
2222 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("follows", actor_id, rkey);
2323- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
2424- .await?;
2323+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
25242625 // Insert follow - simple insert on primary key (actor_id, rkey)
2726 // Note: CID is synthetic, generated from actor_id + rkey
···6968) -> Result<u64> {
7069 // Acquire advisory lock on record to prevent deadlocks
7170 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("blocks", actor_id, rkey);
7272- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
7373- .await?;
7171+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
74727573 // Insert block - simple insert on primary key (actor_id, rkey)
7674 // Note: CID is synthetic, generated from actor_id + rkey
···114112115113 // Acquire table-scoped advisory lock on lists table for this record
116114 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock_str("lists", actor_id, rkey);
117117- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
118118- .await?;
115115+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
119116120117 conn.query_one(
121118 include_str!("../sql/list_upsert.sql"),
···162159163160 // Acquire advisory lock on record to prevent deadlocks
164161 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("list_blocks", actor_id, rkey);
165165- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
166166- .await?;
162162+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
167163168164 conn.execute(
169165 include_str!("../sql/list_block_upsert.sql"),
···210206211207 // Acquire advisory lock on record to prevent deadlocks
212208 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("list_items", actor_id, rkey);
213213- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
214214- .await?;
209209+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
215210216211 // Resolve list_id by creating stub list if needed
217212 let list_id = super::feed::ensure_list_id(conn, &rec.list).await?;
···262257263258 // Acquire advisory lock on record to prevent deadlocks
264259 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("verification", actor_id, rkey);
265265- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
266266- .await?;
260260+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
267261268262 // Insert verification - actors already resolved (no SELECT subquery needed)
269263 // Note: created_at is derived from TID rkey
+2-4
consumer/src/db/operations/labeler.rs
···2424 // Labeler URI is always at://{did}/app.bsky.labeler.service/self
2525 let labeler_uri = format!("at://{}/app.bsky.labeler.service/self", did);
2626 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("labelers", &labeler_uri);
2727- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
2828- .await?;
2727+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
29283029 // Parse the CID string to get the digest
3130 let cid = ipld_core::cid::Cid::try_from(cid_str)
···7574 // Acquire table-scoped advisory lock on labelers table for this actor
7675 // This prevents concurrent labeler updates for the same actor
7776 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("labelers", did);
7878- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
7979- .await?;
7777+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
80788179 let cid_bytes = cid.to_bytes();
8280 let cid_digest = parakeet_db::cid_util::cid_to_digest(&cid_bytes)
+1-2
consumer/src/db/operations/starter_pack.rs
···24242525 // Acquire advisory lock on record to prevent deadlocks
2626 let (table_id, key_id) = crate::database_writer::locking::actor_record_lock("starterpacks", actor_id, rkey);
2727- conn.execute("SELECT pg_advisory_xact_lock($1, $2)", &[&table_id, &key_id])
2828- .await?;
2727+ crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
29283029 // Resolve list_id by creating stub list if needed
3130 let list_id = super::feed::ensure_list_id(conn, &rec.list).await?;