at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 262 lines 8.9 kB view raw
1#!/usr/bin/env nu 2use common.nu * 3 4def run-test-instance [name: string, scenario_closure: closure] { 5 let port = 3004 6 let debug_port = $port + 1 7 let url = $"http://localhost:($port)" 8 let debug_url = $"http://localhost:($debug_port)" 9 let db_path = (mktemp -d -t hydrant_gc_test.XXXXXX) 10 11 print $"--- running scenario: ($name) ---" 12 print $"database path: ($db_path)" 13 14 let binary = build-hydrant 15 let instance = start-hydrant $binary $db_path $port 16 17 try { 18 if not (wait-for-api $url) { 19 error make {msg: "api failed to start"} 20 } 21 22 do $scenario_closure $url $debug_url 23 24 print $"PASSED: ($name)\n" 25 } catch { |e| 26 print $"test failed: ($e.msg)" 27 try { kill $instance.pid } 28 exit 1 29 } 30 31 try { kill $instance.pid } 32} 33 34def wait-for-blocks [debug_url: string] { 35 print "waiting for blocks to appear..." 36 mut blocks = {} 37 mut count = 0 38 for i in 1..30 { 39 $blocks = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000") 40 $count = ($blocks.items | length) 41 if $count > 0 { 42 break 43 } 44 sleep 2sec 45 } 46 if $count == 0 { 47 error make {msg: "FAILED: no blocks found after backfill"} 48 } 49 $count 50} 51 52def compact-and-check-blocks [debug_url: string, expected_count: int] { 53 print "triggering major compaction on blocks partition..." 54 http post -H [Content-Length 0] $"($debug_url)/debug/compact?partition=blocks" "" 55 56 let blocks_after = http get $"($debug_url)/debug/iter?partition=blocks&limit=1000" 57 let after_count = ($blocks_after.items | length) 58 59 if $after_count != $expected_count { 60 error make {msg: $"FAILED: expected ($expected_count) blocks after compaction, found ($after_count)"} 61 } 62} 63 64def check-repo-refcounts [debug_url: string, did: string, expected_refcount: int] { 65 let refs = (http get $"($debug_url)/debug/repo_refcounts?did=($did)") 66 let cids = $refs.cids 67 let count = ($cids | columns | length) 68 if $count == 0 { 69 if $expected_refcount != 0 { 70 error make {msg: $"FAILED: expected refcounts to be ($expected_refcount) but found no cids for ($did)"} 71 } 72 print $"blocks for ($did) completely verified as 0" 73 return 74 } 75 76 for rcid in ($cids | transpose key value) { 77 if $rcid.value < $expected_refcount { 78 error make {msg: $"FAILED: expected refcount for ($rcid.key) to be >= ($expected_refcount), but found ($rcid.value)"} 79 } 80 if $expected_refcount == 0 and $rcid.value != 0 { 81 error make {msg: $"FAILED: expected refcount for ($rcid.key) to be 0, but found ($rcid.value)"} 82 } 83 } 84 print $"all ($count) tracked blocks for ($did) verified with refcount >= ($expected_refcount)" 85} 86 87def ack-all-events [debug_url: string, url: string] { 88 print "acking all events..." 89 mut total_acked = 0 90 mut items = [] 91 92 # wait for at least some events 93 for i in 1..30 { 94 let events = http get $"($debug_url)/debug/iter?partition=events&limit=1000" 95 $items = $events.items 96 if ($items | length) > 0 { 97 break 98 } 99 sleep 2sec 100 } 101 102 if ($items | length) == 0 { 103 error make {msg: "FAILED: no events to ack"} 104 } 105 106 loop { 107 let event_ids = ($items | each { |x| ($x | first | into int) }) 108 http post -t application/json $"($url)/stream/ack" { ids: $event_ids } 109 $total_acked += ($event_ids | length) 110 111 # getting next batch 112 let next_events = http get $"($debug_url)/debug/iter?partition=events&limit=1000" 113 $items = $next_events.items 114 if ($items | length) == 0 { 115 break 116 } 117 } 118 119 print $"acked ($total_acked) events" 120} 121 122def main [] { 123 let repo1 = "did:web:guestbook.gaze.systems" 124 let repo2 = "did:plc:dfl62fgb7wtjj3fcbb72naae" 125 126 run-test-instance "delete repo only" { |url, debug_url| 127 print $"adding repo ($repo1) to tracking..." 128 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 129 130 let before_count = (wait-for-blocks $debug_url) 131 print $"found ($before_count) blocks before GC" 132 133 check-repo-refcounts $debug_url $repo1 2 134 135 print "deleting repo..." 136 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 137 sleep 1sec 138 139 check-repo-refcounts $debug_url $repo1 1 140 141 compact-and-check-blocks $debug_url $before_count 142 } 143 144 run-test-instance "ack events only" { |url, debug_url| 145 print $"adding repo ($repo1) to tracking..." 146 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 147 148 let before_count = (wait-for-blocks $debug_url) 149 print $"found ($before_count) blocks before GC" 150 151 check-repo-refcounts $debug_url $repo1 2 152 153 ack-all-events $debug_url $url 154 sleep 1sec 155 156 check-repo-refcounts $debug_url $repo1 1 157 158 compact-and-check-blocks $debug_url $before_count 159 } 160 161 run-test-instance "delete repo, ack events" { |url, debug_url| 162 print $"adding repo ($repo1) to tracking..." 163 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 164 165 let before_count = (wait-for-blocks $debug_url) 166 print $"found ($before_count) blocks before GC" 167 168 check-repo-refcounts $debug_url $repo1 2 169 170 print "deleting repo..." 171 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 172 173 ack-all-events $debug_url $url 174 sleep 1sec 175 176 check-repo-refcounts $debug_url $repo1 0 177 178 compact-and-check-blocks $debug_url 0 179 } 180 181 run-test-instance "delete repo, compact, ack events, compact" { |url, debug_url| 182 print $"adding repo ($repo1) to tracking..." 183 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 184 185 let before_count = (wait-for-blocks $debug_url) 186 print $"found ($before_count) blocks before GC" 187 188 check-repo-refcounts $debug_url $repo1 2 189 190 print "deleting repo..." 191 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 192 sleep 1sec 193 194 check-repo-refcounts $debug_url $repo1 1 195 196 compact-and-check-blocks $debug_url $before_count 197 198 ack-all-events $debug_url $url 199 sleep 1sec 200 201 check-repo-refcounts $debug_url $repo1 0 202 203 compact-and-check-blocks $debug_url 0 204 } 205 206 run-test-instance "ack events, compact, delete repo, compact" { |url, debug_url| 207 print $"adding repo ($repo1) to tracking..." 208 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 209 210 let before_count = (wait-for-blocks $debug_url) 211 print $"found ($before_count) blocks before GC" 212 213 check-repo-refcounts $debug_url $repo1 2 214 215 ack-all-events $debug_url $url 216 sleep 1sec 217 218 check-repo-refcounts $debug_url $repo1 1 219 220 compact-and-check-blocks $debug_url $before_count 221 222 print "deleting repo..." 223 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 224 sleep 1sec 225 226 check-repo-refcounts $debug_url $repo1 0 227 228 compact-and-check-blocks $debug_url 0 229 } 230 231 run-test-instance "multiple repos" { |url, debug_url| 232 print $"adding repo ($repo2) to tracking..." 233 http put -t application/json $"($url)/repos" [ { did: ($repo2) } ] 234 let repo2_blocks = (wait-for-blocks $debug_url) 235 print $"found ($repo2_blocks) blocks for repo2" 236 237 print $"adding repo ($repo1) to tracking..." 238 http put -t application/json $"($url)/repos" [ { did: ($repo1) } ] 239 240 # wait a bit more for repo1 blocks to finish 241 sleep 5sec 242 let total_blocks = (http get $"($debug_url)/debug/iter?partition=blocks&limit=1000000" | get items | length) 243 print $"found ($total_blocks) total blocks before GC" 244 245 print $"deleting repo ($repo1)..." 246 http delete -t application/json $"($url)/repos" --data [ { did: ($repo1), delete_data: true } ] 247 248 # ack events specifically for repo1? Actually wait, the events endpoint contains all events. 249 # we will ack all events to be safe. Since repo2 is NOT deleted, its refcount should be fine even if events are acked. 250 ack-all-events $debug_url $url 251 sleep 1sec 252 253 # repo1 should have expected refcount 0 254 check-repo-refcounts $debug_url $repo1 0 255 # Wait, for repo2, we didn't delete it, but events are acked, so its refcount should be 1 256 check-repo-refcounts $debug_url $repo2 1 257 258 compact-and-check-blocks $debug_url $repo2_blocks 259 } 260 261 print "all gc tests passed!" 262}