Rust AppView - highly experimental!

Changed files: +1087 -98, across:

- consumer
- lexica
- migrations/2025-09-02-190833_bookmarks
- parakeet
- parakeet-db
- parakeet-index
.envrc (new file, +1)

```
use flake
```
.gitignore (+1)

```diff
 .env
 Config.toml
 data/
+.direnv/
```
README.md (+11)

```diff
 Parakeet is a [Bluesky](https://bsky.app) [AppView](https://atproto.wiki/en/wiki/reference/core-architecture/appview)
 aiming to implement most of the functionality required to support the Bluesky client. Notably not implemented is a CDN.
 
+## Status and Roadmap
+Most common functionality works, with some notable omissions: like/repost/follow statuses are missing, blocks and
+mutes don't get applied, labels might not track CIDs properly, and label redaction doesn't work at all (beware!).
+
+Future work is tracked in issues, but the highlights are below. Help would be highly appreciated.
+- Notifications
+- Search
+- Pinned Posts
+- The Timeline
+- Monitoring: metrics, tracing, and health checks.
+
 ## The Code
 Parakeet is implemented in Rust, using Postgres as a database, Redis for caching and queue processing, RocksDB for
 aggregation, and Diesel for migrations and querying.
```
consumer/src/backfill/downloader.rs (+8 -8)

```diff
         Ok(Some(did_doc)) => {
             let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
                 tracing::warn!("bad DID doc for {did}");
-                db::backfill_job_write(&mut conn, &did, "failed.resolve")
+                db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc")
                     .await
                     .unwrap();
                 continue;
···
             }
         }
         Ok(None) => {
-            tracing::warn!(did, "bad DID doc");
+            tracing::warn!(did, "bad/missing DID doc");
             db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                 .await
                 .unwrap();
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.did_doc")
                 .await
                 .unwrap();
         }
···
             db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                 .await
                 .unwrap();
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.did")
                 .await
                 .unwrap();
         }
···
         Ok(false) => continue,
         Err(e) => {
             tracing::error!(pds, did, "failed to check repo status: {e}");
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.status")
                 .await
                 .unwrap();
             continue;
···
     if let Some(handle) = maybe_handle {
         if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
             tracing::error!(pds, did, "failed to resolve handle: {e}");
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.handle")
                 .await
                 .unwrap();
         }
···
     pds: &str,
     did: &str,
 ) -> eyre::Result<Option<(i32, i32)>> {
-    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
-
     let res = http
         .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .send()
         .await?
         .error_for_status()?;
+
+    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
 
     let headers = res.headers();
     let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
```
consumer/src/backfill/mod.rs (+1 -1)

```diff
     }
 }
 
-#[instrument(skip(conn, inner))]
+#[instrument(skip(conn, rc, inner))]
 async fn backfill_actor(
     conn: &mut Object,
     rc: &mut MultiplexedConnection,
```
consumer/src/backfill/repo.rs (+5 -2)

```diff
 use super::{
-    types::{CarCommitEntry, CarEntry},
+    types::{CarCommitEntry, CarEntry, CarRecordEntry},
     CopyStore,
 };
 use crate::indexer::records;
···
             CarEntry::Commit(_) => {
                 tracing::warn!("got commit entry that was not in root")
             }
-            CarEntry::Record(record) => {
+            CarEntry::Record(CarRecordEntry::Known(record)) => {
                 if let Some(path) = mst_nodes.remove(&cid) {
                     record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
                 } else {
                     records.insert(cid, record);
                 }
+            }
+            CarEntry::Record(CarRecordEntry::Other { ty }) => {
+                tracing::debug!("repo contains unknown record type: {ty} ({cid})");
             }
             CarEntry::Mst(mst) => {
                 let mut out = Vec::with_capacity(mst.e.len());
```
consumer/src/backfill/types.rs (+11 -1)

```diff
 pub enum CarEntry {
     Mst(CarMstEntry),
     Commit(CarCommitEntry),
-    Record(RecordTypes),
+    Record(CarRecordEntry),
 }
 
 #[derive(Debug, Deserialize)]
···
     pub rev: String,
     pub prev: Option<Cid>,
     pub sig: ByteBuf,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+pub enum CarRecordEntry {
+    Known(RecordTypes),
+    Other {
+        #[serde(rename = "$type")]
+        ty: String,
+    },
 }
 
 #[derive(Debug, Deserialize)]
```
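The new `CarRecordEntry` wrapper leans on serde's untagged-enum fallback: deserialization tries `Known` first and only lands in `Other` when the `$type` is unrecognised. A minimal standalone sketch of the same pattern (made-up record types, serde_json instead of DAG-CBOR, purely illustrative):

```rust
use serde::Deserialize;

// Stand-in for the crate's RecordTypes enum, internally tagged by "$type".
#[derive(Debug, Deserialize)]
#[serde(tag = "$type")]
enum KnownRecord {
    #[serde(rename = "com.example.note")]
    Note { text: String },
}

// Same shape as CarRecordEntry: try the known types first, otherwise keep
// just the "$type" string so the caller can log and skip the record.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum RecordEntry {
    Known(KnownRecord),
    Other {
        #[serde(rename = "$type")]
        ty: String,
    },
}

fn main() {
    let known: RecordEntry =
        serde_json::from_str(r#"{ "$type": "com.example.note", "text": "hi" }"#).unwrap();
    let unknown: RecordEntry =
        serde_json::from_str(r#"{ "$type": "com.example.mystery", "foo": 1 }"#).unwrap();
    println!("{known:?}");   // Known(Note { text: "hi" })
    println!("{unknown:?}"); // Other { ty: "com.example.mystery" }
}
```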
consumer/src/db/backfill.rs (+1 -1)

```diff
     status: &str,
 ) -> PgExecResult {
     conn.execute(
-        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2)",
+        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET status = $2, updated_at = NOW()",
         &[&did, &status],
     )
     .await
```
consumer/src/db/record.rs (+32)

```diff
 use chrono::prelude::*;
 use deadpool_postgres::GenericClient;
 use ipld_core::cid::Cid;
+use lexica::community_lexicon::bookmarks::Bookmark;
 
 pub async fn record_upsert<C: GenericClient>(
     conn: &mut C,
···
 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
     conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri])
         .await
+}
+
+pub async fn bookmark_upsert<C: GenericClient>(
+    conn: &mut C,
+    rkey: &str,
+    repo: &str,
+    rec: Bookmark,
+) -> PgExecResult {
+    // strip "at://" then break into parts by '/'
+    let rec_type = match rec.subject.strip_prefix("at://") {
+        Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1],
+        None => "$uri",
+    };
+
+    conn.execute(
+        include_str!("sql/bookmarks_upsert.sql"),
+        &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at],
+    )
+    .await
+}
+
+pub async fn bookmark_delete<C: GenericClient>(
+    conn: &mut C,
+    rkey: &str,
+    repo: &str,
+) -> PgExecResult {
+    conn.execute(
+        "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2",
+        &[&rkey, &repo],
+    )
+    .await
 }
 
 pub async fn block_insert<C: GenericClient>(
```
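`bookmark_upsert` derives `subject_type` by stripping `at://` and indexing into the split parts, which panics if a subject has no collection segment. A more defensive sketch (hypothetical helper, not part of the diff) could look like:

```rust
/// Hypothetical helper: pull the collection NSID out of an at-uri such as
/// "at://did:plc:abc/app.bsky.feed.post/3kxyz", falling back to the same
/// "$uri" marker the diff uses instead of panicking on malformed input.
fn subject_type_of(subject: &str) -> &str {
    subject
        .strip_prefix("at://")
        .and_then(|rest| rest.split('/').nth(1))
        .unwrap_or("$uri")
}

fn main() {
    assert_eq!(
        subject_type_of("at://did:plc:abc/app.bsky.feed.post/3kxyz"),
        "app.bsky.feed.post"
    );
    assert_eq!(subject_type_of("at://did:plc:abc"), "$uri"); // no collection segment
    assert_eq!(subject_type_of("https://example.com"), "$uri"); // not an at-uri
}
```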
consumer/src/db/sql/bookmarks_upsert.sql (new file, +5)

```sql
INSERT INTO bookmarks (did, rkey, subject, subject_type, tags, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (did, rkey) DO UPDATE SET subject=EXCLUDED.subject,
                                      subject_type=EXCLUDED.subject_type,
                                      tags=EXCLUDED.tags
```
consumer/src/firehose/mod.rs (+15)

```diff
 
                 FirehoseEvent::Label(event)
             }
+            "#sync" => {
+                counter!("firehose_events.total", "event" => "sync").increment(1);
+                let event: AtpSyncEvent =
+                    serde_ipld_dagcbor::from_reader(&mut reader)?;
+
+                // increment the seq
+                if self.seq < event.seq {
+                    self.seq = event.seq;
+                } else {
+                    tracing::error!("Event sequence was not greater than previous seq, exiting. {} <= {}", event.seq, self.seq);
+                    return Ok(FirehoseOutput::Close);
+                }
+
+                FirehoseEvent::Sync(event)
+            }
             _ => {
                 tracing::warn!("unknown event type {ty}");
                 return Ok(FirehoseOutput::Continue);
```
consumer/src/firehose/types.rs (+23)

```diff
     Account(AtpAccountEvent),
     Commit(AtpCommitEvent),
     Label(AtpLabelEvent),
+    Sync(AtpSyncEvent),
 }
 
 #[derive(Debug, Deserialize)]
···
     Suspended,
     Deleted,
     Deactivated,
+    Throttled,
+    Desynchronized,
 }
 
 impl AtpAccountStatus {
···
             AtpAccountStatus::Suspended => "suspended",
             AtpAccountStatus::Deleted => "deleted",
             AtpAccountStatus::Deactivated => "deactivated",
+            AtpAccountStatus::Throttled => "throttled",
+            AtpAccountStatus::Desynchronized => "desynchronized",
         }
     }
 }
···
             AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended,
             AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted,
             AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated,
+            AtpAccountStatus::Throttled | AtpAccountStatus::Desynchronized => {
+                parakeet_db::types::ActorStatus::Active
+            }
         }
     }
 }
···
     pub since: Option<String>,
     pub commit: Cid,
     #[serde(rename = "tooBig")]
+    #[deprecated]
     pub too_big: bool,
     #[serde(default)]
     pub blocks: ByteBuf,
     #[serde(default)]
     pub ops: Vec<CommitOp>,
     #[serde(default)]
+    #[deprecated]
     pub blobs: Vec<Cid>,
+    #[serde(rename = "prevData")]
+    pub prev_data: Option<Cid>,
 }
 
 #[derive(Debug, Deserialize)]
 pub struct CommitOp {
     pub action: String,
     pub cid: Option<Cid>,
+    pub prev: Option<Cid>,
     pub path: String,
 }
···
     pub seq: u64,
     pub labels: Vec<AtpLabel>,
 }
+
+#[derive(Debug, Deserialize)]
+pub struct AtpSyncEvent {
+    pub seq: u64,
+    pub did: String,
+    pub time: DateTime<Utc>,
+    pub rev: String,
+    #[serde(default)]
+    pub blocks: ByteBuf,
+}
```
consumer/src/indexer/mod.rs (+32 -2)

```diff
 use crate::config::HistoryMode;
 use crate::db;
 use crate::firehose::{
-    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseConsumer, FirehoseEvent,
-    FirehoseOutput,
+    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, AtpSyncEvent, CommitOp, FirehoseConsumer,
+    FirehoseEvent, FirehoseOutput,
 };
 use crate::indexer::types::{
     AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes,
···
             FirehoseEvent::Commit(commit) => {
                 index_commit(&mut state, &mut conn, &mut rc, commit).await
             }
+            FirehoseEvent::Sync(sync) => {
+                process_sync(&state, &mut conn, &mut rc, sync).await
+            }
             FirehoseEvent::Label(_) => unreachable!(),
         };
···
             FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads,
             FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads,
             FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads,
+            FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads,
             FirehoseEvent::Label(_) => {
                 // We handle all labels through direct connections to labelers
                 tracing::warn!("got #labels from the relay");
···
             tracing::error!("Error sending event: {e}");
         }
     }
+}
+
+#[instrument(skip_all, fields(seq = sync.seq, repo = sync.did))]
+async fn process_sync(
+    state: &RelayIndexerState,
+    conn: &mut Object,
+    rc: &mut MultiplexedConnection,
+    sync: AtpSyncEvent,
+) -> eyre::Result<()> {
+    let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else {
+        return Ok(());
+    };
+
+    // don't care if we're not synced. also no point if !do_backfill bc we might not have a worker
+    if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev {
+        tracing::debug!("triggering backfill due to #sync");
+        rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?;
+    }
+
+    Ok(())
 }
 
 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))]
···
                 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?;
             }
         }
+        RecordTypes::CommunityLexiconBookmark(record) => {
+            db::bookmark_upsert(conn, rkey, repo, record).await?;
+        }
     }
 
     db::record_upsert(conn, at_uri, repo, cid).await?;
···
         CollectionType::ChatActorDecl => {
             redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?;
             db::chat_decl_delete(conn, repo).await?
+        }
+        CollectionType::CommunityLexiconBookmark => {
+            db::bookmark_delete(conn, rkey, repo).await?
         }
         _ => unreachable!(),
     };
```
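`process_sync` compares revs with a plain `>` on strings; that only orders correctly on the assumption that repo revs are TIDs, whose base32-sortable encoding makes lexicographic order follow creation order. A tiny illustration (rev values made up):

```rust
fn main() {
    // TIDs use the base32-sortable alphabet "234567abcdefghijklmnopqrstuvwxyz",
    // so comparing the raw strings orders them by creation time.
    let current_rev = "3jzfcijpj2z2a"; // illustrative, not a real repo rev
    let incoming_rev = "3lb6owddkxs2m"; // illustrative, not a real repo rev
    assert!(incoming_rev > current_rev);
    println!("incoming rev is newer: {}", incoming_rev > current_rev);
}
```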
consumer/src/indexer/types.rs (+5)

```diff
     AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration),
     #[serde(rename = "chat.bsky.actor.declaration")]
     ChatBskyActorDeclaration(records::ChatBskyActorDeclaration),
+    #[serde(rename = "community.lexicon.bookmarks.bookmark")]
+    CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark)
 }
 
 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)]
···
     BskyLabelerService,
     BskyNotificationDeclaration,
     ChatActorDecl,
+    CommunityLexiconBookmark,
     Unsupported,
 }
···
         "app.bsky.labeler.service" => CollectionType::BskyLabelerService,
         "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration,
         "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl,
+        "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark,
         _ => CollectionType::Unsupported,
     }
 }
···
         CollectionType::BskyVerification => false,
         CollectionType::BskyLabelerService => true,
         CollectionType::BskyNotificationDeclaration => true,
+        CollectionType::CommunityLexiconBookmark => true,
         CollectionType::Unsupported => false,
     }
 }
```
flake.lock (new file, +98)
··· 1 + { 2 + "nodes": { 3 + "crane": { 4 + "locked": { 5 + "lastModified": 1757183466, 6 + "narHash": "sha256-kTdCCMuRE+/HNHES5JYsbRHmgtr+l9mOtf5dpcMppVc=", 7 + "owner": "ipetkov", 8 + "repo": "crane", 9 + "rev": "d599ae4847e7f87603e7082d73ca673aa93c916d", 10 + "type": "github" 11 + }, 12 + "original": { 13 + "owner": "ipetkov", 14 + "repo": "crane", 15 + "type": "github" 16 + } 17 + }, 18 + "flake-utils": { 19 + "inputs": { 20 + "systems": "systems" 21 + }, 22 + "locked": { 23 + "lastModified": 1731533236, 24 + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 25 + "owner": "numtide", 26 + "repo": "flake-utils", 27 + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 28 + "type": "github" 29 + }, 30 + "original": { 31 + "owner": "numtide", 32 + "repo": "flake-utils", 33 + "type": "github" 34 + } 35 + }, 36 + "nixpkgs": { 37 + "locked": { 38 + "lastModified": 1758029226, 39 + "narHash": "sha256-TjqVmbpoCqWywY9xIZLTf6ANFvDCXdctCjoYuYPYdMI=", 40 + "owner": "NixOS", 41 + "repo": "nixpkgs", 42 + "rev": "08b8f92ac6354983f5382124fef6006cade4a1c1", 43 + "type": "github" 44 + }, 45 + "original": { 46 + "owner": "NixOS", 47 + "ref": "nixpkgs-unstable", 48 + "repo": "nixpkgs", 49 + "type": "github" 50 + } 51 + }, 52 + "root": { 53 + "inputs": { 54 + "crane": "crane", 55 + "flake-utils": "flake-utils", 56 + "nixpkgs": "nixpkgs", 57 + "rust-overlay": "rust-overlay" 58 + } 59 + }, 60 + "rust-overlay": { 61 + "inputs": { 62 + "nixpkgs": [ 63 + "nixpkgs" 64 + ] 65 + }, 66 + "locked": { 67 + "lastModified": 1758162771, 68 + "narHash": "sha256-hdZpMep6Z1gbgg9piUZ0BNusI6ZJaptBw6PHSN/3GD0=", 69 + "owner": "oxalica", 70 + "repo": "rust-overlay", 71 + "rev": "d0cabb6ae8f5b38dffaff9f4e6db57c0ae21d729", 72 + "type": "github" 73 + }, 74 + "original": { 75 + "owner": "oxalica", 76 + "repo": "rust-overlay", 77 + "type": "github" 78 + } 79 + }, 80 + "systems": { 81 + "locked": { 82 + "lastModified": 1681028828, 83 + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 84 + "owner": "nix-systems", 85 + "repo": "default", 86 + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 87 + "type": "github" 88 + }, 89 + "original": { 90 + "owner": "nix-systems", 91 + "repo": "default", 92 + "type": "github" 93 + } 94 + } 95 + }, 96 + "root": "root", 97 + "version": 7 98 + }
flake.nix (new file, +464)
··· 1 + { 2 + description = "Parakeet is a Rust-based Bluesky AppView"; 3 + inputs = { 4 + nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; 5 + crane.url = "github:ipetkov/crane"; 6 + flake-utils.url = "github:numtide/flake-utils"; 7 + rust-overlay = { 8 + url = "github:oxalica/rust-overlay"; 9 + inputs.nixpkgs.follows = "nixpkgs"; 10 + }; 11 + }; 12 + outputs = 13 + { 14 + self, 15 + nixpkgs, 16 + crane, 17 + flake-utils, 18 + rust-overlay, 19 + ... 20 + }: 21 + flake-utils.lib.eachDefaultSystem ( 22 + system: 23 + let 24 + pkgs = import nixpkgs { 25 + inherit system; 26 + overlays = [ (import rust-overlay) ]; 27 + }; 28 + craneLib = (crane.mkLib pkgs).overrideToolchain ( 29 + p: 30 + p.rust-bin.selectLatestNightlyWith ( 31 + toolchain: 32 + toolchain.default.override { 33 + extensions = [ 34 + "rust-src" 35 + "rust-analyzer" 36 + ]; 37 + } 38 + ) 39 + ); 40 + 41 + inherit (pkgs) lib; 42 + unfilteredRoot = ./.; # The original, unfiltered source 43 + src = lib.fileset.toSource { 44 + root = unfilteredRoot; 45 + fileset = lib.fileset.unions [ 46 + # Default files from crane (Rust and cargo files) 47 + (craneLib.fileset.commonCargoSources unfilteredRoot) 48 + ]; 49 + }; 50 + # Common arguments can be set here to avoid repeating them later 51 + commonArgs = { 52 + inherit src; 53 + strictDeps = true; 54 + nativeBuildInputs = with pkgs; [ 55 + pkg-config 56 + ]; 57 + buildInputs = [ 58 + # Add additional build inputs here 59 + pkgs.openssl 60 + pkgs.postgresql 61 + pkgs.libpq 62 + pkgs.clang 63 + pkgs.libclang 64 + pkgs.lld 65 + pkgs.protobuf 66 + ] 67 + ++ lib.optionals pkgs.stdenv.isDarwin [ 68 + # Additional darwin specific inputs can be set here 69 + pkgs.libiconv 70 + pkgs.darwin.apple_sdk.frameworks.Security 71 + ]; 72 + LIBCLANG_PATH = "${pkgs.llvmPackages_18.libclang.lib}/lib"; 73 + CLANG_PATH = "${pkgs.llvmPackages_18.clang}/bin/clang"; 74 + PROTOC_INCLUDE = "${pkgs.protobuf}/include"; 75 + PROTOC = "${pkgs.protobuf}/bin/protoc"; 76 + 77 + # Additional environment variables can be set directly 78 + # MY_CUSTOM_VAR = "some value"; 79 + }; 80 + 81 + # Build *just* the cargo dependencies, so we can reuse 82 + # all of that work (e.g. via cachix) when running in CI 83 + cargoArtifacts = craneLib.buildDepsOnly commonArgs; 84 + 85 + individualCrateArgs = commonArgs // { 86 + inherit cargoArtifacts; 87 + inherit (craneLib.crateNameFromCargoToml { inherit src; }) version; 88 + # NB: we disable tests since we'll run them all via cargo-nextest 89 + doCheck = false; 90 + }; 91 + fileSetForCrate = 92 + crate: 93 + lib.fileset.toSource { 94 + root = ./.; 95 + fileset = lib.fileset.unions [ 96 + ./Cargo.toml 97 + ./Cargo.lock 98 + ./migrations 99 + (craneLib.fileset.commonCargoSources ./consumer) 100 + ./consumer/src/db/sql 101 + (craneLib.fileset.commonCargoSources ./dataloader-rs) 102 + (craneLib.fileset.commonCargoSources ./did-resolver) 103 + (craneLib.fileset.commonCargoSources ./lexica) 104 + (craneLib.fileset.commonCargoSources ./parakeet) 105 + ./parakeet/src/sql 106 + (craneLib.fileset.commonCargoSources ./parakeet-db) 107 + (craneLib.fileset.commonCargoSources ./parakeet-index) 108 + ./parakeet-index/proto 109 + (craneLib.fileset.commonCargoSources ./parakeet-lexgen) 110 + (craneLib.fileset.commonCargoSources crate) 111 + ]; 112 + }; 113 + 114 + # Build the actual crate itself, reusing the dependency 115 + # artifacts from above. 
116 + consumer = craneLib.buildPackage ( 117 + individualCrateArgs 118 + // { 119 + pname = "consumer"; 120 + cargoExtraArgs = "-p consumer"; 121 + src = fileSetForCrate ./consumer; 122 + postInstall = '' 123 + mkdir -p $out/{bin,lib/consumer} 124 + ''; 125 + } 126 + ); 127 + dataloader = craneLib.buildPackage ( 128 + individualCrateArgs 129 + // { 130 + pname = "dataloader"; 131 + cargoExtraArgs = "-p dataloader --features default"; 132 + src = fileSetForCrate ./dataloader-rs; 133 + } 134 + ); 135 + did-resolver = craneLib.buildPackage ( 136 + individualCrateArgs 137 + // { 138 + pname = "did-resolver"; 139 + cargoExtraArgs = "-p did-resolver"; 140 + src = fileSetForCrate ./did-resolver; 141 + } 142 + ); 143 + lexica = craneLib.buildPackage ( 144 + individualCrateArgs 145 + // { 146 + pname = "lexica"; 147 + cargoExtraArgs = "-p lexica"; 148 + src = fileSetForCrate ./lexica; 149 + } 150 + ); 151 + parakeet = craneLib.buildPackage ( 152 + individualCrateArgs 153 + // { 154 + pname = "parakeet"; 155 + cargoExtraArgs = "-p parakeet"; 156 + src = fileSetForCrate ./parakeet; 157 + } 158 + ); 159 + parakeet-db = craneLib.buildPackage ( 160 + individualCrateArgs 161 + // { 162 + pname = "parakeet-db"; 163 + cargoExtraArgs = "-p parakeet-db --features default"; 164 + src = fileSetForCrate ./parakeet-db; 165 + } 166 + ); 167 + parakeet-index = craneLib.buildPackage ( 168 + individualCrateArgs 169 + // { 170 + pname = "parakeet-index"; 171 + cargoExtraArgs = "-p parakeet-index --features server"; 172 + src = fileSetForCrate ./parakeet-index; 173 + } 174 + ); 175 + parakeet-lexgen = craneLib.buildPackage ( 176 + individualCrateArgs 177 + // { 178 + pname = "parakeet-lexgen"; 179 + cargoExtraArgs = "-p parakeet-lexgen"; 180 + src = fileSetForCrate ./parakeet-lexgen; 181 + } 182 + ); 183 + in 184 + { 185 + checks = { 186 + # Build the crate as part of `nix flake check` for convenience 187 + inherit 188 + consumer 189 + dataloader 190 + did-resolver 191 + lexica 192 + parakeet 193 + parakeet-db 194 + parakeet-index 195 + parakeet-lexgen 196 + ; 197 + }; 198 + 199 + packages = { 200 + default = parakeet; 201 + inherit 202 + consumer 203 + dataloader 204 + did-resolver 205 + lexica 206 + parakeet 207 + parakeet-db 208 + parakeet-index 209 + parakeet-lexgen 210 + ; 211 + }; 212 + 213 + devShells.default = craneLib.devShell { 214 + # Inherit inputs from checks. 215 + checks = self.checks.${system}; 216 + 217 + # Additional dev-shell environment variables can be set directly 218 + RUST_BACKTRACE = 1; 219 + NIXOS_OZONE_WL = 1; 220 + LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib"; 221 + 222 + # Extra inputs can be added here; cargo and rustc are provided by default. 223 + packages = with pkgs; [ 224 + openssl 225 + bacon 226 + postgresql 227 + rust-analyzer 228 + rustfmt 229 + clippy 230 + git 231 + nixd 232 + direnv 233 + libpq 234 + clang 235 + libclang 236 + ]; 237 + }; 238 + } 239 + ) 240 + // flake-utils.lib.eachDefaultSystemPassThrough (system: { 241 + nixosModules = { 242 + default = 243 + { 244 + pkgs, 245 + lib, 246 + config, 247 + ... 
248 + }: 249 + with lib; 250 + let 251 + cfg = config.services.parakeet; 252 + 253 + inherit (lib) 254 + mkEnableOption 255 + mkIf 256 + mkOption 257 + types 258 + ; 259 + in 260 + { 261 + options.services.parakeet = { 262 + enable = mkEnableOption "parakeet"; 263 + 264 + package = mkOption { 265 + type = types.package; 266 + default = self.packages.${pkgs.system}.default; 267 + description = "The path to the parakeet package."; 268 + }; 269 + 270 + environmentFiles = mkOption { 271 + type = types.listOf types.path; 272 + default = [ "/var/lib/parakeet/config.env" ]; 273 + description = '' 274 + File to load environment variables from. Loaded variables override 275 + values set in {option}`environment`. 276 + ''; 277 + }; 278 + }; 279 + config = mkIf cfg.enable { 280 + environment.systemPackages = [ 281 + self.packages.${pkgs.system}.consumer 282 + ]; 283 + systemd.services.consumer = { 284 + description = "consumer"; 285 + after = [ "network-online.target" ]; 286 + wants = [ "network-online.target" ]; 287 + wantedBy = [ "multi-user.target" ]; 288 + serviceConfig = { 289 + ExecStart = "${self.packages.${pkgs.system}.consumer}/bin/consumer --indexer"; 290 + Type = "exec"; 291 + 292 + EnvironmentFile = cfg.environmentFiles; 293 + User = "parakeet"; 294 + Group = "parakeet"; 295 + StateDirectory = "parakeet"; 296 + StateDirectoryMode = "0755"; 297 + Restart = "always"; 298 + 299 + # Hardening 300 + RemoveIPC = true; 301 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 302 + NoNewPrivileges = true; 303 + PrivateDevices = true; 304 + ProtectClock = true; 305 + ProtectKernelLogs = true; 306 + ProtectControlGroups = true; 307 + ProtectKernelModules = true; 308 + PrivateMounts = true; 309 + SystemCallArchitectures = [ "native" ]; 310 + MemoryDenyWriteExecute = false; # required by V8 JIT 311 + RestrictNamespaces = true; 312 + RestrictSUIDSGID = true; 313 + ProtectHostname = true; 314 + LockPersonality = true; 315 + ProtectKernelTunables = true; 316 + RestrictAddressFamilies = [ 317 + "AF_UNIX" 318 + "AF_INET" 319 + "AF_INET6" 320 + ]; 321 + RestrictRealtime = true; 322 + DeviceAllow = [ "" ]; 323 + ProtectSystem = "full"; 324 + ProtectProc = "invisible"; 325 + ProcSubset = "pid"; 326 + ProtectHome = true; 327 + PrivateUsers = true; 328 + PrivateTmp = true; 329 + UMask = "0077"; 330 + }; 331 + }; 332 + systemd.services.parakeet = { 333 + description = "parakeet"; 334 + after = [ "network-online.target" ]; 335 + wants = [ "network-online.target" ]; 336 + wantedBy = [ "multi-user.target" ]; 337 + serviceConfig = { 338 + ExecStart = "${cfg.package}/bin/parakeet"; 339 + Type = "exec"; 340 + 341 + EnvironmentFile = cfg.environmentFiles; 342 + User = "parakeet"; 343 + Group = "parakeet"; 344 + StateDirectory = "parakeet"; 345 + StateDirectoryMode = "0755"; 346 + Restart = "always"; 347 + 348 + # Hardening 349 + RemoveIPC = true; 350 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 351 + NoNewPrivileges = true; 352 + PrivateDevices = true; 353 + ProtectClock = true; 354 + ProtectKernelLogs = true; 355 + ProtectControlGroups = true; 356 + ProtectKernelModules = true; 357 + PrivateMounts = true; 358 + SystemCallArchitectures = [ "native" ]; 359 + MemoryDenyWriteExecute = false; # required by V8 JIT 360 + RestrictNamespaces = true; 361 + RestrictSUIDSGID = true; 362 + ProtectHostname = true; 363 + LockPersonality = true; 364 + ProtectKernelTunables = true; 365 + RestrictAddressFamilies = [ 366 + "AF_UNIX" 367 + "AF_INET" 368 + "AF_INET6" 369 + ]; 370 + RestrictRealtime = true; 371 + DeviceAllow = [ 
"" ]; 372 + ProtectSystem = "full"; 373 + ProtectProc = "invisible"; 374 + ProcSubset = "pid"; 375 + ProtectHome = true; 376 + PrivateUsers = true; 377 + PrivateTmp = true; 378 + UMask = "0077"; 379 + }; 380 + }; 381 + systemd.services.parakeet-index = { 382 + description = "parakeet-index"; 383 + after = [ "network-online.target" ]; 384 + wants = [ "network-online.target" ]; 385 + wantedBy = [ "multi-user.target" ]; 386 + serviceConfig = { 387 + ExecStart = "${self.packages.${pkgs.system}.parakeet-index}/bin/parakeet-index"; 388 + Type = "exec"; 389 + 390 + EnvironmentFile = cfg.environmentFiles; 391 + User = "parakeet"; 392 + Group = "parakeet"; 393 + StateDirectory = "parakeet"; 394 + StateDirectoryMode = "0755"; 395 + Restart = "always"; 396 + 397 + # Hardening 398 + RemoveIPC = true; 399 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 400 + NoNewPrivileges = true; 401 + PrivateDevices = true; 402 + ProtectClock = true; 403 + ProtectKernelLogs = true; 404 + ProtectControlGroups = true; 405 + ProtectKernelModules = true; 406 + PrivateMounts = true; 407 + SystemCallArchitectures = [ "native" ]; 408 + MemoryDenyWriteExecute = false; # required by V8 JIT 409 + RestrictNamespaces = true; 410 + RestrictSUIDSGID = true; 411 + ProtectHostname = true; 412 + LockPersonality = true; 413 + ProtectKernelTunables = true; 414 + RestrictAddressFamilies = [ 415 + "AF_UNIX" 416 + "AF_INET" 417 + "AF_INET6" 418 + ]; 419 + RestrictRealtime = true; 420 + DeviceAllow = [ "" ]; 421 + ProtectSystem = "full"; 422 + ProtectProc = "invisible"; 423 + ProcSubset = "pid"; 424 + ProtectHome = true; 425 + PrivateUsers = true; 426 + PrivateTmp = true; 427 + UMask = "0077"; 428 + }; 429 + }; 430 + users = { 431 + users.parakeet = { 432 + group = "parakeet"; 433 + isSystemUser = true; 434 + }; 435 + groups.parakeet = { }; 436 + }; 437 + services.postgresql = { 438 + enable = true; 439 + ensureUsers = [ 440 + { 441 + name = "parakeet"; 442 + ensureDBOwnership = true; 443 + } 444 + ]; 445 + ensureDatabases = [ "parakeet" ]; 446 + authentication = pkgs.lib.mkOverride 10 '' 447 + #type database DBuser auth-method 448 + local all all trust 449 + host all all 127.0.0.1/32 trust 450 + host all all ::1/128 trust 451 + ''; 452 + package = mkForce pkgs.postgresql_16; 453 + }; 454 + services.redis.servers.parakeet = { 455 + enable = true; 456 + # port = 0; 457 + unixSocket = "/run/redis-parakeet/redis.sock"; 458 + user = "parakeet"; 459 + }; 460 + }; 461 + }; 462 + }; 463 + }); 464 + }
lexica/src/app_bsky/actor.rs (+6 -28)

```diff
 use crate::app_bsky::embed::External;
-use crate::app_bsky::graph::ListViewBasic;
 use crate::com_atproto::label::Label;
 use chrono::prelude::*;
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 use std::str::FromStr;
-
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ProfileViewerState {
-    pub muted: bool,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub muted_by_list: Option<ListViewBasic>,
-    pub blocked_by: bool,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub blocking: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub blocking_by_list: Option<ListViewBasic>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub following: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub followed_by: Option<String>,
-    // #[serde(skip_serializing_if = "Option::is_none")]
-    // pub known_followers: Option<()>,
-    // #[serde(skip_serializing_if = "Option::is_none")]
-    // pub activity_subscriptions: Option<()>,
-}
 
 #[derive(Clone, Default, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
···
     pub avatar: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub associated: Option<ProfileAssociated>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     #[serde(skip_serializing_if = "Option::is_none")]
···
     pub avatar: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub associated: Option<ProfileAssociated>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     #[serde(skip_serializing_if = "Option::is_none")]
···
     pub associated: Option<ProfileAssociated>,
     // #[serde(skip_serializing_if = "Option::is_none")]
     // pub joined_via_starter_pack: Option<()>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     // #[serde(skip_serializing_if = "Option::is_none")]
```
lexica/src/app_bsky/bookmark.rs (new file, +32)

```rust
use crate::app_bsky::feed::{BlockedAuthor, PostView};
use crate::StrongRef;
use chrono::prelude::*;
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BookmarkView {
    pub subject: StrongRef,
    pub item: BookmarkViewItem,
    pub created_at: DateTime<Utc>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "$type")]
// This is technically the same as ReplyRefPost atm, but just in case...
pub enum BookmarkViewItem {
    #[serde(rename = "app.bsky.feed.defs#postView")]
    Post(PostView),
    #[serde(rename = "app.bsky.feed.defs#notFoundPost")]
    NotFound {
        uri: String,
        #[serde(rename = "notFound")]
        not_found: bool,
    },
    #[serde(rename = "app.bsky.feed.defs#blockedPost")]
    Blocked {
        uri: String,
        blocked: bool,
        author: BlockedAuthor,
    },
}
```
lexica/src/app_bsky/feed.rs (+9 -24)

```diff
 use super::RecordStats;
-use crate::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewerState};
+use crate::app_bsky::actor::{ProfileView, ProfileViewBasic};
 use crate::app_bsky::embed::Embed;
 use crate::app_bsky::graph::ListViewBasic;
 use crate::app_bsky::richtext::FacetMain;
···
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct PostViewerState {
-    pub repost: Option<String>,
-    pub like: Option<String>,
-    pub thread_muted: bool,
-    pub reply_disabled: bool,
-    pub embedding_disabled: bool,
-    pub pinned: bool,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PostView {
···
 
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<PostViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub threadgate: Option<ThreadgateView>,
···
 #[derive(Clone, Debug, Serialize)]
 pub struct BlockedAuthor {
     pub uri: String,
-    pub viewer: Option<ProfileViewerState>,
-}
-
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct GeneratorViewerState {
-    pub like: Option<String>,
+    // pub viewer: Option<()>,
 }
 
 #[derive(Clone, Debug, Serialize)]
···
     pub accepts_interactions: bool,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<GeneratorViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub content_mode: Option<GeneratorContentMode>,
···
     #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")]
     Pin {},
     #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")]
-    Repost { repost: String },
+    Repost {
+        repost: String,
+    },
 }
```
lexica/src/app_bsky/graph.rs (+4 -11)

```diff
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ListViewerState {
-    pub muted: bool,
-    pub blocked: Option<String>,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ListViewBasic {
···
     pub avatar: Option<String>,
     pub list_item_count: i64,
 
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ListViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
···
     pub avatar: Option<String>,
     pub list_item_count: i64,
 
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ListViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
```
lexica/src/app_bsky/labeler.rs (+4 -10)

```diff
 use chrono::prelude::*;
 use serde::{Deserialize, Serialize};
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct LabelerViewerState {
-    pub like: bool,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct LabelerView {
···
     pub creator: ProfileView,
 
     pub like_count: i64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<LabelerViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     pub indexed_at: DateTime<Utc>,
···
     pub creator: ProfileView,
 
     pub like_count: i64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<LabelerViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     pub policies: LabelerPolicy,
```
lexica/src/app_bsky/mod.rs (+1)

```diff
 use serde::Serialize;
 
 pub mod actor;
+pub mod bookmark;
 pub mod embed;
 pub mod feed;
 pub mod graph;
```
lexica/src/community_lexicon/bookmarks.rs (new file, +14)

```rust
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "$type")]
#[serde(rename = "community.lexicon.bookmarks.bookmark")]
#[serde(rename_all = "camelCase")]
pub struct Bookmark {
    pub subject: String,
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub tags: Vec<String>,
    pub created_at: DateTime<Utc>,
}
```
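For reference, the wire shape this struct expects: `$type` comes from the internal tag, field names are camelCase, and `tags` may be omitted. A quick round-trip sketch (serde_json used here purely for illustration; the consumer actually decodes DAG-CBOR from CAR files and the firehose, and the subject uri is made up):

```rust
use lexica::community_lexicon::bookmarks::Bookmark;

fn main() -> Result<(), serde_json::Error> {
    let json = r#"{
        "$type": "community.lexicon.bookmarks.bookmark",
        "subject": "at://did:plc:example/app.bsky.feed.post/3kxyz",
        "createdAt": "2025-09-02T19:08:33Z"
    }"#;

    let bookmark: Bookmark = serde_json::from_str(json)?;
    assert!(bookmark.tags.is_empty()); // `tags` defaults to an empty Vec when omitted

    // Serializing adds the "$type" tag back and skips the empty tags array.
    println!("{}", serde_json::to_string(&bookmark)?);
    Ok(())
}
```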
lexica/src/community_lexicon/mod.rs (new file, +1)

```rust
pub mod bookmarks;
```
lexica/src/lib.rs (+8)

```diff
 
 pub mod app_bsky;
 pub mod com_atproto;
+pub mod community_lexicon;
 mod utils;
 
 #[derive(Clone, Debug, Serialize)]
···
     )]
     pub cid: Cid,
     pub uri: String,
+}
+
+impl StrongRef {
+    pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> {
+        let cid = cid.parse()?;
+        Ok(StrongRef { uri, cid })
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
```
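A small usage sketch for the new constructor: it parses the CID string and surfaces `cid::Error` on bad input rather than panicking (the inputs below are deliberately illustrative):

```rust
use lexica::StrongRef;

fn main() {
    // An obviously invalid CID string exercises the error path.
    let bad = StrongRef::new_from_str(
        "at://did:plc:example/app.bsky.feed.post/3kxyz".to_string(),
        "not-a-cid",
    );
    assert!(bad.is_err());

    // With a CID string taken from an indexed record, the call returns Ok and
    // the uri/cid pair can be serialized as a com.atproto strongRef.
}
```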
migrations/2025-09-02-190833_bookmarks/down.sql (new file, +1)

```sql
drop table bookmarks;
```
migrations/2025-09-02-190833_bookmarks/up.sql (new file, +19)

```sql
create table bookmarks
(
    did          text        not null references actors (did),
    rkey         text,
    subject      text        not null,
    subject_cid  text,
    subject_type text        not null,
    tags         text[]      not null default ARRAY []::text[],

    created_at   timestamptz not null default now(),

    primary key (did, subject)
);

create index bookmarks_rkey_index on bookmarks (rkey);
create index bookmarks_subject_index on bookmarks (subject);
create index bookmarks_subject_type_index on bookmarks (subject_type);
create index bookmarks_tags_index on bookmarks using gin (tags);
create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
```
parakeet/src/hydration/feedgen.rs (-1)

```diff
         like_count: likes.unwrap_or_default() as i64,
         accepts_interactions: feedgen.accepts_interactions,
         labels: map_labels(labels),
-        viewer: None,
         content_mode,
         indexed_at: feedgen.created_at,
     }
```
parakeet/src/hydration/labeler.rs (-2)

```diff
         cid: labeler.cid,
         creator,
         like_count: likes.unwrap_or_default() as i64,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: labeler.indexed_at.and_utc(),
     }
···
         cid: labeler.cid,
         creator,
         like_count: likes.unwrap_or_default() as i64,
-        viewer: None,
         policies: LabelerPolicy {
             label_values,
             label_value_definitions,
```
parakeet/src/hydration/list.rs (-2)

```diff
         purpose,
         avatar,
         list_item_count,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: list.created_at,
     })
···
         description_facets,
         avatar,
         list_item_count,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: list.created_at,
     })
```
parakeet/src/hydration/posts.rs (-1)

```diff
         embed,
         stats,
         labels: map_labels(labels),
-        viewer: None,
         threadgate,
         indexed_at: post.created_at,
     }
```
parakeet/src/hydration/profile.rs (-3)

```diff
         display_name: profile.display_name,
         avatar,
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
···
         description: profile.description,
         avatar,
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
···
         followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(),
         follows_count: stats.map(|v| v.following as i64).unwrap_or_default(),
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
```
parakeet/src/xrpc/app_bsky/bookmark.rs (new file, +146)

```rust
use crate::hydration::StatefulHydrator;
use crate::xrpc::error::XrpcResult;
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
use crate::xrpc::{datetime_cursor, CursorQuery};
use crate::GlobalState;
use axum::extract::{Query, State};
use axum::Json;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem};
use parakeet_db::{models, schema};
use serde::{Deserialize, Serialize};
use lexica::StrongRef;

const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"];

#[derive(Debug, Deserialize)]
pub struct CreateBookmarkReq {
    pub uri: String,
    pub cid: String,
}

pub async fn create_bookmark(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Json(form): Json<CreateBookmarkReq>,
) -> XrpcResult<()> {
    let mut conn = state.pool.get().await?;

    // strip "at://" then break into parts by '/'
    let parts = form.uri[5..].split('/').collect::<Vec<_>>();

    let data = models::NewBookmark {
        did: &auth.0,
        rkey: None,
        subject: &form.uri,
        subject_cid: Some(form.cid),
        subject_type: &parts[1],
        tags: vec![],
    };

    diesel::insert_into(schema::bookmarks::table)
        .values(&data)
        .on_conflict_do_nothing()
        .execute(&mut conn)
        .await?;

    Ok(())
}

#[derive(Debug, Deserialize)]
pub struct DeleteBookmarkReq {
    pub uri: String,
}

pub async fn delete_bookmark(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Json(form): Json<DeleteBookmarkReq>,
) -> XrpcResult<()> {
    let mut conn = state.pool.get().await?;

    diesel::delete(schema::bookmarks::table)
        .filter(
            schema::bookmarks::did
                .eq(&auth.0)
                .and(schema::bookmarks::subject.eq(&form.uri)),
        )
        .execute(&mut conn)
        .await?;

    Ok(())
}

#[derive(Debug, Serialize)]
pub struct GetBookmarksRes {
    #[serde(skip_serializing_if = "Option::is_none")]
    cursor: Option<String>,
    bookmarks: Vec<BookmarkView>,
}

pub async fn get_bookmarks(
    State(state): State<GlobalState>,
    AtpAcceptLabelers(labelers): AtpAcceptLabelers,
    auth: AtpAuth,
    Query(query): Query<CursorQuery>,
) -> XrpcResult<Json<GetBookmarksRes>> {
    let mut conn = state.pool.get().await?;
    let did = auth.0.clone();
    let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth));

    let limit = query.limit.unwrap_or(50).clamp(1, 100);

    let mut bookmarks_query = schema::bookmarks::table
        .select(models::Bookmark::as_select())
        .filter(schema::bookmarks::did.eq(&did))
        .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES))
        .into_boxed();

    if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor));
    }

    let results = bookmarks_query
        .order(schema::bookmarks::created_at.desc())
        .limit(limit as i64)
        .load(&mut conn)
        .await?;

    let cursor = results
        .last()
        .map(|bm| bm.created_at.timestamp_millis().to_string());

    let uris = results.iter().map(|bm| bm.subject.clone()).collect();

    let mut posts = hyd.hydrate_posts(uris).await;

    let bookmarks = results
        .into_iter()
        .filter_map(|bookmark| {
            let maybe_item = posts.remove(&bookmark.subject);
            let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone());

            // ensure that either the cid is set in the bookmark record *or* in the post record
            // otherwise just ditch. we should have one.
            let cid = bookmark.subject_cid.or(maybe_cid)?;

            let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or(
                BookmarkViewItem::NotFound {
                    uri: bookmark.subject.clone(),
                    not_found: true,
                },
            );

            let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?;

            Some(BookmarkView {
                subject,
                item,
                created_at: bookmark.created_at,
            })
        })
        .collect();

    Ok(Json(GetBookmarksRes { cursor, bookmarks }))
}
```
parakeet/src/xrpc/app_bsky/mod.rs (+4)

```diff
 use axum::Router;
 
 mod actor;
+mod bookmark;
 mod feed;
 mod graph;
 mod labeler;
···
     // TODO: app.bsky.actor.getSuggestions (recs)
     // TODO: app.bsky.actor.searchActor (search)
     // TODO: app.bsky.actor.searchActorTypeahead (search)
+    .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark))
+    .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark))
+    .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks))
     .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds))
     .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes))
     .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
```
parakeet/src/xrpc/community_lexicon/bookmarks.rs (new file, +69)

```rust
use crate::xrpc::datetime_cursor;
use crate::xrpc::error::XrpcResult;
use crate::xrpc::extract::AtpAuth;
use crate::GlobalState;
use axum::extract::{Query, State};
use axum::Json;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lexica::community_lexicon::bookmarks::Bookmark;
use parakeet_db::{models, schema};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
pub struct BookmarkCursorQuery {
    pub tags: Option<Vec<String>>,
    pub limit: Option<u8>,
    pub cursor: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct GetActorBookmarksRes {
    #[serde(skip_serializing_if = "Option::is_none")]
    cursor: Option<String>,
    bookmarks: Vec<Bookmark>,
}

pub async fn get_actor_bookmarks(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Query(query): Query<BookmarkCursorQuery>,
) -> XrpcResult<Json<GetActorBookmarksRes>> {
    let mut conn = state.pool.get().await?;

    let limit = query.limit.unwrap_or(50).clamp(1, 100);

    let mut bookmarks_query = schema::bookmarks::table
        .select(models::Bookmark::as_select())
        .filter(schema::bookmarks::did.eq(&auth.0))
        .into_boxed();

    if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor));
    }

    if let Some(tags) = query.tags {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags));
    }

    let results = bookmarks_query
        .order(schema::bookmarks::created_at.desc())
        .limit(limit as i64)
        .load(&mut conn)
        .await?;

    let cursor = results
        .last()
        .map(|bm| bm.created_at.timestamp_millis().to_string());

    let bookmarks = results
        .into_iter()
        .map(|bookmark| Bookmark {
            subject: bookmark.subject,
            tags: bookmark.tags.into_iter().flatten().collect(),
            created_at: bookmark.created_at,
        })
        .collect();

    Ok(Json(GetActorBookmarksRes { cursor, bookmarks }))
}
```
parakeet/src/xrpc/community_lexicon/mod.rs (new file, +10)

```rust
use axum::routing::get;
use axum::Router;

pub mod bookmarks;

#[rustfmt::skip]
pub fn routes() -> Router<crate::GlobalState> {
    Router::new()
        .route("/community.lexicon.bookmarks.getActorBookmarks", get(bookmarks::get_actor_bookmarks))
}
```
parakeet/src/xrpc/mod.rs (+2)

```diff
 mod app_bsky;
 pub mod cdn;
 mod com_atproto;
+mod community_lexicon;
 mod error;
 pub mod extract;
 pub mod jwt;
···
     Router::new()
         .merge(app_bsky::routes())
         .merge(com_atproto::routes())
+        .merge(community_lexicon::routes())
 }
 
 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {
```
parakeet-db/src/models.rs (+26)

```diff
     pub did: &'a str,
     pub list_uri: &'a str,
 }
+
+#[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)]
+#[diesel(table_name = crate::schema::bookmarks)]
+#[diesel(primary_key(did, subject, subject_cid))]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct Bookmark {
+    pub did: String,
+    pub rkey: Option<String>,
+    pub subject: String,
+    pub subject_cid: Option<String>,
+    pub subject_type: String,
+    pub tags: Vec<Option<String>>,
+    pub created_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Insertable, AsChangeset)]
+#[diesel(table_name = crate::schema::bookmarks)]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct NewBookmark<'a> {
+    pub did: &'a str,
+    pub rkey: Option<String>,
+    pub subject: &'a str,
+    pub subject_cid: Option<String>,
+    pub subject_type: &'a str,
+    pub tags: Vec<String>,
+}
```
parakeet-db/src/schema.rs (+14)

```diff
 }
 
 diesel::table! {
+    bookmarks (did, subject) {
+        did -> Text,
+        rkey -> Nullable<Text>,
+        subject -> Text,
+        subject_cid -> Nullable<Text>,
+        subject_type -> Text,
+        tags -> Array<Nullable<Text>>,
+        created_at -> Timestamptz,
+    }
+}
+
+diesel::table! {
     chat_decls (did) {
         did -> Text,
         allow_incoming -> Text,
···
 
 diesel::joinable!(backfill -> actors (repo));
 diesel::joinable!(blocks -> actors (did));
+diesel::joinable!(bookmarks -> actors (did));
 diesel::joinable!(chat_decls -> actors (did));
 diesel::joinable!(feedgens -> actors (owner));
 diesel::joinable!(follows -> actors (did));
···
     backfill,
     backfill_jobs,
     blocks,
+    bookmarks,
     chat_decls,
     feedgens,
     follows,
```
parakeet-index/build.rs (+4 -1)

```diff
 fn main() -> Result<(), Box<dyn std::error::Error>> {
-    tonic_build::configure().compile_protos(&["proto/parakeet.proto"], &[""])?;
+    tonic_build::configure().compile_protos(
+        &[std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("proto/parakeet.proto")],
+        &[std::path::Path::new(env!("CARGO_MANIFEST_DIR"))],
+    )?;
 
     Ok(())
 }
```