Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

Compare changes

Choose any two refs to compare.

Changed files
+1413 -582
consumer
dataloader-rs
lexica
migrations
2025-02-16-142357_posts
2025-09-27-171241_post-tweaks
parakeet
parakeet-db
+1 -245
Cargo.lock
··· 125 125 checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" 126 126 127 127 [[package]] 128 - name = "async-channel" 129 - version = "1.9.0" 130 - source = "registry+https://github.com/rust-lang/crates.io-index" 131 - checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" 132 - dependencies = [ 133 - "concurrent-queue", 134 - "event-listener 2.5.3", 135 - "futures-core", 136 - ] 137 - 138 - [[package]] 139 - name = "async-channel" 140 - version = "2.3.1" 141 - source = "registry+https://github.com/rust-lang/crates.io-index" 142 - checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" 143 - dependencies = [ 144 - "concurrent-queue", 145 - "event-listener-strategy", 146 - "futures-core", 147 - "pin-project-lite", 148 - ] 149 - 150 - [[package]] 151 128 name = "async-compression" 152 129 version = "0.4.22" 153 130 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 161 138 ] 162 139 163 140 [[package]] 164 - name = "async-executor" 165 - version = "1.13.1" 166 - source = "registry+https://github.com/rust-lang/crates.io-index" 167 - checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" 168 - dependencies = [ 169 - "async-task", 170 - "concurrent-queue", 171 - "fastrand", 172 - "futures-lite", 173 - "slab", 174 - ] 175 - 176 - [[package]] 177 - name = "async-global-executor" 178 - version = "2.4.1" 179 - source = "registry+https://github.com/rust-lang/crates.io-index" 180 - checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" 181 - dependencies = [ 182 - "async-channel 2.3.1", 183 - "async-executor", 184 - "async-io", 185 - "async-lock", 186 - "blocking", 187 - "futures-lite", 188 - "once_cell", 189 - ] 190 - 191 - [[package]] 192 - name = "async-io" 193 - version = "2.4.0" 194 - source = "registry+https://github.com/rust-lang/crates.io-index" 195 - checksum = "43a2b323ccce0a1d90b449fd71f2a06ca7faa7c54c2751f06c9bd851fc061059" 196 
- dependencies = [ 197 - "async-lock", 198 - "cfg-if", 199 - "concurrent-queue", 200 - "futures-io", 201 - "futures-lite", 202 - "parking", 203 - "polling", 204 - "rustix", 205 - "slab", 206 - "tracing", 207 - "windows-sys 0.59.0", 208 - ] 209 - 210 - [[package]] 211 - name = "async-lock" 212 - version = "3.4.0" 213 - source = "registry+https://github.com/rust-lang/crates.io-index" 214 - checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" 215 - dependencies = [ 216 - "event-listener 5.4.0", 217 - "event-listener-strategy", 218 - "pin-project-lite", 219 - ] 220 - 221 - [[package]] 222 141 name = "async-recursion" 223 142 version = "1.1.1" 224 143 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 228 147 "quote", 229 148 "syn", 230 149 ] 231 - 232 - [[package]] 233 - name = "async-std" 234 - version = "1.13.0" 235 - source = "registry+https://github.com/rust-lang/crates.io-index" 236 - checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" 237 - dependencies = [ 238 - "async-channel 1.9.0", 239 - "async-global-executor", 240 - "async-io", 241 - "async-lock", 242 - "crossbeam-utils", 243 - "futures-channel", 244 - "futures-core", 245 - "futures-io", 246 - "futures-lite", 247 - "gloo-timers", 248 - "kv-log-macro", 249 - "log", 250 - "memchr", 251 - "once_cell", 252 - "pin-project-lite", 253 - "pin-utils", 254 - "slab", 255 - "wasm-bindgen-futures", 256 - ] 257 - 258 - [[package]] 259 - name = "async-task" 260 - version = "4.7.1" 261 - source = "registry+https://github.com/rust-lang/crates.io-index" 262 - checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" 263 150 264 151 [[package]] 265 152 name = "async-trait" ··· 488 375 ] 489 376 490 377 [[package]] 491 - name = "blocking" 492 - version = "1.6.1" 493 - source = "registry+https://github.com/rust-lang/crates.io-index" 494 - checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" 495 - dependencies = [ 
496 - "async-channel 2.3.1", 497 - "async-task", 498 - "futures-io", 499 - "futures-lite", 500 - "piper", 501 - ] 502 - 503 - [[package]] 504 378 name = "brotli" 505 379 version = "7.0.0" 506 380 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 727 601 ] 728 602 729 603 [[package]] 730 - name = "concurrent-queue" 731 - version = "2.5.0" 732 - source = "registry+https://github.com/rust-lang/crates.io-index" 733 - checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" 734 - dependencies = [ 735 - "crossbeam-utils", 736 - ] 737 - 738 - [[package]] 739 604 name = "const-oid" 740 605 version = "0.9.6" 741 606 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 968 833 name = "dataloader" 969 834 version = "0.18.0" 970 835 dependencies = [ 971 - "async-std", 972 836 "futures", 973 837 "tokio", 974 838 ] ··· 1256 1120 ] 1257 1121 1258 1122 [[package]] 1259 - name = "event-listener" 1260 - version = "2.5.3" 1261 - source = "registry+https://github.com/rust-lang/crates.io-index" 1262 - checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" 1263 - 1264 - [[package]] 1265 - name = "event-listener" 1266 - version = "5.4.0" 1267 - source = "registry+https://github.com/rust-lang/crates.io-index" 1268 - checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" 1269 - dependencies = [ 1270 - "concurrent-queue", 1271 - "parking", 1272 - "pin-project-lite", 1273 - ] 1274 - 1275 - [[package]] 1276 - name = "event-listener-strategy" 1277 - version = "0.5.3" 1278 - source = "registry+https://github.com/rust-lang/crates.io-index" 1279 - checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" 1280 - dependencies = [ 1281 - "event-listener 5.4.0", 1282 - "pin-project-lite", 1283 - ] 1284 - 1285 - [[package]] 1286 1123 name = "eyre" 1287 1124 version = "0.6.12" 1288 1125 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1453 1290 checksum = 
"9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" 1454 1291 1455 1292 [[package]] 1456 - name = "futures-lite" 1457 - version = "2.6.0" 1458 - source = "registry+https://github.com/rust-lang/crates.io-index" 1459 - checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" 1460 - dependencies = [ 1461 - "fastrand", 1462 - "futures-core", 1463 - "futures-io", 1464 - "parking", 1465 - "pin-project-lite", 1466 - ] 1467 - 1468 - [[package]] 1469 1293 name = "futures-macro" 1470 1294 version = "0.3.31" 1471 1295 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1564 1388 checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" 1565 1389 1566 1390 [[package]] 1567 - name = "gloo-timers" 1568 - version = "0.3.0" 1569 - source = "registry+https://github.com/rust-lang/crates.io-index" 1570 - checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" 1571 - dependencies = [ 1572 - "futures-channel", 1573 - "futures-core", 1574 - "js-sys", 1575 - "wasm-bindgen", 1576 - ] 1577 - 1578 - [[package]] 1579 1391 name = "group" 1580 1392 version = "0.13.0" 1581 1393 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1665 1477 version = "0.3.9" 1666 1478 source = "registry+https://github.com/rust-lang/crates.io-index" 1667 1479 checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" 1668 - 1669 - [[package]] 1670 - name = "hermit-abi" 1671 - version = "0.4.0" 1672 - source = "registry+https://github.com/rust-lang/crates.io-index" 1673 - checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" 1674 1480 1675 1481 [[package]] 1676 1482 name = "hex" ··· 2248 2054 ] 2249 2055 2250 2056 [[package]] 2251 - name = "kv-log-macro" 2252 - version = "1.0.7" 2253 - source = "registry+https://github.com/rust-lang/crates.io-index" 2254 - checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" 2255 - dependencies = [ 2256 - 
"log", 2257 - ] 2258 - 2259 - [[package]] 2260 2057 name = "lazy_static" 2261 2058 version = "1.5.0" 2262 2059 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2361 2158 version = "0.4.25" 2362 2159 source = "registry+https://github.com/rust-lang/crates.io-index" 2363 2160 checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" 2364 - dependencies = [ 2365 - "value-bag", 2366 - ] 2367 2161 2368 2162 [[package]] 2369 2163 name = "lru-cache" ··· 2655 2449 source = "registry+https://github.com/rust-lang/crates.io-index" 2656 2450 checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" 2657 2451 dependencies = [ 2658 - "hermit-abi 0.3.9", 2452 + "hermit-abi", 2659 2453 "libc", 2660 2454 ] 2661 2455 ··· 2824 2618 ] 2825 2619 2826 2620 [[package]] 2827 - name = "parking" 2828 - version = "2.2.1" 2829 - source = "registry+https://github.com/rust-lang/crates.io-index" 2830 - checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" 2831 - 2832 - [[package]] 2833 2621 name = "parking_lot" 2834 2622 version = "0.11.2" 2835 2623 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2992 2780 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 2993 2781 2994 2782 [[package]] 2995 - name = "piper" 2996 - version = "0.2.4" 2997 - source = "registry+https://github.com/rust-lang/crates.io-index" 2998 - checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" 2999 - dependencies = [ 3000 - "atomic-waker", 3001 - "fastrand", 3002 - "futures-io", 3003 - ] 3004 - 3005 - [[package]] 3006 2783 name = "pkcs1" 3007 2784 version = "0.7.5" 3008 2785 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3028 2805 version = "0.3.31" 3029 2806 source = "registry+https://github.com/rust-lang/crates.io-index" 3030 2807 checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" 3031 - 3032 - [[package]] 3033 - name = 
"polling" 3034 - version = "3.7.4" 3035 - source = "registry+https://github.com/rust-lang/crates.io-index" 3036 - checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" 3037 - dependencies = [ 3038 - "cfg-if", 3039 - "concurrent-queue", 3040 - "hermit-abi 0.4.0", 3041 - "pin-project-lite", 3042 - "rustix", 3043 - "tracing", 3044 - "windows-sys 0.59.0", 3045 - ] 3046 2808 3047 2809 [[package]] 3048 2810 name = "portable-atomic" ··· 4688 4450 version = "0.1.1" 4689 4451 source = "registry+https://github.com/rust-lang/crates.io-index" 4690 4452 checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" 4691 - 4692 - [[package]] 4693 - name = "value-bag" 4694 - version = "1.10.0" 4695 - source = "registry+https://github.com/rust-lang/crates.io-index" 4696 - checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" 4697 4453 4698 4454 [[package]] 4699 4455 name = "vcpkg"
+6 -1
consumer/src/backfill/mod.rs
··· 275 275 follows: Vec<(String, String, DateTime<Utc>)>, 276 276 list_items: Vec<(String, records::AppBskyGraphListItem)>, 277 277 verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, 278 + threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last. 278 279 records: Vec<(String, Cid)>, 279 280 } 280 281 281 282 impl CopyStore { 282 283 async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { 283 284 db::copy::copy_likes(t, did, self.likes).await?; 284 - db::copy::copy_posts(t, did, self.posts).await?; 285 285 db::copy::copy_reposts(t, did, self.reposts).await?; 286 286 db::copy::copy_blocks(t, did, self.blocks).await?; 287 287 db::copy::copy_follows(t, did, self.follows).await?; 288 288 db::copy::copy_list_items(t, self.list_items).await?; 289 289 db::copy::copy_verification(t, did, self.verifications).await?; 290 + db::copy::copy_posts(t, did, self.posts).await?; 291 + for (at_uri, cid, record) in self.threadgates { 292 + db::threadgate_enforce_backfill(t, did, &record).await?; 293 + db::threadgate_upsert(t, &at_uri, cid, record).await?; 294 + } 290 295 db::copy::copy_records(t, did, self.records).await?; 291 296 292 297 Ok(())
+11 -1
consumer/src/backfill/repo.rs
··· 4 4 }; 5 5 use crate::indexer::records; 6 6 use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7 + use crate::utils::at_uri_is_by; 7 8 use crate::{db, indexer}; 8 9 use deadpool_postgres::Transaction; 9 10 use ipld_core::cid::Cid; ··· 144 145 db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 145 146 } 146 147 if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 147 - db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?; 148 + db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?; 148 149 } 149 150 150 151 deltas.incr(did, AggregateType::ProfilePost).await; ··· 165 166 copies 166 167 .reposts 167 168 .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 169 + } 170 + RecordTypes::AppBskyFeedThreadgate(record) => { 171 + if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 + return Ok(()); 174 + } 175 + 176 + copies.push_record(&at_uri, cid); 177 + copies.threadgates.push((at_uri, cid, record)); 168 178 } 169 179 RecordTypes::AppBskyGraphBlock(rec) => { 170 180 copies.push_record(&at_uri, cid);
+38 -3
consumer/src/db/copy.rs
··· 1 1 use super::PgExecResult; 2 2 use crate::indexer::records; 3 - use crate::utils::strongref_to_parts; 3 + use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::Transaction; 6 6 use futures::pin_mut; ··· 119 119 .await 120 120 } 121 121 122 - const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)"; 122 + const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)"; 123 123 const POST_TYPES: &[Type] = &[ 124 124 Type::TEXT, 125 125 Type::TEXT, ··· 135 135 Type::TEXT, 136 136 Type::TEXT, 137 137 Type::TEXT, 138 + Type::TEXT_ARRAY, 138 139 Type::TIMESTAMP, 139 140 ]; 140 141 pub async fn copy_posts( ··· 159 160 160 161 for (at_uri, cid, post) in data { 161 162 let record = serde_json::to_value(&post).unwrap(); 163 + let (mentions, tags) = post 164 + .facets 165 + .as_ref() 166 + .map(|v| extract_mentions_and_tags(v)) 167 + .unzip(); 162 168 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); 163 169 let embed = post.embed.as_ref().map(|v| v.as_str()); 164 170 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); 165 171 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); 166 172 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); 173 + 174 + let tags = merge_tags(tags, post.tags); 167 175 168 176 let writer = writer.as_mut(); 169 177 writer ··· 175 183 &post.text, 176 184 &facets, 177 185 &post.langs.unwrap_or_default(), 178 - &post.tags.unwrap_or_default(), 186 + &tags, 179 187 &parent_uri, 180 188 &parent_cid, 181 189 &root_uri, 182 190 &root_cid, 183 191 &embed, 184 192 
&embed_subtype, 193 + &mentions, 185 194 &post.created_at.naive_utc(), 186 195 ]) 187 196 .await?; 188 197 } 189 198 190 199 writer.finish().await?; 200 + 201 + let threadgated: Vec<(String, String, DateTime<Utc>)> = conn 202 + .query( 203 + "SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL", 204 + &[], 205 + ) 206 + .await? 207 + .into_iter() 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 + 210 + for (root, post, created_at) in threadgated { 211 + match super::post_enforce_threadgate(conn, &root, did, created_at, true).await { 212 + Ok(true) => { 213 + conn.execute( 214 + "UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1", 215 + &[&post], 216 + ) 217 + .await?; 218 + } 219 + Ok(false) => continue, 220 + Err(e) => { 221 + tracing::error!("failed to check threadgate enforcement: {e}"); 222 + continue; 223 + } 224 + } 225 + } 191 226 192 227 conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[]) 193 228 .await
+208
consumer/src/db/gates.rs
··· 1 + use super::{PgExecResult, PgResult}; 2 + use crate::indexer::records::{ 3 + AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, 4 + THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION, 5 + }; 6 + use chrono::prelude::*; 7 + use chrono::{DateTime, Utc}; 8 + use deadpool_postgres::GenericClient; 9 + use std::collections::HashSet; 10 + 11 + pub async fn post_enforce_threadgate<C: GenericClient>( 12 + conn: &mut C, 13 + root: &str, 14 + post_author: &str, 15 + post_created_at: DateTime<Utc>, 16 + is_backfill: bool, 17 + ) -> PgResult<bool> { 18 + // check if the root and the current post are the same author 19 + // strip "at://" then break into parts by '/' 20 + let parts = root[5..].split('/').collect::<Vec<_>>(); 21 + let root_author = parts[0]; 22 + if root_author == post_author { 23 + return Ok(false); 24 + } 25 + 26 + let tg_data = super::threadgate_get(conn, root).await?; 27 + 28 + let Some((created_at, allow, allow_lists)) = tg_data else { 29 + return Ok(false); 30 + }; 31 + 32 + // when backfilling, there's no point continuing if the record is dated before the threadgate 33 + if is_backfill && post_created_at < created_at { 34 + return Ok(false); 35 + } 36 + 37 + if allow.is_empty() { 38 + return Ok(true); 39 + } 40 + 41 + let allow: HashSet<String> = HashSet::from_iter(allow); 42 + 43 + if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) { 44 + let profile_state: Option<(bool, bool)> = conn 45 + .query_opt( 46 + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", 47 + &[&root_author, &post_author], 48 + ) 49 + .await? 
50 + .map(|v| (v.get(0), v.get(1))); 51 + 52 + if let Some((following, followed)) = profile_state { 53 + if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { 54 + return Ok(false); 55 + } 56 + 57 + if allow.contains(THREADGATE_RULE_FOLLOWING) && following { 58 + return Ok(false); 59 + } 60 + } 61 + } 62 + 63 + // check mentions 64 + if allow.contains(THREADGATE_RULE_MENTION) { 65 + let mentions: Vec<String> = conn 66 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 + .await? 68 + .map(|r| r.get(0)) 69 + .unwrap_or_default(); 70 + 71 + if mentions.contains(&post_author.to_owned()) { 72 + return Ok(false); 73 + } 74 + } 75 + 76 + if allow.contains(THREADGATE_RULE_LIST) { 77 + if allow_lists.is_empty() { 78 + return Ok(true); 79 + } 80 + 81 + let count: i64 = conn 82 + .query_one( 83 + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", 84 + &[&allow_lists, &post_author], 85 + ) 86 + .await? 87 + .get(0); 88 + if count != 0 { 89 + return Ok(false); 90 + } 91 + } 92 + 93 + Ok(true) 94 + } 95 + 96 + pub async fn postgate_maintain_detaches<C: GenericClient>( 97 + conn: &mut C, 98 + post: &str, 99 + detached: &[String], 100 + disable_effective: Option<NaiveDateTime>, 101 + ) -> PgExecResult { 102 + conn.execute( 103 + "SELECT maintain_postgates($1, $2, $3)", 104 + &[&post, &detached, &disable_effective], 105 + ) 106 + .await 107 + } 108 + 109 + // variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB 110 + pub async fn threadgate_enforce_backfill<C: GenericClient>( 111 + conn: &mut C, 112 + root_author: &str, 113 + threadgate: &AppBskyFeedThreadgate, 114 + ) -> PgExecResult { 115 + // pull out allow - if it's None we can skip this gate. 
116 + let Some(allow) = threadgate.allow.as_ref() else { 117 + return Ok(0); 118 + }; 119 + 120 + let root = &threadgate.post; 121 + 122 + if allow.is_empty() { 123 + // blind update everything 124 + return conn.execute( 125 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 126 + &[&root, &root_author, &threadgate.created_at], 127 + ).await; 128 + } 129 + 130 + // pull authors with our root_uri where the author is not the root author and are dated after created_at 131 + // this is mutable because we'll remove ALLOWED dids 132 + let mut dids: HashSet<String> = conn 133 + .query( 134 + "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 135 + &[&root, &root_author, &threadgate.created_at], 136 + ) 137 + .await? 138 + .into_iter() 139 + .map(|row| row.get(0)) 140 + .collect(); 141 + 142 + // this will be empty if there are no replies. 143 + if dids.is_empty() { 144 + return Ok(0); 145 + } 146 + 147 + let allowed_lists = allow 148 + .iter() 149 + .filter_map(|rule| match rule { 150 + ThreadgateRule::List { list } => Some(list), 151 + _ => None, 152 + }) 153 + .collect::<Vec<_>>(); 154 + 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 + 157 + if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 + let current_dids: Vec<_> = dids.iter().collect(); 159 + 160 + let res = conn.query( 161 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 + &[&root_author, &current_dids] 163 + ).await?; 164 + 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + } 167 + 168 + if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { 169 + let current_dids: Vec<_> = dids.iter().collect(); 170 + 171 + let res = conn.query( 172 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 + &[&root_author, &current_dids] 
174 + ).await?; 175 + 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + } 178 + 179 + if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 + let mentions: Vec<String> = conn 181 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 + .await? 183 + .map(|r| r.get(0)) 184 + .unwrap_or_default(); 185 + 186 + dids = &dids - &HashSet::from_iter(mentions); 187 + } 188 + 189 + if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() { 190 + let current_dids: Vec<_> = dids.iter().collect(); 191 + 192 + let res = conn 193 + .query( 194 + "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 + &[&allowed_lists, &current_dids], 196 + ) 197 + .await?; 198 + 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 200 + } 201 + 202 + let dids = dids.into_iter().collect::<Vec<_>>(); 203 + 204 + conn.execute( 205 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3", 206 + &[&threadgate.post, &dids, &threadgate.created_at] 207 + ).await 208 + }
+2
consumer/src/db/mod.rs
··· 7 7 mod actor; 8 8 mod backfill; 9 9 pub mod copy; 10 + mod gates; 10 11 mod labels; 11 12 mod record; 12 13 13 14 pub use actor::*; 14 15 pub use backfill::*; 16 + pub use gates::*; 15 17 pub use labels::*; 16 18 pub use record::*;
+85 -41
consumer/src/db/record.rs
··· 1 1 use super::{PgExecResult, PgOptResult, PgResult}; 2 2 use crate::indexer::records::*; 3 - use crate::utils::{blob_ref, strongref_to_parts}; 3 + use crate::utils::{blob_ref, extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::GenericClient; 6 6 use ipld_core::cid::Cid; 7 7 use lexica::community_lexicon::bookmarks::Bookmark; 8 + use std::collections::HashSet; 8 9 9 10 pub async fn record_upsert<C: GenericClient>( 10 11 conn: &mut C, ··· 317 318 repo: &str, 318 319 cid: Cid, 319 320 rec: AppBskyFeedPost, 321 + is_backfill: bool, 320 322 ) -> PgExecResult { 321 323 let cid = cid.to_string(); 322 324 let record = serde_json::to_value(&rec).unwrap(); 325 + let (mentions, tags) = rec 326 + .facets 327 + .as_ref() 328 + .map(|v| extract_mentions_and_tags(v)) 329 + .unzip(); 323 330 let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok()); 324 331 let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent)); 325 332 let (root_uri, root_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.root)); 326 333 let embed = rec.embed.as_ref().map(|v| v.as_str()); 327 334 let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype()); 328 335 336 + // if there is a root, we need to check for the presence of a threadgate. 337 + let violates_threadgate = match &root_uri { 338 + Some(root) => { 339 + super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? 
340 + } 341 + None => false, 342 + }; 343 + 344 + let tags = merge_tags(tags, rec.tags); 345 + 329 346 let count = conn 330 347 .execute( 331 348 include_str!("sql/post_insert.sql"), ··· 337 354 &rec.text, 338 355 &facets, 339 356 &rec.langs.unwrap_or_default(), 340 - &rec.tags.unwrap_or_default(), 357 + &tags, 341 358 &parent_uri, 342 359 &parent_cid, 343 360 &root_uri, 344 361 &root_cid, 345 362 &embed, 346 363 &embed_subtype, 364 + &mentions, 365 + &violates_threadgate, 347 366 &rec.created_at, 348 367 ], 349 368 ) 350 369 .await?; 351 370 352 371 if let Some(embed) = rec.embed.and_then(|embed| embed.into_bsky()) { 353 - post_embed_insert(conn, at_uri, embed, rec.created_at).await?; 372 + post_embed_insert(conn, at_uri, embed, rec.created_at, is_backfill).await?; 354 373 } 355 374 356 375 Ok(count) ··· 380 399 post: &str, 381 400 embed: AppBskyEmbed, 382 401 created_at: DateTime<Utc>, 402 + is_backfill: bool, 383 403 ) -> PgExecResult { 384 404 match embed { 385 405 AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await, 386 406 AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await, 387 407 AppBskyEmbed::External(embed) => post_embed_external_insert(conn, post, embed).await, 388 408 AppBskyEmbed::Record(embed) => { 389 - post_embed_record_insert(conn, post, embed, created_at).await 409 + post_embed_record_insert(conn, post, embed, created_at, is_backfill).await 390 410 } 391 411 AppBskyEmbed::RecordWithMedia(embed) => { 392 - post_embed_record_insert(conn, post, embed.record, created_at).await?; 412 + post_embed_record_insert(conn, post, embed.record, created_at, is_backfill).await?; 393 413 match *embed.media { 394 414 AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await, 395 415 AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await, ··· 476 496 ).await 477 497 } 478 498 499 + const PG_DISABLE_RULE: &str = "app.bsky.feed.postgate#disableRule"; 479 500 async fn 
post_embed_record_insert<C: GenericClient>( 480 501 conn: &mut C, 481 502 post: &str, 482 503 embed: AppBskyEmbedRecord, 483 504 post_created_at: DateTime<Utc>, 505 + is_backfill: bool, 484 506 ) -> PgExecResult { 485 507 // strip "at://" then break into parts by '/' 486 508 let parts = embed.record.uri[5..].split('/').collect::<Vec<_>>(); 487 509 488 510 let detached = if parts[1] == "app.bsky.feed.post" { 489 - let postgate_effective: Option<DateTime<Utc>> = conn 490 - .query_opt( 491 - "SELECT created_at FROM postgates WHERE post_uri=$1", 492 - &[&post], 493 - ) 494 - .await? 495 - .map(|v| v.get(0)); 511 + let pg_data = postgate_get(conn, post).await?; 496 512 497 - postgate_effective 498 - .map(|v| Utc::now().min(post_created_at) > v) 499 - .unwrap_or_default() 513 + if let Some((effective, detached, rules)) = pg_data { 514 + let detached: HashSet<String> = HashSet::from_iter(detached); 515 + let rules: HashSet<String> = HashSet::from_iter(rules); 516 + let compare_date = match is_backfill { 517 + true => post_created_at, 518 + false => Utc::now(), 519 + }; 520 + 521 + detached.contains(post) || (rules.contains(PG_DISABLE_RULE) && compare_date > effective) 522 + } else { 523 + false 524 + } 500 525 } else { 501 526 false 502 527 }; ··· 507 532 ).await 508 533 } 509 534 535 + async fn postgate_get<C: GenericClient>( 536 + conn: &mut C, 537 + post: &str, 538 + ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 539 + let res = conn 540 + .query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 
545 + .map(|v| (v.get(0), v.get(1), v.get(2))); 546 + 547 + Ok(res) 548 + } 549 + 510 550 pub async fn postgate_upsert<C: GenericClient>( 511 551 conn: &mut C, 512 552 at_uri: &str, ··· 536 576 pub async fn postgate_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 537 577 conn.execute("DELETE FROM postgates WHERE at_uri=$1", &[&at_uri]) 538 578 .await 539 - } 540 - 541 - pub async fn postgate_maintain_detaches<C: GenericClient>( 542 - conn: &mut C, 543 - post: &str, 544 - detached: &[String], 545 - disable_effective: Option<NaiveDateTime>, 546 - ) -> PgExecResult { 547 - conn.execute( 548 - "SELECT maintain_postgates($1, $2, $3)", 549 - &[&post, &detached, &disable_effective], 550 - ) 551 - .await 552 579 } 553 580 554 581 pub async fn profile_upsert<C: GenericClient>( ··· 700 727 .await 701 728 } 702 729 730 + pub async fn threadgate_get<C: GenericClient>( 731 + conn: &mut C, 732 + post: &str, 733 + ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 734 + let res = conn 735 + .query_opt( 736 + "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 737 + &[&post], 738 + ) 739 + .await? 
740 + .map(|v| (v.get(0), v.get(1), v.get(2))); 741 + 742 + Ok(res) 743 + } 744 + 703 745 pub async fn threadgate_upsert<C: GenericClient>( 704 746 conn: &mut C, 705 747 at_uri: &str, ··· 708 750 ) -> PgExecResult { 709 751 let record = serde_json::to_value(&rec).unwrap(); 710 752 711 - let allowed_lists = rec 712 - .allow 713 - .iter() 714 - .filter_map(|rule| match rule { 715 - ThreadgateRule::List { list } => Some(list.clone()), 716 - _ => None, 717 - }) 718 - .collect::<Vec<_>>(); 753 + let allowed_lists = rec.allow.as_ref().map(|allow| { 754 + allow 755 + .iter() 756 + .filter_map(|rule| match rule { 757 + ThreadgateRule::List { list } => Some(list.clone()), 758 + _ => None, 759 + }) 760 + .collect::<Vec<_>>() 761 + }); 719 762 720 - let allow = rec 721 - .allow 722 - .into_iter() 723 - .map(|v| v.as_str().to_string()) 724 - .collect::<Vec<_>>(); 763 + let allow = rec.allow.map(|allow| { 764 + allow 765 + .into_iter() 766 + .map(|v| v.as_str().to_string()) 767 + .collect::<Vec<_>>() 768 + }); 725 769 726 770 conn.execute( 727 771 include_str!("sql/threadgate_upsert.sql"),
+2 -2
consumer/src/db/sql/post_insert.sql
··· 1 1 INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, 2 - root_cid, embed, embed_subtype, created_at) 3 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) 2 + root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at) 3 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) 4 4 ON CONFLICT DO NOTHING
+1 -1
consumer/src/indexer/mod.rs
··· 625 625 }); 626 626 627 627 let labels = record.labels.clone(); 628 - db::post_insert(conn, at_uri, repo, cid, record).await?; 628 + db::post_insert(conn, at_uri, repo, cid, record, false).await?; 629 629 if let Some(labels) = labels { 630 630 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 631 631 }
+10 -6
consumer/src/indexer/records.rs
··· 267 267 pub struct AppBskyFeedThreadgate { 268 268 pub post: String, 269 269 pub created_at: DateTime<Utc>, 270 - #[serde(default)] 271 - pub allow: Vec<ThreadgateRule>, 270 + pub allow: Option<Vec<ThreadgateRule>>, 272 271 #[serde(default)] 273 272 pub hidden_replies: Vec<String>, 274 273 } 274 + 275 + pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; 276 + pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; 277 + pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; 278 + pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; 275 279 276 280 #[derive(Debug, Deserialize, Serialize)] 277 281 #[serde(tag = "$type")] ··· 289 293 impl ThreadgateRule { 290 294 pub fn as_str(&self) -> &'static str { 291 295 match self { 292 - ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule", 293 - ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule", 294 - ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule", 295 - ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule", 296 + ThreadgateRule::Mention => THREADGATE_RULE_MENTION, 297 + ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER, 298 + ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING, 299 + ThreadgateRule::List { .. } => THREADGATE_RULE_LIST, 296 300 } 297 301 } 298 302 }
+31
consumer/src/utils.rs
··· 1 + use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; 1 2 use lexica::{Blob, StrongRef}; 2 3 use serde::{Deserialize, Deserializer}; 3 4 ··· 39 40 40 41 did == split_aturi[2] 41 42 } 43 + 44 + pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec<String>, Vec<String>) { 45 + let (mentions, tags) = from 46 + .iter() 47 + .flat_map(|v| { 48 + v.features.iter().map(|facet| match facet { 49 + FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), 50 + FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), 51 + _ => (None, None), 52 + }) 53 + }) 54 + .unzip::<_, _, Vec<_>, Vec<_>>(); 55 + 56 + let mentions = mentions.into_iter().flatten().cloned().collect(); 57 + let tags = tags.into_iter().flatten().cloned().collect(); 58 + 59 + (mentions, tags) 60 + } 61 + 62 + pub fn merge_tags<T>(t1: Option<Vec<T>>, t2: Option<Vec<T>>) -> Vec<T> { 63 + match (t1, t2) { 64 + (Some(t1), None) => t1, 65 + (None, Some(t2)) => t2, 66 + (Some(mut t1), Some(t2)) => { 67 + t1.extend(t2); 68 + t1 69 + } 70 + _ => Vec::default(), 71 + } 72 + }
+2 -12
dataloader-rs/Cargo.toml
··· 2 2 name = "dataloader" 3 3 version = "0.18.0" 4 4 edition = "2021" 5 - authors = ["cksac <cs.cksac@gmail.com>", "Lily"] 5 + authors = ["cksac <cs.cksac@gmail.com>", "Mia"] 6 6 description = "Rust implementation of Facebook's DataLoader using async-await." 7 7 keywords = ["batcher", "dataloader", "cache"] 8 8 categories = ["asynchronous", "caching"] ··· 15 15 [badges] 16 16 travis-ci = { repository = "/cksac/dataloader-rs" } 17 17 18 - [features] 19 - default = ["runtime-async-std"] 20 - runtime-async-std = [ 21 - "async-std", 22 - ] 23 - runtime-tokio = [ 24 - "tokio" 25 - ] 26 - 27 18 [dependencies] 28 - async-std = { version = "1", optional = true } 29 - tokio = { version = "1", features = [ "sync", "rt" ], optional = true } 19 + tokio = { version = "1", features = [ "sync", "rt" ] } 30 20 31 21 [dev-dependencies] 32 22 futures = "0.3"
-13
dataloader-rs/src/runtime.rs
// runtime-tokio
//
// Runtime primitives re-exported under neutral names. Only the Tokio-backed
// definitions remain; there is no feature gate selecting between runtimes.

