.envrc  (+1)
···
+use flake
README.md  (+11)
···
 Parakeet is a [Bluesky](https://bsky.app) [AppView](https://atproto.wiki/en/wiki/reference/core-architecture/appview)
 aiming to implement most of the functionality required to support the Bluesky client. Notably not implemented is a CDN.
 
+## Status and Roadmap
+Most common functionality works. Notable omissions: like/repost/follow statuses are missing, blocks and mutes don't get
+applied, labels might not track CIDs properly, and label redaction doesn't work at all (beware!).
+
+Future work is tracked in issues, but the highlights are below. Help would be highly appreciated.
+- Notifications
+- Search
+- Pinned Posts
+- The Timeline
+- Monitoring: metrics, tracing, and health checks.
+
 ## The Code
 Parakeet is implemented in Rust, using Postgres as a database, Redis for caching and queue processing, RocksDB for
 aggregation, and Diesel for migrations and querying.
consumer/src/backfill/downloader.rs  (+8 -8)
···
         Ok(Some(did_doc)) => {
             let Some(service) = did_doc.find_service_by_id(PDS_SERVICE_ID) else {
                 tracing::warn!("bad DID doc for {did}");
-                db::backfill_job_write(&mut conn, &did, "failed.resolve")
+                db::backfill_job_write(&mut conn, &did, "failed.resolve.did_svc")
                     .await
                     .unwrap();
                 continue;
···
             }
         }
         Ok(None) => {
-            tracing::warn!(did, "bad DID doc");
+            tracing::warn!(did, "bad/missing DID doc");
             db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                 .await
                 .unwrap();
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.did_doc")
                 .await
                 .unwrap();
         }
···
             db::actor_set_sync_status(&mut conn, &did, ActorSyncState::Dirty, Utc::now())
                 .await
                 .unwrap();
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.did")
                 .await
                 .unwrap();
         }
···
         Ok(false) => continue,
         Err(e) => {
             tracing::error!(pds, did, "failed to check repo status: {e}");
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.status")
                 .await
                 .unwrap();
             continue;
···
     if let Some(handle) = maybe_handle {
         if let Err(e) = resolve_and_set_handle(&conn, &resolver, &did, &handle).await {
             tracing::error!(pds, did, "failed to resolve handle: {e}");
-            db::backfill_job_write(&mut conn, &did, "failed.resolve")
+            db::backfill_job_write(&mut conn, &did, "failed.resolve.handle")
                 .await
                 .unwrap();
         }
···
     pds: &str,
     did: &str,
 ) -> eyre::Result<Option<(i32, i32)>> {
-    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
-
     let res = http
         .get(format!("{pds}/xrpc/com.atproto.sync.getRepo?did={did}"))
         .send()
         .await?
         .error_for_status()?;
+
+    let mut file = tokio::fs::File::create_new(tmp_dir.join(did)).await?;
 
     let headers = res.headers();
     let ratelimit_rem = header_to_int(headers, "ratelimit-remaining");
consumer/src/backfill/mod.rs  (+1 -1)
consumer/src/backfill/repo.rs  (+5 -2)
···
 use super::{
-    types::{CarCommitEntry, CarEntry},
+    types::{CarCommitEntry, CarEntry, CarRecordEntry},
     CopyStore,
 };
 use crate::indexer::records;
···
         CarEntry::Commit(_) => {
             tracing::warn!("got commit entry that was not in root")
         }
-        CarEntry::Record(record) => {
+        CarEntry::Record(CarRecordEntry::Known(record)) => {
             if let Some(path) = mst_nodes.remove(&cid) {
                 record_index(t, rc, &mut copies, &mut deltas, repo, &path, cid, record).await?;
             } else {
                 records.insert(cid, record);
             }
+        }
+        CarEntry::Record(CarRecordEntry::Other { ty }) => {
+            tracing::debug!("repo contains unknown record type: {ty} ({cid})");
         }
         CarEntry::Mst(mst) => {
             let mut out = Vec::with_capacity(mst.e.len());
consumer/src/backfill/types.rs  (+11 -1)
···
 pub enum CarEntry {
     Mst(CarMstEntry),
     Commit(CarCommitEntry),
-    Record(RecordTypes),
+    Record(CarRecordEntry),
 }
 
 #[derive(Debug, Deserialize)]
···
     pub rev: String,
     pub prev: Option<Cid>,
     pub sig: ByteBuf,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+pub enum CarRecordEntry {
+    Known(RecordTypes),
+    Other {
+        #[serde(rename = "$type")]
+        ty: String,
+    },
 }
 
 #[derive(Debug, Deserialize)]
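
Reviewer note on the CarRecordEntry change above: serde's untagged representation tries the Known variant first, and any record whose $type doesn't match a RecordTypes variant falls through to Other, keeping only the type string. A minimal, self-contained sketch of that fallback (JSON is used here for readability, the consumer actually decodes DAG-CBOR, and the cut-down RecordTypes below is a hypothetical stand-in for the real enum):

    use serde::Deserialize;

    // Hypothetical, cut-down stand-in for the repo's RecordTypes enum.
    #[derive(Debug, Deserialize)]
    #[serde(tag = "$type")]
    enum RecordTypes {
        #[serde(rename = "app.bsky.feed.like")]
        Like { subject: serde_json::Value },
    }

    #[derive(Debug, Deserialize)]
    #[serde(untagged)]
    enum CarRecordEntry {
        Known(RecordTypes),
        Other {
            #[serde(rename = "$type")]
            ty: String,
        },
    }

    fn main() {
        // A known $type deserializes into Known(...).
        let known: CarRecordEntry = serde_json::from_str(
            r#"{"$type":"app.bsky.feed.like","subject":{"uri":"at://example","cid":"example"}}"#,
        )
        .unwrap();
        // An unknown $type falls through to Other, keeping only the type string.
        let other: CarRecordEntry =
            serde_json::from_str(r#"{"$type":"org.example.widget","size":3}"#).unwrap();
        println!("{known:?}\n{other:?}");
    }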
consumer/src/db/backfill.rs  (+1 -1)
···
     status: &str,
 ) -> PgExecResult {
     conn.execute(
-        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2)",
+        "INSERT INTO backfill_jobs (did, status) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET status = $2, updated_at = NOW()",
         &[&did, &status],
     )
     .await
consumer/src/db/record.rs  (+32)
···
 use chrono::prelude::*;
 use deadpool_postgres::GenericClient;
 use ipld_core::cid::Cid;
+use lexica::community_lexicon::bookmarks::Bookmark;
 
 pub async fn record_upsert<C: GenericClient>(
     conn: &mut C,
···
 pub async fn record_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
     conn.execute("DELETE FROM records WHERE at_uri=$1", &[&at_uri])
         .await
+}
+
+pub async fn bookmark_upsert<C: GenericClient>(
+    conn: &mut C,
+    rkey: &str,
+    repo: &str,
+    rec: Bookmark,
+) -> PgExecResult {
+    // strip "at://" then break into parts by '/'
+    let rec_type = match rec.subject.strip_prefix("at://") {
+        Some(at_uri) => at_uri.split('/').collect::<Vec<_>>()[1],
+        None => "$uri",
+    };
+
+    conn.execute(
+        include_str!("sql/bookmarks_upsert.sql"),
+        &[&repo, &rkey, &rec.subject, &rec_type, &rec.tags, &rec.created_at],
+    )
+    .await
+}
+
+pub async fn bookmark_delete<C: GenericClient>(
+    conn: &mut C,
+    rkey: &str,
+    repo: &str,
+) -> PgExecResult {
+    conn.execute(
+        "DELETE FROM bookmarks WHERE rkey=$1 AND did=$2",
+        &[&rkey, &repo],
+    )
+    .await
 }
 
 pub async fn block_insert<C: GenericClient>(
consumer/src/db/sql/bookmarks_upsert.sql  (+5)
consumer/src/firehose/mod.rs  (+15)
···
 
                 FirehoseEvent::Label(event)
             }
+            "#sync" => {
+                counter!("firehose_events.total", "event" => "sync").increment(1);
+                let event: AtpSyncEvent =
+                    serde_ipld_dagcbor::from_reader(&mut reader)?;
+
+                // increment the seq
+                if self.seq < event.seq {
+                    self.seq = event.seq;
+                } else {
+                    tracing::error!("Event sequence was not greater than previous seq, exiting. {} <= {}", event.seq, self.seq);
+                    return Ok(FirehoseOutput::Close);
+                }
+
+                FirehoseEvent::Sync(event)
+            }
             _ => {
                 tracing::warn!("unknown event type {ty}");
                 return Ok(FirehoseOutput::Continue);
consumer/src/firehose/types.rs  (+23)
···
     Account(AtpAccountEvent),
     Commit(AtpCommitEvent),
     Label(AtpLabelEvent),
+    Sync(AtpSyncEvent),
 }
 
 #[derive(Debug, Deserialize)]
···
     Suspended,
     Deleted,
     Deactivated,
+    Throttled,
+    Desynchronized,
 }
 
 impl AtpAccountStatus {
···
             AtpAccountStatus::Suspended => "suspended",
             AtpAccountStatus::Deleted => "deleted",
             AtpAccountStatus::Deactivated => "deactivated",
+            AtpAccountStatus::Throttled => "throttled",
+            AtpAccountStatus::Desynchronized => "desynchronized",
         }
     }
 }
···
             AtpAccountStatus::Suspended => parakeet_db::types::ActorStatus::Suspended,
             AtpAccountStatus::Deleted => parakeet_db::types::ActorStatus::Deleted,
             AtpAccountStatus::Deactivated => parakeet_db::types::ActorStatus::Deactivated,
+            AtpAccountStatus::Throttled | AtpAccountStatus::Desynchronized => {
+                parakeet_db::types::ActorStatus::Active
+            }
         }
     }
 }
···
     pub since: Option<String>,
     pub commit: Cid,
     #[serde(rename = "tooBig")]
+    #[deprecated]
     pub too_big: bool,
     #[serde(default)]
     pub blocks: ByteBuf,
     #[serde(default)]
     pub ops: Vec<CommitOp>,
     #[serde(default)]
+    #[deprecated]
     pub blobs: Vec<Cid>,
+    #[serde(rename = "prevData")]
+    pub prev_data: Option<Cid>,
 }
 
 #[derive(Debug, Deserialize)]
 pub struct CommitOp {
     pub action: String,
     pub cid: Option<Cid>,
+    pub prev: Option<Cid>,
     pub path: String,
 }
 
···
     pub seq: u64,
     pub labels: Vec<AtpLabel>,
 }
+
+#[derive(Debug, Deserialize)]
+pub struct AtpSyncEvent {
+    pub seq: u64,
+    pub did: String,
+    pub time: DateTime<Utc>,
+    pub rev: String,
+    #[serde(default)]
+    pub blocks: ByteBuf,
+}
consumer/src/indexer/mod.rs  (+32 -2)
···
 use crate::config::HistoryMode;
 use crate::db;
 use crate::firehose::{
-    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseConsumer, FirehoseEvent,
-    FirehoseOutput,
+    AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, AtpSyncEvent, CommitOp, FirehoseConsumer,
+    FirehoseEvent, FirehoseOutput,
 };
 use crate::indexer::types::{
     AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes,
···
             FirehoseEvent::Commit(commit) => {
                 index_commit(&mut state, &mut conn, &mut rc, commit).await
             }
+            FirehoseEvent::Sync(sync) => {
+                process_sync(&state, &mut conn, &mut rc, sync).await
+            }
             FirehoseEvent::Label(_) => unreachable!(),
         };
 
···
             FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads,
             FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads,
             FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads,
+            FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads,
             FirehoseEvent::Label(_) => {
                 // We handle all labels through direct connections to labelers
                 tracing::warn!("got #labels from the relay");
···
             tracing::error!("Error sending event: {e}");
         }
     }
+}
+
+#[instrument(skip_all, fields(seq = sync.seq, repo = sync.did))]
+async fn process_sync(
+    state: &RelayIndexerState,
+    conn: &mut Object,
+    rc: &mut MultiplexedConnection,
+    sync: AtpSyncEvent,
+) -> eyre::Result<()> {
+    let Some((sync_state, Some(current_rev))) = db::actor_get_repo_status(conn, &sync.did).await? else {
+        return Ok(());
+    };
+
+    // don't care if we're not synced. also no point if !do_backfill bc we might not have a worker
+    if sync_state == ActorSyncState::Synced && state.do_backfill && sync.rev > current_rev {
+        tracing::debug!("triggering backfill due to #sync");
+        rc.rpush::<_, _, i32>("backfill_queue", sync.did).await?;
+    }
+
+    Ok(())
 }
 
 #[instrument(skip_all, fields(seq = identity.seq, repo = identity.did))]
···
                     redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?;
                 }
             }
+            RecordTypes::CommunityLexiconBookmark(record) => {
+                db::bookmark_upsert(conn, rkey, repo, record).await?;
+            }
         }
 
         db::record_upsert(conn, at_uri, repo, cid).await?;
···
             CollectionType::ChatActorDecl => {
                 redis::AsyncTypedCommands::del(rc, format!("profile#{repo}")).await?;
                 db::chat_decl_delete(conn, repo).await?
+            }
+            CollectionType::CommunityLexiconBookmark => {
+                db::bookmark_delete(conn, rkey, repo).await?
             }
             _ => unreachable!(),
         };
consumer/src/indexer/types.rs  (+5)
···
     AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration),
     #[serde(rename = "chat.bsky.actor.declaration")]
     ChatBskyActorDeclaration(records::ChatBskyActorDeclaration),
+    #[serde(rename = "community.lexicon.bookmarks.bookmark")]
+    CommunityLexiconBookmark(lexica::community_lexicon::bookmarks::Bookmark),
 }
 
 #[derive(Debug, PartialOrd, PartialEq, Deserialize, Serialize)]
···
     BskyLabelerService,
     BskyNotificationDeclaration,
     ChatActorDecl,
+    CommunityLexiconBookmark,
     Unsupported,
 }
 
···
             "app.bsky.labeler.service" => CollectionType::BskyLabelerService,
             "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration,
             "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl,
+            "community.lexicon.bookmarks.bookmark" => CollectionType::CommunityLexiconBookmark,
             _ => CollectionType::Unsupported,
         }
     }
···
             CollectionType::BskyVerification => false,
             CollectionType::BskyLabelerService => true,
             CollectionType::BskyNotificationDeclaration => true,
+            CollectionType::CommunityLexiconBookmark => true,
             CollectionType::Unsupported => false,
         }
     }
flake.lock  (new file, +98)
···
{
  "nodes": {
    "crane": {
      "locked": {
        "lastModified": 1757183466,
        "narHash": "sha256-kTdCCMuRE+/HNHES5JYsbRHmgtr+l9mOtf5dpcMppVc=",
        "owner": "ipetkov",
        "repo": "crane",
        "rev": "d599ae4847e7f87603e7082d73ca673aa93c916d",
        "type": "github"
      },
      "original": {
        "owner": "ipetkov",
        "repo": "crane",
        "type": "github"
      }
    },
    "flake-utils": {
      "inputs": {
        "systems": "systems"
      },
      "locked": {
        "lastModified": 1731533236,
        "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
        "owner": "numtide",
        "repo": "flake-utils",
        "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
        "type": "github"
      },
      "original": {
        "owner": "numtide",
        "repo": "flake-utils",
        "type": "github"
      }
    },
    "nixpkgs": {
      "locked": {
        "lastModified": 1758029226,
        "narHash": "sha256-TjqVmbpoCqWywY9xIZLTf6ANFvDCXdctCjoYuYPYdMI=",
        "owner": "NixOS",
        "repo": "nixpkgs",
        "rev": "08b8f92ac6354983f5382124fef6006cade4a1c1",
        "type": "github"
      },
      "original": {
        "owner": "NixOS",
        "ref": "nixpkgs-unstable",
        "repo": "nixpkgs",
        "type": "github"
      }
    },
    "root": {
      "inputs": {
        "crane": "crane",
        "flake-utils": "flake-utils",
        "nixpkgs": "nixpkgs",
        "rust-overlay": "rust-overlay"
      }
    },
    "rust-overlay": {
      "inputs": {
        "nixpkgs": [
          "nixpkgs"
        ]
      },
      "locked": {
        "lastModified": 1758162771,
        "narHash": "sha256-hdZpMep6Z1gbgg9piUZ0BNusI6ZJaptBw6PHSN/3GD0=",
        "owner": "oxalica",
        "repo": "rust-overlay",
        "rev": "d0cabb6ae8f5b38dffaff9f4e6db57c0ae21d729",
        "type": "github"
      },
      "original": {
        "owner": "oxalica",
        "repo": "rust-overlay",
        "type": "github"
      }
    },
    "systems": {
      "locked": {
        "lastModified": 1681028828,
        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
        "owner": "nix-systems",
        "repo": "default",
        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
        "type": "github"
      },
      "original": {
        "owner": "nix-systems",
        "repo": "default",
        "type": "github"
      }
    }
  },
  "root": "root",
  "version": 7
}
flake.nix  (new file, +464)
···
{
  description = "Parakeet is a Rust-based Bluesky AppView";
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
    crane.url = "github:ipetkov/crane";
    flake-utils.url = "github:numtide/flake-utils";
    rust-overlay = {
      url = "github:oxalica/rust-overlay";
      inputs.nixpkgs.follows = "nixpkgs";
    };
  };
  outputs =
    {
      self,
      nixpkgs,
      crane,
      flake-utils,
      rust-overlay,
      ...
    }:
    flake-utils.lib.eachDefaultSystem (
      system:
      let
        pkgs = import nixpkgs {
          inherit system;
          overlays = [ (import rust-overlay) ];
        };
        craneLib = (crane.mkLib pkgs).overrideToolchain (
          p:
          p.rust-bin.selectLatestNightlyWith (
            toolchain:
            toolchain.default.override {
              extensions = [
                "rust-src"
                "rust-analyzer"
              ];
            }
          )
        );

        inherit (pkgs) lib;
        unfilteredRoot = ./.; # The original, unfiltered source
        src = lib.fileset.toSource {
          root = unfilteredRoot;
          fileset = lib.fileset.unions [
            # Default files from crane (Rust and cargo files)
            (craneLib.fileset.commonCargoSources unfilteredRoot)
          ];
        };
        # Common arguments can be set here to avoid repeating them later
        commonArgs = {
          inherit src;
          strictDeps = true;
          nativeBuildInputs = with pkgs; [
            pkg-config
          ];
          buildInputs = [
            # Add additional build inputs here
            pkgs.openssl
            pkgs.postgresql
            pkgs.libpq
            pkgs.clang
            pkgs.libclang
            pkgs.lld
            pkgs.protobuf
          ]
          ++ lib.optionals pkgs.stdenv.isDarwin [
            # Additional darwin specific inputs can be set here
            pkgs.libiconv
            pkgs.darwin.apple_sdk.frameworks.Security
          ];
          LIBCLANG_PATH = "${pkgs.llvmPackages_18.libclang.lib}/lib";
          CLANG_PATH = "${pkgs.llvmPackages_18.clang}/bin/clang";
          PROTOC_INCLUDE = "${pkgs.protobuf}/include";
          PROTOC = "${pkgs.protobuf}/bin/protoc";

          # Additional environment variables can be set directly
          # MY_CUSTOM_VAR = "some value";
        };

        # Build *just* the cargo dependencies, so we can reuse
        # all of that work (e.g. via cachix) when running in CI
        cargoArtifacts = craneLib.buildDepsOnly commonArgs;

        individualCrateArgs = commonArgs // {
          inherit cargoArtifacts;
          inherit (craneLib.crateNameFromCargoToml { inherit src; }) version;
          # NB: we disable tests since we'll run them all via cargo-nextest
          doCheck = false;
        };
        fileSetForCrate =
          crate:
          lib.fileset.toSource {
            root = ./.;
            fileset = lib.fileset.unions [
              ./Cargo.toml
              ./Cargo.lock
              ./migrations
              (craneLib.fileset.commonCargoSources ./consumer)
              ./consumer/src/db/sql
              (craneLib.fileset.commonCargoSources ./dataloader-rs)
              (craneLib.fileset.commonCargoSources ./did-resolver)
              (craneLib.fileset.commonCargoSources ./lexica)
              (craneLib.fileset.commonCargoSources ./parakeet)
              ./parakeet/src/sql
              (craneLib.fileset.commonCargoSources ./parakeet-db)
              (craneLib.fileset.commonCargoSources ./parakeet-index)
              ./parakeet-index/proto
              (craneLib.fileset.commonCargoSources ./parakeet-lexgen)
              (craneLib.fileset.commonCargoSources crate)
            ];
          };

        # Build the actual crate itself, reusing the dependency
        # artifacts from above.
        consumer = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "consumer";
            cargoExtraArgs = "-p consumer";
            src = fileSetForCrate ./consumer;
            postInstall = ''
              mkdir -p $out/{bin,lib/consumer}
            '';
          }
        );
        dataloader = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "dataloader";
            cargoExtraArgs = "-p dataloader --features default";
            src = fileSetForCrate ./dataloader-rs;
          }
        );
        did-resolver = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "did-resolver";
            cargoExtraArgs = "-p did-resolver";
            src = fileSetForCrate ./did-resolver;
          }
        );
        lexica = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "lexica";
            cargoExtraArgs = "-p lexica";
            src = fileSetForCrate ./lexica;
          }
        );
        parakeet = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "parakeet";
            cargoExtraArgs = "-p parakeet";
            src = fileSetForCrate ./parakeet;
          }
        );
        parakeet-db = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "parakeet-db";
            cargoExtraArgs = "-p parakeet-db --features default";
            src = fileSetForCrate ./parakeet-db;
          }
        );
        parakeet-index = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "parakeet-index";
            cargoExtraArgs = "-p parakeet-index --features server";
            src = fileSetForCrate ./parakeet-index;
          }
        );
        parakeet-lexgen = craneLib.buildPackage (
          individualCrateArgs
          // {
            pname = "parakeet-lexgen";
            cargoExtraArgs = "-p parakeet-lexgen";
            src = fileSetForCrate ./parakeet-lexgen;
          }
        );
      in
      {
        checks = {
          # Build the crate as part of `nix flake check` for convenience
          inherit
            consumer
            dataloader
            did-resolver
            lexica
            parakeet
            parakeet-db
            parakeet-index
            parakeet-lexgen
            ;
        };

        packages = {
          default = parakeet;
          inherit
            consumer
            dataloader
            did-resolver
            lexica
            parakeet
            parakeet-db
            parakeet-index
            parakeet-lexgen
            ;
        };

        devShells.default = craneLib.devShell {
          # Inherit inputs from checks.
          checks = self.checks.${system};

          # Additional dev-shell environment variables can be set directly
          RUST_BACKTRACE = 1;
          NIXOS_OZONE_WL = 1;
          LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib";

          # Extra inputs can be added here; cargo and rustc are provided by default.
          packages = with pkgs; [
            openssl
            bacon
            postgresql
            rust-analyzer
            rustfmt
            clippy
            git
            nixd
            direnv
            libpq
            clang
            libclang
          ];
        };
      }
    )
    // flake-utils.lib.eachDefaultSystemPassThrough (system: {
      nixosModules = {
        default =
          {
            pkgs,
            lib,
            config,
            ...
          }:
          with lib;
          let
            cfg = config.services.parakeet;

            inherit (lib)
              mkEnableOption
              mkIf
              mkOption
              types
              ;
          in
          {
            options.services.parakeet = {
              enable = mkEnableOption "parakeet";

              package = mkOption {
                type = types.package;
                default = self.packages.${pkgs.system}.default;
                description = "The path to the parakeet package.";
              };

              environmentFiles = mkOption {
                type = types.listOf types.path;
                default = [ "/var/lib/parakeet/config.env" ];
                description = ''
                  File to load environment variables from. Loaded variables override
                  values set in {option}`environment`.
                '';
              };
            };
            config = mkIf cfg.enable {
              environment.systemPackages = [
                self.packages.${pkgs.system}.consumer
              ];
              systemd.services.consumer = {
                description = "consumer";
                after = [ "network-online.target" ];
                wants = [ "network-online.target" ];
                wantedBy = [ "multi-user.target" ];
                serviceConfig = {
                  ExecStart = "${self.packages.${pkgs.system}.consumer}/bin/consumer --indexer";
                  Type = "exec";

                  EnvironmentFile = cfg.environmentFiles;
                  User = "parakeet";
                  Group = "parakeet";
                  StateDirectory = "parakeet";
                  StateDirectoryMode = "0755";
                  Restart = "always";

                  # Hardening
                  RemoveIPC = true;
                  CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ];
                  NoNewPrivileges = true;
                  PrivateDevices = true;
                  ProtectClock = true;
                  ProtectKernelLogs = true;
                  ProtectControlGroups = true;
                  ProtectKernelModules = true;
                  PrivateMounts = true;
                  SystemCallArchitectures = [ "native" ];
                  MemoryDenyWriteExecute = false; # required by V8 JIT
                  RestrictNamespaces = true;
                  RestrictSUIDSGID = true;
                  ProtectHostname = true;
                  LockPersonality = true;
                  ProtectKernelTunables = true;
                  RestrictAddressFamilies = [
                    "AF_UNIX"
                    "AF_INET"
                    "AF_INET6"
                  ];
                  RestrictRealtime = true;
                  DeviceAllow = [ "" ];
                  ProtectSystem = "full";
                  ProtectProc = "invisible";
                  ProcSubset = "pid";
                  ProtectHome = true;
                  PrivateUsers = true;
                  PrivateTmp = true;
                  UMask = "0077";
                };
              };
              systemd.services.parakeet = {
                description = "parakeet";
                after = [ "network-online.target" ];
                wants = [ "network-online.target" ];
                wantedBy = [ "multi-user.target" ];
                serviceConfig = {
                  ExecStart = "${cfg.package}/bin/parakeet";
                  Type = "exec";

                  EnvironmentFile = cfg.environmentFiles;
                  User = "parakeet";
                  Group = "parakeet";
                  StateDirectory = "parakeet";
                  StateDirectoryMode = "0755";
                  Restart = "always";

                  # Hardening
                  RemoveIPC = true;
                  CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ];
                  NoNewPrivileges = true;
                  PrivateDevices = true;
                  ProtectClock = true;
                  ProtectKernelLogs = true;
                  ProtectControlGroups = true;
                  ProtectKernelModules = true;
                  PrivateMounts = true;
                  SystemCallArchitectures = [ "native" ];
                  MemoryDenyWriteExecute = false; # required by V8 JIT
                  RestrictNamespaces = true;
                  RestrictSUIDSGID = true;
                  ProtectHostname = true;
                  LockPersonality = true;
                  ProtectKernelTunables = true;
                  RestrictAddressFamilies = [
                    "AF_UNIX"
                    "AF_INET"
                    "AF_INET6"
                  ];
                  RestrictRealtime = true;
                  DeviceAllow = [ "" ];
                  ProtectSystem = "full";
                  ProtectProc = "invisible";
                  ProcSubset = "pid";
                  ProtectHome = true;
                  PrivateUsers = true;
                  PrivateTmp = true;
                  UMask = "0077";
                };
              };
              systemd.services.parakeet-index = {
                description = "parakeet-index";
                after = [ "network-online.target" ];
                wants = [ "network-online.target" ];
                wantedBy = [ "multi-user.target" ];
                serviceConfig = {
                  ExecStart = "${self.packages.${pkgs.system}.parakeet-index}/bin/parakeet-index";
                  Type = "exec";

                  EnvironmentFile = cfg.environmentFiles;
                  User = "parakeet";
                  Group = "parakeet";
                  StateDirectory = "parakeet";
                  StateDirectoryMode = "0755";
                  Restart = "always";

                  # Hardening
                  RemoveIPC = true;
                  CapabilityBoundingSet = [ "CAP_NET_BIND_SERVICE" ];
                  NoNewPrivileges = true;
                  PrivateDevices = true;
                  ProtectClock = true;
                  ProtectKernelLogs = true;
                  ProtectControlGroups = true;
                  ProtectKernelModules = true;
                  PrivateMounts = true;
                  SystemCallArchitectures = [ "native" ];
                  MemoryDenyWriteExecute = false; # required by V8 JIT
                  RestrictNamespaces = true;
                  RestrictSUIDSGID = true;
                  ProtectHostname = true;
                  LockPersonality = true;
                  ProtectKernelTunables = true;
                  RestrictAddressFamilies = [
                    "AF_UNIX"
                    "AF_INET"
                    "AF_INET6"
                  ];
                  RestrictRealtime = true;
                  DeviceAllow = [ "" ];
                  ProtectSystem = "full";
                  ProtectProc = "invisible";
                  ProcSubset = "pid";
                  ProtectHome = true;
                  PrivateUsers = true;
                  PrivateTmp = true;
                  UMask = "0077";
                };
              };
              users = {
                users.parakeet = {
                  group = "parakeet";
                  isSystemUser = true;
                };
                groups.parakeet = { };
              };
              services.postgresql = {
                enable = true;
                ensureUsers = [
                  {
                    name = "parakeet";
                    ensureDBOwnership = true;
                  }
                ];
                ensureDatabases = [ "parakeet" ];
                authentication = pkgs.lib.mkOverride 10 ''
                  #type database DBuser auth-method
                  local all all trust
                  host all all 127.0.0.1/32 trust
                  host all all ::1/128 trust
                '';
                package = mkForce pkgs.postgresql_16;
              };
              services.redis.servers.parakeet = {
                enable = true;
                # port = 0;
                unixSocket = "/run/redis-parakeet/redis.sock";
                user = "parakeet";
              };
            };
          };
      };
    });
}
lexica/src/app_bsky/actor.rs  (+6 -28)
···
 use crate::app_bsky::embed::External;
-use crate::app_bsky::graph::ListViewBasic;
 use crate::com_atproto::label::Label;
 use chrono::prelude::*;
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 use std::str::FromStr;
-
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ProfileViewerState {
-    pub muted: bool,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub muted_by_list: Option<ListViewBasic>,
-    pub blocked_by: bool,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub blocking: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub blocking_by_list: Option<ListViewBasic>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub following: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub followed_by: Option<String>,
-    // #[serde(skip_serializing_if = "Option::is_none")]
-    // pub known_followers: Option<()>,
-    // #[serde(skip_serializing_if = "Option::is_none")]
-    // pub activity_subscriptions: Option<()>,
-}
 
 #[derive(Clone, Default, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
···
     pub avatar: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub associated: Option<ProfileAssociated>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     #[serde(skip_serializing_if = "Option::is_none")]
···
     pub avatar: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub associated: Option<ProfileAssociated>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     #[serde(skip_serializing_if = "Option::is_none")]
···
     pub associated: Option<ProfileAssociated>,
     // #[serde(skip_serializing_if = "Option::is_none")]
     // pub joined_via_starter_pack: Option<()>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ProfileViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     // #[serde(skip_serializing_if = "Option::is_none")]
lexica/src/app_bsky/bookmark.rs  (new file, +32)
···
use crate::app_bsky::feed::{BlockedAuthor, PostView};
use crate::StrongRef;
use chrono::prelude::*;
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BookmarkView {
    pub subject: StrongRef,
    pub item: BookmarkViewItem,
    pub created_at: DateTime<Utc>,
}

#[derive(Clone, Debug, Serialize)]
#[serde(tag = "$type")]
// This is technically the same as ReplyRefPost atm, but just in case...
pub enum BookmarkViewItem {
    #[serde(rename = "app.bsky.feed.defs#postView")]
    Post(PostView),
    #[serde(rename = "app.bsky.feed.defs#notFoundPost")]
    NotFound {
        uri: String,
        #[serde(rename = "notFound")]
        not_found: bool,
    },
    #[serde(rename = "app.bsky.feed.defs#blockedPost")]
    Blocked {
        uri: String,
        blocked: bool,
        author: BlockedAuthor,
    },
}
lexica/src/app_bsky/feed.rs  (+9 -24)
···
 use super::RecordStats;
-use crate::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewerState};
+use crate::app_bsky::actor::{ProfileView, ProfileViewBasic};
 use crate::app_bsky::embed::Embed;
 use crate::app_bsky::graph::ListViewBasic;
 use crate::app_bsky::richtext::FacetMain;
···
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct PostViewerState {
-    pub repost: Option<String>,
-    pub like: Option<String>,
-    pub thread_muted: bool,
-    pub reply_disabled: bool,
-    pub embedding_disabled: bool,
-    pub pinned: bool,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PostView {
···
 
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<PostViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub threadgate: Option<ThreadgateView>,
 
···
 #[derive(Clone, Debug, Serialize)]
 pub struct BlockedAuthor {
     pub uri: String,
-    pub viewer: Option<ProfileViewerState>,
-}
-
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct GeneratorViewerState {
-    pub like: Option<String>,
+    // pub viewer: Option<()>,
 }
 
 #[derive(Clone, Debug, Serialize)]
···
     pub accepts_interactions: bool,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<GeneratorViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub content_mode: Option<GeneratorContentMode>,
 
···
     #[serde(rename = "app.bsky.feed.defs#skeletonReasonPin")]
     Pin {},
     #[serde(rename = "app.bsky.feed.defs#skeletonReasonRepost")]
-    Repost { repost: String },
+    Repost {
+        repost: String,
+    },
 }
lexica/src/app_bsky/graph.rs  (+4 -11)
···
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ListViewerState {
-    pub muted: bool,
-    pub blocked: Option<String>,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ListViewBasic {
···
     pub avatar: Option<String>,
     pub list_item_count: i64,
 
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ListViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
 
···
     pub avatar: Option<String>,
     pub list_item_count: i64,
 
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<ListViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
 
lexica/src/app_bsky/labeler.rs  (+4 -10)
···
 use chrono::prelude::*;
 use serde::{Deserialize, Serialize};
 
-#[derive(Clone, Default, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct LabelerViewerState {
-    pub like: bool,
-}
-
 #[derive(Clone, Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct LabelerView {
···
     pub creator: ProfileView,
 
     pub like_count: i64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<LabelerViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     pub indexed_at: DateTime<Utc>,
···
     pub creator: ProfileView,
 
     pub like_count: i64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub viewer: Option<LabelerViewerState>,
+    // #[serde(skip_serializing_if = "Option::is_none")]
+    // pub viewer: Option<()>,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub labels: Vec<Label>,
     pub policies: LabelerPolicy,
lexica/src/app_bsky/mod.rs  (+1)
lexica/src/community_lexicon/bookmarks.rs  (new file, +14)
···
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "$type")]
#[serde(rename = "community.lexicon.bookmarks.bookmark")]
#[serde(rename_all = "camelCase")]
pub struct Bookmark {
    pub subject: String,
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub tags: Vec<String>,
    pub created_at: DateTime<Utc>,
}
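
Reviewer note on the Bookmark record above: the serde attributes fix the lexicon's wire shape ($type tag, camelCase keys, tags omitted when empty). A minimal sketch of serializing one record, assuming the lexica crate above is on the path; the subject URI and tag are made up for illustration:

    use chrono::Utc;
    use lexica::community_lexicon::bookmarks::Bookmark;

    fn main() {
        let bookmark = Bookmark {
            // Hypothetical subject URI, for illustration only.
            subject: "at://did:plc:example/app.bsky.feed.post/3kexample".to_string(),
            tags: vec!["reading-list".to_string()],
            created_at: Utc::now(),
        };

        // Prints a record shaped roughly like:
        // {"$type":"community.lexicon.bookmarks.bookmark","subject":"at://did:plc:example/...",
        //  "tags":["reading-list"],"createdAt":"..."}
        println!("{}", serde_json::to_string(&bookmark).unwrap());
    }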
lexica/src/community_lexicon/mod.rs  (+1)
···
+pub mod bookmarks;
lexica/src/lib.rs  (+8)
···
 
 pub mod app_bsky;
 pub mod com_atproto;
+pub mod community_lexicon;
 mod utils;
 
 #[derive(Clone, Debug, Serialize)]
···
     )]
     pub cid: Cid,
     pub uri: String,
+}
+
+impl StrongRef {
+    pub fn new_from_str(uri: String, cid: &str) -> Result<Self, cid::Error> {
+        let cid = cid.parse()?;
+        Ok(StrongRef { uri, cid })
+    }
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
migrations/2025-09-02-190833_bookmarks/down.sql  (+1)
···
+drop table bookmarks;
migrations/2025-09-02-190833_bookmarks/up.sql  (new file, +19)
···
create table bookmarks
(
    did          text not null references actors (did),
    rkey         text,
    subject      text not null,
    subject_cid  text,
    subject_type text not null,
    tags         text[] not null default ARRAY []::text[],

    created_at   timestamptz not null default now(),

    primary key (did, subject)
);

create index bookmarks_rkey_index on bookmarks (rkey);
create index bookmarks_subject_index on bookmarks (subject);
create index bookmarks_subject_type_index on bookmarks (subject_type);
create index bookmarks_tags_index on bookmarks using gin (tags);
create unique index bookmarks_rkey_ui on bookmarks (did, rkey);
parakeet/src/hydration/feedgen.rs  (-1)
parakeet/src/hydration/labeler.rs  (-2)
···
         cid: labeler.cid,
         creator,
         like_count: likes.unwrap_or_default() as i64,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: labeler.indexed_at.and_utc(),
     }
···
         cid: labeler.cid,
         creator,
         like_count: likes.unwrap_or_default() as i64,
-        viewer: None,
         policies: LabelerPolicy {
             label_values,
             label_value_definitions,
parakeet/src/hydration/list.rs  (-2)
···
         purpose,
         avatar,
         list_item_count,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: list.created_at,
     })
···
         description_facets,
         avatar,
         list_item_count,
-        viewer: None,
         labels: map_labels(labels),
         indexed_at: list.created_at,
     })
parakeet/src/hydration/posts.rs  (-1)
parakeet/src/hydration/profile.rs  (-3)
···
         display_name: profile.display_name,
         avatar,
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
···
         description: profile.description,
         avatar,
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
···
         followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(),
         follows_count: stats.map(|v| v.following as i64).unwrap_or_default(),
         associated,
-        viewer: None,
         labels: map_labels(labels),
         verification,
         status,
parakeet/src/xrpc/app_bsky/bookmark.rs  (new file, +146)
···
use crate::hydration::StatefulHydrator;
use crate::xrpc::error::XrpcResult;
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
use crate::xrpc::{datetime_cursor, CursorQuery};
use crate::GlobalState;
use axum::extract::{Query, State};
use axum::Json;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem};
use parakeet_db::{models, schema};
use serde::{Deserialize, Serialize};
use lexica::StrongRef;

const BSKY_ALLOWED_TYPES: &[&str] = &["app.bsky.feed.post"];

#[derive(Debug, Deserialize)]
pub struct CreateBookmarkReq {
    pub uri: String,
    pub cid: String,
}

pub async fn create_bookmark(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Json(form): Json<CreateBookmarkReq>,
) -> XrpcResult<()> {
    let mut conn = state.pool.get().await?;

    // strip "at://" then break into parts by '/'
    let parts = form.uri[5..].split('/').collect::<Vec<_>>();

    let data = models::NewBookmark {
        did: &auth.0,
        rkey: None,
        subject: &form.uri,
        subject_cid: Some(form.cid),
        subject_type: &parts[1],
        tags: vec![],
    };

    diesel::insert_into(schema::bookmarks::table)
        .values(&data)
        .on_conflict_do_nothing()
        .execute(&mut conn)
        .await?;

    Ok(())
}

#[derive(Debug, Deserialize)]
pub struct DeleteBookmarkReq {
    pub uri: String,
}

pub async fn delete_bookmark(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Json(form): Json<DeleteBookmarkReq>,
) -> XrpcResult<()> {
    let mut conn = state.pool.get().await?;

    diesel::delete(schema::bookmarks::table)
        .filter(
            schema::bookmarks::did
                .eq(&auth.0)
                .and(schema::bookmarks::subject.eq(&form.uri)),
        )
        .execute(&mut conn)
        .await?;

    Ok(())
}

#[derive(Debug, Serialize)]
pub struct GetBookmarksRes {
    #[serde(skip_serializing_if = "Option::is_none")]
    cursor: Option<String>,
    bookmarks: Vec<BookmarkView>,
}

pub async fn get_bookmarks(
    State(state): State<GlobalState>,
    AtpAcceptLabelers(labelers): AtpAcceptLabelers,
    auth: AtpAuth,
    Query(query): Query<CursorQuery>,
) -> XrpcResult<Json<GetBookmarksRes>> {
    let mut conn = state.pool.get().await?;
    let did = auth.0.clone();
    let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, Some(auth));

    let limit = query.limit.unwrap_or(50).clamp(1, 100);

    let mut bookmarks_query = schema::bookmarks::table
        .select(models::Bookmark::as_select())
        .filter(schema::bookmarks::did.eq(&did))
        .filter(schema::bookmarks::subject_type.eq_any(BSKY_ALLOWED_TYPES))
        .into_boxed();

    if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor));
    }

    let results = bookmarks_query
        .order(schema::bookmarks::created_at.desc())
        .limit(limit as i64)
        .load(&mut conn)
        .await?;

    let cursor = results
        .last()
        .map(|bm| bm.created_at.timestamp_millis().to_string());

    let uris = results.iter().map(|bm| bm.subject.clone()).collect();

    let mut posts = hyd.hydrate_posts(uris).await;

    let bookmarks = results
        .into_iter()
        .filter_map(|bookmark| {
            let maybe_item = posts.remove(&bookmark.subject);
            let maybe_cid = maybe_item.as_ref().map(|v| v.cid.clone());

            // ensure that either the cid is set in the bookmark record *or* in the post record
            // otherwise just ditch. we should have one.
            let cid = bookmark.subject_cid.or(maybe_cid)?;

            let item = maybe_item.map(BookmarkViewItem::Post).unwrap_or(
                BookmarkViewItem::NotFound {
                    uri: bookmark.subject.clone(),
                    not_found: true,
                },
            );

            let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?;

            Some(BookmarkView {
                subject,
                item,
                created_at: bookmark.created_at,
            })
        })
        .collect();

    Ok(Json(GetBookmarksRes { cursor, bookmarks }))
}
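
Reviewer note on get_bookmarks above: the cursor it returns is the last row's created_at as a millisecond timestamp string, and datetime_cursor (whose body is not part of this diff) is assumed to turn such a string back into a timestamp for the created_at < cursor filter. A hedged sketch of that round trip, using only chrono and a hypothetical helper that mirrors how the cursor string is produced:

    use chrono::{DateTime, Utc};

    // Hypothetical helper for illustration; the repo's actual datetime_cursor
    // implementation is not shown in this diff.
    fn parse_millis_cursor(cursor: &str) -> Option<DateTime<Utc>> {
        let millis: i64 = cursor.parse().ok()?;
        DateTime::<Utc>::from_timestamp_millis(millis)
    }

    fn main() {
        let now = Utc::now();
        let cursor = now.timestamp_millis().to_string();
        let parsed = parse_millis_cursor(&cursor).expect("valid cursor");
        assert_eq!(parsed.timestamp_millis(), now.timestamp_millis());
    }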
parakeet/src/xrpc/app_bsky/mod.rs  (+4)
···
 use axum::Router;
 
 mod actor;
+mod bookmark;
 mod feed;
 mod graph;
 mod labeler;
···
         // TODO: app.bsky.actor.getSuggestions (recs)
         // TODO: app.bsky.actor.searchActor (search)
         // TODO: app.bsky.actor.searchActorTypeahead (search)
+        .route("/app.bsky.bookmark.createBookmark", post(bookmark::create_bookmark))
+        .route("/app.bsky.bookmark.deleteBookmark", post(bookmark::delete_bookmark))
+        .route("/app.bsky.bookmark.getBookmarks", get(bookmark::get_bookmarks))
         .route("/app.bsky.feed.getActorFeeds", get(feed::feedgen::get_actor_feeds))
         .route("/app.bsky.feed.getActorLikes", get(feed::likes::get_actor_likes))
         .route("/app.bsky.feed.getAuthorFeed", get(feed::posts::get_author_feed))
parakeet/src/xrpc/community_lexicon/bookmarks.rs  (new file, +69)
···
use crate::xrpc::datetime_cursor;
use crate::xrpc::error::XrpcResult;
use crate::xrpc::extract::AtpAuth;
use crate::GlobalState;
use axum::extract::{Query, State};
use axum::Json;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lexica::community_lexicon::bookmarks::Bookmark;
use parakeet_db::{models, schema};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
pub struct BookmarkCursorQuery {
    pub tags: Option<Vec<String>>,
    pub limit: Option<u8>,
    pub cursor: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct GetActorBookmarksRes {
    #[serde(skip_serializing_if = "Option::is_none")]
    cursor: Option<String>,
    bookmarks: Vec<Bookmark>,
}

pub async fn get_actor_bookmarks(
    State(state): State<GlobalState>,
    auth: AtpAuth,
    Query(query): Query<BookmarkCursorQuery>,
) -> XrpcResult<Json<GetActorBookmarksRes>> {
    let mut conn = state.pool.get().await?;

    let limit = query.limit.unwrap_or(50).clamp(1, 100);

    let mut bookmarks_query = schema::bookmarks::table
        .select(models::Bookmark::as_select())
        .filter(schema::bookmarks::did.eq(&auth.0))
        .into_boxed();

    if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::created_at.lt(cursor));
    }

    if let Some(tags) = query.tags {
        bookmarks_query = bookmarks_query.filter(schema::bookmarks::tags.contains(tags));
    }

    let results = bookmarks_query
        .order(schema::bookmarks::created_at.desc())
        .limit(limit as i64)
        .load(&mut conn)
        .await?;

    let cursor = results
        .last()
        .map(|bm| bm.created_at.timestamp_millis().to_string());

    let bookmarks = results
        .into_iter()
        .map(|bookmark| Bookmark {
            subject: bookmark.subject,
            tags: bookmark.tags.into_iter().flatten().collect(),
            created_at: bookmark.created_at,
        })
        .collect();

    Ok(Json(GetActorBookmarksRes { cursor, bookmarks }))
}
parakeet/src/xrpc/community_lexicon/mod.rs  (+10)
parakeet/src/xrpc/mod.rs  (+2)
···
 mod app_bsky;
 pub mod cdn;
 mod com_atproto;
+mod community_lexicon;
 mod error;
 pub mod extract;
 pub mod jwt;
···
     Router::new()
         .merge(app_bsky::routes())
         .merge(com_atproto::routes())
+        .merge(community_lexicon::routes())
 }
 
 fn datetime_cursor(cursor: Option<&String>) -> Option<chrono::DateTime<chrono::Utc>> {
parakeet-db/src/models.rs  (+26)
···
     pub did: &'a str,
     pub list_uri: &'a str,
 }
+
+#[derive(Clone, Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable)]
+#[diesel(table_name = crate::schema::bookmarks)]
+#[diesel(primary_key(did, subject, subject_cid))]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct Bookmark {
+    pub did: String,
+    pub rkey: Option<String>,
+    pub subject: String,
+    pub subject_cid: Option<String>,
+    pub subject_type: String,
+    pub tags: Vec<Option<String>>,
+    pub created_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Insertable, AsChangeset)]
+#[diesel(table_name = crate::schema::bookmarks)]
+#[diesel(check_for_backend(diesel::pg::Pg))]
+pub struct NewBookmark<'a> {
+    pub did: &'a str,
+    pub rkey: Option<String>,
+    pub subject: &'a str,
+    pub subject_cid: Option<String>,
+    pub subject_type: &'a str,
+    pub tags: Vec<String>,
+}
parakeet-db/src/schema.rs  (+14)
···
 }
 
 diesel::table! {
+    bookmarks (did, subject) {
+        did -> Text,
+        rkey -> Nullable<Text>,
+        subject -> Text,
+        subject_cid -> Nullable<Text>,
+        subject_type -> Text,
+        tags -> Array<Nullable<Text>>,
+        created_at -> Timestamptz,
+    }
+}
+
+diesel::table! {
     chat_decls (did) {
         did -> Text,
         allow_incoming -> Text,
···
 
 diesel::joinable!(backfill -> actors (repo));
 diesel::joinable!(blocks -> actors (did));
+diesel::joinable!(bookmarks -> actors (did));
 diesel::joinable!(chat_decls -> actors (did));
 diesel::joinable!(feedgens -> actors (owner));
 diesel::joinable!(follows -> actors (did));
···
     backfill,
     backfill_jobs,
     blocks,
+    bookmarks,
     chat_decls,
     feedgens,
     follows,
parakeet-index/build.rs  (+4 -1)
···
 fn main() -> Result<(), Box<dyn std::error::Error>> {
-    tonic_build::configure().compile_protos(&["proto/parakeet.proto"], &[""])?;
+    tonic_build::configure().compile_protos(
+        &[std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("proto/parakeet.proto")],
+        &[std::path::Path::new(env!("CARGO_MANIFEST_DIR"))],
+    )?;
 
     Ok(())
 }