Highly ambitious ATProtocol AppView service and sdks
1//! Actor management operations.
2//!
3//! This module handles database operations for ATProto actors (users/DIDs)
4//! tracked within slices, including batch insertion, querying, and filtering.
5
6use super::client::Database;
7use super::types::{WhereClause, WhereCondition};
8use crate::errors::DatabaseError;
9use crate::models::Actor;
10
11impl Database {
12 /// Inserts multiple actors in batches with conflict resolution.
13 ///
14 /// Updates handle and indexed_at if an actor already exists for the
15 /// (did, slice_uri) pair.
16 pub async fn batch_insert_actors(&self, actors: &[Actor]) -> Result<(), DatabaseError> {
17 if actors.is_empty() {
18 return Ok(());
19 }
20
21 let mut tx = self.pool.begin().await?;
22
23 const CHUNK_SIZE: usize = 1000;
24
25 for chunk in actors.chunks(CHUNK_SIZE) {
26 for actor in chunk {
27 sqlx::query!(
28 r#"INSERT INTO "actor" ("did", "handle", "slice_uri", "indexed_at")
29 VALUES ($1, $2, $3, $4)
30 ON CONFLICT ("did", "slice_uri")
31 DO UPDATE SET
32 "handle" = EXCLUDED."handle",
33 "indexed_at" = EXCLUDED."indexed_at""#,
34 actor.did,
35 actor.handle,
36 actor.slice_uri,
37 actor.indexed_at
38 )
39 .execute(&mut *tx)
40 .await?;
41 }
42 }
43
44 tx.commit().await?;
45 Ok(())
46 }
47
48 /// Queries actors for a slice with advanced filtering and cursor-based pagination.
49 ///
50 /// Supports:
51 /// - Complex WHERE conditions (AND/OR, eq/in/contains operators)
52 /// - Cursor-based pagination
53 ///
54 /// # Returns
55 /// Tuple of (actors, next_cursor) where cursor is the last DID
56 pub async fn get_slice_actors(
57 &self,
58 slice_uri: &str,
59 limit: Option<i32>,
60 cursor: Option<&str>,
61 where_clause: Option<&WhereClause>,
62 ) -> Result<(Vec<Actor>, Option<String>), DatabaseError> {
63 let limit = limit.unwrap_or(50).min(100);
64
65 let mut where_clauses = vec![format!("slice_uri = $1")];
66 let mut param_count = 2;
67
68 // Build WHERE conditions for actors (handle table columns properly)
69 let (and_conditions, or_conditions) =
70 build_actor_where_conditions(where_clause, &mut param_count);
71 where_clauses.extend(and_conditions);
72
73 if !or_conditions.is_empty() {
74 let or_clause = format!("({})", or_conditions.join(" OR "));
75 where_clauses.push(or_clause);
76 }
77
78 // Add cursor condition
79 if let Some(_cursor_did) = cursor {
80 where_clauses.push(format!("did > ${}", param_count));
81 param_count += 1;
82 }
83
84 let where_sql = format!("WHERE {}", where_clauses.join(" AND "));
85
86 let query = format!(
87 r#"
88 SELECT did, handle, slice_uri, indexed_at
89 FROM actor
90 {}
91 ORDER BY did ASC
92 LIMIT ${}
93 "#,
94 where_sql, param_count
95 );
96
97 let mut sqlx_query = sqlx::query_as::<_, Actor>(&query);
98
99 // Bind parameters in order
100 sqlx_query = sqlx_query.bind(slice_uri);
101
102 // Bind WHERE clause parameters
103 if let Some(clause) = where_clause {
104 for condition in clause.conditions.values() {
105 if let Some(eq_value) = &condition.eq {
106 if let Some(str_val) = eq_value.as_str() {
107 sqlx_query = sqlx_query.bind(str_val);
108 } else {
109 sqlx_query = sqlx_query.bind(eq_value);
110 }
111 }
112 if let Some(in_values) = &condition.in_values {
113 let str_values: Vec<String> = in_values
114 .iter()
115 .filter_map(|v| v.as_str().map(|s| s.to_string()))
116 .collect();
117 sqlx_query = sqlx_query.bind(str_values);
118 }
119 if let Some(contains_value) = &condition.contains {
120 sqlx_query = sqlx_query.bind(contains_value);
121 }
122 }
123
124 // Bind OR conditions
125 if let Some(or_conditions) = &clause.or_conditions {
126 for condition in or_conditions.values() {
127 if let Some(eq_value) = &condition.eq {
128 if let Some(str_val) = eq_value.as_str() {
129 sqlx_query = sqlx_query.bind(str_val);
130 } else {
131 sqlx_query = sqlx_query.bind(eq_value);
132 }
133 }
134 if let Some(in_values) = &condition.in_values {
135 let str_values: Vec<String> = in_values
136 .iter()
137 .filter_map(|v| v.as_str().map(|s| s.to_string()))
138 .collect();
139 sqlx_query = sqlx_query.bind(str_values);
140 }
141 if let Some(contains_value) = &condition.contains {
142 sqlx_query = sqlx_query.bind(contains_value);
143 }
144 }
145 }
146 }
147
148 // Bind cursor parameter
149 if let Some(cursor_did) = cursor {
150 sqlx_query = sqlx_query.bind(cursor_did);
151 }
152
153 // Bind limit
154 sqlx_query = sqlx_query.bind(limit as i64);
155
156 let records = sqlx_query.fetch_all(&self.pool).await?;
157
158 let cursor = if records.len() < limit as usize {
159 None // Last page - no more results
160 } else {
161 records.last().map(|actor| actor.did.clone())
162 };
163
164 Ok((records, cursor))
165 }
166
167 /// Gets all actors across all slices.
168 ///
169 /// # Returns
170 /// Vector of (did, slice_uri) tuples
171 pub async fn get_all_actors(&self) -> Result<Vec<(String, String)>, DatabaseError> {
172 let rows = sqlx::query!(
173 r#"
174 SELECT did, slice_uri
175 FROM actor
176 "#
177 )
178 .fetch_all(&self.pool)
179 .await?;
180
181 Ok(rows
182 .into_iter()
183 .map(|row| (row.did, row.slice_uri))
184 .collect())
185 }
186
187 /// Checks if an actor has any records in a slice.
188 ///
189 /// Used before actor deletion to maintain referential integrity.
190 pub async fn actor_has_records(
191 &self,
192 did: &str,
193 slice_uri: &str,
194 ) -> Result<bool, DatabaseError> {
195 let count = sqlx::query!(
196 r#"
197 SELECT COUNT(*) as count
198 FROM record
199 WHERE did = $1 AND slice_uri = $2
200 "#,
201 did,
202 slice_uri
203 )
204 .fetch_one(&self.pool)
205 .await?;
206 Ok(count.count.unwrap_or(0) > 0)
207 }
208
209 /// Deletes an actor from a specific slice.
210 ///
211 /// # Returns
212 /// Number of rows affected
213 pub async fn delete_actor(&self, did: &str, slice_uri: &str) -> Result<u64, DatabaseError> {
214 let result = sqlx::query!(
215 r#"
216 DELETE FROM actor
217 WHERE did = $1 AND slice_uri = $2
218 "#,
219 did,
220 slice_uri
221 )
222 .execute(&self.pool)
223 .await?;
224 Ok(result.rows_affected())
225 }
226
227 /// Deletes all actors for a specific slice.
228 ///
229 /// This is a destructive operation that removes all tracked actors
230 /// from the specified slice. Actors will be recreated when records
231 /// are re-indexed during sync.
232 ///
233 /// # Arguments
234 /// * `slice_uri` - AT-URI of the slice to clear
235 ///
236 /// # Returns
237 /// Number of actors deleted
238 pub async fn delete_all_actors_for_slice(&self, slice_uri: &str) -> Result<u64, DatabaseError> {
239 let result = sqlx::query!(
240 r#"
241 DELETE FROM actor
242 WHERE slice_uri = $1
243 "#,
244 slice_uri
245 )
246 .execute(&self.pool)
247 .await?;
248 Ok(result.rows_affected())
249 }
250
251 /// Resolves actor handles to DIDs for a specific slice.
252 ///
253 /// # Arguments
254 /// * `handles` - List of handles to resolve
255 /// * `slice_uri` - AT-URI of the slice
256 ///
257 /// # Returns
258 /// Vec of DIDs corresponding to the handles
259 pub async fn resolve_handles_to_dids(
260 &self,
261 handles: &[String],
262 slice_uri: &str,
263 ) -> Result<Vec<String>, DatabaseError> {
264 if handles.is_empty() {
265 return Ok(Vec::new());
266 }
267
268 let placeholders: Vec<String> = (1..=handles.len()).map(|i| format!("${}", i)).collect();
269 let query_sql = format!(
270 "SELECT DISTINCT did FROM actor WHERE handle = ANY(ARRAY[{}]) AND slice_uri = ${}",
271 placeholders.join(", "),
272 handles.len() + 1
273 );
274
275 let mut query = sqlx::query_scalar::<_, String>(&query_sql);
276 for handle in handles {
277 query = query.bind(handle);
278 }
279 query = query.bind(slice_uri);
280
281 Ok(query.fetch_all(&self.pool).await?)
282 }
283
284 /// Resolves actor handles to DIDs using pattern matching (ILIKE).
285 ///
286 /// # Arguments
287 /// * `pattern` - Handle pattern to search for (partial match)
288 /// * `slice_uri` - AT-URI of the slice
289 ///
290 /// # Returns
291 /// Vec of DIDs matching the handle pattern
292 pub async fn resolve_handle_pattern_to_dids(
293 &self,
294 pattern: &str,
295 slice_uri: &str,
296 ) -> Result<Vec<String>, DatabaseError> {
297 let query_sql = "SELECT DISTINCT did FROM actor WHERE handle ILIKE '%' || $1 || '%' AND slice_uri = $2";
298
299 let dids = sqlx::query_scalar::<_, String>(query_sql)
300 .bind(pattern)
301 .bind(slice_uri)
302 .fetch_all(&self.pool)
303 .await?;
304
305 Ok(dids)
306 }
307
308 /// Resolves actor handles to DIDs using fuzzy matching (trigram similarity).
309 ///
310 /// # Arguments
311 /// * `pattern` - Handle pattern to fuzzy match
312 /// * `slice_uri` - AT-URI of the slice
313 ///
314 /// # Returns
315 /// Vec of DIDs with similar handles
316 pub async fn resolve_handle_fuzzy_to_dids(
317 &self,
318 pattern: &str,
319 slice_uri: &str,
320 ) -> Result<Vec<String>, DatabaseError> {
321 let query_sql = "SELECT DISTINCT did FROM actor WHERE handle % $1 AND slice_uri = $2";
322
323 let dids = sqlx::query_scalar::<_, String>(query_sql)
324 .bind(pattern)
325 .bind(slice_uri)
326 .fetch_all(&self.pool)
327 .await?;
328
329 Ok(dids)
330 }
331}
332
333/// Builds WHERE conditions specifically for actor queries.
334///
335/// Unlike the general query builder, this handles actor table columns directly
336/// rather than treating them as JSON paths.
337fn build_actor_where_conditions(
338 where_clause: Option<&WhereClause>,
339 param_count: &mut usize,
340) -> (Vec<String>, Vec<String>) {
341 let mut where_clauses = Vec::new();
342 let mut or_clauses = Vec::new();
343
344 if let Some(clause) = where_clause {
345 for (field, condition) in &clause.conditions {
346 let field_clause = build_actor_single_condition(field, condition, param_count);
347 if !field_clause.is_empty() {
348 where_clauses.push(field_clause);
349 }
350 }
351
352 if let Some(or_conditions) = &clause.or_conditions {
353 for (field, condition) in or_conditions {
354 let field_clause = build_actor_single_condition(field, condition, param_count);
355 if !field_clause.is_empty() {
356 or_clauses.push(field_clause);
357 }
358 }
359 }
360 }
361
362 (where_clauses, or_clauses)
363}
364
365/// Builds a single SQL condition clause for actor fields.
366fn build_actor_single_condition(
367 field: &str,
368 condition: &WhereCondition,
369 param_count: &mut usize,
370) -> String {
371 if let Some(_eq_value) = &condition.eq {
372 let clause = format!("{} = ${}", field, param_count);
373 *param_count += 1;
374 clause
375 } else if let Some(_in_values) = &condition.in_values {
376 let clause = format!("{} = ANY(${})", field, param_count);
377 *param_count += 1;
378 clause
379 } else if let Some(_contains_value) = &condition.contains {
380 let clause = format!("{} ILIKE '%' || ${} || '%'", field, param_count);
381 *param_count += 1;
382 clause
383 } else {
384 String::new()
385 }
386}