Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
69
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'feat-bookmarks' into 'main'

Bookmarks

See merge request parakeet-social/parakeet!21

+395
+32
consumer/src/db/record.rs
··· 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::GenericClient; 6 6 use ipld_core::cid::Cid; 7 + use lexica::community_lexicon::bookmarks::Bookmark; 7 8 8 9 pub async fn record_upsert<C: GenericClient>( 9 10 conn: &mut C, ··· 20 21 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 21 22 conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri]) 22 23 .await 24 + } 25 + 26 + pub async fn bookmark_upsert<C: GenericClient>( 27 + conn: &mut C, 28 + rkey: &str, 29 + repo: &str, 30 + rec: Bookmark, 31 + ) -> PgExecResult { 32 + // strip "at://" then break into parts by '/' 33 + let rec_type = match rec.subject.strip_prefix("at://") { 34 + Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1], 35 + None => "$uri", 36 + }; 37 + 38 + conn.execute( 39 + include_str!("sql/bookmarks_upsert.sql"), 40 + &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at], 41 + ) 42 + .await 43 + } 44 + 45 + pub async fn bookmark_delete<C: GenericClient>( 46 + conn: &mut C, 47 + rkey: &str, 48 + repo: &str, 49 + ) -> PgExecResult { 50 + conn.execute( 51 + "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2", 52 + &[&rkey, &repo], 53 + ) 54 + .await 23 55 } 24 56 25 57 pub async fn block_insert<C: GenericClient>(
+5
consumer/src/db/sql/bookmarks_upsert.sql
··· 1 + INSERT INTO bookmarks (did, rkey, subject, subject_type, tags, created_at) 2 + VALUES ($1, $2, $3, $4, $5, $6) 3 + ON CONFLICT (did, rkey) DO UPDATE SET subject=EXCLUDED.subject, 4 + subject_type=EXCLUDED.subject_type, 5 + tags=EXCLUDED.tags
+6
consumer/src/indexer/mod.rs
··· 723 723 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 724 724 } 725 725 } 726 + RecordTypes::CommunityLexiconBookmark(record) => { 727 + db::bookmark_upsert(conn, rkey, repo, record).await?; 728 + } 726 729 } 727 730 728 731 db::record_upsert(conn, at_uri, repo, cid).await?; ··· 832 835 CollectionType::ChatActorDecl => { 833 836 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 837 db::chat_decl_delete(conn, repo).await? 838 + } 839 + CollectionType::CommunityLexiconBookmark => { 840 + db::bookmark_delete(conn, rkey, repo).await? 835 841 } 836 842 _ => unreachable!(), 837 843 };
+5
consumer/src/indexer/types.rs
··· 41 41 AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 42 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 45 + CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark) 44 46 } 45 47 46 48 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 63 65 BskyLabelerService, 64 66 BskyNotificationDeclaration, 65 67 ChatActorDecl, 68 + CommunityLexiconBookmark, 66 69 Unsupported, 67 70 } 68 71 ··· 87 90 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 88 91 "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 89 92 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 93 + "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark, 90 94 _ => CollectionType::Unsupported, 91 95 } 92 96 } ··· 111 115 CollectionType::BskyVerification => false, 112 116 CollectionType::BskyLabelerService => true, 113 117 CollectionType::BskyNotificationDeclaration => true, 118 + CollectionType::CommunityLexiconBookmark => true, 114 119 CollectionType::Unsupported => false, 115 120 } 116 121 }
+32
lexica/src/app_bsky/bookmark.rs
··· 1 + use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 + use crate::StrongRef; 3 + use chrono::prelude::*; 4 + use serde::Serialize; 5 + 6 + #[derive(Clone, Debug, Serialize)] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct BookmarkView { 9 + pub subject: StrongRef, 10 + pub item: BookmarkViewItem, 11 + pub created_at: DateTime<Utc>, 12 + } 13 + 14 + #[derive(Clone, Debug, Serialize)] 15 + #[serde(tag = "$type")] 16 + // This is technically the same as ReplyRefPost atm, but just in case... 17 + pub enum BookmarkViewItem { 18 + #[serde(rename = "app.bsky.feed.defs#postView")] 19 + Post(PostView), 20 + #[serde(rename = "app.bsky.feed.defs#notFoundPost")] 21 + NotFound { 22 + uri: String, 23 + #[serde(rename = "notFound")] 24 + not_found: bool, 25 + }, 26 + #[serde(rename = "app.bsky.feed.defs#blockedPost")] 27 + Blocked { 28 + uri: String, 29 + blocked: bool, 30 + author: BlockedAuthor, 31 + }, 32 + }
+1
lexica/src/app_bsky/mod.rs
··· 1 1 use serde::Serialize; 2 2 3 3 pub mod actor; 4 + pub mod bookmark; 4 5 pub mod embed; 5 6 pub mod feed; 6 7 pub mod graph;
+14
lexica/src/community_lexicon/bookmarks.rs
··· 1 + use chrono::prelude::*; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Clone, Debug, Deserialize, Serialize)] 5 + #[serde(tag = "$type")] 6 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct Bookmark { 9 + pub subject: String, 10 + #[serde(default)] 11 + #[serde(skip_serializing_if = "Vec::is_empty")] 12 + pub tags: Vec<String>, 13 + pub created_at: DateTime<Utc>, 14 + }
+1
lexica/src/community_lexicon/mod.rs
··· 1 + pub mod bookmarks;
+8
lexica/src/lib.rs
··· 5 5 6 6 pub mod app_bsky; 7 7 pub mod com_atproto; 8 + pub mod community_lexicon; 8 9 mod utils; 9 10 10 11 #[derive(Clone, Debug, Serialize)] ··· 21 22 )] 22 23 pub cid: Cid, 23 24 pub uri: String, 25 + } 26 + 27 + impl StrongRef { 28 + pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> { 29 + let cid = cid.parse()?; 30 + Ok(StrongRef { uri, cid }) 31 + } 24 32 } 25 33 26 34 #[derive(Clone, Debug, Deserialize, Serialize)]
+1
migrations/2025-09-02-190833_bookmarks/down.sql
··· 1 + drop table bookmarks;
+19
migrations/2025-09-02-190833_bookmarks/up.sql
··· 1 + create table bookmarks 2 + ( 3 + did text not null references actors (did), 4 + rkey text, 5 + subject text not null, 6 + subject_cid text, 7 + subject_type text not null, 8 + tags text[] not null default ARRAY []::text[], 9 + 10 + created_at timestamptz not null default now(), 11 + 12 + primary key (did, subject) 13 + ); 14 + 15 + create index bookmarks_rkey_index on bookmarks (rkey); 16 + create index bookmarks_subject_index on bookmarks (subject); 17 + create index bookmarks_subject_type_index on bookmarks (subject_type); 18 + create index bookmarks_tags_index on bookmarks using gin (tags); 19 + create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
+26
parakeet-db/src/models.rs
··· 383 383 pub did: &'a str, 384 384 pub list_uri: &'a str, 385 385 } 386 + 387 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 388 + #[diesel(table_name = crate::schema::bookmarks)] 389 + #[diesel(primary_key(did, subject, subject_cid))] 390 + #[diesel(check_for_backend(diesel::pg::Pg))] 391 + pub struct Bookmark { 392 + pub did: String, 393 + pub rkey: Option<String>, 394 + pub subject: String, 395 + pub subject_cid: Option<String>, 396 + pub subject_type: String, 397 + pub tags: Vec<Option<String>>, 398 + pub created_at: DateTime<Utc>, 399 + } 400 + 401 + #[derive(Debug, Insertable, AsChangeset)] 402 + #[diesel(table_name = crate::schema::bookmarks)] 403 + #[diesel(check_for_backend(diesel::pg::Pg))] 404 + pub struct NewBookmark<'a> { 405 + pub did: &'a str, 406 + pub rkey: Option<String>, 407 + pub subject: &'a str, 408 + pub subject_cid: Option<String>, 409 + pub subject_type: &'a str, 410 + pub tags: Vec<String>, 411 + }
+14
parakeet-db/src/schema.rs
··· 43 43 } 44 44 45 45 diesel::table! { 46 + bookmarks (did, subject) { 47 + did -> Text, 48 + rkey -> Nullable<Text>, 49 + subject -> Text, 50 + subject_cid -> Nullable<Text>, 51 + subject_type -> Text, 52 + tags -> Array<Nullable<Text>>, 53 + created_at -> Timestamptz, 54 + } 55 + } 56 + 57 + diesel::table! { 46 58 chat_decls (did) { 47 59 did -> Text, 48 60 allow_incoming -> Text, ··· 375 387 376 388 diesel::joinable!(backfill -> actors (repo)); 377 389 diesel::joinable!(blocks -> actors (did)); 390 + diesel::joinable!(bookmarks -> actors (did)); 378 391 diesel::joinable!(chat_decls -> actors (did)); 379 392 diesel::joinable!(feedgens -> actors (owner)); 380 393 diesel::joinable!(follows -> actors (did)); ··· 405 418 backfill, 406 419 backfill_jobs, 407 420 blocks, 421 + bookmarks, 408 422 chat_decls, 409 423 feedgens, 410 424 follows,
+146
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 1 + use crate::hydration::StatefulHydrator; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 + use crate::xrpc::{datetime_cursor, CursorQuery}; 5 + use crate::GlobalState; 6 + use axum::extract::{Query, State}; 7 + use axum::Json; 8 + use diesel::prelude::*; 9 + use diesel_async::RunQueryDsl; 10 + use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use parakeet_db::{models, schema}; 12 + use serde::{Deserialize, Serialize}; 13 + use lexica::StrongRef; 14 + 15 + const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"]; 16 + 17 + #[derive(Debug, Deserialize)] 18 + pub struct CreateBookmarkReq { 19 + pub uri: String, 20 + pub cid: String, 21 + } 22 + 23 + pub async fn create_bookmark( 24 + State(state): State<GlobalState>, 25 + auth: AtpAuth, 26 + Json(form): Json<CreateBookmarkReq>, 27 + ) -> XrpcResult<()> { 28 + let mut conn = state.pool.get().await?; 29 + 30 + // strip "at://" then break into parts by '/' 31 + let parts = form.uri[5..].split('/').collect::<Vec<_>>(); 32 + 33 + let data = models::NewBookmark { 34 + did: &auth.0, 35 + rkey: None, 36 + subject: &form.uri, 37 + subject_cid: Some(form.cid), 38 + subject_type: &parts[1], 39 + tags: vec![], 40 + }; 41 + 42 + diesel::insert_into(schema::bookmarks::table) 43 + .values(&data) 44 + .on_conflict_do_nothing() 45 + .execute(&mut conn) 46 + .await?; 47 + 48 + Ok(()) 49 + } 50 + 51 + #[derive(Debug, Deserialize)] 52 + pub struct DeleteBookmarkReq { 53 + pub uri: String, 54 + } 55 + 56 + pub async fn delete_bookmark( 57 + State(state): State<GlobalState>, 58 + auth: AtpAuth, 59 + Json(form): Json<DeleteBookmarkReq>, 60 + ) -> XrpcResult<()> { 61 + let mut conn = state.pool.get().await?; 62 + 63 + diesel::delete(schema::bookmarks::table) 64 + .filter( 65 + schema::bookmarks::did 66 + .eq(&auth.0) 67 + .and(schema::bookmarks::subject.eq(&form.uri)), 68 + ) 69 + .execute(&mut conn) 70 + .await?; 71 + 72 + Ok(()) 73 + } 74 + 75 + #[derive(Debug, Serialize)] 76 + pub struct GetBookmarksRes { 77 + #[serde(skip_serializing_if = "Option::is_none")] 78 + cursor: Option<String>, 79 + bookmarks: Vec<BookmarkView>, 80 + } 81 + 82 + pub async fn get_bookmarks( 83 + State(state): State<GlobalState>, 84 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 85 + auth: AtpAuth, 86 + Query(query): Query<CursorQuery>, 87 + ) -> XrpcResult<Json<GetBookmarksRes>> { 88 + let mut conn = state.pool.get().await?; 89 + let did = auth.0.clone(); 90 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); 91 + 92 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 93 + 94 + let mut bookmarks_query = schema::bookmarks::table 95 + .select(models::Bookmark::as_select()) 96 + .filter(schema::bookmarks::did.eq(&did)) 97 + .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES)) 98 + .into_boxed(); 99 + 100 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 101 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 102 + } 103 + 104 + let results = bookmarks_query 105 + .order(schema::bookmarks::created_at.desc()) 106 + .limit(limit as i64) 107 + .load(&mut conn) 108 + .await?; 109 + 110 + let cursor = results 111 + .last() 112 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 113 + 114 + let uris = results.iter().map(|bm| bm.subject.clone()).collect(); 115 + 116 + let mut posts = hyd.hydrate_posts(uris).await; 117 + 118 + let bookmarks = results 119 + .into_iter() 120 + .filter_map(|bookmark| { 121 + let maybe_item = posts.remove(&bookmark.subject); 122 + let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone()); 123 + 124 + // ensure that either the cid is set in the bookmark record *or* in the post record 125 + // otherwise just ditch. we should have one. 126 + let cid = bookmark.subject_cid.or(maybe_cid)?; 127 + 128 + let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or( 129 + BookmarkViewItem::NotFound { 130 + uri: bookmark.subject.clone(), 131 + not_found: true, 132 + }, 133 + ); 134 + 135 + let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 136 + 137 + Some(BookmarkView { 138 + subject, 139 + item, 140 + created_at: bookmark.created_at, 141 + }) 142 + }) 143 + .collect(); 144 + 145 + Ok(Json(GetBookmarksRes { cursor, bookmarks })) 146 + }
+4
parakeet/src/xrpc/app_bsky/mod.rs
··· 2 2 use axum::Router; 3 3 4 4 mod actor; 5 + mod bookmark; 5 6 mod feed; 6 7 mod graph; 7 8 mod labeler; ··· 14 15 // TODO: app.bsky.actor.getSuggestions (recs) 15 16 // TODO: app.bsky.actor.searchActor (search) 16 17 // TODO: app.bsky.actor.searchActorTypeahead (search) 18 + .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark)) 19 + .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark)) 20 + .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks)) 17 21 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 18 22 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 19 23 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
+69
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 1 + use crate::xrpc::datetime_cursor; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::AtpAuth; 4 + use crate::GlobalState; 5 + use axum::extract::{Query, State}; 6 + use axum::Json; 7 + use diesel::prelude::*; 8 + use diesel_async::RunQueryDsl; 9 + use lexica::community_lexicon::bookmarks::Bookmark; 10 + use parakeet_db::{models, schema}; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + #[derive(Debug, Deserialize)] 14 + pub struct BookmarkCursorQuery { 15 + pub tags: Option<Vec<String>>, 16 + pub limit: Option<u8>, 17 + pub cursor: Option<String>, 18 + } 19 + 20 + #[derive(Debug, Serialize)] 21 + pub struct GetActorBookmarksRes { 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + cursor: Option<String>, 24 + bookmarks: Vec<Bookmark>, 25 + } 26 + 27 + pub async fn get_actor_bookmarks( 28 + State(state): State<GlobalState>, 29 + auth: AtpAuth, 30 + Query(query): Query<BookmarkCursorQuery>, 31 + ) -> XrpcResult<Json<GetActorBookmarksRes>> { 32 + let mut conn = state.pool.get().await?; 33 + 34 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 35 + 36 + let mut bookmarks_query = schema::bookmarks::table 37 + .select(models::Bookmark::as_select()) 38 + .filter(schema::bookmarks::did.eq(&auth.0)) 39 + .into_boxed(); 40 + 41 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 42 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 43 + } 44 + 45 + if let Some(tags) = query.tags { 46 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags)); 47 + } 48 + 49 + let results = bookmarks_query 50 + .order(schema::bookmarks::created_at.desc()) 51 + .limit(limit as i64) 52 + .load(&mut conn) 53 + .await?; 54 + 55 + let cursor = results 56 + .last() 57 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 58 + 59 + let bookmarks = results 60 + .into_iter() 61 + .map(|bookmark| Bookmark { 62 + subject: bookmark.subject, 63 + tags: bookmark.tags.into_iter().flatten().collect(), 64 + created_at: bookmark.created_at, 65 + }) 66 + .collect(); 67 + 68 + Ok(Json(GetActorBookmarksRes { cursor, bookmarks })) 69 + }
+10
parakeet/src/xrpc/community_lexicon/mod.rs
··· 1 + use axum::routing::get; 2 + use axum::Router; 3 + 4 + pub mod bookmarks; 5 + 6 + #[rustfmt::skip] 7 + pub fn routes() -> Router<crate::GlobalState> { 8 + Router::new() 9 + .route("/community.lexicon.bookmarks.getActorBookmarks", get(bookmarks::get_actor_bookmarks)) 10 + }
+2
parakeet/src/xrpc/mod.rs
··· 8 8 mod app_bsky; 9 9 pub mod cdn; 10 10 mod com_atproto; 11 + mod community_lexicon; 11 12 mod error; 12 13 pub mod extract; 13 14 pub mod jwt; ··· 16 17 Router::new() 17 18 .merge(app_bsky::routes()) 18 19 .merge(com_atproto::routes()) 20 + .merge(community_lexicon::routes()) 19 21 } 20 22 21 23 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {