Rust AppView - highly experimental!

Compare changes

Choose any two refs to compare.

Changed files
+1088 -169
consumer
lexica
src
migrations
2025-09-02-190833_bookmarks
parakeet
src
xrpc
app_bsky
community_lexicon
parakeet-db
parakeet-index
+1
.envrc
··· 1 + use flake
+1
.gitignore
··· 4 4 .env 5 5 Config.toml 6 6 data/ 7 + .direnv/
+11
README.md
··· 3 3 Parakeet is a [Bluesky](https://bsky.app) [AppView](https://atproto.wiki/en/wiki/reference/core-architecture/appview) 4 4 aiming to implement most of the functionality required to support the Bluesky client. Notably not implemented is a CDN. 5 5 6 + ## Status and Roadmap 7 + Most common functionality works. Notable omissions: like/repost/follow statuses are missing, blocks and mutes don't get 8 + applied, labels might not track CIDs properly, and label redaction doesn't work at all (beware!). 9 + 10 + Future work is tracked in issues, but the highlights are below. Help would be highly appreciated. 11 + - Notifications 12 + - Search 13 + - Pinned Posts 14 + - The Timeline 15 + - Monitoring: metrics, tracing, and health checks. 16 + 6 17 ## The Code 7 18 Parakeet is implemented in Rust, using Postgres as a database, Redis for caching and queue processing, RocksDB for 8 19 aggregation, and Diesel for migrations and querying.
+8 -8
consumer/src/backfill/downloader.rs
··· 109 109 Ok(Some(did_doc)) => { 110 110 let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else { 111 111 tracing::warn!("bad DID doc for {did}"); 112 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 112 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc") 113 113 .await 114 114 .unwrap(); 115 115 continue; ··· 132 132 } 133 133 } 134 134 Ok(None) => { 135 - tracing::warn!(did, "bad DID doc"); 135 + tracing::warn!(did, "bad/missing DID doc"); 136 136 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 137 137 .await 138 138 .unwrap(); 139 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 139 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did_doc") 140 140 .await 141 141 .unwrap(); 142 142 } ··· 145 145 db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now()) 146 146 .await 147 147 .unwrap(); 148 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 148 + db::backfill_job_write(&mut conn, &did, "failed.resolve.did") 149 149 .await 150 150 .unwrap(); 151 151 } ··· 179 179 Ok(false) => continue, 180 180 Err(e) => { 181 181 tracing::error!(pds, did, "failed to check repo status: {e}"); 182 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 182 + db::backfill_job_write(&mut conn, &did, "failed.resolve.status") 183 183 .await 184 184 .unwrap(); 185 185 continue; ··· 190 190 if let Some(handle) = maybe_handle { 191 191 if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await { 192 192 tracing::error!(pds, did, "failed to resolve handle: {e}"); 193 - db::backfill_job_write(&mut conn, &did, "failed.resolve") 193 + db::backfill_job_write(&mut conn, &did, "failed.resolve.handle") 194 194 .await 195 195 .unwrap(); 196 196 } ··· 253 253 pds: &str, 254 254 did: &str, 255 255 ) -> eyre::Result<Option<(i32, i32)>> { 256 - let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 257 - 258 256 let res = http 259 257 
.get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}")) 260 258 .send() 261 259 .await? 262 260 .error_for_status()?; 261 + 262 + let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?; 263 263 264 264 let headers = res.headers(); 265 265 let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
+1 -1
consumer/src/backfill/mod.rs
··· 131 131 } 132 132 } 133 133 134 - #[instrument(skip(conn, inner))] 134 + #[instrument(skip(conn, rc, inner))] 135 135 async fn backfill_actor( 136 136 conn: &mut Object, 137 137 rc: &mut MultiplexedConnection,
+5 -2
consumer/src/backfill/repo.rs
··· 1 1 use super::{ 2 - types::{CarCommitEntry, CarEntry}, 2 + types::{CarCommitEntry, CarEntry, CarRecordEntry}, 3 3 CopyStore, 4 4 }; 5 5 use crate::indexer::records; ··· 54 54 CarEntry::Commit(_) => { 55 55 tracing::warn!("got commit entry that was not in root") 56 56 } 57 - CarEntry::Record(record) => { 57 + CarEntry::Record(CarRecordEntry::Known(record)) => { 58 58 if let Some(path) = mst_nodes.remove(&cid) { 59 59 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?; 60 60 } else { 61 61 records.insert(cid, record); 62 62 } 63 + } 64 + CarEntry::Record(CarRecordEntry::Other { ty }) => { 65 + tracing::debug!("repo contains unknown record type: {ty} ({cid})"); 63 66 } 64 67 CarEntry::Mst(mst) => { 65 68 let mut out = Vec::with_capacity(mst.e.len());
+11 -1
consumer/src/backfill/types.rs
··· 8 8 pub enum CarEntry { 9 9 Mst(CarMstEntry), 10 10 Commit(CarCommitEntry), 11 - Record(RecordTypes), 11 + Record(CarRecordEntry), 12 12 } 13 13 14 14 #[derive(Debug, Deserialize)] ··· 33 33 pub rev: String, 34 34 pub prev: Option<Cid>, 35 35 pub sig: ByteBuf, 36 + } 37 + 38 + #[derive(Debug, Deserialize)] 39 + #[serde(untagged)] 40 + pub enum CarRecordEntry { 41 + Known(RecordTypes), 42 + Other { 43 + #[serde(rename = "$type")] 44 + ty: String, 45 + }, 36 46 } 37 47 38 48 #[derive(Debug, Deserialize)]
+1 -1
consumer/src/db/backfill.rs
··· 19 19 status: &str, 20 20 ) -> PgExecResult { 21 21 conn.execute( 22 - "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2)", 22 + "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET status = $2, updated_at = NOW()", 23 23 &[&did, &status], 24 24 ) 25 25 .await
+32
consumer/src/db/record.rs
··· 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::GenericClient; 6 6 use ipld_core::cid::Cid; 7 + use lexica::community_lexicon::bookmarks::Bookmark; 7 8 8 9 pub async fn record_upsert<C: GenericClient>( 9 10 conn: &mut C, ··· 20 21 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { 21 22 conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri]) 22 23 .await 24 + } 25 + 26 + pub async fn bookmark_upsert<C: GenericClient>( 27 + conn: &mut C, 28 + rkey: &str, 29 + repo: &str, 30 + rec: Bookmark, 31 + ) -> PgExecResult { 32 + // strip "at://" then break into parts by '/' 33 + let rec_type = match rec.subject.strip_prefix("at://") { 34 + Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1], 35 + None => "$uri", 36 + }; 37 + 38 + conn.execute( 39 + include_str!("sql/bookmarks_upsert.sql"), 40 + &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at], 41 + ) 42 + .await 43 + } 44 + 45 + pub async fn bookmark_delete<C: GenericClient>( 46 + conn: &mut C, 47 + rkey: &str, 48 + repo: &str, 49 + ) -> PgExecResult { 50 + conn.execute( 51 + "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2", 52 + &[&rkey, &repo], 53 + ) 54 + .await 23 55 } 24 56 25 57 pub async fn block_insert<C: GenericClient>(
+5
consumer/src/db/sql/bookmarks_upsert.sql
··· 1 + INSERT INTO bookmarks (did, rkey, subject, subject_type, tags, created_at) 2 + VALUES ($1, $2, $3, $4, $5, $6) 3 + ON CONFLICT (did, rkey) DO UPDATE SET subject=EXCLUDED.subject, 4 + subject_type=EXCLUDED.subject_type, 5 + tags=EXCLUDED.tags
+15
consumer/src/firehose/mod.rs
··· 117 117 118 118 FirehoseEvent::Label(event) 119 119 } 120 + "#sync" => { 121 + counter!("firehose_events.total", "event" => "sync").increment(1); 122 + let event: AtpSyncEvent = 123 + serde_ipld_dagcbor::from_reader(&mut reader)?; 124 + 125 + // increment the seq 126 + if self.seq < event.seq { 127 + self.seq = event.seq; 128 + } else { 129 + tracing::error!("Event sequence was not greater than previous seq, exiting. {} <= {}", event.seq, self.seq); 130 + return Ok(FirehoseOutput::Close); 131 + } 132 + 133 + FirehoseEvent::Sync(event) 134 + } 120 135 _ => { 121 136 tracing::warn!("unknown event type {ty}"); 122 137 return Ok(FirehoseOutput::Continue);
+23
consumer/src/firehose/types.rs
··· 31 31 Account(AtpAccountEvent), 32 32 Commit(AtpCommitEvent), 33 33 Label(AtpLabelEvent), 34 + Sync(AtpSyncEvent), 34 35 } 35 36 36 37 #[derive(Debug, Deserialize)] ··· 48 49 Suspended, 49 50 Deleted, 50 51 Deactivated, 52 + Throttled, 53 + Desynchronized, 51 54 } 52 55 53 56 impl AtpAccountStatus { ··· 57 60 AtpAccountStatus::Suspended => "suspended", 58 61 AtpAccountStatus::Deleted => "deleted", 59 62 AtpAccountStatus::Deactivated => "deactivated", 63 + AtpAccountStatus::Throttled => "throttled", 64 + AtpAccountStatus::Desynchronized => "desynchronized", 60 65 } 61 66 } 62 67 } ··· 68 73 AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended, 69 74 AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted, 70 75 AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated, 76 + AtpAccountStatus::Throttled | AtpAccountStatus::Desynchronized => { 77 + parakeet_db::types::ActorStatus::Active 78 + } 71 79 } 72 80 } 73 81 } ··· 90 98 pub since: Option<String>, 91 99 pub commit: Cid, 92 100 #[serde(rename = "tooBig")] 101 + #[deprecated] 93 102 pub too_big: bool, 94 103 #[serde(default)] 95 104 pub blocks: ByteBuf, 96 105 #[serde(default)] 97 106 pub ops: Vec<CommitOp>, 98 107 #[serde(default)] 108 + #[deprecated] 99 109 pub blobs: Vec<Cid>, 110 + #[serde(rename = "prevData")] 111 + pub prev_data: Option<Cid>, 100 112 } 101 113 102 114 #[derive(Debug, Deserialize)] 103 115 pub struct CommitOp { 104 116 pub action: String, 105 117 pub cid: Option<Cid>, 118 + pub prev: Option<Cid>, 106 119 pub path: String, 107 120 } 108 121 ··· 124 137 pub seq: u64, 125 138 pub labels: Vec<AtpLabel>, 126 139 } 140 + 141 + #[derive(Debug, Deserialize)] 142 + pub struct AtpSyncEvent { 143 + pub seq: u64, 144 + pub did: String, 145 + pub time: DateTime<Utc>, 146 + pub rev: String, 147 + #[serde(default)] 148 + pub blocks: ByteBuf, 149 + }
+32 -2
consumer/src/indexer/mod.rs
··· 1 1 use crate::config::HistoryMode; 2 2 use crate::db; 3 3 use crate::firehose::{ 4 - AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseConsumer, FirehoseEvent, 5 - FirehoseOutput, 4 + AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, AtpSyncEvent, CommitOp, FirehoseConsumer, 5 + FirehoseEvent, FirehoseOutput, 6 6 }; 7 7 use crate::indexer::types::{ 8 8 AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes, ··· 107 107 FirehoseEvent::Commit(commit) => { 108 108 index_commit(&mut state, &mut conn, &mut rc, commit).await 109 109 } 110 + FirehoseEvent::Sync(sync) => { 111 + process_sync(&state, &mut conn, &mut rc, sync).await 112 + } 110 113 FirehoseEvent::Label(_) => unreachable!(), 111 114 }; 112 115 ··· 188 191 FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads, 189 192 FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads, 190 193 FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads, 194 + FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads, 191 195 FirehoseEvent::Label(_) => { 192 196 // We handle all labels through direct connections to labelers 193 197 tracing::warn!("got #labels from the relay"); ··· 199 203 tracing::error!("Error sending event: {e}"); 200 204 } 201 205 } 206 + } 207 + 208 + #[instrument(skip_all, fields(seq = sync.seq, repo = sync.did))] 209 + async fn process_sync( 210 + state: &RelayIndexerState, 211 + conn: &mut Object, 212 + rc: &mut MultiplexedConnection, 213 + sync: AtpSyncEvent, 214 + ) -> eyre::Result<()> { 215 + let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else { 216 + return Ok(()); 217 + }; 218 + 219 + // don't care if we're not synced. 
also no point if !do_backfill bc we might not have a worker 220 + if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev { 221 + tracing::debug!("triggering backfill due to #sync"); 222 + rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?; 223 + } 224 + 225 + Ok(()) 202 226 } 203 227 204 228 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))] ··· 723 747 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 724 748 } 725 749 } 750 + RecordTypes::CommunityLexiconBookmark(record) => { 751 + db::bookmark_upsert(conn, rkey, repo, record).await?; 752 + } 726 753 } 727 754 728 755 db::record_upsert(conn, at_uri, repo, cid).await?; ··· 832 859 CollectionType::ChatActorDecl => { 833 860 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?; 834 861 db::chat_decl_delete(conn, repo).await? 862 + } 863 + CollectionType::CommunityLexiconBookmark => { 864 + db::bookmark_delete(conn, rkey, repo).await? 835 865 } 836 866 _ => unreachable!(), 837 867 };
+5
consumer/src/indexer/types.rs
··· 41 41 AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 42 42 #[serde(rename = "chat.bsky.actor.declaration")] 43 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 44 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 45 + CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark) 44 46 } 45 47 46 48 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)] ··· 63 65 BskyLabelerService, 64 66 BskyNotificationDeclaration, 65 67 ChatActorDecl, 68 + CommunityLexiconBookmark, 66 69 Unsupported, 67 70 } 68 71 ··· 87 90 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 88 91 "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 89 92 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 93 + "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark, 90 94 _ => CollectionType::Unsupported, 91 95 } 92 96 } ··· 111 115 CollectionType::BskyVerification => false, 112 116 CollectionType::BskyLabelerService => true, 113 117 CollectionType::BskyNotificationDeclaration => true, 118 + CollectionType::CommunityLexiconBookmark => true, 114 119 CollectionType::Unsupported => false, 115 120 } 116 121 }
+98
flake.lock
··· 1 + { 2 + "nodes": { 3 + "crane": { 4 + "locked": { 5 + "lastModified": 1757183466, 6 + "narHash": "sha256-kTdCCMuRE+/HNHES5JYsbRHmgtr+l9mOtf5dpcMppVc=", 7 + "owner": "ipetkov", 8 + "repo": "crane", 9 + "rev": "d599ae4847e7f87603e7082d73ca673aa93c916d", 10 + "type": "github" 11 + }, 12 + "original": { 13 + "owner": "ipetkov", 14 + "repo": "crane", 15 + "type": "github" 16 + } 17 + }, 18 + "flake-utils": { 19 + "inputs": { 20 + "systems": "systems" 21 + }, 22 + "locked": { 23 + "lastModified": 1731533236, 24 + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", 25 + "owner": "numtide", 26 + "repo": "flake-utils", 27 + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", 28 + "type": "github" 29 + }, 30 + "original": { 31 + "owner": "numtide", 32 + "repo": "flake-utils", 33 + "type": "github" 34 + } 35 + }, 36 + "nixpkgs": { 37 + "locked": { 38 + "lastModified": 1758029226, 39 + "narHash": "sha256-TjqVmbpoCqWywY9xIZLTf6ANFvDCXdctCjoYuYPYdMI=", 40 + "owner": "NixOS", 41 + "repo": "nixpkgs", 42 + "rev": "08b8f92ac6354983f5382124fef6006cade4a1c1", 43 + "type": "github" 44 + }, 45 + "original": { 46 + "owner": "NixOS", 47 + "ref": "nixpkgs-unstable", 48 + "repo": "nixpkgs", 49 + "type": "github" 50 + } 51 + }, 52 + "root": { 53 + "inputs": { 54 + "crane": "crane", 55 + "flake-utils": "flake-utils", 56 + "nixpkgs": "nixpkgs", 57 + "rust-overlay": "rust-overlay" 58 + } 59 + }, 60 + "rust-overlay": { 61 + "inputs": { 62 + "nixpkgs": [ 63 + "nixpkgs" 64 + ] 65 + }, 66 + "locked": { 67 + "lastModified": 1758162771, 68 + "narHash": "sha256-hdZpMep6Z1gbgg9piUZ0BNusI6ZJaptBw6PHSN/3GD0=", 69 + "owner": "oxalica", 70 + "repo": "rust-overlay", 71 + "rev": "d0cabb6ae8f5b38dffaff9f4e6db57c0ae21d729", 72 + "type": "github" 73 + }, 74 + "original": { 75 + "owner": "oxalica", 76 + "repo": "rust-overlay", 77 + "type": "github" 78 + } 79 + }, 80 + "systems": { 81 + "locked": { 82 + "lastModified": 1681028828, 83 + "narHash": 
"sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", 84 + "owner": "nix-systems", 85 + "repo": "default", 86 + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", 87 + "type": "github" 88 + }, 89 + "original": { 90 + "owner": "nix-systems", 91 + "repo": "default", 92 + "type": "github" 93 + } 94 + } 95 + }, 96 + "root": "root", 97 + "version": 7 98 + }
+464
flake.nix
··· 1 + { 2 + description = "Parakeet is a Rust-based Bluesky AppView"; 3 + inputs = { 4 + nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable"; 5 + crane.url = "github:ipetkov/crane"; 6 + flake-utils.url = "github:numtide/flake-utils"; 7 + rust-overlay = { 8 + url = "github:oxalica/rust-overlay"; 9 + inputs.nixpkgs.follows = "nixpkgs"; 10 + }; 11 + }; 12 + outputs = 13 + { 14 + self, 15 + nixpkgs, 16 + crane, 17 + flake-utils, 18 + rust-overlay, 19 + ... 20 + }: 21 + flake-utils.lib.eachDefaultSystem ( 22 + system: 23 + let 24 + pkgs = import nixpkgs { 25 + inherit system; 26 + overlays = [ (import rust-overlay) ]; 27 + }; 28 + craneLib = (crane.mkLib pkgs).overrideToolchain ( 29 + p: 30 + p.rust-bin.selectLatestNightlyWith ( 31 + toolchain: 32 + toolchain.default.override { 33 + extensions = [ 34 + "rust-src" 35 + "rust-analyzer" 36 + ]; 37 + } 38 + ) 39 + ); 40 + 41 + inherit (pkgs) lib; 42 + unfilteredRoot = ./.; # The original, unfiltered source 43 + src = lib.fileset.toSource { 44 + root = unfilteredRoot; 45 + fileset = lib.fileset.unions [ 46 + # Default files from crane (Rust and cargo files) 47 + (craneLib.fileset.commonCargoSources unfilteredRoot) 48 + ]; 49 + }; 50 + # Common arguments can be set here to avoid repeating them later 51 + commonArgs = { 52 + inherit src; 53 + strictDeps = true; 54 + nativeBuildInputs = with pkgs; [ 55 + pkg-config 56 + ]; 57 + buildInputs = [ 58 + # Add additional build inputs here 59 + pkgs.openssl 60 + pkgs.postgresql 61 + pkgs.libpq 62 + pkgs.clang 63 + pkgs.libclang 64 + pkgs.lld 65 + pkgs.protobuf 66 + ] 67 + ++ lib.optionals pkgs.stdenv.isDarwin [ 68 + # Additional darwin specific inputs can be set here 69 + pkgs.libiconv 70 + pkgs.darwin.apple_sdk.frameworks.Security 71 + ]; 72 + LIBCLANG_PATH = "${pkgs.llvmPackages_18.libclang.lib}/lib"; 73 + CLANG_PATH = "${pkgs.llvmPackages_18.clang}/bin/clang"; 74 + PROTOC_INCLUDE = "${pkgs.protobuf}/include"; 75 + PROTOC = "${pkgs.protobuf}/bin/protoc"; 76 + 77 + # Additional 
environment variables can be set directly 78 + # MY_CUSTOM_VAR = "some value"; 79 + }; 80 + 81 + # Build *just* the cargo dependencies, so we can reuse 82 + # all of that work (e.g. via cachix) when running in CI 83 + cargoArtifacts = craneLib.buildDepsOnly commonArgs; 84 + 85 + individualCrateArgs = commonArgs // { 86 + inherit cargoArtifacts; 87 + inherit (craneLib.crateNameFromCargoToml { inherit src; }) version; 88 + # NB: we disable tests since we'll run them all via cargo-nextest 89 + doCheck = false; 90 + }; 91 + fileSetForCrate = 92 + crate: 93 + lib.fileset.toSource { 94 + root = ./.; 95 + fileset = lib.fileset.unions [ 96 + ./Cargo.toml 97 + ./Cargo.lock 98 + ./migrations 99 + (craneLib.fileset.commonCargoSources ./consumer) 100 + ./consumer/src/db/sql 101 + (craneLib.fileset.commonCargoSources ./dataloader-rs) 102 + (craneLib.fileset.commonCargoSources ./did-resolver) 103 + (craneLib.fileset.commonCargoSources ./lexica) 104 + (craneLib.fileset.commonCargoSources ./parakeet) 105 + ./parakeet/src/sql 106 + (craneLib.fileset.commonCargoSources ./parakeet-db) 107 + (craneLib.fileset.commonCargoSources ./parakeet-index) 108 + ./parakeet-index/proto 109 + (craneLib.fileset.commonCargoSources ./parakeet-lexgen) 110 + (craneLib.fileset.commonCargoSources crate) 111 + ]; 112 + }; 113 + 114 + # Build the actual crate itself, reusing the dependency 115 + # artifacts from above. 
116 + consumer = craneLib.buildPackage ( 117 + individualCrateArgs 118 + // { 119 + pname = "consumer"; 120 + cargoExtraArgs = "-p consumer"; 121 + src = fileSetForCrate ./consumer; 122 + postInstall = '' 123 + mkdir -p $out/{bin,lib/consumer} 124 + ''; 125 + } 126 + ); 127 + dataloader = craneLib.buildPackage ( 128 + individualCrateArgs 129 + // { 130 + pname = "dataloader"; 131 + cargoExtraArgs = "-p dataloader --features default"; 132 + src = fileSetForCrate ./dataloader-rs; 133 + } 134 + ); 135 + did-resolver = craneLib.buildPackage ( 136 + individualCrateArgs 137 + // { 138 + pname = "did-resolver"; 139 + cargoExtraArgs = "-p did-resolver"; 140 + src = fileSetForCrate ./did-resolver; 141 + } 142 + ); 143 + lexica = craneLib.buildPackage ( 144 + individualCrateArgs 145 + // { 146 + pname = "lexica"; 147 + cargoExtraArgs = "-p lexica"; 148 + src = fileSetForCrate ./lexica; 149 + } 150 + ); 151 + parakeet = craneLib.buildPackage ( 152 + individualCrateArgs 153 + // { 154 + pname = "parakeet"; 155 + cargoExtraArgs = "-p parakeet"; 156 + src = fileSetForCrate ./parakeet; 157 + } 158 + ); 159 + parakeet-db = craneLib.buildPackage ( 160 + individualCrateArgs 161 + // { 162 + pname = "parakeet-db"; 163 + cargoExtraArgs = "-p parakeet-db --features default"; 164 + src = fileSetForCrate ./parakeet-db; 165 + } 166 + ); 167 + parakeet-index = craneLib.buildPackage ( 168 + individualCrateArgs 169 + // { 170 + pname = "parakeet-index"; 171 + cargoExtraArgs = "-p parakeet-index --features server"; 172 + src = fileSetForCrate ./parakeet-index; 173 + } 174 + ); 175 + parakeet-lexgen = craneLib.buildPackage ( 176 + individualCrateArgs 177 + // { 178 + pname = "parakeet-lexgen"; 179 + cargoExtraArgs = "-p parakeet-lexgen"; 180 + src = fileSetForCrate ./parakeet-lexgen; 181 + } 182 + ); 183 + in 184 + { 185 + checks = { 186 + # Build the crate as part of `nix flake check` for convenience 187 + inherit 188 + consumer 189 + dataloader 190 + did-resolver 191 + lexica 192 + parakeet 
193 + parakeet-db 194 + parakeet-index 195 + parakeet-lexgen 196 + ; 197 + }; 198 + 199 + packages = { 200 + default = parakeet; 201 + inherit 202 + consumer 203 + dataloader 204 + did-resolver 205 + lexica 206 + parakeet 207 + parakeet-db 208 + parakeet-index 209 + parakeet-lexgen 210 + ; 211 + }; 212 + 213 + devShells.default = craneLib.devShell { 214 + # Inherit inputs from checks. 215 + checks = self.checks.${system}; 216 + 217 + # Additional dev-shell environment variables can be set directly 218 + RUST_BACKTRACE = 1; 219 + NIXOS_OZONE_WL = 1; 220 + LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib"; 221 + 222 + # Extra inputs can be added here; cargo and rustc are provided by default. 223 + packages = with pkgs; [ 224 + openssl 225 + bacon 226 + postgresql 227 + rust-analyzer 228 + rustfmt 229 + clippy 230 + git 231 + nixd 232 + direnv 233 + libpq 234 + clang 235 + libclang 236 + ]; 237 + }; 238 + } 239 + ) 240 + // flake-utils.lib.eachDefaultSystemPassThrough (system: { 241 + nixosModules = { 242 + default = 243 + { 244 + pkgs, 245 + lib, 246 + config, 247 + ... 248 + }: 249 + with lib; 250 + let 251 + cfg = config.services.parakeet; 252 + 253 + inherit (lib) 254 + mkEnableOption 255 + mkIf 256 + mkOption 257 + types 258 + ; 259 + in 260 + { 261 + options.services.parakeet = { 262 + enable = mkEnableOption "parakeet"; 263 + 264 + package = mkOption { 265 + type = types.package; 266 + default = self.packages.${pkgs.system}.default; 267 + description = "The path to the parakeet package."; 268 + }; 269 + 270 + environmentFiles = mkOption { 271 + type = types.listOf types.path; 272 + default = [ "/var/lib/parakeet/config.env" ]; 273 + description = '' 274 + File to load environment variables from. Loaded variables override 275 + values set in {option}`environment`. 
276 + ''; 277 + }; 278 + }; 279 + config = mkIf cfg.enable { 280 + environment.systemPackages = [ 281 + self.packages.${pkgs.system}.consumer 282 + ]; 283 + systemd.services.consumer = { 284 + description = "consumer"; 285 + after = [ "network-online.target" ]; 286 + wants = [ "network-online.target" ]; 287 + wantedBy = [ "multi-user.target" ]; 288 + serviceConfig = { 289 + ExecStart = "${self.packages.${pkgs.system}.consumer}/bin/consumer --indexer"; 290 + Type = "exec"; 291 + 292 + EnvironmentFile = cfg.environmentFiles; 293 + User = "parakeet"; 294 + Group = "parakeet"; 295 + StateDirectory = "parakeet"; 296 + StateDirectoryMode = "0755"; 297 + Restart = "always"; 298 + 299 + # Hardening 300 + RemoveIPC = true; 301 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 302 + NoNewPrivileges = true; 303 + PrivateDevices = true; 304 + ProtectClock = true; 305 + ProtectKernelLogs = true; 306 + ProtectControlGroups = true; 307 + ProtectKernelModules = true; 308 + PrivateMounts = true; 309 + SystemCallArchitectures = [ "native" ]; 310 + MemoryDenyWriteExecute = false; # required by V8 JIT 311 + RestrictNamespaces = true; 312 + RestrictSUIDSGID = true; 313 + ProtectHostname = true; 314 + LockPersonality = true; 315 + ProtectKernelTunables = true; 316 + RestrictAddressFamilies = [ 317 + "AF_UNIX" 318 + "AF_INET" 319 + "AF_INET6" 320 + ]; 321 + RestrictRealtime = true; 322 + DeviceAllow = [ "" ]; 323 + ProtectSystem = "full"; 324 + ProtectProc = "invisible"; 325 + ProcSubset = "pid"; 326 + ProtectHome = true; 327 + PrivateUsers = true; 328 + PrivateTmp = true; 329 + UMask = "0077"; 330 + }; 331 + }; 332 + systemd.services.parakeet = { 333 + description = "parakeet"; 334 + after = [ "network-online.target" ]; 335 + wants = [ "network-online.target" ]; 336 + wantedBy = [ "multi-user.target" ]; 337 + serviceConfig = { 338 + ExecStart = "${cfg.package}/bin/parakeet"; 339 + Type = "exec"; 340 + 341 + EnvironmentFile = cfg.environmentFiles; 342 + User = "parakeet"; 343 + Group 
= "parakeet"; 344 + StateDirectory = "parakeet"; 345 + StateDirectoryMode = "0755"; 346 + Restart = "always"; 347 + 348 + # Hardening 349 + RemoveIPC = true; 350 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 351 + NoNewPrivileges = true; 352 + PrivateDevices = true; 353 + ProtectClock = true; 354 + ProtectKernelLogs = true; 355 + ProtectControlGroups = true; 356 + ProtectKernelModules = true; 357 + PrivateMounts = true; 358 + SystemCallArchitectures = [ "native" ]; 359 + MemoryDenyWriteExecute = false; # required by V8 JIT 360 + RestrictNamespaces = true; 361 + RestrictSUIDSGID = true; 362 + ProtectHostname = true; 363 + LockPersonality = true; 364 + ProtectKernelTunables = true; 365 + RestrictAddressFamilies = [ 366 + "AF_UNIX" 367 + "AF_INET" 368 + "AF_INET6" 369 + ]; 370 + RestrictRealtime = true; 371 + DeviceAllow = [ "" ]; 372 + ProtectSystem = "full"; 373 + ProtectProc = "invisible"; 374 + ProcSubset = "pid"; 375 + ProtectHome = true; 376 + PrivateUsers = true; 377 + PrivateTmp = true; 378 + UMask = "0077"; 379 + }; 380 + }; 381 + systemd.services.parakeet-index = { 382 + description = "parakeet-index"; 383 + after = [ "network-online.target" ]; 384 + wants = [ "network-online.target" ]; 385 + wantedBy = [ "multi-user.target" ]; 386 + serviceConfig = { 387 + ExecStart = "${self.packages.${pkgs.system}.parakeet-index}/bin/parakeet-index"; 388 + Type = "exec"; 389 + 390 + EnvironmentFile = cfg.environmentFiles; 391 + User = "parakeet"; 392 + Group = "parakeet"; 393 + StateDirectory = "parakeet"; 394 + StateDirectoryMode = "0755"; 395 + Restart = "always"; 396 + 397 + # Hardening 398 + RemoveIPC = true; 399 + CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ]; 400 + NoNewPrivileges = true; 401 + PrivateDevices = true; 402 + ProtectClock = true; 403 + ProtectKernelLogs = true; 404 + ProtectControlGroups = true; 405 + ProtectKernelModules = true; 406 + PrivateMounts = true; 407 + SystemCallArchitectures = [ "native" ]; 408 + MemoryDenyWriteExecute = false; 
# required by V8 JIT 409 + RestrictNamespaces = true; 410 + RestrictSUIDSGID = true; 411 + ProtectHostname = true; 412 + LockPersonality = true; 413 + ProtectKernelTunables = true; 414 + RestrictAddressFamilies = [ 415 + "AF_UNIX" 416 + "AF_INET" 417 + "AF_INET6" 418 + ]; 419 + RestrictRealtime = true; 420 + DeviceAllow = [ "" ]; 421 + ProtectSystem = "full"; 422 + ProtectProc = "invisible"; 423 + ProcSubset = "pid"; 424 + ProtectHome = true; 425 + PrivateUsers = true; 426 + PrivateTmp = true; 427 + UMask = "0077"; 428 + }; 429 + }; 430 + users = { 431 + users.parakeet = { 432 + group = "parakeet"; 433 + isSystemUser = true; 434 + }; 435 + groups.parakeet = { }; 436 + }; 437 + services.postgresql = { 438 + enable = true; 439 + ensureUsers = [ 440 + { 441 + name = "parakeet"; 442 + ensureDBOwnership = true; 443 + } 444 + ]; 445 + ensureDatabases = [ "parakeet" ]; 446 + authentication = pkgs.lib.mkOverride 10 '' 447 + #type database DBuser auth-method 448 + local all all trust 449 + host all all 127.0.0.1/32 trust 450 + host all all ::1/128 trust 451 + ''; 452 + package = mkForce pkgs.postgresql_16; 453 + }; 454 + services.redis.servers.parakeet = { 455 + enable = true; 456 + # port = 0; 457 + unixSocket = "/run/redis-parakeet/redis.sock"; 458 + user = "parakeet"; 459 + }; 460 + }; 461 + }; 462 + }; 463 + }); 464 + }
+32
lexica/src/app_bsky/bookmark.rs
··· 1 + use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 + use crate::StrongRef; 3 + use chrono::prelude::*; 4 + use serde::Serialize; 5 + 6 + #[derive(Clone, Debug, Serialize)] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct BookmarkView { 9 + pub subject: StrongRef, 10 + pub item: BookmarkViewItem, 11 + pub created_at: DateTime<Utc>, 12 + } 13 + 14 + #[derive(Clone, Debug, Serialize)] 15 + #[serde(tag = "$type")] 16 + // This is technically the same as ReplyRefPost atm, but just in case... 17 + pub enum BookmarkViewItem { 18 + #[serde(rename = "app.bsky.feed.defs#postView")] 19 + Post(PostView), 20 + #[serde(rename = "app.bsky.feed.defs#notFoundPost")] 21 + NotFound { 22 + uri: String, 23 + #[serde(rename = "notFound")] 24 + not_found: bool, 25 + }, 26 + #[serde(rename = "app.bsky.feed.defs#blockedPost")] 27 + Blocked { 28 + uri: String, 29 + blocked: bool, 30 + author: BlockedAuthor, 31 + }, 32 + }
+1 -1
lexica/src/app_bsky/mod.rs
··· 1 1 use serde::Serialize; 2 2 3 3 pub mod actor; 4 + pub mod bookmark; 4 5 pub mod embed; 5 6 pub mod feed; 6 7 pub mod graph; 7 8 pub mod labeler; 8 9 pub mod richtext; 9 - pub mod unspecced; 10 10 11 11 #[derive(Clone, Default, Debug, Serialize)] 12 12 #[serde(rename_all = "camelCase")]
-33
lexica/src/app_bsky/unspecced.rs
··· 1 - use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 - use serde::Serialize; 3 - 4 - #[derive(Clone, Debug, Serialize)] 5 - pub struct ThreadV2Item { 6 - pub uri: String, 7 - pub depth: i32, 8 - pub value: ThreadV2ItemType, 9 - } 10 - 11 - #[derive(Clone, Debug, Serialize)] 12 - #[serde(tag = "$type")] 13 - pub enum ThreadV2ItemType { 14 - #[serde(rename = "app.bsky.unspecced.defs#threadItemPost")] 15 - Post(ThreadItemPost), 16 - #[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")] 17 - NoUnauthenticated {}, 18 - #[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")] 19 - NotFound {}, 20 - #[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")] 21 - Blocked { author: BlockedAuthor }, 22 - } 23 - 24 - #[derive(Clone, Debug, Serialize)] 25 - #[serde(rename_all = "camelCase")] 26 - pub struct ThreadItemPost { 27 - pub post: PostView, 28 - pub more_parents: bool, 29 - pub more_replies: i32, 30 - pub op_thread: bool, 31 - pub hidden_by_threadgate: bool, 32 - pub muted_by_viewer: bool, 33 - }
+14
lexica/src/community_lexicon/bookmarks.rs
··· 1 + use chrono::prelude::*; 2 + use serde::{Deserialize, Serialize}; 3 + 4 + #[derive(Clone, Debug, Deserialize, Serialize)] 5 + #[serde(tag = "$type")] 6 + #[serde(rename = "community.lexicon.bookmarks.bookmark")] 7 + #[serde(rename_all = "camelCase")] 8 + pub struct Bookmark { 9 + pub subject: String, 10 + #[serde(default)] 11 + #[serde(skip_serializing_if = "Vec::is_empty")] 12 + pub tags: Vec<String>, 13 + pub created_at: DateTime<Utc>, 14 + }
+1
lexica/src/community_lexicon/mod.rs
··· 1 + pub mod bookmarks;
+8
lexica/src/lib.rs
··· 5 5 6 6 pub mod app_bsky; 7 7 pub mod com_atproto; 8 + pub mod community_lexicon; 8 9 mod utils; 9 10 10 11 #[derive(Clone, Debug, Serialize)] ··· 21 22 )] 22 23 pub cid: Cid, 23 24 pub uri: String, 25 + } 26 + 27 + impl StrongRef { 28 + pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> { 29 + let cid = cid.parse()?; 30 + Ok(StrongRef { uri, cid }) 31 + } 24 32 } 25 33 26 34 #[derive(Clone, Debug, Deserialize, Serialize)]
+1
migrations/2025-09-02-190833_bookmarks/down.sql
··· 1 + drop table bookmarks;
+19
migrations/2025-09-02-190833_bookmarks/up.sql
-- Private bookmarks. PK (did, subject): an actor can bookmark a given
-- subject at most once.
create table bookmarks
(
    did text not null references actors (did),
    -- rkey is null for appview-created bookmarks; presumably set when the
    -- bookmark was ingested from a repo record -- confirm with the consumer.
    rkey text,
    subject text not null,
    subject_cid text,
    -- collection NSID of the subject (e.g. app.bsky.feed.post)
    subject_type text not null,
    tags text[] not null default ARRAY []::text[],

    created_at timestamptz not null default now(),

    primary key (did, subject)
);

create index bookmarks_rkey_index on bookmarks (rkey);
create index bookmarks_subject_index on bookmarks (subject);
create index bookmarks_subject_type_index on bookmarks (subject_type);
-- GIN index supports tag-containment queries (tags @> ...).
create index bookmarks_tags_index on bookmarks using gin (tags);
-- Unique per (did, rkey). Postgres treats NULLs as distinct here, so any
-- number of rkey-less (appview-created) rows per actor are still allowed.
create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
-30
parakeet/src/db.rs
··· 13 13 .await 14 14 .optional() 15 15 } 16 - 17 - #[derive(Debug, QueryableByName)] 18 - #[diesel(check_for_backend(diesel::pg::Pg))] 19 - #[allow(unused)] 20 - pub struct ThreadItem { 21 - #[diesel(sql_type = diesel::sql_types::Text)] 22 - pub at_uri: String, 23 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 24 - pub parent_uri: Option<String>, 25 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 26 - pub root_uri: Option<String>, 27 - #[diesel(sql_type = diesel::sql_types::Integer)] 28 - pub depth: i32, 29 - } 30 - 31 - pub async fn get_thread_children(conn: &mut AsyncPgConnection, uri: &str, depth: i32) -> QueryResult<Vec<ThreadItem>> { 32 - diesel::sql_query(include_str!("sql/thread.sql")) 33 - .bind::<diesel::sql_types::Text, _>(uri) 34 - .bind::<diesel::sql_types::Integer, _>(depth) 35 - .load(conn) 36 - .await 37 - } 38 - 39 - pub async fn get_thread_parents(conn: &mut AsyncPgConnection, uri: &str, height: i32) -> QueryResult<Vec<ThreadItem>> { 40 - diesel::sql_query(include_str!("sql/thread_parent.sql")) 41 - .bind::<diesel::sql_types::Text, _>(uri) 42 - .bind::<diesel::sql_types::Integer, _>(height) 43 - .load(conn) 44 - .await 45 - }
+146
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 1 + use crate::hydration::StatefulHydrator; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 + use crate::xrpc::{datetime_cursor, CursorQuery}; 5 + use crate::GlobalState; 6 + use axum::extract::{Query, State}; 7 + use axum::Json; 8 + use diesel::prelude::*; 9 + use diesel_async::RunQueryDsl; 10 + use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use parakeet_db::{models, schema}; 12 + use serde::{Deserialize, Serialize}; 13 + use lexica::StrongRef; 14 + 15 + const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"]; 16 + 17 + #[derive(Debug, Deserialize)] 18 + pub struct CreateBookmarkReq { 19 + pub uri: String, 20 + pub cid: String, 21 + } 22 + 23 + pub async fn create_bookmark( 24 + State(state): State<GlobalState>, 25 + auth: AtpAuth, 26 + Json(form): Json<CreateBookmarkReq>, 27 + ) -> XrpcResult<()> { 28 + let mut conn = state.pool.get().await?; 29 + 30 + // strip "at://" then break into parts by '/' 31 + let parts = form.uri[5..].split('/').collect::<Vec<_>>(); 32 + 33 + let data = models::NewBookmark { 34 + did: &auth.0, 35 + rkey: None, 36 + subject: &form.uri, 37 + subject_cid: Some(form.cid), 38 + subject_type: &parts[1], 39 + tags: vec![], 40 + }; 41 + 42 + diesel::insert_into(schema::bookmarks::table) 43 + .values(&data) 44 + .on_conflict_do_nothing() 45 + .execute(&mut conn) 46 + .await?; 47 + 48 + Ok(()) 49 + } 50 + 51 + #[derive(Debug, Deserialize)] 52 + pub struct DeleteBookmarkReq { 53 + pub uri: String, 54 + } 55 + 56 + pub async fn delete_bookmark( 57 + State(state): State<GlobalState>, 58 + auth: AtpAuth, 59 + Json(form): Json<DeleteBookmarkReq>, 60 + ) -> XrpcResult<()> { 61 + let mut conn = state.pool.get().await?; 62 + 63 + diesel::delete(schema::bookmarks::table) 64 + .filter( 65 + schema::bookmarks::did 66 + .eq(&auth.0) 67 + .and(schema::bookmarks::subject.eq(&form.uri)), 68 + ) 69 + .execute(&mut conn) 70 + .await?; 71 + 72 + Ok(()) 73 + } 74 + 75 + 
#[derive(Debug, Serialize)] 76 + pub struct GetBookmarksRes { 77 + #[serde(skip_serializing_if = "Option::is_none")] 78 + cursor: Option<String>, 79 + bookmarks: Vec<BookmarkView>, 80 + } 81 + 82 + pub async fn get_bookmarks( 83 + State(state): State<GlobalState>, 84 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 85 + auth: AtpAuth, 86 + Query(query): Query<CursorQuery>, 87 + ) -> XrpcResult<Json<GetBookmarksRes>> { 88 + let mut conn = state.pool.get().await?; 89 + let did = auth.0.clone(); 90 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth)); 91 + 92 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 93 + 94 + let mut bookmarks_query = schema::bookmarks::table 95 + .select(models::Bookmark::as_select()) 96 + .filter(schema::bookmarks::did.eq(&did)) 97 + .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES)) 98 + .into_boxed(); 99 + 100 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 101 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 102 + } 103 + 104 + let results = bookmarks_query 105 + .order(schema::bookmarks::created_at.desc()) 106 + .limit(limit as i64) 107 + .load(&mut conn) 108 + .await?; 109 + 110 + let cursor = results 111 + .last() 112 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 113 + 114 + let uris = results.iter().map(|bm| bm.subject.clone()).collect(); 115 + 116 + let mut posts = hyd.hydrate_posts(uris).await; 117 + 118 + let bookmarks = results 119 + .into_iter() 120 + .filter_map(|bookmark| { 121 + let maybe_item = posts.remove(&bookmark.subject); 122 + let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone()); 123 + 124 + // ensure that either the cid is set in the bookmark record *or* in the post record 125 + // otherwise just ditch. we should have one. 
126 + let cid = bookmark.subject_cid.or(maybe_cid)?; 127 + 128 + let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or( 129 + BookmarkViewItem::NotFound { 130 + uri: bookmark.subject.clone(), 131 + not_found: true, 132 + }, 133 + ); 134 + 135 + let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 136 + 137 + Some(BookmarkView { 138 + subject, 139 + item, 140 + created_at: bookmark.created_at, 141 + }) 142 + }) 143 + .collect(); 144 + 145 + Ok(Json(GetBookmarksRes { cursor, bookmarks })) 146 + }
+24 -2
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 321 321 pub threadgate: Option<ThreadgateView>, 322 322 } 323 323 324 + #[derive(Debug, QueryableByName)] 325 + #[diesel(check_for_backend(diesel::pg::Pg))] 326 + struct ThreadItem { 327 + #[diesel(sql_type = diesel::sql_types::Text)] 328 + at_uri: String, 329 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 330 + parent_uri: Option<String>, 331 + // #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 332 + // root_uri: Option<String>, 333 + #[diesel(sql_type = diesel::sql_types::Integer)] 334 + depth: i32, 335 + } 336 + 324 337 pub async fn get_post_thread( 325 338 State(state): State<GlobalState>, 326 339 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 334 347 let depth = query.depth.unwrap_or(6).clamp(0, 1000); 335 348 let parent_height = query.parent_height.unwrap_or(80).clamp(0, 1000); 336 349 337 - let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?; 338 - let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?; 350 + let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 351 + .bind::<diesel::sql_types::Text, _>(&uri) 352 + .bind::<diesel::sql_types::Integer, _>(depth as i32) 353 + .load::<ThreadItem>(&mut conn) 354 + .await?; 355 + 356 + let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql")) 357 + .bind::<diesel::sql_types::Text, _>(&uri) 358 + .bind::<diesel::sql_types::Integer, _>(parent_height as i32) 359 + .load::<ThreadItem>(&mut conn) 360 + .await?; 339 361 340 362 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 341 363 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
+4 -3
parakeet/src/xrpc/app_bsky/mod.rs
··· 2 2 use axum::Router; 3 3 4 4 mod actor; 5 + mod bookmark; 5 6 mod feed; 6 7 mod graph; 7 8 mod labeler; 8 - mod unspecced; 9 9 10 10 #[rustfmt::skip] 11 11 pub fn routes() -> Router<crate::GlobalState> { ··· 15 15 // TODO: app.bsky.actor.getSuggestions (recs) 16 16 // TODO: app.bsky.actor.searchActor (search) 17 17 // TODO: app.bsky.actor.searchActorTypeahead (search) 18 + .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark)) 19 + .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark)) 20 + .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks)) 18 21 .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds)) 19 22 .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes)) 20 23 .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed)) ··· 59 62 // TODO: app.bsky.notification.putActivitySubscriptions 60 63 // TODO: app.bsky.notification.putPreferences 61 64 // TODO: app.bsky.notification.putPreferencesV2 62 - .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 63 - .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 64 65 }
-1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 - pub mod thread_v2;
-83
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 - use crate::hydration::StatefulHydrator; 2 - use crate::xrpc::error::XrpcResult; 3 - use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 4 - use crate::xrpc::normalise_at_uri; 5 - use crate::GlobalState; 6 - use axum::extract::{Query, State}; 7 - use axum::Json; 8 - use serde::{Deserialize, Serialize}; 9 - use lexica::app_bsky::feed::ThreadgateView; 10 - use lexica::app_bsky::unspecced::ThreadV2Item; 11 - 12 - #[derive(Debug, Default, Deserialize)] 13 - #[serde(rename_all = "lowercase")] 14 - pub enum PostThreadSort { 15 - Newest, 16 - #[default] 17 - Oldest, 18 - Top, 19 - } 20 - 21 - #[derive(Debug, Deserialize)] 22 - #[serde(rename_all = "camelCase")] 23 - pub struct GetPostThreadV2Req { 24 - pub anchor: String, 25 - pub above: bool, 26 - pub below: Option<u32>, 27 - pub branching_factor: Option<u32>, 28 - #[serde(default)] 29 - pub prioritize_followed_users: bool, 30 - #[serde(default)] 31 - pub sort: PostThreadSort, 32 - } 33 - 34 - #[derive(Debug, Serialize)] 35 - #[serde(rename_all = "camelCase")] 36 - pub struct GetPostThreadV2Res { 37 - pub thread: Vec<ThreadV2Item>, 38 - #[serde(skip_serializing_if = "Option::is_none")] 39 - pub threadgate: Option<ThreadgateView>, 40 - pub has_other_replies: bool, 41 - } 42 - 43 - pub async fn get_post_thread_v2( 44 - State(state): State<GlobalState>, 45 - AtpAcceptLabelers(labelers): AtpAcceptLabelers, 46 - maybe_auth: Option<AtpAuth>, 47 - Query(query): Query<GetPostThreadV2Req>, 48 - ) -> XrpcResult<Json<GetPostThreadV2Res>> { 49 - let mut conn = state.pool.get().await?; 50 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 51 - 52 - let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 53 - 54 - todo!() 55 - } 56 - 57 - #[derive(Debug, Deserialize)] 58 - #[serde(rename_all = "camelCase")] 59 - pub struct GetPostThreadOtherV2Req { 60 - pub anchor: String, 61 - #[serde(default)] 62 - pub prioritize_followed_users: bool, 63 - } 64 - 65 - #[derive(Debug, 
Serialize)] 66 - #[serde(rename_all = "camelCase")] 67 - pub struct GetPostThreadOtherV2Res { 68 - pub thread: Vec<ThreadV2Item>, 69 - } 70 - 71 - pub async fn get_post_thread_other_v2( 72 - State(state): State<GlobalState>, 73 - AtpAcceptLabelers(labelers): AtpAcceptLabelers, 74 - maybe_auth: Option<AtpAuth>, 75 - Query(query): Query<GetPostThreadOtherV2Req>, 76 - ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 77 - let mut conn = state.pool.get().await?; 78 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 79 - 80 - let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 81 - 82 - todo!() 83 - }
+69
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 1 + use crate::xrpc::datetime_cursor; 2 + use crate::xrpc::error::XrpcResult; 3 + use crate::xrpc::extract::AtpAuth; 4 + use crate::GlobalState; 5 + use axum::extract::{Query, State}; 6 + use axum::Json; 7 + use diesel::prelude::*; 8 + use diesel_async::RunQueryDsl; 9 + use lexica::community_lexicon::bookmarks::Bookmark; 10 + use parakeet_db::{models, schema}; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + #[derive(Debug, Deserialize)] 14 + pub struct BookmarkCursorQuery { 15 + pub tags: Option<Vec<String>>, 16 + pub limit: Option<u8>, 17 + pub cursor: Option<String>, 18 + } 19 + 20 + #[derive(Debug, Serialize)] 21 + pub struct GetActorBookmarksRes { 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + cursor: Option<String>, 24 + bookmarks: Vec<Bookmark>, 25 + } 26 + 27 + pub async fn get_actor_bookmarks( 28 + State(state): State<GlobalState>, 29 + auth: AtpAuth, 30 + Query(query): Query<BookmarkCursorQuery>, 31 + ) -> XrpcResult<Json<GetActorBookmarksRes>> { 32 + let mut conn = state.pool.get().await?; 33 + 34 + let limit = query.limit.unwrap_or(50).clamp(1, 100); 35 + 36 + let mut bookmarks_query = schema::bookmarks::table 37 + .select(models::Bookmark::as_select()) 38 + .filter(schema::bookmarks::did.eq(&auth.0)) 39 + .into_boxed(); 40 + 41 + if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { 42 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor)); 43 + } 44 + 45 + if let Some(tags) = query.tags { 46 + bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags)); 47 + } 48 + 49 + let results = bookmarks_query 50 + .order(schema::bookmarks::created_at.desc()) 51 + .limit(limit as i64) 52 + .load(&mut conn) 53 + .await?; 54 + 55 + let cursor = results 56 + .last() 57 + .map(|bm| bm.created_at.timestamp_millis().to_string()); 58 + 59 + let bookmarks = results 60 + .into_iter() 61 + .map(|bookmark| Bookmark { 62 + subject: bookmark.subject, 63 + tags: 
bookmark.tags.into_iter().flatten().collect(), 64 + created_at: bookmark.created_at, 65 + }) 66 + .collect(); 67 + 68 + Ok(Json(GetActorBookmarksRes { cursor, bookmarks })) 69 + }
+10
parakeet/src/xrpc/community_lexicon/mod.rs
··· 1 + use axum::routing::get; 2 + use axum::Router; 3 + 4 + pub mod bookmarks; 5 + 6 + #[rustfmt::skip] 7 + pub fn routes() -> Router<crate::GlobalState> { 8 + Router::new() 9 + .route("/community.lexicon.bookmarks.getActorBookmarks", get(bookmarks::get_actor_bookmarks)) 10 + }
+2
parakeet/src/xrpc/mod.rs
··· 8 8 mod app_bsky; 9 9 pub mod cdn; 10 10 mod com_atproto; 11 + mod community_lexicon; 11 12 mod error; 12 13 pub mod extract; 13 14 pub mod jwt; ··· 16 17 Router::new() 17 18 .merge(app_bsky::routes()) 18 19 .merge(com_atproto::routes()) 20 + .merge(community_lexicon::routes()) 19 21 } 20 22 21 23 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {
+26
parakeet-db/src/models.rs
··· 383 383 pub did: &'a str, 384 384 pub list_uri: &'a str, 385 385 } 386 + 387 + #[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)] 388 + #[diesel(table_name = crate::schema::bookmarks)] 389 + #[diesel(primary_key(did, subject, subject_cid))] 390 + #[diesel(check_for_backend(diesel::pg::Pg))] 391 + pub struct Bookmark { 392 + pub did: String, 393 + pub rkey: Option<String>, 394 + pub subject: String, 395 + pub subject_cid: Option<String>, 396 + pub subject_type: String, 397 + pub tags: Vec<Option<String>>, 398 + pub created_at: DateTime<Utc>, 399 + } 400 + 401 + #[derive(Debug, Insertable, AsChangeset)] 402 + #[diesel(table_name = crate::schema::bookmarks)] 403 + #[diesel(check_for_backend(diesel::pg::Pg))] 404 + pub struct NewBookmark<'a> { 405 + pub did: &'a str, 406 + pub rkey: Option<String>, 407 + pub subject: &'a str, 408 + pub subject_cid: Option<String>, 409 + pub subject_type: &'a str, 410 + pub tags: Vec<String>, 411 + }
+14
parakeet-db/src/schema.rs
··· 43 43 } 44 44 45 45 diesel::table! { 46 + bookmarks (did, subject) { 47 + did -> Text, 48 + rkey -> Nullable<Text>, 49 + subject -> Text, 50 + subject_cid -> Nullable<Text>, 51 + subject_type -> Text, 52 + tags -> Array<Nullable<Text>>, 53 + created_at -> Timestamptz, 54 + } 55 + } 56 + 57 + diesel::table! { 46 58 chat_decls (did) { 47 59 did -> Text, 48 60 allow_incoming -> Text, ··· 375 387 376 388 diesel::joinable!(backfill -> actors (repo)); 377 389 diesel::joinable!(blocks -> actors (did)); 390 + diesel::joinable!(bookmarks -> actors (did)); 378 391 diesel::joinable!(chat_decls -> actors (did)); 379 392 diesel::joinable!(feedgens -> actors (owner)); 380 393 diesel::joinable!(follows -> actors (did)); ··· 405 418 backfill, 406 419 backfill_jobs, 407 420 blocks, 421 + bookmarks, 408 422 chat_decls, 409 423 feedgens, 410 424 follows,
+4 -1
parakeet-index/build.rs
··· 1 1 fn main() -> Result<(), Box<dyn std::error::Error>> { 2 - tonic_build::configure().compile_protos(&["proto/parakeet.proto"], &[""])?; 2 + tonic_build::configure().compile_protos( 3 + &[std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("proto/parakeet.proto")], 4 + &[std::path::Path::new(env!("CARGO_MANIFEST_DIR"))], 5 + )?; 3 6 4 7 Ok(()) 5 8 }