use super::Result; use chrono::{DateTime, Utc}; use deadpool_postgres::GenericClient; use eyre::Context as _; use ipld_core::cid::Cid; use parakeet_db::types::{ActorStatus, ActorSyncState}; pub async fn actor_upsert( conn: &C, did: &str, status: Option<&ActorStatus>, sync_state: &ActorSyncState, handle: Option<&str>, account_created_at: Option<&DateTime>, time: DateTime, ) -> Result { // Acquire advisory lock on DID to prevent races let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; // Allow allowlist states (synced, dirty, processing) to flow freely // Allow upgrading from partial to allowlist states // Never downgrade from allowlist states to partial // // Account created_at is updated if provided and current value is NULL // This allows enrichment during handle resolution without overwriting existing values // // CONSOLIDATED APPROACH (replaces 8 match arms with 1 UPDATE + 1 INSERT): // - UPDATE: Use COALESCE to conditionally update only provided fields // - INSERT: Use defaults for None values (matches database DEFAULT behavior) // // Note: status column is NOT NULL with DEFAULT 'active', so we provide // the default in Rust when None is passed (can't insert explicit NULL) // Use consolidated ActorUpdate API for UPDATE use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; let result = ActorUpdate { target: ActorUpdateTarget::ByDid(did.to_string()), actor_status: status.copied(), handle: handle.map(|s| s.to_string()), sync_state: Some(*sync_state), sync_state_upgrade_only: true, // Prevent downgrades from allowlist states to partial account_created_at: account_created_at.copied(), account_created_at_coalesce: true, // Don't overwrite existing values last_indexed: Some(time), ..Default::default() } .execute(conn) .await?; let updated = match result { ActorUpdateResult::Count(n) => n, _ => unreachable!("ActorUpdate with Count returning should return Count"), }; if updated == 0 { // For INSERT, provide default values for NOT NULL columns when None is passed let status_for_insert = status.unwrap_or(&ActorStatus::Active); conn.execute( "INSERT INTO actors (did, status, handle, sync_state, account_created_at, last_indexed) VALUES ($1, $2, $3, $4, $5, $6)", &[&did, &status_for_insert, &handle, &sync_state, &account_created_at, &time], ) .await .wrap_err_with(|| format!("Failed to upsert actor {}", did)) } else { Ok(updated) } } pub async fn actor_set_sync_status( conn: &C, did: &str, sync_state: &ActorSyncState, time: DateTime, ) -> Result { // Use consolidated ActorUpdate API for sync status update use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; let result = ActorUpdate { target: ActorUpdateTarget::ByDid(did.to_string()), sync_state: Some(*sync_state), last_indexed: Some(time), ..Default::default() } .execute(conn) .await .wrap_err_with(|| format!("Failed to set sync status for actor {}", did))?; match result { ActorUpdateResult::Count(n) => Ok(n), _ => unreachable!("ActorUpdate with Count returning should return Count"), } } pub async fn actor_set_repo_state( conn: &C, did: &str, rev: &str, cid: Cid, ) -> Result { let cid_bytes = cid.to_bytes(); let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes) .expect("CID must be valid AT Protocol CID"); // Use consolidated ActorUpdate API for repo state update use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; let result = ActorUpdate { target: ActorUpdateTarget::ByDid(did.to_string()), repo_rev: Some(rev.to_string()), repo_cid: Some(cid_digest.to_vec()), ..Default::default() } .execute(conn) .await .wrap_err_with(|| format!("Failed to set repo state for actor {}", did))?; match result { ActorUpdateResult::Count(n) => Ok(n), _ => unreachable!("ActorUpdate with Count returning should return Count"), } } pub async fn actor_get_statuses( conn: &C, did: &str, ) -> Result> { let res = conn .query_opt( "SELECT status, sync_state FROM actors WHERE did=$1 LIMIT 1", &[&did], ) .await .wrap_err_with(|| format!("Failed to get statuses for actor {}", did))?; Ok(res.map(|v| (v.get(0), v.get(1)))) } /// Get the stored repo_rev for an actor (used for incremental backfill) pub async fn actor_get_repo_rev(conn: &C, did: &str) -> Result> { let row = conn .query_opt("SELECT repo_rev FROM actors WHERE did = $1", &[&did]) .await .wrap_err_with(|| format!("Failed to get repo_rev for actor {}", did))?; Ok(row.and_then(|r| r.get(0))) } /// Ensure actor exists and return its ID /// Creates actor with Partial sync state if it doesn't exist /// This is used by the dispatcher to resolve actor_ids before routing operations /// /// Uses INSERT ... ON CONFLICT to handle concurrent resolution workers safely. /// Returns existing actor_id if actor already exists. /// /// Note: For cached version, use ensure_actor_id_with_cache or get_actor_id from feed::helpers. pub async fn ensure_actor_id( conn: &C, did: &str, status: Option<&ActorStatus>, handle: Option<&str>, time: DateTime, ) -> Result { // Acquire per-DID advisory lock to prevent race conditions // This ensures only one transaction at a time can create/access this specific actor // Prevents sequence consumption from concurrent inserts or rollback scenarios let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; // Use CTE to SELECT first, then conditionally INSERT only if not found // This prevents unnecessary sequence consumption when actor already exists // The advisory lock ensures the WHERE NOT EXISTS check is atomic let row = match (status, handle) { (Some(status), Some(handle)) => { conn.query_one( "WITH existing AS ( SELECT id FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, status, handle, sync_state, last_indexed) SELECT $1, $2, $3, $4, $5 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id ) SELECT COALESCE( (SELECT id FROM existing), (SELECT id FROM inserted) ) as id", &[&did, &status, &handle, &ActorSyncState::Partial, &time], ) .await } (Some(status), None) => { conn.query_one( "WITH existing AS ( SELECT id FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, status, sync_state, last_indexed) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id ) SELECT COALESCE( (SELECT id FROM existing), (SELECT id FROM inserted) ) as id", &[&did, &status, &ActorSyncState::Partial, &time], ) .await } (None, Some(handle)) => { conn.query_one( "WITH existing AS ( SELECT id FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, handle, sync_state, last_indexed) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id ) SELECT COALESCE( (SELECT id FROM existing), (SELECT id FROM inserted) ) as id", &[&did, &handle, &ActorSyncState::Partial, &time], ) .await } (None, None) => { conn.query_one( "WITH existing AS ( SELECT id FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, sync_state, last_indexed) SELECT $1, $2, $3 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id ) SELECT COALESCE( (SELECT id FROM existing), (SELECT id FROM inserted) ) as id", &[&did, &ActorSyncState::Partial, &time], ) .await } } .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?; Ok(row.get(0)) } /// Ensure actor exists and return its ID with allowlist status (cached version) /// /// This is a cached wrapper around ensure_actor_id that: /// 1. Checks the cache first for a fast path /// 2. Falls back to database if not cached /// 3. Caches the result for future lookups /// /// Returns (actor_id, is_allowlisted, was_created) where: /// - is_allowlisted indicates if the actor has sync_state IN ('synced', 'dirty', 'processing') /// - was_created indicates if a new actor stub was just created (vs. already existed) /// /// Use this in hot paths like the database writer resolution phase. pub async fn ensure_actor_id_with_cache( conn: &C, did: &str, status: Option<&ActorStatus>, handle: Option<&str>, time: DateTime, cache: ¶keet_db::id_cache::IdCache, ) -> Result<(i32, bool, bool)> { // Fast path: Check cache first // If cached, the actor already existed (was_created = false) if let Some(cached) = cache.get_actor_id(did).await { return Ok((cached.actor_id, cached.is_allowlisted, false)); } // Slow path: Not in cache, do database operation // Acquire per-DID advisory lock to prevent race conditions let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; // Modified CTE that also returns sync_state for allowlist check and was_created flag let row = match (status, handle) { (Some(status), Some(handle)) => { conn.query_one( "WITH existing AS ( SELECT id, sync_state FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, status, handle, sync_state, last_indexed) SELECT $1, $2, $3, $4, $5 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id, sync_state ) SELECT COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, (SELECT id FROM existing) IS NULL as was_created", &[&did, &status, &handle, &ActorSyncState::Partial, &time], ) .await } (Some(status), None) => { conn.query_one( "WITH existing AS ( SELECT id, sync_state FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, status, sync_state, last_indexed) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id, sync_state ) SELECT COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, (SELECT id FROM existing) IS NULL as was_created", &[&did, &status, &ActorSyncState::Partial, &time], ) .await } (None, Some(handle)) => { conn.query_one( "WITH existing AS ( SELECT id, sync_state FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, handle, sync_state, last_indexed) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id, sync_state ) SELECT COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, (SELECT id FROM existing) IS NULL as was_created", &[&did, &handle, &ActorSyncState::Partial, &time], ) .await } (None, None) => { conn.query_one( "WITH existing AS ( SELECT id, sync_state FROM actors WHERE did = $1 ), inserted AS ( INSERT INTO actors (did, sync_state, last_indexed) SELECT $1, $2, $3 WHERE NOT EXISTS (SELECT 1 FROM existing) ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS RETURNING id, sync_state ) SELECT COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, (SELECT id FROM existing) IS NULL as was_created", &[&did, &ActorSyncState::Partial, &time], ) .await } } .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?; let actor_id: i32 = row.get(0); let is_allowlisted: bool = row.get(1); let was_created: bool = row.get(2); // Cache the result for future lookups cache .set_actor_id_with_allowlist(did.to_string(), actor_id, is_allowlisted) .await; Ok((actor_id, is_allowlisted, was_created)) } /// Resolve actor DID to actor_id /// Returns None if actor doesn't exist /// /// This is a simple lookup helper to consolidate the many inline queries /// scattered throughout the codebase. pub async fn actor_id_from_did( conn: &C, did: &str, ) -> Result> { let row = conn .query_opt("SELECT id FROM actors WHERE did = $1", &[&did]) .await .wrap_err_with(|| format!("Failed to resolve actor_id for DID {}", did))?; Ok(row.map(|r| r.get(0))) } /// Resolve actor_id to DID (required version) /// Returns error if actor doesn't exist /// /// This is a simple lookup helper to consolidate the many inline queries /// scattered throughout the codebase. pub async fn actor_did_from_id( conn: &C, actor_id: i32, ) -> Result { let row = conn .query_one("SELECT did FROM actors WHERE id = $1", &[&actor_id]) .await .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?; Ok(row.get(0)) } /// Resolve actor_id to DID (optional version) /// Returns None if actor doesn't exist /// /// This is a simple lookup helper to consolidate the many inline queries /// scattered throughout the codebase. pub async fn actor_did_from_id_opt( conn: &C, actor_id: i32, ) -> Result> { let row = conn .query_opt("SELECT did FROM actors WHERE id = $1", &[&actor_id]) .await .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?; Ok(row.map(|r| r.get(0))) }