/// Alias for the standard library's atomically reference-counted pointer.
pub type Arc<T> = std::sync::Arc<T>;

/// Alias for Tokio's asynchronous mutex.
pub type Mutex<T> = tokio::sync::Mutex<T>;

/// Re-export of Tokio's cooperative yield function.
pub use tokio::task::yield_now;
+4
lexica/src/app_bsky/actor.rs
··· 160 160 pub verification: Option<VerificationState>, 161 161 #[serde(skip_serializing_if = "Option::is_none")] 162 162 pub status: Option<StatusView>, 163 + #[serde(skip_serializing_if = "Option::is_none")] 164 + pub pronouns: Option<String>, 163 165 164 166 pub created_at: DateTime<Utc>, 165 167 } ··· 186 188 pub verification: Option<VerificationState>, 187 189 #[serde(skip_serializing_if = "Option::is_none")] 188 190 pub status: Option<StatusView>, 191 + #[serde(skip_serializing_if = "Option::is_none")] 192 + pub pronouns: Option<String>, 189 193 190 194 pub created_at: DateTime<Utc>, 191 195 pub indexed_at: NaiveDateTime,
+1
lexica/src/app_bsky/mod.rs
··· 7 7 pub mod graph; 8 8 pub mod labeler; 9 9 pub mod richtext; 10 + pub mod unspecced; 10 11 11 12 #[derive(Clone, Default, Debug, Serialize)] 12 13 #[serde(rename_all = "camelCase")]
+33
lexica/src/app_bsky/unspecced.rs
use crate::app_bsky::feed::{BlockedAuthor, PostView};
use serde::Serialize;

