at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 176 lines 5.2 kB view raw
1use jacquard_common::types::string::Did; 2use smol_str::SmolStr; 3 4use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 5use url::Url; 6 7/// separator used for composite keys 8pub const SEP: u8 = b'|'; 9 10pub const CURSOR_KEY: &[u8] = b"firehose_cursor"; 11 12pub const BLOCK_REFS_CHECKPOINT_SEQ_KEY: &[u8] = b"block_refs_checkpoint_seq"; 13 14pub const EVENT_WATERMARK_PREFIX: &[u8] = b"ewm|"; 15 16// key format: {DID} 17pub fn repo_key<'a>(did: &'a Did) -> Vec<u8> { 18 let mut vec = Vec::with_capacity(32); 19 TrimmedDid::from(did).write_to_vec(&mut vec); 20 vec 21} 22 23pub fn pending_key(id: u64) -> [u8; 8] { 24 id.to_be_bytes() 25} 26 27pub fn reflog_key(seq: u64) -> [u8; 8] { 28 seq.to_be_bytes() 29} 30 31pub fn event_watermark_key(timestamp_secs: u64) -> Vec<u8> { 32 let mut key = Vec::with_capacity(EVENT_WATERMARK_PREFIX.len() + 8); 33 key.extend_from_slice(EVENT_WATERMARK_PREFIX); 34 key.extend_from_slice(&timestamp_secs.to_be_bytes()); 35 key 36} 37 38// prefix format: {DID}| (DID trimmed) 39pub fn record_prefix_did(did: &Did) -> Vec<u8> { 40 let repo = TrimmedDid::from(did); 41 let mut prefix = Vec::with_capacity(repo.len() + 1); 42 repo.write_to_vec(&mut prefix); 43 prefix.push(SEP); 44 prefix 45} 46 47// prefix format: {DID}|{collection}| 48pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> { 49 let repo = TrimmedDid::from(did); 50 let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1); 51 repo.write_to_vec(&mut prefix); 52 prefix.push(SEP); 53 prefix.extend_from_slice(collection.as_bytes()); 54 prefix.push(SEP); 55 prefix 56} 57 58// key format: {DID}|{collection}|{rkey} 59pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> { 60 let repo = TrimmedDid::from(did); 61 let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1); 62 repo.write_to_vec(&mut key); 63 key.push(SEP); 64 key.extend_from_slice(collection.as_bytes()); 65 key.push(SEP); 66 write_rkey(&mut key, rkey); 67 key 68} 69 70pub fn write_rkey(buf: &mut Vec<u8>, rkey: &DbRkey) { 71 match rkey { 72 DbRkey::Tid(tid) => { 73 buf.push(b't'); 74 buf.extend_from_slice(tid.as_bytes()); 75 } 76 DbRkey::Str(s) => { 77 buf.push(b's'); 78 buf.extend_from_slice(s.as_bytes()); 79 } 80 } 81} 82 83pub fn parse_rkey(raw: &[u8]) -> miette::Result<DbRkey> { 84 let Some(kind) = raw.first() else { 85 miette::bail!("record key is empty"); 86 }; 87 let rkey = match kind { 88 b't' => { 89 DbRkey::Tid(DbTid::new_from_bytes(raw[1..].try_into().map_err(|e| { 90 miette::miette!("record key '{raw:?}' is invalid: {e}") 91 })?)) 92 } 93 b's' => DbRkey::Str(SmolStr::new( 94 std::str::from_utf8(&raw[1..]) 95 .map_err(|e| miette::miette!("record key '{raw:?}' is invalid: {e}"))?, 96 )), 97 _ => miette::bail!("invalid record key kind: {}", *kind as char), 98 }; 99 Ok(rkey) 100} 101 102// key format: {SEQ} 103pub fn event_key(seq: u64) -> [u8; 8] { 104 seq.to_be_bytes() 105} 106 107pub const COUNT_KS_PREFIX: &[u8] = &[b'k', SEP]; 108 109// count keys for the counts keyspace 110// key format: k\x00{keyspace_name} 111pub fn count_keyspace_key(name: &str) -> Vec<u8> { 112 let mut key = Vec::with_capacity(COUNT_KS_PREFIX.len() + name.len()); 113 key.extend_from_slice(COUNT_KS_PREFIX); 114 key.extend_from_slice(name.as_bytes()); 115 key 116} 117 118pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP]; 119 120// key format: r|{DID}|{collection} (DID trimmed) 121pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> { 122 let repo = TrimmedDid::from(did); 123 let mut key = 124 Vec::with_capacity(COUNT_COLLECTION_PREFIX.len() + repo.len() + 1 + collection.len()); 125 key.extend_from_slice(COUNT_COLLECTION_PREFIX); 126 repo.write_to_vec(&mut key); 127 key.push(SEP); 128 key.extend_from_slice(collection.as_bytes()); 129 key 130} 131 132// key format: {DID}|{rev} 133pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> { 134 let repo = TrimmedDid::from(did); 135 let mut key = Vec::with_capacity(repo.len() + 1 + 8); 136 repo.write_to_vec(&mut key); 137 key.push(SEP); 138 key.extend_from_slice(&rev.as_bytes()); 139 key 140} 141 142// prefix format: {DID}| (DID trimmed) 143pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> { 144 let repo = TrimmedDid::from(did); 145 let mut prefix = Vec::with_capacity(repo.len() + 1); 146 repo.write_to_vec(&mut prefix); 147 prefix.push(SEP); 148 prefix 149} 150 151/// key format: `ret|<did bytes>` 152pub const CRAWLER_RETRY_PREFIX: &[u8] = b"ret|"; 153 154pub fn crawler_retry_key(did: &Did) -> Vec<u8> { 155 let repo = TrimmedDid::from(did); 156 let mut key = Vec::with_capacity(CRAWLER_RETRY_PREFIX.len() + repo.len()); 157 key.extend_from_slice(CRAWLER_RETRY_PREFIX); 158 repo.write_to_vec(&mut key); 159 key 160} 161 162pub fn crawler_retry_parse_key(key: &[u8]) -> miette::Result<TrimmedDid<'_>> { 163 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 164} 165 166pub fn crawler_cursor_key(relay: &Url) -> Vec<u8> { 167 let mut key = b"crawler_cursor|".to_vec(); 168 key.extend_from_slice(relay.as_str().as_bytes()); 169 key 170} 171 172pub fn firehose_cursor_key(relay: &Url) -> Vec<u8> { 173 let mut key = b"firehose_cursor|".to_vec(); 174 key.extend_from_slice(relay.as_str().as_bytes()); 175 key 176}