at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
4def run-test-instance [name: string, scenario_closure: closure] {
5 let port = 3004
6 let debug_port = $port + 1
7 let url = $"http://localhost:($port)"
8 let debug_url = $"http://localhost:($debug_port)"
9 let db_path = (mktemp -d -t hydrant_gc_test.XXXXXX)
10
11 print $"--- running scenario: ($name) ---"
12 print $"database path: ($db_path)"
13
14 let binary = build-hydrant
15 let instance = start-hydrant $binary $db_path $port
16
17 try {
18 if not (wait-for-api $url) {
19 error make {msg: "api failed to start"}
20 }
21
22 do $scenario_closure $url $debug_url
23
24 print $"PASSED: ($name)\n"
25 } catch { |e|
26 print $"test failed: ($e.msg)"
27 try { kill $instance.pid }
28 exit 1
29 }
30
31 try { kill $instance.pid }
32}
33
34def wait-for-blocks [debug_url: string] {
35 print "waiting for blocks to appear..."
36 mut blocks = {}
37 mut count = 0
38 for i in 1..30 {
39 $blocks = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000")
40 $count = ($blocks.items | length)
41 if $count > 0 {
42 break
43 }
44 sleep 2sec
45 }
46 if $count == 0 {
47 error make {msg: "FAILED: no blocks found after backfill"}
48 }
49 $count
50}
51
52def compact-and-check-blocks [debug_url: string, expected_count: int] {
53 print "triggering major compaction on blocks partition..."
54 http post -H [Content-Length 0] $"($debug_url)/debug/compact?partition=blocks" ""
55
56 let blocks_after = http get $"($debug_url)/debug/iter?partition=blocks&limit=1000"
57 let after_count = ($blocks_after.items | length)
58
59 if $after_count != $expected_count {
60 error make {msg: $"FAILED: expected ($expected_count) blocks after compaction, found ($after_count)"}
61 }
62}
63
64def check-repo-refcounts [debug_url: string, did: string, expected_refcount: int] {
65 let refs = (http get $"($debug_url)/debug/repo_refcounts?did=($did)")
66 let cids = $refs.cids
67 let count = ($cids | columns | length)
68 if $count == 0 {
69 if $expected_refcount != 0 {
70 error make {msg: $"FAILED: expected refcounts to be ($expected_refcount) but found no cids for ($did)"}
71 }
72 print $"blocks for ($did) completely verified as 0"
73 return
74 }
75
76 for rcid in ($cids | transpose key value) {
77 if $rcid.value < $expected_refcount {
78 error make {msg: $"FAILED: expected refcount for ($rcid.key) to be >= ($expected_refcount), but found ($rcid.value)"}
79 }
80 if $expected_refcount == 0 and $rcid.value != 0 {
81 error make {msg: $"FAILED: expected refcount for ($rcid.key) to be 0, but found ($rcid.value)"}
82 }
83 }
84 print $"all ($count) tracked blocks for ($did) verified with refcount >= ($expected_refcount)"
85}
86
87def ack-all-events [debug_url: string, url: string] {
88 print "acking all events..."
89 mut total_acked = 0
90 mut items = []
91
92 # wait for at least some events
93 for i in 1..30 {
94 let events = http get $"($debug_url)/debug/iter?partition=events&limit=1000"
95 $items = $events.items
96 if ($items | length) > 0 {
97 break
98 }
99 sleep 2sec
100 }
101
102 if ($items | length) == 0 {
103 error make {msg: "FAILED: no events to ack"}
104 }
105
106 loop {
107 let event_ids = ($items | each { |x| ($x | first | into int) })
108 http post -t application/json $"($url)/stream/ack" { ids: $event_ids }
109 $total_acked += ($event_ids | length)
110
111 # getting next batch
112 let next_events = http get $"($debug_url)/debug/iter?partition=events&limit=1000"
113 $items = $next_events.items
114 if ($items | length) == 0 {
115 break
116 }
117 }
118
119 print $"acked ($total_acked) events"
120}
121
122def main [] {
123 let repo1 = "did:web:guestbook.gaze.systems"
124 let repo2 = "did:plc:dfl62fgb7wtjj3fcbb72naae"
125
126 run-test-instance "delete repo only" { |url, debug_url|
127 print $"adding repo ($repo1) to tracking..."
128 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
129
130 let before_count = (wait-for-blocks $debug_url)
131 print $"found ($before_count) blocks before GC"
132
133 check-repo-refcounts $debug_url $repo1 2
134
135 print "deleting repo..."
136 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
137 sleep 1sec
138
139 check-repo-refcounts $debug_url $repo1 1
140
141 compact-and-check-blocks $debug_url $before_count
142 }
143
144 run-test-instance "ack events only" { |url, debug_url|
145 print $"adding repo ($repo1) to tracking..."
146 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
147
148 let before_count = (wait-for-blocks $debug_url)
149 print $"found ($before_count) blocks before GC"
150
151 check-repo-refcounts $debug_url $repo1 2
152
153 ack-all-events $debug_url $url
154 sleep 1sec
155
156 check-repo-refcounts $debug_url $repo1 1
157
158 compact-and-check-blocks $debug_url $before_count
159 }
160
161 run-test-instance "delete repo, ack events" { |url, debug_url|
162 print $"adding repo ($repo1) to tracking..."
163 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
164
165 let before_count = (wait-for-blocks $debug_url)
166 print $"found ($before_count) blocks before GC"
167
168 check-repo-refcounts $debug_url $repo1 2
169
170 print "deleting repo..."
171 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
172
173 ack-all-events $debug_url $url
174 sleep 1sec
175
176 check-repo-refcounts $debug_url $repo1 0
177
178 compact-and-check-blocks $debug_url 0
179 }
180
181 run-test-instance "delete repo, compact, ack events, compact" { |url, debug_url|
182 print $"adding repo ($repo1) to tracking..."
183 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
184
185 let before_count = (wait-for-blocks $debug_url)
186 print $"found ($before_count) blocks before GC"
187
188 check-repo-refcounts $debug_url $repo1 2
189
190 print "deleting repo..."
191 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
192 sleep 1sec
193
194 check-repo-refcounts $debug_url $repo1 1
195
196 compact-and-check-blocks $debug_url $before_count
197
198 ack-all-events $debug_url $url
199 sleep 1sec
200
201 check-repo-refcounts $debug_url $repo1 0
202
203 compact-and-check-blocks $debug_url 0
204 }
205
206 run-test-instance "ack events, compact, delete repo, compact" { |url, debug_url|
207 print $"adding repo ($repo1) to tracking..."
208 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
209
210 let before_count = (wait-for-blocks $debug_url)
211 print $"found ($before_count) blocks before GC"
212
213 check-repo-refcounts $debug_url $repo1 2
214
215 ack-all-events $debug_url $url
216 sleep 1sec
217
218 check-repo-refcounts $debug_url $repo1 1
219
220 compact-and-check-blocks $debug_url $before_count
221
222 print "deleting repo..."
223 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
224 sleep 1sec
225
226 check-repo-refcounts $debug_url $repo1 0
227
228 compact-and-check-blocks $debug_url 0
229 }
230
231 run-test-instance "multiple repos" { |url, debug_url|
232 print $"adding repo ($repo2) to tracking..."
233 http put -t application/json $"($url)/repos" [ { did: ($repo2) } ]
234 let repo2_blocks = (wait-for-blocks $debug_url)
235 print $"found ($repo2_blocks) blocks for repo2"
236
237 print $"adding repo ($repo1) to tracking..."
238 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ]
239
240 # wait a bit more for repo1 blocks to finish
241 sleep 5sec
242 let total_blocks = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000000" | get items | length)
243 print $"found ($total_blocks) total blocks before GC"
244
245 print $"deleting repo ($repo1)..."
246 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ]
247
248 # ack events specifically for repo1? Actually wait, the events endpoint contains all events.
249 # we will ack all events to be safe. Since repo2 is NOT deleted, its refcount should be fine even if events are acked.
250 ack-all-events $debug_url $url
251 sleep 1sec
252
253 # repo1 should have expected refcount 0
254 check-repo-refcounts $debug_url $repo1 0
255 # Wait, for repo2, we didn't delete it, but events are acked, so its refcount should be 1
256 check-repo-refcounts $debug_url $repo2 1
257
258 compact-and-check-blocks $debug_url $repo2_blocks
259 }
260
261 print "all gc tests passed!"
262}