/// A single entry of a v2 thread response: the item's URI, its depth, and a
/// typed payload describing what is at that position.
// NOTE(review): the sign convention of `depth` (parents vs. replies relative to
// the anchor post) is not visible in this file — confirm against the handler.
#[derive(Clone, Debug, Serialize)]
pub struct ThreadV2Item {
    pub uri: String,
    pub depth: i32,
    pub value: ThreadV2ItemType,
}

/// Payload of a [`ThreadV2Item`].
///
/// Serialized as an internally tagged object: the variant is identified by a
/// `$type` field holding the lexicon reference given in each `rename`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "$type")]
pub enum ThreadV2ItemType {
    /// A fully hydrated post together with its thread metadata.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
    Post(ThreadItemPost),
    /// Field-less variant; serializes to an object carrying only `$type`.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
    NoUnauthenticated {},
    /// Field-less variant; serializes to an object carrying only `$type`.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
    NotFound {},
    /// Carries only the blocked author, not the post itself.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
    Blocked { author: BlockedAuthor },
}

/// The post payload of a thread item. Field names are emitted in camelCase
/// (e.g. `more_parents` becomes `moreParents`).
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ThreadItemPost {
    pub post: PostView,
    pub more_parents: bool,
    // A count rather than a flag, unlike `more_parents`.
    pub more_replies: i32,
    pub op_thread: bool,
    pub hidden_by_threadgate: bool,
    pub muted_by_viewer: bool,
}
+2 -2
migrations/2025-02-16-142357_posts/up.sql
··· 123 123 post_uri text not null, 124 124 125 125 hidden_replies text[] not null, 126 - allow text[] not null, 127 - allowed_lists text[] not null, 126 + allow text[], 127 + allowed_lists text[], 128 128 129 129 record jsonb not null, 130 130
+15
migrations/2025-09-27-171241_post-tweaks/down.sql
-- Down migration: removes the two columns added to posts, then the
-- author_feeds triggers, the functions they execute, and finally the
-- author_feeds table itself.
alter table posts
    drop column mentions,
    drop column violates_threadgate;

