Rust AppView - highly experimental!

chore: cleanup

Changed files
+381 -631
parakeet
+52 -6
parakeet/src/entity_cache.rs
··· 123 123 124 124 /// Get multiple profiles with caching 125 125 /// 126 - /// This is the recommended way to get multiple profiles, ensuring each one 127 - /// uses the cache individually 126 + /// This is the recommended way to get multiple profiles, using batch hydration 127 + /// for all cache misses to avoid N+1 queries 128 128 pub async fn get_or_hydrate_batch( 129 129 &self, 130 130 dids: Vec<String>, ··· 132 132 id_cache: &Arc<IdCache>, 133 133 hydrator: &StatefulHydrator<'_>, 134 134 ) -> Vec<ProfileViewDetailed> { 135 - let mut profiles = Vec::with_capacity(dids.len()); 135 + let mut cached_profiles = Vec::new(); 136 + let mut uncached_requests = Vec::new(); 137 + let mut did_to_actor_id = std::collections::HashMap::new(); 136 138 139 + // First pass: check cache for all DIDs 137 140 for did in dids { 138 141 // Get actor_id for each DID 139 142 if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch(pool, id_cache, &did).await { 140 - if let Some(profile) = self.get_or_hydrate(actor_id, did, hydrator).await { 141 - profiles.push(profile); 143 + did_to_actor_id.insert(did.clone(), actor_id); 144 + 145 + // Check cache 146 + if let Some(profile) = self.cache.get(&actor_id).await { 147 + tracing::debug!(actor_id, "Profile cache hit in batch"); 148 + cached_profiles.push(profile); 149 + } else { 150 + // Track cache misses for batch hydration 151 + uncached_requests.push(did); 142 152 } 143 153 } 144 154 } 145 155 146 - profiles 156 + // Batch hydrate all uncached profiles at once 157 + if !uncached_requests.is_empty() { 158 + let cache_hit_count = cached_profiles.len(); 159 + let miss_count = uncached_requests.len(); 160 + let hit_rate = if cache_hit_count + miss_count > 0 { 161 + (cache_hit_count as f64 / (cache_hit_count + miss_count) as f64) * 100.0 162 + } else { 163 + 0.0 164 + }; 165 + 166 + tracing::debug!( 167 + "Profile cache batch: {} hits, {} misses ({:.1}% hit rate), hydrating misses", 168 + cache_hit_count, miss_count, hit_rate 169 + ); 170 + 171 + // Build a map of actor_id to DID for the uncached requests 172 + let uncached_actor_ids: Vec<(i32, String)> = uncached_requests 173 + .iter() 174 + .filter_map(|did| { 175 + did_to_actor_id.get(did).map(|&actor_id| (actor_id, did.clone())) 176 + }) 177 + .collect(); 178 + 179 + // Use the batch hydration method for all misses at once (passing actor_ids) 180 + let hydrated = hydrator.hydrate_profiles_detailed_by_id(uncached_actor_ids.clone()).await; 181 + 182 + // Store in cache and collect results 183 + for (actor_id, _did) in uncached_actor_ids { 184 + if let Some(profile) = hydrated.get(&actor_id) { 185 + // Cache the hydrated profile 186 + self.cache.insert(actor_id, profile.clone()).await; 187 + cached_profiles.push(profile.clone()); 188 + } 189 + } 190 + } 191 + 192 + cached_profiles 147 193 } 148 194 149 195 pub async fn invalidate(&self, actor_id: i32) {
+7 -22
parakeet/src/hydration/mod.rs
··· 176 176 labels 177 177 } 178 178 179 - /// Get labels for multiple URIs using actor_ids (optimized version - avoids decompressing actors table) 179 + /// Get labels for multiple URIs 180 180 /// 181 - /// This resolves labeler DIDs → actor_ids via IdCache first, avoiding the actors table join. 181 + /// For now, fallback to the regular load_many function until we properly extract labels from loaded data 182 182 async fn get_label_many_by_actor_ids( 183 183 &self, 184 184 uris: &[String], 185 185 ) -> HashMap<String, Vec<parakeet_db::models::Label>> { 186 - // Resolve labeler DIDs → actor_ids via IdCache 187 - let labeler_dids: Vec<String> = self 188 - .accept_labelers 189 - .iter() 190 - .map(|v| v.labeler.clone()) 191 - .collect(); 192 - 193 - let mut labeler_actor_ids = Vec::new(); 194 - for did in &labeler_dids { 195 - if let Some(cached) = self.loaders.label.id_cache().get_actor_id(did).await { 196 - labeler_actor_ids.push(cached.actor_id); 197 - } 198 - } 199 - 200 - if labeler_actor_ids.is_empty() { 201 - return HashMap::new(); 202 - } 203 - 186 + // TODO: Extract labels from already-loaded post/actor data instead of querying separately 187 + // Posts and actors already have labels field loaded 204 188 self.loaders 205 189 .label 206 - .load_many_by_actor_ids(uris, &labeler_actor_ids) 190 + .load_many(uris, &self.accept_labelers) 207 191 .await 208 192 } 209 193 210 - /// Get profile labels using actor_ids (optimized version) 194 + /// Get profile labels 211 195 async fn get_profile_label_many_by_actor_ids( 212 196 &self, 213 197 uris: &[String], ··· 218 202 .map(|did| format!("at://{did}/app.bsky.actor.profile/self")), 219 203 ); 220 204 205 + // TODO: Extract labels from already-loaded actor data instead of querying separately 221 206 let mut labels = self.get_label_many_by_actor_ids(&uris_full).await; 222 207 223 208 for did in uris {
+56 -6
parakeet/src/hydration/posts/mod.rs
··· 225 225 }) 226 226 .collect::<HashMap<String, ProfileViewBasic>>() 227 227 }; 228 + // Extract labels directly from the loaded posts instead of querying separately 228 229 let labels_future = async { 229 230 let start = std::time::Instant::now(); 230 - let result = self.get_label_many_by_actor_ids(&post_uris).await; 231 - let elapsed = start.elapsed().as_secs_f64() * 1000.0; 232 - tracing::info!(" → Labels: {:.1} ms ({} posts)", elapsed, post_uri_count); 233 - if elapsed > 10.0 { 234 - tracing::warn!(" → Slow labels: {:.1} ms", elapsed); 231 + 232 + // Build a map of URI -> labels from the posts we already loaded 233 + let mut labels_map: HashMap<String, Vec<parakeet_db::models::Label>> = HashMap::new(); 234 + 235 + // Get allowed labeler actor IDs 236 + let labeler_dids: Vec<String> = self 237 + .accept_labelers 238 + .iter() 239 + .map(|v| v.labeler.clone()) 240 + .collect(); 241 + 242 + let mut labeler_actor_ids = Vec::new(); 243 + for did in &labeler_dids { 244 + if let Some(cached) = self.loaders.label.id_cache().get_actor_id(did).await { 245 + labeler_actor_ids.push(cached.actor_id); 246 + } 235 247 } 236 - result 248 + 249 + // Extract labels from each post 250 + for (uri, (post, _, _)) in &posts_with_stats { 251 + if let Some(post_labels) = &post.post.labels { 252 + let mut labels_for_uri = Vec::new(); 253 + 254 + for label_opt in post_labels { 255 + if let Some(label) = label_opt { 256 + // Only include labels from allowed labelers 257 + if labeler_actor_ids.contains(&label.labeler_actor_id) { 258 + // Resolve labeler DID 259 + if let Some(actor_data) = self.loaders.label.id_cache().get_actor_data(label.labeler_actor_id).await { 260 + labels_for_uri.push(parakeet_db::models::Label { 261 + labeler_actor_id: label.labeler_actor_id, 262 + label: label.label.clone(), 263 + uri: uri.clone(), 264 + self_label: false, // Post labels are not self-labels 265 + cid: None, // Not stored in denormalized structure 266 + negated: label.negated, 267 + expires: label.expires, 268 + sig: None, // Not stored in denormalized structure 269 + created_at: label.created_at, 270 + labeler: actor_data.did, 271 + }); 272 + } 273 + } 274 + } 275 + } 276 + 277 + if !labels_for_uri.is_empty() { 278 + labels_map.insert(uri.clone(), labels_for_uri); 279 + } 280 + } 281 + } 282 + 283 + let elapsed = start.elapsed().as_secs_f64() * 1000.0; 284 + tracing::info!(" → Labels extracted from posts: {:.1} ms ({} posts)", elapsed, post_uri_count); 285 + 286 + labels_map 237 287 }; 238 288 let viewer_future = async { 239 289 let start = std::time::Instant::now();
+156 -21
parakeet/src/hydration/profile/mod.rs
··· 66 66 } 67 67 68 68 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 69 + // Convert DID to actor_id first 70 + let actor_id = crate::id_cache_helpers::get_actor_id_or_fetch( 71 + self.loaders.profile_state.pool(), 72 + self.loaders.profile_state.id_cache(), 73 + &did, 74 + ).await.ok()?; 75 + 69 76 let viewer = self.get_profile_viewer_state(&did).await; 70 77 let verif = self.loaders.verification.load(did.clone()).await; 71 78 let stats = self.loaders.profile_stats.load(did.clone()).await; 72 - let profile_info = self.loaders.profile.load(did.clone()).await?; 79 + let profile_info = self.loaders.profile_by_id.load(actor_id).await?; 73 80 74 81 // Extract and convert inline labels 75 82 let labels = if let Some(ref label_records) = profile_info.8 { ··· 243 250 } 244 251 245 252 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 246 - let labels = self.get_profile_label(&did).await; 253 + // Convert DID to actor_id first 254 + let actor_id = crate::id_cache_helpers::get_actor_id_or_fetch( 255 + self.loaders.profile_state.pool(), 256 + self.loaders.profile_state.id_cache(), 257 + &did, 258 + ).await.ok()?; 259 + 260 + let profile_info = self.loaders.profile_by_id.load(actor_id).await?; 261 + 262 + // Extract labels from loaded profile 263 + let labels = if let Some(ref label_records) = profile_info.8 { 264 + self.convert_actor_labels(&did, label_records).await 265 + } else { 266 + vec![] 267 + }; 268 + 247 269 let viewer = self.get_profile_viewer_state(&did).await; 248 270 let verif = self.loaders.verification.load(did.clone()).await; 249 271 let stats = self.loaders.profile_stats.load(did.clone()).await; 250 - let profile_info = self.loaders.profile.load(did).await?; 251 272 252 273 Some(build_profile( 253 274 profile_info, ··· 260 281 } 261 282 262 283 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 263 - let labels = self.get_profile_label_many(&dids).await; 284 + // Convert DIDs to actor_ids first 285 + let actor_id_map = match crate::id_cache_helpers::get_actor_ids_or_fetch( 286 + self.loaders.profile_state.pool(), 287 + self.loaders.profile_state.id_cache(), 288 + &dids, 289 + ).await { 290 + Ok(map) => map, 291 + Err(_) => return HashMap::new(), 292 + }; 293 + 294 + let actor_ids: Vec<i32> = actor_id_map.values().copied().collect(); 295 + 296 + let profiles = self.loaders.profile_by_id.load_many(actor_ids).await; 297 + 298 + // Extract labels from loaded profiles 299 + let mut labels: HashMap<String, Vec<parakeet_db::models::Label>> = HashMap::new(); 300 + for (did, _, _, _, _, _, _, _, label_records) in profiles.values() { 301 + if let Some(records) = label_records { 302 + let converted = self.convert_actor_labels(did, records).await; 303 + if !converted.is_empty() { 304 + labels.insert(did.clone(), converted); 305 + } 306 + } 307 + } 308 + 264 309 let viewers = self.get_profile_viewer_states(&dids).await; 265 310 let verif = self.loaders.verification.load_many(dids.clone()).await; 266 311 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 267 - let profiles = self.loaders.profile.load_many(dids).await; 312 + 313 + // Convert actor_id-keyed results back to DID-keyed 314 + let id_to_did: HashMap<i32, String> = actor_id_map.iter().map(|(did, id)| (*id, did.clone())).collect(); 268 315 269 316 profiles 270 317 .into_iter() 271 - .map(|(k, profile_info)| { 272 - let labels = labels.get(&k).cloned().unwrap_or_default(); 273 - let verif = verif.get(&k).cloned(); 274 - let viewer = viewers.get(&k).cloned(); 275 - let stats = stats.get(&k).copied(); 318 + .filter_map(|(actor_id, profile_info)| { 319 + let did = id_to_did.get(&actor_id)?; 320 + let labels = labels.get(did).cloned().unwrap_or_default(); 321 + let verif = verif.get(did).cloned(); 322 + let viewer = viewers.get(did).cloned(); 323 + let stats = stats.get(did).copied(); 276 324 277 325 let v = build_profile(profile_info, stats, labels, verif, viewer, &self.cdn); 278 - (k, v) 326 + Some((did.clone(), v)) 279 327 }) 280 328 .collect() 281 329 } ··· 289 337 note = "Use ProfileCache::get_or_hydrate() to ensure caching. Direct hydration bypasses the cache." 290 338 )] 291 339 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 292 - let labels = self.get_profile_label(&did).await; 340 + // Convert DID to actor_id first 341 + let actor_id = crate::id_cache_helpers::get_actor_id_or_fetch( 342 + self.loaders.profile_state.pool(), 343 + self.loaders.profile_state.id_cache(), 344 + &did, 345 + ).await.ok()?; 346 + 347 + let profile_info = self.loaders.profile_by_id.load(actor_id).await?; 348 + 349 + // Extract labels from loaded profile 350 + let labels = if let Some(ref label_records) = profile_info.8 { 351 + self.convert_actor_labels(&did, label_records).await 352 + } else { 353 + vec![] 354 + }; 355 + 293 356 let viewer = self.get_profile_viewer_state(&did).await; 294 357 let verif = self.loaders.verification.load(did.clone()).await; 295 358 let stats = self.loaders.profile_stats.load(did.clone()).await; 296 - let profile_info = self.loaders.profile.load(did).await?; 297 359 298 360 Some(build_detailed( 299 361 profile_info, ··· 309 371 &self, 310 372 dids: Vec<String>, 311 373 ) -> HashMap<String, ProfileViewDetailed> { 312 - let labels = self.get_profile_label_many(&dids).await; 374 + // Convert DIDs to actor_ids first 375 + let actor_id_map = match crate::id_cache_helpers::get_actor_ids_or_fetch( 376 + self.loaders.profile_state.pool(), 377 + self.loaders.profile_state.id_cache(), 378 + &dids, 379 + ).await { 380 + Ok(map) => map, 381 + Err(_) => return HashMap::new(), 382 + }; 383 + 384 + let actor_ids: Vec<i32> = actor_id_map.values().copied().collect(); 385 + 386 + let profiles = self.loaders.profile_by_id.load_many(actor_ids).await; 387 + 388 + // Extract labels from loaded profiles 389 + let mut labels: HashMap<String, Vec<parakeet_db::models::Label>> = HashMap::new(); 390 + for (did, _, _, _, _, _, _, _, label_records) in profiles.values() { 391 + if let Some(records) = label_records { 392 + let converted = self.convert_actor_labels(did, records).await; 393 + if !converted.is_empty() { 394 + labels.insert(did.clone(), converted); 395 + } 396 + } 397 + } 398 + 313 399 let viewers = self.get_profile_viewer_states(&dids).await; 314 400 let verif = self.loaders.verification.load_many(dids.clone()).await; 315 401 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 316 - let profiles = self.loaders.profile.load_many(dids).await; 402 + 403 + // Convert actor_id-keyed results back to DID-keyed 404 + let id_to_did: HashMap<i32, String> = actor_id_map.iter().map(|(did, id)| (*id, did.clone())).collect(); 317 405 318 406 profiles 319 407 .into_iter() 320 - .map(|(k, profile_info)| { 321 - let labels = labels.get(&k).cloned().unwrap_or_default(); 322 - let verif = verif.get(&k).cloned(); 323 - let viewer = viewers.get(&k).cloned(); 324 - let stats = stats.get(&k).copied(); 408 + .filter_map(|(actor_id, profile_info)| { 409 + let did = id_to_did.get(&actor_id)?; 410 + let labels = labels.get(did).cloned().unwrap_or_default(); 411 + let verif = verif.get(did).cloned(); 412 + let viewer = viewers.get(did).cloned(); 413 + let stats = stats.get(did).copied(); 325 414 326 415 let v = build_detailed(profile_info, stats, labels, verif, viewer, &self.cdn); 327 - (k, v) 416 + Some((did.clone(), v)) 417 + }) 418 + .collect() 419 + } 420 + 421 + /// Optimized version that takes actor_ids directly, avoiding DID lookups 422 + pub async fn hydrate_profiles_detailed_by_id( 423 + &self, 424 + actor_ids_with_dids: Vec<(i32, String)>, 425 + ) -> HashMap<i32, ProfileViewDetailed> { 426 + // Extract just the actor_ids for queries 427 + let actor_ids: Vec<i32> = actor_ids_with_dids.iter().map(|(id, _)| *id).collect(); 428 + let dids: Vec<String> = actor_ids_with_dids.iter().map(|(_, did)| did.clone()).collect(); 429 + 430 + // Build a map of actor_id to DID for later lookups 431 + let id_to_did: std::collections::HashMap<i32, String> = actor_ids_with_dids.into_iter().collect(); 432 + 433 + // Load data using actor_ids where possible 434 + let profiles = self.loaders.profile_by_id.load_many(actor_ids.clone()).await; 435 + let stats = self.loaders.profile_stats_by_id.load_many(&actor_ids).await; 436 + 437 + // Extract labels directly from loaded profiles 438 + let mut labels: HashMap<String, Vec<parakeet_db::models::Label>> = HashMap::new(); 439 + for (actor_id, profile_info) in &profiles { 440 + if let Some(did) = id_to_did.get(actor_id) { 441 + if let Some(ref label_records) = profile_info.8 { 442 + let converted_labels = self.convert_actor_labels(did, label_records).await; 443 + if !converted_labels.is_empty() { 444 + labels.insert(did.clone(), converted_labels); 445 + } 446 + } 447 + } 448 + } 449 + let viewers = self.get_profile_viewer_states(&dids).await; 450 + let verif = self.loaders.verification.load_many(dids).await; 451 + 452 + profiles 453 + .into_iter() 454 + .map(|(actor_id, profile_info)| { 455 + let did = id_to_did.get(&actor_id).cloned().unwrap_or_default(); 456 + let labels = labels.get(&did).cloned().unwrap_or_default(); 457 + let verif = verif.get(&did).cloned(); 458 + let viewer = viewers.get(&did).cloned(); 459 + let stats = stats.get(&actor_id).copied(); 460 + 461 + let v = build_detailed(profile_info, stats, labels, verif, viewer, &self.cdn); 462 + (actor_id, v) 328 463 }) 329 464 .collect() 330 465 }
-92
parakeet/src/loaders/labeler.rs
··· 304 304 .into_group_map_by(|v| v.uri.clone()) 305 305 } 306 306 307 - /// Load labels by URIs using actor_ids (optimized version - avoids decompressing actors table) 308 - /// 309 - /// This is an optimized version that resolves labeler DIDs → actor_ids via IdCache first, 310 - /// then queries by actor_ids directly, avoiding the 2-30ms penalty from joining the 311 - /// compressed actors table. 312 - /// 313 - /// Expected performance: 2-30ms → 0.5-2ms (5-60x faster) 314 - pub async fn load_many_by_actor_ids( 315 - &self, 316 - uris: &[String], 317 - labeler_actor_ids: &[i32], 318 - ) -> HashMap<String, Vec<models::Label>> { 319 - let mut conn = self.0.get().await.unwrap(); 320 - 321 - if labeler_actor_ids.is_empty() || uris.is_empty() { 322 - return HashMap::new(); 323 - } 324 - 325 - let uri_refs: Vec<&str> = uris.iter().map(|s| s.as_str()).collect(); 326 - 327 - #[derive(diesel::QueryableByName)] 328 - struct LabelRowById { 329 - #[diesel(sql_type = diesel::sql_types::Integer)] 330 - labeler_actor_id: i32, 331 - #[diesel(sql_type = diesel::sql_types::Text)] 332 - label: String, 333 - #[diesel(sql_type = diesel::sql_types::Text)] 334 - uri: String, 335 - #[diesel(sql_type = diesel::sql_types::Bool)] 336 - self_label: bool, 337 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 338 - cid: Option<Vec<u8>>, 339 - #[diesel(sql_type = diesel::sql_types::Bool)] 340 - negated: bool, 341 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamptz>)] 342 - expires: Option<chrono::DateTime<chrono::Utc>>, 343 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 344 - sig: Option<Vec<u8>>, 345 - #[diesel(sql_type = diesel::sql_types::Timestamptz)] 346 - created_at: chrono::DateTime<chrono::Utc>, 347 - } 348 - 349 - let labels: Vec<LabelRowById> = diesel_async::RunQueryDsl::load( 350 - diesel::sql_query(include_str!("../sql/labels_by_actor_ids.sql")) 351 - .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&uri_refs) 352 - .bind::<diesel::sql_types::Array<diesel::sql_types::Integer>, _>(labeler_actor_ids), 353 - &mut conn, 354 - ) 355 - .await 356 - .unwrap_or_else(|e| { 357 - tracing::error!("label load by actor_ids failed: {e}"); 358 - vec![] 359 - }); 360 - 361 - // Collect unique labeler actor_ids to resolve 362 - let labeler_ids: std::collections::HashSet<i32> = labels 363 - .iter() 364 - .map(|row| row.labeler_actor_id) 365 - .collect(); 366 - 367 - // Batch resolve labeler actor_ids → DIDs using IdCache 368 - let labeler_ids_vec: Vec<i32> = labeler_ids.into_iter().collect(); 369 - let id_to_actor_data = self.1.get_actor_data_many(&labeler_ids_vec).await; 370 - 371 - // Build result with resolved labeler DIDs 372 - labels 373 - .into_iter() 374 - .filter_map(|row| { 375 - // Resolve labeler DID from IdCache 376 - let labeler_did = match id_to_actor_data.get(&row.labeler_actor_id) { 377 - Some(data) => data.did.clone(), 378 - None => { 379 - tracing::warn!("label: missing labeler DID for actor_id {}", row.labeler_actor_id); 380 - return None; 381 - } 382 - }; 383 - 384 - Some(models::Label { 385 - labeler_actor_id: row.labeler_actor_id, 386 - label: row.label.clone(), 387 - uri: row.uri.clone(), 388 - self_label: row.self_label, 389 - cid: row.cid, 390 - negated: row.negated, 391 - expires: row.expires, 392 - sig: row.sig, 393 - created_at: row.created_at, 394 - labeler: labeler_did, 395 - }) 396 - }) 397 - .into_group_map_by(|v| v.uri.clone()) 398 - } 399 307 400 308 /// Get the IdCache for DID → actor_id resolution 401 309 pub fn id_cache(&self) -> &parakeet_db::id_cache::IdCache {
+2 -4
parakeet/src/loaders/mod.rs
··· 20 20 pub use misc::{EnrichedStarterPack, EnrichedVerification, StarterPackKey, StarterPackLoader, StarterPackLoaderRet, VerificationLoader}; 21 21 pub use post::{EnrichedThreadgate, HydratedPost, PostLoader, PostLoaderRet, PostStateLoader, PostWithComputed}; 22 22 pub use profile::{ 23 - EnrichedStatus, HandleLoader, Profile, ProfileByIdLoader, ProfileLoader, ProfileLoaderRet, ProfileStateLoader, ProfileStatsLoader, ProfileStatsByIdLoader, 23 + EnrichedStatus, HandleLoader, Profile, ProfileByIdLoader, ProfileLoaderRet, ProfileStateLoader, ProfileStatsLoader, ProfileStatsByIdLoader, 24 24 }; 25 25 26 26 // Re-export query builder functions (for testing) ··· 29 29 pub use list::build_lists_batch_query; 30 30 pub use misc::{build_starterpack_feeds_query, build_starterpacks_batch_query, build_verifications_batch_query}; 31 31 pub use post::{build_posts_batch_query, build_posts_by_natural_keys_batch_query}; 32 - pub use profile::{build_profiles_batch_query, build_profiles_by_id_batch_query, build_statuses_batch_query}; 32 + pub use profile::build_profiles_by_id_batch_query; 33 33 34 34 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<K, V>>; 35 35 ··· 67 67 pub like_state: LikeRecordLoader, 68 68 pub posts: CachingLoader<String, PostLoaderRet, PostLoader>, 69 69 pub post_state: PostStateLoader, 70 - pub profile: CachingLoader<String, ProfileLoaderRet, ProfileLoader>, 71 70 pub profile_by_id: CachingLoader<i32, ProfileLoaderRet, ProfileByIdLoader>, 72 71 pub profile_stats: CachingLoader<String, parakeet_db::models::ProfileStats, ProfileStatsLoader>, 73 72 pub profile_stats_by_id: ProfileStatsByIdLoader, ··· 96 95 97 96 // Occasionally changed: Profile metadata can be updated 98 97 // 1 hour TTL, 50k capacity for profiles 99 - profile: new_plc_loader(ProfileLoader(pool.clone()), "profile:", 3600, 50_000), 100 98 profile_by_id: new_plc_loader(ProfileByIdLoader(pool.clone(), id_cache.clone()), "profile_id:", 3600, 50_000), 101 99 // 1 hour TTL, 10k capacity for feeds/lists/etc 102 100 feedgen: new_plc_loader(FeedGenLoader(pool.clone()), "feedgen:", 3600, 10_000),
+30 -260
parakeet/src/loaders/profile.rs
··· 37 37 pub thumb_cid: Option<Vec<u8>>, 38 38 } 39 39 40 - /// Build SQL query for batch loading profiles with actor metadata 41 - /// 42 - /// This function is public for testing purposes. 43 - /// 44 - /// SCHEMA CHANGE: profiles, chat_decls, and notif_decl tables have been consolidated into actors table 45 - /// All data now comes from actors.profile_*, actors.chat_*, actors.notif_decl_* columns 46 - pub fn build_profiles_batch_query() -> &'static str { 47 - "SELECT 48 - a.did, 49 - a.handle, 50 - a.account_created_at, 51 - a.sync_state, 52 - a.id as actor_id, 53 - a.profile_cid as cid, 54 - a.profile_avatar_cid as avatar_cid, 55 - a.profile_banner_cid as banner_cid, 56 - a.profile_display_name as display_name, 57 - a.profile_description as description, 58 - a.profile_pinned_post_rkey as pinned_post_rkey, 59 - a.profile_joined_sp_id as joined_sp_id, 60 - a.profile_pronouns as pronouns, 61 - a.profile_website as website, 62 - a.chat_allow_incoming as allow_incoming, 63 - CASE WHEN a.labeler_cid IS NOT NULL THEN a.id ELSE NULL END as labeler_actor_id, 64 - a.notif_decl_allow_subscriptions as allow_subscriptions, 65 - a.labels 66 - FROM actors a 67 - WHERE a.did = ANY($1) 68 - AND a.status = 'active'::actor_status" 69 - } 70 - 71 - /// Build SQL query for batch loading statuses with embed URI reconstruction 72 - /// 73 - /// This function is public for testing purposes. 74 - /// 75 - /// SCHEMA CHANGE: statuses table has been consolidated into actors table 76 - /// All data now comes from actors.status_* columns 77 - pub fn build_statuses_batch_query() -> &'static str { 78 - "SELECT 79 - a.id as actor_id, 80 - a.status_cid as cid, 81 - a.status_created_at as created_at, 82 - a.status_type as status, 83 - a.status_duration as duration, 84 - a.status_embed_post_actor_id as embed_post_actor_id, 85 - a.status_embed_post_rkey as embed_post_rkey, 86 - a.status_thumb_mime_type as thumb_mime_type, 87 - a.status_thumb_cid as thumb_cid, 88 - a.did, 89 - (SELECT 'at://' || emb_a.did || '/app.bsky.feed.post/' || i64_to_tid(emb_p.rkey) 90 - FROM posts emb_p 91 - INNER JOIN actors emb_a ON emb_p.actor_id = emb_a.id 92 - WHERE emb_p.actor_id = a.status_embed_post_actor_id AND emb_p.rkey = a.status_embed_post_rkey) as embed_uri 93 - FROM actors a 94 - WHERE a.did = ANY($1) 95 - AND a.status_cid IS NOT NULL" 96 - } 97 40 98 41 // Enriched Status with reconstructed fields 99 42 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] ··· 172 115 } 173 116 } 174 117 118 + /// ProfileLoaderRet is the return type for both ProfileByIdLoader 119 + /// 120 + /// Tuple format: 121 + /// - String: did - always present 122 + /// - Option<String>: handle - from actors table 123 + /// - Option<DateTime>: account_created_at - from actors table 124 + /// - Option<Profile>: profile - optional (may not exist) 125 + /// - Option<ChatAllowIncoming>: chat declaration 126 + /// - bool: is_labeler 127 + /// - Option<EnrichedStatus>: status 128 + /// - Option<ProfileAllowSubscriptions>: notification subscription settings 129 + /// - Option<Vec<ActorLabelRecord>>: labels - from actors table 130 + pub type ProfileLoaderRet = ( 131 + String, // did - always present (from actors table) 132 + Option<String>, // handle - from actors table 133 + Option<chrono::DateTime<chrono::Utc>>, // account_created_at - from actors table 134 + Option<Profile>, // profile - optional (may not exist) 135 + Option<ChatAllowIncoming>, 136 + bool, 137 + Option<EnrichedStatus>, 138 + Option<ProfileAllowSubscriptions>, 139 + Option<Vec<parakeet_db::composite_types::ActorLabelRecord>>, // labels - from actors table 140 + ); 141 + 175 142 pub struct HandleLoader(pub(super) Pool<AsyncPgConnection>); 176 143 impl BatchFn<String, String> for HandleLoader { 177 144 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { ··· 198 165 } 199 166 } 200 167 201 - pub struct ProfileLoader(pub(super) Pool<AsyncPgConnection>); 202 - pub type ProfileLoaderRet = ( 203 - String, // did - always present (from actors table) 204 - Option<String>, // handle - from actors table 205 - Option<chrono::DateTime<chrono::Utc>>, // account_created_at - from actors table 206 - Option<Profile>, // profile - optional (may not exist) 207 - Option<ChatAllowIncoming>, 208 - bool, 209 - Option<EnrichedStatus>, 210 - Option<ProfileAllowSubscriptions>, 211 - Option<Vec<parakeet_db::composite_types::ActorLabelRecord>>, // labels - from actors table 212 - ); 213 - impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 214 - async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 215 - let overall_start = std::time::Instant::now(); 216 - let mut conn = self.0.get().await.unwrap(); 217 - 218 - // Load basic actor/profile data with raw SQL to avoid Diesel DSL join complexity 219 - let dids: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); 220 - 221 - #[derive(diesel::QueryableByName)] 222 - #[allow(dead_code, reason = "Diesel QueryableByName requires all SQL columns even if unused")] 223 - struct ActorRow { 224 - #[diesel(sql_type = diesel::sql_types::Text)] 225 - did: String, 226 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 227 - handle: Option<String>, 228 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamptz>)] 229 - account_created_at: Option<chrono::DateTime<chrono::Utc>>, 230 - #[diesel(sql_type = parakeet_db::schema::sql_types::ActorSyncState)] 231 - sync_state: parakeet_db::types::ActorSyncState, 232 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 233 - actor_id: Option<i32>, 234 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 235 - cid: Option<Vec<u8>>, 236 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 237 - avatar_cid: Option<Vec<u8>>, 238 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 239 - banner_cid: Option<Vec<u8>>, 240 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 241 - display_name: Option<String>, 242 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 243 - description: Option<String>, 244 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 245 - pinned_post_rkey: Option<i64>, 246 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 247 - joined_sp_id: Option<i64>, 248 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 249 - pronouns: Option<String>, 250 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 251 - website: Option<String>, 252 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::ChatAllowIncoming>)] 253 - allow_incoming: Option<parakeet_db::types::ChatAllowIncoming>, 254 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 255 - labeler_actor_id: Option<i32>, 256 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::NotifAllowSubscriptions>)] 257 - allow_subscriptions: Option<parakeet_db::types::NotifAllowSubscriptions>, 258 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Array<parakeet_db::schema::sql_types::ActorLabel>>)] 259 - labels: Option<Vec<parakeet_db::composite_types::ActorLabelRecord>>, 260 - } 261 - 262 - let profile_query_start = std::time::Instant::now(); 263 - let res: Result<Vec<ActorRow>, _> = diesel_async::RunQueryDsl::load( 264 - diesel::sql_query(build_profiles_batch_query()) 265 - .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&dids), 266 - &mut conn, 267 - ) 268 - .await; 269 - let profile_query_time = profile_query_start.elapsed().as_secs_f64() * 1000.0; 270 - 271 - match res { 272 - Ok(res) => { 273 - let profile_count = res.len(); 274 - 275 - // Load enriched statuses separately with raw SQL 276 - let status_dids: Vec<String> = res.iter().map(|row| row.did.clone()).collect(); 277 - let status_query_start = std::time::Instant::now(); 278 - let status_map = if !status_dids.is_empty() { 279 - let status_dids_refs: Vec<&str> = status_dids.iter().map(|s| s.as_str()).collect(); 280 - 281 - #[derive(diesel::QueryableByName)] 282 - struct StatusRow { 283 - #[diesel(sql_type = diesel::sql_types::Integer)] 284 - actor_id: i32, 285 - #[diesel(sql_type = diesel::sql_types::Binary)] 286 - cid: Vec<u8>, 287 - #[diesel(sql_type = diesel::sql_types::Timestamptz)] 288 - created_at: chrono::DateTime<chrono::Utc>, 289 - #[diesel(sql_type = parakeet_db::schema::sql_types::StatusType)] 290 - status: parakeet_db::types::StatusType, 291 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 292 - duration: Option<i32>, 293 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 294 - embed_post_actor_id: Option<i32>, 295 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 296 - embed_post_rkey: Option<i64>, 297 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::ImageMimeType>)] 298 - thumb_mime_type: Option<parakeet_db::types::ImageMimeType>, 299 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 300 - thumb_cid: Option<Vec<u8>>, 301 - #[diesel(sql_type = diesel::sql_types::Text)] 302 - did: String, 303 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 304 - embed_uri: Option<String>, 305 - } 306 - 307 - let statuses: Vec<StatusRow> = diesel_async::RunQueryDsl::load( 308 - diesel::sql_query(build_statuses_batch_query()) 309 - .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&status_dids_refs), 310 - &mut conn, 311 - ) 312 - .await 313 - .unwrap_or_default(); 314 - 315 - statuses 316 - .into_iter() 317 - .map(|row| { 318 - // Reconstruct the status record 319 - let record = build_status_record( 320 - &row.status, 321 - &row.duration, 322 - &row.embed_uri, 323 - &None, // embed_title - not currently loaded 324 - &None, // embed_description - not currently loaded 325 - &row.thumb_mime_type, 326 - &row.thumb_cid, 327 - &row.created_at, 328 - ); 329 - 330 - let enriched = EnrichedStatus { 331 - status: Status { 332 - actor_id: row.actor_id, 333 - cid: row.cid, 334 - created_at: row.created_at, 335 - status: row.status, 336 - duration: row.duration, 337 - embed_post_actor_id: row.embed_post_actor_id, 338 - embed_post_rkey: row.embed_post_rkey, 339 - thumb_mime_type: row.thumb_mime_type, 340 - thumb_cid: row.thumb_cid, 341 - }, 342 - did: row.did.clone(), 343 - created_at: row.created_at, 344 - record, 345 - embed_uri: row.embed_uri, 346 - embed_title: None, 347 - embed_description: None, 348 - }; 349 - (row.did, enriched) 350 - }) 351 - .collect::<HashMap<_, _>>() 352 - } else { 353 - HashMap::new() 354 - }; 355 - let status_query_time = status_query_start.elapsed().as_secs_f64() * 1000.0; 356 - let status_count = status_map.len(); 357 - 358 - let results: HashMap<String, ProfileLoaderRet> = HashMap::from_iter(res.into_iter().map( 359 - |row| { 360 - // Construct Profile if actor_id is present (cid can be None for empty profiles) 361 - let profile = row.actor_id.map(|actor_id| Profile { 362 - actor_id, 363 - cid: row.cid, 364 - avatar_cid: row.avatar_cid, 365 - banner_cid: row.banner_cid, 366 - display_name: row.display_name, 367 - description: row.description, 368 - pinned_post_rkey: row.pinned_post_rkey, 369 - joined_sp_id: row.joined_sp_id, 370 - pronouns: row.pronouns, 371 - website: row.website, 372 - }); 373 - 374 - let chat_decl = row.allow_incoming.and_then(|v| ChatAllowIncoming::from_str(&v.to_string()).ok()); 375 - let notif_decl = row.allow_subscriptions.and_then(|v| ProfileAllowSubscriptions::from_str(&v.to_string()).ok()); 376 - let is_labeler = row.labeler_actor_id.is_some(); 377 - let status = status_map.get(&row.did).cloned(); 378 - 379 - let val = (row.did.clone(), row.handle, row.account_created_at, profile, chat_decl, is_labeler, status, notif_decl, row.labels); 380 - 381 - (row.did, val) 382 - }, 383 - )); 384 - 385 - let overall_time = overall_start.elapsed().as_secs_f64() * 1000.0; 386 - 387 - if overall_time > 15.0 || profile_query_time > 10.0 || status_query_time > 5.0 { 388 - tracing::info!( 389 - " → ProfileLoader: {:.1}ms total ({} profiles, {} statuses) | profile_query: {:.1}ms, status_query: {:.1}ms", 390 - overall_time, profile_count, status_count, profile_query_time, status_query_time 391 - ); 392 - } 393 - 394 - results 395 - } 396 - Err(e) => { 397 - tracing::error!("profile load failed: {e}"); 398 - HashMap::new() 399 - } 400 - } 401 - } 402 - } 403 168 404 169 /// Build SQL query for batch loading profiles by actor_id (uses consolidated actors table) 405 170 /// ··· 878 643 879 644 pub struct ProfileStateLoader(pub(super) Pool<AsyncPgConnection>, pub(super) std::sync::Arc<parakeet_db::id_cache::IdCache>); 880 645 impl ProfileStateLoader { 646 + /// Get the connection pool 647 + pub fn pool(&self) -> &Pool<AsyncPgConnection> { 648 + &self.0 649 + } 650 + 881 651 /// Get single profile state (DID-based interface, internally uses actor_ids) 882 652 pub async fn get(&self, viewer_did: &str, subject_did: &str) -> Option<db::ProfileStateRet> { 883 653 let results = self.get_many(viewer_did, &vec![subject_did.to_string()]).await; ··· 978 748 } 979 749 980 750 /// Get the IdCache for DID → actor_id resolution 981 - pub fn id_cache(&self) -> &parakeet_db::id_cache::IdCache { 751 + pub fn id_cache(&self) -> &std::sync::Arc<parakeet_db::id_cache::IdCache> { 982 752 &self.1 983 753 } 984 754 }
-57
parakeet/src/sql/labels_by_actor_ids.sql
··· 1 - -- Phase 7: Labels denormalized into actors.labels[] and posts.labels[] arrays 2 - -- Optimized labels query that uses actor_ids instead of DIDs 3 - -- This avoids decompressing the actors table by using IdCache to resolve DIDs beforehand 4 - -- 5 - -- Parameters: 6 - -- $1: uris (text[]) - Array of URIs to fetch labels for 7 - -- $2: labeler_actor_ids (integer[]) - Array of allowed labeler actor IDs 8 - -- 9 - -- Performance: Maintained similar performance with denormalized structure 10 - -- Avoids: Separate labels table join 11 - -- 12 - -- Note: Caller must use IdCache to: 13 - -- 1. Resolve labeler DIDs → actor_ids before this query 14 - -- 2. Resolve labeler_actor_id → DID after this query 15 - -- 16 - -- Note: The new composite types (actor_label, post_label) do not include: 17 - -- - uri (reconstructed from actor/post) 18 - -- - self_label (always false for post labels, always true for actor self-labels) 19 - -- - cid (not stored in denormalized structure) 20 - -- - sig (not stored in denormalized structure) 21 - 22 - -- Extract labels from actors table 23 - SELECT 24 - (label_record).labeler_actor_id as labeler_actor_id, 25 - (label_record).label as label, 26 - 'at://' || a.did as uri, 27 - true as self_label, -- Actor labels are always self-labels 28 - NULL::bytea as cid, -- Not stored in denormalized structure 29 - (label_record).negated as negated, 30 - (label_record).expires as expires, 31 - NULL::bytea as sig, -- Not stored in denormalized structure 32 - (label_record).created_at as created_at 33 - FROM actors a, UNNEST(a.labels) as label_record 34 - WHERE 'at://' || a.did = ANY($1) 35 - AND (label_record).negated = false 36 - AND ((label_record).labeler_actor_id = a.id OR (label_record).labeler_actor_id = ANY($2)) 37 - 38 - UNION ALL 39 - 40 - -- Extract labels from posts table 41 - SELECT 42 - (label_record).labeler_actor_id as labeler_actor_id, 43 - (label_record).label as label, 44 - 'at://' || a.did || '/app.bsky.feed.post/' || parakeet_db.i64_to_tid(p.rkey) as uri, 45 - false as self_label, -- Post labels are not self-labels 46 - NULL::bytea as cid, -- Not stored in denormalized structure 47 - (label_record).negated as negated, 48 - (label_record).expires as expires, 49 - NULL::bytea as sig, -- Not stored in denormalized structure 50 - (label_record).created_at as created_at 51 - FROM posts p 52 - INNER JOIN actors a ON p.actor_id = a.id, UNNEST(p.labels) as label_record 53 - WHERE 'at://' || a.did || '/app.bsky.feed.post/' || parakeet_db.i64_to_tid(p.rkey) = ANY($1) 54 - AND (label_record).negated = false 55 - AND (label_record).labeler_actor_id = ANY($2) 56 - 57 - ORDER BY created_at
+45 -5
parakeet/src/xrpc/app_bsky/actor.rs
··· 9 9 use axum_extra::extract::Query as ExtraQuery; 10 10 use lexica::app_bsky::actor::ProfileViewDetailed; 11 11 use serde::{Deserialize, Serialize}; 12 + use std::collections::HashMap; 12 13 13 14 #[derive(Debug, Deserialize)] 14 15 pub struct ActorQuery { ··· 190 191 (None, None) 191 192 }; 192 193 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 193 - let mut profiles_map = hyd.hydrate_profiles_detailed(dids.clone()).await; 194 + 195 + // Use the cache's batch method instead of direct hydration 196 + let profiles_vec = state.profile_cache 197 + .get_or_hydrate_batch(dids.clone(), &state.pool, &state.id_cache, &hyd) 198 + .await; 199 + 200 + // Convert Vec to HashMap for compatibility with existing code 201 + let mut profiles_map: std::collections::HashMap<String, ProfileViewDetailed> = profiles_vec 202 + .into_iter() 203 + .map(|profile| (profile.did.clone(), profile)) 204 + .collect(); 194 205 195 206 // Maintain search result order (hydration returns HashMap) 196 207 let actors: Vec<ProfileViewDetailed> = dids ··· 264 275 (None, None) 265 276 }; 266 277 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 267 - let mut profiles_map = hyd.hydrate_profiles_detailed(dids.clone()).await; 278 + 279 + // Use the cache's batch method instead of direct hydration 280 + let profiles_vec = state.profile_cache 281 + .get_or_hydrate_batch(dids.clone(), &state.pool, &state.id_cache, &hyd) 282 + .await; 283 + 284 + // Convert Vec to HashMap for compatibility with existing code 285 + let mut profiles_map: std::collections::HashMap<String, ProfileViewDetailed> = profiles_vec 286 + .into_iter() 287 + .map(|profile| (profile.did.clone(), profile)) 288 + .collect(); 268 289 269 290 // Maintain priority order 270 291 let actors: Vec<ProfileViewDetailed> = dids ··· 325 346 .into_response()); 326 347 } 327 348 349 + // Convert DIDs to actor_ids for efficient profile loading 350 + let did_to_actor_id = crate::id_cache_helpers::get_actor_ids_or_fetch( 351 + &state.pool, 352 + &state.id_cache, 353 + &all_dids, 354 + ).await.unwrap_or_default(); 355 + 356 + let actor_ids: Vec<i32> = did_to_actor_id.values().copied().collect(); 357 + 328 358 // Load profiles to check quality (already sorted by follower count from SQL) 329 - let profiles = state 359 + let profiles_by_id = state 330 360 .dataloaders 331 - .profile 332 - .load_many(all_dids.clone()) 361 + .profile_by_id 362 + .load_many(actor_ids) 333 363 .await; 364 + 365 + // Convert back to DID-keyed map for compatibility 366 + let profiles: HashMap<String, _> = profiles_by_id 367 + .into_iter() 368 + .filter_map(|(actor_id, profile)| { 369 + did_to_actor_id.iter() 370 + .find(|(_, id)| **id == actor_id) 371 + .map(|(did, _)| (did.clone(), profile)) 372 + }) 373 + .collect(); 334 374 335 375 // Filter by quality, maintaining follower-count order from SQL query 336 376 let ranked_dids: Vec<String> = all_dids
+22 -1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 250 250 return Ok(Json(GetSuggestedUsersResponse { actors: Vec::new() })); 251 251 } 252 252 253 + // Convert DIDs to actor_ids for profile loading 254 + let actor_id_map = match crate::id_cache_helpers::get_actor_ids_or_fetch( 255 + &state.pool, 256 + &state.id_cache, 257 + &all_dids, 258 + ).await { 259 + Ok(map) => map, 260 + Err(_) => return Ok(Json(GetSuggestedUsersResponse { actors: Vec::new() })), 261 + }; 262 + 263 + let actor_ids: Vec<i32> = actor_id_map.values().copied().collect(); 264 + 253 265 // Load profiles to check quality (already sorted by follower count from SQL) 254 - let profiles = state.dataloaders.profile.load_many(all_dids.clone()).await; 266 + let profiles_by_id = state.dataloaders.profile_by_id.load_many(actor_ids).await; 267 + 268 + // Convert actor_id-keyed profiles back to DID-keyed for easier lookup 269 + let id_to_did: std::collections::HashMap<i32, String> = actor_id_map.iter().map(|(did, id)| (*id, did.clone())).collect(); 270 + let profiles: std::collections::HashMap<String, _> = profiles_by_id 271 + .into_iter() 272 + .filter_map(|(actor_id, profile_info)| { 273 + id_to_did.get(&actor_id).map(|did| (did.clone(), profile_info)) 274 + }) 275 + .collect(); 255 276 256 277 // Filter by quality, maintaining follower-count order from SQL query 257 278 let ranked_dids: Vec<String> = all_dids
+3 -129
parakeet/tests/loaders_test.rs
··· 15 15 // ============================================================================ 16 16 // PROFILE LOADER TESTS 17 17 // ============================================================================ 18 - // ProfileLoader uses complex SQL with: 19 - // - Queries consolidated actors table (profile_*, chat_*, notif_decl_* columns) 20 - // - LEFT JOIN to labelers table (still separate) 21 - // - Format string for DID list interpolation 22 - // - Separate status query with embed reconstruction 23 - 24 - #[tokio::test] 25 - async fn test_profile_loader_main_query_empty() -> eyre::Result<()> { 26 - common::ensure_test_db_ready().await; 27 - let pool = common::test_diesel_pool(); 28 - let _conn = pool.get().await.wrap_err("Failed to get connection")?; 29 - 30 - // Simulate ProfileLoader's main query with empty DID list 31 - let dids: Vec<String> = vec![]; 32 - 33 - if dids.is_empty() { 34 - // Empty case - should handle gracefully 35 - return Ok(()); 36 - } 37 - 38 - // This would be the actual query, but it won't execute with empty DIDs 39 - // Just verifying the logic handles it 40 - Ok(()) 41 - } 42 - 43 - #[tokio::test] 44 - async fn test_profile_loader_main_query_structure() -> eyre::Result<()> { 45 - common::ensure_test_db_ready().await; 46 - let pool = common::test_diesel_pool(); 47 - let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 48 - 49 - let dids = ["did:plc:test1".to_string(), "did:plc:test2".to_string()]; 50 - 51 - // Build the query using the actual ProfileLoader query builder (no SQL duplication!) 52 - let dids_refs: Vec<&str> = dids.iter().map(|s| s.as_str()).collect(); 53 - 54 - let query = parakeet::loaders::build_profiles_batch_query(); 55 - 56 - #[derive(diesel::QueryableByName)] 57 - #[allow(dead_code)] 58 - struct ActorRow { 59 - #[diesel(sql_type = diesel::sql_types::Text)] 60 - did: String, 61 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 62 - handle: Option<String>, 63 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 64 - actor_id: Option<i32>, 65 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 66 - cid: Option<Vec<u8>>, 67 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamptz>)] 68 - created_at: Option<chrono::DateTime<chrono::Utc>>, 69 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 70 - avatar_cid: Option<Vec<u8>>, 71 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 72 - banner_cid: Option<Vec<u8>>, 73 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 74 - display_name: Option<String>, 75 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 76 - description: Option<String>, 77 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 78 - pinned_post_id: Option<i64>, 79 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 80 - joined_sp_id: Option<i64>, 81 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 82 - pronouns: Option<String>, 83 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 84 - website: Option<String>, 85 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::ChatAllowIncoming>)] 86 - allow_incoming: Option<parakeet_db::types::ChatAllowIncoming>, 87 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 88 - labeler_actor_id: Option<i32>, 89 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::NotifAllowSubscriptions>)] 90 - allow_subscriptions: Option<parakeet_db::types::NotifAllowSubscriptions>, 91 - } 92 - 93 - diesel::sql_query(query) 94 - .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&dids_refs) 95 - .load::<ActorRow>(&mut conn) 96 - .await 97 - .wrap_err("ProfileLoader main query SQL failed")?; 98 - 99 - Ok(()) 100 - } 101 - 102 - #[tokio::test] 103 - async fn test_profile_loader_status_query_structure() -> eyre::Result<()> { 104 - common::ensure_test_db_ready().await; 105 - let pool = common::test_diesel_pool(); 106 - let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 107 - 108 - let dids = ["did:plc:test1".to_string()]; 109 - let dids_refs: Vec<&str> = dids.iter().map(|s| s.as_str()).collect(); 110 - 111 - // Use the actual status query builder from ProfileLoader (no SQL duplication!) 112 - let status_query = parakeet::loaders::build_statuses_batch_query(); 113 - 114 - #[derive(diesel::QueryableByName)] 115 - #[allow(dead_code)] 116 - struct StatusRow { 117 - #[diesel(sql_type = diesel::sql_types::Integer)] 118 - actor_id: i32, 119 - #[diesel(sql_type = diesel::sql_types::Binary)] 120 - cid: Vec<u8>, 121 - #[diesel(sql_type = diesel::sql_types::Timestamptz)] 122 - created_at: chrono::DateTime<chrono::Utc>, 123 - #[diesel(sql_type = parakeet_db::schema::sql_types::StatusType)] 124 - status: parakeet_db::types::StatusType, 125 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 126 - duration: Option<i32>, 127 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)] 128 - embed_post_id: Option<i64>, 129 - #[diesel(sql_type = diesel::sql_types::Nullable<parakeet_db::schema::sql_types::ImageMimeType>)] 130 - thumb_mime_type: Option<parakeet_db::types::ImageMimeType>, 131 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] 132 - thumb_cid: Option<Vec<u8>>, 133 - #[diesel(sql_type = diesel::sql_types::Text)] 134 - did: String, 135 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 136 - embed_uri: Option<String>, 137 - } 138 - 139 - diesel::sql_query(status_query) 140 - .bind::<diesel::sql_types::Array<diesel::sql_types::Text>, _>(&dids_refs) 141 - .load::<StatusRow>(&mut conn) 142 - .await 143 - .wrap_err("ProfileLoader status query SQL failed")?; 144 - 145 - Ok(()) 146 - } 18 + // ProfileByIdLoader uses optimized SQL querying by actor_id (primary key) 19 + // instead of DID (btree index). Queries consolidated actors table with 20 + // profile_*, chat_*, notif_decl_*, and status_* columns. 147 21 148 22 #[tokio::test] 149 23 async fn test_profile_by_id_loader_query_structure() -> eyre::Result<()> {
+8 -28
parakeet/tests/sql/batch_loading_test.rs
··· 49 49 // NOTE: Threadgate loading test removed - threadgates are now loaded inline with posts 50 50 // in build_posts_batch_query() to eliminate a separate database roundtrip 51 51 52 - /// Test batch loading profiles (from loaders/profile.rs) 53 - #[tokio::test] 54 - async fn test_batch_profiles_query() -> eyre::Result<()> { 55 - common::ensure_test_db_ready().await; 56 - let pool = common::test_diesel_pool(); 57 - let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 58 - 59 - // Use the ACTUAL query builder from the source code 60 - let query = parakeet::loaders::build_profiles_batch_query(); 61 - let dids = vec!["did:plc:test1", "did:plc:test2"]; 62 - 63 - diesel_async::RunQueryDsl::execute( 64 - diesel::sql_query(query) 65 - .bind::<Array<diesel::sql_types::Text>, _>(&dids), 66 - &mut conn, 67 - ) 68 - .await 69 - .wrap_err("Batch profile loading query failed")?; 70 - 71 - Ok(()) 72 - } 73 - 74 - /// Test batch loading statuses (from loaders/profile.rs) 52 + /// Test batch loading profiles by ID (from loaders/profile.rs) 53 + /// Note: The old DID-based profile/status loaders have been removed as they were 54 + /// inefficient (required btree index lookups). ProfileByIdLoader uses primary key lookups. 75 55 #[tokio::test] 76 - async fn test_batch_statuses_query() -> eyre::Result<()> { 56 + async fn test_batch_profiles_by_id_query() -> eyre::Result<()> { 77 57 common::ensure_test_db_ready().await; 78 58 let pool = common::test_diesel_pool(); 79 59 let mut conn = pool.get().await.wrap_err("Failed to get connection")?; 80 60 81 61 // Use the ACTUAL query builder from the source code 82 - let query = parakeet::loaders::build_statuses_batch_query(); 83 - let dids = vec!["did:plc:test1"]; 62 + let query = parakeet::loaders::build_profiles_by_id_batch_query(); 63 + let actor_ids = vec![1i32, 2i32]; 84 64 85 65 diesel_async::RunQueryDsl::execute( 86 66 diesel::sql_query(query) 87 - .bind::<Array<diesel::sql_types::Text>, _>(&dids), 67 + .bind::<Array<diesel::sql_types::Integer>, _>(&actor_ids), 88 68 &mut conn, 89 69 ) 90 70 .await 91 - .wrap_err("Batch status loading query failed")?; 71 + .wrap_err("Batch profile loading by ID query failed")?; 92 72 93 73 Ok(()) 94 74 }