Rust AppView - highly experimental!

cleanup

+60 -347
-276
parakeet/src/common/cache_id_helpers.rs
··· 1 - //! IdCache helper functions with automatic database fallback 2 - //! 3 - //! These functions combine IdCache lookups with database queries, 4 - //! automatically fetching and caching missing entries. 5 - 6 - use crate::common::errors::{Error, XrpcResult}; 7 - use diesel::sql_types::{Array, Integer, Text}; 8 - use diesel_async::pooled_connection::deadpool::Pool; 9 - use diesel_async::{AsyncPgConnection, RunQueryDsl}; 10 - use parakeet_db::id_cache::{CachedActor, CachedActorData, IdCache}; 11 - use std::collections::HashMap; 12 - use std::sync::Arc; 13 - 14 - /// Get actor_id for a DID, fetching from database if not cached 15 - /// 16 - /// This automatically: 17 - /// 1. Checks IdCache for the DID 18 - /// 2. If miss, queries database 19 - /// 3. Updates cache with result 20 - /// 4. Returns the actor_id 21 - /// 22 - /// # Errors 23 - /// Returns error if DID not found in database 24 - pub async fn get_actor_id_or_fetch( 25 - pool: &Pool<AsyncPgConnection>, 26 - id_cache: &Arc<IdCache>, 27 - did: &str, 28 - ) -> XrpcResult<i32> { 29 - // Try cache first 30 - if let Some(actor_id) = id_cache.get_actor_id_only(did).await { 31 - return Ok(actor_id); 32 - } 33 - 34 - // Cache miss - query database 35 - let mut conn = pool.get().await.map_err(|e| { 36 - Error::new( 37 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 38 - "DatabaseError", 39 - Some(format!("Failed to get database connection: {}", e)), 40 - ) 41 - })?; 42 - 43 - #[derive(diesel::QueryableByName)] 44 - struct ActorRow { 45 - #[diesel(sql_type = Integer)] 46 - id: i32, 47 - #[diesel(sql_type = diesel::sql_types::Nullable<Text>)] 48 - handle: Option<String>, 49 - } 50 - 51 - let actor: ActorRow = diesel::sql_query( 52 - "SELECT id, handle 53 - FROM actors 54 - WHERE did = $1" 55 - ) 56 - .bind::<Text, _>(did) 57 - .get_result(&mut conn) 58 - .await 59 - .map_err(|e| match e { 60 - diesel::result::Error::NotFound => { 61 - Error::new( 62 - axum::http::StatusCode::NOT_FOUND, 63 - "ActorNotFound", 64 - Some(format!("Actor not found: {}", did)), 65 - ) 66 - } 67 - _ => Error::new( 68 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 69 - "DatabaseError", 70 - Some(format!("Failed to query actor: {}", e)), 71 - ), 72 - })?; 73 - 74 - // Cache both forward and reverse lookups 75 - id_cache.set_actor_id( 76 - did.to_string(), 77 - CachedActor { 78 - actor_id: actor.id, 79 - is_allowlisted: false, // Allowlist concept removed - default to false 80 - }, 81 - ).await; 82 - 83 - id_cache.set_actor_data( 84 - actor.id, 85 - CachedActorData { 86 - did: did.to_string(), 87 - handle: actor.handle, 88 - }, 89 - ).await; 90 - 91 - Ok(actor.id) 92 - } 93 - 94 - /// Get DIDs for multiple actor_ids, fetching from database for cache misses 95 - /// 96 - /// This automatically: 97 - /// 1. Checks IdCache for each actor_id 98 - /// 2. For cache misses, queries database in batch 99 - /// 3. Updates cache with results 100 - /// 4. Returns HashMap of actor_id → DID 101 - /// 102 - /// # Returns 103 - /// HashMap with all requested actor_ids (omits IDs not found in database) 104 - pub async fn get_actor_dids_or_fetch( 105 - pool: &Pool<AsyncPgConnection>, 106 - id_cache: &Arc<IdCache>, 107 - actor_ids: &[i32], 108 - ) -> XrpcResult<HashMap<i32, String>> { 109 - if actor_ids.is_empty() { 110 - return Ok(HashMap::new()); 111 - } 112 - 113 - // Try cache first 114 - let cached = id_cache.get_actor_data_many(actor_ids).await; 115 - let mut result: HashMap<i32, String> = cached 116 - .into_iter() 117 - .map(|(id, data)| (id, data.did)) 118 - .collect(); 119 - 120 - // Find cache misses 121 - let missing: Vec<i32> = actor_ids 122 - .iter() 123 - .filter(|id| !result.contains_key(id)) 124 - .copied() 125 - .collect(); 126 - 127 - if missing.is_empty() { 128 - return Ok(result); 129 - } 130 - 131 - // Query database for misses 132 - let mut conn = pool.get().await.map_err(|e| { 133 - Error::new( 134 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 135 - "DatabaseError", 136 - Some(format!("Failed to get database connection: {}", e)), 137 - ) 138 - })?; 139 - 140 - #[derive(diesel::QueryableByName)] 141 - struct ActorRow { 142 - #[diesel(sql_type = Integer)] 143 - id: i32, 144 - #[diesel(sql_type = Text)] 145 - did: String, 146 - #[diesel(sql_type = diesel::sql_types::Nullable<Text>)] 147 - handle: Option<String>, 148 - } 149 - 150 - let db_results: Vec<ActorRow> = diesel::sql_query( 151 - "SELECT id, did, handle 152 - FROM actors 153 - WHERE id = ANY($1)" 154 - ) 155 - .bind::<Array<Integer>, _>(&missing) 156 - .load(&mut conn) 157 - .await 158 - .map_err(|e| { 159 - Error::new( 160 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 161 - "DatabaseError", 162 - Some(format!("Failed to resolve actor DIDs: {}", e)), 163 - ) 164 - })?; 165 - 166 - // Update cache and result 167 - for row in db_results { 168 - id_cache.set_actor_data( 169 - row.id, 170 - CachedActorData { 171 - did: row.did.clone(), 172 - handle: row.handle, 173 - }, 174 - ).await; 175 - 176 - result.insert(row.id, row.did); 177 - } 178 - 179 - Ok(result) 180 - } 181 - 182 - /// Get actor_ids for multiple DIDs, fetching from database for cache misses 183 - /// 184 - /// This automatically: 185 - /// 1. Checks IdCache for each DID 186 - /// 2. For cache misses, queries database in batch 187 - /// 3. Updates cache with results 188 - /// 4. Returns HashMap of DID → actor_id 189 - /// 190 - /// # Returns 191 - /// HashMap with all requested DIDs (omits DIDs not found in database) 192 - pub async fn get_actor_ids_or_fetch( 193 - pool: &Pool<AsyncPgConnection>, 194 - id_cache: &Arc<IdCache>, 195 - dids: &[String], 196 - ) -> XrpcResult<HashMap<String, i32>> { 197 - if dids.is_empty() { 198 - return Ok(HashMap::new()); 199 - } 200 - 201 - // Try cache first 202 - let cached = id_cache.get_actor_ids(dids).await; 203 - let mut result: HashMap<String, i32> = cached 204 - .into_iter() 205 - .map(|(did, cached)| (did, cached.actor_id)) 206 - .collect(); 207 - 208 - // Find cache misses 209 - let missing: Vec<&str> = dids 210 - .iter() 211 - .filter(|did| !result.contains_key(did.as_str())) 212 - .map(|s| s.as_str()) 213 - .collect(); 214 - 215 - if missing.is_empty() { 216 - return Ok(result); 217 - } 218 - 219 - // Query database for misses 220 - let mut conn = pool.get().await.map_err(|e| { 221 - Error::new( 222 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 223 - "DatabaseError", 224 - Some(format!("Failed to get database connection: {}", e)), 225 - ) 226 - })?; 227 - 228 - #[derive(diesel::QueryableByName)] 229 - struct ActorRow { 230 - #[diesel(sql_type = Integer)] 231 - id: i32, 232 - #[diesel(sql_type = Text)] 233 - did: String, 234 - #[diesel(sql_type = diesel::sql_types::Nullable<Text>)] 235 - handle: Option<String>, 236 - } 237 - 238 - let db_results: Vec<ActorRow> = diesel::sql_query( 239 - "SELECT id, did, handle 240 - FROM actors 241 - WHERE did = ANY($1)" 242 - ) 243 - .bind::<Array<Text>, _>(&missing) 244 - .load(&mut conn) 245 - .await 246 - .map_err(|e| { 247 - Error::new( 248 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 249 - "DatabaseError", 250 - Some(format!("Failed to resolve actor IDs: {}", e)), 251 - ) 252 - })?; 253 - 254 - // Update cache and result 255 - for row in db_results { 256 - id_cache.set_actor_id( 257 - row.did.clone(), 258 - CachedActor { 259 - actor_id: row.id, 260 - is_allowlisted: false, // Allowlist concept removed - default to false 261 - }, 262 - ).await; 263 - 264 - id_cache.set_actor_data( 265 - row.id, 266 - CachedActorData { 267 - did: row.did.clone(), 268 - handle: row.handle, 269 - }, 270 - ).await; 271 - 272 - result.insert(row.did, row.id); 273 - } 274 - 275 - Ok(result) 276 - }
-2
parakeet/src/lib.rs
··· 10 10 //! Common infrastructure and utilities shared across the application 11 11 12 12 pub mod auth; 13 - pub mod cache_id_helpers; 14 13 pub mod cache_listener; 15 14 pub mod cache_timeline; 16 15 pub mod errors; ··· 19 18 20 19 // Re-export commonly used items 21 20 pub use auth::{AtpAcceptLabelers, AtpAuth, JwtVerifier}; 22 - pub use cache_id_helpers::{get_actor_id_or_fetch, get_actor_dids_or_fetch, get_actor_ids_or_fetch}; 23 21 pub use cache_listener::spawn_cache_listener; 24 22 pub use cache_timeline::{AuthorFeedCache, TimelineCache}; 25 23 pub use errors::{Error, XrpcResult};
+10 -7
parakeet/src/xrpc/app_bsky/actor.rs
··· 258 258 .into_response()); 259 259 } 260 260 261 - // Convert DIDs to actor_ids for efficient profile loading 262 - let did_to_actor_id = crate::common::get_actor_ids_or_fetch( 263 - &state.pool, 264 - &state.id_cache, 265 - &all_dids, 266 - ).await.unwrap_or_default(); 261 + // Convert DIDs to actor_ids for efficient profile loading using ProfileEntity 262 + let actor_ids = state.profile_entity.resolve_identifiers(&all_dids) 263 + .await 264 + .unwrap_or_default(); 267 265 268 - let actor_ids: Vec<i32> = did_to_actor_id.values().copied().collect(); 266 + // Create a mapping for compatibility with existing code 267 + let did_to_actor_id: std::collections::HashMap<String, i32> = all_dids 268 + .iter() 269 + .cloned() 270 + .zip(actor_ids.iter().copied()) 271 + .collect(); 269 272 270 273 // Load profiles to check quality using ProfileEntity 271 274 let profiles = state.profile_entity
+10 -22
parakeet/src/xrpc/app_bsky/graph/thread_mutes.rs
··· 18 18 use crate::common::errors::Error; 19 19 let mut conn = state.pool.get().await?; 20 20 21 - // Resolve authenticated user's actor_id via IdCache 22 - let actor_id = crate::common::get_actor_id_or_fetch( 23 - &state.pool, 24 - &state.id_cache, 25 - &auth.0, 26 - ).await?; 21 + // Resolve authenticated user's actor_id using ProfileEntity 22 + let actor_id = state.profile_entity.resolve_identifier(&auth.0).await 23 + .map_err(|_| Error::actor_not_found(&auth.0))?; 27 24 28 25 // Parse thread root URI and resolve to post_id 29 26 let parts = form.root.strip_prefix("at://").ok_or_else(|| Error::invalid_request(Some("Invalid AT URI".into())))? ··· 39 36 let rkey_bigint = parakeet_db::tid_util::decode_tid(rkey_str) 40 37 .map_err(|_| Error::invalid_request(Some("Invalid TID in root URI".into())))?; 41 38 42 - let root_post_actor_id = crate::common::get_actor_id_or_fetch( 43 - &state.pool, 44 - &state.id_cache, 45 - root_did, 46 - ).await?; 39 + let root_post_actor_id = state.profile_entity.resolve_identifier(root_did).await 40 + .map_err(|_| Error::actor_not_found(root_did))?; 47 41 48 42 // Append to thread_mutes array (off-protocol, managed directly by AppView) 49 43 // Deduplicates based on root_post_actor_id + root_post_rkey ··· 78 72 use crate::common::errors::Error; 79 73 let mut conn = state.pool.get().await?; 80 74 81 - // Resolve authenticated user's actor_id via IdCache 82 - let actor_id = crate::common::get_actor_id_or_fetch( 83 - &state.pool, 84 - &state.id_cache, 85 - &auth.0, 86 - ).await?; 75 + // Resolve authenticated user's actor_id using ProfileEntity 76 + let actor_id = state.profile_entity.resolve_identifier(&auth.0).await 77 + .map_err(|_| Error::actor_not_found(&auth.0))?; 87 78 88 79 // Parse thread root URI and resolve to post_id 89 80 let parts = form.root.strip_prefix("at://").ok_or_else(|| Error::invalid_request(Some("Invalid AT URI".into())))? ··· 99 90 let rkey_bigint = parakeet_db::tid_util::decode_tid(rkey_str) 100 91 .map_err(|_| Error::invalid_request(Some("Invalid TID in root URI".into())))?; 101 92 102 - let root_post_actor_id = crate::common::get_actor_id_or_fetch( 103 - &state.pool, 104 - &state.id_cache, 105 - root_did, 106 - ).await?; 93 + let root_post_actor_id = state.profile_entity.resolve_identifier(root_did).await 94 + .map_err(|_| Error::actor_not_found(root_did))?; 107 95 108 96 // Remove from thread_mutes array (off-protocol, managed directly by AppView) 109 97 diesel_async::RunQueryDsl::execute(
+24 -24
parakeet/src/xrpc/app_bsky/notification.rs
··· 192 192 } 193 193 } 194 194 195 - // Resolve actor_ids to DIDs using IdCache helper (with automatic database fallback) 195 + // Resolve actor_ids to DIDs using ProfileEntity 196 196 let actor_id_vec: Vec<i32> = actor_ids_to_resolve.into_iter().collect(); 197 - let actor_did_map = crate::common::get_actor_dids_or_fetch( 198 - &state.pool, 199 - &state.id_cache, 200 - &actor_id_vec, 201 - ) 202 - .await 203 - .map_err(|_| { 204 - Error::new( 205 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 197 + let profiles = state.profile_entity.get_profiles_by_ids(&actor_id_vec) 198 + .await 199 + .map_err(|_| { 200 + Error::new( 201 + axum::http::StatusCode::INTERNAL_SERVER_ERROR, 206 202 "InternalServerError".to_string(), 207 203 Some("Database error resolving actor DIDs".to_string()), 208 204 ) 209 205 })?; 206 + let actor_did_map: std::collections::HashMap<i32, String> = profiles 207 + .into_iter() 208 + .map(|actor| (actor.id, actor.did)) 209 + .collect(); 210 210 tracing::info!(" → Resolve actor_ids→DIDs (IdCache): {:.1}ms ({} actors)", resolve_start.elapsed().as_secs_f64() * 1000.0, actor_did_map.len()); 211 211 212 212 // Extract author DIDs for profile hydration ··· 275 275 let resolve_additional_start = std::time::Instant::now(); 276 276 let actor_id_vec: Vec<i32> = additional_actor_ids.into_iter().collect(); 277 277 278 - // Use IdCache helper for additional actors 279 - let additional_dids = crate::common::get_actor_dids_or_fetch( 280 - &state.pool, 281 - &state.id_cache, 282 - &actor_id_vec, 283 - ) 284 - .await 285 - .map_err(|_| { 286 - Error::new( 287 - axum::http::StatusCode::INTERNAL_SERVER_ERROR, 288 - "InternalServerError".to_string(), 289 - Some("Database error resolving parent/root actor DIDs".to_string()), 290 - ) 291 - })?; 278 + // Use ProfileEntity for additional actors 279 + let additional_profiles = state.profile_entity.get_profiles_by_ids(&actor_id_vec) 280 + .await 281 + .map_err(|_| { 282 + Error::new( 283 + axum::http::StatusCode::INTERNAL_SERVER_ERROR, 284 + "InternalServerError".to_string(), 285 + Some("Database error resolving parent/root actor DIDs".to_string()), 286 + ) 287 + })?; 288 + let additional_dids: std::collections::HashMap<i32, String> = additional_profiles 289 + .into_iter() 290 + .map(|actor| (actor.id, actor.did)) 291 + .collect(); 292 292 tracing::info!(" → Resolve parent/root actor_ids (IdCache): {:.1}ms ({} actors)", resolve_additional_start.elapsed().as_secs_f64() * 1000.0, additional_dids.len()); 293 293 294 294 // Merge into actor_did_map
+11 -8
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 230 230 return Ok(Json(GetSuggestedUsersResponse { actors: Vec::new() })); 231 231 } 232 232 233 - // Convert DIDs to actor_ids for profile loading 234 - let actor_id_map = match crate::common::get_actor_ids_or_fetch( 235 - &state.pool, 236 - &state.id_cache, 237 - &all_dids, 238 - ).await { 239 - Ok(map) => map, 233 + // Convert DIDs to actor_ids for profile loading using ProfileEntity 234 + let actor_ids = match state.profile_entity.resolve_identifiers(&all_dids).await { 235 + Ok(ids) => ids, 240 236 Err(_) => return Ok(Json(GetSuggestedUsersResponse { actors: Vec::new() })), 241 237 }; 238 + 239 + // Create a mapping for compatibility with existing code 240 + let actor_id_map: std::collections::HashMap<String, i32> = all_dids 241 + .iter() 242 + .cloned() 243 + .zip(actor_ids.iter().copied()) 244 + .collect(); 242 245 243 246 let actor_ids: Vec<i32> = actor_id_map.values().copied().collect(); 244 247 ··· 563 566 // Hydrate starter packs maintaining order 564 567 let (maybe_did, maybe_actor_id) = if let Some(auth) = maybe_auth { 565 568 let did = auth.0.clone(); 566 - let actor_id = crate::common::get_actor_id_or_fetch(&state.pool, &state.id_cache, &did).await.ok(); 569 + let actor_id = state.profile_entity.resolve_identifier(&did).await.ok(); 567 570 (Some(did), actor_id) 568 571 } else { 569 572 (None, None)
+1 -1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2/other_replies.rs
··· 59 59 let is_authenticated = maybe_auth.is_some(); 60 60 let (maybe_did, maybe_actor_id) = if let Some(auth) = maybe_auth { 61 61 let did = auth.0.clone(); 62 - let actor_id = crate::common::get_actor_id_or_fetch(&state.pool, &state.id_cache, &did).await.ok(); 62 + let actor_id = state.profile_entity.resolve_identifier(&did).await.ok(); 63 63 (Some(did), actor_id) 64 64 } else { 65 65 (None, None)
+1 -1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2/post_thread.rs
··· 28 28 let is_authenticated = maybe_auth.is_some(); 29 29 let (maybe_did, maybe_actor_id) = if let Some(auth) = maybe_auth { 30 30 let did = auth.0.clone(); 31 - let actor_id = crate::common::get_actor_id_or_fetch(&state.pool, &state.id_cache, &did).await.ok(); 31 + let actor_id = state.profile_entity.resolve_identifier(&did).await.ok(); 32 32 (Some(did), actor_id) 33 33 } else { 34 34 (None, None)
+3 -6
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 30 30 31 31 let limit = query.limit.unwrap_or(50).clamp(1, 100); 32 32 33 - // Resolve DID to actor_id via IdCache 34 - let actor_id = crate::common::get_actor_id_or_fetch( 35 - &state.pool, 36 - &state.id_cache, 37 - &auth.0, 38 - ).await?; 33 + // Resolve DID to actor_id using ProfileEntity 34 + let actor_id = state.profile_entity.resolve_identifier(&auth.0).await 35 + .map_err(|_| crate::common::errors::Error::actor_not_found(&auth.0))?; 39 36 40 37 // Note: tags filtering not supported in current schema 41 38 if query.tags.is_some() {