Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

Post Tweaks #1

closed opened by mia.omg.lol targeting main from post-tweaks

"tweaks" might be the understatement of the year. This branch contains:

  • a timing fix for postgates
  • working threadgates
  • storing #tags (and mentions) in the posts table
  • reposts in getAuthorFeed
  • huge feed hydration refactors to make getAuthorFeed work properly.
Labels

None yet.

component

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:63y3oh7iakdueqhlj6trojbq/sh.tangled.repo.pull/3m2wnnoxb3j22
+897 -35
Diff #0
+36 -5
consumer/src/backfill/repo.rs
··· 1 1 2 2 3 3 4 + }; 5 + use crate::indexer::records; 6 + use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7 + use crate::utils::at_uri_is_by; 8 + use crate::{db, indexer}; 9 + use deadpool_postgres::Transaction; 10 + use ipld_core::cid::Cid; 4 11 5 12 6 13 ··· 135 142 136 143 137 144 145 + db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 146 + } 147 + if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 148 + db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?; 149 + } 150 + 151 + deltas.incr(did, AggregateType::ProfilePost).await; 138 152 139 153 140 154 141 155 142 156 143 157 144 - db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 145 - } 146 - if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 147 - db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?; 158 + 159 + 160 + 161 + 162 + 163 + 164 + 165 + 166 + 167 + .reposts 168 + .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 169 + } 170 + RecordTypes::AppBskyFeedThreadgate(record) => { 171 + if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 + return Ok(()); 148 174 } 149 175 150 - deltas.incr(did, AggregateType::ProfilePost).await; 176 + copies.push_record(&at_uri, cid); 177 + copies.threadgates.push((at_uri, cid, record)); 178 + } 179 + RecordTypes::AppBskyGraphBlock(rec) => { 180 + copies.push_record(&at_uri, cid); 181 + copies
+1 -1
consumer/src/indexer/mod.rs
··· 625 625 }); 626 626 627 627 let labels = record.labels.clone(); 628 - db::post_insert(conn, at_uri, repo, cid, record).await?; 628 + db::post_insert(conn, at_uri, repo, cid, record, false).await?; 629 629 if let Some(labels) = labels { 630 630 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 631 631 }
+45 -3
consumer/src/db/copy.rs
··· 1 1 use super::PgExecResult; 2 2 use crate::indexer::records; 3 - use crate::utils::strongref_to_parts; 3 + use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::Transaction; 6 6 use futures::pin_mut; ··· 119 119 .await 120 120 } 121 121 122 - const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)"; 122 + const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)"; 123 123 const POST_TYPES: &[Type] = &[ 124 124 Type::TEXT, 125 125 Type::TEXT, ··· 135 135 Type::TEXT, 136 136 Type::TEXT, 137 137 Type::TEXT, 138 + Type::TEXT_ARRAY, 138 139 Type::TIMESTAMP, 139 140 ]; 140 141 pub async fn copy_posts( ··· 159 160 160 161 for (at_uri, cid, post) in data { 161 162 let record = serde_json::to_value(&post).unwrap(); 163 + let (mentions, tags) = post 164 + .facets 165 + .as_ref() 166 + .map(|v| extract_mentions_and_tags(v)) 167 + .unzip(); 162 168 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); 163 169 let embed = post.embed.as_ref().map(|v| v.as_str()); 164 170 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); 165 171 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); 166 172 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); 167 173 174 + let tags = merge_tags(tags, post.tags); 175 + 168 176 let writer = writer.as_mut(); 169 177 writer 170 178 .write(&[ ··· 175 183 &post.text, 176 184 &facets, 177 185 &post.langs.unwrap_or_default(), 178 - &post.tags.unwrap_or_default(), 186 + &tags, 179 187 &parent_uri, 180 188 &parent_cid, 181 189 &root_uri, 182 190 &root_cid, 183 191 &embed, 184 192 &embed_subtype, 193 + &mentions, 185 194 &post.created_at.naive_utc(), 186 195 ]) 187 196 .await?; 197 + 198 + 199 + writer.finish().await?; 200 + 201 + let threadgated: Vec<(String, String, DateTime<Utc>)> = conn 202 + .query( 203 + "SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL", 204 + &[], 205 + ) 206 + .await? 207 + .into_iter() 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 + 210 + for (root, post, created_at) in threadgated { 211 + match super::post_enforce_threadgate(conn, &root, did, created_at, true).await { 212 + Ok(true) => { 213 + conn.execute( 214 + "UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1", 215 + &[&post], 216 + ) 217 + .await?; 218 + } 219 + Ok(false) => continue, 220 + Err(e) => { 221 + tracing::error!("failed to check threadgate enforcement: {e}"); 222 + continue; 223 + } 224 + } 225 + } 226 + 227 + conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[]) 228 + .await 229 + }
+2 -2
consumer/src/db/sql/post_insert.sql
··· 1 1 INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, 2 - root_cid, embed, embed_subtype, created_at) 3 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) 2 + root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at) 3 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) 4 4 ON CONFLICT DO NOTHING
+31
consumer/src/utils.rs
··· 1 + use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; 1 2 use lexica::{Blob, StrongRef}; 2 3 use serde::{Deserialize, Deserializer}; 3 4 ··· 39 40 40 41 did == split_aturi[2] 41 42 } 43 + 44 + pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec<String>, Vec<String>) { 45 + let (mentions, tags) = from 46 + .iter() 47 + .flat_map(|v| { 48 + v.features.iter().map(|facet| match facet { 49 + FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), 50 + FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), 51 + _ => (None, None), 52 + }) 53 + }) 54 + .unzip::<_, _, Vec<_>, Vec<_>>(); 55 + 56 + let mentions = mentions.into_iter().flatten().cloned().collect(); 57 + let tags = tags.into_iter().flatten().cloned().collect(); 58 + 59 + (mentions, tags) 60 + } 61 + 62 + pub fn merge_tags<T>(t1: Option<Vec<T>>, t2: Option<Vec<T>>) -> Vec<T> { 63 + match (t1, t2) { 64 + (Some(t1), None) => t1, 65 + (None, Some(t2)) => t2, 66 + (Some(mut t1), Some(t2)) => { 67 + t1.extend(t2); 68 + t1 69 + } 70 + _ => Vec::default(), 71 + } 72 + }
+15
migrations/2025-09-27-171241_post-tweaks/down.sql
··· 1 + alter table posts 2 + drop column mentions, 3 + drop column violates_threadgate; 4 + 5 + drop trigger t_author_feed_ins_post on posts; 6 + drop trigger t_author_feed_del_post on posts; 7 + drop trigger t_author_feed_ins_repost on reposts; 8 + drop trigger t_author_feed_del_repost on reposts; 9 + 10 + drop function f_author_feed_ins_post; 11 + drop function f_author_feed_del_post; 12 + drop function f_author_feed_ins_repost; 13 + drop function f_author_feed_del_repost; 14 + 15 + drop table author_feeds;
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 1 + alter table posts 2 + add column mentions text[], 3 + add column violates_threadgate bool not null default false; 4 + 5 + create table author_feeds 6 + ( 7 + uri text primary key, 8 + cid text not null, 9 + post text not null, 10 + did text not null, 11 + typ text not null, 12 + sort_at timestamptz not null 13 + ); 14 + 15 + -- author_feeds post triggers 16 + create function f_author_feed_ins_post() returns trigger 17 + language plpgsql as 18 + $$ 19 + begin 20 + insert into author_feeds (uri, cid, post, did, typ, sort_at) 21 + VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at) 22 + on conflict do nothing; 23 + return NEW; 24 + end; 25 + $$; 26 + 27 + create trigger t_author_feed_ins_post 28 + before insert 29 + on posts 30 + for each row 31 + execute procedure f_author_feed_ins_post(); 32 + 33 + create function f_author_feed_del_post() returns trigger 34 + language plpgsql as 35 + $$ 36 + begin 37 + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 38 + return OLD; 39 + end; 40 + $$; 41 + 42 + create trigger t_author_feed_del_post 43 + before delete 44 + on posts 45 + for each row 46 + execute procedure f_author_feed_del_post(); 47 + 48 + -- author_feeds repost triggers 49 + create function f_author_feed_ins_repost() returns trigger 50 + language plpgsql as 51 + $$ 52 + begin 53 + insert into author_feeds (uri, cid, post, did, typ, sort_at) 54 + VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at) 55 + on conflict do nothing; 56 + return NEW; 57 + end; 58 + $$; 59 + 60 + create trigger t_author_feed_ins_repost 61 + before insert 62 + on reposts 63 + for each row 64 + execute procedure f_author_feed_ins_repost(); 65 + 66 + create function f_author_feed_del_repost() returns trigger 67 + language plpgsql as 68 + $$ 69 + begin 70 + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 71 + return OLD; 72 + end; 73 + $$; 74 + 75 + create trigger t_author_feed_del_repost 76 + before delete 77 + on reposts 78 + for each row 79 + execute procedure f_author_feed_del_repost();
+163 -4
parakeet-db/src/schema.rs
··· 9 9 10 10 11 11 12 + } 13 + } 12 14 15 + diesel::table! { 16 + author_feeds (uri) { 17 + uri -> Text, 18 + cid -> Text, 19 + post -> Text, 20 + did -> Text, 21 + typ -> Text, 22 + sort_at -> Timestamptz, 23 + } 24 + } 13 25 14 - 15 - 16 - 17 - 26 + diesel::table! { 27 + backfill (repo, repo_ver) { 28 + repo -> Text, 18 29 19 30 20 31 ··· 284 295 embed_subtype -> Nullable<Text>, 285 296 created_at -> Timestamptz, 286 297 indexed_at -> Timestamp, 298 + mentions -> Nullable<Array<Nullable<Text>>>, 299 + violates_threadgate -> Bool, 287 300 } 288 301 } 289 302 303 + 304 + 305 + 306 + 307 + 308 + 309 + 310 + 311 + 312 + 313 + 314 + 315 + 316 + 317 + 318 + 319 + 320 + 321 + 322 + 323 + 324 + 325 + 326 + 327 + 328 + 329 + 330 + 331 + 332 + 333 + 334 + 335 + 336 + 337 + 338 + 339 + 340 + 341 + 342 + 343 + 344 + 345 + 346 + 347 + 348 + 349 + 350 + 351 + 352 + 353 + 354 + 355 + 356 + 357 + 358 + 359 + 360 + 361 + 362 + 363 + 364 + 365 + 366 + 367 + 368 + 369 + 370 + 371 + 372 + 373 + 374 + 375 + 376 + 377 + 378 + 379 + 380 + 381 + 382 + 383 + 384 + 385 + 386 + 387 + 388 + 389 + 390 + 391 + 392 + 393 + 394 + 395 + 396 + 397 + 398 + 399 + 400 + 401 + 402 + 403 + 404 + 405 + 406 + 407 + 408 + 409 + 410 + 411 + 412 + 413 + 414 + 415 + 416 + 417 + 418 + 419 + 420 + 421 + 422 + 423 + 424 + 425 + 426 + 427 + 428 + 429 + 430 + 431 + 432 + 433 + 434 + 435 + 436 + 437 + 438 + 439 + 440 + 441 + 442 + 443 + diesel::allow_tables_to_appear_in_same_query!( 444 + actors, 445 + author_feeds, 446 + backfill, 447 + backfill_jobs, 448 + blocks,
+208
consumer/src/db/gates.rs
··· 1 + use super::{PgExecResult, PgResult}; 2 + use crate::indexer::records::{ 3 + AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, 4 + THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION, 5 + }; 6 + use chrono::prelude::*; 7 + use chrono::{DateTime, Utc}; 8 + use deadpool_postgres::GenericClient; 9 + use std::collections::HashSet; 10 + 11 + pub async fn post_enforce_threadgate<C: GenericClient>( 12 + conn: &mut C, 13 + root: &str, 14 + post_author: &str, 15 + post_created_at: DateTime<Utc>, 16 + is_backfill: bool, 17 + ) -> PgResult<bool> { 18 + // check if the root and the current post are the same author 19 + // strip "at://" then break into parts by '/' 20 + let parts = root[5..].split('/').collect::<Vec<_>>(); 21 + let root_author = parts[0]; 22 + if root_author == post_author { 23 + return Ok(false); 24 + } 25 + 26 + let tg_data = super::threadgate_get(conn, root).await?; 27 + 28 + let Some((created_at, allow, allow_lists)) = tg_data else { 29 + return Ok(false); 30 + }; 31 + 32 + // when backfilling, there's no point continuing if the record is dated before the threadgate 33 + if is_backfill && post_created_at < created_at { 34 + return Ok(false); 35 + } 36 + 37 + if allow.is_empty() { 38 + return Ok(true); 39 + } 40 + 41 + let allow: HashSet<String> = HashSet::from_iter(allow); 42 + 43 + if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) { 44 + let profile_state: Option<(bool, bool)> = conn 45 + .query_opt( 46 + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", 47 + &[&root_author, &post_author], 48 + ) 49 + .await? 50 + .map(|v| (v.get(0), v.get(1))); 51 + 52 + if let Some((following, followed)) = profile_state { 53 + if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { 54 + return Ok(false); 55 + } 56 + 57 + if allow.contains(THREADGATE_RULE_FOLLOWING) && following { 58 + return Ok(false); 59 + } 60 + } 61 + } 62 + 63 + // check mentions 64 + if allow.contains(THREADGATE_RULE_MENTION) { 65 + let mentions: Vec<String> = conn 66 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 + .await? 68 + .map(|r| r.get(0)) 69 + .unwrap_or_default(); 70 + 71 + if mentions.contains(&post_author.to_owned()) { 72 + return Ok(false); 73 + } 74 + } 75 + 76 + if allow.contains(THREADGATE_RULE_LIST) { 77 + if allow_lists.is_empty() { 78 + return Ok(true); 79 + } 80 + 81 + let count: i64 = conn 82 + .query_one( 83 + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", 84 + &[&allow_lists, &post_author], 85 + ) 86 + .await? 87 + .get(0); 88 + if count != 0 { 89 + return Ok(false); 90 + } 91 + } 92 + 93 + Ok(true) 94 + } 95 + 96 + pub async fn postgate_maintain_detaches<C: GenericClient>( 97 + conn: &mut C, 98 + post: &str, 99 + detached: &[String], 100 + disable_effective: Option<NaiveDateTime>, 101 + ) -> PgExecResult { 102 + conn.execute( 103 + "SELECT maintain_postgates($1, $2, $3)", 104 + &[&post, &detached, &disable_effective], 105 + ) 106 + .await 107 + } 108 + 109 + // variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB 110 + pub async fn threadgate_enforce_backfill<C: GenericClient>( 111 + conn: &mut C, 112 + root_author: &str, 113 + threadgate: &AppBskyFeedThreadgate, 114 + ) -> PgExecResult { 115 + // pull out allow - if it's None we can skip this gate. 116 + let Some(allow) = threadgate.allow.as_ref() else { 117 + return Ok(0); 118 + }; 119 + 120 + let root = &threadgate.post; 121 + 122 + if allow.is_empty() { 123 + // blind update everything 124 + return conn.execute( 125 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 126 + &[&root, &root_author, &threadgate.created_at], 127 + ).await; 128 + } 129 + 130 + // pull authors with our root_uri where the author is not the root author and are dated after created_at 131 + // this is mutable because we'll remove ALLOWED dids 132 + let mut dids: HashSet<String> = conn 133 + .query( 134 + "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 135 + &[&root, &root_author, &threadgate.created_at], 136 + ) 137 + .await? 138 + .into_iter() 139 + .map(|row| row.get(0)) 140 + .collect(); 141 + 142 + // this will be empty if there are no replies. 143 + if dids.is_empty() { 144 + return Ok(0); 145 + } 146 + 147 + let allowed_lists = allow 148 + .iter() 149 + .filter_map(|rule| match rule { 150 + ThreadgateRule::List { list } => Some(list), 151 + _ => None, 152 + }) 153 + .collect::<Vec<_>>(); 154 + 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 + 157 + if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 + let current_dids: Vec<_> = dids.iter().collect(); 159 + 160 + let res = conn.query( 161 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 + &[&root_author, &current_dids] 163 + ).await?; 164 + 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + } 167 + 168 + if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { 169 + let current_dids: Vec<_> = dids.iter().collect(); 170 + 171 + let res = conn.query( 172 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 + &[&root_author, &current_dids] 174 + ).await?; 175 + 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + } 178 + 179 + if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 + let mentions: Vec<String> = conn 181 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 + .await? 183 + .map(|r| r.get(0)) 184 + .unwrap_or_default(); 185 + 186 + dids = &dids - &HashSet::from_iter(mentions); 187 + } 188 + 189 + if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() { 190 + let current_dids: Vec<_> = dids.iter().collect(); 191 + 192 + let res = conn 193 + .query( 194 + "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 + &[&allowed_lists, &current_dids], 196 + ) 197 + .await?; 198 + 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 200 + } 201 + 202 + let dids = dids.into_iter().collect::<Vec<_>>(); 203 + 204 + conn.execute( 205 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3", 206 + &[&threadgate.post, &dids, &threadgate.created_at] 207 + ).await 208 + }
+2
consumer/src/db/mod.rs
··· 7 7 mod actor; 8 8 mod backfill; 9 9 pub mod copy; 10 + mod gates; 10 11 mod labels; 11 12 mod record; 12 13 13 14 pub use actor::*; 14 15 pub use backfill::*; 16 + pub use gates::*; 15 17 pub use labels::*; 16 18 pub use record::*;
+9 -4
consumer/src/indexer/records.rs
··· 272 272 pub hidden_replies: Vec<String>, 273 273 } 274 274 275 + pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; 276 + pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; 277 + pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; 278 + pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; 279 + 275 280 #[derive(Debug, Deserialize, Serialize)] 276 281 #[serde(tag = "$type")] 277 282 pub enum ThreadgateRule { ··· 288 293 impl ThreadgateRule { 289 294 pub fn as_str(&self) -> &'static str { 290 295 match self { 291 - ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule", 292 - ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule", 293 - ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule", 294 - ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule", 296 + ThreadgateRule::Mention => THREADGATE_RULE_MENTION, 297 + ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER, 298 + ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING, 299 + ThreadgateRule::List { .. } => THREADGATE_RULE_LIST, 295 300 } 296 301 } 297 302 }
+6 -1
consumer/src/backfill/mod.rs
··· 275 275 follows: Vec<(String, String, DateTime<Utc>)>, 276 276 list_items: Vec<(String, records::AppBskyGraphListItem)>, 277 277 verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, 278 + threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last. 278 279 records: Vec<(String, Cid)>, 279 280 } 280 281 281 282 impl CopyStore { 282 283 async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { 283 284 db::copy::copy_likes(t, did, self.likes).await?; 284 - db::copy::copy_posts(t, did, self.posts).await?; 285 285 db::copy::copy_reposts(t, did, self.reposts).await?; 286 286 db::copy::copy_blocks(t, did, self.blocks).await?; 287 287 db::copy::copy_follows(t, did, self.follows).await?; 288 288 db::copy::copy_list_items(t, self.list_items).await?; 289 289 db::copy::copy_verification(t, did, self.verifications).await?; 290 + db::copy::copy_posts(t, did, self.posts).await?; 291 + for (at_uri, cid, record) in self.threadgates { 292 + db::threadgate_enforce_backfill(t, did, &record).await?; 293 + db::threadgate_upsert(t, &at_uri, cid, record).await?; 294 + } 290 295 db::copy::copy_records(t, did, self.records).await?; 291 296 292 297 Ok(())
+279
parakeet-db/src/models.rs
··· 148 148 pub embed: Option<String>, 149 149 pub embed_subtype: Option<String>, 150 150 151 + pub mentions: Option<Vec<Option<String>>>, 152 + pub violates_threadgate: bool, 153 + 151 154 pub created_at: DateTime<Utc>, 152 155 pub indexed_at: NaiveDateTime, 153 156 } 157 + 158 + 159 + 160 + 161 + 162 + 163 + 164 + 165 + 166 + 167 + 168 + 169 + 170 + 171 + 172 + 173 + 174 + 175 + 176 + 177 + 178 + 179 + 180 + 181 + 182 + 183 + 184 + 185 + 186 + 187 + 188 + 189 + 190 + 191 + 192 + 193 + 194 + 195 + 196 + 197 + 198 + 199 + 200 + 201 + 202 + 203 + 204 + 205 + 206 + 207 + 208 + 209 + 210 + 211 + 212 + 213 + 214 + 215 + 216 + 217 + 218 + 219 + 220 + 221 + 222 + 223 + 224 + 225 + 226 + 227 + 228 + 229 + 230 + 231 + 232 + 233 + 234 + 235 + 236 + 237 + 238 + 239 + 240 + 241 + 242 + 243 + 244 + 245 + 246 + 247 + 248 + 249 + 250 + 251 + 252 + 253 + 254 + 255 + 256 + 257 + 258 + 259 + 260 + 261 + 262 + 263 + 264 + 265 + 266 + 267 + 268 + 269 + 270 + 271 + 272 + 273 + 274 + 275 + 276 + 277 + 278 + 279 + 280 + 281 + 282 + 283 + 284 + 285 + 286 + 287 + 288 + 289 + 290 + 291 + 292 + 293 + 294 + 295 + 296 + 297 + 298 + 299 + 300 + 301 + 302 + 303 + 304 + 305 + 306 + 307 + 308 + 309 + 310 + 311 + 312 + 313 + 314 + 315 + 316 + 317 + 318 + 319 + 320 + 321 + 322 + 323 + 324 + 325 + 326 + 327 + 328 + 329 + 330 + 331 + 332 + 333 + 334 + 335 + 336 + 337 + 338 + 339 + 340 + 341 + 342 + 343 + 344 + 345 + 346 + 347 + 348 + 349 + 350 + 351 + 352 + 353 + 354 + 355 + 356 + 357 + 358 + 359 + 360 + 361 + 362 + 363 + 364 + 365 + 366 + 367 + 368 + 369 + 370 + 371 + 372 + 373 + 374 + 375 + 376 + 377 + 378 + 379 + 380 + 381 + 382 + 383 + 384 + 385 + 386 + 387 + 388 + 389 + 390 + 391 + 392 + 393 + 394 + 395 + 396 + 397 + 398 + 399 + 400 + 401 + 402 + 403 + 404 + 405 + 406 + 407 + 408 + 409 + 410 + 411 + 412 + 413 + 414 + 415 + 416 + 417 + pub subject_type: &'a str, 418 + pub tags: Vec<String>, 419 + } 420 + 421 + #[derive(Debug, Queryable, Selectable, Identifiable)] 422 + #[diesel(table_name = crate::schema::author_feeds)] 423 + #[diesel(primary_key(uri))] 424 + #[diesel(check_for_backend(diesel::pg::Pg))] 425 + pub struct AuthorFeedItem { 426 + pub uri: String, 427 + pub cid: String, 428 + pub post: String, 429 + pub did: String, 430 + pub typ: String, 431 + pub sort_at: DateTime<Utc>, 432 + }
+2 -2
parakeet/src/sql/thread.sql
··· 1 1 with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 2 2 from posts 3 - where parent_uri = $1 3 + where parent_uri = $1 and violates_threadgate=FALSE 4 4 union all 5 5 select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 6 6 from posts p 7 7 join thread on p.parent_uri = thread.at_uri 8 - where thread.depth <= $2) 8 + where thread.depth <= $2 and p.violates_threadgate=FALSE) 9 9 select * 10 10 from thread 11 11 order by depth desc;
+4 -2
parakeet/src/sql/thread_parent.sql
··· 1 1 with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth 2 2 from posts 3 - where at_uri = (select parent_uri from posts where at_uri = $1) 3 + where 4 + at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE) 4 5 union all 5 6 select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1 6 7 from posts p 7 8 join parents on p.at_uri = parents.parent_uri 8 - where parents.depth <= $2) 9 + where parents.depth <= $2 10 + and p.violates_threadgate = FALSE) 9 11 select * 10 12 from parents 11 13 order by depth desc;
+15 -11
parakeet/src/xrpc/app_bsky/feed/likes.rs
··· 1 + use crate::hydration::posts::RawFeedItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 1 5 2 6 3 7 ··· 51 55 52 56 53 57 58 + .last() 59 + .map(|(last, _)| last.timestamp_millis().to_string()); 54 60 55 - 56 - 57 - 58 - 59 - 60 - 61 - 62 - .map(|(_, uri)| uri.clone()) 61 + let raw_feed = results 62 + .iter() 63 + .map(|(_, uri)| RawFeedItem::Post { 64 + uri: uri.clone(), 65 + context: None, 66 + }) 63 67 .collect::<Vec<_>>(); 64 68 65 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 69 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 66 70 67 - let feed: Vec<_> = results 68 - .into_iter() 71 + Ok(Json(FeedRes { cursor, feed })) 72 + }

History

1 round 0 comments
sign up or login to add to the discussion
mia.omg.lol submitted #0
15 commits
expand
postgates: trust provided timestamp only when backfilling
store mentions and tags from facets in DB
clippy
store if a post violates a threadgate
move postgate and threadgate enforcement into their own file
default deny threadgates
fix list allows
remove accidental dbg!
consts for threadgate rules
correct threadgate backfilling
update models
enforce threadgates
show reposts in author feeds
make author feeds work like bluesky
the big feed refactor
expand 0 comments
closed without merging