AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use jacquard_common::types::string::Did;
2use smol_str::SmolStr;
3
4use crate::db::types::{DbRkey, DbTid, TrimmedDid};
5use url::Url;
6
/// Byte that separates the components of every composite key in this module.
pub const SEP: u8 = b'|';

/// Fixed key `firehose_cursor`. Unlike the keys built by `firehose_cursor_key`,
/// it carries no `|{relay}` suffix.
pub const CURSOR_KEY: &[u8] = b"firehose_cursor";

/// Fixed key `block_refs_checkpoint_seq` (presumably the last checkpointed
/// block-refs sequence number — confirm against its writers).
pub const BLOCK_REFS_CHECKPOINT_SEQ_KEY: &[u8] = b"block_refs_checkpoint_seq";

/// Prefix (`ewm|`) shared by all keys produced by `event_watermark_key`.
pub const EVENT_WATERMARK_PREFIX: &[u8] = &[b'e', b'w', b'm', SEP];

16// key format: {DID}
17pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> {
18 let mut vec = Vec::with_capacity(32);
19 TrimmedDid::from(did).write_to_vec(&mut vec);
20 vec
21}
22
/// Key for a pending entry: `id` encoded as 8 big-endian bytes, so the
/// byte-wise key order equals numeric order.
pub fn pending_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}

/// Key for a reflog entry: `seq` as 8 big-endian bytes, which keeps entries
/// in ascending sequence order when keys are compared byte-wise.
pub fn reflog_key(seq: u64) -> [u8; 8] {
    u64::to_be_bytes(seq)
}

31pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> {
32 let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8);
33 key.extend_from_slice(EVENT_WATERMARK_PREFIX);
34 key.extend_from_slice(×tamp_secs.to_be_bytes());
35 key
36}
37
38// prefix format: {DID}| (DID trimmed)
39pub fn record_prefix_did(did: &Did) -> Vec<u8> {
40 let repo = TrimmedDid::from(did);
41 let mut prefix = Vec::with_capacity(repo.len() + 1);
42 repo.write_to_vec(&mut prefix);
43 prefix.push(SEP);
44 prefix
45}
46
47// prefix format: {DID}|{collection}|
48pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> {
49 let repo = TrimmedDid::from(did);
50 let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1);
51 repo.write_to_vec(&mut prefix);
52 prefix.push(SEP);
53 prefix.extend_from_slice(collection.as_bytes());
54 prefix.push(SEP);
55 prefix
56}
57
58// key format: {DID}|{collection}|{rkey}
59pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> {
60 let repo = TrimmedDid::from(did);
61 let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1);
62 repo.write_to_vec(&mut key);
63 key.push(SEP);
64 key.extend_from_slice(collection.as_bytes());
65 key.push(SEP);
66 write_rkey(&mut key, rkey);
67 key
68}
69
70pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) {
71 match rkey {
72 DbRkey::Tid(tid) => {
73 buf.push(b't');
74 buf.extend_from_slice(tid.as_bytes());
75 }
76 DbRkey::Str(s) => {
77 buf.push(b's');
78 buf.extend_from_slice(s.as_bytes());
79 }
80 }
81}
82
83pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> {
84 let Some(kind) = raw.first() else {
85 miette::bail!("record key is empty");
86 };
87 let rkey = match kind {
88 b't' => {
89 DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| {
90 miette::miette!("record key '{raw:?}' is invalid: {e}")
91 })?))
92 }
93 b's' => DbRkey::Str(SmolStr::new(
94 std::str::from_utf8(&raw[1..])
95 .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?,
96 )),
97 _ => miette::bail!("invalid record key kind: {}", *kind as char),
98 };
99 Ok(rkey)
100}
101
/// Key of an event: `seq` as 8 big-endian bytes, so byte-wise key order
/// follows sequence order.
///
/// Key format: `{SEQ}`.
pub fn event_key(seq: u64) -> [u8; 8] {
    u64::to_be_bytes(seq)
}

107pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP];
108
109// count keys for the counts keyspace
110// key format: k\x00{keyspace_name}
111pub fn count_keyspace_key(name: &str) -> Vec<u8> {
112 let mut key = Vec::with_capacity(COUNT_KS_PREFIX.len() + name.len());
113 key.extend_from_slice(COUNT_KS_PREFIX);
114 key.extend_from_slice(name.as_bytes());
115 key
116}
117
118pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP];
119
120// key format: r|{DID}|{collection} (DID trimmed)
121pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> {
122 let repo = TrimmedDid::from(did);
123 let mut key =
124 Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len());
125 key.extend_from_slice(COUNT_COLLECTION_PREFIX);
126 repo.write_to_vec(&mut key);
127 key.push(SEP);
128 key.extend_from_slice(collection.as_bytes());
129 key
130}
131
132// key format: {DID}|{rev}
133pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> {
134 let repo = TrimmedDid::from(did);
135 let mut key = Vec::with_capacity(repo.len() + 1 + 8);
136 repo.write_to_vec(&mut key);
137 key.push(SEP);
138 key.extend_from_slice(&rev.as_bytes());
139 key
140}
141
142// prefix format: {DID}| (DID trimmed)
143pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> {
144 let repo = TrimmedDid::from(did);
145 let mut prefix = Vec::with_capacity(repo.len() + 1);
146 repo.write_to_vec(&mut prefix);
147 prefix.push(SEP);
148 prefix
149}
150
151/// key format: `ret|<did bytes>`
152pub const CRAWLER_RETRY_PREFIX: &[u8] = b"ret|";
153
154pub fn crawler_retry_key(did: &Did) -> Vec<u8> {
155 let repo = TrimmedDid::from(did);
156 let mut key = Vec::with_capacity(CRAWLER_RETRY_PREFIX.len() + repo.len());
157 key.extend_from_slice(CRAWLER_RETRY_PREFIX);
158 repo.write_to_vec(&mut key);
159 key
160}
161
162pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> {
163 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..])
164}
165
166pub fn crawler_cursor_key(relay: &Url) -> Vec<u8> {
167 let mut key = b"crawler_cursor|".to_vec();
168 key.extend_from_slice(relay.as_str().as_bytes());
169 key
170}
171
172pub fn firehose_cursor_key(relay: &Url) -> Vec<u8> {
173 let mut key = b"firehose_cursor|".to_vec();
174 key.extend_from_slice(relay.as_str().as_bytes());
175 key
176}