Rust AppView - highly experimental!

Compare changes

Choose any two refs to compare.

Changed files
+1087 -98
consumer
lexica
migrations
2025-09-02-190833_bookmarks
parakeet
parakeet-db
parakeet-index
+1
.envrc
···
··· 1 + use flake
+1
.gitignore
··· 4 .env 5 Config.toml 6 data/
··· 4 .env 5 Config.toml 6 data/ 7 + .direnv/
+11
README.md
··· 3 Parakeet is a [Bluesky](https://bsky.app) [AppView](https://atproto.wiki/en/wiki/reference/core-architecture/appview) 4 aiming to implement most of the functionality required to support the Bluesky client. Notably not implemented is a CDN. 5 6 ## The Code 7 Parakeet is implemented in Rust, using Postgres as a database, Redis for caching and queue processing, RocksDB for 8 aggregation, and Diesel for migrations and querying.
··· 3 Parakeet is a [Bluesky](https://bsky.app) [AppView](https://atproto.wiki/en/wiki/reference/core-architecture/appview) 4 aiming to implement most of the functionality required to support the Bluesky client. Notably not implemented is a CDN. 5 6 + ## Status and Roadmap 7 + Most common functionality works, with notable omissions being like/repost/follow statuses, blocks and mutes don't get 8 + applied, labels might not track CIDs properly, label redaction doesn't work at all (beware!). 9 + 10 + Future work is tracked in issues, but the highlights are below. Help would be highly appreciated. 11 + - Notifications 12 + - Search 13 + - Pinned Posts 14 + - The Timeline 15 + - Monitoring: metrics, tracing, and health checks. 16 + 17 ## The Code 18 Parakeet is implemented in Rust, using Postgres as a database, Redis for caching and queue processing, RocksDB for 19 aggregation, and Diesel for migrations and querying.
+8 -8
consumer/src/backfill/downloader.rs
··· 109 Ok(Some(did_doc)) => { 110 let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 111 tracing::warn!("bad DID doc for {did}"); 112 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 113 .await 114 .unwrap(); 115 continue; ··· 132 } 133 } 134 Ok(None) => { 135 - tracing::warn!(did, "bad DID doc"); 136 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 137 .await 138 .unwrap(); 139 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 140 .await 141 .unwrap(); 142 } ··· 145 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 146 .await 147 .unwrap(); 148 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 149 .await 150 .unwrap(); 151 } ··· 179 Ok(false) => continue, 180 Err(e) => { 181 tracing::error!(pds, did, "failed to check repo status: {e}"); 182 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 183 .await 184 .unwrap(); 185 continue; ··· 190 if let Some(handle) = maybe_handle { 191 if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await { 192 tracing::error!(pds, did, "failed to resolve handle: {e}"); 193 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 194 .await 195 .unwrap(); 196 } ··· 253 pds: &str, 254 did: &str, 255 ) -> eyre::Result<Option<(i32, i32)>> { 256 - let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 257 - 258 let res = http 259 .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}")) 260 .send() 261 .await? 262 .error_for_status()?; 263 264 let headers = res.headers(); 265 let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
··· 109 Ok(Some(did_doc)) => { 110 let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 111 tracing::warn!("bad DID doc for {did}"); 112 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc") 113 .await 114 .unwrap(); 115 continue; ··· 132 } 133 } 134 Ok(None) => { 135 + tracing::warn!(did, "bad/missing DID doc"); 136 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 137 .await 138 .unwrap(); 139 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did_doc") 140 .await 141 .unwrap(); 142 } ··· 145 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 146 .await 147 .unwrap(); 148 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did") 149 .await 150 .unwrap(); 151 } ··· 179 Ok(false) => continue, 180 Err(e) => { 181 tracing::error!(pds, did, "failed to check repo status: {e}"); 182 + db::backfill_job_write(&mut conn, &did, "failed.resolve.status") 183 .await 184 .unwrap(); 185 continue; ··· 190 if let Some(handle) = maybe_handle { 191 if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await { 192 tracing::error!(pds, did, "failed to resolve handle: {e}"); 193 + db::backfill_job_write(&mut conn, &did, "failed.resolve.handle") 194 .await 195 .unwrap(); 196 } ··· 253 pds: &str, 254 did: &str, 255 ) -> eyre::Result<Option<(i32, i32)>> { 256 let res = http 257 .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}")) 258 .send() 259 .await? 260 .error_for_status()?; 261 + 262 + let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 263 264 let headers = res.headers(); 265 let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
+1 -1
consumer/src/backfill/mod.rs
··· 131 } 132 } 133 134 - #[instrument(skip(conn, inner))] 135 async fn backfill_actor( 136 conn: &mut Object, 137 rc: &mut MultiplexedConnection,
··· 131 } 132 } 133 134 + #[instrument(skip(conn, rc, inner))] 135 async fn backfill_actor( 136 conn: &mut Object, 137 rc: &mut MultiplexedConnection,
+5 -2
consumer/src/backfill/repo.rs
··· 1 use super::{ 2 - types::{CarCommitEntry, CarEntry}, 3 CopyStore, 4 }; 5 use crate::indexer::records; ··· 54 CarEntry::Commit(_) => { 55 tracing::warn!("got commit entry that was not in root") 56 } 57 - CarEntry::Record(record) => { 58 if let Some(path) = mst_nodes.remove(&cid) { 59 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 60 } else { 61 records.insert(cid, record); 62 } 63 } 64 CarEntry::Mst(mst) => { 65 let mut out = Vec::with_capacity(mst.e.len());
··· 1 use super::{ 2 + types::{CarCommitEntry, CarEntry, CarRecordEntry}, 3 CopyStore, 4 }; 5 use crate::indexer::records; ··· 54 CarEntry::Commit(_) => { 55 tracing::warn!("got commit entry that was not in root") 56 } 57 + CarEntry::Record(CarRecordEntry::Known(record)) => { 58 if let Some(path) = mst_nodes.remove(&cid) { 59 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 60 } else { 61 records.insert(cid, record); 62 } 63 + } 64 + CarEntry::Record(CarRecordEntry::Other { ty }) => { 65 + tracing::debug!("repo contains unknown record type: {ty} ({cid})"); 66 } 67 CarEntry::Mst(mst) => { 68 let mut out = Vec::with_capacity(mst.e.len());
+11 -1
consumer/src/backfill/types.rs
··· 8 pub enum CarEntry { 9 Mst(CarMstEntry), 10 Commit(CarCommitEntry), 11 - Record(RecordTypes), 12 } 13 14 #[derive(Debug, Deserialize)] ··· 33 pub rev: String, 34 pub prev: Option<Cid>, 35 pub sig: ByteBuf, 36 } 37 38 #[derive(Debug, Deserialize)]
··· 8 pub enum CarEntry { 9 Mst(CarMstEntry), 10 Commit(CarCommitEntry), 11 + Record(CarRecordEntry), 12 } 13 14 #[derive(Debug, Deserialize)] ··· 33 pub rev: String, 34 pub prev: Option<Cid>, 35 pub sig: ByteBuf, 36 + } 37 + 38 + #[derive(Debug, Deserialize)] 39 + #[serde(untagged)] 40 + pub enum CarRecordEntry { 41 + Known(RecordTypes), 42 + Other { 43 + #[serde(rename = "$type")] 44 + ty: String, 45 + }, 46 } 47 48 #[derive(Debug, Deserialize)]
+1 -1
consumer/src/db/backfill.rs
··· 19 status: &str, 20 ) -> PgExecResult { 21 conn.execute( 22 - "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2)", 23 &[&did, &status], 24 ) 25 .await
··· 19 status: &str, 20 ) -> PgExecResult { 21 conn.execute( 22 + "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET status = $2, updated_at = NOW()", 23 &[&did, &status], 24 ) 25 .await
+32
consumer/src/db/record.rs
··· 4 use chrono::prelude::*; 5 use deadpool_postgres::GenericClient; 6 use ipld_core::cid::Cid; 7 8 pub async fn record_upsert<C: GenericClient>( 9 conn: &mut C, ··· 20 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 21 conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri]) 22 .await 23 } 24 25 pub async fn block_insert<C: GenericClient>(
··· 4 use chrono::prelude::*; 5 use deadpool_postgres::GenericClient; 6 use ipld_core::cid::Cid; 7 + use lexica::community_lexicon::bookmarks::Bookmark; 8 9 pub async fn record_upsert<C: GenericClient>( 10 conn: &mut C, ··· 21 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 22 conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri]) 23 .await 24 + } 25 + 26 + pub async fn bookmark_upsert<C: GenericClient>( 27 + conn: &mut C, 28 + rkey: &str, 29 + repo: &str, 30 + rec: Bookmark, 31 + ) -> PgExecResult { 32 + // strip "at://" then break into parts by '/' 33 + let rec_type = match rec.subject.strip_prefix("at://") { 34 + Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1], 35 + None => "$uri", 36 + }; 37 + 38 + conn.execute( 39 + include_str!("sql/bookmarks_upsert.sql"), 40 + &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at], 41 + ) 42 + .await 43 + } 44 + 45 + pub async fn bookmark_delete<C: GenericClient>( 46 + conn: &mut C, 47 + rkey: &str, 48 + repo: &str, 49 + ) -> PgExecResult { 50 + conn.execute( 51 + "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2", 52 + &[&rkey, &repo], 53 + ) 54 + .await 55 } 56 57 pub async fn block_insert<C: GenericClient>(
+5
consumer/src/db/sql/bookmarks_upsert.sql
···
··· 1 + INSERT INTO bookmarks (did, rkey, subject, subject_type, tags, created_at) 2 + VALUES ($1, $2, $3, $4, $5, $6) 3 + ON CONFLICT (did, rkey) DO UPDATE SET subject=EXCLUDED.subject, 4 + subject_type=EXCLUDED.subject_type, 5 + tags=EXCLUDED.tags
+15
consumer/src/firehose/mod.rs
··· 117 118 FirehoseEvent::Label(event) 119 } 120 _ => { 121 tracing::warn!("unknown event type {ty}"); 122 return Ok(FirehoseOutput::Continue);
··· 117 118 FirehoseEvent::Label(event) 119 } 120 + "#sync" => { 121 + counter!("firehose_events.total", "event" => "sync").increment(1); 122 + let event: AtpSyncEvent = 123 + serde_ipld_dagcbor::from_reader(&mut reader)?; 124 + 125 + // increment the seq 126 + if self.seq < event.seq { 127 + self.seq = event.seq; 128 + } else { 129 + tracing::error!("Event sequence was not greater than previous seq, exiting. {} <= {}", event.seq, self.seq); 130 + return Ok(FirehoseOutput::Close); 131 + } 132 + 133 + FirehoseEvent::Sync(event) 134 + } 135 _ => { 136 tracing::warn!("unknown event type {ty}"); 137 return Ok(FirehoseOutput::Continue);
+23
consumer/src/firehose/types.rs
··· 31 Account(AtpAccountEvent), 32 Commit(AtpCommitEvent), 33 Label(AtpLabelEvent), 34 } 35 36 #[derive(Debug, Deserialize)] ··· 48 Suspended, 49 Deleted, 50 Deactivated, 51 } 52 53 impl AtpAccountStatus { ··· 57 AtpAccountStatus::Suspended => "suspended", 58 AtpAccountStatus::Deleted => "deleted", 59 AtpAccountStatus::Deactivated => "deactivated", 60 } 61 } 62 } ··· 68 AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended, 69 AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted, 70 AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated, 71 } 72 } 73 } ··· 90 pub since: Option<String>, 91 pub commit: Cid, 92 #[serde(rename = "tooBig")] 93 pub too_big: bool, 94 #[serde(default)] 95 pub blocks: ByteBuf, 96 #[serde(default)] 97 pub ops: Vec<CommitOp>, 98 #[serde(default)] 99 pub blobs: Vec<Cid>, 100 } 101 102 #[derive(Debug, Deserialize)] 103 pub struct CommitOp { 104 pub action: String, 105 pub cid: Option<Cid>, 106 pub path: String, 107 } 108 ··· 124 pub seq: u64, 125 pub labels: Vec<AtpLabel>, 126 }
··· 31 Account(AtpAccountEvent), 32 Commit(AtpCommitEvent), 33 Label(AtpLabelEvent), 34 + Sync(AtpSyncEvent), 35 } 36 37 #[derive(Debug, Deserialize)] ··· 49 Suspended, 50 Deleted, 51 Deactivated, 52 + Throttled, 53 + Desynchronized, 54 } 55 56 impl AtpAccountStatus { ··· 60 AtpAccountStatus::Suspended => "suspended", 61 AtpAccountStatus::Deleted => "deleted", 62 AtpAccountStatus::Deactivated => "deactivated", 63 + AtpAccountStatus::Throttled => "throttled", 64 + AtpAccountStatus::Desynchronized => "desynchronized", 65 } 66 } 67 } ··· 73 AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended, 74 AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted, 75 AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated, 76 + AtpAccountStatus::Throttled | AtpAccountStatus::Desynchronized => { 77 + parakeet_db::types::ActorStatus::Active 78 + } 79 } 80 } 81 } ··· 98 pub since: Option<String>, 99 pub commit: Cid, 100 #[serde(rename = "tooBig")] 101 + #[deprecated] 102 pub too_big: bool, 103 #[serde(default)] 104 pub blocks: ByteBuf, 105 #[serde(default)] 106 pub ops: Vec<CommitOp>, 107 #[serde(default)] 108 + #[deprecated] 109 pub blobs: Vec<Cid>, 110 + #[serde(rename = "prevData")] 111 + pub prev_data: Option<Cid>, 112 } 113 114 #[derive(Debug, Deserialize)] 115 pub struct CommitOp { 116 pub action: String, 117 pub cid: Option<Cid>, 118 + pub prev: Option<Cid>, 119 pub path: String, 120 } 121 ··· 137 pub seq: u64, 138 pub labels: Vec<AtpLabel>, 139 } 140 + 141 + #[derive(Debug, Deserialize)] 142 + pub struct AtpSyncEvent { 143 + pub seq: u64, 144 + pub did: String, 145 + pub time: DateTime<Utc>, 146 + pub rev: String, 147 + #[serde(default)] 148 + pub blocks: ByteBuf, 149 + }
+32 -2
consumer/src/indexer/mod.rs
··· 1 use crate::config::HistoryMode; 2 use crate::db; 3 use crate::firehose::{ 4 - AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseConsumer, FirehoseEvent, 5 - FirehoseOutput, 6 }; 7 use crate::indexer::types::{ 8 AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes, ··· 107 FirehoseEvent::Commit(commit) => { 108 index_commit(&mut state, &mut conn, &mut rc, commit).await 109 } 110 FirehoseEvent::Label(_) => unreachable!(), 111 }; 112 ··· 188 FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads, 189 FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads, 190 FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads, 191 FirehoseEvent::Label(_) => { 192 // We handle all labels through direct connections to labelers 193 tracing::warn!("got #labels from the relay"); ··· 199 tracing::error!("Error sending event: {e}"); 200 } 201 } 202 } 203 204 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))] ··· 723 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 724 } 725 } 726 } 727 728 db::record_upsert(conn, at_uri, repo, cid).await?; ··· 832 CollectionType::ChatActorDecl => { 833 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 db::chat_decl_delete(conn, repo).await? 835 } 836 _ => unreachable!(), 837 };
··· 1 use crate::config::HistoryMode; 2 use crate::db; 3 use crate::firehose::{ 4 + AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, AtpSyncEvent, CommitOp, FirehoseConsumer, 5 + FirehoseEvent, FirehoseOutput, 6 }; 7 use crate::indexer::types::{ 8 AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes, ··· 107 FirehoseEvent::Commit(commit) => { 108 index_commit(&mut state, &mut conn, &mut rc, commit).await 109 } 110 + FirehoseEvent::Sync(sync) => { 111 + process_sync(&state, &mut conn, &mut rc, sync).await 112 + } 113 FirehoseEvent::Label(_) => unreachable!(), 114 }; 115 ··· 191 FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads, 192 FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads, 193 FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads, 194 + FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads, 195 FirehoseEvent::Label(_) => { 196 // We handle all labels through direct connections to labelers 197 tracing::warn!("got #labels from the relay"); ··· 203 tracing::error!("Error sending event: {e}"); 204 } 205 } 206 + } 207 + 208 + #[instrument(skip_all, fields(seq = sync.seq, repo = sync.did))] 209 + async fn process_sync( 210 + state: &RelayIndexerState, 211 + conn: &mut Object, 212 + rc: &mut MultiplexedConnection, 213 + sync: AtpSyncEvent, 214 + ) -> eyre::Result<()> { 215 + let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else { 216 + return Ok(()); 217 + }; 218 + 219 + // don't care if we're not synced. also no point if !do_backfill bc we might not have a worker 220 + if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev { 221 + tracing::debug!("triggering backfill due to #sync"); 222 + rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?; 223 + } 224 + 225 + Ok(()) 226 } 227 228 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))] ··· 747 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 748 } 749 } 750 + RecordTypes::CommunityLexiconBookmark(record) => { 751 + db::bookmark_upsert(conn, rkey, repo, record).await?; 752 + } 753 } 754 755 db::record_upsert(conn, at_uri, repo, cid).await?; ··· 859 CollectionType::ChatActorDecl => { 860 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 861 db::chat_decl_delete(conn, repo).await? 862 + } 863 + CollectionType::CommunityLexiconBookmark => { 864 + db::bookmark_delete(conn, rkey, repo).await? 865 } 866 _ => unreachable!(), 867 };
+5
consumer/src/indexer/types.rs
··· 41 AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 } 45 46 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 63 BskyLabelerService, 64 BskyNotificationDeclaration, 65 ChatActorDecl, 66 Unsupported, 67 } 68 ··· 87 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 88 "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 89 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 90 _ => CollectionType::Unsupported, 91 } 92 } ··· 111 CollectionType::BskyVerification => false, 112 CollectionType::BskyLabelerService => true, 113 CollectionType::BskyNotificationDeclaration => true, 114 CollectionType::Unsupported => false, 115 } 116 }
··· 41 AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 45 + CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark) 46 } 47 48 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 65 BskyLabelerService, 66 BskyNotificationDeclaration, 67 ChatActorDecl, 68 + CommunityLexiconBookmark, 69 Unsupported, 70 } 71 ··· 90 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 91 "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 92 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 93 + "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark, 94 _ => CollectionType::Unsupported, 95 } 96 } ··· 115 CollectionType::BskyVerification => false, 116 CollectionType::BskyLabelerService => true, 117 CollectionType::BskyNotificationDeclaration => true, 118 + CollectionType::CommunityLexiconBookmark => true, 119 CollectionType::Unsupported => false, 120 } 121 }
+98
flake.lock
···
··· 1 + { 2 + "nodes": { 3 + "crane": { 4 + "locked": { 5 + "lastModified": 1757183466, 6 + "narHash": "sha256-kTdCCMuRE+/HNHES5JYsbRHmgtr+l9mOtf5dpcMppVc=", 7 + "owner": "ipetkov", 8 + "repo": "crane", 9 + "rev": "d599ae4847e7f87603e7082d73ca673aa93c916d", 10 + "type": "github" 11 + }, 12 + "original": { 13 + "owner": "ipetkov", 14 + "repo": "crane", 15 + "type": "github" 16 + } 17 + }, 18 + "flake-utils": { 19 + "inputs": { 20 + "systems": "systems" 21 + }, 22 + "locked": { 23 + "lastModified": 1731533236, 24 + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 25 + "owner": "numtide", 26 + "repo": "flake-utils", 27 + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 28 + "type": "github" 29 + }, 30 + "original": { 31 + "owner": "numtide", 32 + "repo": "flake-utils", 33 + "type": "github" 34 + } 35 + }, 36 + "nixpkgs": { 37 + "locked": { 38 + "lastModified": 1758029226, 39 + "narHash": "sha256-TjqVmbpoCqWywY9xIZLTf6ANFvDCXdctCjoYuYPYdMI=", 40 + "owner": "NixOS", 41 + "repo": "nixpkgs", 42 + "rev": "08b8f92ac6354983f5382124fef6006cade4a1c1", 43 + "type": "github" 44 + }, 45 + "original": { 46 + "owner": "NixOS", 47 + "ref": "nixpkgs-unstable", 48 + "repo": "nixpkgs", 49 + "type": "github" 50 + } 51 + }, 52 + "root": { 53 + "inputs": { 54 + "crane": "crane", 55 + "flake-utils": "flake-utils", 56 + "nixpkgs": "nixpkgs", 57 + "rust-overlay": "rust-overlay" 58 + } 59 + }, 60 + "rust-overlay": { 61 + "inputs": { 62 + "nixpkgs": [ 63 + "nixpkgs" 64 + ] 65 + }, 66 + "locked": { 67 + "lastModified": 1758162771, 68 + "narHash": "sha256-hdZpMep6Z1gbgg9piUZ0BNusI6ZJaptBw6PHSN/3GD0=", 69 + "owner": "oxalica", 70 + "repo": "rust-overlay", 71 + "rev": "d0cabb6ae8f5b38dffaff9f4e6db57c0ae21d729", 72 + "type": "github" 73 + }, 74 + "original": { 75 + "owner": "oxalica", 76 + "repo": "rust-overlay", 77 + "type": "github" 78 + } 79 + }, 80 + "systems": { 81 + "locked": { 82 + "lastModified": 1681028828, 83 + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 84 + "owner": "nix-systems", 85 + "repo": "default", 86 + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 87 + "type": "github" 88 + }, 89 + "original": { 90 + "owner": "nix-systems", 91 + "repo": "default", 92 + "type": "github" 93 + } 94 + } 95 + }, 96 + "root": "root", 97 + "version": 7 98 + }
+464
flake.nix
···
··· 1 + { 2 + description = "Parakeet is a Rust-based Bluesky AppView"; 3 + inputs = { 4 + nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; 5 + crane.url = "github:ipetkov/crane"; 6 + flake-utils.url = "github:numtide/flake-utils"; 7 + rust-overlay = { 8 + url = "github:oxalica/rust-overlay"; 9 + inputs.nixpkgs.follows = "nixpkgs"; 10 + }; 11 + }; 12 + outputs = 13 + { 14 + self, 15 + nixpkgs, 16 + crane, 17 + flake-utils, 18 + rust-overlay, 19 + ... 20 + }: 21 + flake-utils.lib.eachDefaultSystem ( 22 + system: 23 + let 24 + pkgs = import nixpkgs { 25 + inherit system; 26 + overlays = [ (import rust-overlay) ]; 27 + }; 28 + craneLib = (crane.mkLib pkgs).overrideToolchain ( 29 + p: 30 + p.rust-bin.selectLatestNightlyWith ( 31 + toolchain: 32 + toolchain.default.override { 33 + extensions = [ 34 + "rust-src" 35 + "rust-analyzer" 36 + ]; 37 + } 38 + ) 39 + ); 40 + 41 + inherit (pkgs) lib; 42 + unfilteredRoot = ./.; # The original, unfiltered source 43 + src = lib.fileset.toSource { 44 + root = unfilteredRoot; 45 + fileset = lib.fileset.unions [ 46 + # Default files from crane (Rust and cargo files) 47 + (craneLib.fileset.commonCargoSources unfilteredRoot) 48 + ]; 49 + }; 50 + # Common arguments can be set here to avoid repeating them later 51 + commonArgs = { 52 + inherit src; 53 + strictDeps = true; 54 + nativeBuildInputs = with pkgs; [ 55 + pkg-config 56 + ]; 57 + buildInputs = [ 58 + # Add additional build inputs here 59 + pkgs.openssl 60 + pkgs.postgresql 61 + pkgs.libpq 62 + pkgs.clang 63 + pkgs.libclang 64 + pkgs.lld 65 + pkgs.protobuf 66 + ] 67 + ++ lib.optionals pkgs.stdenv.isDarwin [ 68 + # Additional darwin specific inputs can be set here 69 + pkgs.libiconv 70 + pkgs.darwin.apple_sdk.frameworks.Security 71 + ]; 72 + LIBCLANG_PATH = "${pkgs.llvmPackages_18.libclang.lib}/lib"; 73 + CLANG_PATH = "${pkgs.llvmPackages_18.clang}/bin/clang"; 74 + PROTOC_INCLUDE = "${pkgs.protobuf}/include"; 75 + PROTOC = "${pkgs.protobuf}/bin/protoc"; 76 + 77 + # Additional environment variables can be set directly 78 + # MY_CUSTOM_VAR = "some value"; 79 + }; 80 + 81 + # Build *just* the cargo dependencies, so we can reuse 82 + # all of that work (e.g. via cachix) when running in CI 83 + cargoArtifacts = craneLib.buildDepsOnly commonArgs; 84 + 85 + individualCrateArgs = commonArgs // { 86 + inherit cargoArtifacts; 87 + inherit (craneLib.crateNameFromCargoToml { inherit src; }) version; 88 + # NB: we disable tests since we'll run them all via cargo-nextest 89 + doCheck = false; 90 + }; 91 + fileSetForCrate = 92 + crate: 93 + lib.fileset.toSource { 94 + root = ./.; 95 + fileset = lib.fileset.unions [ 96 + ./Cargo.toml 97 + ./Cargo.lock 98 + ./migrations 99 + (craneLib.fileset.commonCargoSources ./consumer) 100 + ./consumer/src/db/sql 101 + (craneLib.fileset.commonCargoSources ./dataloader-rs) 102 + (craneLib.fileset.commonCargoSources ./did-resolver) 103 + (craneLib.fileset.commonCargoSources ./lexica) 104 + (craneLib.fileset.commonCargoSources ./parakeet) 105 + ./parakeet/src/sql 106 + (craneLib.fileset.commonCargoSources ./parakeet-db) 107 + (craneLib.fileset.commonCargoSources ./parakeet-index) 108 + ./parakeet-index/proto 109 + (craneLib.fileset.commonCargoSources ./parakeet-lexgen) 110 + (craneLib.fileset.commonCargoSources crate) 111 + ]; 112 + }; 113 + 114 + # Build the actual crate itself, reusing the dependency 115 + # artifacts from above. 116 + consumer = craneLib.buildPackage ( 117 + individualCrateArgs 118 + // { 119 + pname = "consumer"; 120 + cargoExtraArgs = "-p consumer"; 121 + src = fileSetForCrate ./consumer; 122 + postInstall = '' 123 + mkdir -p $out/{bin,lib/consumer} 124 + ''; 125 + } 126 + ); 127 + dataloader = craneLib.buildPackage ( 128 + individualCrateArgs 129 + // { 130 + pname = "dataloader"; 131 + cargoExtraArgs = "-p dataloader --features default"; 132 + src = fileSetForCrate ./dataloader-rs; 133 + } 134 + ); 135 + did-resolver = craneLib.buildPackage ( 136 + individualCrateArgs 137 + // { 138 + pname = "did-resolver"; 139 + cargoExtraArgs = "-p did-resolver"; 140 + src = fileSetForCrate ./did-resolver; 141 + } 142 + ); 143 + lexica = craneLib.buildPackage ( 144 + individualCrateArgs 145 + // { 146 + pname = "lexica"; 147 + cargoExtraArgs = "-p lexica"; 148 + src = fileSetForCrate ./lexica; 149 + } 150 + ); 151 + parakeet = craneLib.buildPackage ( 152 + individualCrateArgs 153 + // { 154 + pname = "parakeet"; 155 + cargoExtraArgs = "-p parakeet"; 156 + src = fileSetForCrate ./parakeet; 157 + } 158 + ); 159 + parakeet-db = craneLib.buildPackage ( 160 + individualCrateArgs 161 + // { 162 + pname = "parakeet-db"; 163 + cargoExtraArgs = "-p parakeet-db --features default"; 164 + src = fileSetForCrate ./parakeet-db; 165 + } 166 + ); 167 + parakeet-index = craneLib.buildPackage ( 168 + individualCrateArgs 169 + // { 170 + pname = "parakeet-index"; 171 + cargoExtraArgs = "-p parakeet-index --features server"; 172 + src = fileSetForCrate ./parakeet-index; 173 + } 174 + ); 175 + parakeet-lexgen = craneLib.buildPackage ( 176 + individualCrateArgs 177 + // { 178 + pname = "parakeet-lexgen"; 179 + cargoExtraArgs = "-p parakeet-lexgen"; 180 + src = fileSetForCrate ./parakeet-lexgen; 181 + } 182 + ); 183 + in 184 + { 185 + checks = { 186 + # Build the crate as part of `nix flake check` for convenience 187 + inherit 188 + consumer 189 + dataloader 190 + did-resolver 191 + lexica 192 + parakeet 193 + parakeet-db 194 + parakeet-index 195 + parakeet-lexgen 196 + ; 197 + }; 198 + 199 + packages = { 200 + default = parakeet; 201 + inherit 202 + consumer 203 + dataloader 204 + did-resolver 205 + lexica 206 + parakeet 207 + parakeet-db 208 + parakeet-index 209 + parakeet-lexgen 210 + ; 211 + }; 212 + 213 + devShells.default = craneLib.devShell { 214 + # Inherit inputs from checks. 215 + checks = self.checks.${system}; 216 + 217 + # Additional dev-shell environment variables can be set directly 218 + RUST_BACKTRACE = 1; 219 + NIXOS_OZONE_WL = 1; 220 + LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib"; 221 + 222 + # Extra inputs can be added here; cargo and rustc are provided by default. 223 + packages = with pkgs; [ 224 + openssl 225 + bacon 226 + postgresql 227 + rust-analyzer 228 + rustfmt 229 + clippy 230 + git 231 + nixd 232 + direnv 233 + libpq 234 + clang 235 + libclang 236 + ]; 237 + }; 238 + } 239 + ) 240 + // flake-utils.lib.eachDefaultSystemPassThrough (system: { 241 + nixosModules = { 242 + default = 243 + { 244 + pkgs, 245 + lib, 246 + config, 247 + ... 248 + }: 249 + with lib; 250 + let 251 + cfg = config.services.parakeet; 252 + 253 + inherit (lib) 254 + mkEnableOption 255 + mkIf 256 + mkOption 257 + types 258 + ; 259 + in 260 + { 261 + options.services.parakeet = { 262 + enable = mkEnableOption "parakeet"; 263 + 264 + package = mkOption { 265 + type = types.package; 266 + default = self.packages.${pkgs.system}.default; 267 + description = "The path to the parakeet package."; 268 + }; 269 + 270 + environmentFiles = mkOption { 271 + type = types.listOf types.path; 272 + default = [ "/var/lib/parakeet/config.env" ]; 273 + description = '' 274 + File to load environment variables from. Loaded variables override 275 + values set in {option}`environment`. 276 + ''; 277 + }; 278 + }; 279 + config = mkIf cfg.enable { 280 + environment.systemPackages = [ 281 + self.packages.${pkgs.system}.consumer 282 + ]; 283 + systemd.services.consumer = { 284 + description = "consumer"; 285 + after = [ "network-online.target" ]; 286 + wants = [ "network-online.target" ]; 287 + wantedBy = [ "multi-user.target" ]; 288 + serviceConfig = { 289 + ExecStart = "${self.packages.${pkgs.system}.consumer}/bin/consumer --indexer"; 290 + Type = "exec"; 291 + 292 + EnvironmentFile = cfg.environmentFiles; 293 + User = "parakeet"; 294 + Group = "parakeet"; 295 + StateDirectory = "parakeet"; 296 + StateDirectoryMode = "0755"; 297 + Restart = "always"; 298 + 299 + # Hardening 300 + RemoveIPC = true; 301 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 302 + NoNewPrivileges = true; 303 + PrivateDevices = true; 304 + ProtectClock = true; 305 + ProtectKernelLogs = true; 306 + ProtectControlGroups = true; 307 + ProtectKernelModules = true; 308 + PrivateMounts = true; 309 + SystemCallArchitectures = [ "native" ]; 310 + MemoryDenyWriteExecute = false; # required by V8 JIT 311 + RestrictNamespaces = true; 312 + RestrictSUIDSGID = true; 313 + ProtectHostname = true; 314 + LockPersonality = true; 315 + ProtectKernelTunables = true; 316 + RestrictAddressFamilies = [ 317 + "AF_UNIX" 318 + "AF_INET" 319 + "AF_INET6" 320 + ]; 321 + RestrictRealtime = true; 322 + DeviceAllow = [ "" ]; 323 + ProtectSystem = "full"; 324 + ProtectProc = "invisible"; 325 + ProcSubset = "pid"; 326 + ProtectHome = true; 327 + PrivateUsers = true; 328 + PrivateTmp = true; 329 + UMask = "0077"; 330 + }; 331 + }; 332 + systemd.services.parakeet = { 333 + description = "parakeet"; 334 + after = [ "network-online.target" ]; 335 + wants = [ "network-online.target" ]; 336 + wantedBy = [ "multi-user.target" ]; 337 + serviceConfig = { 338 + ExecStart = "${cfg.package}/bin/parakeet"; 339 + Type = "exec"; 340 + 341 + EnvironmentFile = cfg.environmentFiles; 342 + User = "parakeet"; 343 + Group = "parakeet"; 344 + StateDirectory = "parakeet"; 345 + StateDirectoryMode = "0755"; 346 + Restart = "always"; 347 + 348 + # Hardening 349 + RemoveIPC = true; 350 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 351 + NoNewPrivileges = true; 352 + PrivateDevices = true; 353 + ProtectClock = true; 354 + ProtectKernelLogs = true; 355 + ProtectControlGroups = true; 356 + ProtectKernelModules = true; 357 + PrivateMounts = true; 358 + SystemCallArchitectures = [ "native" ]; 359 + MemoryDenyWriteExecute = false; # required by V8 JIT 360 + RestrictNamespaces = true; 361 + RestrictSUIDSGID = true; 362 + ProtectHostname = true; 363 + LockPersonality = true; 364 + ProtectKernelTunables = true; 365 + RestrictAddressFamilies = [ 366 + "AF_UNIX" 367 + "AF_INET" 368 + "AF_INET6" 369 + ]; 370 + RestrictRealtime = true; 371 + DeviceAllow = [ "" ]; 372 + ProtectSystem = "full"; 373 + ProtectProc = "invisible"; 374 + ProcSubset = "pid"; 375 + ProtectHome = true; 376 + PrivateUsers = true; 377 + PrivateTmp = true; 378 + UMask = "0077"; 379 + }; 380 + }; 381 + systemd.services.parakeet-index = { 382 + description = "parakeet-index"; 383 + after = [ "network-online.target" ]; 384 + wants = [ "network-online.target" ]; 385 + wantedBy = [ "multi-user.target" ]; 386 + serviceConfig = { 387 + ExecStart = "${self.packages.${pkgs.system}.parakeet-index}/bin/parakeet-index"; 388 + Type = "exec"; 389 + 390 + EnvironmentFile = cfg.environmentFiles; 391 + User = "parakeet"; 392 + Group = "parakeet"; 393 + StateDirectory = "parakeet"; 394 + StateDirectoryMode = "0755"; 395 + Restart = "always"; 396 + 397 + # Hardening 398 + RemoveIPC = true; 399 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 400 + NoNewPrivileges = true; 401 + PrivateDevices = true; 402 + ProtectClock = true; 403 + ProtectKernelLogs = true; 404 + ProtectControlGroups = true; 405 + ProtectKernelModules = true; 406 + PrivateMounts = true; 407 + SystemCallArchitectures = [ "native" ]; 408 + MemoryDenyWriteExecute = false; # required by V8 JIT 409 + RestrictNamespaces = true; 410 + RestrictSUIDSGID = true; 411 + ProtectHostname = true; 412 + LockPersonality = true; 413 + ProtectKernelTunables = true; 414 + RestrictAddressFamilies = [ 415 + "AF_UNIX" 416 + "AF_INET" 417 + "AF_INET6" 418 + ]; 419 + RestrictRealtime = true; 420 + DeviceAllow = [ "" ]; 421 + ProtectSystem = "full"; 422 + ProtectProc = "invisible"; 423 + ProcSubset = "pid"; 424 + ProtectHome = true; 425 + PrivateUsers = true; 426 + PrivateTmp = true; 427 + UMask = "0077"; 428 + }; 429 + }; 430 + users = { 431 + users.parakeet = { 432 + group = "parakeet"; 433 + isSystemUser = true; 434 + }; 435 + groups.parakeet = { }; 436 + }; 437 + services.postgresql = { 438 + enable = true; 439 + ensureUsers = [ 440 + { 441 + name = "parakeet"; 442 + ensureDBOwnership = true; 443 + } 444 + ]; 445 + ensureDatabases = [ "parakeet" ]; 446 + authentication = pkgs.lib.mkOverride 10 '' 447 + #type database DBuser auth-method 448 + local all all trust 449 + host all all 127.0.0.1/32 trust 450 + host all all ::1/128 trust 451 + ''; 452 + package = mkForce pkgs.postgresql_16; 453 + }; 454 + services.redis.servers.parakeet = { 455 + enable = true; 456 + # port = 0; 457 + unixSocket = "/run/redis-parakeet/redis.sock"; 458 + user = "parakeet"; 459 + }; 460 + }; 461 + }; 462 + }; 463 + }); 464 + }
+6 -28
lexica/src/app_bsky/actor.rs
··· 1 use crate::app_bsky::embed::External; 2 - use crate::app_bsky::graph::ListViewBasic; 3 use crate::com_atproto::label::Label; 4 use chrono::prelude::*; 5 use serde::{Deserialize, Serialize}; 6 use std::fmt::Display; 7 use std::str::FromStr; 8 - 9 - #[derive(Clone, Default, Debug, Serialize)] 10 - #[serde(rename_all = "camelCase")] 11 - pub struct ProfileViewerState { 12 - pub muted: bool, 13 - #[serde(skip_serializing_if = "Option::is_none")] 14 - pub muted_by_list: Option<ListViewBasic>, 15 - pub blocked_by: bool, 16 - #[serde(skip_serializing_if = "Option::is_none")] 17 - pub blocking: Option<String>, 18 - #[serde(skip_serializing_if = "Option::is_none")] 19 - pub blocking_by_list: Option<ListViewBasic>, 20 - #[serde(skip_serializing_if = "Option::is_none")] 21 - pub following: Option<String>, 22 - #[serde(skip_serializing_if = "Option::is_none")] 23 - pub followed_by: Option<String>, 24 - // #[serde(skip_serializing_if = "Option::is_none")] 25 - // pub known_followers: Option<()>, 26 - // #[serde(skip_serializing_if = "Option::is_none")] 27 - // pub activity_subscriptions: Option<()>, 28 - } 29 30 #[derive(Clone, Default, Debug, Serialize)] 31 #[serde(rename_all = "camelCase")] ··· 152 pub avatar: Option<String>, 153 #[serde(skip_serializing_if = "Option::is_none")] 154 pub associated: Option<ProfileAssociated>, 155 - #[serde(skip_serializing_if = "Option::is_none")] 156 - pub viewer: Option<ProfileViewerState>, 157 #[serde(skip_serializing_if = "Vec::is_empty")] 158 pub labels: Vec<Label>, 159 #[serde(skip_serializing_if = "Option::is_none")] ··· 178 pub avatar: Option<String>, 179 #[serde(skip_serializing_if = "Option::is_none")] 180 pub associated: Option<ProfileAssociated>, 181 - #[serde(skip_serializing_if = "Option::is_none")] 182 - pub viewer: Option<ProfileViewerState>, 183 #[serde(skip_serializing_if = "Vec::is_empty")] 184 pub labels: Vec<Label>, 185 #[serde(skip_serializing_if = "Option::is_none")] ··· 211 pub associated: Option<ProfileAssociated>, 212 // #[serde(skip_serializing_if = "Option::is_none")] 213 // pub joined_via_starter_pack: Option<()>, 214 - #[serde(skip_serializing_if = "Option::is_none")] 215 - pub viewer: Option<ProfileViewerState>, 216 #[serde(skip_serializing_if = "Vec::is_empty")] 217 pub labels: Vec<Label>, 218 // #[serde(skip_serializing_if = "Option::is_none")]
··· 1 use crate::app_bsky::embed::External; 2 use crate::com_atproto::label::Label; 3 use chrono::prelude::*; 4 use serde::{Deserialize, Serialize}; 5 use std::fmt::Display; 6 use std::str::FromStr; 7 8 #[derive(Clone, Default, Debug, Serialize)] 9 #[serde(rename_all = "camelCase")] ··· 130 pub avatar: Option<String>, 131 #[serde(skip_serializing_if = "Option::is_none")] 132 pub associated: Option<ProfileAssociated>, 133 + // #[serde(skip_serializing_if = "Option::is_none")] 134 + // pub viewer: Option<()>, 135 #[serde(skip_serializing_if = "Vec::is_empty")] 136 pub labels: Vec<Label>, 137 #[serde(skip_serializing_if = "Option::is_none")] ··· 156 pub avatar: Option<String>, 157 #[serde(skip_serializing_if = "Option::is_none")] 158 pub associated: Option<ProfileAssociated>, 159 + // #[serde(skip_serializing_if = "Option::is_none")] 160 + // pub viewer: Option<()>, 161 #[serde(skip_serializing_if = "Vec::is_empty")] 162 pub labels: Vec<Label>, 163 #[serde(skip_serializing_if = "Option::is_none")] ··· 189 pub associated: Option<ProfileAssociated>, 190 // #[serde(skip_serializing_if = "Option::is_none")] 191 // pub joined_via_starter_pack: Option<()>, 192 + // #[serde(skip_serializing_if = "Option::is_none")] 193 + // pub viewer: Option<()>, 194 #[serde(skip_serializing_if = "Vec::is_empty")] 195 pub labels: Vec<Label>, 196 // #[serde(skip_serializing_if = "Option::is_none")]
+32
lexica/src/app_bsky/bookmark.rs
···
··· 1 + use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 + use crate::StrongRef; 3 + use chrono::prelude::*; 4 + use serde::Serialize; 5 + 6 + #[derive(Clone, Debug, Serialize)] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct BookmarkView { 9 + pub subject: StrongRef, 10 + pub item: BookmarkViewItem, 11 + pub created_at: DateTime<Utc>, 12 + } 13 + 14 + #[derive(Clone, Debug, Serialize)] 15 + #[serde(tag = "$type")] 16 + // This is technically the same as ReplyRefPost atm, but just in case... 17 + pub enum BookmarkViewItem { 18 + #[serde(rename = "app.bsky.feed.defs#postView")] 19 + Post(PostView), 20 + #[serde(rename = "app.bsky.feed.defs#notFoundPost")] 21 + NotFound { 22 + uri: String, 23 + #[serde(rename = "notFound")] 24 + not_found: bool, 25 + }, 26 + #[serde(rename = "app.bsky.feed.defs#blockedPost")] 27 + Blocked { 28 + uri: String, 29 + blocked: bool, 30 + author: BlockedAuthor, 31 + }, 32 + }
+9 -24
lexica/src/app_bsky/feed.rs
··· 1 use super::RecordStats; 2 - use crate::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewerState}; 3 use crate::app_bsky::embed::Embed; 4 use crate::app_bsky::graph::ListViewBasic; 5 use crate::app_bsky::richtext::FacetMain; ··· 8 use serde::{Deserialize, Serialize}; 9 use std::str::FromStr; 10 11 - #[derive(Clone, Default, Debug, Serialize)] 12 - #[serde(rename_all = "camelCase")] 13 - pub struct PostViewerState { 14 - pub repost: Option<String>, 15 - pub like: Option<String>, 16 - pub thread_muted: bool, 17 - pub reply_disabled: bool, 18 - pub embedding_disabled: bool, 19 - pub pinned: bool, 20 - } 21 - 22 #[derive(Clone, Debug, Serialize)] 23 #[serde(rename_all = "camelCase")] 24 pub struct PostView { ··· 34 35 #[serde(skip_serializing_if = "Vec::is_empty")] 36 pub labels: Vec<Label>, 37 - #[serde(skip_serializing_if = "Option::is_none")] 38 - pub viewer: Option<PostViewerState>, 39 #[serde(skip_serializing_if = "Option::is_none")] 40 pub threadgate: Option<ThreadgateView>, 41 ··· 135 #[derive(Clone, Debug, Serialize)] 136 pub struct BlockedAuthor { 137 pub uri: String, 138 - pub viewer: Option<ProfileViewerState>, 139 - } 140 - 141 - #[derive(Clone, Default, Debug, Serialize)] 142 - #[serde(rename_all = "camelCase")] 143 - pub struct GeneratorViewerState { 144 - pub like: Option<String>, 145 } 146 147 #[derive(Clone, Debug, Serialize)] ··· 165 pub accepts_interactions: bool, 166 #[serde(skip_serializing_if = "Vec::is_empty")] 167 pub labels: Vec<Label>, 168 - #[serde(skip_serializing_if = "Option::is_none")] 169 - pub viewer: Option<GeneratorViewerState>, 170 #[serde(skip_serializing_if = "Option::is_none")] 171 pub content_mode: Option<GeneratorContentMode>, 172 ··· 236 #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")] 237 Pin {}, 238 #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")] 239 - Repost { repost: String }, 240 }
··· 1 use super::RecordStats; 2 + use crate::app_bsky::actor::{ProfileView, ProfileViewBasic}; 3 use crate::app_bsky::embed::Embed; 4 use crate::app_bsky::graph::ListViewBasic; 5 use crate::app_bsky::richtext::FacetMain; ··· 8 use serde::{Deserialize, Serialize}; 9 use std::str::FromStr; 10 11 #[derive(Clone, Debug, Serialize)] 12 #[serde(rename_all = "camelCase")] 13 pub struct PostView { ··· 23 24 #[serde(skip_serializing_if = "Vec::is_empty")] 25 pub labels: Vec<Label>, 26 + // #[serde(skip_serializing_if = "Option::is_none")] 27 + // pub viewer: Option<()>, 28 #[serde(skip_serializing_if = "Option::is_none")] 29 pub threadgate: Option<ThreadgateView>, 30 ··· 124 #[derive(Clone, Debug, Serialize)] 125 pub struct BlockedAuthor { 126 pub uri: String, 127 + // pub viewer: Option<()>, 128 } 129 130 #[derive(Clone, Debug, Serialize)] ··· 148 pub accepts_interactions: bool, 149 #[serde(skip_serializing_if = "Vec::is_empty")] 150 pub labels: Vec<Label>, 151 + // #[serde(skip_serializing_if = "Option::is_none")] 152 + // pub viewer: Option<()>, 153 #[serde(skip_serializing_if = "Option::is_none")] 154 pub content_mode: Option<GeneratorContentMode>, 155 ··· 219 #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")] 220 Pin {}, 221 #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")] 222 + Repost { 223 + repost: String, 224 + }, 225 }
+4 -11
lexica/src/app_bsky/graph.rs
··· 6 use serde::{Deserialize, Serialize}; 7 use std::str::FromStr; 8 9 - #[derive(Clone, Default, Debug, Serialize)] 10 - #[serde(rename_all = "camelCase")] 11 - pub struct ListViewerState { 12 - pub muted: bool, 13 - pub blocked: Option<String>, 14 - } 15 - 16 #[derive(Clone, Debug, Serialize)] 17 #[serde(rename_all = "camelCase")] 18 pub struct ListViewBasic { ··· 25 pub avatar: Option<String>, 26 pub list_item_count: i64, 27 28 - #[serde(skip_serializing_if = "Option::is_none")] 29 - pub viewer: Option<ListViewerState>, 30 #[serde(skip_serializing_if = "Vec::is_empty")] 31 pub labels: Vec<Label>, 32 ··· 51 pub avatar: Option<String>, 52 pub list_item_count: i64, 53 54 - #[serde(skip_serializing_if = "Option::is_none")] 55 - pub viewer: Option<ListViewerState>, 56 #[serde(skip_serializing_if = "Vec::is_empty")] 57 pub labels: Vec<Label>, 58
··· 6 use serde::{Deserialize, Serialize}; 7 use std::str::FromStr; 8 9 #[derive(Clone, Debug, Serialize)] 10 #[serde(rename_all = "camelCase")] 11 pub struct ListViewBasic { ··· 18 pub avatar: Option<String>, 19 pub list_item_count: i64, 20 21 + // #[serde(skip_serializing_if = "Option::is_none")] 22 + // pub viewer: Option<()>, 23 #[serde(skip_serializing_if = "Vec::is_empty")] 24 pub labels: Vec<Label>, 25 ··· 44 pub avatar: Option<String>, 45 pub list_item_count: i64, 46 47 + // #[serde(skip_serializing_if = "Option::is_none")] 48 + // pub viewer: Option<()>, 49 #[serde(skip_serializing_if = "Vec::is_empty")] 50 pub labels: Vec<Label>, 51
+4 -10
lexica/src/app_bsky/labeler.rs
··· 4 use chrono::prelude::*; 5 use serde::{Deserialize, Serialize}; 6 7 - #[derive(Clone, Default, Debug, Serialize)] 8 - #[serde(rename_all = "camelCase")] 9 - pub struct LabelerViewerState { 10 - pub like: bool, 11 - } 12 - 13 #[derive(Clone, Debug, Serialize)] 14 #[serde(rename_all = "camelCase")] 15 pub struct LabelerView { ··· 18 pub creator: ProfileView, 19 20 pub like_count: i64, 21 - #[serde(skip_serializing_if = "Option::is_none")] 22 - pub viewer: Option<LabelerViewerState>, 23 #[serde(skip_serializing_if = "Vec::is_empty")] 24 pub labels: Vec<Label>, 25 pub indexed_at: DateTime<Utc>, ··· 33 pub creator: ProfileView, 34 35 pub like_count: i64, 36 - #[serde(skip_serializing_if = "Option::is_none")] 37 - pub viewer: Option<LabelerViewerState>, 38 #[serde(skip_serializing_if = "Vec::is_empty")] 39 pub labels: Vec<Label>, 40 pub policies: LabelerPolicy,
··· 4 use chrono::prelude::*; 5 use serde::{Deserialize, Serialize}; 6 7 #[derive(Clone, Debug, Serialize)] 8 #[serde(rename_all = "camelCase")] 9 pub struct LabelerView { ··· 12 pub creator: ProfileView, 13 14 pub like_count: i64, 15 + // #[serde(skip_serializing_if = "Option::is_none")] 16 + // pub viewer: Option<()>, 17 #[serde(skip_serializing_if = "Vec::is_empty")] 18 pub labels: Vec<Label>, 19 pub indexed_at: DateTime<Utc>, ··· 27 pub creator: ProfileView, 28 29 pub like_count: i64, 30 + // #[serde(skip_serializing_if = "Option::is_none")] 31 + // pub viewer: Option<()>, 32 #[serde(skip_serializing_if = "Vec::is_empty")] 33 pub labels: Vec<Label>, 34 pub policies: LabelerPolicy,
+1
lexica/src/app_bsky/mod.rs
··· 1 use serde::Serialize; 2 3 pub mod actor; 4 pub mod embed; 5 pub mod feed; 6 pub mod graph;
··· 1 use serde::Serialize; 2 3 pub mod actor; 4 + pub mod bookmark; 5 pub mod embed; 6 pub mod feed; 7 pub mod graph;
+14
lexica/src/community_lexicon/bookmarks.rs
···
··· 1 + use chrono::prelude::*; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Clone, Debug, Deserialize, Serialize)] 5 + #[serde(tag = "$type")] 6 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct Bookmark { 9 + pub subject: String, 10 + #[serde(default)] 11 + #[serde(skip_serializing_if = "Vec::is_empty")] 12 + pub tags: Vec<String>, 13 + pub created_at: DateTime<Utc>, 14 + }
+1
lexica/src/community_lexicon/mod.rs
···
··· 1 + pub mod bookmarks;
+8
lexica/src/lib.rs
··· 5 6 pub mod app_bsky; 7 pub mod com_atproto; 8 mod utils; 9 10 #[derive(Clone, Debug, Serialize)] ··· 21 )] 22 pub cid: Cid, 23 pub uri: String, 24 } 25 26 #[derive(Clone, Debug, Deserialize, Serialize)]
··· 5 6 pub mod app_bsky; 7 pub mod com_atproto; 8 + pub mod community_lexicon; 9 mod utils; 10 11 #[derive(Clone, Debug, Serialize)] ··· 22 )] 23 pub cid: Cid, 24 pub uri: String, 25 + } 26 + 27 + impl StrongRef { 28 + pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> { 29 + let cid = cid.parse()?; 30 + Ok(StrongRef { uri, cid }) 31 + } 32 } 33 34 #[derive(Clone, Debug, Deserialize, Serialize)]
+1
migrations/2025-09-02-190833_bookmarks/down.sql
···
··· 1 + drop table bookmarks;
+19
migrations/2025-09-02-190833_bookmarks/up.sql
···
··· 1 + create table bookmarks 2 + ( 3 + did text not null references actors (did), 4 + rkey text, 5 + subject text not null, 6 + subject_cid text, 7 + subject_type text not null, 8 + tags text[] not null default ARRAY []::text[], 9 + 10 + created_at timestamptz not null default now(), 11 + 12 + primary key (did, subject) 13 + ); 14 + 15 + create index bookmarks_rkey_index on bookmarks (rkey); 16 + create index bookmarks_subject_index on bookmarks (subject); 17 + create index bookmarks_subject_type_index on bookmarks (subject_type); 18 + create index bookmarks_tags_index on bookmarks using gin (tags); 19 + create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
-1
parakeet/src/hydration/feedgen.rs
··· 35 like_count: likes.unwrap_or_default() as i64, 36 accepts_interactions: feedgen.accepts_interactions, 37 labels: map_labels(labels), 38 - viewer: None, 39 content_mode, 40 indexed_at: feedgen.created_at, 41 }
··· 35 like_count: likes.unwrap_or_default() as i64, 36 accepts_interactions: feedgen.accepts_interactions, 37 labels: map_labels(labels), 38 content_mode, 39 indexed_at: feedgen.created_at, 40 }
-2
parakeet/src/hydration/labeler.rs
··· 18 cid: labeler.cid, 19 creator, 20 like_count: likes.unwrap_or_default() as i64, 21 - viewer: None, 22 labels: map_labels(labels), 23 indexed_at: labeler.indexed_at.and_utc(), 24 } ··· 78 cid: labeler.cid, 79 creator, 80 like_count: likes.unwrap_or_default() as i64, 81 - viewer: None, 82 policies: LabelerPolicy { 83 label_values, 84 label_value_definitions,
··· 18 cid: labeler.cid, 19 creator, 20 like_count: likes.unwrap_or_default() as i64, 21 labels: map_labels(labels), 22 indexed_at: labeler.indexed_at.and_utc(), 23 } ··· 77 cid: labeler.cid, 78 creator, 79 like_count: likes.unwrap_or_default() as i64, 80 policies: LabelerPolicy { 81 label_values, 82 label_value_definitions,
-2
parakeet/src/hydration/list.rs
··· 22 purpose, 23 avatar, 24 list_item_count, 25 - viewer: None, 26 labels: map_labels(labels), 27 indexed_at: list.created_at, 28 }) ··· 52 description_facets, 53 avatar, 54 list_item_count, 55 - viewer: None, 56 labels: map_labels(labels), 57 indexed_at: list.created_at, 58 })
··· 22 purpose, 23 avatar, 24 list_item_count, 25 labels: map_labels(labels), 26 indexed_at: list.created_at, 27 }) ··· 51 description_facets, 52 avatar, 53 list_item_count, 54 labels: map_labels(labels), 55 indexed_at: list.created_at, 56 })
-1
parakeet/src/hydration/posts.rs
··· 33 embed, 34 stats, 35 labels: map_labels(labels), 36 - viewer: None, 37 threadgate, 38 indexed_at: post.created_at, 39 }
··· 33 embed, 34 stats, 35 labels: map_labels(labels), 36 threadgate, 37 indexed_at: post.created_at, 38 }
-3
parakeet/src/hydration/profile.rs
··· 169 display_name: profile.display_name, 170 avatar, 171 associated, 172 - viewer: None, 173 labels: map_labels(labels), 174 verification, 175 status, ··· 196 description: profile.description, 197 avatar, 198 associated, 199 - viewer: None, 200 labels: map_labels(labels), 201 verification, 202 status, ··· 228 followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(), 229 follows_count: stats.map(|v| v.following as i64).unwrap_or_default(), 230 associated, 231 - viewer: None, 232 labels: map_labels(labels), 233 verification, 234 status,
··· 169 display_name: profile.display_name, 170 avatar, 171 associated, 172 labels: map_labels(labels), 173 verification, 174 status, ··· 195 description: profile.description, 196 avatar, 197 associated, 198 labels: map_labels(labels), 199 verification, 200 status, ··· 226 followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(), 227 follows_count: stats.map(|v| v.following as i64).unwrap_or_default(), 228 associated, 229 labels: map_labels(labels), 230 verification, 231 status,
+146
parakeet/src/xrpc/app_bsky/bookmark.rs
···
··· 1 + use crate::hydration::StatefulHydrator; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 + use crate::xrpc::{datetime_cursor, CursorQuery}; 5 + use crate::GlobalState; 6 + use axum::extract::{Query, State}; 7 + use axum::Json; 8 + use diesel::prelude::*; 9 + use diesel_async::RunQueryDsl; 10 + use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use parakeet_db::{models, schema}; 12 + use serde::{Deserialize, Serialize}; 13 + use lexica::StrongRef; 14 + 15 + const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"]; 16 + 17 + #[derive(Debug, Deserialize)] 18 + pub struct CreateBookmarkReq { 19 + pub uri: String, 20 + pub cid: String, 21 + } 22 + 23 + pub async fn create_bookmark( 24 + State(state): State<GlobalState>, 25 + auth: AtpAuth, 26 + Json(form): Json<CreateBookmarkReq>, 27 + ) -> XrpcResult<()> { 28 + let mut conn = state.pool.get().await?; 29 + 30 + // strip "at://" then break into parts by '/' 31 + let parts = form.uri[5..].split('/').collect::<Vec<_>>(); 32 + 33 + let data = models::NewBookmark { 34 + did: &auth.0, 35 + rkey: None, 36 + subject: &form.uri, 37 + subject_cid: Some(form.cid), 38 + subject_type: &parts[1], 39 + tags: vec![], 40 + }; 41 + 42 + diesel::insert_into(schema::bookmarks::table) 43 + .values(&data) 44 + .on_conflict_do_nothing() 45 + .execute(&mut conn) 46 + .await?; 47 + 48 + Ok(()) 49 + } 50 + 51 + #[derive(Debug, Deserialize)] 52 + pub struct DeleteBookmarkReq { 53 + pub uri: String, 54 + } 55 + 56 + pub async fn delete_bookmark( 57 + State(state): State<GlobalState>, 58 + auth: AtpAuth, 59 + Json(form): Json<DeleteBookmarkReq>, 60 + ) -> XrpcResult<()> { 61 + let mut conn = state.pool.get().await?; 62 + 63 + diesel::delete(schema::bookmarks::table) 64 + .filter( 65 + schema::bookmarks::did 66 + .eq(&auth.0) 67 + .and(schema::bookmarks::subject.eq(&form.uri)), 68 + ) 69 + .execute(&mut conn) 70 + .await?; 71 + 72 + Ok(()) 73 + } 74 + 75 + #[derive(Debug, Serialize)] 76 + pub struct GetBookmarksRes { 77 + #[serde(skip_serializing_if = "Option::is_none")] 78 + cursor: Option<String>, 79 + bookmarks: Vec<BookmarkView>, 80 + } 81 + 82 + pub async fn get_bookmarks( 83 + State(state): State<GlobalState>, 84 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 85 + auth: AtpAuth, 86 + Query(query): Query<CursorQuery>, 87 + ) -> XrpcResult<Json<GetBookmarksRes>> { 88 + let mut conn = state.pool.get().await?; 89 + let did = auth.0.clone(); 90 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); 91 + 92 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 93 + 94 + let mut bookmarks_query = schema::bookmarks::table 95 + .select(models::Bookmark::as_select()) 96 + .filter(schema::bookmarks::did.eq(&did)) 97 + .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES)) 98 + .into_boxed(); 99 + 100 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 101 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 102 + } 103 + 104 + let results = bookmarks_query 105 + .order(schema::bookmarks::created_at.desc()) 106 + .limit(limit as i64) 107 + .load(&mut conn) 108 + .await?; 109 + 110 + let cursor = results 111 + .last() 112 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 113 + 114 + let uris = results.iter().map(|bm| bm.subject.clone()).collect(); 115 + 116 + let mut posts = hyd.hydrate_posts(uris).await; 117 + 118 + let bookmarks = results 119 + .into_iter() 120 + .filter_map(|bookmark| { 121 + let maybe_item = posts.remove(&bookmark.subject); 122 + let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone()); 123 + 124 + // ensure that either the cid is set in the bookmark record *or* in the post record 125 + // otherwise just ditch. we should have one. 126 + let cid = bookmark.subject_cid.or(maybe_cid)?; 127 + 128 + let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or( 129 + BookmarkViewItem::NotFound { 130 + uri: bookmark.subject.clone(), 131 + not_found: true, 132 + }, 133 + ); 134 + 135 + let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 136 + 137 + Some(BookmarkView { 138 + subject, 139 + item, 140 + created_at: bookmark.created_at, 141 + }) 142 + }) 143 + .collect(); 144 + 145 + Ok(Json(GetBookmarksRes { cursor, bookmarks })) 146 + }
+4
parakeet/src/xrpc/app_bsky/mod.rs
··· 2 use axum::Router; 3 4 mod actor; 5 mod feed; 6 mod graph; 7 mod labeler; ··· 14 // TODO: app.bsky.actor.getSuggestions (recs) 15 // TODO: app.bsky.actor.searchActor (search) 16 // TODO: app.bsky.actor.searchActorTypeahead (search) 17 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 18 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 19 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
··· 2 use axum::Router; 3 4 mod actor; 5 + mod bookmark; 6 mod feed; 7 mod graph; 8 mod labeler; ··· 15 // TODO: app.bsky.actor.getSuggestions (recs) 16 // TODO: app.bsky.actor.searchActor (search) 17 // TODO: app.bsky.actor.searchActorTypeahead (search) 18 + .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark)) 19 + .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark)) 20 + .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks)) 21 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 22 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 23 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
+69
parakeet/src/xrpc/community_lexicon/bookmarks.rs
···
··· 1 + use crate::xrpc::datetime_cursor; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::AtpAuth; 4 + use crate::GlobalState; 5 + use axum::extract::{Query, State}; 6 + use axum::Json; 7 + use diesel::prelude::*; 8 + use diesel_async::RunQueryDsl; 9 + use lexica::community_lexicon::bookmarks::Bookmark; 10 + use parakeet_db::{models, schema}; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + #[derive(Debug, Deserialize)] 14 + pub struct BookmarkCursorQuery { 15 + pub tags: Option<Vec<String>>, 16 + pub limit: Option<u8>, 17 + pub cursor: Option<String>, 18 + } 19 + 20 + #[derive(Debug, Serialize)] 21 + pub struct GetActorBookmarksRes { 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + cursor: Option<String>, 24 + bookmarks: Vec<Bookmark>, 25 + } 26 + 27 + pub async fn get_actor_bookmarks( 28 + State(state): State<GlobalState>, 29 + auth: AtpAuth, 30 + Query(query): Query<BookmarkCursorQuery>, 31 + ) -> XrpcResult<Json<GetActorBookmarksRes>> { 32 + let mut conn = state.pool.get().await?; 33 + 34 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 35 + 36 + let mut bookmarks_query = schema::bookmarks::table 37 + .select(models::Bookmark::as_select()) 38 + .filter(schema::bookmarks::did.eq(&auth.0)) 39 + .into_boxed(); 40 + 41 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 42 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 43 + } 44 + 45 + if let Some(tags) = query.tags { 46 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags)); 47 + } 48 + 49 + let results = bookmarks_query 50 + .order(schema::bookmarks::created_at.desc()) 51 + .limit(limit as i64) 52 + .load(&mut conn) 53 + .await?; 54 + 55 + let cursor = results 56 + .last() 57 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 58 + 59 + let bookmarks = results 60 + .into_iter() 61 + .map(|bookmark| Bookmark { 62 + subject: bookmark.subject, 63 + tags: bookmark.tags.into_iter().flatten().collect(), 64 + created_at: bookmark.created_at, 65 + }) 66 + .collect(); 67 + 68 + Ok(Json(GetActorBookmarksRes { cursor, bookmarks })) 69 + }
+10
parakeet/src/xrpc/community_lexicon/mod.rs
···
··· 1 + use axum::routing::get; 2 + use axum::Router; 3 + 4 + pub mod bookmarks; 5 + 6 + #[rustfmt::skip] 7 + pub fn routes() -> Router<crate::GlobalState> { 8 + Router::new() 9 + .route("/community.lexicon.bookmarks.getActorBookmarks", get(bookmarks::get_actor_bookmarks)) 10 + }
+2
parakeet/src/xrpc/mod.rs
··· 8 mod app_bsky; 9 pub mod cdn; 10 mod com_atproto; 11 mod error; 12 pub mod extract; 13 pub mod jwt; ··· 16 Router::new() 17 .merge(app_bsky::routes()) 18 .merge(com_atproto::routes()) 19 } 20 21 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {
··· 8 mod app_bsky; 9 pub mod cdn; 10 mod com_atproto; 11 + mod community_lexicon; 12 mod error; 13 pub mod extract; 14 pub mod jwt; ··· 17 Router::new() 18 .merge(app_bsky::routes()) 19 .merge(com_atproto::routes()) 20 + .merge(community_lexicon::routes()) 21 } 22 23 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {
+26
parakeet-db/src/models.rs
··· 383 pub did: &'a str, 384 pub list_uri: &'a str, 385 }
··· 383 pub did: &'a str, 384 pub list_uri: &'a str, 385 } 386 + 387 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 388 + #[diesel(table_name = crate::schema::bookmarks)] 389 + #[diesel(primary_key(did, subject, subject_cid))] 390 + #[diesel(check_for_backend(diesel::pg::Pg))] 391 + pub struct Bookmark { 392 + pub did: String, 393 + pub rkey: Option<String>, 394 + pub subject: String, 395 + pub subject_cid: Option<String>, 396 + pub subject_type: String, 397 + pub tags: Vec<Option<String>>, 398 + pub created_at: DateTime<Utc>, 399 + } 400 + 401 + #[derive(Debug, Insertable, AsChangeset)] 402 + #[diesel(table_name = crate::schema::bookmarks)] 403 + #[diesel(check_for_backend(diesel::pg::Pg))] 404 + pub struct NewBookmark<'a> { 405 + pub did: &'a str, 406 + pub rkey: Option<String>, 407 + pub subject: &'a str, 408 + pub subject_cid: Option<String>, 409 + pub subject_type: &'a str, 410 + pub tags: Vec<String>, 411 + }
+14
parakeet-db/src/schema.rs
··· 43 } 44 45 diesel::table! { 46 chat_decls (did) { 47 did -> Text, 48 allow_incoming -> Text, ··· 375 376 diesel::joinable!(backfill -> actors (repo)); 377 diesel::joinable!(blocks -> actors (did)); 378 diesel::joinable!(chat_decls -> actors (did)); 379 diesel::joinable!(feedgens -> actors (owner)); 380 diesel::joinable!(follows -> actors (did)); ··· 405 backfill, 406 backfill_jobs, 407 blocks, 408 chat_decls, 409 feedgens, 410 follows,
··· 43 } 44 45 diesel::table! { 46 + bookmarks (did, subject) { 47 + did -> Text, 48 + rkey -> Nullable<Text>, 49 + subject -> Text, 50 + subject_cid -> Nullable<Text>, 51 + subject_type -> Text, 52 + tags -> Array<Nullable<Text>>, 53 + created_at -> Timestamptz, 54 + } 55 + } 56 + 57 + diesel::table! { 58 chat_decls (did) { 59 did -> Text, 60 allow_incoming -> Text, ··· 387 388 diesel::joinable!(backfill -> actors (repo)); 389 diesel::joinable!(blocks -> actors (did)); 390 + diesel::joinable!(bookmarks -> actors (did)); 391 diesel::joinable!(chat_decls -> actors (did)); 392 diesel::joinable!(feedgens -> actors (owner)); 393 diesel::joinable!(follows -> actors (did)); ··· 418 backfill, 419 backfill_jobs, 420 blocks, 421 + bookmarks, 422 chat_decls, 423 feedgens, 424 follows,
+4 -1
parakeet-index/build.rs
··· 1 fn main() -> Result<(), Box<dyn std::error::Error>> { 2 - tonic_build::configure().compile_protos(&["proto/parakeet.proto"], &[""])?; 3 4 Ok(()) 5 }
··· 1 fn main() -> Result<(), Box<dyn std::error::Error>> { 2 + tonic_build::configure().compile_protos( 3 + &[std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("proto/parakeet.proto")], 4 + &[std::path::Path::new(env!("CARGO_MANIFEST_DIR"))], 5 + )?; 6 7 Ok(()) 8 }