Rust AppView - highly experimental!
at experiments 440 lines 18 kB view raw
1use super::Result; 2use chrono::{DateTime, Utc}; 3use deadpool_postgres::GenericClient; 4use eyre::Context as _; 5use ipld_core::cid::Cid; 6use parakeet_db::types::{ActorStatus, ActorSyncState}; 7 8pub async fn actor_upsert<C: GenericClient>( 9 conn: &C, 10 did: &str, 11 status: Option<&ActorStatus>, 12 sync_state: &ActorSyncState, 13 handle: Option<&str>, 14 account_created_at: Option<&DateTime<Utc>>, 15 time: DateTime<Utc>, 16) -> Result<u64> { 17 // Acquire advisory lock on DID to prevent races 18 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); 19 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 20 21 // Allow allowlist states (synced, dirty, processing) to flow freely 22 // Allow upgrading from partial to allowlist states 23 // Never downgrade from allowlist states to partial 24 // 25 // Account created_at is updated if provided and current value is NULL 26 // This allows enrichment during handle resolution without overwriting existing values 27 // 28 // CONSOLIDATED APPROACH (replaces 8 match arms with 1 UPDATE + 1 INSERT): 29 // - UPDATE: Use COALESCE to conditionally update only provided fields 30 // - INSERT: Use defaults for None values (matches database DEFAULT behavior) 31 // 32 // Note: status column is NOT NULL with DEFAULT 'active', so we provide 33 // the default in Rust when None is passed (can't insert explicit NULL) 34 35 // Use consolidated ActorUpdate API for UPDATE 36 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; 37 38 let result = ActorUpdate { 39 target: ActorUpdateTarget::ByDid(did.to_string()), 40 actor_status: status.copied(), 41 handle: handle.map(|s| s.to_string()), 42 sync_state: Some(*sync_state), 43 sync_state_upgrade_only: true, // Prevent downgrades from allowlist states to partial 44 account_created_at: account_created_at.copied(), 45 account_created_at_coalesce: true, // Don't overwrite existing values 46 last_indexed: Some(time), 47 ..Default::default() 48 } 49 .execute(conn) 50 .await?; 51 52 let updated = match result { 53 ActorUpdateResult::Count(n) => n, 54 _ => unreachable!("ActorUpdate with Count returning should return Count"), 55 }; 56 57 if updated == 0 { 58 // For INSERT, provide default values for NOT NULL columns when None is passed 59 let status_for_insert = status.unwrap_or(&ActorStatus::Active); 60 61 conn.execute( 62 "INSERT INTO actors (did, status, handle, sync_state, account_created_at, last_indexed) 63 VALUES ($1, $2, $3, $4, $5, $6)", 64 &[&did, &status_for_insert, &handle, &sync_state, &account_created_at, &time], 65 ) 66 .await 67 .wrap_err_with(|| format!("Failed to upsert actor {}", did)) 68 } else { 69 Ok(updated) 70 } 71} 72 73pub async fn actor_set_sync_status<C: GenericClient>( 74 conn: &C, 75 did: &str, 76 sync_state: &ActorSyncState, 77 time: DateTime<Utc>, 78) -> Result<u64> { 79 // Use consolidated ActorUpdate API for sync status update 80 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; 81 82 let result = ActorUpdate { 83 target: ActorUpdateTarget::ByDid(did.to_string()), 84 sync_state: Some(*sync_state), 85 last_indexed: Some(time), 86 ..Default::default() 87 } 88 .execute(conn) 89 .await 90 .wrap_err_with(|| format!("Failed to set sync status for actor {}", did))?; 91 92 match result { 93 ActorUpdateResult::Count(n) => Ok(n), 94 _ => unreachable!("ActorUpdate with Count returning should return Count"), 95 } 96} 97 98pub async fn actor_set_repo_state<C: GenericClient>( 99 conn: &C, 100 did: &str, 101 rev: &str, 102 cid: Cid, 103) -> Result<u64> { 104 let cid_bytes = cid.to_bytes(); 105 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes) 106 .expect("CID must be valid AT Protocol CID"); 107 108 // Use consolidated ActorUpdate API for repo state update 109 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget}; 110 111 let result = ActorUpdate { 112 target: ActorUpdateTarget::ByDid(did.to_string()), 113 repo_rev: Some(rev.to_string()), 114 repo_cid: Some(cid_digest.to_vec()), 115 ..Default::default() 116 } 117 .execute(conn) 118 .await 119 .wrap_err_with(|| format!("Failed to set repo state for actor {}", did))?; 120 121 match result { 122 ActorUpdateResult::Count(n) => Ok(n), 123 _ => unreachable!("ActorUpdate with Count returning should return Count"), 124 } 125} 126 127pub async fn actor_get_statuses<C: GenericClient>( 128 conn: &C, 129 did: &str, 130) -> Result<Option<(ActorStatus, ActorSyncState)>> { 131 let res = conn 132 .query_opt( 133 "SELECT status, sync_state FROM actors WHERE did=$1 LIMIT 1", 134 &[&did], 135 ) 136 .await 137 .wrap_err_with(|| format!("Failed to get statuses for actor {}", did))?; 138 139 Ok(res.map(|v| (v.get(0), v.get(1)))) 140} 141 142/// Get the stored repo_rev for an actor (used for incremental backfill) 143pub async fn actor_get_repo_rev<C: GenericClient>(conn: &C, did: &str) -> Result<Option<String>> { 144 let row = conn 145 .query_opt("SELECT repo_rev FROM actors WHERE did = $1", &[&did]) 146 .await 147 .wrap_err_with(|| format!("Failed to get repo_rev for actor {}", did))?; 148 149 Ok(row.and_then(|r| r.get(0))) 150} 151 152/// Ensure actor exists and return its ID 153/// Creates actor with Partial sync state if it doesn't exist 154/// This is used by the dispatcher to resolve actor_ids before routing operations 155/// 156/// Uses INSERT ... ON CONFLICT to handle concurrent resolution workers safely. 157/// Returns existing actor_id if actor already exists. 158/// 159/// Note: For cached version, use ensure_actor_id_with_cache or get_actor_id from feed::helpers. 160pub async fn ensure_actor_id<C: GenericClient>( 161 conn: &C, 162 did: &str, 163 status: Option<&ActorStatus>, 164 handle: Option<&str>, 165 time: DateTime<Utc>, 166) -> Result<i32> { 167 // Acquire per-DID advisory lock to prevent race conditions 168 // This ensures only one transaction at a time can create/access this specific actor 169 // Prevents sequence consumption from concurrent inserts or rollback scenarios 170 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); 171 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 172 173 // Use CTE to SELECT first, then conditionally INSERT only if not found 174 // This prevents unnecessary sequence consumption when actor already exists 175 // The advisory lock ensures the WHERE NOT EXISTS check is atomic 176 let row = match (status, handle) { 177 (Some(status), Some(handle)) => { 178 conn.query_one( 179 "WITH existing AS ( 180 SELECT id FROM actors WHERE did = $1 181 ), 182 inserted AS ( 183 INSERT INTO actors (did, status, handle, sync_state, last_indexed) 184 SELECT $1, $2, $3, $4, $5 185 WHERE NOT EXISTS (SELECT 1 FROM existing) 186 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 187 RETURNING id 188 ) 189 SELECT COALESCE( 190 (SELECT id FROM existing), 191 (SELECT id FROM inserted) 192 ) as id", 193 &[&did, &status, &handle, &ActorSyncState::Partial, &time], 194 ) 195 .await 196 } 197 (Some(status), None) => { 198 conn.query_one( 199 "WITH existing AS ( 200 SELECT id FROM actors WHERE did = $1 201 ), 202 inserted AS ( 203 INSERT INTO actors (did, status, sync_state, last_indexed) 204 SELECT $1, $2, $3, $4 205 WHERE NOT EXISTS (SELECT 1 FROM existing) 206 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 207 RETURNING id 208 ) 209 SELECT COALESCE( 210 (SELECT id FROM existing), 211 (SELECT id FROM inserted) 212 ) as id", 213 &[&did, &status, &ActorSyncState::Partial, &time], 214 ) 215 .await 216 } 217 (None, Some(handle)) => { 218 conn.query_one( 219 "WITH existing AS ( 220 SELECT id FROM actors WHERE did = $1 221 ), 222 inserted AS ( 223 INSERT INTO actors (did, handle, sync_state, last_indexed) 224 SELECT $1, $2, $3, $4 225 WHERE NOT EXISTS (SELECT 1 FROM existing) 226 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 227 RETURNING id 228 ) 229 SELECT COALESCE( 230 (SELECT id FROM existing), 231 (SELECT id FROM inserted) 232 ) as id", 233 &[&did, &handle, &ActorSyncState::Partial, &time], 234 ) 235 .await 236 } 237 (None, None) => { 238 conn.query_one( 239 "WITH existing AS ( 240 SELECT id FROM actors WHERE did = $1 241 ), 242 inserted AS ( 243 INSERT INTO actors (did, sync_state, last_indexed) 244 SELECT $1, $2, $3 245 WHERE NOT EXISTS (SELECT 1 FROM existing) 246 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 247 RETURNING id 248 ) 249 SELECT COALESCE( 250 (SELECT id FROM existing), 251 (SELECT id FROM inserted) 252 ) as id", 253 &[&did, &ActorSyncState::Partial, &time], 254 ) 255 .await 256 } 257 } 258 .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?; 259 260 Ok(row.get(0)) 261} 262 263/// Ensure actor exists and return its ID with allowlist status (cached version) 264/// 265/// This is a cached wrapper around ensure_actor_id that: 266/// 1. Checks the cache first for a fast path 267/// 2. Falls back to database if not cached 268/// 3. Caches the result for future lookups 269/// 270/// Returns (actor_id, is_allowlisted, was_created) where: 271/// - is_allowlisted indicates if the actor has sync_state IN ('synced', 'dirty', 'processing') 272/// - was_created indicates if a new actor stub was just created (vs. already existed) 273/// 274/// Use this in hot paths like the database writer resolution phase. 275pub async fn ensure_actor_id_with_cache<C: GenericClient>( 276 conn: &C, 277 did: &str, 278 status: Option<&ActorStatus>, 279 handle: Option<&str>, 280 time: DateTime<Utc>, 281 cache: &parakeet_db::id_cache::IdCache, 282) -> Result<(i32, bool, bool)> { 283 // Fast path: Check cache first 284 // If cached, the actor already existed (was_created = false) 285 if let Some(cached) = cache.get_actor_id(did).await { 286 return Ok((cached.actor_id, cached.is_allowlisted, false)); 287 } 288 289 // Slow path: Not in cache, do database operation 290 // Acquire per-DID advisory lock to prevent race conditions 291 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did); 292 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?; 293 294 // Modified CTE that also returns sync_state for allowlist check and was_created flag 295 let row = match (status, handle) { 296 (Some(status), Some(handle)) => { 297 conn.query_one( 298 "WITH existing AS ( 299 SELECT id, sync_state FROM actors WHERE did = $1 300 ), 301 inserted AS ( 302 INSERT INTO actors (did, status, handle, sync_state, last_indexed) 303 SELECT $1, $2, $3, $4, $5 304 WHERE NOT EXISTS (SELECT 1 FROM existing) 305 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 306 RETURNING id, sync_state 307 ) 308 SELECT 309 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, 310 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, 311 (SELECT id FROM existing) IS NULL as was_created", 312 &[&did, &status, &handle, &ActorSyncState::Partial, &time], 313 ) 314 .await 315 } 316 (Some(status), None) => { 317 conn.query_one( 318 "WITH existing AS ( 319 SELECT id, sync_state FROM actors WHERE did = $1 320 ), 321 inserted AS ( 322 INSERT INTO actors (did, status, sync_state, last_indexed) 323 SELECT $1, $2, $3, $4 324 WHERE NOT EXISTS (SELECT 1 FROM existing) 325 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 326 RETURNING id, sync_state 327 ) 328 SELECT 329 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, 330 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, 331 (SELECT id FROM existing) IS NULL as was_created", 332 &[&did, &status, &ActorSyncState::Partial, &time], 333 ) 334 .await 335 } 336 (None, Some(handle)) => { 337 conn.query_one( 338 "WITH existing AS ( 339 SELECT id, sync_state FROM actors WHERE did = $1 340 ), 341 inserted AS ( 342 INSERT INTO actors (did, handle, sync_state, last_indexed) 343 SELECT $1, $2, $3, $4 344 WHERE NOT EXISTS (SELECT 1 FROM existing) 345 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 346 RETURNING id, sync_state 347 ) 348 SELECT 349 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, 350 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, 351 (SELECT id FROM existing) IS NULL as was_created", 352 &[&did, &handle, &ActorSyncState::Partial, &time], 353 ) 354 .await 355 } 356 (None, None) => { 357 conn.query_one( 358 "WITH existing AS ( 359 SELECT id, sync_state FROM actors WHERE did = $1 360 ), 361 inserted AS ( 362 INSERT INTO actors (did, sync_state, last_indexed) 363 SELECT $1, $2, $3 364 WHERE NOT EXISTS (SELECT 1 FROM existing) 365 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS 366 RETURNING id, sync_state 367 ) 368 SELECT 369 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id, 370 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted, 371 (SELECT id FROM existing) IS NULL as was_created", 372 &[&did, &ActorSyncState::Partial, &time], 373 ) 374 .await 375 } 376 } 377 .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?; 378 379 let actor_id: i32 = row.get(0); 380 let is_allowlisted: bool = row.get(1); 381 let was_created: bool = row.get(2); 382 383 // Cache the result for future lookups 384 cache 385 .set_actor_id_with_allowlist(did.to_string(), actor_id, is_allowlisted) 386 .await; 387 388 Ok((actor_id, is_allowlisted, was_created)) 389} 390 391/// Resolve actor DID to actor_id 392/// Returns None if actor doesn't exist 393/// 394/// This is a simple lookup helper to consolidate the many inline queries 395/// scattered throughout the codebase. 396pub async fn actor_id_from_did<C: GenericClient>( 397 conn: &C, 398 did: &str, 399) -> Result<Option<i32>> { 400 let row = conn 401 .query_opt("SELECT id FROM actors WHERE did = $1", &[&did]) 402 .await 403 .wrap_err_with(|| format!("Failed to resolve actor_id for DID {}", did))?; 404 405 Ok(row.map(|r| r.get(0))) 406} 407 408/// Resolve actor_id to DID (required version) 409/// Returns error if actor doesn't exist 410/// 411/// This is a simple lookup helper to consolidate the many inline queries 412/// scattered throughout the codebase. 413pub async fn actor_did_from_id<C: GenericClient>( 414 conn: &C, 415 actor_id: i32, 416) -> Result<String> { 417 let row = conn 418 .query_one("SELECT did FROM actors WHERE id = $1", &[&actor_id]) 419 .await 420 .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?; 421 422 Ok(row.get(0)) 423} 424 425/// Resolve actor_id to DID (optional version) 426/// Returns None if actor doesn't exist 427/// 428/// This is a simple lookup helper to consolidate the many inline queries 429/// scattered throughout the codebase. 430pub async fn actor_did_from_id_opt<C: GenericClient>( 431 conn: &C, 432 actor_id: i32, 433) -> Result<Option<String>> { 434 let row = conn 435 .query_opt("SELECT did FROM actors WHERE id = $1", &[&actor_id]) 436 .await 437 .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?; 438 439 Ok(row.map(|r| r.get(0))) 440}