# com.atproto.sync.listReposByCollection
lightrail is written in rust. assuming you've got cargo installed, you should be able to
$ cargo run -- --subscribe <pds-or-relay>
in the project root to be up and running!
do not manually modify any contents in `src/generated_lexicons/` -- to
regenerate lexicons from the local defs in `./lexicons`, just do:
$ cargo run --example lexgen
and the codegen'd lexicons will be updated based on the JSON lexicon definitions
- TODO: make local dev work without needing an actual live firehose and backfill etc.
before submitting a pull request, please run rustfmt, clippy, and all tests
$ cargo test && cargo fmt && cargo clippy
- prefer local error enums over generic string-y errors
- for jacquard's CoW-wrapped types:
  - accept borrows like `fn do_thing(did: &Did<'_>)`.
  - accept possibly-needing ownership like `fn do_thing(did: Did<'_>)`.
  - accept full ownership like `fn do_thing(did: Did<'static>)`.
TODO: configure tangled CI to run these on PR
## nextest
the full test suite should run in single-digit seconds on a fast machine, but you can make it go slightly faster (and with prettier output) with cargo-nextest:
$ cargo install cargo-nextest --locked
then run `cargo nextest run`, or `cargo t` -- aliased in `.cargo/config.toml`.
if your terminal makes annoying noise for BEL chars like mine does (and nextest outputs many!!), there's a wrapper to strip them: `./test-quiet.sh`.
the wrapper also sets the concurrency to CPUs * 3, which seems to be the fastest for this test suite for me. if you're not using the wrapper, you can set it manually like `cargo t -j 30`.
## current implementation status
lightrail is like 4% interesting approach to listReposByCollection and 96% just a sync1.1 implementation.
- get collections list for repo
- db keys
- server impl and api endpoints
- DID -> host resolution? or do we just rely on the firehose source to redirect?
- actual backfill
- resync handling
- actually running things
- seems like it's not shutting down when one of the tasks fails?
- firehose event routing
- per-did work queue + worker dispatch
- #identity: invalidate cache (in-queue so old commits use old pubkey)
- #account: update account state
- #commit and #sync
- make sure blocking db calls are in `spawn_blocking`!!
- db queries
- implement `com.atproto.sync.listRepos`
- sync1.1!!!
- verify #commit event
- verify #sync event
- inductive proof for #commits
- configuration
- copy applicable from tap
- [-] copy applicable from collectiondir
- filter dids from inactive accounts
- metrics
- basic metrics
- serve prom-style
- [-] copy applicable ones from collectiondir
- PR a change to com.atproto.sync.listReposByCollection
- codegen for local lexicons (with updated listReposByCollection)
- multi-collection parallel walk/merge
- back out the collections-list index (not doing prefix matching after all)
- #sync shortcuts
- noop if rev/cid are unchanged
- [-] drop any preceding #commits from the queue? (tricky with noop)
- swap in repo-stream for backfill
- with memory limit
- [-] with a global concurrency limit for big repos (maybe later)
- [-] with disk spilling for huge repo (maybe later)
- [-] with queueing resync for large repos if resources are taken?? (maybe later)
- (self-reminder: get_repo should be rare in lightrail)
- prefix-merge walker (limit by total collections to be merged?)
- add an all-collections index
- [~] actually firehose-index!!
- extract collections-added/removed directly from CAR slice
- (spend some time on tests here)
- do the thing (write them to the db)
- swap in repo-stream
- extract collections-added/removed directly from CAR slice
- actually wire in the resync buffer (oops)
- make sure we're doing the right thing on decode errors (seems we are, tungstenite closes connection)
- "deep crawl" mode for relays
- listHosts -> listRepos on host instead of relying on relay listRepos
- defensive loop-cursor handling
- lenient pre-sync1.1
- don't allow non-validating commits that look like sync1.1
- ratchet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one.
- [?] boooo we might need more handling for pre-sync1.1 repos if they don't include adjacent keys
- split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace
- firehose websocket
- [-] ping/pong (unless jacquard is already doing it): seems like no but we can skip it
- [-] no-events-received timeout reconnect
- account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale?
- resync short-circuit: tiny repos may actually return their entire CAR for getRecord
- commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys
- repo-stream: drop record block contents with processor fn
- meta/metrics keyspace for general stats
- total repos (hyperloglog estimate?)
- resync queue size
very much still todo but i'm getting tired
- config: add a `--heavy` mode that always uses `getRepo` and never `describeRepo`
- config: db mem limit `--fjall-cache-mb`
- config: per-host request rate self-throttling `--crawl-qps` (name from collectiondir)
- resync: estimate CAR size from `getRecord` mst height; `getRepo` if it's likely very small
- special did:web ident cache behaviour to keep reusing a stale resolution on failure
- admin view of backfill state etc
- vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence
- if the upstream is a PDS (check with describeServer?) then only accept events for DIDs that have it as their PDS
- use `since` on getRepo for resync to get a smaller partial export in many cases (and then more-carefully do the actual resync)
- combine the throttled http client instance, the db, and the admin info into an appstate fineeeee
- bad word filtering? (collectiondir has it)
- check response headers and adjust self-throttling rate limits per-host if present
- make backfill go really fast
going to be annoying but doable
- multi-relay subscriber
## special-casing
- bridgy doesn't give proof for not-found on sync.getRecord! (right now we always fall back to getRepo) (bug report)
## some choices
- tokio for async runtime: works good
- jacquard almost everywhere: works good
- repo-stream for CAR processing
- fjall: workload is write-heavy so LSM works good, space efficiency also very nice
## resync: getting a repo's full collection list
on initial backfill, a #sync event with a new MST root, or on detection of discontinuity (rev jump or sync1.1 inductive proof fail), we need to fetch the full collection list of a repo.
right now two approaches are implemented:
- get the whole repo and scan it.
  - this is nice because we receive a `commit` object as well, so we get full sync1.1 integrity on cutover.
  - this is not nice because whole-repo exports are big.
