Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add account_created_at

+214 -36
+1
Cargo.lock
··· 1009 1009 name = "did-resolver" 1010 1010 version = "0.1.0" 1011 1011 dependencies = [ 1012 + "chrono", 1012 1013 "hickory-resolver", 1013 1014 "reqwest", 1014 1015 "serde",
+2
consumer/src/database_writer/operations/executor.rs
··· 573 573 status, 574 574 sync_state, 575 575 handle, 576 + account_created_at, 576 577 timestamp, 577 578 } => { 578 579 db::actor_upsert( ··· 581 582 status.as_ref(), 582 583 &sync_state, 583 584 handle.as_deref(), 585 + account_created_at.as_ref(), 584 586 timestamp, 585 587 ) 586 588 .await?;
+1
consumer/src/database_writer/operations/types.rs
··· 121 121 status: Option<parakeet_db::types::ActorStatus>, 122 122 sync_state: parakeet_db::types::ActorSyncState, 123 123 handle: Option<String>, 124 + account_created_at: Option<DateTime<Utc>>, 124 125 timestamp: DateTime<Utc>, 125 126 }, 126 127
+78 -6
consumer/src/db/actor.rs
··· 11 11 status: Option<&ActorStatus>, 12 12 sync_state: &ActorSyncState, 13 13 handle: Option<&str>, 14 + account_created_at: Option<&DateTime<Utc>>, 14 15 time: DateTime<Utc>, 15 16 ) -> Result<u64> { 16 17 // Allow allowlist states (synced, dirty, processing) to flow freely 17 18 // Allow upgrading from partial to allowlist states 18 19 // Never downgrade from allowlist states to partial 20 + // 21 + // Account created_at is updated if provided and current value is NULL 22 + // This allows enrichment during handle resolution without overwriting existing values 19 23 20 - match (status, handle) { 21 - (Some(status), Some(handle)) => { 22 - // Both status and handle provided 24 + match (status, handle, account_created_at) { 25 + (Some(status), Some(handle), Some(created_at)) => { 26 + // All three provided 27 + conn.execute( 28 + "INSERT INTO actors (did, status, handle, sync_state, account_created_at, last_indexed) VALUES ($1, $2, $3, $4, $5, $6) 29 + ON CONFLICT (did) DO UPDATE SET 30 + status=EXCLUDED.status, 31 + handle=EXCLUDED.handle, 32 + sync_state=CASE 33 + WHEN actors.sync_state IN ('synced', 'dirty', 'processing') AND EXCLUDED.sync_state = 'partial' 34 + THEN actors.sync_state 35 + ELSE EXCLUDED.sync_state 36 + END, 37 + account_created_at=COALESCE(actors.account_created_at, EXCLUDED.account_created_at), 38 + last_indexed=EXCLUDED.last_indexed", 39 + &[&did, &status, &handle, &sync_state, &created_at, &time], 40 + ) 41 + .await 42 + } 43 + (Some(status), Some(handle), None) => { 44 + // Status and handle, no created_at 23 45 conn.execute( 24 46 "INSERT INTO actors (did, status, handle, sync_state, last_indexed) VALUES ($1, $2, $3, $4, $5) 25 47 ON CONFLICT (did) DO UPDATE SET ··· 35 57 ) 36 58 .await 37 59 } 38 - (Some(status), None) => { 60 + (Some(status), None, Some(created_at)) => { 61 + // Status and created_at, no handle 62 + conn.execute( 63 + "INSERT INTO actors (did, status, sync_state, account_created_at, last_indexed) VALUES ($1, $2, $3, $4, $5) 64 + ON CONFLICT (did) DO UPDATE SET 65 + status=EXCLUDED.status, 66 + sync_state=CASE 67 + WHEN actors.sync_state IN ('synced', 'dirty', 'processing') AND EXCLUDED.sync_state = 'partial' 68 + THEN actors.sync_state 69 + ELSE EXCLUDED.sync_state 70 + END, 71 + account_created_at=COALESCE(actors.account_created_at, EXCLUDED.account_created_at), 72 + last_indexed=EXCLUDED.last_indexed", 73 + &[&did, &status, &sync_state, &created_at, &time], 74 + ) 75 + .await 76 + } 77 + (Some(status), None, None) => { 39 78 // Only status provided 40 79 conn.execute( 41 80 "INSERT INTO actors (did, status, sync_state, last_indexed) VALUES ($1, $2, $3, $4) ··· 51 90 ) 52 91 .await 53 92 } 54 - (None, Some(handle)) => { 93 + (None, Some(handle), Some(created_at)) => { 94 + // Handle and created_at, no status 95 + conn.execute( 96 + "INSERT INTO actors (did, handle, sync_state, account_created_at, last_indexed) VALUES ($1, $2, $3, $4, $5) 97 + ON CONFLICT (did) DO UPDATE SET 98 + handle=EXCLUDED.handle, 99 + sync_state=CASE 100 + WHEN actors.sync_state IN ('synced', 'dirty', 'processing') AND EXCLUDED.sync_state = 'partial' 101 + THEN actors.sync_state 102 + ELSE EXCLUDED.sync_state 103 + END, 104 + account_created_at=COALESCE(actors.account_created_at, EXCLUDED.account_created_at), 105 + last_indexed=EXCLUDED.last_indexed", 106 + &[&did, &handle, &sync_state, &created_at, &time], 107 + ) 108 + .await 109 + } 110 + (None, Some(handle), None) => { 55 111 // Only handle provided 56 112 conn.execute( 57 113 "INSERT INTO actors (did, handle, sync_state, last_indexed) VALUES ($1, $2, $3, $4) ··· 67 123 ) 68 124 .await 69 125 } 70 - (None, None) => { 126 + (None, None, Some(created_at)) => { 127 + // Only created_at provided 128 + conn.execute( 129 + "INSERT INTO actors (did, sync_state, account_created_at, last_indexed) VALUES ($1, $2, $3, $4) 130 + ON CONFLICT (did) DO UPDATE SET 131 + sync_state=CASE 132 + WHEN actors.sync_state IN ('synced', 'dirty', 'processing') AND EXCLUDED.sync_state = 'partial' 133 + THEN actors.sync_state 134 + ELSE EXCLUDED.sync_state 135 + END, 136 + account_created_at=COALESCE(actors.account_created_at, EXCLUDED.account_created_at), 137 + last_indexed=EXCLUDED.last_indexed", 138 + &[&did, &sync_state, &created_at, &time], 139 + ) 140 + .await 141 + } 142 + (None, None, None) => { 71 143 // Neither provided - just ensure actor exists with sync_state 72 144 conn.execute( 73 145 "INSERT INTO actors (did, sync_state, last_indexed) VALUES ($1, $2, $3)
+1
consumer/src/db/allowlist.rs
··· 172 172 Some(&ActorStatus::Active), 173 173 &ActorSyncState::Dirty, 174 174 None, // no handle 175 + None, // no account_created_at (enriched during handle resolution) 175 176 now, 176 177 ) 177 178 .await
+46 -7
consumer/src/workers/handle_resolver.rs
··· 118 118 let cache_key = format!("did_handle:{}", did); 119 119 let cached_handle: Option<String> = self.redis.get(&cache_key).await?; 120 120 121 - let handle = if let Some(cached) = cached_handle { 121 + let (handle, account_created_at) = if let Some(cached) = cached_handle { 122 122 counter!("handle_resolver.cache_hit").increment(1); 123 - Some(cached) 123 + // Cache hit - we have the handle but not the account creation time 124 + // Fetch account creation time separately (not cached, but only fetched once per actor typically) 125 + let created_at = match self.resolver.get_plc_creation_time(did).await { 126 + Ok(Some(timestamp)) => { 127 + counter!("handle_resolver.plc_creation_fetch_success").increment(1); 128 + Some(timestamp) 129 + } 130 + Ok(None) => { 131 + debug!("No PLC creation time found for {}", did); 132 + counter!("handle_resolver.plc_creation_not_found").increment(1); 133 + None 134 + } 135 + Err(e) => { 136 + debug!("Failed to fetch PLC creation time for {}: {}", did, e); 137 + counter!("handle_resolver.plc_creation_error").increment(1); 138 + None 139 + } 140 + }; 141 + (Some(cached), created_at) 124 142 } else { 125 143 // 2. Cache miss - resolve from PLC directory 126 144 counter!("handle_resolver.cache_miss").increment(1); 127 145 128 - match self.resolver.resolve_did(did).await { 146 + let (handle, created_at) = match self.resolver.resolve_did(did).await { 129 147 Ok(Some(doc)) => { 130 148 // Extract handle from DID document's alsoKnownAs field 131 149 let handle = doc ··· 148 166 counter!("handle_resolver.no_handle").increment(1); 149 167 } 150 168 151 - handle 169 + // Fetch account creation time from PLC audit log 170 + let created_at = match self.resolver.get_plc_creation_time(did).await { 171 + Ok(Some(timestamp)) => { 172 + counter!("handle_resolver.plc_creation_fetch_success").increment(1); 173 + Some(timestamp) 174 + } 175 + Ok(None) => { 176 + debug!("No PLC creation time found for {}", did); 177 + counter!("handle_resolver.plc_creation_not_found").increment(1); 178 + None 179 + } 180 + Err(e) => { 181 + debug!("Failed to fetch PLC creation time for {}: {}", did, e); 182 + counter!("handle_resolver.plc_creation_error").increment(1); 183 + None 184 + } 185 + }; 186 + 187 + (handle, created_at) 152 188 } 153 189 Ok(None) => { 154 190 debug!("DID document not found for {}", did); 155 191 counter!("handle_resolver.did_not_found").increment(1); 156 - None 192 + (None, None) 157 193 } 158 194 Err(e) => { 159 195 debug!("Failed to resolve DID for {}: {}", did, e); 160 196 counter!("handle_resolver.did_resolution_error").increment(1); 161 - None 197 + (None, None) 162 198 } 163 - } 199 + }; 200 + 201 + (handle, created_at) 164 202 }; 165 203 166 204 // 4. Send handle update operation to batch writer (bounded channel with backpressure) ··· 172 210 status: None, 173 211 sync_state: parakeet_db::types::ActorSyncState::Partial, 174 212 handle, 213 + account_created_at, 175 214 timestamp: chrono::Utc::now(), 176 215 }], 177 216 cache_invalidations: vec![],
+2
consumer/src/workers/jetstream/processing.rs
··· 202 202 status: None, 203 203 sync_state, 204 204 handle, // Pass the Option<String> directly 205 + account_created_at: None, // Jetstream doesn't provide creation time, enriched during handle resolution 205 206 timestamp, 206 207 }]; 207 208 ··· 244 245 status: Some(status), 245 246 sync_state, 246 247 handle: None, 248 + account_created_at: None, // Jetstream doesn't provide creation time, enriched during handle resolution 247 249 timestamp, 248 250 }]; 249 251
+1
did-resolver/Cargo.toml
··· 4 4 edition = "2021" 5 5 6 6 [dependencies] 7 + chrono = { version = "0.4", features = ["serde"] } 7 8 hickory-resolver = "0.24.2" 8 9 reqwest = { version = "0.12.12", features = ["json", "native-tls"] } 9 10 serde = { version = "1.0.217", features = ["derive"] }
+34
did-resolver/src/lib.rs
··· 79 79 Ok(Some(did_doc)) 80 80 } 81 81 82 + /// Get account creation timestamp from PLC audit log 83 + /// Returns the timestamp of the first operation (where prev is null) 84 + /// Only works for did:plc DIDs - returns None for other DID methods 85 + pub async fn get_plc_creation_time( 86 + &self, 87 + did: &str, 88 + ) -> Result<Option<chrono::DateTime<chrono::Utc>>, Error> { 89 + // Only fetch for did:plc 90 + if !did.starts_with("did:plc:") { 91 + return Ok(None); 92 + } 93 + 94 + let res = self 95 + .client 96 + .get(format!("{}/{did}/log/audit", self.plc)) 97 + .send() 98 + .await?; 99 + 100 + let status = res.status(); 101 + 102 + if status.is_server_error() { 103 + return Err(Error::ServerError); 104 + } 105 + 106 + if status == StatusCode::NOT_FOUND || status == StatusCode::GONE { 107 + return Ok(None); 108 + } 109 + 110 + let audit_log: Vec<types::PlcAuditLogEntry> = res.json().await?; 111 + 112 + // First entry in the audit log is the account creation 113 + Ok(audit_log.first().map(|entry| entry.created_at)) 114 + } 115 + 82 116 async fn resolve_did_web(&self, id: &str) -> Result<Option<types::DidDocument>, Error> { 83 117 let res = match self 84 118 .client
+8
did-resolver/src/types.rs
··· 2 2 3 3 #[derive(Debug, Deserialize, Serialize)] 4 4 #[serde(rename_all = "camelCase")] 5 + pub struct PlcAuditLogEntry { 6 + pub did: String, 7 + pub created_at: chrono::DateTime<chrono::Utc>, 8 + // Other fields exist but we only need createdAt 9 + } 10 + 11 + #[derive(Debug, Deserialize, Serialize)] 12 + #[serde(rename_all = "camelCase")] 5 13 pub struct DidDocument { 6 14 #[serde(default, rename = "@context")] 7 15 pub context: Vec<String>,
+3
migrations/2025-11-05-120000_add_account_created_at_to_actors/down.sql
··· 1 + -- Remove account_created_at column from actors table 2 + DROP INDEX IF EXISTS idx_actors_account_created_at; 3 + ALTER TABLE actors DROP COLUMN account_created_at;
+8
migrations/2025-11-05-120000_add_account_created_at_to_actors/up.sql
··· 1 + -- Add account_created_at column to actors table 2 + -- This represents when the account was created (from PLC directory first operation) 3 + -- Nullable because we may not know it for all actors initially (especially stubs) 4 + -- Will be populated during handle resolution 5 + ALTER TABLE actors ADD COLUMN account_created_at TIMESTAMP WITH TIME ZONE; 6 + 7 + -- Create index for potential queries by account age 8 + CREATE INDEX idx_actors_account_created_at ON actors(account_created_at) WHERE account_created_at IS NOT NULL;
+1
parakeet-db/src/schema.rs
··· 124 124 repo_rev -> Nullable<Text>, 125 125 repo_cid -> Nullable<Bytea>, 126 126 last_indexed -> Nullable<Timestamptz>, 127 + account_created_at -> Nullable<Timestamptz>, 127 128 } 128 129 } 129 130
+12 -11
parakeet/src/hydration/profile/builders.rs
··· 115 115 } 116 116 117 117 pub(super) fn build_basic( 118 - (did, handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 118 + (did, handle, account_created_at, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 119 119 stats: Option<ProfileStats>, 120 120 labels: Vec<models::Label>, 121 121 verifications: Option<Vec<crate::loaders::EnrichedVerification>>, ··· 141 141 }) 142 142 .unwrap_or_else(|| (None, None, None)); 143 143 144 - // Use profile.created_at if available, fall back to Utc::now() 145 - let created_at = profile.as_ref().map(|p| p.created_at).unwrap_or_else(Utc::now); 144 + // Use actor.account_created_at if available (from PLC directory), fall back to Utc::now() 145 + // This is the actual account creation time, not when the profile record was created 146 + let created_at = account_created_at.unwrap_or_else(Utc::now); 146 147 147 148 ProfileViewBasic { 148 149 did, ··· 160 161 } 161 162 162 163 pub(super) fn build_profile( 163 - (did, handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 164 + (did, handle, account_created_at, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 164 165 stats: Option<ProfileStats>, 165 166 labels: Vec<models::Label>, 166 167 verifications: Option<Vec<crate::loaders::EnrichedVerification>>, ··· 187 188 }) 188 189 .unwrap_or_else(|| (None, None, None, None)); 189 190 190 - // Use profile.created_at for both createdAt and indexedAt (if profile exists) 191 - // Fall back to Utc::now() if no profile record exists yet 192 - let created_at = profile.as_ref().map(|p| p.created_at).unwrap_or_else(Utc::now); 191 + // Use actor.account_created_at for both createdAt and indexedAt 192 + // Fall back to Utc::now() if account creation time not available 193 + let created_at = account_created_at.unwrap_or_else(Utc::now); 193 194 194 195 ProfileView { 195 196 did, ··· 209 210 } 210 211 211 212 pub(super) fn build_detailed( 212 - (did, handle, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 213 + (did, handle, account_created_at, profile, chat_decl, is_labeler, status, notif_decl): ProfileLoaderRet, 213 214 stats: Option<ProfileStats>, 214 215 labels: Vec<models::Label>, 215 216 verifications: Option<Vec<crate::loaders::EnrichedVerification>>, ··· 241 242 }) 242 243 .unwrap_or_else(|| (None, None, None, None, None, None)); 243 244 244 - // Use profile.created_at for both createdAt and indexedAt (if profile exists) 245 - // Fall back to Utc::now() if no profile record exists yet 246 - let created_at = profile.as_ref().map(|p| p.created_at).unwrap_or_else(Utc::now); 245 + // Use actor.account_created_at for both createdAt and indexedAt 246 + // Fall back to Utc::now() if account creation time not available 247 + let created_at = account_created_at.unwrap_or_else(Utc::now); 247 248 248 249 ProfileViewDetailed { 249 250 did,
+9 -5
parakeet/src/loaders/profile.rs
··· 16 16 "SELECT 17 17 a.did, 18 18 a.handle, 19 + a.account_created_at, 19 20 p.actor_id, 20 21 p.cid, 21 22 p.created_at, ··· 171 172 172 173 pub struct ProfileLoader(pub(super) Pool<AsyncPgConnection>, pub(super) redis::aio::MultiplexedConnection); 173 174 pub type ProfileLoaderRet = ( 174 - String, // did - always present (from actors table) 175 - Option<String>, // handle - from actors table 176 - Option<models::Profile>, // profile - optional (may not exist) 175 + String, // did - always present (from actors table) 176 + Option<String>, // handle - from actors table 177 + Option<chrono::DateTime<chrono::Utc>>, // account_created_at - from actors table 178 + Option<models::Profile>, // profile - optional (may not exist) 177 179 Option<ChatAllowIncoming>, 178 180 bool, 179 181 Option<EnrichedStatus>, ··· 198 200 did: String, 199 201 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 200 202 handle: Option<String>, 203 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamptz>)] 204 + account_created_at: Option<chrono::DateTime<chrono::Utc>>, 201 205 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Integer>)] 202 206 actor_id: Option<i32>, 203 207 #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Binary>)] ··· 344 348 let is_labeler = row.labeler_actor_id.is_some(); 345 349 let status = status_map.get(&row.did).cloned(); 346 350 347 - let val = (row.did.clone(), row.handle, profile, chat_decl, is_labeler, status, notif_decl); 351 + let val = (row.did.clone(), row.handle, row.account_created_at, profile, chat_decl, is_labeler, status, notif_decl); 348 352 349 353 (row.did, val) 350 354 }, ··· 353 357 // Enqueue missing profiles for background fetching using Redis 354 358 let missing_profiles: Vec<String> = results 355 359 .iter() 356 - .filter(|(_, (_, _, profile, _, _, _, _))| profile.is_none()) 360 + .filter(|(_, (_, _, _, profile, _, _, _, _))| profile.is_none()) 357 361 .map(|(did, _)| format!("at://{}/app.bsky.actor.profile/self", did)) 358 362 .collect();
+1 -1
parakeet/src/xrpc/app_bsky/actor.rs
··· 319 319 // Check if has profile (display name or description) 320 320 profiles 321 321 .get(did) 322 - .and_then(|p| p.2.as_ref()) // .2 is Option<models::Profile> 322 + .and_then(|p| p.3.as_ref()) // .3 is Option<models::Profile> 323 323 .map(|prof| { 324 324 prof.display_name.is_some() || prof.description.is_some() 325 325 })
+5 -5
parakeet/src/xrpc/app_bsky/notification/mod.rs
··· 146 146 let profile_state = profile_states.get(&notif.author_did); 147 147 148 148 // Build associated object 149 - let associated = if let Some((_, _, _, chat_decl, _, _, notif_decl)) = profile_data { 149 + let associated = if let Some((_, _, _, _, chat_decl, _, _, notif_decl)) = profile_data { 150 150 let mut assoc = serde_json::Map::new(); 151 151 152 152 if let Some(chat) = chat_decl { ··· 224 224 let author = NotificationAuthor { 225 225 did: notif.author_did.clone(), 226 226 handle: profile_data 227 - .and_then(|(_, h, _, _, _, _, _)| h.clone()) 227 + .and_then(|(_, h, _, _, _, _, _, _)| h.clone()) 228 228 .unwrap_or_else(|| { 229 229 // Fallback to handle.invalid if profile not yet indexed 230 230 "handle.invalid".to_string() 231 231 }), 232 - display_name: profile_data.and_then(|(_, _, p, _, _, _, _)| { 232 + display_name: profile_data.and_then(|(_, _, _, p, _, _, _, _)| { 233 233 p.as_ref().and_then(|prof| prof.display_name.clone()) 234 234 }), 235 - avatar: profile_data.and_then(|(did, _, p, _, _, _, _)| { 235 + avatar: profile_data.and_then(|(did, _, _, p, _, _, _, _)| { 236 236 p.as_ref().and_then(|prof| { 237 237 prof.avatar_cid.as_ref().and_then(|cid_digest| { 238 238 parakeet_db::cid_util::digest_to_blob_cid_string(cid_digest) ··· 243 243 associated, 244 244 viewer, 245 245 labels: Some(Vec::new()), // Empty labels array for compatibility 246 - description: profile_data.and_then(|(_, _, p, _, _, _, _)| { 246 + description: profile_data.and_then(|(_, _, _, p, _, _, _, _)| { 247 247 p.as_ref().and_then(|prof| prof.description.clone()) 248 248 }), 249 249 // TODO: Profile timestamps should be fetched from records_literal table via record_id FK
+1 -1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 256 256 // Check if has profile (display name or description) 257 257 profiles 258 258 .get(did) 259 - .and_then(|p| p.2.as_ref()) 259 + .and_then(|p| p.3.as_ref()) // .3 is Option<models::Profile> 260 260 .map(|prof| prof.display_name.is_some() || prof.description.is_some()) 261 261 .unwrap_or(false) 262 262 })