Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

Post Tweaks #1

closed opened by mia.omg.lol targeting main from post-tweaks

"tweaks" might be the understatement of the year. This branch contains:

  • a timing fix for postgates
  • working threadgates
  • storing #tags (and mentions) in the posts table
  • reposts in getAuthorFeed
  • huge feed hydration refactors to make getAuthorFeed work properly.
Labels

None yet.

component

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:63y3oh7iakdueqhlj6trojbq/sh.tangled.repo.pull/3m2wnnoxb3j22
+897 -35
Diff #0
+36 -5
consumer/src/backfill/repo.rs
··· 1 2 3 4 5 6 ··· 135 136 137 138 139 140 141 142 143 144 - db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 145 - } 146 - if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 147 - db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?; 148 } 149 150 - deltas.incr(did, AggregateType::ProfilePost).await;
··· 1 2 3 4 + }; 5 + use crate::indexer::records; 6 + use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; 7 + use crate::utils::at_uri_is_by; 8 + use crate::{db, indexer}; 9 + use deadpool_postgres::Transaction; 10 + use ipld_core::cid::Cid; 11 12 13 ··· 142 143 144 145 + db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; 146 + } 147 + if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { 148 + db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?; 149 + } 150 + 151 + deltas.incr(did, AggregateType::ProfilePost).await; 152 153 154 155 156 157 158 + 159 + 160 + 161 + 162 + 163 + 164 + 165 + 166 + 167 + .reposts 168 + .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); 169 + } 170 + RecordTypes::AppBskyFeedThreadgate(record) => { 171 + if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 + return Ok(()); 174 } 175 176 + copies.push_record(&at_uri, cid); 177 + copies.threadgates.push((at_uri, cid, record)); 178 + } 179 + RecordTypes::AppBskyGraphBlock(rec) => { 180 + copies.push_record(&at_uri, cid); 181 + copies
+1 -1
consumer/src/indexer/mod.rs
··· 625 }); 626 627 let labels = record.labels.clone(); 628 - db::post_insert(conn, at_uri, repo, cid, record).await?; 629 if let Some(labels) = labels { 630 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 631 }
··· 625 }); 626 627 let labels = record.labels.clone(); 628 + db::post_insert(conn, at_uri, repo, cid, record, false).await?; 629 if let Some(labels) = labels { 630 db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; 631 }
+45 -3
consumer/src/db/copy.rs
··· 1 use super::PgExecResult; 2 use crate::indexer::records; 3 - use crate::utils::strongref_to_parts; 4 use chrono::prelude::*; 5 use deadpool_postgres::Transaction; 6 use futures::pin_mut; ··· 119 .await 120 } 121 122 - const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)"; 123 const POST_TYPES: &[Type] = &[ 124 Type::TEXT, 125 Type::TEXT, ··· 135 Type::TEXT, 136 Type::TEXT, 137 Type::TEXT, 138 Type::TIMESTAMP, 139 ]; 140 pub async fn copy_posts( ··· 159 160 for (at_uri, cid, post) in data { 161 let record = serde_json::to_value(&post).unwrap(); 162 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); 163 let embed = post.embed.as_ref().map(|v| v.as_str()); 164 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); 165 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); 166 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); 167 168 let writer = writer.as_mut(); 169 writer 170 .write(&[ ··· 175 &post.text, 176 &facets, 177 &post.langs.unwrap_or_default(), 178 - &post.tags.unwrap_or_default(), 179 &parent_uri, 180 &parent_cid, 181 &root_uri, 182 &root_cid, 183 &embed, 184 &embed_subtype, 185 &post.created_at.naive_utc(), 186 ]) 187 .await?;
··· 1 use super::PgExecResult; 2 use crate::indexer::records; 3 + use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 use chrono::prelude::*; 5 use deadpool_postgres::Transaction; 6 use futures::pin_mut; ··· 119 .await 120 } 121 122 + const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)"; 123 const POST_TYPES: &[Type] = &[ 124 Type::TEXT, 125 Type::TEXT, ··· 135 Type::TEXT, 136 Type::TEXT, 137 Type::TEXT, 138 + Type::TEXT_ARRAY, 139 Type::TIMESTAMP, 140 ]; 141 pub async fn copy_posts( ··· 160 161 for (at_uri, cid, post) in data { 162 let record = serde_json::to_value(&post).unwrap(); 163 + let (mentions, tags) = post 164 + .facets 165 + .as_ref() 166 + .map(|v| extract_mentions_and_tags(v)) 167 + .unzip(); 168 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); 169 let embed = post.embed.as_ref().map(|v| v.as_str()); 170 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); 171 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); 172 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); 173 174 + let tags = merge_tags(tags, post.tags); 175 + 176 let writer = writer.as_mut(); 177 writer 178 .write(&[ ··· 183 &post.text, 184 &facets, 185 &post.langs.unwrap_or_default(), 186 + &tags, 187 &parent_uri, 188 &parent_cid, 189 &root_uri, 190 &root_cid, 191 &embed, 192 &embed_subtype, 193 + &mentions, 194 &post.created_at.naive_utc(), 195 ]) 196 .await?; 197 + 198 + 199 + writer.finish().await?; 200 + 201 + let threadgated: Vec<(String, String, DateTime<Utc>)> = conn 202 + .query( 203 + "SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL", 204 + &[], 205 + ) 206 + .await? 207 + .into_iter() 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 + 210 + for (root, post, created_at) in threadgated { 211 + match super::post_enforce_threadgate(conn, &root, did, created_at, true).await { 212 + Ok(true) => { 213 + conn.execute( 214 + "UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1", 215 + &[&post], 216 + ) 217 + .await?; 218 + } 219 + Ok(false) => continue, 220 + Err(e) => { 221 + tracing::error!("failed to check threadgate enforcement: {e}"); 222 + continue; 223 + } 224 + } 225 + } 226 + 227 + conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[]) 228 + .await 229 + }
+2 -2
consumer/src/db/sql/post_insert.sql
··· 1 INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, 2 - root_cid, embed, embed_subtype, created_at) 3 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) 4 ON CONFLICT DO NOTHING
··· 1 INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, 2 + root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at) 3 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) 4 ON CONFLICT DO NOTHING
+31
consumer/src/utils.rs
··· 1 use lexica::{Blob, StrongRef}; 2 use serde::{Deserialize, Deserializer}; 3 ··· 39 40 did == split_aturi[2] 41 }
··· 1 + use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; 2 use lexica::{Blob, StrongRef}; 3 use serde::{Deserialize, Deserializer}; 4 ··· 40 41 did == split_aturi[2] 42 } 43 + 44 + pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec<String>, Vec<String>) { 45 + let (mentions, tags) = from 46 + .iter() 47 + .flat_map(|v| { 48 + v.features.iter().map(|facet| match facet { 49 + FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), 50 + FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), 51 + _ => (None, None), 52 + }) 53 + }) 54 + .unzip::<_, _, Vec<_>, Vec<_>>(); 55 + 56 + let mentions = mentions.into_iter().flatten().cloned().collect(); 57 + let tags = tags.into_iter().flatten().cloned().collect(); 58 + 59 + (mentions, tags) 60 + } 61 + 62 + pub fn merge_tags<T>(t1: Option<Vec<T>>, t2: Option<Vec<T>>) -> Vec<T> { 63 + match (t1, t2) { 64 + (Some(t1), None) => t1, 65 + (None, Some(t2)) => t2, 66 + (Some(mut t1), Some(t2)) => { 67 + t1.extend(t2); 68 + t1 69 + } 70 + _ => Vec::default(), 71 + } 72 + }
+15
migrations/2025-09-27-171241_post-tweaks/down.sql
···
··· 1 + alter table posts 2 + drop column mentions, 3 + drop column violates_threadgate; 4 + 5 + drop trigger t_author_feed_ins_post on posts; 6 + drop trigger t_author_feed_del_post on posts; 7 + drop trigger t_author_feed_ins_repost on reposts; 8 + drop trigger t_author_feed_del_repost on reposts; 9 + 10 + drop function f_author_feed_ins_post; 11 + drop function f_author_feed_del_post; 12 + drop function f_author_feed_ins_repost; 13 + drop function f_author_feed_del_repost; 14 + 15 + drop table author_feeds;
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
···
··· 1 + alter table posts 2 + add column mentions text[], 3 + add column violates_threadgate bool not null default false; 4 + 5 + create table author_feeds 6 + ( 7 + uri text primary key, 8 + cid text not null, 9 + post text not null, 10 + did text not null, 11 + typ text not null, 12 + sort_at timestamptz not null 13 + ); 14 + 15 + -- author_feeds post triggers 16 + create function f_author_feed_ins_post() returns trigger 17 + language plpgsql as 18 + $$ 19 + begin 20 + insert into author_feeds (uri, cid, post, did, typ, sort_at) 21 + VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at) 22 + on conflict do nothing; 23 + return NEW; 24 + end; 25 + $$; 26 + 27 + create trigger t_author_feed_ins_post 28 + before insert 29 + on posts 30 + for each row 31 + execute procedure f_author_feed_ins_post(); 32 + 33 + create function f_author_feed_del_post() returns trigger 34 + language plpgsql as 35 + $$ 36 + begin 37 + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 38 + return OLD; 39 + end; 40 + $$; 41 + 42 + create trigger t_author_feed_del_post 43 + before delete 44 + on posts 45 + for each row 46 + execute procedure f_author_feed_del_post(); 47 + 48 + -- author_feeds repost triggers 49 + create function f_author_feed_ins_repost() returns trigger 50 + language plpgsql as 51 + $$ 52 + begin 53 + insert into author_feeds (uri, cid, post, did, typ, sort_at) 54 + VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at) 55 + on conflict do nothing; 56 + return NEW; 57 + end; 58 + $$; 59 + 60 + create trigger t_author_feed_ins_repost 61 + before insert 62 + on reposts 63 + for each row 64 + execute procedure f_author_feed_ins_repost(); 65 + 66 + create function f_author_feed_del_repost() returns trigger 67 + language plpgsql as 68 + $$ 69 + begin 70 + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 71 + return OLD; 72 + end; 73 + $$; 74 + 75 + create trigger t_author_feed_del_repost 76 + before delete 77 + on reposts 78 + for each row 79 + execute procedure f_author_feed_del_repost();
+163 -4
parakeet-db/src/schema.rs
··· 9 10 11 12 13 14 - 15 - 16 - 17 - 18 19 20 ··· 284 embed_subtype -> Nullable<Text>, 285 created_at -> Timestamptz, 286 indexed_at -> Timestamp, 287 } 288 } 289
··· 9 10 11 12 + } 13 + } 14 15 + diesel::table! { 16 + author_feeds (uri) { 17 + uri -> Text, 18 + cid -> Text, 19 + post -> Text, 20 + did -> Text, 21 + typ -> Text, 22 + sort_at -> Timestamptz, 23 + } 24 + } 25 26 + diesel::table! { 27 + backfill (repo, repo_ver) { 28 + repo -> Text, 29 30 31 ··· 295 embed_subtype -> Nullable<Text>, 296 created_at -> Timestamptz, 297 indexed_at -> Timestamp, 298 + mentions -> Nullable<Array<Nullable<Text>>>, 299 + violates_threadgate -> Bool, 300 } 301 } 302 303 + 304 + 305 + 306 + 307 + 308 + 309 + 310 + 311 + 312 + 313 + 314 + 315 + 316 + 317 + 318 + 319 + 320 + 321 + 322 + 323 + 324 + 325 + 326 + 327 + 328 + 329 + 330 + 331 + 332 + 333 + 334 + 335 + 336 + 337 + 338 + 339 + 340 + 341 + 342 + 343 + 344 + 345 + 346 + 347 + 348 + 349 + 350 + 351 + 352 + 353 + 354 + 355 + 356 + 357 + 358 + 359 + 360 + 361 + 362 + 363 + 364 + 365 + 366 + 367 + 368 + 369 + 370 + 371 + 372 + 373 + 374 + 375 + 376 + 377 + 378 + 379 + 380 + 381 + 382 + 383 + 384 + 385 + 386 + 387 + 388 + 389 + 390 + 391 + 392 + 393 + 394 + 395 + 396 + 397 + 398 + 399 + 400 + 401 + 402 + 403 + 404 + 405 + 406 + 407 + 408 + 409 + 410 + 411 + 412 + 413 + 414 + 415 + 416 + 417 + 418 + 419 + 420 + 421 + 422 + 423 + 424 + 425 + 426 + 427 + 428 + 429 + 430 + 431 + 432 + 433 + 434 + 435 + 436 + 437 + 438 + 439 + 440 + 441 + 442 + 443 + diesel::allow_tables_to_appear_in_same_query!( 444 + actors, 445 + author_feeds, 446 + backfill, 447 + backfill_jobs, 448 + blocks,
+208
consumer/src/db/gates.rs
···
··· 1 + use super::{PgExecResult, PgResult}; 2 + use crate::indexer::records::{ 3 + AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, 4 + THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION, 5 + }; 6 + use chrono::prelude::*; 7 + use chrono::{DateTime, Utc}; 8 + use deadpool_postgres::GenericClient; 9 + use std::collections::HashSet; 10 + 11 + pub async fn post_enforce_threadgate<C: GenericClient>( 12 + conn: &mut C, 13 + root: &str, 14 + post_author: &str, 15 + post_created_at: DateTime<Utc>, 16 + is_backfill: bool, 17 + ) -> PgResult<bool> { 18 + // check if the root and the current post are the same author 19 + // strip "at://" then break into parts by '/' 20 + let parts = root[5..].split('/').collect::<Vec<_>>(); 21 + let root_author = parts[0]; 22 + if root_author == post_author { 23 + return Ok(false); 24 + } 25 + 26 + let tg_data = super::threadgate_get(conn, root).await?; 27 + 28 + let Some((created_at, allow, allow_lists)) = tg_data else { 29 + return Ok(false); 30 + }; 31 + 32 + // when backfilling, there's no point continuing if the record is dated before the threadgate 33 + if is_backfill && post_created_at < created_at { 34 + return Ok(false); 35 + } 36 + 37 + if allow.is_empty() { 38 + return Ok(true); 39 + } 40 + 41 + let allow: HashSet<String> = HashSet::from_iter(allow); 42 + 43 + if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) { 44 + let profile_state: Option<(bool, bool)> = conn 45 + .query_opt( 46 + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", 47 + &[&root_author, &post_author], 48 + ) 49 + .await? 50 + .map(|v| (v.get(0), v.get(1))); 51 + 52 + if let Some((following, followed)) = profile_state { 53 + if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { 54 + return Ok(false); 55 + } 56 + 57 + if allow.contains(THREADGATE_RULE_FOLLOWING) && following { 58 + return Ok(false); 59 + } 60 + } 61 + } 62 + 63 + // check mentions 64 + if allow.contains(THREADGATE_RULE_MENTION) { 65 + let mentions: Vec<String> = conn 66 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 + .await? 68 + .map(|r| r.get(0)) 69 + .unwrap_or_default(); 70 + 71 + if mentions.contains(&post_author.to_owned()) { 72 + return Ok(false); 73 + } 74 + } 75 + 76 + if allow.contains(THREADGATE_RULE_LIST) { 77 + if allow_lists.is_empty() { 78 + return Ok(true); 79 + } 80 + 81 + let count: i64 = conn 82 + .query_one( 83 + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", 84 + &[&allow_lists, &post_author], 85 + ) 86 + .await? 87 + .get(0); 88 + if count != 0 { 89 + return Ok(false); 90 + } 91 + } 92 + 93 + Ok(true) 94 + } 95 + 96 + pub async fn postgate_maintain_detaches<C: GenericClient>( 97 + conn: &mut C, 98 + post: &str, 99 + detached: &[String], 100 + disable_effective: Option<NaiveDateTime>, 101 + ) -> PgExecResult { 102 + conn.execute( 103 + "SELECT maintain_postgates($1, $2, $3)", 104 + &[&post, &detached, &disable_effective], 105 + ) 106 + .await 107 + } 108 + 109 + // variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB 110 + pub async fn threadgate_enforce_backfill<C: GenericClient>( 111 + conn: &mut C, 112 + root_author: &str, 113 + threadgate: &AppBskyFeedThreadgate, 114 + ) -> PgExecResult { 115 + // pull out allow - if it's None we can skip this gate. 116 + let Some(allow) = threadgate.allow.as_ref() else { 117 + return Ok(0); 118 + }; 119 + 120 + let root = &threadgate.post; 121 + 122 + if allow.is_empty() { 123 + // blind update everything 124 + return conn.execute( 125 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 126 + &[&root, &root_author, &threadgate.created_at], 127 + ).await; 128 + } 129 + 130 + // pull authors with our root_uri where the author is not the root author and are dated after created_at 131 + // this is mutable because we'll remove ALLOWED dids 132 + let mut dids: HashSet<String> = conn 133 + .query( 134 + "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3", 135 + &[&root, &root_author, &threadgate.created_at], 136 + ) 137 + .await? 138 + .into_iter() 139 + .map(|row| row.get(0)) 140 + .collect(); 141 + 142 + // this will be empty if there are no replies. 143 + if dids.is_empty() { 144 + return Ok(0); 145 + } 146 + 147 + let allowed_lists = allow 148 + .iter() 149 + .filter_map(|rule| match rule { 150 + ThreadgateRule::List { list } => Some(list), 151 + _ => None, 152 + }) 153 + .collect::<Vec<_>>(); 154 + 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 + 157 + if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 + let current_dids: Vec<_> = dids.iter().collect(); 159 + 160 + let res = conn.query( 161 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 + &[&root_author, &current_dids] 163 + ).await?; 164 + 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + } 167 + 168 + if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { 169 + let current_dids: Vec<_> = dids.iter().collect(); 170 + 171 + let res = conn.query( 172 + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 + &[&root_author, &current_dids] 174 + ).await?; 175 + 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + } 178 + 179 + if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 + let mentions: Vec<String> = conn 181 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 + .await? 183 + .map(|r| r.get(0)) 184 + .unwrap_or_default(); 185 + 186 + dids = &dids - &HashSet::from_iter(mentions); 187 + } 188 + 189 + if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() { 190 + let current_dids: Vec<_> = dids.iter().collect(); 191 + 192 + let res = conn 193 + .query( 194 + "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 + &[&allowed_lists, &current_dids], 196 + ) 197 + .await?; 198 + 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 200 + } 201 + 202 + let dids = dids.into_iter().collect::<Vec<_>>(); 203 + 204 + conn.execute( 205 + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3", 206 + &[&threadgate.post, &dids, &threadgate.created_at] 207 + ).await 208 + }
+2
consumer/src/db/mod.rs
··· 7 mod actor; 8 mod backfill; 9 pub mod copy; 10 mod labels; 11 mod record; 12 13 pub use actor::*; 14 pub use backfill::*; 15 pub use labels::*; 16 pub use record::*;
··· 7 mod actor; 8 mod backfill; 9 pub mod copy; 10 + mod gates; 11 mod labels; 12 mod record; 13 14 pub use actor::*; 15 pub use backfill::*; 16 + pub use gates::*; 17 pub use labels::*; 18 pub use record::*;
+9 -4
consumer/src/indexer/records.rs
··· 272 pub hidden_replies: Vec<String>, 273 } 274 275 #[derive(Debug, Deserialize, Serialize)] 276 #[serde(tag = "$type")] 277 pub enum ThreadgateRule { ··· 288 impl ThreadgateRule { 289 pub fn as_str(&self) -> &'static str { 290 match self { 291 - ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule", 292 - ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule", 293 - ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule", 294 - ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule", 295 } 296 } 297 }
··· 272 pub hidden_replies: Vec<String>, 273 } 274 275 + pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; 276 + pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; 277 + pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; 278 + pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; 279 + 280 #[derive(Debug, Deserialize, Serialize)] 281 #[serde(tag = "$type")] 282 pub enum ThreadgateRule { ··· 293 impl ThreadgateRule { 294 pub fn as_str(&self) -> &'static str { 295 match self { 296 + ThreadgateRule::Mention => THREADGATE_RULE_MENTION, 297 + ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER, 298 + ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING, 299 + ThreadgateRule::List { .. } => THREADGATE_RULE_LIST, 300 } 301 } 302 }
+6 -1
consumer/src/backfill/mod.rs
··· 275 follows: Vec<(String, String, DateTime<Utc>)>, 276 list_items: Vec<(String, records::AppBskyGraphListItem)>, 277 verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, 278 records: Vec<(String, Cid)>, 279 } 280 281 impl CopyStore { 282 async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { 283 db::copy::copy_likes(t, did, self.likes).await?; 284 - db::copy::copy_posts(t, did, self.posts).await?; 285 db::copy::copy_reposts(t, did, self.reposts).await?; 286 db::copy::copy_blocks(t, did, self.blocks).await?; 287 db::copy::copy_follows(t, did, self.follows).await?; 288 db::copy::copy_list_items(t, self.list_items).await?; 289 db::copy::copy_verification(t, did, self.verifications).await?; 290 db::copy::copy_records(t, did, self.records).await?; 291 292 Ok(())
··· 275 follows: Vec<(String, String, DateTime<Utc>)>, 276 list_items: Vec<(String, records::AppBskyGraphListItem)>, 277 verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, 278 + threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last. 279 records: Vec<(String, Cid)>, 280 } 281 282 impl CopyStore { 283 async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { 284 db::copy::copy_likes(t, did, self.likes).await?; 285 db::copy::copy_reposts(t, did, self.reposts).await?; 286 db::copy::copy_blocks(t, did, self.blocks).await?; 287 db::copy::copy_follows(t, did, self.follows).await?; 288 db::copy::copy_list_items(t, self.list_items).await?; 289 db::copy::copy_verification(t, did, self.verifications).await?; 290 + db::copy::copy_posts(t, did, self.posts).await?; 291 + for (at_uri, cid, record) in self.threadgates { 292 + db::threadgate_enforce_backfill(t, did, &record).await?; 293 + db::threadgate_upsert(t, &at_uri, cid, record).await?; 294 + } 295 db::copy::copy_records(t, did, self.records).await?; 296 297 Ok(())
+279
parakeet-db/src/models.rs
··· 148 pub embed: Option<String>, 149 pub embed_subtype: Option<String>, 150 151 pub created_at: DateTime<Utc>, 152 pub indexed_at: NaiveDateTime, 153 }
··· 148 pub embed: Option<String>, 149 pub embed_subtype: Option<String>, 150 151 + pub mentions: Option<Vec<Option<String>>>, 152 + pub violates_threadgate: bool, 153 + 154 pub created_at: DateTime<Utc>, 155 pub indexed_at: NaiveDateTime, 156 } 157 + 158 + 159 + 160 + 161 + 162 + 163 + 164 + 165 + 166 + 167 + 168 + 169 + 170 + 171 + 172 + 173 + 174 + 175 + 176 + 177 + 178 + 179 + 180 + 181 + 182 + 183 + 184 + 185 + 186 + 187 + 188 + 189 + 190 + 191 + 192 + 193 + 194 + 195 + 196 + 197 + 198 + 199 + 200 + 201 + 202 + 203 + 204 + 205 + 206 + 207 + 208 + 209 + 210 + 211 + 212 + 213 + 214 + 215 + 216 + 217 + 218 + 219 + 220 + 221 + 222 + 223 + 224 + 225 + 226 + 227 + 228 + 229 + 230 + 231 + 232 + 233 + 234 + 235 + 236 + 237 + 238 + 239 + 240 + 241 + 242 + 243 + 244 + 245 + 246 + 247 + 248 + 249 + 250 + 251 + 252 + 253 + 254 + 255 + 256 + 257 + 258 + 259 + 260 + 261 + 262 + 263 + 264 + 265 + 266 + 267 + 268 + 269 + 270 + 271 + 272 + 273 + 274 + 275 + 276 + 277 + 278 + 279 + 280 + 281 + 282 + 283 + 284 + 285 + 286 + 287 + 288 + 289 + 290 + 291 + 292 + 293 + 294 + 295 + 296 + 297 + 298 + 299 + 300 + 301 + 302 + 303 + 304 + 305 + 306 + 307 + 308 + 309 + 310 + 311 + 312 + 313 + 314 + 315 + 316 + 317 + 318 + 319 + 320 + 321 + 322 + 323 + 324 + 325 + 326 + 327 + 328 + 329 + 330 + 331 + 332 + 333 + 334 + 335 + 336 + 337 + 338 + 339 + 340 + 341 + 342 + 343 + 344 + 345 + 346 + 347 + 348 + 349 + 350 + 351 + 352 + 353 + 354 + 355 + 356 + 357 + 358 + 359 + 360 + 361 + 362 + 363 + 364 + 365 + 366 + 367 + 368 + 369 + 370 + 371 + 372 + 373 + 374 + 375 + 376 + 377 + 378 + 379 + 380 + 381 + 382 + 383 + 384 + 385 + 386 + 387 + 388 + 389 + 390 + 391 + 392 + 393 + 394 + 395 + 396 + 397 + 398 + 399 + 400 + 401 + 402 + 403 + 404 + 405 + 406 + 407 + 408 + 409 + 410 + 411 + 412 + 413 + 414 + 415 + 416 + 417 + pub subject_type: &'a str, 418 + pub tags: Vec<String>, 419 + } 420 + 421 + #[derive(Debug, Queryable, Selectable, Identifiable)] 422 + #[diesel(table_name = crate::schema::author_feeds)] 423 + #[diesel(primary_key(uri))] 424 + #[diesel(check_for_backend(diesel::pg::Pg))] 425 + pub struct AuthorFeedItem { 426 + pub uri: String, 427 + pub cid: String, 428 + pub post: String, 429 + pub did: String, 430 + pub typ: String, 431 + pub sort_at: DateTime<Utc>, 432 + }
+2 -2
parakeet/src/sql/thread.sql
··· 1 with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 2 from posts 3 - where parent_uri = $1 4 union all 5 select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 6 from posts p 7 join thread on p.parent_uri = thread.at_uri 8 - where thread.depth <= $2) 9 select * 10 from thread 11 order by depth desc;
··· 1 with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 2 from posts 3 + where parent_uri = $1 and violates_threadgate=FALSE 4 union all 5 select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 6 from posts p 7 join thread on p.parent_uri = thread.at_uri 8 + where thread.depth <= $2 and p.violates_threadgate=FALSE) 9 select * 10 from thread 11 order by depth desc;
+4 -2
parakeet/src/sql/thread_parent.sql
··· 1 with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth 2 from posts 3 - where at_uri = (select parent_uri from posts where at_uri = $1) 4 union all 5 select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1 6 from posts p 7 join parents on p.at_uri = parents.parent_uri 8 - where parents.depth <= $2) 9 select * 10 from parents 11 order by depth desc;
··· 1 with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth 2 from posts 3 + where 4 + at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE) 5 union all 6 select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1 7 from posts p 8 join parents on p.at_uri = parents.parent_uri 9 + where parents.depth <= $2 10 + and p.violates_threadgate = FALSE) 11 select * 12 from parents 13 order by depth desc;
+15 -11
parakeet/src/xrpc/app_bsky/feed/likes.rs
··· 1 2 3 ··· 51 52 53 54 55 - 56 - 57 - 58 - 59 - 60 - 61 - 62 - .map(|(_, uri)| uri.clone()) 63 .collect::<Vec<_>>(); 64 65 - let mut posts = hyd.hydrate_feed_posts(at_uris).await; 66 67 - let feed: Vec<_> = results 68 - .into_iter()
··· 1 + use crate::hydration::posts::RawFeedItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 6 7 ··· 55 56 57 58 + .last() 59 + .map(|(last, _)| last.timestamp_millis().to_string()); 60 61 + let raw_feed = results 62 + .iter() 63 + .map(|(_, uri)| RawFeedItem::Post { 64 + uri: uri.clone(), 65 + context: None, 66 + }) 67 .collect::<Vec<_>>(); 68 69 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 70 71 + Ok(Json(FeedRes { cursor, feed })) 72 + }

History

1 round 0 comments
sign up or login to add to the discussion
mia.omg.lol submitted #0
15 commits
expand
postgates: trust provided timestamp only when backfilling
store mentions and tags from facets in DB
clippy
store if a post violates a threadgate
move postgate and threadgate enforcement into their own file
default deny threadgates
fix list allows
remove accidental dbg!
consts for threadgate rules
correct threadgate backfilling
update models
enforce threadgates
show reposts in author feeds
make author feeds work like bluesky
the big feed refactor
expand 0 comments
closed without merging