Parakeet is a Rust-based Bluesky AppView that aims to implement most of the functionality required to support the Bluesky client.

Compare changes

Choose any two refs to compare.

Changed files
+608 -60
lexica
src
app_bsky
parakeet
parakeet-db
src
+1
lexica/src/app_bsky/mod.rs
··· 7 7 pub mod graph; 8 8 pub mod labeler; 9 9 pub mod richtext; 10 + pub mod unspecced; 10 11 11 12 #[derive(Clone, Default, Debug, Serialize)] 12 13 #[serde(rename_all = "camelCase")]
+33
lexica/src/app_bsky/unspecced.rs
··· 1 + use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 + use serde::Serialize; 3 + 4 + #[derive(Clone, Debug, Serialize)] 5 + pub struct ThreadV2Item { 6 + pub uri: String, 7 + pub depth: i32, 8 + pub value: ThreadV2ItemType, 9 + } 10 + 11 + #[derive(Clone, Debug, Serialize)] 12 + #[serde(tag = "$type")] 13 + pub enum ThreadV2ItemType { 14 + #[serde(rename = "app.bsky.unspecced.defs#threadItemPost")] 15 + Post(ThreadItemPost), 16 + #[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")] 17 + NoUnauthenticated {}, 18 + #[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")] 19 + NotFound {}, 20 + #[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")] 21 + Blocked { author: BlockedAuthor }, 22 + } 23 + 24 + #[derive(Clone, Debug, Serialize)] 25 + #[serde(rename_all = "camelCase")] 26 + pub struct ThreadItemPost { 27 + pub post: PostView, 28 + pub more_parents: bool, 29 + pub more_replies: i32, 30 + pub op_thread: bool, 31 + pub hidden_by_threadgate: bool, 32 + pub muted_by_viewer: bool, 33 + }
+95 -1
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 - use diesel::sql_types::{Array, Bool, Nullable, Text}; 2 + use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 4 use parakeet_db::{schema, types}; 5 + use parakeet_db::models::TextArray; 5 6 6 7 pub async fn get_actor_status( 7 8 conn: &mut AsyncPgConnection, ··· 196 197 .await 197 198 .optional() 198 199 } 200 + 201 + #[derive(Debug, QueryableByName)] 202 + #[diesel(check_for_backend(diesel::pg::Pg))] 203 + #[allow(unused)] 204 + pub struct ThreadItem { 205 + #[diesel(sql_type = Text)] 206 + pub at_uri: String, 207 + #[diesel(sql_type = Nullable<Text>)] 208 + pub parent_uri: Option<String>, 209 + #[diesel(sql_type = Nullable<Text>)] 210 + pub root_uri: Option<String>, 211 + #[diesel(sql_type = Integer)] 212 + pub depth: i32, 213 + } 214 + 215 + pub async fn get_thread_children( 216 + conn: &mut AsyncPgConnection, 217 + uri: &str, 218 + depth: i32, 219 + ) -> QueryResult<Vec<ThreadItem>> { 220 + diesel::sql_query(include_str!("sql/thread.sql")) 221 + .bind::<Text, _>(uri) 222 + .bind::<Integer, _>(depth) 223 + .load(conn) 224 + .await 225 + } 226 + 227 + pub async fn get_thread_children_branching( 228 + conn: &mut AsyncPgConnection, 229 + uri: &str, 230 + depth: i32, 231 + branching_factor: i32, 232 + ) -> QueryResult<Vec<ThreadItem>> { 233 + diesel::sql_query(include_str!("sql/thread_branching.sql")) 234 + .bind::<Text, _>(uri) 235 + .bind::<Integer, _>(depth) 236 + .bind::<Integer, _>(branching_factor) 237 + .load(conn) 238 + .await 239 + } 240 + 241 + #[derive(Debug, QueryableByName)] 242 + #[diesel(check_for_backend(diesel::pg::Pg))] 243 + pub struct HiddenThreadChildItem { 244 + #[diesel(sql_type = Text)] 245 + pub at_uri: String, 246 + } 247 + 248 + pub async fn get_thread_children_hidden( 249 + conn: &mut AsyncPgConnection, 250 + uri: &str, 251 + root: &str, 252 + ) -> QueryResult<Vec<HiddenThreadChildItem>> { 253 + 
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql")) 254 + .bind::<Text, _>(uri) 255 + .bind::<Text, _>(root) 256 + .load(conn) 257 + .await 258 + } 259 + 260 + pub async fn get_thread_parents( 261 + conn: &mut AsyncPgConnection, 262 + uri: &str, 263 + height: i32, 264 + ) -> QueryResult<Vec<ThreadItem>> { 265 + diesel::sql_query(include_str!("sql/thread_parent.sql")) 266 + .bind::<Text, _>(uri) 267 + .bind::<Integer, _>(height) 268 + .load(conn) 269 + .await 270 + } 271 + 272 + pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 273 + schema::posts::table 274 + .select(schema::posts::root_uri) 275 + .find(&uri) 276 + .get_result(conn) 277 + .await 278 + .optional() 279 + .map(|v| v.flatten()) 280 + } 281 + 282 + pub async fn get_threadgate_hiddens( 283 + conn: &mut AsyncPgConnection, 284 + uri: &str, 285 + ) -> QueryResult<Option<TextArray>> { 286 + schema::threadgates::table 287 + .select(schema::threadgates::hidden_replies) 288 + .find(&uri) 289 + .get_result(conn) 290 + .await 291 + .optional() 292 + }
+5 -9
parakeet/src/hydration/labeler.rs
··· 42 42 likes: Option<i32>, 43 43 ) -> LabelerViewDetailed { 44 44 let reason_types = labeler.reasons.map(|v| { 45 - v.into_iter() 46 - .flatten() 47 - .filter_map(|v| ReasonType::from_str(&v).ok()) 45 + v.iter() 46 + .filter_map(|v| ReasonType::from_str(v).ok()) 48 47 .collect() 49 48 }); 50 49 ··· 74 73 }) 75 74 .collect(); 76 75 let subject_types = labeler.subject_types.map(|v| { 77 - v.into_iter() 78 - .flatten() 79 - .filter_map(|v| SubjectType::from_str(&v).ok()) 76 + v.iter() 77 + .filter_map(|v| SubjectType::from_str(v).ok()) 80 78 .collect() 81 79 }); 82 - let subject_collections = labeler 83 - .subject_collections 84 - .map(|v| v.into_iter().flatten().collect()); 80 + let subject_collections = labeler.subject_collections.map(Vec::from); 85 81 86 82 LabelerViewDetailed { 87 83 uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
+3 -3
parakeet/src/hydration/posts.rs
··· 89 89 let threadgate = threadgate?; 90 90 91 91 let lists = match threadgate.allowed_lists.as_ref() { 92 - Some(allowed_lists) => allowed_lists.iter().flatten().cloned().collect(), 92 + Some(allowed_lists) => allowed_lists.clone().into(), 93 93 None => Vec::new(), 94 94 }; 95 95 let lists = self.hydrate_lists_basic(lists).await; ··· 106 106 ) -> HashMap<String, ThreadgateView> { 107 107 let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| { 108 108 if let Some(lists) = &c.allowed_lists { 109 - acc.extend(lists.iter().flatten().cloned()); 109 + acc.extend(lists.clone().0); 110 110 } 111 111 acc 112 112 }); ··· 118 118 let this_lists = match &threadgate.allowed_lists { 119 119 Some(allowed_lists) => allowed_lists 120 120 .iter() 121 - .filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned())) 121 + .filter_map(|v| lists.get(v).cloned()) 122 122 .collect(), 123 123 None => Vec::new(), 124 124 };
+3 -7
parakeet/src/hydration/starter_packs.rs
··· 96 96 let feeds = sp 97 97 .feeds 98 98 .clone() 99 - .unwrap_or_default() 100 - .into_iter() 101 - .flatten() 102 - .collect(); 103 - let feeds = self.hydrate_feedgens(feeds).await.into_values().collect(); 99 + .unwrap_or_default(); 100 + let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect(); 104 101 105 102 Some(build_spview(sp, creator, labels, list, feeds)) 106 103 } ··· 119 116 let feeds = packs 120 117 .values() 121 118 .filter_map(|pack| pack.feeds.clone()) 122 - .flat_map(|feeds| feeds.into_iter().flatten()) 119 + .flat_map(Vec::from) 123 120 .collect(); 124 121 125 122 let creators = self.hydrate_profiles_basic(creators).await; ··· 133 130 let list = lists.get(&pack.list).cloned(); 134 131 let feeds = pack.feeds.as_ref().map(|v| { 135 132 v.iter() 136 - .flatten() 137 133 .filter_map(|feed| feeds.get(feed).cloned()) 138 134 .collect() 139 135 });
+4 -1
parakeet/src/loaders.rs
··· 4 4 use dataloader::async_cached::Loader; 5 5 use dataloader::non_cached::Loader as NonCachedLoader; 6 6 use dataloader::BatchFn; 7 + use diesel::dsl::sql; 7 8 use diesel::prelude::*; 8 9 use diesel_async::pooled_connection::deadpool::Pool; 9 10 use diesel_async::{AsyncPgConnection, RunQueryDsl}; ··· 368 369 let mut conn = self.0.get().await.unwrap(); 369 370 370 371 let res = schema::posts::table 371 - .left_join(schema::threadgates::table) 372 + .left_join(schema::threadgates::table.on( 373 + schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")), 374 + )) 372 375 .select(( 373 376 models::Post::as_select(), 374 377 Option::<models::Threadgate>::as_select(),
+1 -1
parakeet/src/sql/thread.sql
··· 1 - with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 1 + with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth 2 2 from posts 3 3 where parent_uri = $1 and violates_threadgate=FALSE 4 4 union all
+13
parakeet/src/sql/thread_branching.sql
-- Recursively collects the replies underneath the post identified by $1.
--
--   $1  AT-URI of the post whose replies are wanted
--   $2  depth bound: a row is expanded while its depth is <= $2, so the
--       deepest rows returned have depth $2 + 1
--   $3  row limit applied to the recursive step
--
-- Posts flagged violates_threadgate are excluded at every level.
with recursive thread as (
    -- direct replies to $1 form depth 1
    select at_uri, parent_uri, root_uri, 1 as depth
    from posts
    where parent_uri = $1
      and violates_threadgate = FALSE
    union all
    -- NOTE(review): this LIMIT bounds the rows produced by each evaluation of
    -- the recursive step as a whole, not the rows per parent, and the depth-1
    -- rows above are not limited at all — confirm that this is the intended
    -- "branching factor" behaviour.
    (select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
     from posts p
              join thread on p.parent_uri = thread.at_uri
     where thread.depth <= $2
       and p.violates_threadgate = FALSE
     limit $3)
)
select *
from thread;
+6
parakeet/src/sql/thread_v2_hidden_children.sql
-- Returns the direct replies to $1 that appear in the hidden_replies array of
-- the threadgate attached to post $2.
--
--   $1  AT-URI of the parent post
--   $2  post_uri used to look up the threadgate
select p.at_uri
from posts p
where p.parent_uri = $1
  and p.at_uri = any (select unnest(tg.hidden_replies)
                      from threadgates tg
                      where tg.post_uri = $2)
+2 -24
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 361 361 pub threadgate: Option<ThreadgateView>, 362 362 } 363 363 364 - #[derive(Debug, QueryableByName)] 365 - #[diesel(check_for_backend(diesel::pg::Pg))] 366 - struct ThreadItem { 367 - #[diesel(sql_type = diesel::sql_types::Text)] 368 - at_uri: String, 369 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 370 - parent_uri: Option<String>, 371 - // #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 372 - // root_uri: Option<String>, 373 - #[diesel(sql_type = diesel::sql_types::Integer)] 374 - depth: i32, 375 - } 376 - 377 364 pub async fn get_post_thread( 378 365 State(state): State<GlobalState>, 379 366 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 409 396 } 410 397 } 411 398 412 - let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 413 - .bind::<diesel::sql_types::Text, _>(&uri) 414 - .bind::<diesel::sql_types::Integer, _>(depth as i32) 415 - .load::<ThreadItem>(&mut conn) 416 - .await?; 417 - 418 - let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql")) 419 - .bind::<diesel::sql_types::Text, _>(&uri) 420 - .bind::<diesel::sql_types::Integer, _>(parent_height as i32) 421 - .load::<ThreadItem>(&mut conn) 422 - .await?; 399 + let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?; 400 + let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?; 423 401 424 402 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 425 403 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
+3
parakeet/src/xrpc/app_bsky/mod.rs
··· 6 6 mod feed; 7 7 mod graph; 8 8 mod labeler; 9 + mod unspecced; 9 10 10 11 #[rustfmt::skip] 11 12 pub fn routes() -> Router<crate::GlobalState> { ··· 64 65 // TODO: app.bsky.notification.putActivitySubscriptions 65 66 // TODO: app.bsky.notification.putPreferences 66 67 // TODO: app.bsky.notification.putPreferencesV2 68 + .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 69 + .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 67 70 } 68 71 69 72 async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 + pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 + use crate::db::ThreadItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 + use crate::xrpc::normalise_at_uri; 6 + use crate::GlobalState; 7 + use axum::extract::{Query, State}; 8 + use axum::Json; 9 + use itertools::Itertools; 10 + use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView}; 11 + use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 12 + use serde::{Deserialize, Serialize}; 13 + use std::cmp::Ordering; 14 + use std::collections::{HashMap, HashSet}; 15 + 16 + const THREAD_PARENTS: usize = 50; 17 + const DEFAULT_BRANCHING: u32 = 10; 18 + const DEFAULT_DEPTH: u32 = 6; 19 + 20 + #[derive(Copy, Clone, Debug, Default, Deserialize)] 21 + #[serde(rename_all = "lowercase")] 22 + pub enum PostThreadSort { 23 + Newest, 24 + #[default] 25 + Oldest, 26 + Top, 27 + } 28 + 29 + #[derive(Debug, Deserialize)] 30 + #[serde(rename_all = "camelCase")] 31 + pub struct GetPostThreadV2Req { 32 + pub anchor: String, 33 + pub above: Option<bool>, 34 + pub below: Option<u32>, 35 + pub branching_factor: Option<u32>, 36 + #[serde(default)] 37 + pub sort: PostThreadSort, 38 + } 39 + 40 + #[derive(Debug, Serialize)] 41 + #[serde(rename_all = "camelCase")] 42 + pub struct GetPostThreadV2Res { 43 + pub thread: Vec<ThreadV2Item>, 44 + #[serde(skip_serializing_if = "Option::is_none")] 45 + pub threadgate: Option<ThreadgateView>, 46 + pub has_other_replies: bool, 47 + } 48 + 49 + pub async fn get_post_thread_v2( 50 + State(state): State<GlobalState>, 51 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 52 + maybe_auth: Option<AtpAuth>, 53 + Query(query): Query<GetPostThreadV2Req>, 54 + ) -> XrpcResult<Json<GetPostThreadV2Res>> { 55 + let mut conn = state.pool.get().await?; 56 + let maybe_did = maybe_auth.clone().map(|v| v.0); 57 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 58 + 59 + 
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 60 + let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32; 61 + let branching_factor = query 62 + .branching_factor 63 + .unwrap_or(DEFAULT_BRANCHING) 64 + .clamp(0, 100) as i32; 65 + 66 + let anchor = hyd 67 + .hydrate_post(uri.clone()) 68 + .await 69 + .ok_or(Error::not_found())?; 70 + 71 + if let Some(v) = &anchor.author.viewer { 72 + if v.blocked_by || v.blocking.is_some() { 73 + let block = ThreadV2ItemType::Blocked { 74 + author: BlockedAuthor { 75 + did: anchor.author.did, 76 + viewer: anchor.author.viewer, 77 + }, 78 + }; 79 + 80 + return Ok(Json(GetPostThreadV2Res { 81 + thread: vec![ThreadV2Item { 82 + uri, 83 + depth: 0, 84 + value: block, 85 + }], 86 + threadgate: anchor.threadgate, 87 + has_other_replies: false, 88 + })); 89 + } 90 + } 91 + 92 + // get the root post URI (if there is one) and return its author's DID. 93 + let root_uri = crate::db::get_root_post(&mut conn, &uri) 94 + .await? 95 + .unwrap_or(uri.clone()); 96 + let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0]; 97 + 98 + let replies = 99 + crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1) 100 + .await?; 101 + let reply_uris = replies 102 + .iter() 103 + .map(|item| item.at_uri.clone()) 104 + .collect::<Vec<_>>(); 105 + 106 + // bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents. 
107 + let parents = match query.above.unwrap_or(true) { 108 + true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?, 109 + false => vec![], 110 + }; 111 + let parent_uris = parents 112 + .iter() 113 + .map(|item| item.at_uri.clone()) 114 + .collect::<Vec<_>>(); 115 + 116 + let (mut replies_hyd, mut parents_hyd) = tokio::join!( 117 + hyd.hydrate_posts(reply_uris), 118 + hyd.hydrate_posts(parent_uris), 119 + ); 120 + 121 + let threadgate = anchor.threadgate.clone(); 122 + let hidden: HashSet<_, std::hash::RandomState> = match &threadgate { 123 + Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?, 124 + None => None, 125 + } 126 + .map(|hiddens| HashSet::from_iter(Vec::from(hiddens))) 127 + .unwrap_or_default(); 128 + 129 + let root_has_more = parents.len() > THREAD_PARENTS; 130 + let mut is_op_thread = true; 131 + 132 + let mut thread = Vec::with_capacity(1 + replies.len() + parents.len()); 133 + 134 + thread.extend( 135 + parents 136 + .into_iter() 137 + .tail(THREAD_PARENTS) 138 + .enumerate() 139 + .map(|(idx, item)| { 140 + let value = parents_hyd 141 + .remove(&item.at_uri) 142 + .map(|post| { 143 + if let Some(v) = &post.author.viewer { 144 + if v.blocked_by || v.blocking.is_some() { 145 + return ThreadV2ItemType::Blocked { 146 + author: BlockedAuthor { 147 + did: post.author.did, 148 + viewer: post.author.viewer, 149 + }, 150 + }; 151 + } 152 + } 153 + 154 + let op_thread = (is_op_thread 155 + || item.root_uri.is_none() && item.parent_uri.is_none()) 156 + && post.author.did == root_did; 157 + 158 + ThreadV2ItemType::Post(ThreadItemPost { 159 + post, 160 + more_parents: idx == 0 && root_has_more, 161 + more_replies: 0, 162 + op_thread, 163 + hidden_by_threadgate: false, 164 + muted_by_viewer: false, 165 + }) 166 + }) 167 + .unwrap_or(ThreadV2ItemType::NotFound {}); 168 + 169 + ThreadV2Item { 170 + uri: item.at_uri, 171 + depth: -item.depth - 1, 172 + value, 173 + } 174 + }), 175 + ); 176 + 177 + 
is_op_thread = is_op_thread && anchor.author.did == root_did; 178 + thread.push(ThreadV2Item { 179 + uri: uri.clone(), 180 + depth: 0, 181 + value: ThreadV2ItemType::Post(ThreadItemPost { 182 + post: anchor, 183 + more_parents: false, 184 + more_replies: 0, 185 + op_thread: is_op_thread, 186 + hidden_by_threadgate: false, 187 + muted_by_viewer: false, 188 + }), 189 + }); 190 + 191 + let mut replies_grouped = replies 192 + .into_iter() 193 + .into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default()); 194 + 195 + // start with the anchor 196 + let (children, has_other_replies) = build_thread_children( 197 + &mut replies_grouped, 198 + &mut replies_hyd, 199 + &hidden, 200 + &uri, 201 + is_op_thread, 202 + 1, 203 + &BuildThreadChildrenOpts { 204 + root_did, 205 + sort: query.sort, 206 + maybe_did: &maybe_did, 207 + max_depth: depth, 208 + }, 209 + ); 210 + thread.extend(children); 211 + 212 + Ok(Json(GetPostThreadV2Res { 213 + thread, 214 + threadgate, 215 + has_other_replies, 216 + })) 217 + } 218 + 219 + #[derive(Debug, Deserialize)] 220 + #[serde(rename_all = "camelCase")] 221 + pub struct GetPostThreadOtherV2Req { 222 + pub anchor: String, 223 + } 224 + 225 + #[derive(Debug, Serialize)] 226 + #[serde(rename_all = "camelCase")] 227 + pub struct GetPostThreadOtherV2Res { 228 + pub thread: Vec<ThreadV2Item>, 229 + } 230 + 231 + pub async fn get_post_thread_other_v2( 232 + State(state): State<GlobalState>, 233 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 234 + maybe_auth: Option<AtpAuth>, 235 + Query(query): Query<GetPostThreadOtherV2Req>, 236 + ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 237 + let mut conn = state.pool.get().await?; 238 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 239 + 240 + let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 241 + 242 + let root = crate::db::get_root_post(&mut conn, &uri) 243 + .await? 
244 + .unwrap_or(uri.clone()); 245 + 246 + // this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE 247 + let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?; 248 + let reply_uris = replies 249 + .into_iter() 250 + .map(|item| item.at_uri) 251 + .collect::<Vec<_>>(); 252 + let thread = hyd 253 + .hydrate_posts(reply_uris) 254 + .await 255 + .into_iter() 256 + .filter(|(_, post)| match &post.author.viewer { 257 + Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false, 258 + _ => true, 259 + }) 260 + .map(|(uri, post)| { 261 + let post = ThreadItemPost { 262 + post, 263 + more_parents: false, 264 + more_replies: 0, 265 + op_thread: false, 266 + hidden_by_threadgate: true, 267 + muted_by_viewer: false, 268 + }; 269 + 270 + ThreadV2Item { 271 + uri, 272 + depth: 1, 273 + value: ThreadV2ItemType::Post(post), 274 + } 275 + }) 276 + .collect(); 277 + 278 + Ok(Json(GetPostThreadOtherV2Res { thread })) 279 + } 280 + 281 + #[derive(Debug)] 282 + struct BuildThreadChildrenOpts<'a> { 283 + root_did: &'a str, 284 + sort: PostThreadSort, 285 + maybe_did: &'a Option<String>, 286 + max_depth: i32, 287 + } 288 + 289 + fn build_thread_children( 290 + grouped_replies: &mut HashMap<String, Vec<ThreadItem>>, 291 + replies_hyd: &mut HashMap<String, PostView>, 292 + hidden: &HashSet<String>, 293 + parent: &str, 294 + is_op_thread: bool, 295 + depth: i32, 296 + opts: &BuildThreadChildrenOpts, 297 + ) -> (Vec<ThreadV2Item>, bool) { 298 + let mut has_other_replies = false; 299 + 300 + let Some(replies) = grouped_replies.remove(parent) else { 301 + return (Vec::default(), has_other_replies); 302 + }; 303 + 304 + let replies = replies 305 + .into_iter() 306 + .filter_map(|item| replies_hyd.remove(&item.at_uri)) 307 + .sorted_by(sort_replies(&opts.sort)); 308 + 309 + let mut out = Vec::new(); 310 + 311 + for post in replies { 312 + let reply_count = grouped_replies 313 + .get(&post.uri) 314 + .map(|v| v.len()) 315 + 
.unwrap_or_default(); 316 + let at_max = depth == opts.max_depth; 317 + let more_replies = if at_max { reply_count } else { 0 }; 318 + let op_thread = is_op_thread && post.author.did == opts.root_did; 319 + 320 + // shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies... 321 + if let Some(v) = &post.author.viewer { 322 + if v.blocked_by || v.blocking.is_some() { 323 + continue; 324 + } 325 + } 326 + 327 + // check if the post is hidden AND we're NOT the author (hidden posts still show for their author) 328 + if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) { 329 + // post is hidden - do not ~pass go~ push to the thread. 330 + if depth == 1 { 331 + has_other_replies = true; 332 + } 333 + continue; 334 + } 335 + 336 + let uri = post.uri.clone(); 337 + out.push(ThreadV2Item { 338 + uri: post.uri.clone(), 339 + depth, 340 + value: ThreadV2ItemType::Post(ThreadItemPost { 341 + post, 342 + more_parents: false, 343 + more_replies: more_replies as i32, 344 + op_thread, 345 + hidden_by_threadgate: false, 346 + muted_by_viewer: false, 347 + }), 348 + }); 349 + 350 + if !at_max { 351 + // we don't care about has_other_replies when recursing 352 + let (children, _) = build_thread_children( 353 + grouped_replies, 354 + replies_hyd, 355 + hidden, 356 + &uri, 357 + op_thread, 358 + depth + 1, 359 + opts, 360 + ); 361 + 362 + out.extend(children); 363 + } 364 + } 365 + 366 + (out, has_other_replies) 367 + } 368 + 369 + fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> { 370 + move |a: &PostView, b: &PostView| match sort { 371 + PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at), 372 + PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at), 373 + PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count), 374 + } 375 + } 376 + 377 + fn did_is_cur(cur: &Option<String>, did: &String) -> bool { 378 + match cur { 379 + Some(cur) => did == cur, 
380 + None => false, 381 + } 382 + }
+1 -1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 60 60 .into_iter() 61 61 .map(|bookmark| Bookmark { 62 62 subject: bookmark.subject, 63 - tags: bookmark.tags.into_iter().flatten().collect(), 63 + tags: bookmark.tags.into(), 64 64 created_at: bookmark.created_at, 65 65 }) 66 66 .collect();
+55 -13
parakeet-db/src/models.rs
··· 137 137 138 138 pub content: String, 139 139 pub facets: Option<serde_json::Value>, 140 - pub languages: Vec<Option<String>>, 141 - pub tags: Vec<Option<String>>, 140 + pub languages: not_null_vec::TextArray, 141 + pub tags: not_null_vec::TextArray, 142 142 143 143 pub parent_uri: Option<String>, 144 144 pub parent_cid: Option<String>, ··· 148 148 pub embed: Option<String>, 149 149 pub embed_subtype: Option<String>, 150 150 151 - pub mentions: Option<Vec<Option<String>>>, 151 + pub mentions: Option<not_null_vec::TextArray>, 152 152 pub violates_threadgate: bool, 153 153 154 154 pub created_at: DateTime<Utc>, ··· 236 236 pub cid: String, 237 237 pub post_uri: String, 238 238 239 - pub detached: Vec<Option<String>>, 240 - pub rules: Vec<Option<String>>, 239 + pub detached: not_null_vec::TextArray, 240 + pub rules: not_null_vec::TextArray, 241 241 242 242 pub created_at: DateTime<Utc>, 243 243 pub indexed_at: NaiveDateTime, ··· 252 252 pub cid: String, 253 253 pub post_uri: String, 254 254 255 - pub hidden_replies: Vec<Option<String>>, 256 - pub allow: Option<Vec<Option<String>>>, 257 - pub allowed_lists: Option<Vec<Option<String>>>, 255 + pub hidden_replies: not_null_vec::TextArray, 256 + pub allow: Option<not_null_vec::TextArray>, 257 + pub allowed_lists: Option<not_null_vec::TextArray>, 258 258 259 259 pub record: serde_json::Value, 260 260 ··· 276 276 pub description: Option<String>, 277 277 pub description_facets: Option<serde_json::Value>, 278 278 pub list: String, 279 - pub feeds: Option<Vec<Option<String>>>, 279 + pub feeds: Option<not_null_vec::TextArray>, 280 280 281 281 pub created_at: DateTime<Utc>, 282 282 pub indexed_at: NaiveDateTime, ··· 290 290 pub did: String, 291 291 pub cid: String, 292 292 293 - pub reasons: Option<Vec<Option<String>>>, 294 - pub subject_types: Option<Vec<Option<String>>>, 295 - pub subject_collections: Option<Vec<Option<String>>>, 293 + pub reasons: Option<not_null_vec::TextArray>, 294 + pub subject_types: 
Option<not_null_vec::TextArray>, 295 + pub subject_collections: Option<not_null_vec::TextArray>, 296 296 297 297 pub created_at: NaiveDateTime, 298 298 pub indexed_at: NaiveDateTime, ··· 402 402 pub subject: String, 403 403 pub subject_cid: Option<String>, 404 404 pub subject_type: String, 405 - pub tags: Vec<Option<String>>, 405 + pub tags: not_null_vec::TextArray, 406 406 pub created_at: DateTime<Utc>, 407 407 } 408 408 ··· 430 430 pub typ: String, 431 431 pub sort_at: DateTime<Utc>, 432 432 } 433 + 434 + pub use not_null_vec::TextArray; 435 + mod not_null_vec { 436 + use diesel::deserialize::FromSql; 437 + use diesel::pg::Pg; 438 + use diesel::sql_types::{Array, Nullable, Text}; 439 + use diesel::{deserialize, FromSqlRow}; 440 + use serde::{Deserialize, Serialize}; 441 + use std::ops::{Deref, DerefMut}; 442 + 443 + #[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)] 444 + #[diesel(sql_type = Array<Nullable<Text>>)] 445 + pub struct TextArray(pub Vec<String>); 446 + 447 + impl FromSql<Array<Nullable<Text>>, Pg> for TextArray { 448 + fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> { 449 + let vec_with_nulls = 450 + <Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?; 451 + Ok(TextArray(vec_with_nulls.into_iter().flatten().collect())) 452 + } 453 + } 454 + 455 + impl Deref for TextArray { 456 + type Target = Vec<String>; 457 + 458 + fn deref(&self) -> &Self::Target { 459 + &self.0 460 + } 461 + } 462 + 463 + impl DerefMut for TextArray { 464 + fn deref_mut(&mut self) -> &mut Self::Target { 465 + &mut self.0 466 + } 467 + } 468 + 469 + impl From<TextArray> for Vec<String> { 470 + fn from(v: TextArray) -> Vec<String> { 471 + v.0 472 + } 473 + } 474 + }