Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(hack): failover api; remove preference endpoints

+443 -231
+19 -15
lexica/src/app_bsky/actor.rs
··· 6 6 use std::fmt::Display; 7 7 use std::str::FromStr; 8 8 9 - #[derive(Clone, Default, Debug, Serialize)] 9 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 10 10 #[serde(rename_all = "camelCase")] 11 11 pub struct ProfileViewerState { 12 12 pub muted: bool, ··· 27 27 // pub activity_subscriptions: Option<()>, 28 28 } 29 29 30 - #[derive(Clone, Default, Debug, Serialize)] 30 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 31 31 #[serde(rename_all = "camelCase")] 32 32 pub struct ProfileAssociated { 33 - pub lists: i64, 34 - pub feedgens: i64, 35 - pub starter_packs: i64, 36 - pub labeler: bool, 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + pub lists: Option<i64>, 35 + #[serde(skip_serializing_if = "Option::is_none")] 36 + pub feedgens: Option<i64>, 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub starter_packs: Option<i64>, 39 + #[serde(skip_serializing_if = "Option::is_none")] 40 + pub labeler: Option<bool>, 37 41 #[serde(skip_serializing_if = "Option::is_none")] 38 42 pub chat: Option<ProfileAssociatedChat>, 39 43 #[serde(skip_serializing_if = "Option::is_none")] 40 44 pub activity_subscription: Option<ProfileAssociatedActivitySubscription>, 41 45 } 42 46 43 - #[derive(Clone, Debug, Serialize)] 47 + #[derive(Clone, Debug, Serialize, Deserialize)] 44 48 #[serde(rename_all = "camelCase")] 45 49 pub struct ProfileAssociatedChat { 46 50 pub allow_incoming: ChatAllowIncoming, ··· 77 81 } 78 82 } 79 83 80 - #[derive(Clone, Debug, Serialize)] 84 + #[derive(Clone, Debug, Serialize, Deserialize)] 81 85 #[serde(rename_all = "camelCase")] 82 86 pub struct ProfileAssociatedActivitySubscription { 83 87 pub allow_subscriptions: ProfileAllowSubscriptions, ··· 140 144 } 141 145 } 142 146 143 - #[derive(Clone, Debug, Serialize)] 147 + #[derive(Clone, Debug, Serialize, Deserialize)] 144 148 #[serde(rename_all = "camelCase")] 145 149 pub struct ProfileViewBasic { 146 150 pub did: String, ··· 166 170 pub created_at: DateTime<Utc>, 167 171 } 168 172 169 - #[derive(Clone, Debug, Serialize)] 173 + #[derive(Clone, Debug, Serialize, Deserialize)] 170 174 #[serde(rename_all = "camelCase")] 171 175 pub struct ProfileView { 172 176 pub did: String, ··· 195 199 pub indexed_at: NaiveDateTime, 196 200 } 197 201 198 - #[derive(Debug, Serialize)] 202 + #[derive(Debug, Serialize, Deserialize)] 199 203 #[serde(rename_all = "camelCase")] 200 204 pub struct ProfileViewDetailed { 201 205 pub did: String, ··· 235 239 pub indexed_at: NaiveDateTime, 236 240 } 237 241 238 - #[derive(Clone, Debug, Serialize)] 242 + #[derive(Clone, Debug, Serialize, Deserialize)] 239 243 #[serde(rename_all = "camelCase")] 240 244 pub struct VerificationView { 241 245 pub issuer: String, ··· 244 248 pub created_at: DateTime<Utc>, 245 249 } 246 250 247 - #[derive(Clone, Debug, Serialize)] 251 + #[derive(Clone, Debug, Serialize, Deserialize)] 248 252 #[serde(rename_all = "camelCase")] 249 253 pub struct VerificationState { 250 254 pub verifications: Vec<VerificationView>, ··· 268 272 None, 269 273 } 270 274 271 - #[derive(Clone, Debug, Serialize)] 275 + #[derive(Clone, Debug, Serialize, Deserialize)] 272 276 #[serde(rename_all = "camelCase")] 273 277 pub struct StatusView { 274 278 pub status: Status, ··· 281 285 pub is_active: Option<bool>, 282 286 } 283 287 284 - #[derive(Clone, Debug, Serialize)] 288 + #[derive(Clone, Debug, Serialize, Deserialize)] 285 289 #[serde(tag = "$type")] 286 290 #[serde(rename = "app.bsky.embed.external#view")] 287 291 pub struct StatusViewEmbed {
+7 -7
lexica/src/app_bsky/embed.rs
··· 14 14 pub height: i32, 15 15 } 16 16 17 - #[derive(Clone, Debug, Serialize)] 17 + #[derive(Clone, Debug, Serialize, Deserialize)] 18 18 #[serde(tag = "$type")] 19 19 pub enum Embed { 20 20 #[serde(rename = "app.bsky.embed.images#view")] ··· 31 31 media: Box<Embed>, 32 32 }, 33 33 } 34 - #[derive(Clone, Debug, Serialize)] 34 + #[derive(Clone, Debug, Serialize, Deserialize)] 35 35 #[serde(rename_all = "camelCase")] 36 36 pub struct ImageView { 37 37 pub thumb: String, ··· 41 41 pub aspect_ratio: Option<AspectRatio>, 42 42 } 43 43 44 - #[derive(Clone, Debug, Serialize)] 44 + #[derive(Clone, Debug, Serialize, Deserialize)] 45 45 #[serde(rename_all = "camelCase")] 46 46 pub struct VideoView { 47 47 pub cid: String, ··· 54 54 pub aspect_ratio: Option<AspectRatio>, 55 55 } 56 56 57 - #[derive(Clone, Debug, Serialize)] 57 + #[derive(Clone, Debug, Serialize, Deserialize)] 58 58 #[serde(rename_all = "camelCase")] 59 59 pub struct External { 60 60 pub uri: String, ··· 64 64 pub thumb: Option<String>, 65 65 } 66 66 67 - #[derive(Clone, Debug, Serialize)] 67 + #[derive(Clone, Debug, Serialize, Deserialize)] 68 68 #[serde(tag = "$type")] 69 69 pub enum RecordViewInner { 70 70 #[serde(rename = "app.bsky.embed.record#viewRecord")] ··· 93 93 // StaterPackView() 94 94 } 95 95 96 - #[derive(Clone, Debug, Serialize)] 96 + #[derive(Clone, Debug, Serialize, Deserialize)] 97 97 #[serde(rename_all = "camelCase")] 98 98 pub struct RecordView { 99 99 pub uri: String, ··· 109 109 pub indexed_at: DateTime<Utc>, 110 110 } 111 111 112 - #[derive(Clone, Debug, Serialize)] 112 + #[derive(Clone, Debug, Serialize, Deserialize)] 113 113 pub struct RecordWrapper { 114 114 pub record: RecordViewInner, 115 115 }
+12 -12
lexica/src/app_bsky/feed.rs
··· 8 8 use serde::{Deserialize, Serialize}; 9 9 use std::str::FromStr; 10 10 11 - #[derive(Clone, Default, Debug, Serialize)] 11 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 12 12 #[serde(rename_all = "camelCase")] 13 13 pub struct PostViewerState { 14 14 #[serde(skip_serializing_if = "Option::is_none")] ··· 22 22 pub pinned: bool, 23 23 } 24 24 25 - #[derive(Clone, Debug, Serialize)] 25 + #[derive(Clone, Debug, Serialize, Deserialize)] 26 26 #[serde(rename_all = "camelCase")] 27 27 pub struct PostView { 28 28 pub uri: String, ··· 45 45 pub indexed_at: DateTime<Utc>, 46 46 } 47 47 48 - #[derive(Debug, Serialize)] 48 + #[derive(Debug, Serialize, Deserialize)] 49 49 #[serde(rename_all = "camelCase")] 50 50 pub struct FeedViewPost { 51 51 pub post: PostView, ··· 57 57 pub feed_context: Option<String>, 58 58 } 59 59 60 - #[derive(Debug, Serialize)] 60 + #[derive(Debug, Serialize, Deserialize)] 61 61 #[serde(rename_all = "camelCase")] 62 62 pub struct ReplyRef { 63 63 pub root: ReplyRefPost, ··· 66 66 pub grandparent_author: Option<ProfileViewBasic>, 67 67 } 68 68 69 - #[derive(Debug, Serialize)] 69 + #[derive(Debug, Serialize, Deserialize)] 70 70 #[serde(tag = "$type")] 71 71 pub enum ReplyRefPost { 72 72 #[serde(rename = "app.bsky.feed.defs#postView")] ··· 85 85 }, 86 86 } 87 87 88 - #[derive(Debug, Serialize)] 88 + #[derive(Debug, Serialize, Deserialize)] 89 89 #[serde(tag = "$type")] 90 90 pub enum FeedViewPostReason { 91 91 #[serde(rename = "app.bsky.feed.defs#reasonRepost")] ··· 94 94 Pin, 95 95 } 96 96 97 - #[derive(Debug, Serialize)] 97 + #[derive(Debug, Serialize, Deserialize)] 98 98 #[serde(rename_all = "camelCase")] 99 99 pub struct FeedReasonRepost { 100 100 pub by: ProfileViewBasic, ··· 135 135 }, 136 136 } 137 137 138 - #[derive(Clone, Debug, Serialize)] 138 + #[derive(Clone, Debug, Serialize, Deserialize)] 139 139 pub struct BlockedAuthor { 140 140 pub did: String, 141 141 pub viewer: Option<ProfileViewerState>, 142 142 } 143 143 144 - #[derive(Clone, Default, Debug, Serialize)] 144 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 145 145 #[serde(rename_all = "camelCase")] 146 146 pub struct GeneratorViewerState { 147 147 #[serde(skip_serializing_if = "Option::is_none")] 148 148 pub like: Option<String>, 149 149 } 150 150 151 - #[derive(Clone, Debug, Serialize)] 151 + #[derive(Clone, Debug, Serialize, Deserialize)] 152 152 #[serde(rename_all = "camelCase")] 153 153 pub struct GeneratorView { 154 154 pub uri: String, ··· 177 177 pub indexed_at: DateTime<Utc>, 178 178 } 179 179 180 - #[derive(Copy, Clone, Debug, Serialize)] 180 + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] 181 181 pub enum GeneratorContentMode { 182 182 #[serde(rename = "app.bsky.feed.defs#contentModeUnspecified")] 183 183 Unspecified, ··· 197 197 } 198 198 } 199 199 200 - #[derive(Clone, Debug, Serialize)] 200 + #[derive(Clone, Debug, Serialize, Deserialize)] 201 201 pub struct ThreadgateView { 202 202 pub uri: String, 203 203 pub cid: String,
+3 -3
lexica/src/app_bsky/graph.rs
··· 6 6 use serde::{Deserialize, Serialize}; 7 7 use std::str::FromStr; 8 8 9 - #[derive(Clone, Default, Debug, Serialize)] 9 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 10 10 #[serde(rename_all = "camelCase")] 11 11 pub struct ListViewerState { 12 12 pub muted: bool, ··· 14 14 pub blocked: Option<String>, 15 15 } 16 16 17 - #[derive(Clone, Debug, Serialize)] 17 + #[derive(Clone, Debug, Serialize, Deserialize)] 18 18 #[serde(rename_all = "camelCase")] 19 19 pub struct ListViewBasic { 20 20 pub uri: String, ··· 34 34 pub indexed_at: DateTime<Utc>, 35 35 } 36 36 37 - #[derive(Clone, Debug, Serialize)] 37 + #[derive(Clone, Debug, Serialize, Deserialize)] 38 38 #[serde(rename_all = "camelCase")] 39 39 pub struct ListView { 40 40 pub uri: String,
+2 -2
lexica/src/app_bsky/labeler.rs
··· 4 4 use chrono::prelude::*; 5 5 use serde::{Deserialize, Serialize}; 6 6 7 - #[derive(Clone, Default, Debug, Serialize)] 7 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 8 8 #[serde(rename_all = "camelCase")] 9 9 pub struct LabelerViewerState { 10 10 #[serde(skip_serializing_if = "Option::is_none")] 11 11 pub like: Option<String>, 12 12 } 13 13 14 - #[derive(Clone, Debug, Serialize)] 14 + #[derive(Clone, Debug, Serialize, Deserialize)] 15 15 #[serde(rename_all = "camelCase")] 16 16 pub struct LabelerView { 17 17 pub uri: String,
+2 -2
lexica/src/app_bsky/mod.rs
··· 1 - use serde::Serialize; 1 + use serde::{Deserialize, Serialize}; 2 2 3 3 pub mod actor; 4 4 pub mod bookmark; ··· 9 9 pub mod richtext; 10 10 pub mod unspecced; 11 11 12 - #[derive(Clone, Default, Debug, Serialize)] 12 + #[derive(Clone, Default, Debug, Serialize, Deserialize)] 13 13 #[serde(rename_all = "camelCase")] 14 14 pub struct RecordStats { 15 15 pub reply_count: i64,
+1 -1
lexica/src/com_atproto/label.rs
··· 136 136 pub val: String, 137 137 } 138 138 139 - #[derive(Clone, Debug, Serialize)] 139 + #[derive(Clone, Debug, Serialize, Deserialize)] 140 140 pub struct Label { 141 141 pub ver: i32, 142 142 pub src: String,
+1 -1
lexica/src/lib.rs
··· 8 8 pub mod community_lexicon; 9 9 mod utils; 10 10 11 - #[derive(Clone, Debug, Serialize)] 11 + #[derive(Clone, Debug, Serialize, Deserialize)] 12 12 pub struct JsonBytes { 13 13 #[serde(rename = "$bytes")] 14 14 pub bytes: String,
+1 -1
parakeet/src/hydration/posts.rs
··· 84 84 let threadgate = threadgate?; 85 85 86 86 let lists = match threadgate.allowed_lists.as_ref() { 87 - Some(allowed_lists) => allowed_lists.iter().cloned().collect(), 87 + Some(allowed_lists) => allowed_lists.to_vec(), 88 88 None => Vec::new(), 89 89 }; 90 90 let lists = self.hydrate_lists_basic(lists).await;
+4 -4
parakeet/src/hydration/profile.rs
··· 25 25 let stats = stats.unwrap_or_default(); 26 26 27 27 Some(ProfileAssociated { 28 - lists: stats.lists as i64, 29 - feedgens: stats.feeds as i64, 30 - starter_packs: stats.starterpacks as i64, 31 - labeler, 28 + lists: Some(stats.lists as i64), 29 + feedgens: Some(stats.feeds as i64), 30 + starter_packs: Some(stats.starterpacks as i64), 31 + labeler: Some(labeler), 32 32 chat: chat.map(|v| ProfileAssociatedChat { allow_incoming: v }), 33 33 activity_subscription: notif.map(|v| ProfileAssociatedActivitySubscription { 34 34 allow_subscriptions: v,
+189 -78
parakeet/src/xrpc/app_bsky/actor.rs
··· 6 6 use axum::extract::{Query, State}; 7 7 use axum::Json; 8 8 use axum_extra::extract::Query as ExtraQuery; 9 + use chrono::{DateTime, NaiveDateTime, Utc}; 9 10 use diesel::prelude::*; 10 11 use diesel_async::RunQueryDsl; 11 12 use lexica::app_bsky::actor::ProfileViewDetailed; ··· 24 25 maybe_auth: Option<AtpAuth>, 25 26 Query(query): Query<ActorQuery>, 26 27 ) -> XrpcResult<Json<ProfileViewDetailed>> { 27 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 28 + let hyd = StatefulHydrator::new( 29 + &state.dataloaders, 30 + &state.cdn, 31 + &labelers, 32 + maybe_auth.clone(), 33 + ); 28 34 29 - let did = get_actor_did(&state.dataloaders, query.actor).await?; 35 + // Try to get the profile from our database first 36 + match get_actor_did(&state.dataloaders, query.actor.clone()).await { 37 + Ok(did) => { 38 + // We found the DID, check if it's valid in our system 39 + let mut conn = state.pool.get().await?; 30 40 31 - let mut conn = state.pool.get().await?; 32 - check_actor_status(&mut conn, &did).await?; 41 + if check_actor_status(&mut conn, &did).await.is_err() { 42 + tracing::info!( 43 + "Actor status check failed for {}, falling back to public API", 44 + query.actor 45 + ); 46 + return fallback_to_public_api(&query.actor).await; 47 + } 33 48 34 - let maybe_profile = hyd.hydrate_profile_detailed(did).await; 49 + // Try to hydrate the profile from our data 50 + if let Some(profile) = hyd.hydrate_profile_detailed(did).await { 51 + return Ok(Json(profile)); 52 + } 35 53 36 - match maybe_profile { 37 - Some(profile) => Ok(Json(profile)), 38 - None => Err(Error::not_found()), 54 + // Profile hydration failed, fall back to public API 55 + tracing::info!( 56 + "Failed to hydrate profile for {}, falling back to public API", 57 + query.actor 58 + ); 59 + fallback_to_public_api(&query.actor).await 60 + } 61 + Err(_) => { 62 + // Couldn't resolve the DID, fall back to public API 63 + tracing::info!( 64 + "DID not found for {}, falling back to public API", 65 + query.actor 66 + ); 67 + fallback_to_public_api(&query.actor).await 68 + } 69 + } 70 + } 71 + 72 + // Helper function for falling back to the public API 73 + async fn fallback_to_public_api(actor: &str) -> XrpcResult<Json<ProfileViewDetailed>> { 74 + tracing::info!("Fetching profile for {} from public API", actor); 75 + 76 + // Make the request to the public Bluesky API 77 + let client = reqwest::Client::new(); 78 + let url = format!( 79 + "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", 80 + actor 81 + ); 82 + 83 + match client.get(&url).send().await { 84 + Ok(response) => { 85 + if response.status().is_success() { 86 + match response.text().await { 87 + Ok(json_text) => { 88 + match serde_json::from_str::<ProfileViewDetailed>(&json_text) { 89 + Ok(profile) => { 90 + tracing::info!("Successfully deserialized profile from public API"); 91 + return Ok(Json(profile)); 92 + } 93 + Err(e) => { 94 + tracing::error!( 95 + "Failed to deserialize profile from public API: {}", 96 + e 97 + ); 98 + tracing::debug!("Response JSON: {}", json_text); 99 + 100 + // Try to extract the essential fields manually 101 + match serde_json::from_str::<serde_json::Value>(&json_text) { 102 + Ok(json_value) => { 103 + tracing::info!( 104 + "Attempting to manually construct profile from JSON" 105 + ); 106 + let profile = manual_construct_profile(json_value); 107 + return Ok(Json(profile)); 108 + } 109 + Err(_) => { 110 + return Err(Error::not_found()); 111 + } 112 + } 113 + } 114 + } 115 + } 116 + Err(e) => { 117 + tracing::error!("Failed to get response text: {}", e); 118 + return Err(Error::not_found()); 119 + } 120 + } 121 + } 122 + Err(Error::not_found()) 123 + } 124 + Err(e) => { 125 + tracing::error!("Request to public API failed: {}", e); 126 + Err(Error::not_found()) 127 + } 39 128 } 40 129 } 41 130 ··· 49 138 profiles: Vec<ProfileViewDetailed>, 50 139 } 51 140 141 + // Helper function to manually construct ProfileViewDetailed from JSON 142 + fn manual_construct_profile(json: serde_json::Value) -> ProfileViewDetailed { 143 + use chrono::prelude::*; 144 + 145 + // Extract values with defaults for required fields 146 + let did = json 147 + .get("did") 148 + .and_then(|v| v.as_str()) 149 + .unwrap_or("") 150 + .to_string(); 151 + let handle = json 152 + .get("handle") 153 + .and_then(|v| v.as_str()) 154 + .unwrap_or("handle.invalid") 155 + .to_string(); 156 + 157 + // Optional fields 158 + let display_name = json 159 + .get("displayName") 160 + .and_then(|v| v.as_str()) 161 + .map(|s| s.to_string()); 162 + let description = json 163 + .get("description") 164 + .and_then(|v| v.as_str()) 165 + .map(|s| s.to_string()); 166 + let avatar = json 167 + .get("avatar") 168 + .and_then(|v| v.as_str()) 169 + .map(|s| s.to_string()); 170 + let banner = json 171 + .get("banner") 172 + .and_then(|v| v.as_str()) 173 + .map(|s| s.to_string()); 174 + 175 + // Count fields with defaults 176 + let followers_count = json 177 + .get("followersCount") 178 + .and_then(|v| v.as_i64()) 179 + .unwrap_or(0); 180 + let follows_count = json 181 + .get("followsCount") 182 + .and_then(|v| v.as_i64()) 183 + .unwrap_or(0); 184 + let posts_count = json.get("postsCount").and_then(|v| v.as_i64()).unwrap_or(0); 185 + 186 + // We'll create an empty associated for now 187 + let associated = None; 188 + 189 + // Empty vectors for labels 190 + let labels = Vec::new(); 191 + 192 + // Parse dates 193 + let created_at = json 194 + .get("createdAt") 195 + .and_then(|v| v.as_str()) 196 + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) 197 + .map(|dt| dt.with_timezone(&Utc)) 198 + .unwrap_or_else(Utc::now); 199 + 200 + let indexed_at_str = json.get("indexedAt").and_then(|v| v.as_str()); 201 + let indexed_at = match indexed_at_str { 202 + Some(s) => match DateTime::parse_from_rfc3339(s) { 203 + Ok(dt) => DateTime::from_timestamp(dt.timestamp(), 0) 204 + .unwrap_or(Utc::now()) 205 + .naive_utc(), 206 + Err(_) => Utc::now().naive_utc(), 207 + }, 208 + None => Utc::now().naive_utc(), 209 + }; 210 + 211 + ProfileViewDetailed { 212 + did, 213 + handle, 214 + display_name, 215 + description, 216 + avatar, 217 + banner, 218 + followers_count, 219 + follows_count, 220 + posts_count, 221 + associated, 222 + viewer: None, 223 + labels, 224 + verification: None, 225 + status: None, 226 + pronouns: None, 227 + website: None, 228 + created_at, 229 + indexed_at, 230 + } 231 + } 232 + 52 233 pub async fn get_profiles( 53 234 State(state): State<GlobalState>, 54 235 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 67 248 68 249 Ok(Json(GetProfilesRes { profiles })) 69 250 } 70 - 71 - #[derive(Debug, Serialize)] 72 - pub struct GetPreferencesRes { 73 - preferences: Vec<Value>, 74 - } 75 - 76 - #[derive(Debug, Deserialize)] 77 - pub struct PutPreferencesReq { 78 - preferences: Vec<Value>, 79 - } 80 - 81 - pub async fn get_preferences( 82 - auth: AtpAuth, 83 - State(state): State<GlobalState>, 84 - ) -> XrpcResult<Json<GetPreferencesRes>> { 85 - let prefs_str = "{\"$type\":\"app.bsky.actor.defs#savedFeedsPrefV2\",\"items\":[{\"id\":\"3lzwjjbwm4226\",\"type\":\"timeline\",\"value\":\"following\",\"pinned\":true}]}"; 86 - 87 - return Ok(Json(GetPreferencesRes { 88 - preferences: vec![serde_json::from_str(prefs_str).unwrap()], 89 - })); 90 - 91 - let did = &auth.0; 92 - let mut conn = state.pool.get().await?; 93 - 94 - // Get preferences from database 95 - let profile = schema::profiles::table 96 - .filter(schema::profiles::did.eq(did)) 97 - .select(schema::profiles::preferences) 98 - .first::<Option<String>>(&mut conn) 99 - .await 100 - .optional()? 101 - .flatten(); 102 - 103 - // If preferences exist and can be parsed, return them 104 - if let Some(prefs_str) = profile { 105 - if let Ok(prefs) = serde_json::from_str::<Vec<Value>>(&prefs_str) { 106 - return Ok(Json(GetPreferencesRes { preferences: prefs })); 107 - } 108 - } 109 - 110 - // Return empty preferences if none found or parsing failed 111 - Ok(Json(GetPreferencesRes { 112 - preferences: vec![], 113 - })) 114 - } 115 - 116 - pub async fn put_preferences( 117 - auth: AtpAuth, 118 - State(state): State<GlobalState>, 119 - Json(req): Json<PutPreferencesReq>, 120 - ) -> XrpcResult<()> { 121 - let did = &auth.0; 122 - let mut conn = state.pool.get().await?; 123 - 124 - // Serialize the preferences array to string 125 - let prefs_str = serde_json::to_string(&req.preferences) 126 - .map_err(|_| Error::invalid_request(Some("Invalid preferences format".to_string())))?; 127 - 128 - // Update the database with new preferences 129 - diesel::update(schema::profiles::table) 130 - .filter(schema::profiles::did.eq(did)) 131 - .set(schema::profiles::preferences.eq(prefs_str)) 132 - .execute(&mut conn) 133 - .await 134 - .map_err(|e| { 135 - eprintln!("Database error: {}", e); 136 - Error::server_error(Some("Failed to update preferences")) 137 - })?; 138 - Ok(()) 139 - }
+200 -103
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 26 26 27 27 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 28 28 29 - #[derive(Debug, Serialize)] 29 + #[derive(Debug, Serialize, Deserialize)] 30 30 pub struct FeedRes { 31 31 #[serde(skip_serializing_if = "Option::is_none")] 32 32 cursor: Option<String>, ··· 214 214 ) -> XrpcResult<Json<FeedRes>> { 215 215 let mut conn = state.pool.get().await?; 216 216 217 - let did = get_actor_did(&state.dataloaders, query.actor.clone()).await?; 217 + // Try to get the DID from our database 218 + match get_actor_did(&state.dataloaders, query.actor.clone()).await { 219 + Ok(did) => { 220 + // Check if the actor status is valid 221 + if check_actor_status(&mut conn, &did).await.is_err() { 222 + tracing::info!( 223 + "Actor status check failed for {}, falling back to public API", 224 + query.actor 225 + ); 226 + return fallback_to_public_api(&query).await; 227 + } 218 228 219 - check_actor_status(&mut conn, &did).await?; 229 + // check if we block the actor or if they block us 230 + if let Some(auth) = &maybe_auth { 231 + if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 232 + if psr.blocked.unwrap_or_default() { 233 + // they block us 234 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 235 + } else if psr.blocking.is_some() { 236 + // we block them 237 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 238 + } 239 + } 240 + } 241 + 242 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 243 + 244 + let pin = match query.include_pins && query.cursor.is_none() { 245 + false => None, 246 + true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? { 247 + Some(post) => hyd.hydrate_post(post).await, 248 + None => None, 249 + }, 250 + }; 251 + 252 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 253 + 254 + let mut posts_query = schema::author_feeds::table 255 + .select(models::AuthorFeedItem::as_select()) 256 + .left_join( 257 + schema::posts::table.on(schema::posts::at_uri.eq(schema::author_feeds::post)), 258 + ) 259 + .filter(schema::author_feeds::did.eq(&did)) 260 + .into_boxed(); 220 261 221 - // check if we block the actor or if they block us 222 - if let Some(auth) = &maybe_auth { 223 - if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 224 - if psr.blocked.unwrap_or_default() { 225 - // they block us 226 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 227 - } else if psr.blocking.is_some() { 228 - // we block them 229 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 262 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 263 + posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor)); 230 264 } 231 - } 232 - } 233 265 234 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 266 + posts_query = match query.filter { 267 + GetAuthorFeedFilter::PostsWithReplies => { 268 + posts_query.filter(schema::author_feeds::typ.eq("post")) 269 + } 270 + GetAuthorFeedFilter::PostsNoReplies => { 271 + posts_query.filter(schema::posts::parent_uri.is_null()) 272 + } 273 + GetAuthorFeedFilter::PostsWithMedia => posts_query.filter( 274 + embed_type_filter(&["app.bsky.embed.video", "app.bsky.embed.images"]) 275 + .and(schema::author_feeds::typ.eq("post")), 276 + ), 277 + GetAuthorFeedFilter::PostsAndAuthorThreads => posts_query.filter( 278 + (schema::posts::parent_uri 279 + .like(format!("at://{did}/%")) 280 + .or(schema::posts::parent_uri.is_null())) 281 + .and( 282 + schema::posts::root_uri 283 + .like(format!("at://{did}/%")) 284 + .or(schema::posts::root_uri.is_null()), 285 + ), 286 + ), 287 + GetAuthorFeedFilter::PostsWithVideo => posts_query.filter( 288 + embed_type_filter(&["app.bsky.embed.video"]) 289 + .and(schema::author_feeds::typ.eq("post")), 290 + ), 291 + }; 235 292 236 - let pin = match query.include_pins && query.cursor.is_none() { 237 - false => None, 238 - true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? { 239 - Some(post) => hyd.hydrate_post(post).await, 240 - None => None, 241 - }, 242 - }; 293 + let results = posts_query 294 + .order(schema::author_feeds::sort_at.desc()) 295 + .limit(limit as i64) 296 + .load(&mut conn) 297 + .await?; 243 298 244 - let limit = query.limit.unwrap_or(50).clamp(1, 100); 299 + let cursor = results 300 + .last() 301 + .map(|item| item.sort_at.timestamp_millis().to_string()); 302 + 303 + let at_uris = results 304 + .iter() 305 + .map(|item| item.post.clone()) 306 + .collect::<Vec<_>>(); 307 + 308 + // get the actor for if we have reposted 309 + let profile = hyd 310 + .hydrate_profile_basic(did) 311 + .await 312 + .ok_or(Error::server_error(None))?; 313 + 314 + let mut posts = hyd.hydrate_feed_posts(at_uris).await; 245 315 246 - let mut posts_query = schema::author_feeds::table 247 - .select(models::AuthorFeedItem::as_select()) 248 - .left_join(schema::posts::table.on(schema::posts::at_uri.eq(schema::author_feeds::post))) 249 - .filter(schema::author_feeds::did.eq(&did)) 250 - .into_boxed(); 316 + let mut feed: Vec<_> = results 317 + .into_iter() 318 + .filter_map(|item| { 319 + posts.remove(&item.post).map(|mut fvp| { 320 + if item.typ == "repost" { 321 + fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost { 322 + by: profile.clone(), 323 + uri: Some(item.uri), 324 + cid: Some(item.cid), 325 + indexed_at: Default::default(), 326 + })) 327 + } 328 + fvp 329 + }) 330 + }) 331 + .collect(); 251 332 252 - if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 253 - posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor)); 254 - } 333 + if let Some(post) = pin { 334 + feed.insert( 335 + 0, 336 + FeedViewPost { 337 + post, 338 + reply: None, 339 + reason: Some(FeedViewPostReason::Pin), 340 + feed_context: None, 341 + }, 342 + ); 343 + } 255 344 256 - posts_query = match query.filter { 257 - GetAuthorFeedFilter::PostsWithReplies => { 258 - posts_query.filter(schema::author_feeds::typ.eq("post")) 345 + Ok(Json(FeedRes { cursor, feed })) 259 346 } 260 - GetAuthorFeedFilter::PostsNoReplies => { 261 - posts_query.filter(schema::posts::parent_uri.is_null()) 347 + Err(_) => { 348 + // Couldn't resolve the DID, fall back to public API 349 + tracing::info!( 350 + "DID not found for {}, falling back to public API", 351 + query.actor 352 + ); 353 + fallback_to_public_api(&query).await 262 354 } 263 - GetAuthorFeedFilter::PostsWithMedia => posts_query.filter( 264 - embed_type_filter(&["app.bsky.embed.video", "app.bsky.embed.images"]) 265 - .and(schema::author_feeds::typ.eq("post")), 266 - ), 267 - GetAuthorFeedFilter::PostsAndAuthorThreads => posts_query.filter( 268 - (schema::posts::parent_uri 269 - .like(format!("at://{did}/%")) 270 - .or(schema::posts::parent_uri.is_null())) 271 - .and( 272 - schema::posts::root_uri 273 - .like(format!("at://{did}/%")) 274 - .or(schema::posts::root_uri.is_null()), 275 - ), 276 - ), 277 - GetAuthorFeedFilter::PostsWithVideo => posts_query.filter( 278 - embed_type_filter(&["app.bsky.embed.video"]).and(schema::author_feeds::typ.eq("post")), 279 - ), 355 + } 356 + } 357 + 358 + // Helper function to fetch author feed from the public API 359 + async fn fallback_to_public_api(query: &GetAuthorFeedQuery) -> XrpcResult<Json<FeedRes>> { 360 + tracing::info!("Fetching author feed for {} from public API", query.actor); 361 + 362 + // Build the URL with the appropriate parameters 363 + let client = reqwest::Client::new(); 364 + let mut url = format!( 365 + "https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed?actor={}", 366 + query.actor 367 + ); 368 + 369 + // Add optional parameters 370 + if let Some(limit) = query.limit { 371 + url.push_str(&format!("&limit={}", limit)); 372 + } 373 + if let Some(cursor) = &query.cursor { 374 + url.push_str(&format!("&cursor={}", cursor)); 375 + } 376 + 377 + // Add filter parameter 378 + let filter = match query.filter { 379 + GetAuthorFeedFilter::PostsWithReplies => "posts_with_replies", 380 + GetAuthorFeedFilter::PostsNoReplies => "posts_no_replies", 381 + GetAuthorFeedFilter::PostsWithMedia => "posts_with_media", 382 + GetAuthorFeedFilter::PostsAndAuthorThreads => "posts_and_author_threads", 383 + GetAuthorFeedFilter::PostsWithVideo => "posts_with_video", 280 384 }; 385 + url.push_str(&format!("&filter={}", filter)); 281 386 282 - let results = posts_query 283 - .order(schema::author_feeds::sort_at.desc()) 284 - .limit(limit as i64) 285 - .load(&mut conn) 286 - .await?; 387 + // Add includePins parameter 388 + url.push_str(&format!("&includePins={}", query.include_pins)); 287 389 288 - let cursor = results 289 - .last() 290 - .map(|item| item.sort_at.timestamp_millis().to_string()); 390 + // Make the request 391 + match client.get(&url).send().await { 392 + Ok(response) => { 393 + if response.status().is_success() { 394 + // Parse the JSON response 395 + match response.text().await { 396 + Ok(text) => { 397 + tracing::info!("Successfully fetched author feed from public API"); 291 398 292 - let at_uris = results 293 - .iter() 294 - .map(|item| item.post.clone()) 295 - .collect::<Vec<_>>(); 399 + // First parse as generic JSON to get the cursor 400 + let json_response: serde_json::Value = serde_json::from_str(&text) 401 + .map_err(|e| { 402 + tracing::error!("Failed to parse response as JSON: {}", e); 403 + Error::not_found() 404 + })?; 296 405 297 - // get the actor for if we have reposted 298 - let profile = hyd 299 - .hydrate_profile_basic(did) 300 - .await 301 - .ok_or(Error::server_error(None))?; 406 + tracing::debug!("Response: {:?}", json_response); 302 407 303 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 408 + // Parse the feed field and return it 409 + let response_struct: FeedRes = 410 + serde_json::from_str(&text).map_err(|e| { 411 + tracing::error!("Failed to deserialize feed: {}", e); 412 + Error::not_found() 413 + })?; 304 414 305 - let mut feed: Vec<_> = results 306 - .into_iter() 307 - .filter_map(|item| { 308 - posts.remove(&item.post).map(|mut fvp| { 309 - if item.typ == "repost" { 310 - fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost { 311 - by: profile.clone(), 312 - uri: Some(item.uri), 313 - cid: Some(item.cid), 314 - indexed_at: Default::default(), 315 - })) 415 + Ok(Json(response_struct)) 416 + } 417 + Err(e) => { 418 + tracing::error!("Failed to parse response from public API: {}", e); 419 + Err(Error::not_found()) 420 + } 316 421 } 317 - fvp 318 - }) 319 - }) 320 - .collect(); 321 - 322 - if let Some(post) = pin { 323 - feed.insert( 324 - 0, 325 - FeedViewPost { 326 - post, 327 - reply: None, 328 - reason: Some(FeedViewPostReason::Pin), 329 - feed_context: None, 330 - }, 331 - ); 422 + } else { 423 + tracing::error!("Public API returned status {}", response.status()); 424 + Err(Error::not_found()) 425 + } 426 + } 427 + Err(e) => { 428 + tracing::error!("Request to public API failed: {}", e); 429 + Err(Error::not_found()) 430 + } 332 431 } 333 - 334 - Ok(Json(FeedRes { cursor, feed })) 335 432 } 336 433 337 434 // While fixing inactive accounts, i noticed that you can still call this endpoint for a list on an ··· 473 570 not_found: true, 474 571 }); 475 572 476 - root_parent = Some(parent); 573 + root_parent = Some(Box::new(parent)); 477 574 } 478 575 479 576 let replies = tmpbuf.remove(&root.uri).unwrap_or_default(); ··· 482 579 threadgate, 483 580 thread: ThreadViewPostType::Post(Box::new(ThreadViewPost { 484 581 post: root, 485 - parent: root_parent, 582 + parent: root_parent.map(|boxed| *boxed), 486 583 replies, 487 584 })), 488 585 })) ··· 734 831 735 832 fn postview_to_tvpt( 736 833 post: PostView, 737 - parent: Option<ThreadViewPostType>, 834 + parent: Option<Box<ThreadViewPostType>>, 738 835 replies: Vec<ThreadViewPostType>, 739 836 ) -> ThreadViewPostType { 740 837 match &post.author.viewer { ··· 748 845 }, 749 846 _ => ThreadViewPostType::Post(Box::new(ThreadViewPost { 750 847 post, 751 - parent, 848 + parent: parent.map(|boxed| *boxed), 752 849 replies, 753 850 })), 754 851 }
+2 -2
parakeet/src/xrpc/app_bsky/mod.rs
··· 12 12 #[rustfmt::skip] 13 13 pub fn routes() -> Router<crate::GlobalState> { 14 14 Router::new() 15 - .route("/app.bsky.actor.getPreferences", get(actor::get_preferences)) 16 - .route("/app.bsky.actor.putPreferences", post(actor::put_preferences)) 15 + // .route("/app.bsky.actor.getPreferences", get(actor::get_preferences)) 16 + // .route("/app.bsky.actor.putPreferences", post(actor::put_preferences)) 17 17 .route("/app.bsky.actor.getProfile", get(actor::get_profile)) 18 18 .route("/app.bsky.actor.getProfiles", get(actor::get_profiles)) 19 19 // TODO: app.bsky.actor.getSuggestions (recs)