+300
-73
Cargo.lock
+300
-73
Cargo.lock
···
112
112
113
113
[[package]]
114
114
name = "anyhow"
115
-
version = "1.0.97"
115
+
version = "1.0.100"
116
116
source = "registry+https://github.com/rust-lang/crates.io-index"
117
-
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
117
+
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
118
118
119
119
[[package]]
120
120
name = "arbitrary"
···
127
127
version = "1.7.1"
128
128
source = "registry+https://github.com/rust-lang/crates.io-index"
129
129
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
130
+
131
+
[[package]]
132
+
name = "arrayref"
133
+
version = "0.3.9"
134
+
source = "registry+https://github.com/rust-lang/crates.io-index"
135
+
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
130
136
131
137
[[package]]
132
138
name = "arrayvec"
···
192
198
"nom",
193
199
"num-traits",
194
200
"rusticata-macros",
195
-
"thiserror 2.0.16",
201
+
"thiserror 2.0.17",
196
202
"time",
197
203
]
198
204
···
644
650
"axum",
645
651
"handlebars",
646
652
"serde",
647
-
"thiserror 2.0.16",
653
+
"thiserror 2.0.17",
648
654
]
649
655
650
656
[[package]]
···
673
679
version = "0.2.0"
674
680
source = "registry+https://github.com/rust-lang/crates.io-index"
675
681
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
682
+
683
+
[[package]]
684
+
name = "base256emoji"
685
+
version = "1.0.2"
686
+
source = "registry+https://github.com/rust-lang/crates.io-index"
687
+
checksum = "b5e9430d9a245a77c92176e649af6e275f20839a48389859d1661e9a128d077c"
688
+
dependencies = [
689
+
"const-str",
690
+
"match-lookup",
691
+
]
676
692
677
693
[[package]]
678
694
name = "base64"
···
812
828
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
813
829
814
830
[[package]]
831
+
name = "blake3"
832
+
version = "1.8.2"
833
+
source = "registry+https://github.com/rust-lang/crates.io-index"
834
+
checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
835
+
dependencies = [
836
+
"arrayref",
837
+
"arrayvec",
838
+
"cc",
839
+
"cfg-if",
840
+
"constant_time_eq",
841
+
]
842
+
843
+
[[package]]
815
844
name = "block-buffer"
816
845
version = "0.10.4"
817
846
source = "registry+https://github.com/rust-lang/crates.io-index"
···
839
868
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
840
869
841
870
[[package]]
871
+
name = "byteorder-lite"
872
+
version = "0.1.0"
873
+
source = "registry+https://github.com/rust-lang/crates.io-index"
874
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
875
+
876
+
[[package]]
842
877
name = "bytes"
843
878
version = "1.10.1"
844
879
source = "registry+https://github.com/rust-lang/crates.io-index"
···
851
886
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
852
887
853
888
[[package]]
889
+
name = "byteview"
890
+
version = "0.8.0"
891
+
source = "registry+https://github.com/rust-lang/crates.io-index"
892
+
checksum = "1e6b0e42e210b794e14b152c6fe1a55831e30ef4a0f5dc39d73d714fb5f1906c"
893
+
894
+
[[package]]
854
895
name = "bzip2-sys"
855
896
version = "0.1.13+1.0.8"
856
897
source = "registry+https://github.com/rust-lang/crates.io-index"
···
890
931
"enum_dispatch",
891
932
"serde",
892
933
]
934
+
935
+
[[package]]
936
+
name = "cbor4ii"
937
+
version = "0.2.14"
938
+
source = "registry+https://github.com/rust-lang/crates.io-index"
939
+
checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4"
940
+
dependencies = [
941
+
"serde",
942
+
]
943
+
944
+
[[package]]
945
+
name = "cbor4ii"
946
+
version = "1.2.0"
947
+
source = "registry+https://github.com/rust-lang/crates.io-index"
948
+
checksum = "b28d2802395e3bccd95cc4ae984bff7444b6c1f5981da46a41360c42a2c7e2d9"
893
949
894
950
[[package]]
895
951
name = "cc"
···
976
1032
"multihash",
977
1033
"serde",
978
1034
"serde_bytes",
979
-
"unsigned-varint",
1035
+
"unsigned-varint 0.8.0",
980
1036
]
981
1037
982
1038
[[package]]
···
992
1048
993
1049
[[package]]
994
1050
name = "clap"
995
-
version = "4.5.47"
1051
+
version = "4.5.48"
996
1052
source = "registry+https://github.com/rust-lang/crates.io-index"
997
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
1053
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
998
1054
dependencies = [
999
1055
"clap_builder",
1000
1056
"clap_derive",
···
1002
1058
1003
1059
[[package]]
1004
1060
name = "clap_builder"
1005
-
version = "4.5.47"
1061
+
version = "4.5.48"
1006
1062
source = "registry+https://github.com/rust-lang/crates.io-index"
1007
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
1063
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
1008
1064
dependencies = [
1009
1065
"anstream",
1010
1066
"anstyle",
···
1085
1141
version = "0.9.6"
1086
1142
source = "registry+https://github.com/rust-lang/crates.io-index"
1087
1143
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
1144
+
1145
+
[[package]]
1146
+
name = "const-str"
1147
+
version = "0.4.3"
1148
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1149
+
checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3"
1150
+
1151
+
[[package]]
1152
+
name = "constant_time_eq"
1153
+
version = "0.3.1"
1154
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1155
+
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
1088
1156
1089
1157
[[package]]
1090
1158
name = "constellation"
···
1353
1421
]
1354
1422
1355
1423
[[package]]
1424
+
name = "dasl"
1425
+
version = "0.2.0"
1426
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1427
+
checksum = "b59666035a4386b0fd272bd78da4cbc3ccb558941e97579ab00f0eb4639f2a49"
1428
+
dependencies = [
1429
+
"blake3",
1430
+
"cbor4ii 1.2.0",
1431
+
"data-encoding",
1432
+
"data-encoding-macro",
1433
+
"scopeguard",
1434
+
"serde",
1435
+
"serde_bytes",
1436
+
"sha2",
1437
+
"thiserror 2.0.17",
1438
+
]
1439
+
1440
+
[[package]]
1356
1441
name = "data-encoding"
1357
-
version = "2.8.0"
1442
+
version = "2.9.0"
1358
1443
source = "registry+https://github.com/rust-lang/crates.io-index"
1359
-
checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
1444
+
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
1360
1445
1361
1446
[[package]]
1362
1447
name = "data-encoding-macro"
1363
-
version = "0.1.17"
1448
+
version = "0.1.18"
1364
1449
source = "registry+https://github.com/rust-lang/crates.io-index"
1365
-
checksum = "9f9724adfcf41f45bf652b3995837669d73c4d49a1b5ac1ff82905ac7d9b5558"
1450
+
checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d"
1366
1451
dependencies = [
1367
1452
"data-encoding",
1368
1453
"data-encoding-macro-internal",
···
1370
1455
1371
1456
[[package]]
1372
1457
name = "data-encoding-macro-internal"
1373
-
version = "0.1.15"
1458
+
version = "0.1.16"
1374
1459
source = "registry+https://github.com/rust-lang/crates.io-index"
1375
-
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
1460
+
checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
1376
1461
dependencies = [
1377
1462
"data-encoding",
1378
-
"syn 1.0.109",
1463
+
"syn 2.0.106",
1379
1464
]
1380
1465
1381
1466
[[package]]
···
1579
1664
"slog-bunyan",
1580
1665
"slog-json",
1581
1666
"slog-term",
1582
-
"thiserror 2.0.16",
1667
+
"thiserror 2.0.17",
1583
1668
"tokio",
1584
1669
"tokio-rustls 0.25.0",
1585
1670
"toml 0.9.7",
···
1783
1868
checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc"
1784
1869
dependencies = [
1785
1870
"byteorder",
1786
-
"byteview",
1871
+
"byteview 0.6.1",
1787
1872
"dashmap",
1788
1873
"log",
1789
-
"lsm-tree",
1874
+
"lsm-tree 2.10.4",
1790
1875
"path-absolutize",
1791
1876
"std-semaphore",
1792
1877
"tempfile",
···
1799
1884
source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d"
1800
1885
dependencies = [
1801
1886
"byteorder",
1802
-
"byteview",
1887
+
"byteview 0.6.1",
1803
1888
"dashmap",
1804
1889
"log",
1805
-
"lsm-tree",
1890
+
"lsm-tree 2.10.4",
1806
1891
"path-absolutize",
1807
1892
"std-semaphore",
1808
1893
"tempfile",
···
1810
1895
]
1811
1896
1812
1897
[[package]]
1898
+
name = "fjall"
1899
+
version = "3.0.0-pre.0"
1900
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1901
+
checksum = "467588c1f15d1cfa9e43f02a45cf55d82fa1f12a6ae961b848c520458525600c"
1902
+
dependencies = [
1903
+
"byteorder-lite",
1904
+
"byteview 0.8.0",
1905
+
"dashmap",
1906
+
"log",
1907
+
"lsm-tree 3.0.0-pre.0",
1908
+
"std-semaphore",
1909
+
"tempfile",
1910
+
"xxhash-rust",
1911
+
]
1912
+
1913
+
[[package]]
1813
1914
name = "flate2"
1814
1915
version = "1.1.2"
1815
1916
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1891
1992
"mixtrics",
1892
1993
"pin-project",
1893
1994
"serde",
1894
-
"thiserror 2.0.16",
1995
+
"thiserror 2.0.17",
1895
1996
"tokio",
1896
1997
"tracing",
1897
1998
]
···
1911
2012
"parking_lot",
1912
2013
"pin-project",
1913
2014
"serde",
1914
-
"thiserror 2.0.16",
2015
+
"thiserror 2.0.17",
1915
2016
"tokio",
1916
2017
"twox-hash",
1917
2018
]
···
1944
2045
"parking_lot",
1945
2046
"pin-project",
1946
2047
"serde",
1947
-
"thiserror 2.0.16",
2048
+
"thiserror 2.0.17",
1948
2049
"tokio",
1949
2050
"tracing",
1950
2051
]
···
1976
2077
"pin-project",
1977
2078
"rand 0.9.1",
1978
2079
"serde",
1979
-
"thiserror 2.0.16",
2080
+
"thiserror 2.0.17",
1980
2081
"tokio",
1981
2082
"tracing",
1982
2083
"twox-hash",
···
2220
2321
"pest_derive",
2221
2322
"serde",
2222
2323
"serde_json",
2223
-
"thiserror 2.0.16",
2324
+
"thiserror 2.0.17",
2224
2325
"walkdir",
2225
2326
]
2226
2327
···
2345
2446
"once_cell",
2346
2447
"rand 0.9.1",
2347
2448
"ring",
2348
-
"thiserror 2.0.16",
2449
+
"thiserror 2.0.17",
2349
2450
"tinyvec",
2350
2451
"tokio",
2351
2452
"tracing",
···
2368
2469
"rand 0.9.1",
2369
2470
"resolv-conf",
2370
2471
"smallvec",
2371
-
"thiserror 2.0.16",
2472
+
"thiserror 2.0.17",
2372
2473
"tokio",
2373
2474
"tracing",
2374
2475
]
···
2800
2901
]
2801
2902
2802
2903
[[package]]
2904
+
name = "iroh-car"
2905
+
version = "0.5.1"
2906
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2907
+
checksum = "cb7f8cd4cb9aa083fba8b52e921764252d0b4dcb1cd6d120b809dbfe1106e81a"
2908
+
dependencies = [
2909
+
"anyhow",
2910
+
"cid",
2911
+
"futures",
2912
+
"serde",
2913
+
"serde_ipld_dagcbor",
2914
+
"thiserror 1.0.69",
2915
+
"tokio",
2916
+
"unsigned-varint 0.7.2",
2917
+
]
2918
+
2919
+
[[package]]
2803
2920
name = "is-terminal"
2804
2921
version = "0.4.16"
2805
2922
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2863
2980
"metrics",
2864
2981
"serde",
2865
2982
"serde_json",
2866
-
"thiserror 2.0.16",
2983
+
"thiserror 2.0.17",
2867
2984
"tokio",
2868
2985
"tokio-tungstenite 0.26.2",
2869
2986
"url",
···
3045
3162
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
3046
3163
dependencies = [
3047
3164
"cfg-if",
3048
-
"windows-targets 0.48.5",
3165
+
"windows-targets 0.52.6",
3049
3166
]
3050
3167
3051
3168
[[package]]
···
3116
3233
version = "0.1.0"
3117
3234
dependencies = [
3118
3235
"anyhow",
3236
+
"dasl",
3119
3237
"fluent-uri",
3120
3238
"nom",
3121
-
"thiserror 2.0.16",
3239
+
"serde",
3240
+
"thiserror 2.0.17",
3122
3241
"tinyjson",
3123
3242
]
3124
3243
···
3186
3305
3187
3306
[[package]]
3188
3307
name = "lsm-tree"
3189
-
version = "2.10.2"
3308
+
version = "2.10.4"
3190
3309
source = "registry+https://github.com/rust-lang/crates.io-index"
3191
-
checksum = "55b6d7475a8dd22e749186968daacf8e2a77932b061b1bd263157987bbfc0c6c"
3310
+
checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15"
3192
3311
dependencies = [
3193
3312
"byteorder",
3194
3313
"crossbeam-skiplist",
···
3209
3328
]
3210
3329
3211
3330
[[package]]
3331
+
name = "lsm-tree"
3332
+
version = "3.0.0-pre.0"
3333
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3334
+
checksum = "be375d45e348328e78582dffbda4f1709dd52fca27c1a81c7bf6ca134e6335f7"
3335
+
dependencies = [
3336
+
"byteorder-lite",
3337
+
"byteview 0.8.0",
3338
+
"crossbeam-skiplist",
3339
+
"enum_dispatch",
3340
+
"interval-heap",
3341
+
"log",
3342
+
"lz4_flex",
3343
+
"quick_cache",
3344
+
"rustc-hash 2.1.1",
3345
+
"self_cell",
3346
+
"sfa",
3347
+
"tempfile",
3348
+
"varint-rs",
3349
+
"xxhash-rust",
3350
+
]
3351
+
3352
+
[[package]]
3212
3353
name = "lz4"
3213
3354
version = "1.28.1"
3214
3355
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3229
3370
3230
3371
[[package]]
3231
3372
name = "lz4_flex"
3232
-
version = "0.11.3"
3373
+
version = "0.11.5"
3233
3374
source = "registry+https://github.com/rust-lang/crates.io-index"
3234
-
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
3375
+
checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a"
3235
3376
3236
3377
[[package]]
3237
3378
name = "mach2"
···
3297
3438
]
3298
3439
3299
3440
[[package]]
3441
+
name = "match-lookup"
3442
+
version = "0.1.1"
3443
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3444
+
checksum = "1265724d8cb29dbbc2b0f06fffb8bf1a8c0cf73a78eede9ba73a4a66c52a981e"
3445
+
dependencies = [
3446
+
"proc-macro2",
3447
+
"quote",
3448
+
"syn 1.0.109",
3449
+
]
3450
+
3451
+
[[package]]
3300
3452
name = "match_cfg"
3301
3453
version = "0.1.0"
3302
3454
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3384
3536
"metrics",
3385
3537
"metrics-util 0.20.0",
3386
3538
"quanta",
3387
-
"thiserror 2.0.16",
3539
+
"thiserror 2.0.17",
3388
3540
"tokio",
3389
3541
"tracing",
3390
3542
]
···
3531
3683
3532
3684
[[package]]
3533
3685
name = "multibase"
3534
-
version = "0.9.1"
3686
+
version = "0.9.2"
3535
3687
source = "registry+https://github.com/rust-lang/crates.io-index"
3536
-
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
3688
+
checksum = "8694bb4835f452b0e3bb06dbebb1d6fc5385b6ca1caf2e55fd165c042390ec77"
3537
3689
dependencies = [
3538
3690
"base-x",
3691
+
"base256emoji",
3539
3692
"data-encoding",
3540
3693
"data-encoding-macro",
3541
3694
]
···
3548
3701
dependencies = [
3549
3702
"core2",
3550
3703
"serde",
3551
-
"unsigned-varint",
3704
+
"unsigned-varint 0.8.0",
3552
3705
]
3553
3706
3554
3707
[[package]]
···
3926
4079
checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323"
3927
4080
dependencies = [
3928
4081
"memchr",
3929
-
"thiserror 2.0.16",
4082
+
"thiserror 2.0.17",
3930
4083
"ucd-trie",
3931
4084
]
3932
4085
···
4036
4189
"rusqlite",
4037
4190
"serde",
4038
4191
"serde_json",
4039
-
"thiserror 2.0.16",
4192
+
"thiserror 2.0.17",
4040
4193
"tokio",
4041
4194
"tracing-subscriber",
4042
4195
]
···
4079
4232
"smallvec",
4080
4233
"sync_wrapper",
4081
4234
"tempfile",
4082
-
"thiserror 2.0.16",
4235
+
"thiserror 2.0.17",
4083
4236
"tokio",
4084
4237
"tokio-rustls 0.26.2",
4085
4238
"tokio-stream",
···
4123
4276
"serde_json",
4124
4277
"serde_urlencoded",
4125
4278
"serde_yaml",
4126
-
"thiserror 2.0.16",
4279
+
"thiserror 2.0.17",
4127
4280
"tokio",
4128
4281
]
4129
4282
···
4142
4295
"quote",
4143
4296
"regex",
4144
4297
"syn 2.0.106",
4145
-
"thiserror 2.0.16",
4298
+
"thiserror 2.0.17",
4146
4299
]
4147
4300
4148
4301
[[package]]
···
4269
4422
4270
4423
[[package]]
4271
4424
name = "quick_cache"
4272
-
version = "0.6.12"
4425
+
version = "0.6.16"
4273
4426
source = "registry+https://github.com/rust-lang/crates.io-index"
4274
-
checksum = "8f8ed0655cbaf18a26966142ad23b95d8ab47221c50c4f73a1db7d0d2d6e3da8"
4427
+
checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786"
4275
4428
dependencies = [
4276
4429
"equivalent",
4277
4430
"hashbrown 0.15.2",
···
4291
4444
"rustc-hash 2.1.1",
4292
4445
"rustls 0.23.31",
4293
4446
"socket2 0.5.9",
4294
-
"thiserror 2.0.16",
4447
+
"thiserror 2.0.17",
4295
4448
"tokio",
4296
4449
"tracing",
4297
4450
"web-time",
···
4312
4465
"rustls 0.23.31",
4313
4466
"rustls-pki-types",
4314
4467
"slab",
4315
-
"thiserror 2.0.16",
4468
+
"thiserror 2.0.17",
4316
4469
"tinyvec",
4317
4470
"tracing",
4318
4471
"web-time",
···
4538
4691
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
4539
4692
4540
4693
[[package]]
4694
+
name = "repo-stream"
4695
+
version = "0.2.2"
4696
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4697
+
checksum = "093b48e604c138949bf3d4a1a9bc1165feb1db28a73af0101c84eb703d279f43"
4698
+
dependencies = [
4699
+
"bincode 2.0.1",
4700
+
"futures",
4701
+
"futures-core",
4702
+
"ipld-core",
4703
+
"iroh-car",
4704
+
"log",
4705
+
"multibase",
4706
+
"rusqlite",
4707
+
"serde",
4708
+
"serde_bytes",
4709
+
"serde_ipld_dagcbor",
4710
+
"sha2",
4711
+
"thiserror 2.0.17",
4712
+
"tokio",
4713
+
]
4714
+
4715
+
[[package]]
4541
4716
name = "reqwest"
4542
-
version = "0.12.22"
4717
+
version = "0.12.24"
4543
4718
source = "registry+https://github.com/rust-lang/crates.io-index"
4544
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
4719
+
checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f"
4545
4720
dependencies = [
4546
4721
"async-compression",
4547
4722
"base64 0.22.1",
···
4581
4756
"url",
4582
4757
"wasm-bindgen",
4583
4758
"wasm-bindgen-futures",
4759
+
"wasm-streams",
4584
4760
"web-sys",
4585
4761
]
4586
4762
···
4962
5138
4963
5139
[[package]]
4964
5140
name = "self_cell"
4965
-
version = "1.1.0"
5141
+
version = "1.2.0"
4966
5142
source = "registry+https://github.com/rust-lang/crates.io-index"
4967
-
checksum = "c2fdfc24bc566f839a2da4c4295b82db7d25a24253867d5c64355abb5799bdbe"
5143
+
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
4968
5144
4969
5145
[[package]]
4970
5146
name = "semver"
···
4984
5160
4985
5161
[[package]]
4986
5162
name = "serde_bytes"
4987
-
version = "0.11.17"
5163
+
version = "0.11.19"
4988
5164
source = "registry+https://github.com/rust-lang/crates.io-index"
4989
-
checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96"
5165
+
checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8"
4990
5166
dependencies = [
4991
5167
"serde",
5168
+
"serde_core",
4992
5169
]
4993
5170
4994
5171
[[package]]
···
5036
5213
]
5037
5214
5038
5215
[[package]]
5216
+
name = "serde_ipld_dagcbor"
5217
+
version = "0.6.4"
5218
+
source = "registry+https://github.com/rust-lang/crates.io-index"
5219
+
checksum = "46182f4f08349a02b45c998ba3215d3f9de826246ba02bb9dddfe9a2a2100778"
5220
+
dependencies = [
5221
+
"cbor4ii 0.2.14",
5222
+
"ipld-core",
5223
+
"scopeguard",
5224
+
"serde",
5225
+
]
5226
+
5227
+
[[package]]
5039
5228
name = "serde_json"
5040
5229
version = "1.0.145"
5041
5230
source = "registry+https://github.com/rust-lang/crates.io-index"
···
5068
5257
"percent-encoding",
5069
5258
"ryu",
5070
5259
"serde",
5071
-
"thiserror 2.0.16",
5260
+
"thiserror 2.0.17",
5072
5261
]
5073
5262
5074
5263
[[package]]
···
5157
5346
]
5158
5347
5159
5348
[[package]]
5349
+
name = "sfa"
5350
+
version = "0.0.1"
5351
+
source = "registry+https://github.com/rust-lang/crates.io-index"
5352
+
checksum = "e5f5f9dc21f55409f15103d5a7e7601b804935923c7fe4746dc806c3a422a038"
5353
+
dependencies = [
5354
+
"byteorder-lite",
5355
+
"log",
5356
+
"xxhash-rust",
5357
+
]
5358
+
5359
+
[[package]]
5160
5360
name = "sha1"
5161
5361
version = "0.10.6"
5162
5362
source = "registry+https://github.com/rust-lang/crates.io-index"
···
5220
5420
dependencies = [
5221
5421
"num-bigint",
5222
5422
"num-traits",
5223
-
"thiserror 2.0.16",
5423
+
"thiserror 2.0.17",
5224
5424
"time",
5225
5425
]
5226
5426
···
5262
5462
"rustls 0.23.31",
5263
5463
"serde",
5264
5464
"serde_json",
5265
-
"thiserror 2.0.16",
5465
+
"thiserror 2.0.17",
5266
5466
"time",
5267
5467
"tokio",
5268
5468
"tokio-util",
···
5355
5555
name = "spacedust"
5356
5556
version = "0.1.0"
5357
5557
dependencies = [
5558
+
"anyhow",
5559
+
"async-channel",
5358
5560
"async-trait",
5359
5561
"clap",
5360
5562
"ctrlc",
5563
+
"dasl",
5361
5564
"dropshot",
5362
5565
"env_logger",
5566
+
"fjall 3.0.0-pre.0",
5363
5567
"futures",
5364
5568
"http",
5569
+
"ipld-core",
5365
5570
"jetstream",
5366
5571
"links",
5367
5572
"log",
5368
5573
"metrics",
5369
5574
"metrics-exporter-prometheus 0.17.2",
5370
5575
"rand 0.9.1",
5576
+
"repo-stream",
5577
+
"reqwest",
5371
5578
"schemars",
5372
5579
"semver",
5373
5580
"serde",
5581
+
"serde_ipld_dagcbor",
5374
5582
"serde_json",
5375
5583
"serde_qs",
5376
-
"thiserror 2.0.16",
5584
+
"thiserror 2.0.17",
5377
5585
"tinyjson",
5378
5586
"tokio",
5379
5587
"tokio-tungstenite 0.27.0",
···
5506
5714
5507
5715
[[package]]
5508
5716
name = "tempfile"
5509
-
version = "3.19.1"
5717
+
version = "3.23.0"
5510
5718
source = "registry+https://github.com/rust-lang/crates.io-index"
5511
-
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
5719
+
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
5512
5720
dependencies = [
5513
5721
"fastrand",
5514
5722
"getrandom 0.3.3",
···
5539
5747
5540
5748
[[package]]
5541
5749
name = "thiserror"
5542
-
version = "2.0.16"
5750
+
version = "2.0.17"
5543
5751
source = "registry+https://github.com/rust-lang/crates.io-index"
5544
-
checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0"
5752
+
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
5545
5753
dependencies = [
5546
-
"thiserror-impl 2.0.16",
5754
+
"thiserror-impl 2.0.17",
5547
5755
]
5548
5756
5549
5757
[[package]]
···
5559
5767
5560
5768
[[package]]
5561
5769
name = "thiserror-impl"
5562
-
version = "2.0.16"
5770
+
version = "2.0.17"
5563
5771
source = "registry+https://github.com/rust-lang/crates.io-index"
5564
-
checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960"
5772
+
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
5565
5773
dependencies = [
5566
5774
"proc-macro2",
5567
5775
"quote",
···
5993
6201
"native-tls",
5994
6202
"rand 0.9.1",
5995
6203
"sha1",
5996
-
"thiserror 2.0.16",
6204
+
"thiserror 2.0.17",
5997
6205
"url",
5998
6206
"utf-8",
5999
6207
]
···
6011
6219
"log",
6012
6220
"rand 0.9.1",
6013
6221
"sha1",
6014
-
"thiserror 2.0.16",
6222
+
"thiserror 2.0.17",
6015
6223
"utf-8",
6016
6224
]
6017
6225
···
6054
6262
"http",
6055
6263
"jetstream",
6056
6264
"log",
6057
-
"lsm-tree",
6265
+
"lsm-tree 2.10.4",
6058
6266
"metrics",
6059
6267
"metrics-exporter-prometheus 0.17.2",
6060
6268
"schemars",
···
6064
6272
"serde_qs",
6065
6273
"sha2",
6066
6274
"tempfile",
6067
-
"thiserror 2.0.16",
6275
+
"thiserror 2.0.17",
6068
6276
"tikv-jemallocator",
6069
6277
"tokio",
6070
6278
"tokio-util",
···
6117
6325
6118
6326
[[package]]
6119
6327
name = "unsigned-varint"
6328
+
version = "0.7.2"
6329
+
source = "registry+https://github.com/rust-lang/crates.io-index"
6330
+
checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105"
6331
+
6332
+
[[package]]
6333
+
name = "unsigned-varint"
6120
6334
version = "0.8.0"
6121
6335
source = "registry+https://github.com/rust-lang/crates.io-index"
6122
6336
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
···
6193
6407
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
6194
6408
dependencies = [
6195
6409
"byteorder",
6196
-
"byteview",
6410
+
"byteview 0.6.1",
6197
6411
"interval-heap",
6198
6412
"log",
6199
6413
"path-absolutize",
···
6342
6556
]
6343
6557
6344
6558
[[package]]
6559
+
name = "wasm-streams"
6560
+
version = "0.4.2"
6561
+
source = "registry+https://github.com/rust-lang/crates.io-index"
6562
+
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
6563
+
dependencies = [
6564
+
"futures-util",
6565
+
"js-sys",
6566
+
"wasm-bindgen",
6567
+
"wasm-bindgen-futures",
6568
+
"web-sys",
6569
+
]
6570
+
6571
+
[[package]]
6345
6572
name = "web-sys"
6346
6573
version = "0.3.77"
6347
6574
source = "registry+https://github.com/rust-lang/crates.io-index"
···
6400
6627
"reqwest",
6401
6628
"serde",
6402
6629
"serde_json",
6403
-
"thiserror 2.0.16",
6630
+
"thiserror 2.0.17",
6404
6631
"tokio",
6405
6632
"tokio-util",
6406
6633
"url",
···
6440
6667
source = "registry+https://github.com/rust-lang/crates.io-index"
6441
6668
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
6442
6669
dependencies = [
6443
-
"windows-sys 0.48.0",
6670
+
"windows-sys 0.59.0",
6444
6671
]
6445
6672
6446
6673
[[package]]
···
6758
6985
"nom",
6759
6986
"oid-registry",
6760
6987
"rusticata-macros",
6761
-
"thiserror 2.0.16",
6988
+
"thiserror 2.0.17",
6762
6989
"time",
6763
6990
]
6764
6991
+29
-10
constellation/src/bin/main.rs
+29
-10
constellation/src/bin/main.rs
···
54
54
/// Saved jsonl from jetstream to use instead of a live subscription
55
55
#[arg(short, long)]
56
56
fixture: Option<PathBuf>,
57
+
/// run a scan across the target id table and write all key -> ids to id -> keys
58
+
#[arg(long, action)]
59
+
repair_target_ids: bool,
57
60
}
58
61
59
62
#[derive(Debug, Clone, ValueEnum)]
···
115
118
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
116
119
}
117
120
println!("rocks ready.");
118
-
run(
119
-
rocks,
120
-
fixture,
121
-
args.data,
122
-
stream,
123
-
bind,
124
-
metrics_bind,
125
-
stay_alive,
126
-
)
121
+
std::thread::scope(|s| {
122
+
if args.repair_target_ids {
123
+
let rocks = rocks.clone();
124
+
let stay_alive = stay_alive.clone();
125
+
s.spawn(move || {
126
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
127
+
eprintln!("repair finished: {rep:?}");
128
+
rep
129
+
});
130
+
}
131
+
s.spawn(|| {
132
+
let r = run(
133
+
rocks,
134
+
fixture,
135
+
args.data,
136
+
stream,
137
+
bind,
138
+
metrics_bind,
139
+
stay_alive,
140
+
);
141
+
eprintln!("run finished: {r:?}");
142
+
r
143
+
});
144
+
});
145
+
Ok(())
127
146
}
128
147
}
129
148
}
···
213
232
214
233
'monitor: loop {
215
234
match readable.get_stats() {
216
-
Ok(StorageStats { dids, targetables, linking_records }) => {
235
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
217
236
metrics::gauge!("storage.stats.dids").set(dids as f64);
218
237
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
219
238
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
+8
-6
constellation/src/server/filters.rs
+8
-6
constellation/src/server/filters.rs
···
5
5
Ok({
6
6
if let Some(link) = parse_any_link(s) {
7
7
match link {
8
-
Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
9
-
format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
10
-
}),
11
-
Link::Did(did) => Some(format!(
12
-
"https://atproto-browser-plus-links.vercel.app/at/{did}"
13
-
)),
8
+
Link::AtUri(at_uri) => at_uri
9
+
.strip_prefix("at://")
10
+
.map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
11
+
Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
14
12
Link::Uri(uri) => Some(uri),
15
13
}
16
14
} else {
···
22
20
pub fn human_number(n: &u64) -> askama::Result<String> {
23
21
Ok(n.to_formatted_string(&Locale::en))
24
22
}
23
+
24
+
pub fn to_u64(n: usize) -> askama::Result<u64> {
25
+
Ok(n as u64)
26
+
}
+289
-18
constellation/src/server/mod.rs
+289
-18
constellation/src/server/mod.rs
···
14
14
use std::collections::{HashMap, HashSet};
15
15
use std::time::{Duration, UNIX_EPOCH};
16
16
use tokio::net::{TcpListener, ToSocketAddrs};
17
-
use tokio::task::block_in_place;
17
+
use tokio::task::spawn_blocking;
18
18
use tokio_util::sync::CancellationToken;
19
19
20
20
use crate::storage::{LinkReader, StorageStats};
···
28
28
const DEFAULT_CURSOR_LIMIT: u64 = 16;
29
29
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 100;
30
30
31
-
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
31
+
fn get_default_cursor_limit() -> u64 {
32
+
DEFAULT_CURSOR_LIMIT
33
+
}
34
+
35
+
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
36
+
eprintln!("handler error: {e}");
37
+
http::StatusCode::INTERNAL_SERVER_ERROR
38
+
}
32
39
33
40
pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()>
34
41
where
···
41
48
"/",
42
49
get({
43
50
let store = store.clone();
44
-
move |accept| async { block_in_place(|| hello(accept, store)) }
51
+
move |accept| async {
52
+
spawn_blocking(|| hello(accept, store))
53
+
.await
54
+
.map_err(to500)?
55
+
}
56
+
}),
57
+
)
58
+
.route(
59
+
"/xrpc/blue.microcosm.links.getManyToManyCounts",
60
+
get({
61
+
let store = store.clone();
62
+
move |accept, query| async {
63
+
spawn_blocking(|| get_many_to_many_counts(accept, query, store))
64
+
.await
65
+
.map_err(to500)?
66
+
}
45
67
}),
46
68
)
47
69
.route(
48
70
"/links/count",
49
71
get({
50
72
let store = store.clone();
51
-
move |accept, query| async { block_in_place(|| count_links(accept, query, store)) }
73
+
move |accept, query| async {
74
+
spawn_blocking(|| count_links(accept, query, store))
75
+
.await
76
+
.map_err(to500)?
77
+
}
52
78
}),
53
79
)
54
80
.route(
···
56
82
get({
57
83
let store = store.clone();
58
84
move |accept, query| async {
59
-
block_in_place(|| count_distinct_dids(accept, query, store))
85
+
spawn_blocking(|| count_distinct_dids(accept, query, store))
86
+
.await
87
+
.map_err(to500)?
88
+
}
89
+
}),
90
+
)
91
+
.route(
92
+
"/xrpc/blue.microcosm.links.getBacklinks",
93
+
get({
94
+
let store = store.clone();
95
+
move |accept, query| async {
96
+
spawn_blocking(|| get_backlinks(accept, query, store))
97
+
.await
98
+
.map_err(to500)?
60
99
}
61
100
}),
62
101
)
···
64
103
"/links",
65
104
get({
66
105
let store = store.clone();
67
-
move |accept, query| async { block_in_place(|| get_links(accept, query, store)) }
106
+
move |accept, query| async {
107
+
spawn_blocking(|| get_links(accept, query, store))
108
+
.await
109
+
.map_err(to500)?
110
+
}
68
111
}),
69
112
)
70
113
.route(
···
72
115
get({
73
116
let store = store.clone();
74
117
move |accept, query| async {
75
-
block_in_place(|| get_distinct_dids(accept, query, store))
118
+
spawn_blocking(|| get_distinct_dids(accept, query, store))
119
+
.await
120
+
.map_err(to500)?
76
121
}
77
122
}),
78
123
)
···
82
127
get({
83
128
let store = store.clone();
84
129
move |accept, query| async {
85
-
block_in_place(|| count_all_links(accept, query, store))
130
+
spawn_blocking(|| count_all_links(accept, query, store))
131
+
.await
132
+
.map_err(to500)?
86
133
}
87
134
}),
88
135
)
···
91
138
get({
92
139
let store = store.clone();
93
140
move |accept, query| async {
94
-
block_in_place(|| explore_links(accept, query, store))
141
+
spawn_blocking(|| explore_links(accept, query, store))
142
+
.await
143
+
.map_err(to500)?
95
144
}
96
145
}),
97
146
)
···
150
199
#[template(path = "hello.html.j2")]
151
200
struct HelloReponse {
152
201
help: &'static str,
153
-
days_indexed: u64,
202
+
days_indexed: Option<u64>,
154
203
stats: StorageStats,
155
204
}
156
205
fn hello(
···
160
209
let stats = store
161
210
.get_stats()
162
211
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
163
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
164
-
.elapsed()
212
+
let days_indexed = stats
213
+
.started_at
214
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
215
+
.transpose()
165
216
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
166
-
.as_secs()
167
-
/ 86400;
217
+
.map(|d| d.as_secs() / 86_400);
168
218
Ok(acceptable(accept, HelloReponse {
169
219
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
170
220
days_indexed,
···
173
223
}
174
224
175
225
#[derive(Clone, Deserialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
struct GetManyToManyCountsQuery {
228
+
subject: String,
229
+
source: String,
230
+
/// path to the secondary link in the linking record
231
+
path_to_other: String,
232
+
/// filter to linking records (join of the m2m) by these DIDs
233
+
#[serde(default)]
234
+
did: Vec<String>,
235
+
/// filter to specific secondary records
236
+
#[serde(default)]
237
+
other_subject: Vec<String>,
238
+
cursor: Option<OpaqueApiCursor>,
239
+
/// Set the max number of links to return per page of results
240
+
#[serde(default = "get_default_cursor_limit")]
241
+
limit: u64,
242
+
}
243
+
#[derive(Serialize)]
244
+
struct OtherSubjectCount {
245
+
subject: String,
246
+
total: u64,
247
+
distinct: u64,
248
+
}
249
+
#[derive(Template, Serialize)]
250
+
#[template(path = "get-many-to-many-counts.html.j2")]
251
+
struct GetManyToManyCountsResponse {
252
+
counts_by_other_subject: Vec<OtherSubjectCount>,
253
+
cursor: Option<OpaqueApiCursor>,
254
+
#[serde(skip_serializing)]
255
+
query: GetManyToManyCountsQuery,
256
+
}
257
+
fn get_many_to_many_counts(
258
+
accept: ExtractAccept,
259
+
query: axum_extra::extract::Query<GetManyToManyCountsQuery>,
260
+
store: impl LinkReader,
261
+
) -> Result<impl IntoResponse, http::StatusCode> {
262
+
let cursor_key = query
263
+
.cursor
264
+
.clone()
265
+
.map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
266
+
.transpose()?
267
+
.map(|c| c.next);
268
+
269
+
let limit = query.limit;
270
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
271
+
return Err(http::StatusCode::BAD_REQUEST);
272
+
}
273
+
274
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
275
+
query
276
+
.did
277
+
.iter()
278
+
.map(|d| d.trim())
279
+
.filter(|d| !d.is_empty())
280
+
.map(|d| Did(d.to_string())),
281
+
);
282
+
283
+
let filter_other_subjects: HashSet<String> = HashSet::from_iter(
284
+
query
285
+
.other_subject
286
+
.iter()
287
+
.map(|s| s.trim().to_string())
288
+
.filter(|s| !s.is_empty()),
289
+
);
290
+
291
+
let Some((collection, path)) = query.source.split_once(':') else {
292
+
return Err(http::StatusCode::BAD_REQUEST);
293
+
};
294
+
let path = format!(".{path}");
295
+
296
+
let path_to_other = format!(".{}", query.path_to_other);
297
+
298
+
let paged = store
299
+
.get_many_to_many_counts(
300
+
&query.subject,
301
+
collection,
302
+
&path,
303
+
&path_to_other,
304
+
limit,
305
+
cursor_key,
306
+
&filter_dids,
307
+
&filter_other_subjects,
308
+
)
309
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
310
+
311
+
let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into());
312
+
313
+
let items = paged
314
+
.items
315
+
.into_iter()
316
+
.map(|(subject, total, distinct)| OtherSubjectCount {
317
+
subject,
318
+
total,
319
+
distinct,
320
+
})
321
+
.collect();
322
+
323
+
Ok(acceptable(
324
+
accept,
325
+
GetManyToManyCountsResponse {
326
+
counts_by_other_subject: items,
327
+
cursor,
328
+
query: (*query).clone(),
329
+
},
330
+
))
331
+
}
332
+
333
+
#[derive(Clone, Deserialize)]
176
334
struct GetLinksCountQuery {
177
335
target: String,
178
336
collection: String,
···
233
391
}
234
392
235
393
#[derive(Clone, Deserialize)]
394
+
struct GetBacklinksQuery {
395
+
/// The link target
396
+
///
397
+
/// can be an AT-URI, plain DID, or regular URI
398
+
subject: String,
399
+
/// Filter links only from this link source
400
+
///
401
+
/// eg.: `app.bsky.feed.like:subject.uri`
402
+
source: String,
403
+
cursor: Option<OpaqueApiCursor>,
404
+
/// Filter links only from these DIDs
405
+
///
406
+
/// include multiple times to filter by multiple source DIDs
407
+
#[serde(default)]
408
+
did: Vec<String>,
409
+
/// Set the max number of links to return per page of results
410
+
#[serde(default = "get_default_cursor_limit")]
411
+
limit: u64,
412
+
// TODO: allow reverse (er, forward) order as well
413
+
}
414
+
#[derive(Template, Serialize)]
415
+
#[template(path = "get-backlinks.html.j2")]
416
+
struct GetBacklinksResponse {
417
+
total: u64,
418
+
records: Vec<RecordId>,
419
+
cursor: Option<OpaqueApiCursor>,
420
+
#[serde(skip_serializing)]
421
+
query: GetBacklinksQuery,
422
+
#[serde(skip_serializing)]
423
+
collection: String,
424
+
#[serde(skip_serializing)]
425
+
path: String,
426
+
}
427
+
fn get_backlinks(
428
+
accept: ExtractAccept,
429
+
query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences
430
+
store: impl LinkReader,
431
+
) -> Result<impl IntoResponse, http::StatusCode> {
432
+
let until = query
433
+
.cursor
434
+
.clone()
435
+
.map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST))
436
+
.transpose()?
437
+
.map(|c| c.next);
438
+
439
+
let limit = query.limit;
440
+
if limit > DEFAULT_CURSOR_LIMIT_MAX {
441
+
return Err(http::StatusCode::BAD_REQUEST);
442
+
}
443
+
444
+
let filter_dids: HashSet<Did> = HashSet::from_iter(
445
+
query
446
+
.did
447
+
.iter()
448
+
.map(|d| d.trim())
449
+
.filter(|d| !d.is_empty())
450
+
.map(|d| Did(d.to_string())),
451
+
);
452
+
453
+
let Some((collection, path)) = query.source.split_once(':') else {
454
+
return Err(http::StatusCode::BAD_REQUEST);
455
+
};
456
+
let path = format!(".{path}");
457
+
458
+
let paged = store
459
+
.get_links(
460
+
&query.subject,
461
+
collection,
462
+
&path,
463
+
limit,
464
+
until,
465
+
&filter_dids,
466
+
)
467
+
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
468
+
469
+
let cursor = paged.next.map(|next| {
470
+
ApiCursor {
471
+
version: paged.version,
472
+
next,
473
+
}
474
+
.into()
475
+
});
476
+
477
+
Ok(acceptable(
478
+
accept,
479
+
GetBacklinksResponse {
480
+
total: paged.total,
481
+
records: paged.items,
482
+
cursor,
483
+
query: (*query).clone(),
484
+
collection: collection.to_string(),
485
+
path,
486
+
},
487
+
))
488
+
}
489
+
490
+
#[derive(Clone, Deserialize)]
236
491
struct GetLinkItemsQuery {
237
492
target: String,
238
493
collection: String,
···
251
506
///
252
507
/// deprecated: use `did`, which can be repeated multiple times
253
508
from_dids: Option<String>, // comma separated: gross
254
-
#[serde(default = "get_default_limit")]
509
+
#[serde(default = "get_default_cursor_limit")]
255
510
limit: u64,
256
511
// TODO: allow reverse (er, forward) order as well
257
-
}
258
-
fn get_default_limit() -> u64 {
259
-
DEFAULT_CURSOR_LIMIT
260
512
}
261
513
#[derive(Template, Serialize)]
262
514
#[template(path = "links.html.j2")]
···
475
727
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
476
728
}
477
729
}
730
+
731
+
#[derive(Serialize, Deserialize)] // for bincode
732
+
struct ApiKeyedCursor {
733
+
next: String, // the key
734
+
}
735
+
736
+
impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor {
737
+
type Error = bincode::Error;
738
+
739
+
fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> {
740
+
bincode::DefaultOptions::new().deserialize(&item.0)
741
+
}
742
+
}
743
+
744
+
impl From<ApiKeyedCursor> for OpaqueApiCursor {
745
+
fn from(item: ApiKeyedCursor) -> Self {
746
+
OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap())
747
+
}
748
+
}
+78
-1
constellation/src/storage/mem_store.rs
+78
-1
constellation/src/storage/mem_store.rs
···
1
-
use super::{LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{
2
+
LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
3
+
};
2
4
use crate::{ActionableEvent, CountsByCount, Did, RecordId};
3
5
use anyhow::Result;
4
6
use links::CollectedLink;
···
132
134
}
133
135
134
136
impl LinkReader for MemStorage {
137
+
fn get_many_to_many_counts(
138
+
&self,
139
+
target: &str,
140
+
collection: &str,
141
+
path: &str,
142
+
path_to_other: &str,
143
+
limit: u64,
144
+
after: Option<String>,
145
+
filter_dids: &HashSet<Did>,
146
+
filter_to_targets: &HashSet<String>,
147
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
148
+
let data = self.0.lock().unwrap();
149
+
let Some(paths) = data.targets.get(&Target::new(target)) else {
150
+
return Ok(PagedOrderedCollection::default());
151
+
};
152
+
let Some(linkers) = paths.get(&Source::new(collection, path)) else {
153
+
return Ok(PagedOrderedCollection::default());
154
+
};
155
+
156
+
let path_to_other = RecordPath::new(path_to_other);
157
+
let filter_to_targets: HashSet<Target> =
158
+
HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
159
+
160
+
let mut grouped_counts: HashMap<Target, (u64, HashSet<Did>)> = HashMap::new();
161
+
for (did, rkey) in linkers.iter().flatten().cloned() {
162
+
if !filter_dids.is_empty() && !filter_dids.contains(&did) {
163
+
continue;
164
+
}
165
+
if let Some(fwd_target) = data
166
+
.links
167
+
.get(&did)
168
+
.unwrap_or(&HashMap::new())
169
+
.get(&RepoId {
170
+
collection: collection.to_string(),
171
+
rkey,
172
+
})
173
+
.unwrap_or(&Vec::new())
174
+
.iter()
175
+
.filter_map(|(path, target)| {
176
+
if *path == path_to_other
177
+
&& (filter_to_targets.is_empty() || filter_to_targets.contains(target))
178
+
{
179
+
Some(target)
180
+
} else {
181
+
None
182
+
}
183
+
})
184
+
.take(1)
185
+
.next()
186
+
{
187
+
let e = grouped_counts.entry(fwd_target.clone()).or_default();
188
+
e.0 += 1;
189
+
e.1.insert(did.clone());
190
+
}
191
+
}
192
+
let mut items: Vec<(String, u64, u64)> = grouped_counts
193
+
.iter()
194
+
.map(|(k, (n, u))| (k.0.clone(), *n, u.len() as u64))
195
+
.collect();
196
+
items.sort();
197
+
items = items
198
+
.into_iter()
199
+
.skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false))
200
+
.take(limit as usize)
201
+
.collect();
202
+
let next = if items.len() as u64 >= limit {
203
+
items.last().map(|(t, _, _)| t.clone())
204
+
} else {
205
+
None
206
+
};
207
+
Ok(PagedOrderedCollection { items, next })
208
+
}
209
+
135
210
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
136
211
let data = self.0.lock().unwrap();
137
212
let Some(paths) = data.targets.get(&Target::new(target)) else {
···
353
428
dids,
354
429
targetables,
355
430
linking_records,
431
+
started_at: None,
432
+
other_data: Default::default(),
356
433
})
357
434
}
358
435
}
+225
constellation/src/storage/mod.rs
+225
constellation/src/storage/mod.rs
···
19
19
pub total: u64,
20
20
}
21
21
22
+
/// A paged collection whose keys are sorted instead of indexed
23
+
///
24
+
/// this has weaker guarantees than PagedAppendingCollection: it might
25
+
/// return a totally consistent snapshot. but it should avoid duplicates
26
+
/// and each page should at least be internally consistent.
27
+
#[derive(Debug, PartialEq, Default)]
28
+
pub struct PagedOrderedCollection<T, K: Ord> {
29
+
pub items: Vec<T>,
30
+
pub next: Option<K>,
31
+
}
32
+
22
33
#[derive(Debug, Deserialize, Serialize, PartialEq)]
23
34
pub struct StorageStats {
24
35
/// estimate of how many accounts we've seen create links. the _subjects_ of any links are not represented here.
···
33
44
/// records with multiple links are single-counted.
34
45
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
35
46
pub linking_records: u64,
47
+
48
+
/// first jetstream cursor when this instance first started
49
+
pub started_at: Option<u64>,
50
+
51
+
/// anything else we want to throw in
52
+
pub other_data: HashMap<String, u64>,
36
53
}
37
54
38
55
pub trait LinkStorage: Send + Sync {
···
48
65
}
49
66
50
67
pub trait LinkReader: Clone + Send + Sync + 'static {
68
+
#[allow(clippy::too_many_arguments)]
69
+
fn get_many_to_many_counts(
70
+
&self,
71
+
target: &str,
72
+
collection: &str,
73
+
path: &str,
74
+
path_to_other: &str,
75
+
limit: u64,
76
+
after: Option<String>,
77
+
filter_dids: &HashSet<Did>,
78
+
filter_to_targets: &HashSet<String>,
79
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>>;
80
+
51
81
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
52
82
53
83
fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>;
···
1326
1356
counts
1327
1357
});
1328
1358
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1359
+
});
1360
+
1361
+
//////// many-to-many /////////
1362
+
1363
+
test_each_storage!(get_m2m_counts_empty, |storage| {
1364
+
assert_eq!(
1365
+
storage.get_many_to_many_counts(
1366
+
"a.com",
1367
+
"a.b.c",
1368
+
".d.e",
1369
+
".f.g",
1370
+
10,
1371
+
None,
1372
+
&HashSet::new(),
1373
+
&HashSet::new(),
1374
+
)?,
1375
+
PagedOrderedCollection {
1376
+
items: vec![],
1377
+
next: None,
1378
+
}
1379
+
);
1380
+
});
1381
+
1382
+
test_each_storage!(get_m2m_counts_single, |storage| {
1383
+
storage.push(
1384
+
&ActionableEvent::CreateLinks {
1385
+
record_id: RecordId {
1386
+
did: "did:plc:asdf".into(),
1387
+
collection: "app.t.c".into(),
1388
+
rkey: "asdf".into(),
1389
+
},
1390
+
links: vec![
1391
+
CollectedLink {
1392
+
target: Link::Uri("a.com".into()),
1393
+
path: ".abc.uri".into(),
1394
+
},
1395
+
CollectedLink {
1396
+
target: Link::Uri("b.com".into()),
1397
+
path: ".def.uri".into(),
1398
+
},
1399
+
CollectedLink {
1400
+
target: Link::Uri("b.com".into()),
1401
+
path: ".ghi.uri".into(),
1402
+
},
1403
+
],
1404
+
},
1405
+
0,
1406
+
)?;
1407
+
assert_eq!(
1408
+
storage.get_many_to_many_counts(
1409
+
"a.com",
1410
+
"app.t.c",
1411
+
".abc.uri",
1412
+
".def.uri",
1413
+
10,
1414
+
None,
1415
+
&HashSet::new(),
1416
+
&HashSet::new(),
1417
+
)?,
1418
+
PagedOrderedCollection {
1419
+
items: vec![("b.com".to_string(), 1, 1)],
1420
+
next: None,
1421
+
}
1422
+
);
1423
+
});
1424
+
1425
+
test_each_storage!(get_m2m_counts_filters, |storage| {
1426
+
storage.push(
1427
+
&ActionableEvent::CreateLinks {
1428
+
record_id: RecordId {
1429
+
did: "did:plc:asdf".into(),
1430
+
collection: "app.t.c".into(),
1431
+
rkey: "asdf".into(),
1432
+
},
1433
+
links: vec![
1434
+
CollectedLink {
1435
+
target: Link::Uri("a.com".into()),
1436
+
path: ".abc.uri".into(),
1437
+
},
1438
+
CollectedLink {
1439
+
target: Link::Uri("b.com".into()),
1440
+
path: ".def.uri".into(),
1441
+
},
1442
+
],
1443
+
},
1444
+
0,
1445
+
)?;
1446
+
storage.push(
1447
+
&ActionableEvent::CreateLinks {
1448
+
record_id: RecordId {
1449
+
did: "did:plc:asdfasdf".into(),
1450
+
collection: "app.t.c".into(),
1451
+
rkey: "asdf".into(),
1452
+
},
1453
+
links: vec![
1454
+
CollectedLink {
1455
+
target: Link::Uri("a.com".into()),
1456
+
path: ".abc.uri".into(),
1457
+
},
1458
+
CollectedLink {
1459
+
target: Link::Uri("b.com".into()),
1460
+
path: ".def.uri".into(),
1461
+
},
1462
+
],
1463
+
},
1464
+
1,
1465
+
)?;
1466
+
storage.push(
1467
+
&ActionableEvent::CreateLinks {
1468
+
record_id: RecordId {
1469
+
did: "did:plc:fdsa".into(),
1470
+
collection: "app.t.c".into(),
1471
+
rkey: "asdf".into(),
1472
+
},
1473
+
links: vec![
1474
+
CollectedLink {
1475
+
target: Link::Uri("a.com".into()),
1476
+
path: ".abc.uri".into(),
1477
+
},
1478
+
CollectedLink {
1479
+
target: Link::Uri("c.com".into()),
1480
+
path: ".def.uri".into(),
1481
+
},
1482
+
],
1483
+
},
1484
+
2,
1485
+
)?;
1486
+
storage.push(
1487
+
&ActionableEvent::CreateLinks {
1488
+
record_id: RecordId {
1489
+
did: "did:plc:fdsa".into(),
1490
+
collection: "app.t.c".into(),
1491
+
rkey: "asdf2".into(),
1492
+
},
1493
+
links: vec![
1494
+
CollectedLink {
1495
+
target: Link::Uri("a.com".into()),
1496
+
path: ".abc.uri".into(),
1497
+
},
1498
+
CollectedLink {
1499
+
target: Link::Uri("c.com".into()),
1500
+
path: ".def.uri".into(),
1501
+
},
1502
+
],
1503
+
},
1504
+
3,
1505
+
)?;
1506
+
assert_eq!(
1507
+
storage.get_many_to_many_counts(
1508
+
"a.com",
1509
+
"app.t.c",
1510
+
".abc.uri",
1511
+
".def.uri",
1512
+
10,
1513
+
None,
1514
+
&HashSet::new(),
1515
+
&HashSet::new(),
1516
+
)?,
1517
+
PagedOrderedCollection {
1518
+
items: vec![("b.com".to_string(), 2, 2), ("c.com".to_string(), 2, 1),],
1519
+
next: None,
1520
+
}
1521
+
);
1522
+
assert_eq!(
1523
+
storage.get_many_to_many_counts(
1524
+
"a.com",
1525
+
"app.t.c",
1526
+
".abc.uri",
1527
+
".def.uri",
1528
+
10,
1529
+
None,
1530
+
&HashSet::from_iter([Did("did:plc:fdsa".to_string())]),
1531
+
&HashSet::new(),
1532
+
)?,
1533
+
PagedOrderedCollection {
1534
+
items: vec![("c.com".to_string(), 2, 1),],
1535
+
next: None,
1536
+
}
1537
+
);
1538
+
assert_eq!(
1539
+
storage.get_many_to_many_counts(
1540
+
"a.com",
1541
+
"app.t.c",
1542
+
".abc.uri",
1543
+
".def.uri",
1544
+
10,
1545
+
None,
1546
+
&HashSet::new(),
1547
+
&HashSet::from_iter(["b.com".to_string()]),
1548
+
)?,
1549
+
PagedOrderedCollection {
1550
+
items: vec![("b.com".to_string(), 2, 2),],
1551
+
next: None,
1552
+
}
1553
+
);
1329
1554
});
1330
1555
}
+342
-40
constellation/src/storage/rocks_store.rs
+342
-40
constellation/src/storage/rocks_store.rs
···
1
-
use super::{ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, StorageStats};
1
+
use super::{
2
+
ActionableEvent, LinkReader, LinkStorage, PagedAppendingCollection, PagedOrderedCollection,
3
+
StorageStats,
4
+
};
2
5
use crate::{CountsByCount, Did, RecordId};
3
6
use anyhow::{bail, Result};
4
7
use bincode::Options as BincodeOptions;
···
11
14
MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
12
15
};
13
16
use serde::{Deserialize, Serialize};
14
-
use std::collections::{HashMap, HashSet};
17
+
use std::collections::{BTreeMap, HashMap, HashSet};
15
18
use std::io::Read;
16
19
use std::marker::PhantomData;
17
20
use std::path::{Path, PathBuf};
···
20
23
Arc,
21
24
};
22
25
use std::thread;
23
-
use std::time::{Duration, Instant};
26
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
27
use tokio_util::sync::CancellationToken;
25
28
26
29
static DID_IDS_CF: &str = "did_ids";
···
29
32
static LINK_TARGETS_CF: &str = "link_targets";
30
33
31
34
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
35
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
36
+
// add reverse mappings for targets if this db was running before that was a thing
37
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
38
+
39
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
40
+
41
+
#[derive(Debug, Clone, Serialize, Deserialize)]
42
+
struct TargetIdRepairState {
43
+
/// start time for repair, microseconds timestamp
44
+
current_us_started_at: u64,
45
+
/// id table's latest id when repair started
46
+
id_when_started: u64,
47
+
/// id table id
48
+
latest_repaired_i: u64,
49
+
}
50
+
impl AsRocksValue for TargetIdRepairState {}
51
+
impl ValueFromRocks for TargetIdRepairState {}
32
52
33
53
// todo: actually understand and set these options probably better
34
54
fn rocks_opts_base() -> Options {
···
56
76
#[derive(Debug, Clone)]
57
77
pub struct RocksStorage {
58
78
pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun)
59
-
did_id_table: IdTable<Did, DidIdValue, true>,
60
-
target_id_table: IdTable<TargetKey, TargetId, false>,
79
+
did_id_table: IdTable<Did, DidIdValue>,
80
+
target_id_table: IdTable<TargetKey, TargetId>,
61
81
is_writer: bool,
62
82
backup_task: Arc<Option<thread::JoinHandle<Result<()>>>>,
63
83
}
···
85
105
fn cf_descriptor(&self) -> ColumnFamilyDescriptor {
86
106
ColumnFamilyDescriptor::new(&self.name, rocks_opts_base())
87
107
}
88
-
fn init<const WITH_REVERSE: bool>(
89
-
self,
90
-
db: &DBWithThreadMode<MultiThreaded>,
91
-
) -> Result<IdTable<Orig, IdVal, WITH_REVERSE>> {
108
+
fn init(self, db: &DBWithThreadMode<MultiThreaded>) -> Result<IdTable<Orig, IdVal>> {
92
109
if db.cf_handle(&self.name).is_none() {
93
110
bail!("failed to get cf handle from db -- was the db open with our .cf_descriptor()?");
94
111
}
···
119
136
}
120
137
}
121
138
#[derive(Debug, Clone)]
122
-
struct IdTable<Orig, IdVal: IdTableValue, const WITH_REVERSE: bool>
139
+
struct IdTable<Orig, IdVal: IdTableValue>
123
140
where
124
141
Orig: KeyFromRocks,
125
142
for<'a> &'a Orig: AsRocksKey,
···
127
144
base: IdTableBase<Orig, IdVal>,
128
145
priv_id_seq: u64,
129
146
}
130
-
impl<Orig: Clone, IdVal: IdTableValue, const WITH_REVERSE: bool> IdTable<Orig, IdVal, WITH_REVERSE>
147
+
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal>
131
148
where
132
149
Orig: KeyFromRocks,
133
150
for<'v> &'v IdVal: AsRocksValue,
···
139
156
_key_marker: PhantomData,
140
157
_val_marker: PhantomData,
141
158
name: name.into(),
142
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
159
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
143
160
}
144
161
}
145
162
fn get_id_val(
···
178
195
id_value
179
196
}))
180
197
}
198
+
181
199
fn estimate_count(&self) -> u64 {
182
200
self.base.id_seq.load(Ordering::SeqCst) - 1 // -1 because seq zero is reserved
183
201
}
184
-
}
185
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, true>
186
-
where
187
-
Orig: KeyFromRocks,
188
-
for<'v> &'v IdVal: AsRocksValue,
189
-
for<'k> &'k Orig: AsRocksKey,
190
-
{
202
+
191
203
fn get_or_create_id_val(
192
204
&mut self,
193
205
db: &DBWithThreadMode<MultiThreaded>,
···
215
227
}
216
228
}
217
229
}
218
-
impl<Orig: Clone, IdVal: IdTableValue> IdTable<Orig, IdVal, false>
219
-
where
220
-
Orig: KeyFromRocks,
221
-
for<'v> &'v IdVal: AsRocksValue,
222
-
for<'k> &'k Orig: AsRocksKey,
223
-
{
224
-
fn get_or_create_id_val(
225
-
&mut self,
226
-
db: &DBWithThreadMode<MultiThreaded>,
227
-
batch: &mut WriteBatch,
228
-
orig: &Orig,
229
-
) -> Result<IdVal> {
230
-
let cf = db.cf_handle(&self.base.name).unwrap();
231
-
self.__get_or_create_id_val(&cf, db, batch, orig)
232
-
}
233
-
}
234
230
235
231
impl IdTableValue for DidIdValue {
236
232
fn new(v: u64) -> Self {
···
249
245
}
250
246
}
251
247
248
+
fn now() -> u64 {
249
+
SystemTime::now()
250
+
.duration_since(UNIX_EPOCH)
251
+
.unwrap()
252
+
.as_micros() as u64
253
+
}
254
+
252
255
impl RocksStorage {
253
256
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
254
257
Self::describe_metrics();
255
-
RocksStorage::open_readmode(path, false)
258
+
let me = RocksStorage::open_readmode(path, false)?;
259
+
me.global_init()?;
260
+
Ok(me)
256
261
}
257
262
258
263
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
260
265
}
261
266
262
267
fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> {
263
-
let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF);
264
-
let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF);
268
+
let did_id_table = IdTable::setup(DID_IDS_CF);
269
+
let target_id_table = IdTable::setup(TARGET_IDS_CF);
265
270
271
+
// note: global stuff like jetstream cursor goes in the default cf
272
+
// these are bonus extra cfs
266
273
let cfs = vec![
267
274
// id reference tables
268
275
did_id_table.cf_descriptor(),
···
296
303
is_writer: !readonly,
297
304
backup_task: None.into(),
298
305
})
306
+
}
307
+
308
+
fn global_init(&self) -> Result<()> {
309
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
310
+
if first_run {
311
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
312
+
313
+
// hack / temporary: if we're a new db, put in a completed repair
314
+
// state so we don't run repairs (repairs are for old-code dbs)
315
+
let completed = TargetIdRepairState {
316
+
id_when_started: 0,
317
+
current_us_started_at: 0,
318
+
latest_repaired_i: 0,
319
+
};
320
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
321
+
}
322
+
Ok(())
323
+
}
324
+
325
+
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
326
+
let mut state = match self
327
+
.db
328
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
329
+
.map(|s| _vr(&s))
330
+
.transpose()?
331
+
{
332
+
Some(s) => s,
333
+
None => TargetIdRepairState {
334
+
id_when_started: self.did_id_table.priv_id_seq,
335
+
current_us_started_at: now(),
336
+
latest_repaired_i: 0,
337
+
},
338
+
};
339
+
340
+
eprintln!("initial repair state: {state:?}");
341
+
342
+
let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
343
+
344
+
let mut iter = self.db.raw_iterator_cf(&cf);
345
+
iter.seek_to_first();
346
+
347
+
eprintln!("repair iterator sent to first key");
348
+
349
+
// skip ahead if we're done some, or take a single first step
350
+
for _ in 0..state.latest_repaired_i {
351
+
iter.next();
352
+
}
353
+
354
+
eprintln!(
355
+
"repair iterator skipped to {}th key",
356
+
state.latest_repaired_i
357
+
);
358
+
359
+
let mut maybe_done = false;
360
+
361
+
let mut write_fast = rocksdb::WriteOptions::default();
362
+
write_fast.set_sync(false);
363
+
write_fast.disable_wal(true);
364
+
365
+
while !stay_alive.is_cancelled() && !maybe_done {
366
+
// let mut batch = WriteBatch::default();
367
+
368
+
let mut any_written = false;
369
+
370
+
for _ in 0..1000 {
371
+
if state.latest_repaired_i % 1_000_000 == 0 {
372
+
eprintln!("target iter at {}", state.latest_repaired_i);
373
+
}
374
+
state.latest_repaired_i += 1;
375
+
376
+
if !iter.valid() {
377
+
eprintln!("invalid iter, are we done repairing?");
378
+
maybe_done = true;
379
+
break;
380
+
};
381
+
382
+
// eprintln!("iterator seems to be valid! getting the key...");
383
+
let raw_key = iter.key().unwrap();
384
+
if raw_key.len() == 8 {
385
+
// eprintln!("found an 8-byte key, skipping it since it's probably an id...");
386
+
iter.next();
387
+
continue;
388
+
}
389
+
let target: TargetKey = _kr::<TargetKey>(raw_key)?;
390
+
let target_id: TargetId = _vr(iter.value().unwrap())?;
391
+
392
+
self.db
393
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
394
+
any_written = true;
395
+
iter.next();
396
+
}
397
+
398
+
if any_written {
399
+
self.db
400
+
.put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
401
+
std::thread::sleep(breather);
402
+
}
403
+
}
404
+
405
+
eprintln!("repair iterator done.");
406
+
407
+
Ok(false)
299
408
}
300
409
301
410
pub fn start_backup(
···
826
935
}
827
936
828
937
impl LinkReader for RocksStorage {
938
+
fn get_many_to_many_counts(
939
+
&self,
940
+
target: &str,
941
+
collection: &str,
942
+
path: &str,
943
+
path_to_other: &str,
944
+
limit: u64,
945
+
after: Option<String>,
946
+
filter_dids: &HashSet<Did>,
947
+
filter_to_targets: &HashSet<String>,
948
+
) -> Result<PagedOrderedCollection<(String, u64, u64), String>> {
949
+
let collection = Collection(collection.to_string());
950
+
let path = RPath(path.to_string());
951
+
952
+
let target_key = TargetKey(Target(target.to_string()), collection.clone(), path.clone());
953
+
954
+
// unfortunately the cursor is a, uh, stringified number.
955
+
// this was easier for the memstore (plain target, not target id), and
956
+
// making it generic is a bit awful.
957
+
// so... parse the number out of a string here :(
958
+
// TODO: this should bubble up to a BAD_REQUEST response
959
+
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
960
+
961
+
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
962
+
eprintln!("nothin doin for this target, {target_key:?}");
963
+
return Ok(Default::default());
964
+
};
965
+
966
+
let filter_did_ids: HashMap<DidId, bool> = filter_dids
967
+
.iter()
968
+
.filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
969
+
.collect::<Result<Vec<DidIdValue>>>()?
970
+
.into_iter()
971
+
.map(|DidIdValue(id, active)| (id, active))
972
+
.collect();
973
+
974
+
// stored targets are keyed by triples of (target, collection, path).
975
+
// target filtering only consideres the target itself, so we actually
976
+
// need to do a prefix iteration of all target ids for this target and
977
+
// keep them all.
978
+
// i *think* the number of keys at a target prefix should usually be
979
+
// pretty small, so this is hopefully fine. but if it turns out to be
980
+
// large, we can push this filtering back into the main links loop and
981
+
// do forward db queries per backlink to get the raw target back out.
982
+
let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
983
+
for t in filter_to_targets {
984
+
for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
985
+
filter_to_target_ids.insert(target_id);
986
+
}
987
+
}
988
+
989
+
let linkers = self.get_target_linkers(&target_id)?;
990
+
991
+
let mut grouped_counts: BTreeMap<TargetId, (u64, HashSet<DidId>)> = BTreeMap::new();
992
+
993
+
for (did_id, rkey) in linkers.0 {
994
+
if did_id.is_empty() {
995
+
continue;
996
+
}
997
+
998
+
if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
999
+
continue;
1000
+
}
1001
+
1002
+
let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey);
1003
+
let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
1004
+
continue;
1005
+
};
1006
+
1007
+
let Some(fwd_target) = targets
1008
+
.0
1009
+
.into_iter()
1010
+
.filter_map(|RecordLinkTarget(rpath, target_id)| {
1011
+
if rpath.0 == path_to_other
1012
+
&& (filter_to_target_ids.is_empty()
1013
+
|| filter_to_target_ids.contains(&target_id))
1014
+
{
1015
+
Some(target_id)
1016
+
} else {
1017
+
None
1018
+
}
1019
+
})
1020
+
.take(1)
1021
+
.next()
1022
+
else {
1023
+
eprintln!("no forward match");
1024
+
continue;
1025
+
};
1026
+
1027
+
// small relief: we page over target ids, so we can already bail
1028
+
// reprocessing previous pages here
1029
+
if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) {
1030
+
continue;
1031
+
}
1032
+
1033
+
// aand we can skip target ids that must be on future pages
1034
+
// (this check continues after the did-lookup, which we have to do)
1035
+
let page_is_full = grouped_counts.len() as u64 >= limit;
1036
+
if page_is_full {
1037
+
let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh
1038
+
if fwd_target > *current_max {
1039
+
continue;
1040
+
}
1041
+
}
1042
+
1043
+
// bit painful: 2-step lookup to make sure this did is active
1044
+
let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
1045
+
eprintln!("failed to look up did from did_id {did_id:?}");
1046
+
continue;
1047
+
};
1048
+
let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
1049
+
eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
1050
+
continue;
1051
+
};
1052
+
if !active {
1053
+
continue;
1054
+
}
1055
+
1056
+
// page-management, continued
1057
+
// if we have a full page, and we're inserting a *new* key less than
1058
+
// the current max, then we can evict the current max
1059
+
let mut should_evict = false;
1060
+
let entry = grouped_counts.entry(fwd_target.clone()).or_insert_with(|| {
1061
+
// this is a *new* key, so kick the max if we're full
1062
+
should_evict = page_is_full;
1063
+
Default::default()
1064
+
});
1065
+
entry.0 += 1;
1066
+
entry.1.insert(did_id);
1067
+
1068
+
if should_evict {
1069
+
grouped_counts.pop_last();
1070
+
}
1071
+
}
1072
+
1073
+
let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len());
1074
+
for (target_id, (n, dids)) in &grouped_counts {
1075
+
let Some(target) = self
1076
+
.target_id_table
1077
+
.get_val_from_id(&self.db, target_id.0)?
1078
+
else {
1079
+
eprintln!("failed to look up target from target_id {target_id:?}");
1080
+
continue;
1081
+
};
1082
+
items.push((target.0 .0, *n, dids.len() as u64));
1083
+
}
1084
+
1085
+
let next = if grouped_counts.len() as u64 >= limit {
1086
+
// yeah.... it's a number saved as a string......sorry
1087
+
grouped_counts
1088
+
.keys()
1089
+
.next_back()
1090
+
.map(|k| format!("{}", k.0))
1091
+
} else {
1092
+
None
1093
+
};
1094
+
1095
+
Ok(PagedOrderedCollection { items, next })
1096
+
}
1097
+
829
1098
fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> {
830
1099
let target_key = TargetKey(
831
1100
Target(target.to_string()),
···
1042
1311
.map(|s| s.parse::<u64>())
1043
1312
.transpose()?
1044
1313
.unwrap_or(0);
1314
+
let started_at = self
1315
+
.db
1316
+
.get(STARTED_AT_KEY)?
1317
+
.map(|c| _vr(&c))
1318
+
.transpose()?
1319
+
.unwrap_or(COZY_FIRST_CURSOR);
1320
+
1321
+
let other_data = self
1322
+
.db
1323
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
1324
+
.map(|s| _vr(&s))
1325
+
.transpose()?
1326
+
.map(
1327
+
|TargetIdRepairState {
1328
+
current_us_started_at,
1329
+
id_when_started,
1330
+
latest_repaired_i,
1331
+
}| {
1332
+
HashMap::from([
1333
+
("current_us_started_at".to_string(), current_us_started_at),
1334
+
("id_when_started".to_string(), id_when_started),
1335
+
("latest_repaired_i".to_string(), latest_repaired_i),
1336
+
])
1337
+
},
1338
+
)
1339
+
.unwrap_or(HashMap::default());
1340
+
1045
1341
Ok(StorageStats {
1046
1342
dids,
1047
1343
targetables,
1048
1344
linking_records,
1345
+
started_at: Some(started_at),
1346
+
other_data,
1049
1347
})
1050
1348
}
1051
1349
}
···
1071
1369
impl AsRocksValue for &TargetId {}
1072
1370
impl KeyFromRocks for TargetKey {}
1073
1371
impl ValueFromRocks for TargetId {}
1372
+
1373
+
// temp?
1374
+
impl KeyFromRocks for TargetId {}
1375
+
impl AsRocksValue for &TargetKey {}
1074
1376
1075
1377
// target_links table
1076
1378
impl AsRocksKey for &TargetId {}
···
1142
1444
}
1143
1445
1144
1446
// target ids
1145
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1447
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
1146
1448
struct TargetId(u64); // key
1147
1449
1148
-
#[derive(Debug, Clone, Serialize, Deserialize)]
1450
+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
1149
1451
pub struct Target(pub String); // the actual target/uri
1150
1452
1151
1453
// targets (uris, dids, etc.): the reverse index
+1
-1
constellation/templates/dids.html.j2
+1
-1
constellation/templates/dids.html.j2
···
27
27
{% for did in linking_dids %}
28
28
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
29
29
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
30
-
-> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
30
+
-> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
31
31
{% endfor %}
32
32
33
33
{% if let Some(c) = cursor %}
+54
constellation/templates/get-backlinks.html.j2
+54
constellation/templates/get-backlinks.html.j2
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Backlinks{% endblock %}
5
+
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_backlinks(query.subject, query.source, query.did, query.limit) %}
10
+
11
+
<h2>
12
+
Links to <code>{{ query.subject }}</code>
13
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
14
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
15
+
{% endif %}
16
+
</h2>
17
+
18
+
<p><strong>{{ total|human_number }} links</strong> from <code>{{ query.source }}</code>.</p>
19
+
20
+
<ul>
21
+
<li>See distinct linking DIDs at <code>/links/distinct-dids</code>: <a href="/links/distinct-dids?target={{ query.subject|urlencode }}&collection={{ collection|urlencode }}&path={{ path|urlencode }}">/links/distinct-dids?target={{ query.subject }}&collection={{ collection }}&path={{ path }}</a></li>
22
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
23
+
</ul>
24
+
25
+
<h3>Links, most recent first:</h3>
26
+
27
+
{% for record in records %}
28
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} (<a href="/links/all?target={{ record.did().0|urlencode }}">DID links</a>)
29
+
<strong>Collection</strong>: {{ record.collection }}
30
+
<strong>RKey</strong>: {{ record.rkey }}
31
+
-> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre>
32
+
{% endfor %}
33
+
34
+
{% if let Some(c) = cursor %}
35
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
36
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
37
+
<input type="hidden" name="source" value="{{ query.source }}" />
38
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
39
+
{% for did in query.did %}
40
+
<input type="hidden" name="did" value="{{ did }}" />
41
+
{% endfor %}
42
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
43
+
<button type="submit">next page…</button>
44
+
</form>
45
+
{% else %}
46
+
<button disabled><em>end of results</em></button>
47
+
{% endif %}
48
+
49
+
<details>
50
+
<summary>Raw JSON response</summary>
51
+
<pre class="code">{{ self|tojson }}</pre>
52
+
</details>
53
+
54
+
{% endblock %}
+67
constellation/templates/get-many-to-many-counts.html.j2
+67
constellation/templates/get-many-to-many-counts.html.j2
···
1
+
{% extends "base.html.j2" %}
2
+
{% import "try-it-macros.html.j2" as try_it %}
3
+
4
+
{% block title %}Many to Many counts{% endblock %}
5
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
6
+
7
+
{% block content %}
8
+
9
+
{% call try_it::get_many_to_many_counts(
10
+
query.subject,
11
+
query.source,
12
+
query.path_to_other,
13
+
query.did,
14
+
query.other_subject,
15
+
query.limit,
16
+
) %}
17
+
18
+
<h2>
19
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
20
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
21
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
22
+
{% endif %}
23
+
</h2>
24
+
25
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}โ{{ query.path_to_other }}</code></p>
26
+
27
+
<ul>
28
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
29
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
30
+
</ul>
31
+
32
+
<h3>Counts by other subject:</h3>
33
+
34
+
{% for counts in counts_by_other_subject %}
35
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
36
+
<strong>Joining records</strong>: {{ counts.total }}
37
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
38
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
39
+
<a href="{{ browseable_uri }}">browse record</a>
40
+
{%- endif %}</pre>
41
+
{% endfor %}
42
+
43
+
{% if let Some(c) = cursor %}
44
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
45
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
46
+
<input type="hidden" name="source" value="{{ query.source }}" />
47
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
48
+
{% for did in query.did %}
49
+
<input type="hidden" name="did" value="{{ did }}" />
50
+
{% endfor %}
51
+
{% for otherSubject in query.other_subject %}
52
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
53
+
{% endfor %}
54
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
55
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
56
+
<button type="submit">next page…</button>
57
+
</form>
58
+
{% else %}
59
+
<button disabled><em>end of results</em></button>
60
+
{% endif %}
61
+
62
+
<details>
63
+
<summary>Raw JSON response</summary>
64
+
<pre class="code">{{ self|tojson }}</pre>
65
+
</details>
66
+
67
+
{% endblock %}
+57
-2
constellation/templates/hello.html.j2
+57
-2
constellation/templates/hello.html.j2
···
19
19
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
20
20
21
21
<p>
22
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
22
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
23
+
{%- if let Some(days) = days_indexed %}
24
+
{{ days|human_number }}
25
+
{% else %}
26
+
???
27
+
{% endif -%}
28
+
</span> days.<br/>
23
29
<small>(indexing new records in real time, backfill coming soon!)</small>
24
30
</p>
25
31
26
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
32
+
{# {% for k, v in stats.other_data.iter() %}
33
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
34
+
{% endfor %} #}
35
+
36
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
27
37
28
38
29
39
<h2>API Endpoints</h2>
30
40
41
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getBacklinks</code></h3>
42
+
43
+
<p>A list of records linking to any record, identity, or uri.</p>
44
+
45
+
<h4>Query parameters:</h4>
46
+
47
+
<ul>
48
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
49
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
50
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
51
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
52
+
</ul>
53
+
54
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
55
+
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
56
+
57
+
58
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
59
+
60
+
<p>TODO: description</p>
61
+
62
+
<h4>Query parameters:</h4>
63
+
64
+
<ul>
65
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
66
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
67
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
68
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
69
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple users. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
70
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
71
+
</ul>
72
+
73
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
74
+
{% call try_it::get_many_to_many_counts(
75
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
76
+
"sh.tangled.label.op:add[].key",
77
+
"subject",
78
+
[""],
79
+
[""],
80
+
25,
81
+
) %}
82
+
83
+
31
84
<h3 class="route"><code>GET /links</code></h3>
32
85
33
86
<p>A list of records linking to a target.</p>
87
+
88
+
<p>[DEPRECATED]: use <code>GET /xrpc/blue.microcosm.links.getBacklinks</code>. New apps should avoid it, but this endpoint <strong>will</strong> remain supported for the forseeable future.</p>
34
89
35
90
<h4>Query parameters:</h4>
36
91
+68
-1
constellation/templates/try-it-macros.html.j2
+68
-1
constellation/templates/try-it-macros.html.j2
···
1
+
{% macro get_backlinks(subject, source, dids, limit) %}
2
+
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
3
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
4
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
5
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
6
+
{%- for did in dids %}{% if !did.is_empty() %}
7
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
8
+
<span id="did-placeholder"></span> <button id="add-did">+ did filter</button>
9
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
10
+
</form>
11
+
<script>
12
+
const addDidButton = document.getElementById('add-did');
13
+
const didPlaceholder = document.getElementById('did-placeholder');
14
+
addDidButton.addEventListener('click', e => {
15
+
e.preventDefault();
16
+
const i = document.createElement('input');
17
+
i.placeholder = 'did:plc:...';
18
+
i.name = "did"
19
+
const p = addDidButton.parentNode;
20
+
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
21
+
p.insertBefore(i, didPlaceholder);
22
+
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
23
+
});
24
+
</script>
25
+
{% endmacro %}
26
+
27
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
28
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
29
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
30
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
31
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
32
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
33
+
{%- for did in dids %}{% if !did.is_empty() %}
34
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
35
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
36
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
37
+
&otherSubject= <input type="text" name="did" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
38
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
39
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
40
+
</form>
41
+
<script>
42
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
43
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
44
+
m2mAddDidButton.addEventListener('click', e => {
45
+
e.preventDefault();
46
+
const i = document.createElement('input');
47
+
i.placeholder = 'did:plc:...';
48
+
i.name = "did"
49
+
const p = m2mAddDidButton.parentNode;
50
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
51
+
p.insertBefore(i, m2mDidPlaceholder);
52
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
53
+
});
54
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
55
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
56
+
m2mAddSubjectButton.addEventListener('click', e => {
57
+
e.preventDefault();
58
+
const i = document.createElement('input');
59
+
i.placeholder = 'at-uri, did, uri...';
60
+
i.name = "otherSubject"
61
+
const p = m2mAddSubjectButton.parentNode;
62
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
63
+
p.insertBefore(i, m2mSubjectPlaceholder);
64
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
65
+
});
66
+
</script>
67
+
{% endmacro %}
68
+
1
69
{% macro links(target, collection, path, dids, limit) %}
2
70
<form method="get" action="/links">
3
71
<pre class="code"><strong>GET</strong> /links
···
24
92
});
25
93
</script>
26
94
{% endmacro %}
27
-
28
95
29
96
{% macro dids(target, collection, path) %}
30
97
<form method="get" action="/links/distinct-dids">
+2
links/Cargo.toml
+2
links/Cargo.toml
+3
-2
links/src/lib.rs
+3
-2
links/src/lib.rs
···
1
1
use fluent_uri::Uri;
2
+
use serde::{Deserialize, Serialize};
2
3
3
4
pub mod at_uri;
4
5
pub mod did;
···
6
7
7
8
pub use record::collect_links;
8
9
9
-
#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq)]
10
+
#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq, Serialize, Deserialize)]
10
11
pub enum Link {
11
12
AtUri(String),
12
13
Uri(String),
···
59
60
}
60
61
}
61
62
62
-
#[derive(Debug, PartialEq)]
63
+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
63
64
pub struct CollectedLink {
64
65
pub path: String,
65
66
pub target: Link,
+41
links/src/record.rs
+41
links/src/record.rs
···
1
+
use dasl::drisl::Value as DrislValue;
1
2
use tinyjson::JsonValue;
2
3
3
4
use crate::{parse_any_link, CollectedLink};
···
36
37
}
37
38
}
38
39
40
+
pub fn walk_drisl(path: &str, v: &DrislValue, found: &mut Vec<CollectedLink>) {
41
+
match v {
42
+
DrislValue::Map(o) => {
43
+
for (key, child) in o {
44
+
walk_drisl(&format!("{path}.{key}"), child, found)
45
+
}
46
+
}
47
+
DrislValue::Array(a) => {
48
+
for child in a {
49
+
let child_p = match child {
50
+
DrislValue::Map(o) => {
51
+
if let Some(DrislValue::Text(t)) = o.get("$type") {
52
+
format!("{path}[{t}]")
53
+
} else {
54
+
format!("{path}[]")
55
+
}
56
+
}
57
+
_ => format!("{path}[]"),
58
+
};
59
+
walk_drisl(&child_p, child, found)
60
+
}
61
+
}
62
+
DrislValue::Text(s) => {
63
+
if let Some(link) = parse_any_link(s) {
64
+
found.push(CollectedLink {
65
+
path: path.to_string(),
66
+
target: link,
67
+
});
68
+
}
69
+
}
70
+
_ => {}
71
+
}
72
+
}
73
+
39
74
pub fn collect_links(v: &JsonValue) -> Vec<CollectedLink> {
40
75
let mut found = vec![];
41
76
walk_record("", v, &mut found);
77
+
found
78
+
}
79
+
80
+
pub fn collect_links_drisl(v: &DrislValue) -> Vec<CollectedLink> {
81
+
let mut found = vec![];
82
+
walk_drisl("", v, &mut found);
42
83
found
43
84
}
44
85
+8
spacedust/Cargo.toml
+8
spacedust/Cargo.toml
···
4
4
edition = "2024"
5
5
6
6
[dependencies]
7
+
anyhow = "1.0.100"
8
+
async-channel = "2.5.0"
7
9
async-trait = "0.1.88"
8
10
clap = { version = "4.5.40", features = ["derive"] }
9
11
ctrlc = "3.4.7"
12
+
dasl = "0.2.0"
10
13
dropshot = "0.16.2"
11
14
env_logger = "0.11.8"
15
+
fjall = "3.0.0-pre.0"
12
16
futures = "0.3.31"
13
17
http = "1.3.1"
18
+
ipld-core = { version = "0.4.2", features = ["serde"] }
14
19
jetstream = { path = "../jetstream", features = ["metrics"] }
15
20
links = { path = "../links" }
16
21
log = "0.4.27"
17
22
metrics = "0.24.2"
18
23
metrics-exporter-prometheus = { version = "0.17.1", features = ["http-listener"] }
19
24
rand = "0.9.1"
25
+
repo-stream = "0.2.2"
26
+
reqwest = { version = "0.12.24", features = ["json", "stream"] }
20
27
schemars = "0.8.22"
21
28
semver = "1.0.26"
22
29
serde = { version = "1.0.219", features = ["derive"] }
30
+
serde_ipld_dagcbor = "0.6.4"
23
31
serde_json = "1.0.140"
24
32
serde_qs = "1.0.0-rc.3"
25
33
thiserror = "2.0.12"
+21
spacedust/src/bin/import_car_file.rs
+21
spacedust/src/bin/import_car_file.rs
···
1
+
use clap::Parser;
2
+
use std::path::PathBuf;
3
+
4
+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
5
+
6
+
#[derive(Debug, Parser)]
7
+
struct Args {
8
+
#[arg()]
9
+
file: PathBuf,
10
+
}
11
+
12
+
#[tokio::main]
13
+
async fn main() -> Result<()> {
14
+
env_logger::init();
15
+
16
+
let Args { file } = Args::parse();
17
+
18
+
let _reader = tokio::fs::File::open(file).await?;
19
+
20
+
Ok(())
21
+
}
+258
spacedust/src/bin/import_scraped.rs
+258
spacedust/src/bin/import_scraped.rs
···
1
+
use clap::Parser;
2
+
use links::CollectedLink;
3
+
use repo_stream::{
4
+
DiskBuilder, DiskStore, Driver, DriverBuilder, Processable, drive::DriverBuilderWithProcessor,
5
+
drive::NeedDisk,
6
+
};
7
+
use std::path::PathBuf;
8
+
use std::sync::{
9
+
Arc,
10
+
atomic::{AtomicUsize, Ordering},
11
+
};
12
+
use tokio::{io::AsyncRead, task::JoinSet};
13
+
14
+
type Result<T> = anyhow::Result<T>; //std::result::Result<T, Box<dyn std::error::Error>>;
15
+
16
+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17
+
struct CollectedProcessed(CollectedLink);
18
+
19
+
impl Processable for CollectedProcessed {
20
+
fn get_size(&self) -> usize {
21
+
self.0.path.capacity() + self.0.target.as_str().len()
22
+
}
23
+
}
24
+
25
+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26
+
struct ErrString(String);
27
+
28
+
impl Processable for ErrString {
29
+
fn get_size(&self) -> usize {
30
+
self.0.capacity()
31
+
}
32
+
}
33
+
34
+
type Processed = std::result::Result<Vec<CollectedProcessed>, ErrString>;
35
+
36
+
/// hacky for now: put errors in strings ๐คทโโ๏ธ
37
+
fn process(block: Vec<u8>) -> Processed {
38
+
let value: dasl::drisl::Value = dasl::drisl::from_slice(&block)
39
+
.map_err(|e| ErrString(format!("failed to parse block with drisl: {e:?}")))?;
40
+
let links = links::record::collect_links_drisl(&value)
41
+
.into_iter()
42
+
.map(CollectedProcessed)
43
+
.collect();
44
+
Ok(links)
45
+
}
46
+
47
+
#[derive(Debug, Parser)]
48
+
struct Args {
49
+
#[arg(long)]
50
+
cars_folder: PathBuf,
51
+
#[arg(long)]
52
+
mem_workers: usize,
53
+
#[arg(long)]
54
+
disk_workers: usize,
55
+
#[arg(long)]
56
+
disk_folder: PathBuf,
57
+
}
58
+
59
+
async fn get_cars(
60
+
cars_folder: PathBuf,
61
+
tx: async_channel::Sender<tokio::io::BufReader<tokio::fs::File>>,
62
+
) -> Result<()> {
63
+
let mut dir = tokio::fs::read_dir(cars_folder).await?;
64
+
while let Some(entry) = dir.next_entry().await? {
65
+
if !entry.file_type().await?.is_file() {
66
+
continue;
67
+
}
68
+
let reader = tokio::fs::File::open(&entry.path()).await?;
69
+
let reader = tokio::io::BufReader::new(reader);
70
+
tx.send(reader).await?;
71
+
}
72
+
Ok(())
73
+
}
74
+
75
+
async fn drive_mem<R: AsyncRead + Unpin + Send + Sync + 'static>(
76
+
f: R,
77
+
builder: &DriverBuilderWithProcessor<Processed>,
78
+
disk_tx: &async_channel::Sender<NeedDisk<R, Processed>>,
79
+
) -> Result<Option<(usize, usize)>> {
80
+
let mut n = 0;
81
+
let mut n_records = 0;
82
+
match builder.load_car(f).await? {
83
+
Driver::Memory(_commit, mut driver) => {
84
+
while let Some(chunk) = driver.next_chunk(512).await? {
85
+
n_records += chunk.len();
86
+
for (_key, links) in chunk {
87
+
match links {
88
+
Ok(links) => n += links.len(),
89
+
Err(e) => eprintln!("wat: {e:?}"),
90
+
}
91
+
}
92
+
}
93
+
Ok(Some((n, n_records)))
94
+
}
95
+
Driver::Disk(need_disk) => {
96
+
disk_tx.send(need_disk).await?;
97
+
Ok(None)
98
+
}
99
+
}
100
+
}
101
+
102
+
async fn mem_worker<R: AsyncRead + Unpin + Send + Sync + 'static>(
103
+
car_rx: async_channel::Receiver<R>,
104
+
disk_tx: async_channel::Sender<NeedDisk<R, Processed>>,
105
+
n: Arc<AtomicUsize>,
106
+
n_records: Arc<AtomicUsize>,
107
+
) -> Result<()> {
108
+
let builder = DriverBuilder::new()
109
+
.with_block_processor(process) // don't care just counting records
110
+
.with_mem_limit_mb(128);
111
+
while let Ok(f) = car_rx.recv().await {
112
+
let driven = match drive_mem(f, &builder, &disk_tx).await {
113
+
Ok(d) => d,
114
+
Err(e) => {
115
+
eprintln!("failed to drive mem: {e:?}. skipping...");
116
+
continue;
117
+
}
118
+
};
119
+
if let Some((drove, recs)) = driven {
120
+
n.fetch_add(drove, Ordering::Relaxed);
121
+
n_records.fetch_add(recs, Ordering::Relaxed);
122
+
}
123
+
}
124
+
Ok(())
125
+
}
126
+
127
+
async fn drive_disk<R: AsyncRead + Unpin>(
128
+
needed: NeedDisk<R, Processed>,
129
+
store: DiskStore,
130
+
) -> Result<(usize, usize, DiskStore)> {
131
+
let (_commit, mut driver) = needed.finish_loading(store).await?;
132
+
let mut n = 0;
133
+
let mut n_records = 0;
134
+
while let Some(chunk) = driver.next_chunk(512).await? {
135
+
n_records += chunk.len();
136
+
for (_key, links) in chunk {
137
+
match links {
138
+
Ok(links) => n += links.len(),
139
+
Err(e) => eprintln!("wat: {e:?}"),
140
+
}
141
+
}
142
+
}
143
+
let store = driver.reset_store().await?;
144
+
Ok((n, n_records, store))
145
+
}
146
+
147
+
async fn disk_worker<R: AsyncRead + Unpin>(
148
+
worker_id: usize,
149
+
disk_rx: async_channel::Receiver<NeedDisk<R, Processed>>,
150
+
folder: PathBuf,
151
+
n: Arc<AtomicUsize>,
152
+
n_records: Arc<AtomicUsize>,
153
+
disk_workers_active: Arc<AtomicUsize>,
154
+
) -> Result<()> {
155
+
let mut file = folder;
156
+
file.push(format!("disk-worker-{worker_id}.sqlite"));
157
+
let builder = DiskBuilder::new().with_cache_size_mb(128);
158
+
let mut store = builder.open(file.clone()).await?;
159
+
while let Ok(needed) = disk_rx.recv().await {
160
+
let active = disk_workers_active.fetch_add(1, Ordering::AcqRel);
161
+
println!("-> disk workers active: {}", active + 1);
162
+
let (drove, records) = match drive_disk(needed, store).await {
163
+
Ok((d, r, s)) => {
164
+
store = s;
165
+
(d, r)
166
+
}
167
+
Err(e) => {
168
+
eprintln!("failed to drive disk: {e:?}. skipping...");
169
+
store = builder.open(file.clone()).await?;
170
+
continue;
171
+
}
172
+
};
173
+
n.fetch_add(drove, Ordering::Relaxed);
174
+
n_records.fetch_add(records, Ordering::Relaxed);
175
+
let were_active = disk_workers_active.fetch_sub(1, Ordering::AcqRel);
176
+
println!("<- disk workers active: {}", were_active - 1);
177
+
}
178
+
Ok(())
179
+
}
180
+
181
+
#[tokio::main]
182
+
async fn main() -> Result<()> {
183
+
env_logger::init();
184
+
185
+
let Args {
186
+
cars_folder,
187
+
disk_folder,
188
+
disk_workers,
189
+
mem_workers,
190
+
} = Args::parse();
191
+
192
+
let mut set = JoinSet::<Result<()>>::new();
193
+
194
+
let (cars_tx, cars_rx) = async_channel::bounded(2);
195
+
set.spawn(get_cars(cars_folder, cars_tx));
196
+
197
+
let n: Arc<AtomicUsize> = Arc::new(0.into());
198
+
let n_records: Arc<AtomicUsize> = Arc::new(0.into());
199
+
let disk_workers_active: Arc<AtomicUsize> = Arc::new(0.into());
200
+
201
+
set.spawn({
202
+
let n = n.clone();
203
+
let n_records = n_records.clone();
204
+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
205
+
async move {
206
+
let mut last_n = n.load(Ordering::Relaxed);
207
+
let mut last_n_records = n.load(Ordering::Relaxed);
208
+
loop {
209
+
interval.tick().await;
210
+
let n = n.load(Ordering::Relaxed);
211
+
let n_records = n_records.load(Ordering::Relaxed);
212
+
let diff_n = n - last_n;
213
+
let diff_records = n_records - last_n_records;
214
+
println!("rate: {} rec/sec; {} n/sec", diff_records / 10, diff_n / 10);
215
+
if n_records > 0 && diff_records == 0 {
216
+
println!("zero encountered, stopping rate calculation polling.");
217
+
break Ok(());
218
+
}
219
+
last_n = n;
220
+
last_n_records = n_records;
221
+
}
222
+
}
223
+
});
224
+
225
+
let (needs_disk_tx, needs_disk_rx) = async_channel::bounded(disk_workers);
226
+
227
+
for _ in 0..mem_workers {
228
+
set.spawn(mem_worker(
229
+
cars_rx.clone(),
230
+
needs_disk_tx.clone(),
231
+
n.clone(),
232
+
n_records.clone(),
233
+
));
234
+
}
235
+
drop(cars_rx);
236
+
drop(needs_disk_tx);
237
+
238
+
tokio::fs::create_dir_all(disk_folder.clone()).await?;
239
+
for id in 0..disk_workers {
240
+
set.spawn(disk_worker(
241
+
id,
242
+
needs_disk_rx.clone(),
243
+
disk_folder.clone(),
244
+
n.clone(),
245
+
n_records.clone(),
246
+
disk_workers_active.clone(),
247
+
));
248
+
}
249
+
drop(needs_disk_rx);
250
+
251
+
while let Some(res) = set.join_next().await {
252
+
println!("task from set joined: {res:?}");
253
+
}
254
+
255
+
eprintln!("total records processed: {n_records:?}; total n: {n:?}");
256
+
257
+
Ok(())
258
+
}
+137
spacedust/src/bin/scrape_pds.rs
+137
spacedust/src/bin/scrape_pds.rs
···
1
+
use clap::Parser;
2
+
use reqwest::Url;
3
+
use serde::Deserialize;
4
+
use std::path::PathBuf;
5
+
use tokio::io::AsyncWriteExt;
6
+
use tokio::{sync::mpsc, time};
7
+
8
+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
+
10
+
use futures::StreamExt;
11
+
12
+
#[derive(Debug, Parser)]
13
+
struct Args {
14
+
#[arg(long)]
15
+
pds: Url,
16
+
#[arg(long)]
17
+
throttle_ms: u64, // 100ms per pds?
18
+
#[arg(long)]
19
+
folder: PathBuf,
20
+
}
21
+
22
+
async fn download_repo(
23
+
client: &reqwest::Client,
24
+
mut pds: Url,
25
+
did: String,
26
+
mut path: PathBuf,
27
+
) -> Result<()> {
28
+
path.push(format!("{did}.car"));
29
+
let f = tokio::fs::File::create(path).await?;
30
+
let mut w = tokio::io::BufWriter::new(f);
31
+
32
+
pds.set_path("/xrpc/com.atproto.sync.getRepo");
33
+
pds.set_query(Some(&format!("did={did}")));
34
+
let mut byte_stream = client.get(pds).send().await?.bytes_stream();
35
+
36
+
while let Some(stuff) = byte_stream.next().await {
37
+
tokio::io::copy(&mut stuff?.as_ref(), &mut w).await?;
38
+
}
39
+
w.flush().await?;
40
+
41
+
Ok(())
42
+
}
43
+
44
+
#[derive(Debug, Deserialize)]
45
+
struct RepoInfo {
46
+
did: String,
47
+
active: bool,
48
+
}
49
+
50
+
#[derive(Debug, Deserialize)]
51
+
struct ListReposResponse {
52
+
cursor: Option<String>,
53
+
repos: Vec<RepoInfo>,
54
+
}
55
+
56
+
fn get_pds_dids(client: reqwest::Client, mut pds: Url) -> mpsc::Receiver<String> {
57
+
let (tx, rx) = mpsc::channel(2);
58
+
tokio::task::spawn(async move {
59
+
pds.set_path("/xrpc/com.atproto.sync.listRepos");
60
+
let mut cursor = None;
61
+
62
+
loop {
63
+
if let Some(c) = cursor {
64
+
pds.set_query(Some(&format!("cursor={c}")));
65
+
}
66
+
let res: ListReposResponse = client
67
+
.get(pds.clone())
68
+
.send()
69
+
.await
70
+
.expect("to send request")
71
+
.error_for_status()
72
+
.expect("to be ok")
73
+
.json()
74
+
.await
75
+
.expect("json response");
76
+
for repo in res.repos {
77
+
if repo.active {
78
+
tx.send(repo.did)
79
+
.await
80
+
.expect("to be able to send on the channel");
81
+
}
82
+
}
83
+
cursor = res.cursor;
84
+
if cursor.is_none() {
85
+
break;
86
+
}
87
+
}
88
+
});
89
+
rx
90
+
}
91
+
92
+
#[tokio::main]
93
+
async fn main() -> Result<()> {
94
+
env_logger::init();
95
+
96
+
let Args {
97
+
pds,
98
+
throttle_ms,
99
+
folder,
100
+
} = Args::parse();
101
+
102
+
tokio::fs::create_dir_all(folder.clone()).await?;
103
+
104
+
let client = reqwest::Client::builder()
105
+
.user_agent("microcosm/spacedust-testing")
106
+
.build()?;
107
+
108
+
let mut dids = get_pds_dids(client.clone(), pds.clone());
109
+
110
+
let mut interval = time::interval(time::Duration::from_millis(throttle_ms));
111
+
let mut oks = 0;
112
+
let mut single_fails = 0;
113
+
let mut double_fails = 0;
114
+
115
+
while let Some(did) = dids.recv().await {
116
+
interval.tick().await;
117
+
println!("did: {did:?}");
118
+
if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
119
+
single_fails += 1;
120
+
eprintln!("failed to download repo for did: {did:?}: {e:?}. retrying in a moment...");
121
+
tokio::time::sleep(time::Duration::from_secs(3)).await;
122
+
interval.reset();
123
+
if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
124
+
double_fails += 1;
125
+
eprintln!("failed again: {e:?}. moving on in a moment...");
126
+
tokio::time::sleep(time::Duration::from_secs(1)).await;
127
+
continue;
128
+
}
129
+
}
130
+
oks += 1;
131
+
println!(" -> done. did: {did:?}");
132
+
}
133
+
134
+
eprintln!("got {oks} repos. single fails: {single_fails}; doubles: {double_fails}.");
135
+
136
+
Ok(())
137
+
}
+1
spacedust/src/lib.rs
+1
spacedust/src/lib.rs
spacedust/src/storage/car/drive.rs
spacedust/src/storage/car/drive.rs
This is a binary file and will not be displayed.
+1
spacedust/src/storage/car/mod.rs
+1
spacedust/src/storage/car/mod.rs
···
1
+
spacedust/src/storage/car/walk.rs
spacedust/src/storage/car/walk.rs
This is a binary file and will not be displayed.
+9
spacedust/src/storage/fjall/mod.rs
+9
spacedust/src/storage/fjall/mod.rs