Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

feat: chat.bsky declarations

closes #3

Changed files
+153 -21
consumer
lexica
src
app_bsky
migrations
2025-04-08-171113_chat-decl
parakeet
src
parakeet-db
+26
consumer/src/indexer/db.rs
··· 667 667 .execute(conn) 668 668 .await 669 669 } 670 + 671 + pub async fn upsert_chat_decl( 672 + conn: &mut AsyncPgConnection, 673 + did: &str, 674 + rec: records::ChatBskyActorDeclaration, 675 + ) -> QueryResult<usize> { 676 + let data = models::NewChatDecl { 677 + did, 678 + allow_incoming: rec.allow_incoming.to_string(), 679 + }; 680 + 681 + diesel::insert_into(schema::chat_decls::table) 682 + .values(&data) 683 + .on_conflict(schema::chat_decls::did) 684 + .do_update() 685 + .set(&data) 686 + .execute(conn) 687 + .await 688 + } 689 + 690 + pub async fn delete_chat_decl(conn: &mut AsyncPgConnection, did: &str) -> QueryResult<usize> { 691 + diesel::delete(schema::chat_decls::table) 692 + .filter(schema::chat_decls::did.eq(did)) 693 + .execute(conn) 694 + .await 695 + }
+6
consumer/src/indexer/mod.rs
··· 420 420 421 421 db::insert_list_item(conn, at_uri, record).await?; 422 422 } 423 + RecordTypes::ChatBskyActorDeclaration(record) => { 424 + if rkey == "self" { 425 + db::upsert_chat_decl(conn, repo, record).await?; 426 + } 427 + } 423 428 } 424 429 425 430 Ok(()) ··· 453 458 } 454 459 CollectionType::BskyListBlock => db::delete_list_block(conn, at_uri).await?, 455 460 CollectionType::BskyListItem => db::delete_list_item(conn, at_uri).await?, 461 + CollectionType::ChatActorDecl => db::delete_chat_decl(conn, at_uri).await?, 456 462 _ => unreachable!(), 457 463 }; 458 464
+7
consumer/src/indexer/records.rs
··· 1 1 use crate::utils; 2 2 use chrono::{DateTime, Utc}; 3 3 use ipld_core::cid::Cid; 4 + use lexica::app_bsky::actor::ChatAllowIncoming; 4 5 use lexica::app_bsky::embed::AspectRatio; 5 6 use lexica::app_bsky::richtext::FacetMain; 6 7 use serde::{Deserialize, Serialize}; ··· 291 292 pub subject: String, 292 293 pub created_at: DateTime<Utc>, 293 294 } 295 + 296 + #[derive(Debug, Deserialize, Serialize)] 297 + #[serde(rename_all = "camelCase")] 298 + pub struct ChatBskyActorDeclaration { 299 + pub allow_incoming: ChatAllowIncoming, 300 + }
+5
consumer/src/indexer/types.rs
··· 29 29 AppBskyGraphListBlock(records::AppBskyGraphListBlock), 30 30 #[serde(rename = "app.bsky.graph.listitem")] 31 31 AppBskyGraphListItem(records::AppBskyGraphListItem), 32 + #[serde(rename = "chat.bsky.actor.declaration")] 33 + ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 32 34 } 33 35 34 36 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 45 47 BskyList, 46 48 BskyListBlock, 47 49 BskyListItem, 50 + ChatActorDecl, 48 51 Unsupported, 49 52 } 50 53 ··· 63 66 "app.bsky.graph.list" => CollectionType::BskyList, 64 67 "app.bsky.graph.listblock" => CollectionType::BskyListBlock, 65 68 "app.bsky.graph.listitem" => CollectionType::BskyListItem, 69 + "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 66 70 _ => CollectionType::Unsupported, 67 71 } 68 72 } ··· 81 85 CollectionType::BskyList => true, 82 86 CollectionType::BskyListBlock => false, 83 87 CollectionType::BskyListItem => false, 88 + CollectionType::ChatActorDecl => true, 84 89 CollectionType::Unsupported => false, 85 90 } 86 91 }
+26 -2
lexica/src/app_bsky/actor.rs
··· 1 + use std::str::FromStr; 1 2 use chrono::prelude::*; 2 - use serde::Serialize; 3 + use serde::{Deserialize, Serialize}; 3 4 4 5 #[derive(Clone, Default, Debug, Serialize)] 5 6 #[serde(rename_all = "camelCase")] ··· 18 19 pub allow_incoming: ChatAllowIncoming, 19 20 } 20 21 21 - #[derive(Copy, Clone, Debug, Serialize)] 22 + #[derive(Copy, Clone, Debug, Deserialize, Serialize)] 22 23 #[serde(rename_all = "lowercase")] 23 24 pub enum ChatAllowIncoming { 24 25 All, 25 26 None, 26 27 Following, 28 + } 29 + 30 + impl ToString for ChatAllowIncoming { 31 + fn to_string(&self) -> String { 32 + match self { 33 + ChatAllowIncoming::All => "all".into(), 34 + ChatAllowIncoming::None => "none".into(), 35 + ChatAllowIncoming::Following => "following".into(), 36 + } 37 + } 38 + } 39 + 40 + impl FromStr for ChatAllowIncoming { 41 + type Err = String; 42 + 43 + fn from_str(s: &str) -> Result<Self, Self::Err> { 44 + match s { 45 + "all" => Ok(ChatAllowIncoming::All), 46 + "none" => Ok(ChatAllowIncoming::None), 47 + "following" => Ok(ChatAllowIncoming::Following), 48 + x => Err(format!("Unrecognized variant {}", x).into()), 49 + } 50 + } 27 51 } 28 52 29 53 #[derive(Clone, Debug, Serialize)]
+1
migrations/2025-04-08-171113_chat-decl/down.sql
··· 1 + drop table chat_decls;
+6
migrations/2025-04-08-171113_chat-decl/up.sql
··· 1 + create table chat_decls 2 + ( 3 + did text primary key references actors (did), 4 + allow_incoming text not null, 5 + indexed_at timestamp not null default now() 6 + );
+8
parakeet-db/src/models.rs
··· 546 546 pub post_cid: String, 547 547 pub created_at: NaiveDateTime, 548 548 } 549 + 550 + #[derive(Insertable, AsChangeset)] 551 + #[diesel(table_name = crate::schema::chat_decls)] 552 + #[diesel(check_for_backend(diesel::pg::Pg))] 553 + pub struct NewChatDecl<'a> { 554 + pub did: &'a str, 555 + pub allow_incoming: String, 556 + }
+10
parakeet-db/src/schema.rs
··· 32 32 } 33 33 34 34 diesel::table! { 35 + chat_decls (did) { 36 + did -> Text, 37 + allow_incoming -> Text, 38 + indexed_at -> Timestamp, 39 + } 40 + } 41 + 42 + diesel::table! { 35 43 feedgens (at_uri) { 36 44 at_uri -> Text, 37 45 cid -> Text, ··· 241 249 242 250 diesel::joinable!(backfill -> actors (repo)); 243 251 diesel::joinable!(blocks -> actors (did)); 252 + diesel::joinable!(chat_decls -> actors (did)); 244 253 diesel::joinable!(feedgens -> actors (owner)); 245 254 diesel::joinable!(follows -> actors (did)); 246 255 diesel::joinable!(likes -> actors (did)); ··· 261 270 actors, 262 271 backfill, 263 272 blocks, 273 + chat_decls, 264 274 feedgens, 265 275 follow_stats, 266 276 follows,
+48 -16
parakeet/src/hydration/profile.rs
··· 1 1 use crate::loaders::Dataloaders; 2 - use lexica::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewDetailed}; 2 + use lexica::app_bsky::actor::{ 3 + ChatAllowIncoming, ProfileAssociated, ProfileAssociatedChat, ProfileView, ProfileViewBasic, 4 + ProfileViewDetailed, 5 + }; 3 6 use parakeet_db::models; 4 7 use std::collections::HashMap; 5 8 6 - fn build_basic(handle: Option<String>, profile: models::Profile) -> ProfileViewBasic { 9 + fn build_associated(chat: Option<ChatAllowIncoming>) -> Option<ProfileAssociated> { 10 + if let Some(allow_incoming) = chat { 11 + Some(ProfileAssociated { 12 + lists: 0, 13 + feedgens: 0, 14 + starter_packs: 0, 15 + labeler: false, 16 + chat: Some(ProfileAssociatedChat { allow_incoming }), 17 + }) 18 + } else { 19 + None 20 + } 21 + } 22 + 23 + fn build_basic( 24 + handle: Option<String>, 25 + profile: models::Profile, 26 + chat_decl: Option<ChatAllowIncoming>, 27 + ) -> ProfileViewBasic { 28 + let associated = build_associated(chat_decl); 29 + 7 30 ProfileViewBasic { 8 31 did: profile.did, 9 32 handle: handle.unwrap_or("handle.invalid".to_string()), ··· 11 34 avatar: profile 12 35 .avatar_cid 13 36 .map(|v| format!("https://localhost/avatar/{v}")), 14 - associated: None, 37 + associated, 15 38 created_at: profile.created_at.and_utc(), 16 39 } 17 40 } 18 41 19 - fn build_profile(handle: Option<String>, profile: models::Profile) -> ProfileView { 42 + fn build_profile( 43 + handle: Option<String>, 44 + profile: models::Profile, 45 + chat_decl: Option<ChatAllowIncoming>, 46 + ) -> ProfileView { 47 + let associated = build_associated(chat_decl); 48 + 20 49 ProfileView { 21 50 did: profile.did, 22 51 handle: handle.unwrap_or("handle.invalid".to_string()), ··· 25 54 avatar: profile 26 55 .avatar_cid 27 56 .map(|v| format!("https://localhost/avatar/{v}")), 28 - associated: None, 57 + associated, 29 58 created_at: profile.created_at.and_utc(), 30 59 indexed_at: profile.indexed_at, 31 60 } ··· 35 64 handle: Option<String>, 36 65 profile: models::Profile, 37 66 follow_stats: Option<models::FollowStats>, 67 + chat_decl: Option<ChatAllowIncoming>, 38 68 ) -> ProfileViewDetailed { 69 + let associated = build_associated(chat_decl); 70 + 39 71 ProfileViewDetailed { 40 72 did: profile.did, 41 73 handle: handle.unwrap_or("handle.invalid".to_string()), ··· 55 87 .as_ref() 56 88 .map(|v| v.following as i64) 57 89 .unwrap_or_default(), 58 - associated: None, 90 + associated, 59 91 created_at: profile.created_at.and_utc(), 60 92 indexed_at: profile.indexed_at, 61 93 } 62 94 } 63 95 64 96 pub async fn hydrate_profile_basic(loaders: &Dataloaders, did: String) -> Option<ProfileViewBasic> { 65 - let (handle, profile, _) = loaders.profile.load(did).await?; 97 + let (handle, profile, _, chat_decl) = loaders.profile.load(did).await?; 66 98 67 - Some(build_basic(handle, profile)) 99 + Some(build_basic(handle, profile, chat_decl)) 68 100 } 69 101 70 102 pub async fn hydrate_profiles_basic( ··· 75 107 76 108 profiles 77 109 .into_iter() 78 - .map(|(k, (handle, profile, _))| (k, build_basic(handle, profile))) 110 + .map(|(k, (handle, profile, _, chat_decl))| (k, build_basic(handle, profile, chat_decl))) 79 111 .collect() 80 112 } 81 113 82 114 pub async fn hydrate_profile(loaders: &Dataloaders, did: String) -> Option<ProfileView> { 83 - let (handle, profile, _) = loaders.profile.load(did).await?; 115 + let (handle, profile, _, chat_decl) = loaders.profile.load(did).await?; 84 116 85 - Some(build_profile(handle, profile)) 117 + Some(build_profile(handle, profile, chat_decl)) 86 118 } 87 119 88 120 pub async fn hydrate_profiles( ··· 93 125 94 126 profiles 95 127 .into_iter() 96 - .map(|(k, (handle, profile, _))| (k, build_profile(handle, profile))) 128 + .map(|(k, (handle, profile, _, chat_decl))| (k, build_profile(handle, profile, chat_decl))) 97 129 .collect() 98 130 } 99 131 ··· 101 133 loaders: &Dataloaders, 102 134 did: String, 103 135 ) -> Option<ProfileViewDetailed> { 104 - let (handle, profile, follow_stats) = loaders.profile.load(did).await?; 136 + let (handle, profile, follow_stats, chat_decl) = loaders.profile.load(did).await?; 105 137 106 - Some(build_detailed(handle, profile, follow_stats)) 138 + Some(build_detailed(handle, profile, follow_stats, chat_decl)) 107 139 } 108 140 109 141 pub async fn hydrate_profiles_detailed( ··· 114 146 115 147 profiles 116 148 .into_iter() 117 - .map(|(k, (handle, profile, follow_stats))| { 118 - (k, build_detailed(handle, profile, follow_stats)) 149 + .map(|(k, (handle, profile, follow_stats, chat_decl))| { 150 + (k, build_detailed(handle, profile, follow_stats, chat_decl)) 119 151 }) 120 152 .collect() 121 153 }
+10 -3
parakeet/src/loaders.rs
··· 6 6 use itertools::Itertools; 7 7 use parakeet_db::{models, schema}; 8 8 use std::collections::HashMap; 9 + use std::str::FromStr; 10 + use lexica::app_bsky::actor::ChatAllowIncoming; 9 11 10 12 pub struct Dataloaders { 11 13 pub embed: Loader<String, EmbedLoaderRet, EmbedLoader>, ··· 56 58 } 57 59 58 60 pub struct ProfileLoader(Pool<AsyncPgConnection>); 59 - type ProfileLoaderRet = (Option<String>, models::Profile, Option<models::FollowStats>); 61 + type ProfileLoaderRet = (Option<String>, models::Profile, Option<models::FollowStats>, Option<ChatAllowIncoming>); 60 62 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 61 63 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 62 64 let mut conn = self.0.get().await.unwrap(); ··· 66 68 .left_join( 67 69 schema::follow_stats::table.on(schema::follow_stats::did.eq(schema::actors::did)), 68 70 ) 71 + .left_join(schema::chat_decls::table.on(schema::chat_decls::did.eq(schema::actors::did))) 69 72 .select(( 70 73 schema::actors::did, 71 74 schema::actors::handle, 72 75 models::Profile::as_select(), 73 76 Option::<models::FollowStats>::as_select(), 77 + schema::chat_decls::allow_incoming.nullable(), 74 78 )) 75 79 .filter(schema::actors::did.eq_any(keys)) 76 80 .load::<( ··· 78 82 Option<String>, 79 83 models::Profile, 80 84 Option<models::FollowStats>, 85 + Option<String>, 81 86 )>(&mut conn) 82 87 .await; 83 88 84 89 match res { 85 90 Ok(res) => { 86 - HashMap::from_iter(res.into_iter().map(|(did, handle, profile, follow_stats)| { 87 - (did, (handle, profile, follow_stats)) 91 + HashMap::from_iter(res.into_iter().map(|(did, handle, profile, follow_stats, chat_decl)| { 92 + let chat_decl = chat_decl.and_then(|v| ChatAllowIncoming::from_str(&v).ok()); 93 + 94 + (did, (handle, profile, follow_stats, chat_decl)) 88 95 })) 89 96 } 90 97 Err(e) => {