-- Triggers are dropped before the functions they depend on.
drop trigger t_author_feed_ins_post on posts;
drop trigger t_author_feed_del_post on posts;
drop trigger t_author_feed_ins_repost on reposts;
drop trigger t_author_feed_del_repost on reposts;

drop function f_author_feed_ins_post;
drop function f_author_feed_del_post;
drop function f_author_feed_ins_repost;
drop function f_author_feed_del_repost;

drop table author_feeds;
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
-- Up migration:
--   * adds `mentions` and `violates_threadgate` to posts;
--   * creates author_feeds, a table with one row per post or repost, kept in
--     sync with posts and reposts by the triggers below.
alter table posts
    add column mentions text[],
    add column violates_threadgate bool not null default false;

-- uri     : AT URI of the feed item itself (the post, or the repost record)
-- post    : AT URI of the post being shown (equals uri for typ = 'post')
-- did     : actor whose feed the item belongs to
-- typ     : 'post' or 'repost'
-- sort_at : timestamp used for feed ordering
create table author_feeds
(
    uri     text primary key,
    cid     text        not null,
    post    text        not null,
    did     text        not null,
    typ     text        not null,
    sort_at timestamptz not null
);

-- author_feeds post triggers
create function f_author_feed_ins_post() returns trigger
    language plpgsql as
$$
begin
    insert into author_feeds (uri, cid, post, did, typ, sort_at)
    VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at)
    on conflict do nothing;
    return NEW;
end;
$$;

create trigger t_author_feed_ins_post
    before insert
    on posts
    for each row
execute procedure f_author_feed_ins_post();

create function f_author_feed_del_post() returns trigger
    language plpgsql as
$$
begin
    -- author_feeds has no `item` column; the post URI is stored in `post`.
    delete from author_feeds where did = OLD.did and post = OLD.at_uri and typ = 'post';
    return OLD;
end;
$$;

create trigger t_author_feed_del_post
    before delete
    on posts
    for each row
execute procedure f_author_feed_del_post();

-- author_feeds repost triggers
create function f_author_feed_ins_repost() returns trigger
    language plpgsql as
$$
begin
    -- AT URIs are at://<did>/<collection>/<rkey>; the separators are required.
    insert into author_feeds (uri, cid, post, did, typ, sort_at)
    VALUES ('at://' || NEW.did || '/app.bsky.feed.repost/' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at)
    on conflict do nothing;
    return NEW;
end;
$$;

create trigger t_author_feed_ins_repost
    before insert
    on reposts
    for each row
execute procedure f_author_feed_ins_repost();

create function f_author_feed_del_repost() returns trigger
    language plpgsql as
$$
begin
    -- author_feeds has no `item` column; the reposted post URI is stored in `post`.
    delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost';
    return OLD;
end;
$$;

create trigger t_author_feed_del_repost
    before delete
    on reposts
    for each row