- call `com.atproto.repo.describeRepo` and use the returned `collections` list.
  - this is nice because it's a single, small request.
  - this is not nice because the response has neither the MST root signature nor the repo's `rev`, so we have to do extra work.
the extra work for describeRepo currently entails calling `com.atproto.sync.getRecord` with an arbitrary key first, to get a rev and MST root CID. this isn't sync-1.1-level robust: between the two requests, we're racing against potential new collections being added or removed. but doing cutover from a latest-commit taken before describeRepo was called should make the list converge to the correct state.
there are a few other ways we might get a fully-authenticated collections list, see ./authenticated-collection-list.md for some possible approaches.
## state/db models
taking inspiration from tap here!
see `src/storage/mod.rs` for an accurate key summary. rough overview:

main index:

    "rbc"||<collection>||<did> => ()

note: value unused for now

reversed index:

    "cbr"||<did>||<collection> => ()

note: supports `#sync` diffing and account deletion

subscribeRepos (firehose) cursor:

    "sub"||<subscribe_host>||"cursor" => u64

subscribeRepos' host listRepos progress:

    "lsr"||<subscribe_host> => {
        cursor: String,
        completed: Option<DateTime>,
    }

note: alternatively we could just delete the key when done. we'd know not to
restart because the subscribeRepos entry existing could mean that.

per-repo state stuff:

    "repo"||<did> => {
        state: RepoState,
        status: RepoStatus,
        error: Option<String>,
    }

per-repo transient sync state:

    "rev"||<did> => <rev:string>||<prevData:cid>

note: kept separate and small because it very frequently updates!

all-collections list:

    "col"||<collection> => ()

resync queue:

    "rsq"||<after:timestamp/u64_be>||<did> => {
        commit: cbor,
        retryCount: u16,
        retryReason: string,
    }

TODO: per-did resync rate-limit? state might need to live somewhere

resync buffer:

    "rsb"||<did>||<seq_be:u64> => <raw firehose event:cbor>
the `||` concatenations are NUL bytes (`\0`) in most places, which works because DIDs, NSIDs, and URLs all disallow null bytes. key and value structures here are simple enough that we just encode/decode everything manually.
## upstream rate-limiting
TODO: describe this better, but:
- we have basic per-host throttles to prevent spamming non-defensive upstream hosts
- on any 429 response, the upstream host is marked for a cooldown period, and requests to it are avoided until that period completes.
## identity resolution
TODO: describe the cache behaviour (it's reasonable)
## relay cursor
tap persists a cursor value once per second and doesn't try to ensure it's the minimum safe cursor. unfinished in-flight events with lower sequence numbers are lost whenever the tap process exits, and will be detected as a repo discontinuity on that DID's next event, triggering a resync.
it's fine but a little annoying: events can go missing for a long time if a repo doesn't emit another commit for a long time, and resyncs require pulling the whole CAR.
wip: hoping we can track in-flight event sequence numbers here, and only persist to the db the maximum successful sequence below the minimum in-flight one.
## parallel work
there are several implementations of worker pools: one for backfill, one for firehose commits, etc. they work slightly differently from Bluesky's parallel scheduler (used in tap, relay, jetstream, ..):
Bluesky's parallel scheduler assigns work by sharding on the associated DID: each worker is essentially assigned a subset of DIDs it's responsible for. this is really nice and pretty simple, and upholds the important invariant: work for a specific DID is never assigned to more than one worker, so all events for any specific DID are always handled sequentially.
the dispatchers in this project uphold the same sequential-handling rule, but use a "busy set" of DIDs to ensure that two workers never try to process work on the same DID simultaneously. the one (very small) advantage is that any available work can be claimed by any worker.
## return order of listReposByCollection
TODO: move this to the main readme probably
collectiondir indexes repos by discovery time, so you haven't missed any newly-added repos since you started paging through. if we cursor over DIDs instead, some repos can be added mid-paging. is that a problem?
i think clients should be listening to the firehose before they start walking here, which should help them avoid missing any repos (newly-added ones while paging would be seen in the firehose).
not sure if there would be value in it but we could also grab a keyspace snapshot so that a client has an exact consistent view while they page through... but for full-network paging that can take a long time, this is maybe not such a space-friendly idea.
my current thinking is that ordering repos by did is probably ok.