1use super::Result;
2use chrono::{DateTime, Utc};
3use deadpool_postgres::GenericClient;
4use eyre::Context as _;
5use ipld_core::cid::Cid;
6use parakeet_db::types::{ActorStatus, ActorSyncState};
7
8pub async fn actor_upsert<C: GenericClient>(
9 conn: &C,
10 did: &str,
11 status: Option<&ActorStatus>,
12 sync_state: &ActorSyncState,
13 handle: Option<&str>,
14 account_created_at: Option<&DateTime<Utc>>,
15 time: DateTime<Utc>,
16) -> Result<u64> {
17 // Acquire advisory lock on DID to prevent races
18 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
19 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
20
21 // Allow allowlist states (synced, dirty, processing) to flow freely
22 // Allow upgrading from partial to allowlist states
23 // Never downgrade from allowlist states to partial
24 //
25 // Account created_at is updated if provided and current value is NULL
26 // This allows enrichment during handle resolution without overwriting existing values
27 //
28 // CONSOLIDATED APPROACH (replaces 8 match arms with 1 UPDATE + 1 INSERT):
29 // - UPDATE: Use COALESCE to conditionally update only provided fields
30 // - INSERT: Use defaults for None values (matches database DEFAULT behavior)
31 //
32 // Note: status column is NOT NULL with DEFAULT 'active', so we provide
33 // the default in Rust when None is passed (can't insert explicit NULL)
34
35 // Use consolidated ActorUpdate API for UPDATE
36 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget};
37
38 let result = ActorUpdate {
39 target: ActorUpdateTarget::ByDid(did.to_string()),
40 actor_status: status.copied(),
41 handle: handle.map(|s| s.to_string()),
42 sync_state: Some(*sync_state),
43 sync_state_upgrade_only: true, // Prevent downgrades from allowlist states to partial
44 account_created_at: account_created_at.copied(),
45 account_created_at_coalesce: true, // Don't overwrite existing values
46 last_indexed: Some(time),
47 ..Default::default()
48 }
49 .execute(conn)
50 .await?;
51
52 let updated = match result {
53 ActorUpdateResult::Count(n) => n,
54 _ => unreachable!("ActorUpdate with Count returning should return Count"),
55 };
56
57 if updated == 0 {
58 // For INSERT, provide default values for NOT NULL columns when None is passed
59 let status_for_insert = status.unwrap_or(&ActorStatus::Active);
60
61 conn.execute(
62 "INSERT INTO actors (did, status, handle, sync_state, account_created_at, last_indexed)
63 VALUES ($1, $2, $3, $4, $5, $6)",
64 &[&did, &status_for_insert, &handle, &sync_state, &account_created_at, &time],
65 )
66 .await
67 .wrap_err_with(|| format!("Failed to upsert actor {}", did))
68 } else {
69 Ok(updated)
70 }
71}
72
73pub async fn actor_set_sync_status<C: GenericClient>(
74 conn: &C,
75 did: &str,
76 sync_state: &ActorSyncState,
77 time: DateTime<Utc>,
78) -> Result<u64> {
79 // Use consolidated ActorUpdate API for sync status update
80 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget};
81
82 let result = ActorUpdate {
83 target: ActorUpdateTarget::ByDid(did.to_string()),
84 sync_state: Some(*sync_state),
85 last_indexed: Some(time),
86 ..Default::default()
87 }
88 .execute(conn)
89 .await
90 .wrap_err_with(|| format!("Failed to set sync status for actor {}", did))?;
91
92 match result {
93 ActorUpdateResult::Count(n) => Ok(n),
94 _ => unreachable!("ActorUpdate with Count returning should return Count"),
95 }
96}
97
98pub async fn actor_set_repo_state<C: GenericClient>(
99 conn: &C,
100 did: &str,
101 rev: &str,
102 cid: Cid,
103) -> Result<u64> {
104 let cid_bytes = cid.to_bytes();
105 let cid_digest = parakeet_db::utils::cid::cid_to_digest(&cid_bytes)
106 .expect("CID must be valid AT Protocol CID");
107
108 // Use consolidated ActorUpdate API for repo state update
109 use crate::db::operations::{ActorUpdate, ActorUpdateResult, ActorUpdateTarget};
110
111 let result = ActorUpdate {
112 target: ActorUpdateTarget::ByDid(did.to_string()),
113 repo_rev: Some(rev.to_string()),
114 repo_cid: Some(cid_digest.to_vec()),
115 ..Default::default()
116 }
117 .execute(conn)
118 .await
119 .wrap_err_with(|| format!("Failed to set repo state for actor {}", did))?;
120
121 match result {
122 ActorUpdateResult::Count(n) => Ok(n),
123 _ => unreachable!("ActorUpdate with Count returning should return Count"),
124 }
125}
126
127pub async fn actor_get_statuses<C: GenericClient>(
128 conn: &C,
129 did: &str,
130) -> Result<Option<(ActorStatus, ActorSyncState)>> {
131 let res = conn
132 .query_opt(
133 "SELECT status, sync_state FROM actors WHERE did=$1 LIMIT 1",
134 &[&did],
135 )
136 .await
137 .wrap_err_with(|| format!("Failed to get statuses for actor {}", did))?;
138
139 Ok(res.map(|v| (v.get(0), v.get(1))))
140}
141
142/// Get the stored repo_rev for an actor (used for incremental backfill)
143pub async fn actor_get_repo_rev<C: GenericClient>(conn: &C, did: &str) -> Result<Option<String>> {
144 let row = conn
145 .query_opt("SELECT repo_rev FROM actors WHERE did = $1", &[&did])
146 .await
147 .wrap_err_with(|| format!("Failed to get repo_rev for actor {}", did))?;
148
149 Ok(row.and_then(|r| r.get(0)))
150}
151
152/// Ensure actor exists and return its ID
153/// Creates actor with Partial sync state if it doesn't exist
154/// This is used by the dispatcher to resolve actor_ids before routing operations
155///
156/// Uses INSERT ... ON CONFLICT to handle concurrent resolution workers safely.
157/// Returns existing actor_id if actor already exists.
158///
159/// Note: For cached version, use ensure_actor_id_with_cache or get_actor_id from feed::helpers.
160pub async fn ensure_actor_id<C: GenericClient>(
161 conn: &C,
162 did: &str,
163 status: Option<&ActorStatus>,
164 handle: Option<&str>,
165 time: DateTime<Utc>,
166) -> Result<i32> {
167 // Acquire per-DID advisory lock to prevent race conditions
168 // This ensures only one transaction at a time can create/access this specific actor
169 // Prevents sequence consumption from concurrent inserts or rollback scenarios
170 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
171 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
172
173 // Use CTE to SELECT first, then conditionally INSERT only if not found
174 // This prevents unnecessary sequence consumption when actor already exists
175 // The advisory lock ensures the WHERE NOT EXISTS check is atomic
176 let row = match (status, handle) {
177 (Some(status), Some(handle)) => {
178 conn.query_one(
179 "WITH existing AS (
180 SELECT id FROM actors WHERE did = $1
181 ),
182 inserted AS (
183 INSERT INTO actors (did, status, handle, sync_state, last_indexed)
184 SELECT $1, $2, $3, $4, $5
185 WHERE NOT EXISTS (SELECT 1 FROM existing)
186 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
187 RETURNING id
188 )
189 SELECT COALESCE(
190 (SELECT id FROM existing),
191 (SELECT id FROM inserted)
192 ) as id",
193 &[&did, &status, &handle, &ActorSyncState::Partial, &time],
194 )
195 .await
196 }
197 (Some(status), None) => {
198 conn.query_one(
199 "WITH existing AS (
200 SELECT id FROM actors WHERE did = $1
201 ),
202 inserted AS (
203 INSERT INTO actors (did, status, sync_state, last_indexed)
204 SELECT $1, $2, $3, $4
205 WHERE NOT EXISTS (SELECT 1 FROM existing)
206 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
207 RETURNING id
208 )
209 SELECT COALESCE(
210 (SELECT id FROM existing),
211 (SELECT id FROM inserted)
212 ) as id",
213 &[&did, &status, &ActorSyncState::Partial, &time],
214 )
215 .await
216 }
217 (None, Some(handle)) => {
218 conn.query_one(
219 "WITH existing AS (
220 SELECT id FROM actors WHERE did = $1
221 ),
222 inserted AS (
223 INSERT INTO actors (did, handle, sync_state, last_indexed)
224 SELECT $1, $2, $3, $4
225 WHERE NOT EXISTS (SELECT 1 FROM existing)
226 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
227 RETURNING id
228 )
229 SELECT COALESCE(
230 (SELECT id FROM existing),
231 (SELECT id FROM inserted)
232 ) as id",
233 &[&did, &handle, &ActorSyncState::Partial, &time],
234 )
235 .await
236 }
237 (None, None) => {
238 conn.query_one(
239 "WITH existing AS (
240 SELECT id FROM actors WHERE did = $1
241 ),
242 inserted AS (
243 INSERT INTO actors (did, sync_state, last_indexed)
244 SELECT $1, $2, $3
245 WHERE NOT EXISTS (SELECT 1 FROM existing)
246 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
247 RETURNING id
248 )
249 SELECT COALESCE(
250 (SELECT id FROM existing),
251 (SELECT id FROM inserted)
252 ) as id",
253 &[&did, &ActorSyncState::Partial, &time],
254 )
255 .await
256 }
257 }
258 .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?;
259
260 Ok(row.get(0))
261}
262
263/// Ensure actor exists and return its ID with allowlist status (cached version)
264///
265/// This is a cached wrapper around ensure_actor_id that:
266/// 1. Checks the cache first for a fast path
267/// 2. Falls back to database if not cached
268/// 3. Caches the result for future lookups
269///
270/// Returns (actor_id, is_allowlisted, was_created) where:
271/// - is_allowlisted indicates if the actor has sync_state IN ('synced', 'dirty', 'processing')
272/// - was_created indicates if a new actor stub was just created (vs. already existed)
273///
274/// Use this in hot paths like the database writer resolution phase.
275pub async fn ensure_actor_id_with_cache<C: GenericClient>(
276 conn: &C,
277 did: &str,
278 status: Option<&ActorStatus>,
279 handle: Option<&str>,
280 time: DateTime<Utc>,
281 cache: ¶keet_db::id_cache::IdCache,
282) -> Result<(i32, bool, bool)> {
283 // Fast path: Check cache first
284 // If cached, the actor already existed (was_created = false)
285 if let Some(cached) = cache.get_actor_id(did).await {
286 return Ok((cached.actor_id, cached.is_allowlisted, false));
287 }
288
289 // Slow path: Not in cache, do database operation
290 // Acquire per-DID advisory lock to prevent race conditions
291 let (table_id, key_id) = crate::database_writer::locking::table_record_lock("actors", did);
292 crate::database_writer::locking::acquire_lock(conn, table_id, key_id).await?;
293
294 // Modified CTE that also returns sync_state for allowlist check and was_created flag
295 let row = match (status, handle) {
296 (Some(status), Some(handle)) => {
297 conn.query_one(
298 "WITH existing AS (
299 SELECT id, sync_state FROM actors WHERE did = $1
300 ),
301 inserted AS (
302 INSERT INTO actors (did, status, handle, sync_state, last_indexed)
303 SELECT $1, $2, $3, $4, $5
304 WHERE NOT EXISTS (SELECT 1 FROM existing)
305 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
306 RETURNING id, sync_state
307 )
308 SELECT
309 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id,
310 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted,
311 (SELECT id FROM existing) IS NULL as was_created",
312 &[&did, &status, &handle, &ActorSyncState::Partial, &time],
313 )
314 .await
315 }
316 (Some(status), None) => {
317 conn.query_one(
318 "WITH existing AS (
319 SELECT id, sync_state FROM actors WHERE did = $1
320 ),
321 inserted AS (
322 INSERT INTO actors (did, status, sync_state, last_indexed)
323 SELECT $1, $2, $3, $4
324 WHERE NOT EXISTS (SELECT 1 FROM existing)
325 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
326 RETURNING id, sync_state
327 )
328 SELECT
329 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id,
330 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted,
331 (SELECT id FROM existing) IS NULL as was_created",
332 &[&did, &status, &ActorSyncState::Partial, &time],
333 )
334 .await
335 }
336 (None, Some(handle)) => {
337 conn.query_one(
338 "WITH existing AS (
339 SELECT id, sync_state FROM actors WHERE did = $1
340 ),
341 inserted AS (
342 INSERT INTO actors (did, handle, sync_state, last_indexed)
343 SELECT $1, $2, $3, $4
344 WHERE NOT EXISTS (SELECT 1 FROM existing)
345 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
346 RETURNING id, sync_state
347 )
348 SELECT
349 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id,
350 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted,
351 (SELECT id FROM existing) IS NULL as was_created",
352 &[&did, &handle, &ActorSyncState::Partial, &time],
353 )
354 .await
355 }
356 (None, None) => {
357 conn.query_one(
358 "WITH existing AS (
359 SELECT id, sync_state FROM actors WHERE did = $1
360 ),
361 inserted AS (
362 INSERT INTO actors (did, sync_state, last_indexed)
363 SELECT $1, $2, $3
364 WHERE NOT EXISTS (SELECT 1 FROM existing)
365 ON CONFLICT DO NOTHING -- Never conflicts due to WHERE NOT EXISTS
366 RETURNING id, sync_state
367 )
368 SELECT
369 COALESCE((SELECT id FROM existing), (SELECT id FROM inserted)) as id,
370 COALESCE((SELECT sync_state FROM existing), (SELECT sync_state FROM inserted)) IN ('synced', 'dirty', 'processing') as is_allowlisted,
371 (SELECT id FROM existing) IS NULL as was_created",
372 &[&did, &ActorSyncState::Partial, &time],
373 )
374 .await
375 }
376 }
377 .wrap_err_with(|| format!("Failed to ensure actor exists {}", did))?;
378
379 let actor_id: i32 = row.get(0);
380 let is_allowlisted: bool = row.get(1);
381 let was_created: bool = row.get(2);
382
383 // Cache the result for future lookups
384 cache
385 .set_actor_id_with_allowlist(did.to_string(), actor_id, is_allowlisted)
386 .await;
387
388 Ok((actor_id, is_allowlisted, was_created))
389}
390
391/// Resolve actor DID to actor_id
392/// Returns None if actor doesn't exist
393///
394/// This is a simple lookup helper to consolidate the many inline queries
395/// scattered throughout the codebase.
396pub async fn actor_id_from_did<C: GenericClient>(
397 conn: &C,
398 did: &str,
399) -> Result<Option<i32>> {
400 let row = conn
401 .query_opt("SELECT id FROM actors WHERE did = $1", &[&did])
402 .await
403 .wrap_err_with(|| format!("Failed to resolve actor_id for DID {}", did))?;
404
405 Ok(row.map(|r| r.get(0)))
406}
407
408/// Resolve actor_id to DID (required version)
409/// Returns error if actor doesn't exist
410///
411/// This is a simple lookup helper to consolidate the many inline queries
412/// scattered throughout the codebase.
413pub async fn actor_did_from_id<C: GenericClient>(
414 conn: &C,
415 actor_id: i32,
416) -> Result<String> {
417 let row = conn
418 .query_one("SELECT did FROM actors WHERE id = $1", &[&actor_id])
419 .await
420 .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?;
421
422 Ok(row.get(0))
423}
424
425/// Resolve actor_id to DID (optional version)
426/// Returns None if actor doesn't exist
427///
428/// This is a simple lookup helper to consolidate the many inline queries
429/// scattered throughout the codebase.
430pub async fn actor_did_from_id_opt<C: GenericClient>(
431 conn: &C,
432 actor_id: i32,
433) -> Result<Option<String>> {
434 let row = conn
435 .query_opt("SELECT did FROM actors WHERE id = $1", &[&actor_id])
436 .await
437 .wrap_err_with(|| format!("Failed to resolve DID for actor_id {}", actor_id))?;
438
439 Ok(row.map(|r| r.get(0)))
440}