Highly ambitious ATProtocol AppView service and sdks
at main 12 kB view raw
1//! Actor management operations. 2//! 3//! This module handles database operations for ATProto actors (users/DIDs) 4//! tracked within slices, including batch insertion, querying, and filtering. 5 6use super::client::Database; 7use super::types::{WhereClause, WhereCondition}; 8use crate::errors::DatabaseError; 9use crate::models::Actor; 10 11impl Database { 12 /// Inserts multiple actors in batches with conflict resolution. 13 /// 14 /// Updates handle and indexed_at if an actor already exists for the 15 /// (did, slice_uri) pair. 16 pub async fn batch_insert_actors(&self, actors: &[Actor]) -> Result<(), DatabaseError> { 17 if actors.is_empty() { 18 return Ok(()); 19 } 20 21 let mut tx = self.pool.begin().await?; 22 23 const CHUNK_SIZE: usize = 1000; 24 25 for chunk in actors.chunks(CHUNK_SIZE) { 26 for actor in chunk { 27 sqlx::query!( 28 r#"INSERT INTO "actor" ("did", "handle", "slice_uri", "indexed_at") 29 VALUES ($1, $2, $3, $4) 30 ON CONFLICT ("did", "slice_uri") 31 DO UPDATE SET 32 "handle" = EXCLUDED."handle", 33 "indexed_at" = EXCLUDED."indexed_at""#, 34 actor.did, 35 actor.handle, 36 actor.slice_uri, 37 actor.indexed_at 38 ) 39 .execute(&mut *tx) 40 .await?; 41 } 42 } 43 44 tx.commit().await?; 45 Ok(()) 46 } 47 48 /// Queries actors for a slice with advanced filtering and cursor-based pagination. 49 /// 50 /// Supports: 51 /// - Complex WHERE conditions (AND/OR, eq/in/contains operators) 52 /// - Cursor-based pagination 53 /// 54 /// # Returns 55 /// Tuple of (actors, next_cursor) where cursor is the last DID 56 pub async fn get_slice_actors( 57 &self, 58 slice_uri: &str, 59 limit: Option<i32>, 60 cursor: Option<&str>, 61 where_clause: Option<&WhereClause>, 62 ) -> Result<(Vec<Actor>, Option<String>), DatabaseError> { 63 let limit = limit.unwrap_or(50).min(100); 64 65 let mut where_clauses = vec![format!("slice_uri = $1")]; 66 let mut param_count = 2; 67 68 // Build WHERE conditions for actors (handle table columns properly) 69 let (and_conditions, or_conditions) = 70 build_actor_where_conditions(where_clause, &mut param_count); 71 where_clauses.extend(and_conditions); 72 73 if !or_conditions.is_empty() { 74 let or_clause = format!("({})", or_conditions.join(" OR ")); 75 where_clauses.push(or_clause); 76 } 77 78 // Add cursor condition 79 if let Some(_cursor_did) = cursor { 80 where_clauses.push(format!("did > ${}", param_count)); 81 param_count += 1; 82 } 83 84 let where_sql = format!("WHERE {}", where_clauses.join(" AND ")); 85 86 let query = format!( 87 r#" 88 SELECT did, handle, slice_uri, indexed_at 89 FROM actor 90 {} 91 ORDER BY did ASC 92 LIMIT ${} 93 "#, 94 where_sql, param_count 95 ); 96 97 let mut sqlx_query = sqlx::query_as::<_, Actor>(&query); 98 99 // Bind parameters in order 100 sqlx_query = sqlx_query.bind(slice_uri); 101 102 // Bind WHERE clause parameters 103 if let Some(clause) = where_clause { 104 for condition in clause.conditions.values() { 105 if let Some(eq_value) = &condition.eq { 106 if let Some(str_val) = eq_value.as_str() { 107 sqlx_query = sqlx_query.bind(str_val); 108 } else { 109 sqlx_query = sqlx_query.bind(eq_value); 110 } 111 } 112 if let Some(in_values) = &condition.in_values { 113 let str_values: Vec<String> = in_values 114 .iter() 115 .filter_map(|v| v.as_str().map(|s| s.to_string())) 116 .collect(); 117 sqlx_query = sqlx_query.bind(str_values); 118 } 119 if let Some(contains_value) = &condition.contains { 120 sqlx_query = sqlx_query.bind(contains_value); 121 } 122 } 123 124 // Bind OR conditions 125 if let Some(or_conditions) = &clause.or_conditions { 126 for condition in or_conditions.values() { 127 if let Some(eq_value) = &condition.eq { 128 if let Some(str_val) = eq_value.as_str() { 129 sqlx_query = sqlx_query.bind(str_val); 130 } else { 131 sqlx_query = sqlx_query.bind(eq_value); 132 } 133 } 134 if let Some(in_values) = &condition.in_values { 135 let str_values: Vec<String> = in_values 136 .iter() 137 .filter_map(|v| v.as_str().map(|s| s.to_string())) 138 .collect(); 139 sqlx_query = sqlx_query.bind(str_values); 140 } 141 if let Some(contains_value) = &condition.contains { 142 sqlx_query = sqlx_query.bind(contains_value); 143 } 144 } 145 } 146 } 147 148 // Bind cursor parameter 149 if let Some(cursor_did) = cursor { 150 sqlx_query = sqlx_query.bind(cursor_did); 151 } 152 153 // Bind limit 154 sqlx_query = sqlx_query.bind(limit as i64); 155 156 let records = sqlx_query.fetch_all(&self.pool).await?; 157 158 let cursor = if records.len() < limit as usize { 159 None // Last page - no more results 160 } else { 161 records.last().map(|actor| actor.did.clone()) 162 }; 163 164 Ok((records, cursor)) 165 } 166 167 /// Gets all actors across all slices. 168 /// 169 /// # Returns 170 /// Vector of (did, slice_uri) tuples 171 pub async fn get_all_actors(&self) -> Result<Vec<(String, String)>, DatabaseError> { 172 let rows = sqlx::query!( 173 r#" 174 SELECT did, slice_uri 175 FROM actor 176 "# 177 ) 178 .fetch_all(&self.pool) 179 .await?; 180 181 Ok(rows 182 .into_iter() 183 .map(|row| (row.did, row.slice_uri)) 184 .collect()) 185 } 186 187 /// Checks if an actor has any records in a slice. 188 /// 189 /// Used before actor deletion to maintain referential integrity. 190 pub async fn actor_has_records( 191 &self, 192 did: &str, 193 slice_uri: &str, 194 ) -> Result<bool, DatabaseError> { 195 let count = sqlx::query!( 196 r#" 197 SELECT COUNT(*) as count 198 FROM record 199 WHERE did = $1 AND slice_uri = $2 200 "#, 201 did, 202 slice_uri 203 ) 204 .fetch_one(&self.pool) 205 .await?; 206 Ok(count.count.unwrap_or(0) > 0) 207 } 208 209 /// Deletes an actor from a specific slice. 210 /// 211 /// # Returns 212 /// Number of rows affected 213 pub async fn delete_actor(&self, did: &str, slice_uri: &str) -> Result<u64, DatabaseError> { 214 let result = sqlx::query!( 215 r#" 216 DELETE FROM actor 217 WHERE did = $1 AND slice_uri = $2 218 "#, 219 did, 220 slice_uri 221 ) 222 .execute(&self.pool) 223 .await?; 224 Ok(result.rows_affected()) 225 } 226 227 /// Deletes all actors for a specific slice. 228 /// 229 /// This is a destructive operation that removes all tracked actors 230 /// from the specified slice. Actors will be recreated when records 231 /// are re-indexed during sync. 232 /// 233 /// # Arguments 234 /// * `slice_uri` - AT-URI of the slice to clear 235 /// 236 /// # Returns 237 /// Number of actors deleted 238 pub async fn delete_all_actors_for_slice(&self, slice_uri: &str) -> Result<u64, DatabaseError> { 239 let result = sqlx::query!( 240 r#" 241 DELETE FROM actor 242 WHERE slice_uri = $1 243 "#, 244 slice_uri 245 ) 246 .execute(&self.pool) 247 .await?; 248 Ok(result.rows_affected()) 249 } 250 251 /// Resolves actor handles to DIDs for a specific slice. 252 /// 253 /// # Arguments 254 /// * `handles` - List of handles to resolve 255 /// * `slice_uri` - AT-URI of the slice 256 /// 257 /// # Returns 258 /// Vec of DIDs corresponding to the handles 259 pub async fn resolve_handles_to_dids( 260 &self, 261 handles: &[String], 262 slice_uri: &str, 263 ) -> Result<Vec<String>, DatabaseError> { 264 if handles.is_empty() { 265 return Ok(Vec::new()); 266 } 267 268 let placeholders: Vec<String> = (1..=handles.len()).map(|i| format!("${}", i)).collect(); 269 let query_sql = format!( 270 "SELECT DISTINCT did FROM actor WHERE handle = ANY(ARRAY[{}]) AND slice_uri = ${}", 271 placeholders.join(", "), 272 handles.len() + 1 273 ); 274 275 let mut query = sqlx::query_scalar::<_, String>(&query_sql); 276 for handle in handles { 277 query = query.bind(handle); 278 } 279 query = query.bind(slice_uri); 280 281 Ok(query.fetch_all(&self.pool).await?) 282 } 283 284 /// Resolves actor handles to DIDs using pattern matching (ILIKE). 285 /// 286 /// # Arguments 287 /// * `pattern` - Handle pattern to search for (partial match) 288 /// * `slice_uri` - AT-URI of the slice 289 /// 290 /// # Returns 291 /// Vec of DIDs matching the handle pattern 292 pub async fn resolve_handle_pattern_to_dids( 293 &self, 294 pattern: &str, 295 slice_uri: &str, 296 ) -> Result<Vec<String>, DatabaseError> { 297 let query_sql = "SELECT DISTINCT did FROM actor WHERE handle ILIKE '%' || $1 || '%' AND slice_uri = $2"; 298 299 let dids = sqlx::query_scalar::<_, String>(query_sql) 300 .bind(pattern) 301 .bind(slice_uri) 302 .fetch_all(&self.pool) 303 .await?; 304 305 Ok(dids) 306 } 307 308 /// Resolves actor handles to DIDs using fuzzy matching (trigram similarity). 309 /// 310 /// # Arguments 311 /// * `pattern` - Handle pattern to fuzzy match 312 /// * `slice_uri` - AT-URI of the slice 313 /// 314 /// # Returns 315 /// Vec of DIDs with similar handles 316 pub async fn resolve_handle_fuzzy_to_dids( 317 &self, 318 pattern: &str, 319 slice_uri: &str, 320 ) -> Result<Vec<String>, DatabaseError> { 321 let query_sql = "SELECT DISTINCT did FROM actor WHERE handle % $1 AND slice_uri = $2"; 322 323 let dids = sqlx::query_scalar::<_, String>(query_sql) 324 .bind(pattern) 325 .bind(slice_uri) 326 .fetch_all(&self.pool) 327 .await?; 328 329 Ok(dids) 330 } 331} 332 333/// Builds WHERE conditions specifically for actor queries. 334/// 335/// Unlike the general query builder, this handles actor table columns directly 336/// rather than treating them as JSON paths. 337fn build_actor_where_conditions( 338 where_clause: Option<&WhereClause>, 339 param_count: &mut usize, 340) -> (Vec<String>, Vec<String>) { 341 let mut where_clauses = Vec::new(); 342 let mut or_clauses = Vec::new(); 343 344 if let Some(clause) = where_clause { 345 for (field, condition) in &clause.conditions { 346 let field_clause = build_actor_single_condition(field, condition, param_count); 347 if !field_clause.is_empty() { 348 where_clauses.push(field_clause); 349 } 350 } 351 352 if let Some(or_conditions) = &clause.or_conditions { 353 for (field, condition) in or_conditions { 354 let field_clause = build_actor_single_condition(field, condition, param_count); 355 if !field_clause.is_empty() { 356 or_clauses.push(field_clause); 357 } 358 } 359 } 360 } 361 362 (where_clauses, or_clauses) 363} 364 365/// Builds a single SQL condition clause for actor fields. 366fn build_actor_single_condition( 367 field: &str, 368 condition: &WhereCondition, 369 param_count: &mut usize, 370) -> String { 371 if let Some(_eq_value) = &condition.eq { 372 let clause = format!("{} = ${}", field, param_count); 373 *param_count += 1; 374 clause 375 } else if let Some(_in_values) = &condition.in_values { 376 let clause = format!("{} = ANY(${})", field, param_count); 377 *param_count += 1; 378 clause 379 } else if let Some(_contains_value) = &condition.contains { 380 let clause = format!("{} ILIKE '%' || ${} || '%'", field, param_count); 381 *param_count += 1; 382 clause 383 } else { 384 String::new() 385 } 386}