Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

feat: Feeds

Changed files
+281 -30
lexica
src
app_bsky
parakeet
src
hydration
xrpc
+1
Cargo.lock
··· 2748 2748 "multibase", 2749 2749 "parakeet-db", 2750 2750 "parakeet-index", 2751 + "reqwest", 2751 2752 "serde", 2752 2753 "serde_json", 2753 2754 "tokio",
+46 -8
lexica/src/app_bsky/feed.rs
··· 5 5 use crate::app_bsky::richtext::FacetMain; 6 6 use crate::com_atproto::label::Label; 7 7 use chrono::prelude::*; 8 - use serde::Serialize; 8 + use serde::{Deserialize, Serialize}; 9 9 use std::str::FromStr; 10 10 11 11 #[derive(Clone, Debug, Serialize)] ··· 39 39 pub reply: Option<ReplyRef>, 40 40 #[serde(skip_serializing_if = "Option::is_none")] 41 41 pub reason: Option<FeedViewPostReason>, 42 - // #[serde(skip_serializing_if = "Option::is_none")] 43 - // pub feed_context: Option<String>, 42 + #[serde(skip_serializing_if = "Option::is_none")] 43 + pub feed_context: Option<String>, 44 44 } 45 45 46 46 #[derive(Debug, Serialize)] ··· 75 75 #[serde(tag = "$type")] 76 76 pub enum FeedViewPostReason { 77 77 #[serde(rename = "app.bsky.feed.defs#reasonRepost")] 78 - Repost { 79 - by: ProfileViewBasic, 80 - #[serde(rename = "indexedAt")] 81 - indexed_at: DateTime<Utc>, 82 - }, 78 + Repost(FeedReasonRepost), 83 79 #[serde(rename = "app.bsky.feed.defs#reasonPin")] 84 80 Pin, 81 + } 82 + 83 + #[derive(Debug, Serialize)] 84 + #[serde(rename_all = "camelCase")] 85 + pub struct FeedReasonRepost { 86 + pub by: ProfileViewBasic, 87 + #[serde(skip_serializing_if = "Option::is_none")] 88 + pub uri: Option<String>, 89 + #[serde(skip_serializing_if = "Option::is_none")] 90 + pub cid: Option<String>, 91 + pub indexed_at: DateTime<Utc>, 85 92 } 86 93 87 94 #[derive(Debug, Serialize)] ··· 185 192 pub created_at: DateTime<Utc>, 186 193 pub indexed_at: DateTime<Utc>, 187 194 } 195 + 196 + #[derive(Clone, Debug, Deserialize, Serialize)] 197 + #[serde(rename_all = "camelCase")] 198 + pub struct FeedSkeletonResponse { 199 + pub feed: Vec<SkeletonFeedPost>, 200 + #[serde(skip_serializing_if = "Option::is_none")] 201 + pub cursor: Option<String>, 202 + #[serde(skip_serializing_if = "Option::is_none")] 203 + pub req_id: Option<String>, 204 + } 205 + 206 + #[derive(Clone, Debug, Deserialize, Serialize)] 207 + #[serde(rename_all = "camelCase")] 208 + pub struct SkeletonFeedPost { 209 + pub post: String, 210 + #[serde(skip_serializing_if = "Option::is_none")] 211 + pub reason: Option<SkeletonReason>, 212 + #[serde(skip_serializing_if = "Option::is_none")] 213 + pub feed_context: Option<String>, 214 + } 215 + 216 + #[derive(Clone, Debug, Deserialize, Serialize)] 217 + #[serde(tag = "$type")] 218 + pub enum SkeletonReason { 219 + #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")] 220 + Pin {}, 221 + #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")] 222 + Repost { 223 + repost: String, 224 + }, 225 + }
+1
parakeet/Cargo.toml
··· 23 23 multibase = "0.9.1" 24 24 parakeet-db = { path = "../parakeet-db" } 25 25 parakeet-index = { path = "../parakeet-index" } 26 + reqwest = { version = "0.12", features = ["json"] } 26 27 serde = { version = "1.0.217", features = ["derive"] } 27 28 serde_json = "1.0.134" 28 29 tokio = { version = "1.42.0", features = ["full"] }
+1
parakeet/src/hydration/posts.rs
··· 207 207 post, 208 208 reply, 209 209 reason: None, 210 + feed_context: None, 210 211 }, 211 212 )) 212 213 })
+5 -3
parakeet/src/main.rs
··· 19 19 pub struct GlobalState { 20 20 pub pool: Pool<AsyncPgConnection>, 21 21 pub dataloaders: Arc<loaders::Dataloaders>, 22 + pub resolver: Arc<did_resolver::Resolver>, 22 23 pub index_client: parakeet_index::Client, 23 24 pub jwt: Arc<xrpc::jwt::JwtVerifier>, 24 25 pub cdn: Arc<xrpc::cdn::BskyCdn>, ··· 51 52 pool.clone(), 52 53 index_client.clone(), 53 54 )); 54 - let resolver = did_resolver::Resolver::new(did_resolver::ResolverOpts { 55 + let resolver = Arc::new(did_resolver::Resolver::new(did_resolver::ResolverOpts { 55 56 plc_directory: conf.plc_directory, 56 57 ..Default::default() 57 - })?; 58 + })?); 58 59 let jwt = Arc::new(xrpc::jwt::JwtVerifier::new( 59 60 conf.service.did.clone(), 60 - resolver, 61 + resolver.clone(), 61 62 )); 62 63 63 64 let cdn = Arc::new(xrpc::cdn::BskyCdn::new(conf.cdn.base, conf.cdn.video_base)); ··· 82 83 .with_state(GlobalState { 83 84 pool, 84 85 dataloaders, 86 + resolver, 85 87 index_client, 86 88 jwt, 87 89 cdn,
+215 -10
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 5 5 use crate::xrpc::{check_actor_status, datetime_cursor, get_actor_did, normalise_at_uri}; 6 6 use crate::GlobalState; 7 7 use axum::extract::{Query, State}; 8 + use axum::http::StatusCode; 8 9 use axum::Json; 9 10 use axum_extra::extract::Query as ExtraQuery; 11 + use axum_extra::headers::authorization::Bearer; 12 + use axum_extra::headers::Authorization; 13 + use axum_extra::TypedHeader; 14 + use chrono::prelude::*; 10 15 use diesel::prelude::*; 11 - use diesel_async::RunQueryDsl; 16 + use diesel_async::{AsyncPgConnection, RunQueryDsl}; 12 17 use lexica::app_bsky::actor::ProfileView; 13 18 use lexica::app_bsky::feed::{ 14 - FeedViewPost, PostView, ThreadViewPost, ThreadViewPostType, ThreadgateView, 19 + FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, PostView, 20 + SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 15 21 }; 16 22 use parakeet_db::schema; 23 + use reqwest::Url; 17 24 use serde::{Deserialize, Serialize}; 18 25 use std::collections::HashMap; 19 26 20 - // TODO: getFeed: once we get auth! 27 + const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 28 + 29 + #[derive(Debug, Serialize)] 30 + pub struct FeedRes { 31 + #[serde(skip_serializing_if = "Option::is_none")] 32 + cursor: Option<String>, 33 + feed: Vec<FeedViewPost>, 34 + } 35 + 36 + #[derive(Debug, Deserialize)] 37 + pub struct GetFeedQuery { 38 + pub feed: String, 39 + pub limit: Option<u8>, 40 + pub cursor: Option<String>, 41 + } 42 + 43 + pub async fn get_feed( 44 + State(state): State<GlobalState>, 45 + // we have to use Bearer because the tokens come with `aud` set to the feedgen did. 46 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 47 + maybe_tok: Option<TypedHeader<Authorization<Bearer>>>, 48 + Query(query): Query<GetFeedQuery>, 49 + ) -> XrpcResult<Json<FeedRes>> { 50 + let mut conn = state.pool.get().await?; 51 + 52 + // first, look up the feedgen 53 + let service_did: String = schema::feedgens::table 54 + .select(schema::feedgens::service_did) 55 + .find(&query.feed) 56 + .get_result(&mut conn) 57 + .await?; 58 + 59 + // resolve the did 60 + let did_doc = match state.resolver.resolve_did(&service_did).await { 61 + Ok(Some(did_doc)) => did_doc, 62 + Ok(None) => return Err(Error::invalid_request(None)), 63 + Err(err) => { 64 + tracing::error!( 65 + feedgen = service_did, 66 + "failed to resolve feedgen service did: {err}" 67 + ); 68 + return Err(Error::invalid_request(None)); 69 + } 70 + }; 71 + 72 + // find the service 73 + let Some(service) = did_doc.find_service_by_id(FEEDGEN_SERVICE_ID) else { 74 + tracing::error!( 75 + feedgen = service_did, 76 + "DID doc didn't contain BskyFeedGenerator service" 77 + ); 78 + return Err(Error::invalid_request(None)); 79 + }; 80 + 81 + let endpoint = service.service_endpoint.clone(); 82 + let skeleton = get_feed_skeleton( 83 + &query.feed, 84 + &endpoint, 85 + maybe_tok.as_ref(), 86 + query.limit, 87 + query.cursor, 88 + ) 89 + .await?; 90 + 91 + let maybe_auth = match maybe_tok { 92 + Some(hdr) => { 93 + match state 94 + .jwt 95 + .resolve_and_verify_jwt(hdr.token(), Some(&service_did)) 96 + .await 97 + { 98 + Some(claims) => match &state.did_allowlist { 99 + Some(allowlist) if !allowlist.contains(&claims.iss) => { 100 + return Err(Error::new( 101 + StatusCode::FORBIDDEN, 102 + "forbidden".to_string(), 103 + None, 104 + )); 105 + } 106 + _ => Some(AtpAuth(claims.iss)), 107 + }, 108 + None => None, 109 + } 110 + } 111 + None => None, 112 + }; 113 + 114 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 115 + 116 + let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect(); 117 + let repost_skeleton = skeleton 118 + .feed 119 + .iter() 120 + .filter_map(|v| match &v.reason { 121 + Some(SkeletonReason::Repost { repost }) => Some(repost.clone()), 122 + _ => None, 123 + }) 124 + .collect::<Vec<_>>(); 125 + 126 + let mut posts = hyd.hydrate_feed_posts(at_uris).await; 127 + let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; 128 + 129 + let feed = skeleton 130 + .feed 131 + .into_iter() 132 + .filter_map(|item| { 133 + let mut post = posts.remove(&item.post)?; 134 + let reason = match item.reason { 135 + Some(SkeletonReason::Repost { repost }) => { 136 + repost_data.remove(&repost).map(FeedViewPostReason::Repost) 137 + } 138 + Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin), 139 + _ => None, 140 + }; 141 + 142 + post.reason = reason; 143 + post.feed_context = item.feed_context; 144 + 145 + Some(post) 146 + }) 147 + .collect(); 148 + 149 + Ok(Json(FeedRes { 150 + cursor: skeleton.cursor, 151 + feed, 152 + })) 153 + } 21 154 22 155 #[derive(Debug, Deserialize)] 23 156 #[serde(rename_all = "snake_case")] ··· 45 178 pub filter: GetAuthorFeedFilter, 46 179 #[serde(default)] 47 180 pub include_pins: bool, 48 - } 49 - 50 - #[derive(Debug, Serialize)] 51 - pub struct FeedRes { 52 - #[serde(skip_serializing_if = "Option::is_none")] 53 - cursor: Option<String>, 54 - feed: Vec<FeedViewPost>, 55 181 } 56 182 57 183 pub async fn get_author_feed( ··· 460 586 .eq_any(filter) 461 587 .or(schema::posts::embed_subtype.eq_any(filter)) 462 588 } 589 + 590 + async fn get_feed_skeleton( 591 + feed: &str, 592 + service: &str, 593 + maybe_tok: Option<&TypedHeader<Authorization<Bearer>>>, 594 + limit: Option<u8>, 595 + cursor: Option<String>, 596 + ) -> XrpcResult<FeedSkeletonResponse> { 597 + let mut params = vec![("feed", feed.to_string())]; 598 + 599 + if let Some(cursor) = cursor { 600 + params.push(("cursor", cursor)); 601 + } 602 + if let Some(limit) = limit { 603 + params.push(("limit", limit.to_string())); 604 + } 605 + let url = Url::parse_with_params( 606 + &format!("{service}/xrpc/app.bsky.feed.getFeedSkeleton"), 607 + params, 608 + ) 609 + .unwrap(); 610 + 611 + let mut req = reqwest::Client::new().get(url); 612 + if let Some(auth) = maybe_tok { 613 + req = req.bearer_auth(auth.token()); 614 + } 615 + 616 + match req.send().await { 617 + Ok(skeleton) => match skeleton.json().await { 618 + Ok(skeleton) => Ok(skeleton), 619 + Err(err) => { 620 + tracing::error!("Failed to parse feed skeleton: {err}"); 621 + Err(Error::server_error(Some("Failed to fetch feed skeleton"))) 622 + } 623 + }, 624 + Err(err) => { 625 + tracing::error!("Failed to fetch feed skeleton: {err}"); 626 + Err(Error::server_error(Some("Failed to fetch feed skeleton"))) 627 + } 628 + } 629 + } 630 + 631 + async fn get_skeleton_repost_data<'a>( 632 + conn: &mut AsyncPgConnection, 633 + hyd: &StatefulHydrator<'a>, 634 + reposts: Vec<String>, 635 + ) -> HashMap<String, FeedReasonRepost> { 636 + let Ok(repost_data) = schema::records::table 637 + .select(( 638 + schema::records::at_uri, 639 + schema::records::did, 640 + schema::records::indexed_at, 641 + )) 642 + .filter(schema::records::at_uri.eq_any(&reposts)) 643 + .get_results::<(String, String, NaiveDateTime)>(conn) 644 + .await 645 + else { 646 + return HashMap::new(); 647 + }; 648 + 649 + let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect(); 650 + let profiles = hyd.hydrate_profiles_basic(profiles).await; 651 + 652 + repost_data 653 + .into_iter() 654 + .filter_map(|(uri, did, indexed_at)| { 655 + let by = profiles.get(&did).cloned()?; 656 + 657 + let repost = FeedReasonRepost { 658 + by, 659 + uri: Some(uri.clone()), 660 + cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it. 661 + indexed_at: indexed_at.and_utc(), 662 + }; 663 + 664 + Some((uri, repost)) 665 + }) 666 + .collect() 667 + }
+1
parakeet/src/xrpc/app_bsky/mod.rs
··· 14 14 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 15 15 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 16 16 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed)) 17 + .route("/app.bsky.feed.getFeed", get(feed::posts::get_feed)) 17 18 .route("/app.bsky.feed.getLikes", get(feed::likes::get_likes)) 18 19 .route("/app.bsky.feed.getListFeed", get(feed::posts::get_list_feed)) 19 20 .route("/app.bsky.feed.getPostThread", get(feed::posts::get_post_thread))
+2 -2
parakeet/src/xrpc/extract.rs
··· 81 81 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))? 82 82 .ok_or((StatusCode::UNAUTHORIZED, "missing JWT".to_string()))?; 83 83 84 - match state.jwt.resolve_and_verify_jwt(hdr.token()).await { 84 + match state.jwt.resolve_and_verify_jwt(hdr.token(), None).await { 85 85 Some(claims) => match &state.did_allowlist { 86 86 Some(allowlist) if !allowlist.contains(&claims.iss) => { 87 87 Err((StatusCode::FORBIDDEN, "forbidden".to_string())) ··· 110 110 return Ok(None); 111 111 }; 112 112 113 - match state.jwt.resolve_and_verify_jwt(hdr.token()).await { 113 + match state.jwt.resolve_and_verify_jwt(hdr.token(), None).await { 114 114 Some(claims) => match &state.did_allowlist { 115 115 Some(allowlist) if !allowlist.contains(&claims.iss) => { 116 116 Err((StatusCode::FORBIDDEN, "forbidden".to_string()))
+9 -7
parakeet/src/xrpc/jwt.rs
··· 2 2 use jsonwebtoken::{Algorithm, DecodingKey, Validation}; 3 3 use serde::{Deserialize, Serialize}; 4 4 use std::collections::HashMap; 5 - use std::sync::LazyLock; 5 + use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 7 8 8 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); ··· 25 25 26 26 pub struct JwtVerifier { 27 27 aud: String, 28 - resolver: Resolver, 28 + resolver: Arc<Resolver>, 29 29 key_cache: RwLock<HashMap<String, String>>, 30 30 } 31 31 32 32 impl JwtVerifier { 33 - pub fn new(aud: String, resolver: Resolver) -> Self { 33 + pub fn new(aud: String, resolver: Arc<Resolver>) -> Self { 34 34 JwtVerifier { 35 35 aud, 36 36 resolver, ··· 38 38 } 39 39 } 40 40 41 - pub async fn resolve_and_verify_jwt(&self, token: &str) -> Option<Claims> { 41 + pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 42 42 // first we need to decode without verifying, to get iss. 43 43 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; 44 44 let unsafe_iss = unsafe_data.claims.iss; ··· 52 52 None => self.resolve_key(&unsafe_iss).await?, 53 53 }; 54 54 55 - self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg) 55 + let aud = aud.unwrap_or(&self.aud); 56 + self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 56 57 } 57 58 58 59 async fn resolve_key(&self, did: &str) -> Option<String> { ··· 73 74 pub fn verify_jwt_multibase(&self, token: &str, multibase_key: &str) -> Option<Claims> { 74 75 let alg = jsonwebtoken::decode_header(token).ok()?.alg; 75 76 76 - self.verify_jwt_multibase_with_alg(token, multibase_key, alg) 77 + self.verify_jwt_multibase_with_alg(token, multibase_key, alg, &self.aud) 77 78 } 78 79 79 80 pub fn verify_jwt_multibase_with_alg( ··· 81 82 token: &str, 82 83 multibase_key: &str, 83 84 alg: Algorithm, 85 + aud: &str, 84 86 ) -> Option<Claims> { 85 87 // decode the multibase key 86 88 let (_, key) = multibase::decode(multibase_key).ok()?; ··· 88 90 let key = DecodingKey::from_ec_der(&key[2..]); 89 91 90 92 let mut validation = Validation::new(alg); 91 - validation.set_audience(&[&self.aud]); 93 + validation.set_audience(&[&aud]); 92 94 93 95 let decoded = jsonwebtoken::decode::<Claims>(token, &key, &validation).ok()?; 94 96