Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

feat(parakeet): getPostThreadV2 (and ...Other)

authored by handle.invalid and committed by Tangled 65604b50 8f126b02

Changed files
+462
parakeet
parakeet-db
src
+1
parakeet-db/src/models.rs
··· 431 431 pub sort_at: DateTime<Utc>, 432 432 } 433 433 434 + pub use not_null_vec::TextArray; 434 435 mod not_null_vec { 435 436 use diesel::deserialize::FromSql; 436 437 use diesel::pg::Pg;
+56
parakeet/src/db.rs
··· 2 2 use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 4 use parakeet_db::{schema, types}; 5 + use parakeet_db::models::TextArray; 5 6 6 7 pub async fn get_actor_status( 7 8 conn: &mut AsyncPgConnection, ··· 223 224 .await 224 225 } 225 226 227 + pub async fn get_thread_children_branching( 228 + conn: &mut AsyncPgConnection, 229 + uri: &str, 230 + depth: i32, 231 + branching_factor: i32, 232 + ) -> QueryResult<Vec<ThreadItem>> { 233 + diesel::sql_query(include_str!("sql/thread_branching.sql")) 234 + .bind::<Text, _>(uri) 235 + .bind::<Integer, _>(depth) 236 + .bind::<Integer, _>(branching_factor) 237 + .load(conn) 238 + .await 239 + } 240 + 241 + #[derive(Debug, QueryableByName)] 242 + #[diesel(check_for_backend(diesel::pg::Pg))] 243 + pub struct HiddenThreadChildItem { 244 + #[diesel(sql_type = Text)] 245 + pub at_uri: String, 246 + } 247 + 248 + pub async fn get_thread_children_hidden( 249 + conn: &mut AsyncPgConnection, 250 + uri: &str, 251 + root: &str, 252 + ) -> QueryResult<Vec<HiddenThreadChildItem>> { 253 + diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql")) 254 + .bind::<Text, _>(uri) 255 + .bind::<Text, _>(root) 256 + .load(conn) 257 + .await 258 + } 259 + 226 260 pub async fn get_thread_parents( 227 261 conn: &mut AsyncPgConnection, 228 262 uri: &str, ··· 234 268 .load(conn) 235 269 .await 236 270 } 271 + 272 + pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 273 + schema::posts::table 274 + .select(schema::posts::root_uri) 275 + .find(&uri) 276 + .get_result(conn) 277 + .await 278 + .optional() 279 + .map(|v| v.flatten()) 280 + } 281 + 282 + pub async fn get_threadgate_hiddens( 283 + conn: &mut AsyncPgConnection, 284 + uri: &str, 285 + ) -> QueryResult<Option<TextArray>> { 286 + schema::threadgates::table 287 + .select(schema::threadgates::hidden_replies) 288 + .find(&uri) 289 + .get_result(conn) 290 + .await 291 + .optional() 292 + }
+13
parakeet/src/sql/thread_branching.sql
··· 1 + with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 2 + from posts 3 + where parent_uri = $1 4 + and violates_threadgate = FALSE 5 + union all 6 + (select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 7 + from posts p 8 + join thread on p.parent_uri = thread.at_uri 9 + where thread.depth <= $2 10 + and violates_threadgate = FALSE 11 + LIMIT $3)) 12 + select * 13 + from thread;
+6
parakeet/src/sql/thread_v2_hidden_children.sql
··· 1 + select at_uri 2 + from posts 3 + where parent_uri = $1 4 + and at_uri = any (select unnest(hidden_replies) 5 + from threadgates 6 + where post_uri = $2)
+3
parakeet/src/xrpc/app_bsky/mod.rs
··· 6 6 mod feed; 7 7 mod graph; 8 8 mod labeler; 9 + mod unspecced; 9 10 10 11 #[rustfmt::skip] 11 12 pub fn routes() -> Router<crate::GlobalState> { ··· 64 65 // TODO: app.bsky.notification.putActivitySubscriptions 65 66 // TODO: app.bsky.notification.putPreferences 66 67 // TODO: app.bsky.notification.putPreferencesV2 68 + .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 69 + .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 67 70 } 68 71 69 72 async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 + pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 + use crate::db::ThreadItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 + use crate::xrpc::normalise_at_uri; 6 + use crate::GlobalState; 7 + use axum::extract::{Query, State}; 8 + use axum::Json; 9 + use itertools::Itertools; 10 + use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView}; 11 + use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 12 + use serde::{Deserialize, Serialize}; 13 + use std::cmp::Ordering; 14 + use std::collections::{HashMap, HashSet}; 15 + 16 + const THREAD_PARENTS: usize = 50; 17 + const DEFAULT_BRANCHING: u32 = 10; 18 + const DEFAULT_DEPTH: u32 = 6; 19 + 20 + #[derive(Copy, Clone, Debug, Default, Deserialize)] 21 + #[serde(rename_all = "lowercase")] 22 + pub enum PostThreadSort { 23 + Newest, 24 + #[default] 25 + Oldest, 26 + Top, 27 + } 28 + 29 + #[derive(Debug, Deserialize)] 30 + #[serde(rename_all = "camelCase")] 31 + pub struct GetPostThreadV2Req { 32 + pub anchor: String, 33 + pub above: Option<bool>, 34 + pub below: Option<u32>, 35 + pub branching_factor: Option<u32>, 36 + #[serde(default)] 37 + pub sort: PostThreadSort, 38 + } 39 + 40 + #[derive(Debug, Serialize)] 41 + #[serde(rename_all = "camelCase")] 42 + pub struct GetPostThreadV2Res { 43 + pub thread: Vec<ThreadV2Item>, 44 + #[serde(skip_serializing_if = "Option::is_none")] 45 + pub threadgate: Option<ThreadgateView>, 46 + pub has_other_replies: bool, 47 + } 48 + 49 + pub async fn get_post_thread_v2( 50 + State(state): State<GlobalState>, 51 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 52 + maybe_auth: Option<AtpAuth>, 53 + Query(query): Query<GetPostThreadV2Req>, 54 + ) -> XrpcResult<Json<GetPostThreadV2Res>> { 55 + let mut conn = state.pool.get().await?; 56 + let maybe_did = maybe_auth.clone().map(|v| v.0); 57 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 58 + 59 + let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 60 + let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32; 61 + let branching_factor = query 62 + .branching_factor 63 + .unwrap_or(DEFAULT_BRANCHING) 64 + .clamp(0, 100) as i32; 65 + 66 + let anchor = hyd 67 + .hydrate_post(uri.clone()) 68 + .await 69 + .ok_or(Error::not_found())?; 70 + 71 + if let Some(v) = &anchor.author.viewer { 72 + if v.blocked_by || v.blocking.is_some() { 73 + let block = ThreadV2ItemType::Blocked { 74 + author: BlockedAuthor { 75 + did: anchor.author.did, 76 + viewer: anchor.author.viewer, 77 + }, 78 + }; 79 + 80 + return Ok(Json(GetPostThreadV2Res { 81 + thread: vec![ThreadV2Item { 82 + uri, 83 + depth: 0, 84 + value: block, 85 + }], 86 + threadgate: anchor.threadgate, 87 + has_other_replies: false, 88 + })); 89 + } 90 + } 91 + 92 + // get the root post URI (if there is one) and return its author's DID. 93 + let root_uri = crate::db::get_root_post(&mut conn, &uri) 94 + .await? 95 + .unwrap_or(uri.clone()); 96 + let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0]; 97 + 98 + let replies = 99 + crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1) 100 + .await?; 101 + let reply_uris = replies 102 + .iter() 103 + .map(|item| item.at_uri.clone()) 104 + .collect::<Vec<_>>(); 105 + 106 + // bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents. 107 + let parents = match query.above.unwrap_or(true) { 108 + true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?, 109 + false => vec![], 110 + }; 111 + let parent_uris = parents 112 + .iter() 113 + .map(|item| item.at_uri.clone()) 114 + .collect::<Vec<_>>(); 115 + 116 + let (mut replies_hyd, mut parents_hyd) = tokio::join!( 117 + hyd.hydrate_posts(reply_uris), 118 + hyd.hydrate_posts(parent_uris), 119 + ); 120 + 121 + let threadgate = anchor.threadgate.clone(); 122 + let hidden: HashSet<_, std::hash::RandomState> = match &threadgate { 123 + Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?, 124 + None => None, 125 + } 126 + .map(|hiddens| HashSet::from_iter(Vec::from(hiddens))) 127 + .unwrap_or_default(); 128 + 129 + let root_has_more = parents.len() > THREAD_PARENTS; 130 + let mut is_op_thread = true; 131 + 132 + let mut thread = Vec::with_capacity(1 + replies.len() + parents.len()); 133 + 134 + thread.extend( 135 + parents 136 + .into_iter() 137 + .tail(THREAD_PARENTS) 138 + .enumerate() 139 + .map(|(idx, item)| { 140 + let value = parents_hyd 141 + .remove(&item.at_uri) 142 + .map(|post| { 143 + if let Some(v) = &post.author.viewer { 144 + if v.blocked_by || v.blocking.is_some() { 145 + return ThreadV2ItemType::Blocked { 146 + author: BlockedAuthor { 147 + did: post.author.did, 148 + viewer: post.author.viewer, 149 + }, 150 + }; 151 + } 152 + } 153 + 154 + let op_thread = (is_op_thread 155 + || item.root_uri.is_none() && item.parent_uri.is_none()) 156 + && post.author.did == root_did; 157 + 158 + ThreadV2ItemType::Post(ThreadItemPost { 159 + post, 160 + more_parents: idx == 0 && root_has_more, 161 + more_replies: 0, 162 + op_thread, 163 + hidden_by_threadgate: false, 164 + muted_by_viewer: false, 165 + }) 166 + }) 167 + .unwrap_or(ThreadV2ItemType::NotFound {}); 168 + 169 + ThreadV2Item { 170 + uri: item.at_uri, 171 + depth: -item.depth - 1, 172 + value, 173 + } 174 + }), 175 + ); 176 + 177 + is_op_thread = is_op_thread && anchor.author.did == root_did; 178 + thread.push(ThreadV2Item { 179 + uri: uri.clone(), 180 + depth: 0, 181 + value: ThreadV2ItemType::Post(ThreadItemPost { 182 + post: anchor, 183 + more_parents: false, 184 + more_replies: 0, 185 + op_thread: is_op_thread, 186 + hidden_by_threadgate: false, 187 + muted_by_viewer: false, 188 + }), 189 + }); 190 + 191 + let mut replies_grouped = replies 192 + .into_iter() 193 + .into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default()); 194 + 195 + // start with the anchor 196 + let (children, has_other_replies) = build_thread_children( 197 + &mut replies_grouped, 198 + &mut replies_hyd, 199 + &hidden, 200 + &uri, 201 + is_op_thread, 202 + 1, 203 + &BuildThreadChildrenOpts { 204 + root_did, 205 + sort: query.sort, 206 + maybe_did: &maybe_did, 207 + max_depth: depth, 208 + }, 209 + ); 210 + thread.extend(children); 211 + 212 + Ok(Json(GetPostThreadV2Res { 213 + thread, 214 + threadgate, 215 + has_other_replies, 216 + })) 217 + } 218 + 219 + #[derive(Debug, Deserialize)] 220 + #[serde(rename_all = "camelCase")] 221 + pub struct GetPostThreadOtherV2Req { 222 + pub anchor: String, 223 + } 224 + 225 + #[derive(Debug, Serialize)] 226 + #[serde(rename_all = "camelCase")] 227 + pub struct GetPostThreadOtherV2Res { 228 + pub thread: Vec<ThreadV2Item>, 229 + } 230 + 231 + pub async fn get_post_thread_other_v2( 232 + State(state): State<GlobalState>, 233 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 234 + maybe_auth: Option<AtpAuth>, 235 + Query(query): Query<GetPostThreadOtherV2Req>, 236 + ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 237 + let mut conn = state.pool.get().await?; 238 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 239 + 240 + let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 241 + 242 + let root = crate::db::get_root_post(&mut conn, &uri) 243 + .await? 244 + .unwrap_or(uri.clone()); 245 + 246 + // this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE 247 + let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?; 248 + let reply_uris = replies 249 + .into_iter() 250 + .map(|item| item.at_uri) 251 + .collect::<Vec<_>>(); 252 + let thread = hyd 253 + .hydrate_posts(reply_uris) 254 + .await 255 + .into_iter() 256 + .filter(|(_, post)| match &post.author.viewer { 257 + Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false, 258 + _ => true, 259 + }) 260 + .map(|(uri, post)| { 261 + let post = ThreadItemPost { 262 + post, 263 + more_parents: false, 264 + more_replies: 0, 265 + op_thread: false, 266 + hidden_by_threadgate: true, 267 + muted_by_viewer: false, 268 + }; 269 + 270 + ThreadV2Item { 271 + uri, 272 + depth: 1, 273 + value: ThreadV2ItemType::Post(post), 274 + } 275 + }) 276 + .collect(); 277 + 278 + Ok(Json(GetPostThreadOtherV2Res { thread })) 279 + } 280 + 281 + #[derive(Debug)] 282 + struct BuildThreadChildrenOpts<'a> { 283 + root_did: &'a str, 284 + sort: PostThreadSort, 285 + maybe_did: &'a Option<String>, 286 + max_depth: i32, 287 + } 288 + 289 + fn build_thread_children( 290 + grouped_replies: &mut HashMap<String, Vec<ThreadItem>>, 291 + replies_hyd: &mut HashMap<String, PostView>, 292 + hidden: &HashSet<String>, 293 + parent: &str, 294 + is_op_thread: bool, 295 + depth: i32, 296 + opts: &BuildThreadChildrenOpts, 297 + ) -> (Vec<ThreadV2Item>, bool) { 298 + let mut has_other_replies = false; 299 + 300 + let Some(replies) = grouped_replies.remove(parent) else { 301 + return (Vec::default(), has_other_replies); 302 + }; 303 + 304 + let replies = replies 305 + .into_iter() 306 + .filter_map(|item| replies_hyd.remove(&item.at_uri)) 307 + .sorted_by(sort_replies(&opts.sort)); 308 + 309 + let mut out = Vec::new(); 310 + 311 + for post in replies { 312 + let reply_count = grouped_replies 313 + .get(&post.uri) 314 + .map(|v| v.len()) 315 + .unwrap_or_default(); 316 + let at_max = depth == opts.max_depth; 317 + let more_replies = if at_max { reply_count } else { 0 }; 318 + let op_thread = is_op_thread && post.author.did == opts.root_did; 319 + 320 + // shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies... 321 + if let Some(v) = &post.author.viewer { 322 + if v.blocked_by || v.blocking.is_some() { 323 + continue; 324 + } 325 + } 326 + 327 + // check if the post is hidden AND we're NOT the author (hidden posts still show for their author) 328 + if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) { 329 + // post is hidden - do not ~pass go~ push to the thread. 330 + if depth == 1 { 331 + has_other_replies = true; 332 + } 333 + continue; 334 + } 335 + 336 + let uri = post.uri.clone(); 337 + out.push(ThreadV2Item { 338 + uri: post.uri.clone(), 339 + depth, 340 + value: ThreadV2ItemType::Post(ThreadItemPost { 341 + post, 342 + more_parents: false, 343 + more_replies: more_replies as i32, 344 + op_thread, 345 + hidden_by_threadgate: false, 346 + muted_by_viewer: false, 347 + }), 348 + }); 349 + 350 + if !at_max { 351 + // we don't care about has_other_replies when recursing 352 + let (children, _) = build_thread_children( 353 + grouped_replies, 354 + replies_hyd, 355 + hidden, 356 + &uri, 357 + op_thread, 358 + depth + 1, 359 + opts, 360 + ); 361 + 362 + out.extend(children); 363 + } 364 + } 365 + 366 + (out, has_other_replies) 367 + } 368 + 369 + fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> { 370 + move |a: &PostView, b: &PostView| match sort { 371 + PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at), 372 + PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at), 373 + PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count), 374 + } 375 + } 376 + 377 + fn did_is_cur(cur: &Option<String>, did: &String) -> bool { 378 + match cur { 379 + Some(cur) => did == cur, 380 + None => false, 381 + } 382 + }