execute procedure f_author_feed_del_repost();
+1 -1
parakeet/Cargo.toml
··· 9 9 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 10 10 base64 = "0.22" 11 11 chrono = { version = "0.4.39", features = ["serde"] } 12 - dataloader = { path = "../dataloader-rs", default-features = false, features = ["runtime-tokio"] } 12 + dataloader = { path = "../dataloader-rs" } 13 13 deadpool = { version = "0.12.1", features = ["managed"] } 14 14 did-resolver = { path = "../did-resolver" } 15 15 diesel = { version = "2.2.6", features = ["chrono", "serde_json"] }
+95 -1
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 - use diesel::sql_types::{Array, Bool, Nullable, Text}; 2 + use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 4 use parakeet_db::{schema, types}; 5 + use parakeet_db::models::TextArray; 5 6 6 7 pub async fn get_actor_status( 7 8 conn: &mut AsyncPgConnection, ··· 196 197 .await 197 198 .optional() 198 199 } 200 + 201 + #[derive(Debug, QueryableByName)] 202 + #[diesel(check_for_backend(diesel::pg::Pg))] 203 + #[allow(unused)] 204 + pub struct ThreadItem { 205 + #[diesel(sql_type = Text)] 206 + pub at_uri: String, 207 + #[diesel(sql_type = Nullable<Text>)] 208 + pub parent_uri: Option<String>, 209 + #[diesel(sql_type = Nullable<Text>)] 210 + pub root_uri: Option<String>, 211 + #[diesel(sql_type = Integer)] 212 + pub depth: i32, 213 + } 214 + 215 + pub async fn get_thread_children( 216 + conn: &mut AsyncPgConnection, 217 + uri: &str, 218 + depth: i32, 219 + ) -> QueryResult<Vec<ThreadItem>> { 220 + diesel::sql_query(include_str!("sql/thread.sql")) 221 + .bind::<Text, _>(uri) 222 + .bind::<Integer, _>(depth) 223 + .load(conn) 224 + .await 225 + } 226 + 227 + pub async fn get_thread_children_branching( 228 + conn: &mut AsyncPgConnection, 229 + uri: &str, 230 + depth: i32, 231 + branching_factor: i32, 232 + ) -> QueryResult<Vec<ThreadItem>> { 233 + diesel::sql_query(include_str!("sql/thread_branching.sql")) 234 + .bind::<Text, _>(uri) 235 + .bind::<Integer, _>(depth) 236 + .bind::<Integer, _>(branching_factor) 237 + .load(conn) 238 + .await 239 + } 240 + 241 + #[derive(Debug, QueryableByName)] 242 + #[diesel(check_for_backend(diesel::pg::Pg))] 243 + pub struct HiddenThreadChildItem { 244 + #[diesel(sql_type = Text)] 245 + pub at_uri: String, 246 + } 247 + 248 + pub async fn get_thread_children_hidden( 249 + conn: &mut AsyncPgConnection, 250 + uri: &str, 251 + root: &str, 252 + ) -> QueryResult<Vec<HiddenThreadChildItem>> { 253 + 
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql")) 254 + .bind::<Text, _>(uri) 255 + .bind::<Text, _>(root) 256 + .load(conn) 257 + .await 258 + } 259 + 260 + pub async fn get_thread_parents( 261 + conn: &mut AsyncPgConnection, 262 + uri: &str, 263 + height: i32, 264 + ) -> QueryResult<Vec<ThreadItem>> { 265 + diesel::sql_query(include_str!("sql/thread_parent.sql")) 266 + .bind::<Text, _>(uri) 267 + .bind::<Integer, _>(height) 268 + .load(conn) 269 + .await 270 + } 271 + 272 + pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 273 + schema::posts::table 274 + .select(schema::posts::root_uri) 275 + .find(&uri) 276 + .get_result(conn) 277 + .await 278 + .optional() 279 + .map(|v| v.flatten()) 280 + } 281 + 282 + pub async fn get_threadgate_hiddens( 283 + conn: &mut AsyncPgConnection, 284 + uri: &str, 285 + ) -> QueryResult<Option<TextArray>> { 286 + schema::threadgates::table 287 + .select(schema::threadgates::hidden_replies) 288 + .find(&uri) 289 + .get_result(conn) 290 + .await 291 + .optional() 292 + }
+5 -9
parakeet/src/hydration/labeler.rs
··· 42 42 likes: Option<i32>, 43 43 ) -> LabelerViewDetailed { 44 44 let reason_types = labeler.reasons.map(|v| { 45 - v.into_iter() 46 - .flatten() 47 - .filter_map(|v| ReasonType::from_str(&v).ok()) 45 + v.iter() 46 + .filter_map(|v| ReasonType::from_str(v).ok()) 48 47 .collect() 49 48 }); 50 49 ··· 74 73 }) 75 74 .collect(); 76 75 let subject_types = labeler.subject_types.map(|v| { 77 - v.into_iter() 78 - .flatten() 79 - .filter_map(|v| SubjectType::from_str(&v).ok()) 76 + v.iter() 77 + .filter_map(|v| SubjectType::from_str(v).ok()) 80 78 .collect() 81 79 }); 82 - let subject_collections = labeler 83 - .subject_collections 84 - .map(|v| v.into_iter().flatten().collect()); 80 + let subject_collections = labeler.subject_collections.map(Vec::from); 85 81 86 82 LabelerViewDetailed { 87 83 uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
+8
parakeet/src/hydration/list.rs
··· 78 78 } 79 79 80 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 81 + if lists.is_empty() { 82 + return HashMap::new(); 83 + } 84 + 81 85 let labels = self.get_label_many(&lists).await; 82 86 let viewers = self.get_list_viewer_states(&lists).await; 83 87 let lists = self.loaders.list.load_many(lists).await; ··· 103 107 } 104 108 105 109 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 110 + if lists.is_empty() { 111 + return HashMap::new(); 112 + } 113 + 106 114 let labels = self.get_label_many(&lists).await; 107 115 let viewers = self.get_list_viewer_states(&lists).await; 108 116 let lists = self.loaders.list.load_many(lists).await;
+173 -92
parakeet/src/hydration/posts.rs
··· 3 3 use lexica::app_bsky::actor::ProfileViewBasic; 4 4 use lexica::app_bsky::embed::Embed; 5 5 use lexica::app_bsky::feed::{ 6 - BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView, 6 + BlockedAuthor, FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView, PostViewerState, 7 + ReplyRef, ReplyRefPost, ThreadgateView, 7 8 }; 8 9 use lexica::app_bsky::graph::ListViewBasic; 9 10 use lexica::app_bsky::RecordStats; ··· 32 33 } 33 34 } 34 35 36 + type HydratePostsRet = ( 37 + models::Post, 38 + ProfileViewBasic, 39 + Vec<models::Label>, 40 + Option<Embed>, 41 + Option<ThreadgateView>, 42 + Option<PostViewerState>, 43 + Option<PostStats>, 44 + ); 45 + 35 46 fn build_postview( 36 - post: models::Post, 37 - author: ProfileViewBasic, 38 - labels: Vec<models::Label>, 39 - embed: Option<Embed>, 40 - threadgate: Option<ThreadgateView>, 41 - viewer: Option<PostViewerState>, 42 - stats: Option<PostStats>, 47 + (post, author, labels, embed, threadgate, viewer, stats): HydratePostsRet, 43 48 ) -> PostView { 44 49 let stats = stats 45 50 .map(|stats| RecordStats { ··· 83 88 ) -> Option<ThreadgateView> { 84 89 let threadgate = threadgate?; 85 90 86 - let lists = threadgate 87 - .allowed_lists 88 - .iter() 89 - .flatten() 90 - .cloned() 91 - .collect::<Vec<_>>(); 91 + let lists = match threadgate.allowed_lists.as_ref() { 92 + Some(allowed_lists) => allowed_lists.clone().into(), 93 + None => Vec::new(), 94 + }; 92 95 let lists = self.hydrate_lists_basic(lists).await; 93 96 94 97 Some(build_threadgate_view( ··· 102 105 threadgates: Vec<models::Threadgate>, 103 106 ) -> HashMap<String, ThreadgateView> { 104 107 let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| { 105 - acc.extend(c.allowed_lists.iter().flatten().cloned()); 108 + if let Some(lists) = &c.allowed_lists { 109 + acc.extend(lists.clone().0); 110 + } 106 111 acc 107 112 }); 108 113 let lists = self.hydrate_lists_basic(lists).await; ··· 110 115 threadgates 111 116 
.into_iter() 112 117 .map(|threadgate| { 113 - let this_lists = threadgate 114 - .allowed_lists 115 - .iter() 116 - .filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned())) 117 - .collect(); 118 + let this_lists = match &threadgate.allowed_lists { 119 + Some(allowed_lists) => allowed_lists 120 + .iter() 121 + .filter_map(|v| lists.get(v).cloned()) 122 + .collect(), 123 + None => Vec::new(), 124 + }; 118 125 119 126 ( 120 127 threadgate.at_uri.clone(), ··· 133 140 let threadgate = self.hydrate_threadgate(threadgate).await; 134 141 let labels = self.get_label(&post.at_uri).await; 135 142 136 - Some(build_postview( 143 + Some(build_postview(( 137 144 post, author, labels, embed, threadgate, viewer, stats, 138 - )) 145 + ))) 139 146 } 140 147 141 - pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 148 + async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 142 149 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 143 150 let posts = self.loaders.posts.load_many(posts).await; 144 151 ··· 148 155 .unzip::<_, _, Vec<_>, Vec<_>>(); 149 156 let authors = self.hydrate_profiles_basic(authors).await; 150 157 151 - let post_labels = self.get_label_many(&post_uris).await; 152 - let viewer_data = self.get_post_viewer_states(&post_uris).await; 158 + let mut post_labels = self.get_label_many(&post_uris).await; 159 + let mut viewer_data = self.get_post_viewer_states(&post_uris).await; 153 160 154 161 let threadgates = posts 155 162 .values() ··· 157 164 .collect(); 158 165 let threadgates = self.hydrate_threadgates(threadgates).await; 159 166 160 - let embeds = self.hydrate_embeds(post_uris).await; 167 + let mut embeds = self.hydrate_embeds(post_uris).await; 161 168 162 169 posts 163 170 .into_iter() 164 171 .filter_map(|(uri, (post, threadgate))| { 165 - let author = authors.get(&post.did)?; 166 - let embed = embeds.get(&uri).cloned(); 172 + let author = 
authors.get(&post.did)?.clone(); 173 + let embed = embeds.remove(&uri); 167 174 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 168 - let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 175 + let labels = post_labels.remove(&uri).unwrap_or_default(); 169 176 let stats = stats.get(&uri).cloned(); 170 - let viewer = viewer_data.get(&uri).cloned(); 177 + let viewer = viewer_data.remove(&uri); 171 178 172 179 Some(( 173 180 uri, 174 - build_postview( 175 - post, 176 - author.to_owned(), 177 - labels, 178 - embed, 179 - threadgate, 180 - viewer, 181 - stats, 182 - ), 181 + (post, author, labels, embed, threadgate, viewer, stats), 183 182 )) 184 183 }) 185 184 .collect() 186 185 } 187 186 188 - pub async fn hydrate_feed_posts(&self, posts: Vec<String>) -> HashMap<String, FeedViewPost> { 189 - let stats = self.loaders.post_stats.load_many(posts.clone()).await; 190 - let posts = self.loaders.posts.load_many(posts).await; 187 + pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 188 + self.hydrate_posts_inner(posts) 189 + .await 190 + .into_iter() 191 + .map(|(uri, data)| (uri, build_postview(data))) 192 + .collect() 193 + } 191 194 192 - let (authors, post_uris) = posts 193 - .values() 194 - .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 195 - .unzip::<_, _, Vec<_>, Vec<_>>(); 196 - let authors = self.hydrate_profiles_basic(authors).await; 197 - 198 - let post_labels = self.get_label_many(&post_uris).await; 199 - let viewer_data = self.get_post_viewer_states(&post_uris).await; 200 - let embeds = self.hydrate_embeds(post_uris).await; 195 + pub async fn hydrate_feed_posts( 196 + &self, 197 + posts: Vec<RawFeedItem>, 198 + author_threads_only: bool, 199 + ) -> Vec<FeedViewPost> { 200 + let post_uris = posts 201 + .iter() 202 + .map(|item| item.post_uri().to_string()) 203 + .collect::<Vec<_>>(); 204 + let mut posts_hyd = self.hydrate_posts_inner(post_uris).await; 201 205 202 - let 
reply_refs = posts 206 + // we shouldn't show the parent when the post violates a threadgate. 207 + let reply_refs = posts_hyd 203 208 .values() 204 - .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 209 + .filter(|(post, ..)| !post.violates_threadgate) 210 + .flat_map(|(post, ..)| [post.parent_uri.clone(), post.root_uri.clone()]) 205 211 .flatten() 206 212 .collect::<Vec<_>>(); 213 + let reply_posts = self.hydrate_posts(reply_refs).await; 207 214 208 - let reply_posts = self.hydrate_posts(reply_refs).await; 215 + let repost_profiles = posts 216 + .iter() 217 + .filter_map(|item| item.repost_by()) 218 + .collect::<Vec<_>>(); 219 + let profiles_hydrated = self.hydrate_profiles_basic(repost_profiles).await; 209 220 210 221 posts 211 222 .into_iter() 212 - .filter_map(|(post_uri, (post, _))| { 213 - let author = authors.get(&post.did)?; 223 + .filter_map(|item| { 224 + let post = posts_hyd.remove(item.post_uri())?; 225 + let context = item.context(); 226 + 227 + let reply = if let RawFeedItem::Post { .. 
} = item { 228 + let root_uri = post.0.root_uri.as_ref(); 229 + let parent_uri = post.0.parent_uri.as_ref(); 230 + 231 + let (root, parent) = if author_threads_only { 232 + if root_uri.is_some() && parent_uri.is_some() { 233 + let root = root_uri.and_then(|uri| posts_hyd.get(uri))?; 234 + let parent = parent_uri.and_then(|uri| posts_hyd.get(uri))?; 235 + 236 + let root = build_postview(root.clone()); 237 + let parent = build_postview(parent.clone()); 214 238 215 - let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); 216 - let parent = post 217 - .parent_uri 218 - .as_ref() 219 - .and_then(|uri| reply_posts.get(uri)); 239 + (Some(root), Some(parent)) 240 + } else { 241 + (None, None) 242 + } 243 + } else { 244 + let root = root_uri.and_then(|uri| reply_posts.get(uri)).cloned(); 245 + let parent = parent_uri.and_then(|uri| reply_posts.get(uri)).cloned(); 220 246 221 - let reply = if post.parent_uri.is_some() && post.root_uri.is_some() { 222 - Some(ReplyRef { 223 - root: root.cloned().map(postview_to_replyref).unwrap_or( 224 - ReplyRefPost::NotFound { 225 - uri: post.root_uri.as_ref().unwrap().clone(), 226 - not_found: true, 227 - }, 228 - ), 229 - parent: parent.cloned().map(postview_to_replyref).unwrap_or( 230 - ReplyRefPost::NotFound { 231 - uri: post.parent_uri.as_ref().unwrap().clone(), 232 - not_found: true, 233 - }, 234 - ), 235 - grandparent_author: None, 236 - }) 247 + (root, parent) 248 + }; 249 + 250 + if root_uri.is_some() || parent_uri.is_some() { 251 + Some(ReplyRef { 252 + root: root.map(postview_to_replyref).unwrap_or( 253 + ReplyRefPost::NotFound { 254 + uri: root_uri.unwrap().to_owned(), 255 + not_found: true, 256 + }, 257 + ), 258 + parent: parent.map(postview_to_replyref).unwrap_or( 259 + ReplyRefPost::NotFound { 260 + uri: parent_uri.unwrap().to_owned(), 261 + not_found: true, 262 + }, 263 + ), 264 + grandparent_author: None, 265 + }) 266 + } else { 267 + None 268 + } 237 269 } else { 238 270 None 239 271 }; 240 272 241 - let 
embed = embeds.get(&post_uri).cloned(); 242 - let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 243 - let stats = stats.get(&post_uri).cloned(); 244 - let viewer = viewer_data.get(&post_uri).cloned(); 245 - let post = 246 - build_postview(post, author.to_owned(), labels, embed, None, viewer, stats); 273 + let reason = match item { 274 + RawFeedItem::Repost { uri, by, at, .. } => { 275 + Some(FeedViewPostReason::Repost(FeedReasonRepost { 276 + by: profiles_hydrated.get(&by).cloned()?, 277 + uri: Some(uri), 278 + cid: None, 279 + indexed_at: at, 280 + })) 281 + } 282 + RawFeedItem::Pin { .. } => Some(FeedViewPostReason::Pin), 283 + _ => None, 284 + }; 247 285 248 - Some(( 249 - post_uri, 250 - FeedViewPost { 251 - post, 252 - reply, 253 - reason: None, 254 - feed_context: None, 255 - }, 256 - )) 286 + let post = build_postview(post); 287 + 288 + Some(FeedViewPost { 289 + post, 290 + reply, 291 + reason, 292 + feed_context: context, 293 + }) 257 294 }) 258 295 .collect() 259 296 } ··· 297 334 _ => ReplyRefPost::Post(post), 298 335 } 299 336 } 337 + 338 + #[derive(Debug)] 339 + pub enum RawFeedItem { 340 + Pin { 341 + uri: String, 342 + context: Option<String>, 343 + }, 344 + Post { 345 + uri: String, 346 + context: Option<String>, 347 + }, 348 + Repost { 349 + uri: String, 350 + post: String, 351 + by: String, 352 + at: chrono::DateTime<chrono::Utc>, 353 + context: Option<String>, 354 + }, 355 + } 356 + 357 + impl RawFeedItem { 358 + fn post_uri(&self) -> &str { 359 + match self { 360 + RawFeedItem::Pin { uri, .. } => uri, 361 + RawFeedItem::Post { uri, .. } => uri, 362 + RawFeedItem::Repost { post, .. } => post, 363 + } 364 + } 365 + 366 + fn repost_by(&self) -> Option<String> { 367 + match self { 368 + RawFeedItem::Repost { by, .. } => Some(by.clone()), 369 + _ => None, 370 + } 371 + } 372 + 373 + fn context(&self) -> Option<String> { 374 + match self { 375 + RawFeedItem::Pin { context, .. 
} => context.clone(), 376 + RawFeedItem::Post { context, .. } => context.clone(), 377 + RawFeedItem::Repost { context, .. } => context.clone(), 378 + } 379 + } 380 + }
+2
parakeet/src/hydration/profile.rs
··· 201 201 labels: map_labels(labels), 202 202 verification, 203 203 status, 204 + pronouns: profile.pronouns, 204 205 created_at: profile.created_at.and_utc(), 205 206 } 206 207 } ··· 229 230 labels: map_labels(labels), 230 231 verification, 231 232 status, 233 + pronouns: profile.pronouns, 232 234 created_at: profile.created_at.and_utc(), 233 235 indexed_at: profile.indexed_at, 234 236 }
+3 -7
parakeet/src/hydration/starter_packs.rs
··· 96 96 let feeds = sp 97 97 .feeds 98 98 .clone() 99 - .unwrap_or_default() 100 - .into_iter() 101 - .flatten() 102 - .collect(); 103 - let feeds = self.hydrate_feedgens(feeds).await.into_values().collect(); 99 + .unwrap_or_default(); 100 + let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect(); 104 101 105 102 Some(build_spview(sp, creator, labels, list, feeds)) 106 103 } ··· 119 116 let feeds = packs 120 117 .values() 121 118 .filter_map(|pack| pack.feeds.clone()) 122 - .flat_map(|feeds| feeds.into_iter().flatten()) 119 + .flat_map(Vec::from) 123 120 .collect(); 124 121 125 122 let creators = self.hydrate_profiles_basic(creators).await; ··· 133 130 let list = lists.get(&pack.list).cloned(); 134 131 let feeds = pack.feeds.as_ref().map(|v| { 135 132 v.iter() 136 - .flatten() 137 133 .filter_map(|feed| feeds.get(feed).cloned()) 138 134 .collect() 139 135 });
+4 -1
parakeet/src/loaders.rs
··· 4 4 use dataloader::async_cached::Loader; 5 5 use dataloader::non_cached::Loader as NonCachedLoader; 6 6 use dataloader::BatchFn; 7 + use diesel::dsl::sql; 7 8 use diesel::prelude::*; 8 9 use diesel_async::pooled_connection::deadpool::Pool; 9 10 use diesel_async::{AsyncPgConnection, RunQueryDsl}; ··· 368 369 let mut conn = self.0.get().await.unwrap(); 369 370 370 371 let res = schema::posts::table 371 - .left_join(schema::threadgates::table) 372 + .left_join(schema::threadgates::table.on( 373 + schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")), 374 + )) 372 375 .select(( 373 376 models::Post::as_select(), 374 377 Option::<models::Threadgate>::as_select(),
+3 -3
parakeet/src/sql/thread.sql
-- Walks the reply tree below the post $1, skipping posts that violate a
-- threadgate. Direct replies have depth 1. A row is expanded while its depth
-- is <= $2, so the deepest rows returned have depth $2 + 1.
with recursive thread (at_uri, parent_uri, root_uri, depth) as (
    select reply.at_uri, reply.parent_uri, reply.root_uri, 1
    from posts reply
    where reply.parent_uri = $1
      and not reply.violates_threadgate
    union all
    select child.at_uri, child.parent_uri, child.root_uri, ancestor.depth + 1
    from thread ancestor
             join posts child on child.parent_uri = ancestor.at_uri
    where ancestor.depth <= $2
      and not child.violates_threadgate
)
select at_uri, parent_uri, root_uri, depth
from thread
order by depth desc;
+13
parakeet/src/sql/thread_branching.sql
-- Walks the reply tree below the post $1, skipping posts that violate a
-- threadgate. Direct replies have depth 1; a row is expanded while its depth
-- is <= $2. The recursive term is parenthesised so that LIMIT $3 can be
-- attached to it.
-- NOTE(review): the LIMIT applies to the recursive term as a whole, so it
-- presumably caps the rows produced per recursion step rather than the
-- children kept per parent — confirm this matches the intended branching factor.
-- Unlike sql/thread.sql there is no ORDER BY, so row order is unspecified.
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
                          from posts
                          where parent_uri = $1
                            and violates_threadgate = FALSE
                          union all
                          (select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
                           from posts p
                                    join thread on p.parent_uri = thread.at_uri
                           where thread.depth <= $2
                             and violates_threadgate = FALSE
                           LIMIT $3))
select *
from thread;
+4 -2
parakeet/src/sql/thread_parent.sql
-- Walks upwards from the post $1 through its ancestors. The direct parent has
-- depth 0; an ancestor is expanded while its depth is <= $2. Nothing is
-- returned when the starting post violates a threadgate, and the walk stops
-- at the first ancestor that does.
with recursive parents (at_uri, cid, parent_uri, root_uri, depth) as (
    select direct.at_uri, direct.cid, direct.parent_uri, direct.root_uri, 0
    from posts direct
    where direct.at_uri = (select anchor.parent_uri
                           from posts anchor
                           where anchor.at_uri = $1
                             and not anchor.violates_threadgate)
    union all
    select up.at_uri, up.cid, up.parent_uri, up.root_uri, lower.depth + 1
    from parents lower
             join posts up on up.at_uri = lower.parent_uri
    where lower.depth <= $2
      and not up.violates_threadgate
)
select at_uri, cid, parent_uri, root_uri, depth
from parents
order by depth desc;
+6
parakeet/src/sql/thread_v2_hidden_children.sql
-- Returns the direct replies to post $1 whose URI is listed in the
-- hidden_replies array of the threadgate attached to post $2.
select p.at_uri
from posts p
where p.parent_uri = $1
  and p.at_uri in (select unnest(tg.hidden_replies)
                   from threadgates tg
                   where tg.post_uri = $2)
+7 -8
parakeet/src/xrpc/app_bsky/feed/likes.rs
··· 1 + use crate::hydration::posts::RawFeedItem; 1 2 use crate::hydration::StatefulHydrator; 2 3 use crate::xrpc::error::{Error, XrpcResult}; 3 4 use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; ··· 57 58 .last() 58 59 .map(|(last, _)| last.timestamp_millis().to_string()); 59 60 60 - let at_uris = results 61 + let raw_feed = results 61 62 .iter() 62 - .map(|(_, uri)| uri.clone()) 63 + .map(|(_, uri)| RawFeedItem::Post { 64 + uri: uri.clone(), 65 + context: None, 66 + }) 63 67 .collect::<Vec<_>>(); 64 68 65 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 66 - 67 - let feed: Vec<_> = results 68 - .into_iter() 69 - .filter_map(|(_, uri)| posts.remove(&uri)) 70 - .collect(); 69 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 71 70 72 71 Ok(Json(FeedRes { cursor, feed })) 73 72 }
+85 -116
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 1 + use crate::hydration::posts::RawFeedItem; 1 2 use crate::hydration::StatefulHydrator; 2 3 use crate::xrpc::app_bsky::graph::lists::ListWithCursorQuery; 3 4 use crate::xrpc::error::{Error, XrpcResult}; ··· 16 17 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 17 18 use lexica::app_bsky::actor::ProfileView; 18 19 use lexica::app_bsky::feed::{ 19 - BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, 20 - PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 20 + BlockedAuthor, FeedSkeletonResponse, FeedViewPost, PostView, SkeletonReason, ThreadViewPost, 21 + ThreadViewPostType, ThreadgateView, 21 22 }; 22 - use parakeet_db::schema; 23 + use parakeet_db::{models, schema}; 23 24 use reqwest::Url; 24 25 use serde::{Deserialize, Serialize}; 25 26 use std::collections::HashMap; ··· 113 114 114 115 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 115 116 116 - let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect(); 117 117 let repost_skeleton = skeleton 118 118 .feed 119 119 .iter() ··· 122 122 _ => None, 123 123 }) 124 124 .collect::<Vec<_>>(); 125 + let mut repost_data = get_skeleton_repost_data(&mut conn, repost_skeleton).await; 125 126 126 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 127 - let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; 128 - 129 - let feed = skeleton 127 + let raw_feed = skeleton 130 128 .feed 131 129 .into_iter() 132 - .filter_map(|item| { 133 - let mut post = posts.remove(&item.post)?; 134 - let reason = match item.reason { 135 - Some(SkeletonReason::Repost { repost }) => { 136 - repost_data.remove(&repost).map(FeedViewPostReason::Repost) 137 - } 138 - Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin), 139 - _ => None, 140 - }; 141 - 142 - post.reason = reason; 143 - post.feed_context = item.feed_context; 144 - 145 - Some(post) 130 + .filter_map(|v| match v.reason 
{ 131 + Some(SkeletonReason::Repost { repost }) => { 132 + repost_data 133 + .remove_entry(&repost) 134 + .map(|(uri, (by, at))| RawFeedItem::Repost { 135 + uri, 136 + post: v.post, 137 + by, 138 + at: at.and_utc(), 139 + context: v.feed_context, 140 + }) 141 + } 142 + Some(SkeletonReason::Pin {}) => Some(RawFeedItem::Pin { 143 + uri: v.post, 144 + context: v.feed_context, 145 + }), 146 + None => Some(RawFeedItem::Post { 147 + uri: v.post, 148 + context: v.feed_context, 149 + }), 146 150 }) 147 151 .collect(); 152 + 153 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 148 154 149 155 Ok(Json(FeedRes { 150 156 cursor: skeleton.cursor, ··· 152 158 })) 153 159 } 154 160 155 - #[derive(Debug, Deserialize)] 161 + #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 156 162 #[serde(rename_all = "snake_case")] 157 163 pub enum GetAuthorFeedFilter { 164 + #[default] 158 165 PostsWithReplies, 159 166 PostsNoReplies, 160 167 PostsWithMedia, 161 168 PostsAndAuthorThreads, 162 169 PostsWithVideo, 163 - } 164 - 165 - impl Default for GetAuthorFeedFilter { 166 - fn default() -> Self { 167 - Self::PostsWithReplies 168 - } 169 170 } 170 171 171 172 #[derive(Debug, Deserialize)] ··· 209 210 210 211 let pin = match query.include_pins && query.cursor.is_none() { 211 212 false => None, 212 - true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? 
{ 213 - Some(post) => hyd.hydrate_post(post).await, 214 - None => None, 215 - }, 213 + true => crate::db::get_pinned_post_uri(&mut conn, &did).await?, 216 214 }; 217 215 218 216 let limit = query.limit.unwrap_or(50).clamp(1, 100); 219 217 220 - let mut posts_query = schema::posts::table 221 - .select((schema::posts::created_at, schema::posts::at_uri)) 222 - .filter(schema::posts::did.eq(did)) 218 + let mut posts_query = schema::author_feeds::table 219 + .select(models::AuthorFeedItem::as_select()) 220 + .left_join(schema::posts::table.on(schema::posts::at_uri.eq(schema::author_feeds::post))) 221 + .filter(schema::author_feeds::did.eq(&did)) 223 222 .into_boxed(); 224 223 225 224 if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 226 - posts_query = posts_query.filter(schema::posts::created_at.lt(cursor)); 225 + posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor)); 227 226 } 228 227 228 + let author_threads_only = query.filter == GetAuthorFeedFilter::PostsAndAuthorThreads; 229 229 posts_query = match query.filter { 230 - GetAuthorFeedFilter::PostsWithReplies => posts_query, 230 + GetAuthorFeedFilter::PostsWithReplies => { 231 + posts_query.filter(schema::author_feeds::typ.eq("post")) 232 + } 231 233 GetAuthorFeedFilter::PostsNoReplies => { 232 234 posts_query.filter(schema::posts::parent_uri.is_null()) 233 235 } 234 - GetAuthorFeedFilter::PostsWithMedia => posts_query.filter(embed_type_filter(&[ 235 - "app.bsky.embed.video", 236 - "app.bsky.embed.images", 237 - ])), 236 + GetAuthorFeedFilter::PostsWithMedia => posts_query.filter( 237 + embed_type_filter(&["app.bsky.embed.video", "app.bsky.embed.images"]) 238 + .and(schema::author_feeds::typ.eq("post")), 239 + ), 238 240 GetAuthorFeedFilter::PostsAndAuthorThreads => posts_query.filter( 239 241 (schema::posts::parent_uri 240 - .like(format!("at://{}/%", &query.actor)) 242 + .like(format!("at://{did}/%")) 241 243 .or(schema::posts::parent_uri.is_null())) 242 244 .and( 243 245 
schema::posts::root_uri 244 - .like(format!("at://{}/%", &query.actor)) 246 + .like(format!("at://{did}/%")) 245 247 .or(schema::posts::root_uri.is_null()), 246 248 ), 247 249 ), 248 - GetAuthorFeedFilter::PostsWithVideo => { 249 - posts_query.filter(embed_type_filter(&["app.bsky.embed.video"])) 250 - } 250 + GetAuthorFeedFilter::PostsWithVideo => posts_query.filter( 251 + embed_type_filter(&["app.bsky.embed.video"]).and(schema::author_feeds::typ.eq("post")), 252 + ), 251 253 }; 252 254 253 255 let results = posts_query 254 - .order(schema::posts::created_at.desc()) 256 + .order(schema::author_feeds::sort_at.desc()) 255 257 .limit(limit as i64) 256 - .load::<(chrono::DateTime<chrono::Utc>, String)>(&mut conn) 258 + .load(&mut conn) 257 259 .await?; 258 260 259 261 let cursor = results 260 262 .last() 261 - .map(|(last, _)| last.timestamp_millis().to_string()); 263 + .map(|item| item.sort_at.timestamp_millis().to_string()); 262 264 263 - let at_uris = results 264 - .iter() 265 - .map(|(_, uri)| uri.clone()) 265 + let mut raw_feed = results 266 + .into_iter() 267 + .filter_map(|item| match &*item.typ { 268 + "post" => Some(RawFeedItem::Post { 269 + uri: item.post, 270 + context: None, 271 + }), 272 + "repost" => Some(RawFeedItem::Repost { 273 + uri: item.uri, 274 + post: item.post, 275 + by: item.did, 276 + at: item.sort_at, 277 + context: None, 278 + }), 279 + _ => None, 280 + }) 266 281 .collect::<Vec<_>>(); 267 282 268 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 269 - 270 - let mut feed: Vec<_> = results 271 - .into_iter() 272 - .filter_map(|(_, uri)| posts.remove(&uri)) 273 - .collect(); 274 - 275 283 if let Some(post) = pin { 276 - feed.insert( 284 + raw_feed.insert( 277 285 0, 278 - FeedViewPost { 279 - post, 280 - reply: None, 281 - reason: Some(FeedViewPostReason::Pin), 282 - feed_context: None, 286 + RawFeedItem::Pin { 287 + uri: post, 288 + context: None, 283 289 }, 284 290 ); 285 291 } 292 + 293 + let feed = hyd.hydrate_feed_posts(raw_feed, 
author_threads_only).await; 286 294 287 295 Ok(Json(FeedRes { cursor, feed })) 288 296 } ··· 325 333 .last() 326 334 .map(|(last, _)| last.timestamp_millis().to_string()); 327 335 328 - let at_uris = results 336 + let raw_feed = results 329 337 .iter() 330 - .map(|(_, uri)| uri.clone()) 338 + .map(|(_, uri)| RawFeedItem::Post { 339 + uri: uri.clone(), 340 + context: None, 341 + }) 331 342 .collect::<Vec<_>>(); 332 343 333 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 334 - 335 - let feed = results 336 - .into_iter() 337 - .filter_map(|(_, uri)| posts.remove(&uri)) 338 - .collect(); 344 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 339 345 340 346 Ok(Json(FeedRes { cursor, feed })) 341 347 } ··· 355 361 pub threadgate: Option<ThreadgateView>, 356 362 } 357 363 358 - #[derive(Debug, QueryableByName)] 359 - #[diesel(check_for_backend(diesel::pg::Pg))] 360 - struct ThreadItem { 361 - #[diesel(sql_type = diesel::sql_types::Text)] 362 - at_uri: String, 363 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 364 - parent_uri: Option<String>, 365 - // #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 366 - // root_uri: Option<String>, 367 - #[diesel(sql_type = diesel::sql_types::Integer)] 368 - depth: i32, 369 - } 370 - 371 364 pub async fn get_post_thread( 372 365 State(state): State<GlobalState>, 373 366 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 403 396 } 404 397 } 405 398 406 - let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 407 - .bind::<diesel::sql_types::Text, _>(&uri) 408 - .bind::<diesel::sql_types::Integer, _>(depth as i32) 409 - .load::<ThreadItem>(&mut conn) 410 - .await?; 411 - 412 - let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql")) 413 - .bind::<diesel::sql_types::Text, _>(&uri) 414 - .bind::<diesel::sql_types::Integer, _>(parent_height as i32) 415 - .load::<ThreadItem>(&mut conn) 416 - .await?; 399 + let replies = 
crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?; 400 + let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?; 417 401 418 402 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 419 403 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect(); ··· 669 653 } 670 654 } 671 655 672 - async fn get_skeleton_repost_data<'a>( 656 + async fn get_skeleton_repost_data( 673 657 conn: &mut AsyncPgConnection, 674 - hyd: &StatefulHydrator<'a>, 675 658 reposts: Vec<String>, 676 - ) -> HashMap<String, FeedReasonRepost> { 659 + ) -> HashMap<String, (String, NaiveDateTime)> { 677 660 let Ok(repost_data) = schema::records::table 678 661 .select(( 679 662 schema::records::at_uri, ··· 687 670 return HashMap::new(); 688 671 }; 689 672 690 - let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect(); 691 - let profiles = hyd.hydrate_profiles_basic(profiles).await; 692 - 693 673 repost_data 694 674 .into_iter() 695 - .filter_map(|(uri, did, indexed_at)| { 696 - let by = profiles.get(&did).cloned()?; 697 - 698 - let repost = FeedReasonRepost { 699 - by, 700 - uri: Some(uri.clone()), 701 - cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it. 702 - indexed_at: indexed_at.and_utc(), 703 - }; 704 - 705 - Some((uri, repost)) 706 - }) 675 + .map(|(uri, did, at)| (uri, (did, at))) 707 676 .collect() 708 677 } 709 678
+3
parakeet/src/xrpc/app_bsky/mod.rs
··· 6 6 mod feed; 7 7 mod graph; 8 8 mod labeler; 9 + mod unspecced; 9 10 10 11 #[rustfmt::skip] 11 12 pub fn routes() -> Router<crate::GlobalState> { ··· 64 65 // TODO: app.bsky.notification.putActivitySubscriptions 65 66 // TODO: app.bsky.notification.putPreferences 66 67 // TODO: app.bsky.notification.putPreferencesV2 68 + .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 69 + .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 67 70 } 68 71 69 72 async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 + pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 + use crate::db::ThreadItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 + use crate::xrpc::normalise_at_uri; 6 + use crate::GlobalState; 7 + use axum::extract::{Query, State}; 8 + use axum::Json; 9 + use itertools::Itertools; 10 + use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView}; 11 + use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 12 + use serde::{Deserialize, Serialize}; 13 + use std::cmp::Ordering; 14 + use std::collections::{HashMap, HashSet}; 15 + 16 + const THREAD_PARENTS: usize = 50; 17 + const DEFAULT_BRANCHING: u32 = 10; 18 + const DEFAULT_DEPTH: u32 = 6; 19 + 20 + #[derive(Copy, Clone, Debug, Default, Deserialize)] 21 + #[serde(rename_all = "lowercase")] 22 + pub enum PostThreadSort { 23 + Newest, 24 + #[default] 25 + Oldest, 26 + Top, 27 + } 28 + 29 + #[derive(Debug, Deserialize)] 30 + #[serde(rename_all = "camelCase")] 31 + pub struct GetPostThreadV2Req { 32 + pub anchor: String, 33 + pub above: Option<bool>, 34 + pub below: Option<u32>, 35 + pub branching_factor: Option<u32>, 36 + #[serde(default)] 37 + pub sort: PostThreadSort, 38 + } 39 + 40 + #[derive(Debug, Serialize)] 41 + #[serde(rename_all = "camelCase")] 42 + pub struct GetPostThreadV2Res { 43 + pub thread: Vec<ThreadV2Item>, 44 + #[serde(skip_serializing_if = "Option::is_none")] 45 + pub threadgate: Option<ThreadgateView>, 46 + pub has_other_replies: bool, 47 + } 48 + 49 + pub async fn get_post_thread_v2( 50 + State(state): State<GlobalState>, 51 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 52 + maybe_auth: Option<AtpAuth>, 53 + Query(query): Query<GetPostThreadV2Req>, 54 + ) -> XrpcResult<Json<GetPostThreadV2Res>> { 55 + let mut conn = state.pool.get().await?; 56 + let maybe_did = maybe_auth.clone().map(|v| v.0); 57 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 58 + 59 + 
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 60 + let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32; 61 + let branching_factor = query 62 + .branching_factor 63 + .unwrap_or(DEFAULT_BRANCHING) 64 + .clamp(0, 100) as i32; 65 + 66 + let anchor = hyd 67 + .hydrate_post(uri.clone()) 68 + .await 69 + .ok_or(Error::not_found())?; 70 + 71 + if let Some(v) = &anchor.author.viewer { 72 + if v.blocked_by || v.blocking.is_some() { 73 + let block = ThreadV2ItemType::Blocked { 74 + author: BlockedAuthor { 75 + did: anchor.author.did, 76 + viewer: anchor.author.viewer, 77 + }, 78 + }; 79 + 80 + return Ok(Json(GetPostThreadV2Res { 81 + thread: vec![ThreadV2Item { 82 + uri, 83 + depth: 0, 84 + value: block, 85 + }], 86 + threadgate: anchor.threadgate, 87 + has_other_replies: false, 88 + })); 89 + } 90 + } 91 + 92 + // get the root post URI (if there is one) and return its author's DID. 93 + let root_uri = crate::db::get_root_post(&mut conn, &uri) 94 + .await? 95 + .unwrap_or(uri.clone()); 96 + let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0]; 97 + 98 + let replies = 99 + crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1) 100 + .await?; 101 + let reply_uris = replies 102 + .iter() 103 + .map(|item| item.at_uri.clone()) 104 + .collect::<Vec<_>>(); 105 + 106 + // bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents. 
107 + let parents = match query.above.unwrap_or(true) { 108 + true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?, 109 + false => vec![], 110 + }; 111 + let parent_uris = parents 112 + .iter() 113 + .map(|item| item.at_uri.clone()) 114 + .collect::<Vec<_>>(); 115 + 116 + let (mut replies_hyd, mut parents_hyd) = tokio::join!( 117 + hyd.hydrate_posts(reply_uris), 118 + hyd.hydrate_posts(parent_uris), 119 + ); 120 + 121 + let threadgate = anchor.threadgate.clone(); 122 + let hidden: HashSet<_, std::hash::RandomState> = match &threadgate { 123 + Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?, 124 + None => None, 125 + } 126 + .map(|hiddens| HashSet::from_iter(Vec::from(hiddens))) 127 + .unwrap_or_default(); 128 + 129 + let root_has_more = parents.len() > THREAD_PARENTS; 130 + let mut is_op_thread = true; 131 + 132 + let mut thread = Vec::with_capacity(1 + replies.len() + parents.len()); 133 + 134 + thread.extend( 135 + parents 136 + .into_iter() 137 + .tail(THREAD_PARENTS) 138 + .enumerate() 139 + .map(|(idx, item)| { 140 + let value = parents_hyd 141 + .remove(&item.at_uri) 142 + .map(|post| { 143 + if let Some(v) = &post.author.viewer { 144 + if v.blocked_by || v.blocking.is_some() { 145 + return ThreadV2ItemType::Blocked { 146 + author: BlockedAuthor { 147 + did: post.author.did, 148 + viewer: post.author.viewer, 149 + }, 150 + }; 151 + } 152 + } 153 + 154 + let op_thread = (is_op_thread 155 + || item.root_uri.is_none() && item.parent_uri.is_none()) 156 + && post.author.did == root_did; 157 + 158 + ThreadV2ItemType::Post(ThreadItemPost { 159 + post, 160 + more_parents: idx == 0 && root_has_more, 161 + more_replies: 0, 162 + op_thread, 163 + hidden_by_threadgate: false, 164 + muted_by_viewer: false, 165 + }) 166 + }) 167 + .unwrap_or(ThreadV2ItemType::NotFound {}); 168 + 169 + ThreadV2Item { 170 + uri: item.at_uri, 171 + depth: -item.depth - 1, 172 + value, 173 + } 174 + }), 175 + ); 176 + 177 + 
is_op_thread = is_op_thread && anchor.author.did == root_did; 178 + thread.push(ThreadV2Item { 179 + uri: uri.clone(), 180 + depth: 0, 181 + value: ThreadV2ItemType::Post(ThreadItemPost { 182 + post: anchor, 183 + more_parents: false, 184 + more_replies: 0, 185 + op_thread: is_op_thread, 186 + hidden_by_threadgate: false, 187 + muted_by_viewer: false, 188 + }), 189 + }); 190 + 191 + let mut replies_grouped = replies 192 + .into_iter() 193 + .into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default()); 194 + 195 + // start with the anchor 196 + let (children, has_other_replies) = build_thread_children( 197 + &mut replies_grouped, 198 + &mut replies_hyd, 199 + &hidden, 200 + &uri, 201 + is_op_thread, 202 + 1, 203 + &BuildThreadChildrenOpts { 204 + root_did, 205 + sort: query.sort, 206 + maybe_did: &maybe_did, 207 + max_depth: depth, 208 + }, 209 + ); 210 + thread.extend(children); 211 + 212 + Ok(Json(GetPostThreadV2Res { 213 + thread, 214 + threadgate, 215 + has_other_replies, 216 + })) 217 + } 218 + 219 + #[derive(Debug, Deserialize)] 220 + #[serde(rename_all = "camelCase")] 221 + pub struct GetPostThreadOtherV2Req { 222 + pub anchor: String, 223 + } 224 + 225 + #[derive(Debug, Serialize)] 226 + #[serde(rename_all = "camelCase")] 227 + pub struct GetPostThreadOtherV2Res { 228 + pub thread: Vec<ThreadV2Item>, 229 + } 230 + 231 + pub async fn get_post_thread_other_v2( 232 + State(state): State<GlobalState>, 233 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 234 + maybe_auth: Option<AtpAuth>, 235 + Query(query): Query<GetPostThreadOtherV2Req>, 236 + ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 237 + let mut conn = state.pool.get().await?; 238 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 239 + 240 + let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 241 + 242 + let root = crate::db::get_root_post(&mut conn, &uri) 243 + .await? 
244 + .unwrap_or(uri.clone()); 245 + 246 + // this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE 247 + let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?; 248 + let reply_uris = replies 249 + .into_iter() 250 + .map(|item| item.at_uri) 251 + .collect::<Vec<_>>(); 252 + let thread = hyd 253 + .hydrate_posts(reply_uris) 254 + .await 255 + .into_iter() 256 + .filter(|(_, post)| match &post.author.viewer { 257 + Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false, 258 + _ => true, 259 + }) 260 + .map(|(uri, post)| { 261 + let post = ThreadItemPost { 262 + post, 263 + more_parents: false, 264 + more_replies: 0, 265 + op_thread: false, 266 + hidden_by_threadgate: true, 267 + muted_by_viewer: false, 268 + }; 269 + 270 + ThreadV2Item { 271 + uri, 272 + depth: 1, 273 + value: ThreadV2ItemType::Post(post), 274 + } 275 + }) 276 + .collect(); 277 + 278 + Ok(Json(GetPostThreadOtherV2Res { thread })) 279 + } 280 + 281 + #[derive(Debug)] 282 + struct BuildThreadChildrenOpts<'a> { 283 + root_did: &'a str, 284 + sort: PostThreadSort, 285 + maybe_did: &'a Option<String>, 286 + max_depth: i32, 287 + } 288 + 289 + fn build_thread_children( 290 + grouped_replies: &mut HashMap<String, Vec<ThreadItem>>, 291 + replies_hyd: &mut HashMap<String, PostView>, 292 + hidden: &HashSet<String>, 293 + parent: &str, 294 + is_op_thread: bool, 295 + depth: i32, 296 + opts: &BuildThreadChildrenOpts, 297 + ) -> (Vec<ThreadV2Item>, bool) { 298 + let mut has_other_replies = false; 299 + 300 + let Some(replies) = grouped_replies.remove(parent) else { 301 + return (Vec::default(), has_other_replies); 302 + }; 303 + 304 + let replies = replies 305 + .into_iter() 306 + .filter_map(|item| replies_hyd.remove(&item.at_uri)) 307 + .sorted_by(sort_replies(&opts.sort)); 308 + 309 + let mut out = Vec::new(); 310 + 311 + for post in replies { 312 + let reply_count = grouped_replies 313 + .get(&post.uri) 314 + .map(|v| v.len()) 315 + 
.unwrap_or_default(); 316 + let at_max = depth == opts.max_depth; 317 + let more_replies = if at_max { reply_count } else { 0 }; 318 + let op_thread = is_op_thread && post.author.did == opts.root_did; 319 + 320 + // shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies... 321 + if let Some(v) = &post.author.viewer { 322 + if v.blocked_by || v.blocking.is_some() { 323 + continue; 324 + } 325 + } 326 + 327 + // check if the post is hidden AND we're NOT the author (hidden posts still show for their author) 328 + if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) { 329 + // post is hidden - do not ~pass go~ push to the thread. 330 + if depth == 1 { 331 + has_other_replies = true; 332 + } 333 + continue; 334 + } 335 + 336 + let uri = post.uri.clone(); 337 + out.push(ThreadV2Item { 338 + uri: post.uri.clone(), 339 + depth, 340 + value: ThreadV2ItemType::Post(ThreadItemPost { 341 + post, 342 + more_parents: false, 343 + more_replies: more_replies as i32, 344 + op_thread, 345 + hidden_by_threadgate: false, 346 + muted_by_viewer: false, 347 + }), 348 + }); 349 + 350 + if !at_max { 351 + // we don't care about has_other_replies when recursing 352 + let (children, _) = build_thread_children( 353 + grouped_replies, 354 + replies_hyd, 355 + hidden, 356 + &uri, 357 + op_thread, 358 + depth + 1, 359 + opts, 360 + ); 361 + 362 + out.extend(children); 363 + } 364 + } 365 + 366 + (out, has_other_replies) 367 + } 368 + 369 + fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> { 370 + move |a: &PostView, b: &PostView| match sort { 371 + PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at), 372 + PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at), 373 + PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count), 374 + } 375 + } 376 + 377 + fn did_is_cur(cur: &Option<String>, did: &String) -> bool { 378 + match cur { 379 + Some(cur) => did == cur, 
380 + None => false, 381 + } 382 + }
+1 -1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 60 60 .into_iter() 61 61 .map(|bookmark| Bookmark { 62 62 subject: bookmark.subject, 63 - tags: bookmark.tags.into_iter().flatten().collect(), 63 + tags: bookmark.tags.into(), 64 64 created_at: bookmark.created_at, 65 65 }) 66 66 .collect();
+70 -12
parakeet-db/src/models.rs
··· 137 137 138 138 pub content: String, 139 139 pub facets: Option<serde_json::Value>, 140 - pub languages: Vec<Option<String>>, 141 - pub tags: Vec<Option<String>>, 140 + pub languages: not_null_vec::TextArray, 141 + pub tags: not_null_vec::TextArray, 142 142 143 143 pub parent_uri: Option<String>, 144 144 pub parent_cid: Option<String>, ··· 147 147 148 148 pub embed: Option<String>, 149 149 pub embed_subtype: Option<String>, 150 + 151 + pub mentions: Option<not_null_vec::TextArray>, 152 + pub violates_threadgate: bool, 150 153 151 154 pub created_at: DateTime<Utc>, 152 155 pub indexed_at: NaiveDateTime, ··· 233 236 pub cid: String, 234 237 pub post_uri: String, 235 238 236 - pub detached: Vec<Option<String>>, 237 - pub rules: Vec<Option<String>>, 239 + pub detached: not_null_vec::TextArray, 240 + pub rules: not_null_vec::TextArray, 238 241 239 242 pub created_at: DateTime<Utc>, 240 243 pub indexed_at: NaiveDateTime, ··· 249 252 pub cid: String, 250 253 pub post_uri: String, 251 254 252 - pub hidden_replies: Vec<Option<String>>, 253 - pub allow: Vec<Option<String>>, 254 - pub allowed_lists: Vec<Option<String>>, 255 + pub hidden_replies: not_null_vec::TextArray, 256 + pub allow: Option<not_null_vec::TextArray>, 257 + pub allowed_lists: Option<not_null_vec::TextArray>, 255 258 256 259 pub record: serde_json::Value, 257 260 ··· 273 276 pub description: Option<String>, 274 277 pub description_facets: Option<serde_json::Value>, 275 278 pub list: String, 276 - pub feeds: Option<Vec<Option<String>>>, 279 + pub feeds: Option<not_null_vec::TextArray>, 277 280 278 281 pub created_at: DateTime<Utc>, 279 282 pub indexed_at: NaiveDateTime, ··· 287 290 pub did: String, 288 291 pub cid: String, 289 292 290 - pub reasons: Option<Vec<Option<String>>>, 291 - pub subject_types: Option<Vec<Option<String>>>, 292 - pub subject_collections: Option<Vec<Option<String>>>, 293 + pub reasons: Option<not_null_vec::TextArray>, 294 + pub subject_types: Option<not_null_vec::TextArray>, 295 + 
pub subject_collections: Option<not_null_vec::TextArray>, 293 296 294 297 pub created_at: NaiveDateTime, 295 298 pub indexed_at: NaiveDateTime, ··· 399 402 pub subject: String, 400 403 pub subject_cid: Option<String>, 401 404 pub subject_type: String, 402 - pub tags: Vec<Option<String>>, 405 + pub tags: not_null_vec::TextArray, 403 406 pub created_at: DateTime<Utc>, 404 407 } 405 408 ··· 414 417 pub subject_type: &'a str, 415 418 pub tags: Vec<String>, 416 419 } 420 + 421 + #[derive(Debug, Queryable, Selectable, Identifiable)] 422 + #[diesel(table_name = crate::schema::author_feeds)] 423 + #[diesel(primary_key(uri))] 424 + #[diesel(check_for_backend(diesel::pg::Pg))] 425 + pub struct AuthorFeedItem { 426 + pub uri: String, 427 + pub cid: String, 428 + pub post: String, 429 + pub did: String, 430 + pub typ: String, 431 + pub sort_at: DateTime<Utc>, 432 + } 433 + 434 + pub use not_null_vec::TextArray; 435 + mod not_null_vec { 436 + use diesel::deserialize::FromSql; 437 + use diesel::pg::Pg; 438 + use diesel::sql_types::{Array, Nullable, Text}; 439 + use diesel::{deserialize, FromSqlRow}; 440 + use serde::{Deserialize, Serialize}; 441 + use std::ops::{Deref, DerefMut}; 442 + 443 + #[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)] 444 + #[diesel(sql_type = Array<Nullable<Text>>)] 445 + pub struct TextArray(pub Vec<String>); 446 + 447 + impl FromSql<Array<Nullable<Text>>, Pg> for TextArray { 448 + fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> { 449 + let vec_with_nulls = 450 + <Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?; 451 + Ok(TextArray(vec_with_nulls.into_iter().flatten().collect())) 452 + } 453 + } 454 + 455 + impl Deref for TextArray { 456 + type Target = Vec<String>; 457 + 458 + fn deref(&self) -> &Self::Target { 459 + &self.0 460 + } 461 + } 462 + 463 + impl DerefMut for TextArray { 464 + fn deref_mut(&mut self) -> &mut Self::Target { 465 + &mut self.0 466 + } 467 + } 468 + 469 + 
impl From<TextArray> for Vec<String> { 470 + fn from(v: TextArray) -> Vec<String> { 471 + v.0 472 + } 473 + } 474 + }
+16 -2
parakeet-db/src/schema.rs
··· 13 13 } 14 14 15 15 diesel::table! { 16 + author_feeds (uri) { 17 + uri -> Text, 18 + cid -> Text, 19 + post -> Text, 20 + did -> Text, 21 + typ -> Text, 22 + sort_at -> Timestamptz, 23 + } 24 + } 25 + 26 + diesel::table! { 16 27 backfill (repo, repo_ver) { 17 28 repo -> Text, 18 29 repo_ver -> Text, ··· 284 295 embed_subtype -> Nullable<Text>, 285 296 created_at -> Timestamptz, 286 297 indexed_at -> Timestamp, 298 + mentions -> Nullable<Array<Nullable<Text>>>, 299 + violates_threadgate -> Bool, 287 300 } 288 301 } 289 302 ··· 378 391 cid -> Text, 379 392 post_uri -> Text, 380 393 hidden_replies -> Array<Nullable<Text>>, 381 - allow -> Array<Nullable<Text>>, 382 - allowed_lists -> Array<Nullable<Text>>, 394 + allow -> Nullable<Array<Nullable<Text>>>, 395 + allowed_lists -> Nullable<Array<Nullable<Text>>>, 383 396 record -> Jsonb, 384 397 created_at -> Timestamptz, 385 398 indexed_at -> Timestamp, ··· 429 442 430 443 diesel::allow_tables_to_appear_in_same_query!( 431 444 actors, 445 + author_feeds, 432 446 backfill, 433 447 backfill_jobs, 434 448 blocks,