+293
-16
Cargo.lock
+293
-16
Cargo.lock
···
401
401
source = "registry+https://github.com/rust-lang/crates.io-index"
402
402
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
403
403
dependencies = [
404
-
"bitflags",
404
+
"bitflags 2.8.0",
405
405
"cexpr",
406
406
"clang-sys",
407
407
"itertools 0.12.1",
···
417
417
"syn",
418
418
"which",
419
419
]
420
+
421
+
[[package]]
422
+
name = "bitflags"
423
+
version = "1.3.2"
424
+
source = "registry+https://github.com/rust-lang/crates.io-index"
425
+
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
420
426
421
427
[[package]]
422
428
name = "bitflags"
···
656
662
"metrics",
657
663
"metrics-exporter-prometheus",
658
664
"parakeet-db",
665
+
"parakeet-index",
659
666
"reqwest",
660
667
"serde",
661
668
"serde_bytes",
···
663
670
"serde_json",
664
671
"tokio",
665
672
"tokio-postgres",
673
+
"tokio-stream",
666
674
"tokio-tungstenite",
667
675
"tracing",
668
676
"tracing-subscriber",
···
710
718
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
711
719
dependencies = [
712
720
"libc",
721
+
]
722
+
723
+
[[package]]
724
+
name = "crc32fast"
725
+
version = "1.4.2"
726
+
source = "registry+https://github.com/rust-lang/crates.io-index"
727
+
checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
728
+
dependencies = [
729
+
"cfg-if",
713
730
]
714
731
715
732
[[package]]
···
847
864
source = "registry+https://github.com/rust-lang/crates.io-index"
848
865
checksum = "ccf1bedf64cdb9643204a36dd15b19a6ce8e7aa7f7b105868e9f1fad5ffa7d12"
849
866
dependencies = [
850
-
"bitflags",
867
+
"bitflags 2.8.0",
851
868
"byteorder",
852
869
"chrono",
853
870
"diesel_derives",
···
1041
1058
]
1042
1059
1043
1060
[[package]]
1061
+
name = "fixedbitset"
1062
+
version = "0.5.7"
1063
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1064
+
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
1065
+
1066
+
[[package]]
1044
1067
name = "flume"
1045
1068
version = "0.11.1"
1046
1069
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1086
1109
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
1087
1110
dependencies = [
1088
1111
"percent-encoding",
1112
+
]
1113
+
1114
+
[[package]]
1115
+
name = "fs2"
1116
+
version = "0.4.3"
1117
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1118
+
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
1119
+
dependencies = [
1120
+
"libc",
1121
+
"winapi",
1089
1122
]
1090
1123
1091
1124
[[package]]
···
1197
1230
]
1198
1231
1199
1232
[[package]]
1233
+
name = "fxhash"
1234
+
version = "0.2.1"
1235
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1236
+
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
1237
+
dependencies = [
1238
+
"byteorder",
1239
+
]
1240
+
1241
+
[[package]]
1200
1242
name = "generic-array"
1201
1243
version = "0.14.7"
1202
1244
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1335
1377
"ipconfig",
1336
1378
"lru-cache",
1337
1379
"once_cell",
1338
-
"parking_lot",
1380
+
"parking_lot 0.12.3",
1339
1381
"rand",
1340
1382
"resolv-conf",
1341
1383
"smallvec",
···
1459
1501
]
1460
1502
1461
1503
[[package]]
1504
+
name = "hyper-timeout"
1505
+
version = "0.5.2"
1506
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1507
+
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
1508
+
dependencies = [
1509
+
"hyper",
1510
+
"hyper-util",
1511
+
"pin-project-lite",
1512
+
"tokio",
1513
+
"tower-service",
1514
+
]
1515
+
1516
+
[[package]]
1462
1517
name = "hyper-tls"
1463
1518
version = "0.6.0"
1464
1519
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1684
1739
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
1685
1740
1686
1741
[[package]]
1742
+
name = "instant"
1743
+
version = "0.1.13"
1744
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1745
+
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
1746
+
dependencies = [
1747
+
"cfg-if",
1748
+
]
1749
+
1750
+
[[package]]
1687
1751
name = "ipconfig"
1688
1752
version = "0.3.2"
1689
1753
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1999
2063
]
2000
2064
2001
2065
[[package]]
2066
+
name = "multimap"
2067
+
version = "0.10.0"
2068
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2069
+
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
2070
+
2071
+
[[package]]
2002
2072
name = "nanorand"
2003
2073
version = "0.7.0"
2004
2074
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2084
2154
source = "registry+https://github.com/rust-lang/crates.io-index"
2085
2155
checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5"
2086
2156
dependencies = [
2087
-
"bitflags",
2157
+
"bitflags 2.8.0",
2088
2158
"cfg-if",
2089
2159
"foreign-types",
2090
2160
"libc",
···
2145
2215
"itertools 0.14.0",
2146
2216
"lexica",
2147
2217
"parakeet-db",
2218
+
"parakeet-index",
2148
2219
"serde",
2149
2220
"serde_json",
2150
2221
"tokio",
···
2163
2234
]
2164
2235
2165
2236
[[package]]
2237
+
name = "parakeet-index"
2238
+
version = "0.1.0"
2239
+
dependencies = [
2240
+
"eyre",
2241
+
"figment",
2242
+
"itertools 0.14.0",
2243
+
"prost",
2244
+
"serde",
2245
+
"sled",
2246
+
"tokio",
2247
+
"tonic",
2248
+
"tonic-build",
2249
+
"tracing",
2250
+
"tracing-subscriber",
2251
+
]
2252
+
2253
+
[[package]]
2166
2254
name = "parakeet-lexgen"
2167
2255
version = "0.1.0"
2168
2256
dependencies = [
···
2182
2270
2183
2271
[[package]]
2184
2272
name = "parking_lot"
2273
+
version = "0.11.2"
2274
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2275
+
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
2276
+
dependencies = [
2277
+
"instant",
2278
+
"lock_api",
2279
+
"parking_lot_core 0.8.6",
2280
+
]
2281
+
2282
+
[[package]]
2283
+
name = "parking_lot"
2185
2284
version = "0.12.3"
2186
2285
source = "registry+https://github.com/rust-lang/crates.io-index"
2187
2286
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
2188
2287
dependencies = [
2189
2288
"lock_api",
2190
-
"parking_lot_core",
2289
+
"parking_lot_core 0.9.10",
2290
+
]
2291
+
2292
+
[[package]]
2293
+
name = "parking_lot_core"
2294
+
version = "0.8.6"
2295
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2296
+
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
2297
+
dependencies = [
2298
+
"cfg-if",
2299
+
"instant",
2300
+
"libc",
2301
+
"redox_syscall 0.2.16",
2302
+
"smallvec",
2303
+
"winapi",
2191
2304
]
2192
2305
2193
2306
[[package]]
···
2198
2311
dependencies = [
2199
2312
"cfg-if",
2200
2313
"libc",
2201
-
"redox_syscall",
2314
+
"redox_syscall 0.5.8",
2202
2315
"smallvec",
2203
2316
"windows-targets 0.52.6",
2204
2317
]
···
2239
2352
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
2240
2353
2241
2354
[[package]]
2355
+
name = "petgraph"
2356
+
version = "0.7.1"
2357
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2358
+
checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772"
2359
+
dependencies = [
2360
+
"fixedbitset",
2361
+
"indexmap",
2362
+
]
2363
+
2364
+
[[package]]
2242
2365
name = "phf"
2243
2366
version = "0.11.3"
2244
2367
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2254
2377
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
2255
2378
dependencies = [
2256
2379
"siphasher",
2380
+
]
2381
+
2382
+
[[package]]
2383
+
name = "pin-project"
2384
+
version = "1.1.10"
2385
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2386
+
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
2387
+
dependencies = [
2388
+
"pin-project-internal",
2389
+
]
2390
+
2391
+
[[package]]
2392
+
name = "pin-project-internal"
2393
+
version = "1.1.10"
2394
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2395
+
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
2396
+
dependencies = [
2397
+
"proc-macro2",
2398
+
"quote",
2399
+
"syn",
2257
2400
]
2258
2401
2259
2402
[[package]]
···
2378
2521
]
2379
2522
2380
2523
[[package]]
2524
+
name = "prost"
2525
+
version = "0.13.5"
2526
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2527
+
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
2528
+
dependencies = [
2529
+
"bytes",
2530
+
"prost-derive",
2531
+
]
2532
+
2533
+
[[package]]
2534
+
name = "prost-build"
2535
+
version = "0.13.5"
2536
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2537
+
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
2538
+
dependencies = [
2539
+
"heck",
2540
+
"itertools 0.14.0",
2541
+
"log",
2542
+
"multimap",
2543
+
"once_cell",
2544
+
"petgraph",
2545
+
"prettyplease",
2546
+
"prost",
2547
+
"prost-types",
2548
+
"regex",
2549
+
"syn",
2550
+
"tempfile",
2551
+
]
2552
+
2553
+
[[package]]
2554
+
name = "prost-derive"
2555
+
version = "0.13.5"
2556
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2557
+
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
2558
+
dependencies = [
2559
+
"anyhow",
2560
+
"itertools 0.14.0",
2561
+
"proc-macro2",
2562
+
"quote",
2563
+
"syn",
2564
+
]
2565
+
2566
+
[[package]]
2567
+
name = "prost-types"
2568
+
version = "0.13.5"
2569
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2570
+
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
2571
+
dependencies = [
2572
+
"prost",
2573
+
]
2574
+
2575
+
[[package]]
2381
2576
name = "quanta"
2382
2577
version = "0.12.5"
2383
2578
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2452
2647
source = "registry+https://github.com/rust-lang/crates.io-index"
2453
2648
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
2454
2649
dependencies = [
2455
-
"bitflags",
2650
+
"bitflags 2.8.0",
2651
+
]
2652
+
2653
+
[[package]]
2654
+
name = "redox_syscall"
2655
+
version = "0.2.16"
2656
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2657
+
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
2658
+
dependencies = [
2659
+
"bitflags 1.3.2",
2456
2660
]
2457
2661
2458
2662
[[package]]
···
2461
2665
source = "registry+https://github.com/rust-lang/crates.io-index"
2462
2666
checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834"
2463
2667
dependencies = [
2464
-
"bitflags",
2668
+
"bitflags 2.8.0",
2465
2669
]
2466
2670
2467
2671
[[package]]
···
2580
2784
source = "registry+https://github.com/rust-lang/crates.io-index"
2581
2785
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
2582
2786
dependencies = [
2583
-
"bitflags",
2787
+
"bitflags 2.8.0",
2584
2788
"errno",
2585
2789
"libc",
2586
2790
"linux-raw-sys",
···
2691
2895
source = "registry+https://github.com/rust-lang/crates.io-index"
2692
2896
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
2693
2897
dependencies = [
2694
-
"bitflags",
2898
+
"bitflags 2.8.0",
2695
2899
"core-foundation 0.9.4",
2696
2900
"core-foundation-sys",
2697
2901
"libc",
···
2704
2908
source = "registry+https://github.com/rust-lang/crates.io-index"
2705
2909
checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
2706
2910
dependencies = [
2707
-
"bitflags",
2911
+
"bitflags 2.8.0",
2708
2912
"core-foundation 0.10.0",
2709
2913
"core-foundation-sys",
2710
2914
"libc",
···
2886
3090
]
2887
3091
2888
3092
[[package]]
3093
+
name = "sled"
3094
+
version = "0.34.7"
3095
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3096
+
checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935"
3097
+
dependencies = [
3098
+
"crc32fast",
3099
+
"crossbeam-epoch",
3100
+
"crossbeam-utils",
3101
+
"fs2",
3102
+
"fxhash",
3103
+
"libc",
3104
+
"log",
3105
+
"parking_lot 0.11.2",
3106
+
]
3107
+
3108
+
[[package]]
2889
3109
name = "smallvec"
2890
3110
version = "1.13.2"
2891
3111
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2976
3196
source = "registry+https://github.com/rust-lang/crates.io-index"
2977
3197
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
2978
3198
dependencies = [
2979
-
"bitflags",
3199
+
"bitflags 2.8.0",
2980
3200
"core-foundation 0.9.4",
2981
3201
"system-configuration-sys",
2982
3202
]
···
3090
3310
"bytes",
3091
3311
"libc",
3092
3312
"mio",
3093
-
"parking_lot",
3313
+
"parking_lot 0.12.3",
3094
3314
"pin-project-lite",
3095
3315
"signal-hook-registry",
3096
3316
"socket2",
···
3132
3352
"futures-channel",
3133
3353
"futures-util",
3134
3354
"log",
3135
-
"parking_lot",
3355
+
"parking_lot 0.12.3",
3136
3356
"percent-encoding",
3137
3357
"phf",
3138
3358
"pin-project-lite",
···
3156
3376
]
3157
3377
3158
3378
[[package]]
3379
+
name = "tokio-stream"
3380
+
version = "0.1.17"
3381
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3382
+
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
3383
+
dependencies = [
3384
+
"futures-core",
3385
+
"pin-project-lite",
3386
+
"tokio",
3387
+
]
3388
+
3389
+
[[package]]
3159
3390
name = "tokio-tungstenite"
3160
3391
version = "0.26.1"
3161
3392
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3217
3448
]
3218
3449
3219
3450
[[package]]
3451
+
name = "tonic"
3452
+
version = "0.13.0"
3453
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3454
+
checksum = "85839f0b32fd242bb3209262371d07feda6d780d16ee9d2bc88581b89da1549b"
3455
+
dependencies = [
3456
+
"async-trait",
3457
+
"axum",
3458
+
"base64",
3459
+
"bytes",
3460
+
"h2",
3461
+
"http",
3462
+
"http-body",
3463
+
"http-body-util",
3464
+
"hyper",
3465
+
"hyper-timeout",
3466
+
"hyper-util",
3467
+
"percent-encoding",
3468
+
"pin-project",
3469
+
"prost",
3470
+
"socket2",
3471
+
"tokio",
3472
+
"tokio-stream",
3473
+
"tower",
3474
+
"tower-layer",
3475
+
"tower-service",
3476
+
"tracing",
3477
+
]
3478
+
3479
+
[[package]]
3480
+
name = "tonic-build"
3481
+
version = "0.13.0"
3482
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3483
+
checksum = "d85f0383fadd15609306383a90e85eaed44169f931a5d2be1b42c76ceff1825e"
3484
+
dependencies = [
3485
+
"prettyplease",
3486
+
"proc-macro2",
3487
+
"prost-build",
3488
+
"prost-types",
3489
+
"quote",
3490
+
"syn",
3491
+
]
3492
+
3493
+
[[package]]
3220
3494
name = "tower"
3221
3495
version = "0.5.2"
3222
3496
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3224
3498
dependencies = [
3225
3499
"futures-core",
3226
3500
"futures-util",
3501
+
"indexmap",
3227
3502
"pin-project-lite",
3503
+
"slab",
3228
3504
"sync_wrapper",
3229
3505
"tokio",
3506
+
"tokio-util",
3230
3507
"tower-layer",
3231
3508
"tower-service",
3232
3509
"tracing",
···
3238
3515
source = "registry+https://github.com/rust-lang/crates.io-index"
3239
3516
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
3240
3517
dependencies = [
3241
-
"bitflags",
3518
+
"bitflags 2.8.0",
3242
3519
"bytes",
3243
3520
"http",
3244
3521
"http-body",
···
3592
3869
source = "registry+https://github.com/rust-lang/crates.io-index"
3593
3870
checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
3594
3871
dependencies = [
3595
-
"redox_syscall",
3872
+
"redox_syscall 0.5.8",
3596
3873
"wasite",
3597
3874
"web-sys",
3598
3875
]
+1
Cargo.toml
+1
Cargo.toml
+2
consumer/Cargo.toml
+2
consumer/Cargo.toml
···
20
20
metrics = "0.24.1"
21
21
metrics-exporter-prometheus = "0.16.2"
22
22
parakeet-db = { path = "../parakeet-db" }
23
+
parakeet-index = { path = "../parakeet-index" }
23
24
reqwest = { version = "0.12.12", features = ["native-tls"] }
24
25
serde = { version = "1.0.217", features = ["derive"] }
25
26
serde_bytes = "0.11"
···
27
28
serde_json = "1.0.134"
28
29
tokio = { version = "1.42.0", features = ["full"] }
29
30
tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] }
31
+
tokio-stream = "0.1.17"
30
32
tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] }
31
33
tracing = "0.1.40"
32
34
tracing-subscriber = "0.3.18"
+1
consumer/run.sh
+1
consumer/run.sh
···
1
+
cargo run
+68
-51
consumer/src/backfill/mod.rs
+68
-51
consumer/src/backfill/mod.rs
···
1
1
use crate::config::HistoryMode;
2
-
use crate::indexer::types::{BackfillItem, BackfillItemInner, CollectionType, RecordTypes};
2
+
use crate::indexer::types::{AggregateDeltaStore, BackfillItem, BackfillItemInner};
3
3
use crate::indexer::{self, db as indexer_db};
4
4
use did_resolver::Resolver;
5
5
use diesel_async::pooled_connection::deadpool::Pool;
···
9
9
use metrics::counter;
10
10
use parakeet_db::types::{ActorStatus, ActorSyncState};
11
11
use reqwest::{Client, StatusCode};
12
+
use std::collections::HashMap;
12
13
use std::str::FromStr;
13
14
use std::sync::Arc;
14
15
use tracing::{instrument, Instrument};
···
18
19
mod types;
19
20
20
21
const PDS_SERVICE_ID: &str = "#atproto_pds";
22
+
// parakeet-index has a 4MiB limit, so a large set of deltas is submitted in several batches
23
+
// of at most this many entries. This should be comfortably below the limit (59k deltas went slightly over it).
24
+
const DELTA_BATCH_SIZE: usize = 32 * 1024;
21
25
22
26
/// Shared handles used by the backfill workers.
///
/// The struct is `Clone`; each spawned backfill worker takes its own clone.
#[derive(Clone)]
23
27
pub struct BackfillManagerInner {
24
28
// Pool of async Postgres connections; a connection is checked out per backfilled actor.
pool: Pool<AsyncPgConnection>,
25
29
// DID resolver, shared behind an `Arc`.
resolver: Arc<Resolver>,
26
30
// `reqwest` HTTP client.
client: Client,
31
+
// parakeet-index client, used to submit aggregate delta batches after a repo has been inserted.
index_client: parakeet_index::Client,
27
32
}
28
33
29
34
pub struct BackfillManager {
···
37
42
pool: Pool<AsyncPgConnection>,
38
43
history_mode: HistoryMode,
39
44
resolver: Arc<Resolver>,
45
+
index_client: parakeet_index::Client,
40
46
) -> eyre::Result<(Self, Sender<String>)> {
41
47
let client = Client::new();
42
48
···
47
53
pool,
48
54
resolver,
49
55
client,
56
+
index_client,
50
57
},
51
58
rx,
52
59
do_backfill: history_mode == HistoryMode::BackfillHistory,
···
61
68
if self.do_backfill {
62
69
for idx in 0..threads {
63
70
let rx = self.rx.clone();
64
-
let inner = self.inner.clone();
71
+
let mut inner = self.inner.clone();
65
72
66
73
js.spawn(
67
74
async move {
68
75
while let Ok(did) = rx.recv_async().await {
69
76
tracing::trace!("backfilling {did}");
70
-
if let Err(e) = backfill_actor(&inner, &did).await {
77
+
if let Err(e) = backfill_actor(&mut inner, &did).await {
71
78
tracing::error!(did, "backfill failed: {e}");
72
79
counter!("backfill_failure").increment(1);
73
80
} else {
···
90
97
}
91
98
92
99
#[instrument(skip(inner))]
93
-
async fn backfill_actor(inner: &BackfillManagerInner, did: &str) -> eyre::Result<()> {
100
+
async fn backfill_actor(inner: &mut BackfillManagerInner, did: &str) -> eyre::Result<()> {
94
101
let mut conn = inner.pool.get().await?;
95
102
96
103
let (status, sync_state) = db::get_actor_status(&mut conn, did).await?;
···
161
168
162
169
tracing::trace!("repo pulled - inserting");
163
170
164
-
conn.transaction::<(), diesel::result::Error, _>(|t| {
165
-
Box::pin(async move {
166
-
db::defer(t).await?;
171
+
let delta_store = conn
172
+
.transaction::<_, diesel::result::Error, _>(|t| {
173
+
Box::pin(async move {
174
+
let mut delta_store = HashMap::new();
167
175
168
-
indexer_db::update_repo_version(t, did, &rev, cid).await?;
176
+
db::defer(t).await?;
169
177
170
-
let mut follow_stats = vec![did.to_string()];
178
+
indexer_db::update_repo_version(t, did, &rev, cid).await?;
171
179
172
-
for (path, (cid, record)) in records {
173
-
let Some((collection, rkey)) = path.split_once("/") else {
174
-
tracing::warn!("record contained invalid path {}", path);
175
-
return Err(diesel::result::Error::RollbackTransaction);
176
-
};
180
+
// let mut follow_stats = vec![did.to_string()];
177
181
178
-
counter!("backfilled_commits", "collection" => collection.to_string()).increment(1);
182
+
for (path, (cid, record)) in records {
183
+
let Some((collection, rkey)) = path.split_once("/") else {
184
+
tracing::warn!("record contained invalid path {}", path);
185
+
return Err(diesel::result::Error::RollbackTransaction);
186
+
};
179
187
180
-
let full_path = format!("at://{did}/{path}");
188
+
counter!("backfilled_commits", "collection" => collection.to_string())
189
+
.increment(1);
181
190
182
-
match record {
183
-
RecordTypes::AppBskyGraphFollow(record) => {
184
-
follow_stats.push(record.subject.clone());
185
-
indexer_db::insert_follow(t, did, &full_path, record).await?;
186
-
}
187
-
_ => indexer::index_op(t, did, cid, record, &full_path, rkey).await?,
191
+
let full_path = format!("at://{did}/{path}");
192
+
193
+
indexer::index_op(t, &mut delta_store, did, cid, record, &full_path, rkey)
194
+
.await?
188
195
}
189
-
}
190
196
191
-
db::update_repo_sync_state(t, did, ActorSyncState::Synced).await?;
197
+
db::update_repo_sync_state(t, did, ActorSyncState::Synced).await?;
192
198
193
-
handle_backfill_rows(t, &mut follow_stats, did, &rev).await?;
199
+
handle_backfill_rows(t, &mut delta_store, did, &rev).await?;
200
+
tracing::trace!("insertion finished");
194
201
195
-
// on second thought, should this be done after the transaction?
196
-
// if we're loading a chunky repo, we might be a few seconds+ out of date?
197
-
indexer_db::update_follow_stats(t, &follow_stats).await?;
202
+
Ok(delta_store)
203
+
})
204
+
})
205
+
.await?;
198
206
199
-
tracing::trace!("insertion finished");
200
-
Ok(())
207
+
// submit the deltas
208
+
let delta_store = delta_store
209
+
.into_iter()
210
+
.map(|((uri, typ), delta)| parakeet_index::AggregateDeltaReq {
211
+
typ,
212
+
uri: uri.to_string(),
213
+
delta,
201
214
})
202
-
})
203
-
.await?;
215
+
.collect::<Vec<_>>();
216
+
217
+
let mut read = 0;
218
+
219
+
while read < delta_store.len() {
220
+
let rem = delta_store.len() - read;
221
+
let take = DELTA_BATCH_SIZE.min(rem);
222
+
223
+
tracing::debug!("reading & submitting {take} deltas");
224
+
225
+
let deltas = delta_store[read..read + take].to_vec();
226
+
inner
227
+
.index_client
228
+
.submit_aggregate_delta_batch(parakeet_index::AggregateDeltaBatchReq { deltas })
229
+
.await?;
230
+
231
+
read += take;
232
+
tracing::debug!("read {read} of {} deltas", delta_store.len());
233
+
}
204
234
205
235
Ok(())
206
236
}
207
237
208
238
async fn handle_backfill_rows(
209
239
conn: &mut AsyncPgConnection,
210
-
follow_stats: &mut Vec<String>,
240
+
deltas: &mut impl AggregateDeltaStore,
211
241
repo: &str,
212
242
rev: &str,
213
243
) -> diesel::QueryResult<()> {
···
233
263
continue;
234
264
};
235
265
236
-
match record {
237
-
RecordTypes::AppBskyGraphFollow(follow) => {
238
-
follow_stats.push(follow.subject.clone());
239
-
indexer_db::insert_follow(conn, repo, &item.at_uri, follow).await?;
240
-
}
241
-
_ => indexer::index_op(conn, repo, cid, record, &item.at_uri, rkey).await?,
242
-
}
266
+
indexer::index_op(conn, deltas, repo, cid, record, &item.at_uri, rkey).await?
267
+
}
268
+
BackfillItemInner::Delete => {
269
+
indexer::index_op_delete(conn, deltas, repo, item.collection, &item.at_uri)
270
+
.await?
243
271
}
244
-
BackfillItemInner::Delete => match item.collection {
245
-
CollectionType::BskyFollow => {
246
-
if let Some(subject) = indexer_db::delete_follow(conn, &item.at_uri).await?
247
-
{
248
-
follow_stats.push(subject);
249
-
}
250
-
}
251
-
_ => {
252
-
indexer::index_op_delete(conn, repo, item.collection, &item.at_uri).await?
253
-
}
254
-
},
255
272
}
256
273
}
257
274
}
+1
consumer/src/config.rs
+1
consumer/src/config.rs
+33
-4
consumer/src/indexer/db.rs
+33
-4
consumer/src/indexer/db.rs
···
522
522
.await
523
523
}
524
524
525
+
/// Looks up the parent and embedded-record URIs of the post identified by `at_uri`.
///
/// Queries `posts` left-joined with `post_embed_record` (on `posts.at_uri = post_embed_record.post_uri`)
/// and selects `(posts.parent_uri, post_embed_record.uri)`.
///
/// Returns `Ok(None)` when no post with that `at_uri` exists. Otherwise returns
/// `Ok(Some((parent_uri, embed_uri)))`, where either element may itself be `None`.
///
/// NOTE(review): presumably called before a post is deleted so that reply/embed aggregates
/// can be adjusted — confirm against the caller.
pub async fn get_post_info_for_delete(
526
+
conn: &mut AsyncPgConnection,
527
+
at_uri: &str,
528
+
) -> QueryResult<Option<(Option<String>, Option<String>)>> {
529
+
schema::posts::table
530
+
.left_join(
531
+
schema::post_embed_record::table
532
+
.on(schema::posts::at_uri.eq(schema::post_embed_record::post_uri)),
533
+
)
534
+
.select((
535
+
schema::posts::parent_uri,
536
+
// The embed row may be absent because of the left join, hence `.nullable()`.
schema::post_embed_record::uri.nullable(),
537
+
))
538
+
.filter(schema::posts::at_uri.eq(at_uri))
539
+
.get_result(conn)
540
+
.await
541
+
// Turn a "not found" error into `Ok(None)`.
.optional()
542
+
}
543
+
525
544
pub async fn upsert_postgate(
526
545
conn: &mut AsyncPgConnection,
527
546
at_uri: &str,
···
642
661
.await
643
662
}
644
663
645
-
pub async fn delete_like(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> {
664
+
/// Deletes the like record stored under `at_uri`.
///
/// Returns the `subject` column of the deleted row, or `Ok(None)` when no row matched.
pub async fn delete_like(
665
+
conn: &mut AsyncPgConnection,
666
+
at_uri: &str,
667
+
) -> QueryResult<Option<String>> {
646
668
diesel::delete(schema::likes::table)
647
669
.filter(schema::likes::at_uri.eq(at_uri))
648
-
.execute(conn)
670
+
// Hand back the liked subject so the caller knows which record the like pointed at.
.returning(schema::likes::subject)
671
+
.get_result(conn)
649
672
.await
673
+
// Turn a "not found" error into `Ok(None)`.
.optional()
650
674
}
651
675
652
676
pub async fn insert_repost(
···
669
693
.await
670
694
}
671
695
672
-
pub async fn delete_repost(conn: &mut AsyncPgConnection, at_uri: &str) -> QueryResult<usize> {
696
+
/// Deletes the repost record stored under `at_uri`.
///
/// Returns the `post` column of the deleted row, or `Ok(None)` when no row matched.
pub async fn delete_repost(
697
+
conn: &mut AsyncPgConnection,
698
+
at_uri: &str,
699
+
) -> QueryResult<Option<String>> {
673
700
diesel::delete(schema::reposts::table)
674
701
.filter(schema::reposts::at_uri.eq(at_uri))
675
-
.execute(conn)
702
+
// Hand back the reposted post so the caller knows which record the repost pointed at.
.returning(schema::reposts::post)
703
+
.get_result(conn)
676
704
.await
705
+
// Turn a "not found" error into `Ok(None)`.
.optional()
677
706
}
678
707
679
708
pub async fn upsert_chat_decl(
+114
-21
consumer/src/indexer/mod.rs
+114
-21
consumer/src/indexer/mod.rs
···
1
1
use crate::config::HistoryMode;
2
2
use crate::firehose::{AtpAccountEvent, AtpCommitEvent, AtpIdentityEvent, CommitOp, FirehoseEvent};
3
-
use crate::indexer::types::{BackfillItem, BackfillItemInner, CollectionType, RecordTypes};
3
+
use crate::indexer::types::{
4
+
AggregateDeltaStore, BackfillItem, BackfillItemInner, CollectionType, RecordTypes,
5
+
};
4
6
use did_resolver::Resolver;
5
7
use diesel_async::pooled_connection::deadpool::Pool;
6
8
use diesel_async::{AsyncConnection, AsyncPgConnection};
···
9
11
use ipld_core::cid::Cid;
10
12
use metrics::counter;
11
13
use parakeet_db::types::{ActorStatus, ActorSyncState};
14
+
use parakeet_index::AggregateType;
12
15
use std::collections::HashMap;
13
16
use std::hash::BuildHasher;
14
17
use std::sync::Arc;
···
22
25
/// State shared by the relay indexer workers.
///
/// The struct is `Clone`; each spawned indexer worker takes its own clone.
#[derive(Clone)]
23
26
struct RelayIndexerState {
24
27
// Sending half of the backfill queue.
// NOTE(review): the strings appear to be actor DIDs — confirm against the backfill manager.
backfill_tx: flume::Sender<String>,
28
+
// Sender for aggregate deltas destined for parakeet-index; it is passed to `process_op` as the delta store.
idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
25
29
// DID resolver, shared behind an `Arc`.
resolver: Arc<Resolver>,
26
30
// True when the configured history mode is `HistoryMode::BackfillHistory`.
do_backfill: bool,
27
31
}
···
37
41
pub async fn new(
38
42
pool: Pool<AsyncPgConnection>,
39
43
backfill_tx: flume::Sender<String>,
44
+
idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
40
45
resolver: Arc<Resolver>,
41
46
history_mode: HistoryMode,
42
47
) -> eyre::Result<(Self, Sender<FirehoseEvent>)> {
···
48
53
backfill_tx,
49
54
resolver,
50
55
do_backfill: history_mode == HistoryMode::BackfillHistory,
56
+
idxc_tx,
51
57
},
52
58
rx,
53
59
hasher: RandomState::default(),
···
60
66
let (submit, _handles) = (0..threads)
61
67
.map(|idx| {
62
68
let pool = self.pool.clone();
63
-
let state = self.state.clone();
69
+
let mut state = self.state.clone();
64
70
let (tx, mut rx) = channel(16);
65
71
66
72
let handle = tokio::spawn(async move {
···
76
82
index_account(&state, &mut conn, account).await
77
83
}
78
84
FirehoseEvent::Commit(commit) => {
79
-
index_commit(&state, &mut conn, commit).await
85
+
index_commit(&mut state, &mut conn, commit).await
80
86
}
81
87
FirehoseEvent::Label(_) => unreachable!(),
82
88
};
···
186
192
187
193
#[instrument(skip_all, fields(seq = commit.seq, repo = commit.repo, rev = commit.rev))]
188
194
async fn index_commit(
189
-
state: &RelayIndexerState,
195
+
state: &mut RelayIndexerState,
190
196
conn: &mut AsyncPgConnection,
191
197
commit: AtpCommitEvent,
192
198
) -> eyre::Result<()> {
···
262
268
}
263
269
264
270
for op in &commit.ops {
265
-
process_op(t, &commit.repo, op, &blocks).await?;
271
+
process_op(t, &mut state.idxc_tx, &commit.repo, op, &blocks).await?;
266
272
}
267
273
} else {
268
274
let items = commit
···
333
339
#[inline(always)]
334
340
async fn process_op(
335
341
conn: &mut AsyncPgConnection,
342
+
deltas: &mut impl AggregateDeltaStore,
336
343
repo: &str,
337
344
op: &CommitOp,
338
345
blocks: &HashMap<Cid, Vec<u8>>,
···
361
368
return Ok(());
362
369
};
363
370
364
-
index_op(conn, repo, cid, decoded, &full_path, rkey).await?;
371
+
index_op(conn, deltas, repo, cid, decoded, &full_path, rkey).await?;
365
372
} else if op.action == "delete" {
366
-
index_op_delete(conn, repo, collection, &full_path).await?;
373
+
index_op_delete(conn, deltas, repo, collection, &full_path).await?;
367
374
} else {
368
375
tracing::warn!("op contained invalid action {}", op.action);
369
376
}
···
388
395
389
396
pub async fn index_op(
390
397
conn: &mut AsyncPgConnection,
398
+
deltas: &mut impl AggregateDeltaStore,
391
399
repo: &str,
392
400
cid: Cid,
393
401
record: RecordTypes,
···
407
415
}
408
416
RecordTypes::AppBskyFeedGenerator(record) => {
409
417
let labels = record.labels.clone();
410
-
db::upsert_feedgen(conn, repo, cid, at_uri, record).await?;
418
+
let count = db::upsert_feedgen(conn, repo, cid, at_uri, record).await?;
411
419
412
420
if let Some(labels) = labels {
413
421
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
414
422
}
423
+
424
+
deltas
425
+
.add_delta(repo, AggregateType::ProfileFeed, count as i32)
426
+
.await;
415
427
}
416
428
RecordTypes::AppBskyFeedLike(record) => {
417
-
db::insert_like(conn, repo, at_uri, record).await?;
429
+
let subject = record.subject.uri.clone();
430
+
let count = db::insert_like(conn, repo, at_uri, record).await?;
431
+
432
+
deltas
433
+
.add_delta(&subject, AggregateType::Like, count as i32)
434
+
.await;
418
435
}
419
436
RecordTypes::AppBskyFeedPost(record) => {
420
437
if let Some(records::AppBskyEmbed::RecordWithMedia(embed)) = &record.embed {
···
423
440
}
424
441
}
425
442
443
+
let maybe_reply = record.reply.as_ref().map(|v| v.parent.uri.clone());
444
+
let maybe_embed = record.embed.as_ref().and_then(|v| match v {
445
+
records::AppBskyEmbed::Record(r) => Some(r.record.uri.clone()),
446
+
records::AppBskyEmbed::RecordWithMedia(r) => Some(r.record.record.uri.clone()),
447
+
_ => None,
448
+
});
449
+
426
450
let labels = record.labels.clone();
427
451
db::insert_post(conn, repo, cid, at_uri, record).await?;
428
452
if let Some(labels) = labels {
429
453
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
430
454
}
455
+
456
+
deltas.incr(repo, AggregateType::ProfilePost).await;
457
+
if let Some(reply) = maybe_reply {
458
+
deltas.incr(&reply, AggregateType::Reply).await;
459
+
}
460
+
if let Some(embed) = maybe_embed {
461
+
deltas.incr(&embed, AggregateType::Embed).await;
462
+
}
431
463
}
432
464
RecordTypes::AppBskyFeedPostgate(record) => {
433
465
let split_aturi = record.post.rsplitn(4, '/').collect::<Vec<_>>();
···
452
484
.await?;
453
485
}
454
486
RecordTypes::AppBskyFeedRepost(record) => {
487
+
deltas
488
+
.incr(&record.subject.uri, AggregateType::Repost)
489
+
.await;
455
490
db::insert_repost(conn, repo, at_uri, record).await?;
456
491
}
457
492
RecordTypes::AppBskyFeedThreadgate(record) => {
···
467
502
db::insert_block(conn, repo, at_uri, record).await?;
468
503
}
469
504
RecordTypes::AppBskyGraphFollow(record) => {
470
-
db::insert_follow(conn, repo, at_uri, record).await?;
505
+
let subject = record.subject.clone();
506
+
let count = db::insert_follow(conn, repo, at_uri, record).await?;
507
+
508
+
deltas
509
+
.add_delta(repo, AggregateType::Follow, count as i32)
510
+
.await;
511
+
deltas
512
+
.add_delta(&subject, AggregateType::Follower, count as i32)
513
+
.await;
471
514
}
472
515
RecordTypes::AppBskyGraphList(record) => {
473
516
let labels = record.labels.clone();
474
-
db::upsert_list(conn, repo, at_uri, cid, record).await?;
517
+
let count = db::upsert_list(conn, repo, at_uri, cid, record).await?;
475
518
476
519
if let Some(labels) = labels {
477
520
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
478
521
}
479
522
480
-
// todo: when we have profile stats, update them.
523
+
deltas
524
+
.add_delta(repo, AggregateType::ProfileList, count as i32)
525
+
.await;
481
526
}
482
527
RecordTypes::AppBskyGraphListBlock(record) => {
483
528
db::insert_list_block(conn, repo, at_uri, record).await?;
···
493
538
db::insert_list_item(conn, at_uri, record).await?;
494
539
}
495
540
RecordTypes::AppBskyGraphStarterPack(record) => {
496
-
db::upsert_starterpack(conn, repo, cid, at_uri, record).await?;
541
+
let count = db::upsert_starterpack(conn, repo, cid, at_uri, record).await?;
542
+
deltas
543
+
.add_delta(repo, AggregateType::ProfileStarterpack, count as i32)
544
+
.await;
497
545
}
498
546
RecordTypes::AppBskyGraphVerification(record) => {
499
547
db::upsert_verification(conn, repo, cid, at_uri, record).await?;
···
520
568
521
569
pub async fn index_op_delete(
522
570
conn: &mut AsyncPgConnection,
571
+
deltas: &mut impl AggregateDeltaStore,
523
572
repo: &str,
524
573
collection: CollectionType,
525
574
at_uri: &str,
···
527
576
match collection {
528
577
CollectionType::BskyProfile => db::delete_profile(conn, repo).await?,
529
578
CollectionType::BskyBlock => db::delete_block(conn, at_uri).await?,
530
-
CollectionType::BskyFeedGen => db::delete_feedgen(conn, at_uri).await?,
531
-
CollectionType::BskyFeedLike => db::delete_like(conn, at_uri).await?,
532
-
CollectionType::BskyFeedPost => db::delete_post(conn, at_uri).await?,
579
+
CollectionType::BskyFeedGen => {
580
+
let count = db::delete_feedgen(conn, at_uri).await?;
581
+
deltas
582
+
.add_delta(repo, AggregateType::ProfileFeed, -(count as i32))
583
+
.await;
584
+
count
585
+
}
586
+
CollectionType::BskyFeedLike => {
587
+
if let Some(subject) = db::delete_like(conn, at_uri).await? {
588
+
deltas.decr(&subject, AggregateType::Like).await;
589
+
}
590
+
0
591
+
}
592
+
CollectionType::BskyFeedPost => {
593
+
let post_info = db::get_post_info_for_delete(conn, at_uri).await?;
594
+
595
+
db::delete_post(conn, at_uri).await?;
596
+
597
+
if let Some((reply_to, embed)) = post_info {
598
+
deltas.decr(repo, AggregateType::ProfilePost).await;
599
+
if let Some(reply_to) = reply_to {
600
+
deltas.decr(&reply_to, AggregateType::Reply).await;
601
+
}
602
+
if let Some(embed) = embed {
603
+
deltas.decr(&embed, AggregateType::Embed).await;
604
+
}
605
+
}
606
+
607
+
0
608
+
}
533
609
CollectionType::BskyFeedPostgate => db::delete_postgate(conn, at_uri).await?,
534
-
CollectionType::BskyFeedRepost => db::delete_repost(conn, at_uri).await?,
610
+
CollectionType::BskyFeedRepost => {
611
+
if let Some(subject) = db::delete_repost(conn, at_uri).await? {
612
+
deltas.decr(&subject, AggregateType::Repost).await;
613
+
}
614
+
0
615
+
}
535
616
CollectionType::BskyFeedThreadgate => db::delete_threadgate(conn, at_uri).await?,
536
617
CollectionType::BskyFollow => {
537
-
db::delete_follow(conn, at_uri).await?;
618
+
if let Some(followee) = db::delete_follow(conn, at_uri).await? {
619
+
deltas.decr(&followee, AggregateType::Follower).await;
620
+
deltas.decr(repo, AggregateType::Follow).await;
621
+
}
538
622
0
539
623
}
540
624
CollectionType::BskyList => {
541
-
db::delete_list(conn, at_uri).await?
542
-
// todo: when we have profile stats, update them.
625
+
let count = db::delete_list(conn, at_uri).await?;
626
+
deltas
627
+
.add_delta(repo, AggregateType::ProfileList, -(count as i32))
628
+
.await;
629
+
count
543
630
}
544
631
CollectionType::BskyListBlock => db::delete_list_block(conn, at_uri).await?,
545
632
CollectionType::BskyListItem => db::delete_list_item(conn, at_uri).await?,
546
-
CollectionType::BskyStarterPack => db::delete_starterpack(conn, at_uri).await?,
633
+
CollectionType::BskyStarterPack => {
634
+
let count = db::delete_starterpack(conn, at_uri).await?;
635
+
deltas
636
+
.add_delta(repo, AggregateType::ProfileStarterpack, -(count as i32))
637
+
.await;
638
+
count
639
+
}
547
640
CollectionType::BskyVerification => db::delete_verification(conn, at_uri).await?,
548
641
CollectionType::BskyLabelerService => db::delete_label_service(conn, at_uri).await?,
549
642
CollectionType::ChatActorDecl => db::delete_chat_decl(conn, at_uri).await?,
+33
consumer/src/indexer/types.rs
+33
consumer/src/indexer/types.rs
···
121
121
Update(RecordTypes),
122
122
Delete,
123
123
}
124
+
125
+
pub trait AggregateDeltaStore {
126
+
async fn add_delta(&mut self, uri: &str, typ: parakeet_index::AggregateType, delta: i32);
127
+
async fn incr(&mut self, uri: &str, typ: parakeet_index::AggregateType) {
128
+
self.add_delta(uri, typ, 1).await
129
+
}
130
+
async fn decr(&mut self, uri: &str, typ: parakeet_index::AggregateType) {
131
+
self.add_delta(uri, typ, -1).await
132
+
}
133
+
}
134
+
135
+
impl AggregateDeltaStore for tokio::sync::mpsc::Sender<parakeet_index::AggregateDeltaReq> {
136
+
async fn add_delta(&mut self, uri: &str, typ: parakeet_index::AggregateType, delta: i32) {
137
+
let res = self
138
+
.send(parakeet_index::AggregateDeltaReq {
139
+
typ: typ.into(),
140
+
uri: uri.to_string(),
141
+
delta,
142
+
})
143
+
.await;
144
+
145
+
if let Err(e) = res {
146
+
tracing::error!("failed to send aggregate delta: {e}");
147
+
}
148
+
}
149
+
}
150
+
151
+
impl AggregateDeltaStore for std::collections::HashMap<(String, i32), i32> {
152
+
async fn add_delta(&mut self, uri: &str, typ: parakeet_index::AggregateType, delta: i32) {
153
+
let key = (uri.to_string(), typ.into());
154
+
self.entry(key).and_modify(|v| *v += delta).or_insert(delta);
155
+
}
156
+
}
+16
-1
consumer/src/main.rs
+16
-1
consumer/src/main.rs
···
31
31
..Default::default()
32
32
})?);
33
33
34
+
let index_client = parakeet_index::Client::connect(conf.index_uri).await?;
35
+
34
36
let (label_mgr, label_svc_tx) = label_indexer::LabelServiceManager::new(
35
37
&conf.database_url,
36
38
resolver.clone(),
···
42
44
let (backfiller, backfill_tx) =
43
45
backfill::BackfillManager::new(pool.clone(), conf.history_mode, resolver.clone()).await?;
44
46
47
+
let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128);
48
+
45
49
let (relay_indexer, tx) = indexer::RelayIndexer::new(
46
50
pool.clone(),
47
51
backfill_tx,
52
+
idxc_tx,
48
53
resolver.clone(),
49
54
conf.history_mode,
50
55
)
51
56
.await?;
52
57
53
-
let (firehose_res, indexer_res, backfill_res, label_res) = tokio::try_join! {
58
+
let (firehose_res, indexer_res, backfill_res, label_res, idxt_res) = tokio::try_join! {
54
59
tokio::spawn(relay_consumer(relay_firehose, tx)),
55
60
tokio::spawn(relay_indexer.run(conf.indexer_workers)),
56
61
tokio::spawn(backfiller.run(conf.backfill_workers)),
57
62
tokio::spawn(label_mgr.run(conf.initial_label_services)),
63
+
tokio::spawn(index_transport(index_client, idxc_rx)),
58
64
}?;
59
65
60
66
firehose_res
61
67
.and(indexer_res)
62
68
.and(backfill_res)
63
69
.and(label_res)
70
+
.and(idxt_res)
64
71
}
65
72
66
73
async fn relay_consumer(
···
83
90
}
84
91
}
85
92
}
93
+
94
+
Ok(())
95
+
}
96
+
97
+
async fn index_transport(mut idxc: parakeet_index::Client, rx: tokio::sync::mpsc::Receiver<parakeet_index::AggregateDeltaReq>) -> eyre::Result<()> {
98
+
use tokio_stream::wrappers::ReceiverStream;
99
+
100
+
idxc.submit_aggregate_delta_stream(ReceiverStream::new(rx)).await?;
86
101
87
102
Ok(())
88
103
}
+27
parakeet-index/Cargo.toml
+27
parakeet-index/Cargo.toml
···
1
+
[package]
2
+
name = "parakeet-index"
3
+
version = "0.1.0"
4
+
edition = "2024"
5
+
6
+
[[bin]]
7
+
name = "parakeet-index"
8
+
required-features = ["server"]
9
+
10
+
[dependencies]
11
+
tonic = "0.13.0"
12
+
prost = "0.13.5"
13
+
14
+
eyre = { version = "0.6.12", optional = true }
15
+
figment = { version = "0.10.19", features = ["env", "toml"], optional = true }
16
+
itertools = { version = "0.14.0", optional = true }
17
+
serde = { version = "1.0.217", features = ["derive"], optional = true }
18
+
sled = { version = "0.34.7", optional = true }
19
+
tokio = { version = "1.42.0", features = ["full"], optional = true }
20
+
tracing = { version = "0.1.40", optional = true }
21
+
tracing-subscriber = { version = "0.3.18", optional = true }
22
+
23
+
[build-dependencies]
24
+
tonic-build = "0.13.0"
25
+
26
+
[features]
27
+
server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:serde", "dep:sled", "dep:tokio", "dep:tracing", "dep:tracing-subscriber"]
+5
parakeet-index/build.rs
+5
parakeet-index/build.rs
+96
parakeet-index/proto/parakeet.proto
+96
parakeet-index/proto/parakeet.proto
···
1
+
syntax = "proto3";
2
+
package parakeet;
3
+
4
+
service Index {
5
+
rpc SubmitAggregateDelta(AggregateDeltaReq) returns (AggregateDeltaRes);
6
+
rpc SubmitAggregateDeltaBatch(AggregateDeltaBatchReq) returns (AggregateDeltaRes);
7
+
rpc SubmitAggregateDeltaStream(stream AggregateDeltaReq) returns (AggregateDeltaRes);
8
+
9
+
rpc GetProfileStats(GetStatsReq) returns (GetProfileStatsRes);
10
+
rpc GetProfileStatsMany(GetStatsManyReq) returns (GetProfileStatsManyRes);
11
+
rpc GetPostStats(GetStatsReq) returns (GetPostStatsRes);
12
+
rpc GetPostStatsMany(GetStatsManyReq) returns (GetPostStatsManyRes);
13
+
rpc GetLikeCount(GetStatsReq) returns (GetLikeCountRes);
14
+
rpc GetLikeCountMany(GetStatsManyReq) returns (GetLikeCountManyRes);
15
+
}
16
+
17
+
enum AggregateType {
18
+
UNKNOWN = 0;
19
+
FOLLOW = 1;
20
+
FOLLOWER = 2;
21
+
LIKE = 3;
22
+
REPLY = 4;
23
+
REPOST = 5;
24
+
// aka Quotes (in the context of posts)
25
+
EMBED = 6;
26
+
PROFILE_POST = 7;
27
+
PROFILE_LIST = 8;
28
+
PROFILE_FEED = 9;
29
+
PROFILE_STARTERPACK = 10;
30
+
}
31
+
32
+
message AggregateDeltaReq {
33
+
// The type of aggregate to change
34
+
AggregateType typ = 1;
35
+
// The entry to change. Can be a full at:// uri for items or a did for actors/profiles
36
+
string uri = 2;
37
+
sint32 delta = 3;
38
+
}
39
+
40
+
message AggregateDeltaBatchReq {
41
+
repeated AggregateDeltaReq deltas = 1;
42
+
}
43
+
44
+
message AggregateDeltaRes {}
45
+
46
+
message GetStatsReq {
47
+
string uri = 1;
48
+
}
49
+
50
+
message GetStatsManyReq {
51
+
repeated string uris = 1;
52
+
}
53
+
54
+
message ProfileStats {
55
+
int32 followers = 1;
56
+
int32 following = 2;
57
+
int32 posts = 3;
58
+
int32 lists = 4;
59
+
int32 feeds = 5;
60
+
int32 starterpacks = 6;
61
+
}
62
+
63
+
message GetProfileStatsRes {
64
+
optional ProfileStats stats = 1;
65
+
}
66
+
67
+
message GetProfileStatsManyRes {
68
+
map<string, ProfileStats> entries = 1;
69
+
}
70
+
71
+
message PostStats {
72
+
int32 replies = 1;
73
+
int32 likes = 2;
74
+
int32 reposts = 3;
75
+
int32 quotes = 4;
76
+
}
77
+
78
+
message GetPostStatsRes {
79
+
optional PostStats stats = 1;
80
+
}
81
+
82
+
message GetPostStatsManyRes {
83
+
map<string, PostStats> entries = 1;
84
+
}
85
+
86
+
message LikeCount {
87
+
int32 likes = 1;
88
+
}
89
+
90
+
message GetLikeCountRes {
91
+
optional LikeCount likes = 1;
92
+
}
93
+
94
+
message GetLikeCountManyRes {
95
+
map<string, LikeCount> entries = 1;
96
+
}
+1
parakeet-index/run.sh
+1
parakeet-index/run.sh
···
1
+
cargo run --features server
+10
parakeet-index/src/lib.rs
+10
parakeet-index/src/lib.rs
+24
parakeet-index/src/main.rs
+24
parakeet-index/src/main.rs
···
1
+
use parakeet_index::index_server::IndexServer;
2
+
use parakeet_index::server::service::Service;
3
+
use parakeet_index::server::{GlobalState, config};
4
+
use std::sync::Arc;
5
+
use tonic::transport::Server;
6
+
7
+
#[tokio::main]
8
+
async fn main() -> eyre::Result<()> {
9
+
tracing_subscriber::fmt::init();
10
+
11
+
let conf = config::load_config()?;
12
+
13
+
let db_root = conf.index_db_path.parse()?;
14
+
let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
15
+
let state = Arc::new(GlobalState::new(db_root)?);
16
+
17
+
let service = Service::new(state.clone());
18
+
Server::builder()
19
+
.add_service(IndexServer::new(service))
20
+
.serve(addr)
21
+
.await?;
22
+
23
+
Ok(())
24
+
}
+45
parakeet-index/src/server/config.rs
+45
parakeet-index/src/server/config.rs
···
1
+
use figment::Figment;
2
+
use figment::providers::{Env, Format, Toml};
3
+
use serde::Deserialize;
4
+
5
+
pub fn load_config() -> eyre::Result<Config> {
6
+
let conf = Figment::new()
7
+
.merge(Toml::file("Config.toml"))
8
+
.merge(Env::prefixed("PKI_"))
9
+
.extract()?;
10
+
11
+
Ok(conf)
12
+
}
13
+
14
+
#[derive(Debug, Deserialize)]
15
+
pub struct Config {
16
+
pub database_url: String,
17
+
pub index_db_path: String,
18
+
#[serde(default)]
19
+
pub server: ConfigServer,
20
+
}
21
+
22
+
#[derive(Debug, Deserialize)]
23
+
pub struct ConfigServer {
24
+
#[serde(default = "default_bind_address")]
25
+
pub bind_address: String,
26
+
#[serde(default = "default_port")]
27
+
pub port: u16,
28
+
}
29
+
30
+
impl Default for ConfigServer {
31
+
fn default() -> Self {
32
+
ConfigServer {
33
+
bind_address: default_bind_address(),
34
+
port: default_port(),
35
+
}
36
+
}
37
+
}
38
+
39
+
/// Default listen address: all IPv4 interfaces.
fn default_bind_address() -> String {
    String::from("0.0.0.0")
}
42
+
43
+
/// Default listen port.
fn default_port() -> u16 {
    const DEFAULT_PORT: u16 = 6001;

    DEFAULT_PORT
}
+103
parakeet-index/src/server/db.rs
+103
parakeet-index/src/server/db.rs
···
1
+
use crate::all_none;
2
+
use crate::server::utils::{ToIntExt, TreeExt, slice_as_i32};
3
+
use sled::{Db, MergeOperator, Tree};
4
+
use std::path::PathBuf;
5
+
6
+
pub struct DbStore {
7
+
pub agg_db: Db,
8
+
pub label_db: Db,
9
+
10
+
pub follows: Tree,
11
+
pub followers: Tree,
12
+
pub likes: Tree,
13
+
pub replies: Tree,
14
+
pub reposts: Tree,
15
+
pub embeds: Tree,
16
+
pub profile_posts: Tree,
17
+
pub profile_lists: Tree,
18
+
pub profile_feeds: Tree,
19
+
pub profile_starterpacks: Tree,
20
+
}
21
+
22
+
impl DbStore {
23
+
pub fn new(db_root: PathBuf) -> eyre::Result<Self> {
24
+
let agg_db = sled::open(db_root.join("aggdb"))?;
25
+
let label_db = sled::open(db_root.join("labeldb"))?;
26
+
27
+
Ok(DbStore {
28
+
follows: open_tree(&agg_db, "follows", merge_delta)?,
29
+
followers: open_tree(&agg_db, "followers", merge_delta)?,
30
+
likes: open_tree(&agg_db, "likes", merge_delta)?,
31
+
replies: open_tree(&agg_db, "replies", merge_delta)?,
32
+
reposts: open_tree(&agg_db, "reposts", merge_delta)?,
33
+
embeds: open_tree(&agg_db, "embeds", merge_delta)?,
34
+
profile_posts: open_tree(&agg_db, "profile_posts", merge_delta)?,
35
+
profile_lists: open_tree(&agg_db, "profile_lists", merge_delta)?,
36
+
profile_feeds: open_tree(&agg_db, "profile_feeds", merge_delta)?,
37
+
profile_starterpacks: open_tree(&agg_db, "profile_starterpacks", merge_delta)?,
38
+
39
+
agg_db,
40
+
label_db,
41
+
})
42
+
}
43
+
44
+
pub fn get_post_stats(&self, post: &str) -> Option<crate::PostStats> {
45
+
let replies = self.replies.get_i32(post);
46
+
let likes = self.likes.get_i32(post);
47
+
let reposts = self.reposts.get_i32(post);
48
+
let quotes = self.embeds.get_i32(post);
49
+
50
+
if all_none![replies, likes, reposts, quotes] {
51
+
return None;
52
+
}
53
+
54
+
Some(crate::PostStats {
55
+
replies: replies.unwrap_or_default(),
56
+
likes: likes.unwrap_or_default(),
57
+
reposts: reposts.unwrap_or_default(),
58
+
quotes: quotes.unwrap_or_default(),
59
+
})
60
+
}
61
+
62
+
pub fn get_profile_stats(&self, did: &str) -> Option<crate::ProfileStats> {
63
+
let followers = self.followers.get_i32(did);
64
+
let following = self.follows.get_i32(did);
65
+
let posts = self.profile_posts.get_i32(did);
66
+
let lists = self.profile_lists.get_i32(did);
67
+
let feeds = self.profile_feeds.get_i32(did);
68
+
let starterpacks = self.profile_starterpacks.get_i32(did);
69
+
70
+
if all_none![followers, following, posts, lists, feeds, starterpacks] {
71
+
return None;
72
+
}
73
+
74
+
Some(crate::ProfileStats {
75
+
followers: followers.unwrap_or_default(),
76
+
following: following.unwrap_or_default(),
77
+
posts: posts.unwrap_or_default(),
78
+
lists: lists.unwrap_or_default(),
79
+
feeds: feeds.unwrap_or_default(),
80
+
starterpacks: starterpacks.unwrap_or_default(),
81
+
})
82
+
}
83
+
}
84
+
85
+
/// Opens the tree called `name` in `db` and installs `merge` as its merge
/// operator before handing it back.
fn open_tree(db: &Db, name: &str, merge: impl MergeOperator + 'static) -> eyre::Result<Tree> {
    let configured = db.open_tree(name).map(|tree| {
        tree.set_merge_operator(merge);
        tree
    })?;

    Ok(configured)
}
92
+
93
+
fn merge_delta(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
94
+
let old = old.and_then(slice_as_i32);
95
+
let new = slice_as_i32(new)?;
96
+
97
+
let res = match old {
98
+
Some(old) => old + new,
99
+
None => new,
100
+
};
101
+
102
+
Some(Vec::from_i32(res))
103
+
}
+18
parakeet-index/src/server/mod.rs
+18
parakeet-index/src/server/mod.rs
···
1
+
use std::path::PathBuf;
2
+
3
+
pub mod config;
4
+
pub mod db;
5
+
pub mod service;
6
+
mod utils;
7
+
8
+
pub struct GlobalState {
9
+
pub dbs: db::DbStore,
10
+
}
11
+
12
+
impl GlobalState {
13
+
pub fn new(db_root: PathBuf) -> eyre::Result<Self> {
14
+
let dbs = db::DbStore::new(db_root)?;
15
+
16
+
Ok(GlobalState { dbs })
17
+
}
18
+
}
+201
parakeet-index/src/server/service.rs
+201
parakeet-index/src/server/service.rs
···
1
+
use crate::index::*;
2
+
use crate::server::GlobalState;
3
+
use crate::server::utils::TreeExt;
4
+
use std::collections::HashMap;
5
+
use std::ops::Deref;
6
+
use std::sync::Arc;
7
+
use tonic::codegen::tokio_stream::StreamExt;
8
+
use tonic::{Request, Response, Status, Streaming, async_trait};
9
+
10
+
/// gRPC service handle: a thin wrapper around the shared [`GlobalState`].
pub struct Service(Arc<GlobalState>);
11
+
12
+
impl Service {
13
+
pub fn new(state: Arc<GlobalState>) -> Self {
14
+
Service(state)
15
+
}
16
+
17
+
fn apply_delta(
18
+
&self,
19
+
uri: &str,
20
+
typ: AggregateType,
21
+
delta: i32,
22
+
) -> sled::Result<Option<sled::IVec>> {
23
+
let val = delta.to_le_bytes();
24
+
25
+
match typ {
26
+
AggregateType::Unknown => todo!(),
27
+
AggregateType::Follow => self.dbs.follows.merge(uri, val),
28
+
AggregateType::Follower => self.dbs.followers.merge(uri, val),
29
+
AggregateType::Like => self.dbs.likes.merge(uri, val),
30
+
AggregateType::Reply => self.dbs.replies.merge(uri, val),
31
+
AggregateType::Repost => self.dbs.reposts.merge(uri, val),
32
+
AggregateType::Embed => self.dbs.embeds.merge(uri, val),
33
+
AggregateType::ProfilePost => self.dbs.profile_posts.merge(uri, val),
34
+
AggregateType::ProfileList => self.dbs.profile_lists.merge(uri, val),
35
+
AggregateType::ProfileFeed => self.dbs.profile_feeds.merge(uri, val),
36
+
AggregateType::ProfileStarterpack => self.dbs.profile_starterpacks.merge(uri, val),
37
+
}
38
+
}
39
+
}
40
+
41
+
impl Deref for Service {
42
+
type Target = Arc<GlobalState>;
43
+
44
+
fn deref(&self) -> &Self::Target {
45
+
&self.0
46
+
}
47
+
}
48
+
49
+
#[async_trait]
50
+
impl index_server::Index for Service {
51
+
async fn submit_aggregate_delta(
52
+
&self,
53
+
request: Request<AggregateDeltaReq>,
54
+
) -> Result<Response<AggregateDeltaRes>, Status> {
55
+
let inner = request.into_inner();
56
+
57
+
let res = self.apply_delta(&inner.uri, inner.typ(), inner.delta);
58
+
59
+
if let Err(e) = res {
60
+
tracing::error!("failed to update stats DB: {e}");
61
+
return Err(Status::unknown("failed to update stats DB"));
62
+
}
63
+
64
+
Ok(Response::new(AggregateDeltaRes {}))
65
+
}
66
+
67
+
async fn submit_aggregate_delta_batch(
68
+
&self,
69
+
request: Request<AggregateDeltaBatchReq>,
70
+
) -> Result<Response<AggregateDeltaRes>, Status> {
71
+
let inner = request.into_inner();
72
+
73
+
for data in inner.deltas {
74
+
let res = self.apply_delta(&data.uri, data.typ(), data.delta);
75
+
76
+
if let Err(e) = res {
77
+
tracing::error!("failed to update stats DB: {e}");
78
+
return Err(Status::unknown("failed to update stats DB"));
79
+
}
80
+
}
81
+
82
+
Ok(Response::new(AggregateDeltaRes {}))
83
+
}
84
+
85
+
async fn submit_aggregate_delta_stream(
86
+
&self,
87
+
request: Request<Streaming<AggregateDeltaReq>>,
88
+
) -> Result<Response<AggregateDeltaRes>, Status> {
89
+
let mut inner = request.into_inner();
90
+
91
+
while let Some(req) = inner.next().await {
92
+
if let Ok(data) = req {
93
+
let res = self.apply_delta(&data.uri, data.typ(), data.delta);
94
+
95
+
if let Err(e) = res {
96
+
tracing::error!("failed to update stats DB: {e}");
97
+
return Err(Status::unknown("failed to update stats DB"));
98
+
}
99
+
} else {
100
+
tracing::error!("failed to read stream item")
101
+
}
102
+
}
103
+
104
+
Ok(Response::new(AggregateDeltaRes {}))
105
+
}
106
+
107
+
async fn get_profile_stats(
108
+
&self,
109
+
request: Request<GetStatsReq>,
110
+
) -> Result<Response<GetProfileStatsRes>, Status> {
111
+
let inner = request.into_inner();
112
+
113
+
let stats = self.dbs.get_profile_stats(&inner.uri);
114
+
115
+
Ok(Response::new(GetProfileStatsRes { stats }))
116
+
}
117
+
118
+
async fn get_profile_stats_many(
119
+
&self,
120
+
request: Request<GetStatsManyReq>,
121
+
) -> Result<Response<GetProfileStatsManyRes>, Status> {
122
+
let inner = request.into_inner();
123
+
124
+
// idk if this is the best way of doing this????
125
+
let entries = inner
126
+
.uris
127
+
.into_iter()
128
+
.filter_map(|uri| {
129
+
let stats = self.dbs.get_profile_stats(&uri)?;
130
+
131
+
Some((uri, stats))
132
+
})
133
+
.collect::<HashMap<_, _>>();
134
+
135
+
Ok(Response::new(GetProfileStatsManyRes { entries }))
136
+
}
137
+
138
+
async fn get_post_stats(
139
+
&self,
140
+
request: Request<GetStatsReq>,
141
+
) -> Result<Response<GetPostStatsRes>, Status> {
142
+
let inner = request.into_inner();
143
+
144
+
let stats = self.dbs.get_post_stats(&inner.uri);
145
+
146
+
Ok(Response::new(GetPostStatsRes { stats }))
147
+
}
148
+
149
+
async fn get_post_stats_many(
150
+
&self,
151
+
request: Request<GetStatsManyReq>,
152
+
) -> Result<Response<GetPostStatsManyRes>, Status> {
153
+
let inner = request.into_inner();
154
+
155
+
let entries = inner
156
+
.uris
157
+
.into_iter()
158
+
.filter_map(|uri| {
159
+
let stats = self.dbs.get_post_stats(&uri)?;
160
+
161
+
Some((uri, stats))
162
+
})
163
+
.collect::<HashMap<_, _>>();
164
+
165
+
Ok(Response::new(GetPostStatsManyRes { entries }))
166
+
}
167
+
168
+
async fn get_like_count(
169
+
&self,
170
+
request: Request<GetStatsReq>,
171
+
) -> Result<Response<GetLikeCountRes>, Status> {
172
+
let inner = request.into_inner();
173
+
174
+
let likes = self
175
+
.dbs
176
+
.likes
177
+
.get_i32(inner.uri)
178
+
.map(|likes| LikeCount { likes });
179
+
180
+
Ok(Response::new(GetLikeCountRes { likes }))
181
+
}
182
+
183
+
async fn get_like_count_many(
184
+
&self,
185
+
request: Request<GetStatsManyReq>,
186
+
) -> Result<Response<GetLikeCountManyRes>, Status> {
187
+
let inner = request.into_inner();
188
+
189
+
let entries = inner
190
+
.uris
191
+
.into_iter()
192
+
.filter_map(|uri| {
193
+
let likes = self.dbs.likes.get_i32(&uri)?;
194
+
195
+
Some((uri, LikeCount { likes }))
196
+
})
197
+
.collect();
198
+
199
+
Ok(Response::new(GetLikeCountManyRes { entries }))
200
+
}
201
+
}
+59
parakeet-index/src/server/utils.rs
+59
parakeet-index/src/server/utils.rs
···
1
+
use sled::{IVec, Tree};
2
+
3
+
/// Conversions between byte containers and little-endian `i32` values.
pub trait ToIntExt {
    /// Decodes `self` as an `i32`, or `None` when it cannot be decoded.
    fn as_i32(&self) -> Option<i32>;

    /// Encodes `i` into a new container.
    fn from_i32(i: i32) -> Self;
}
7
+
8
+
impl ToIntExt for IVec {
9
+
fn as_i32(&self) -> Option<i32> {
10
+
if self.len() == 4 {
11
+
let bytes = self[0..4].try_into().ok()?;
12
+
Some(i32::from_le_bytes(bytes))
13
+
} else {
14
+
None
15
+
}
16
+
}
17
+
18
+
fn from_i32(i: i32) -> Self {
19
+
IVec::from(&i.to_le_bytes())
20
+
}
21
+
}
22
+
23
+
impl ToIntExt for Vec<u8> {
24
+
fn as_i32(&self) -> Option<i32> {
25
+
if self.len() == 4 {
26
+
let bytes = self[0..4].try_into().ok()?;
27
+
Some(i32::from_le_bytes(bytes))
28
+
} else {
29
+
None
30
+
}
31
+
}
32
+
33
+
fn from_i32(i: i32) -> Self {
34
+
Vec::from(&i.to_le_bytes())
35
+
}
36
+
}
37
+
38
+
/// Decodes the first four bytes of `data` as a little-endian `i32`.
///
/// Returns `None` when `data` holds fewer than four bytes. Bytes beyond the
/// fourth are ignored.
pub fn slice_as_i32(data: &[u8]) -> Option<i32> {
    // `get` instead of indexing: a short slice must yield `None`, not a panic.
    let bytes = data.get(..4)?.try_into().ok()?;

    Some(i32::from_le_bytes(bytes))
}
43
+
44
+
/// Typed read access for counter trees.
pub trait TreeExt {
    /// Reads the value stored under `key` as an `i32`, if there is one.
    fn get_i32(&self, key: impl AsRef<[u8]>) -> Option<i32>;
}
47
+
48
+
impl TreeExt for Tree {
    /// Reads `key` and decodes it via `as_i32`. A missing key, a read error
    /// and an undecodable value all come back as `None`.
    fn get_i32(&self, key: impl AsRef<[u8]>) -> Option<i32> {
        match self.get(key) {
            Ok(Some(raw)) => raw.as_i32(),
            Ok(None) | Err(_) => None,
        }
    }
}
53
+
54
+
/// Evaluates to `true` when every listed `Option` variable is `None`.
///
/// Accepts one or more identifiers separated by commas, with an optional
/// trailing comma: `all_none![a]`, `all_none![a, b]`, `all_none![a, b,]`.
#[macro_export]
macro_rules! all_none {
    ($var0:ident $(, $var:ident)* $(,)?) => {
        $var0.is_none() $(&& $var.is_none())*
    };
}
+1
parakeet/Cargo.toml
+1
parakeet/Cargo.toml
···
17
17
itertools = "0.14.0"
18
18
lexica = { path = "../lexica" }
19
19
parakeet-db = { path = "../parakeet-db" }
20
+
parakeet-index = { path = "../parakeet-index" }
20
21
serde = { version = "1.0.217", features = ["derive"] }
21
22
serde_json = "1.0.134"
22
23
tokio = { version = "1.42.0", features = ["full"] }
+1
parakeet/run.sh
+1
parakeet/run.sh
···
1
+
cargo run
+1
parakeet/src/config.rs
+1
parakeet/src/config.rs
+10
-6
parakeet/src/hydration/feedgen.rs
+10
-6
parakeet/src/hydration/feedgen.rs
···
11
11
feedgen: models::FeedGen,
12
12
creator: ProfileView,
13
13
labels: Vec<models::Label>,
14
+
likes: Option<i32>,
14
15
) -> GeneratorView {
15
16
let content_mode = feedgen
16
17
.content_mode
···
31
32
avatar: feedgen
32
33
.avatar_cid
33
34
.map(|v| format!("https://localhost/feedgen/{v}")),
34
-
like_count: 0,
35
+
like_count: likes.unwrap_or_default() as i64,
35
36
accepts_interactions: feedgen.accepts_interactions,
36
37
labels: map_labels(labels),
37
38
content_mode,
···
45
46
apply_labelers: &[LabelConfigItem],
46
47
) -> Option<GeneratorView> {
47
48
let labels = loaders.label.load(&feedgen, apply_labelers).await;
48
-
let feedgen = loaders.feedgen.load(feedgen).await?;
49
+
let (feedgen, likes) = loaders.feedgen.load(feedgen).await?;
49
50
let profile = hydrate_profile(loaders, feedgen.owner.clone(), apply_labelers).await?;
50
51
51
-
Some(build_feedgen(feedgen, profile, labels))
52
+
Some(build_feedgen(feedgen, profile, labels, likes))
52
53
}
53
54
54
55
pub async fn hydrate_feedgens(
···
61
62
62
63
let creators = feedgens
63
64
.values()
64
-
.map(|feedgen| feedgen.owner.clone())
65
+
.map(|(feedgen, _)| feedgen.owner.clone())
65
66
.collect();
66
67
let creators = hydrate_profiles(loaders, creators, apply_labelers).await;
67
68
68
69
feedgens
69
70
.into_iter()
70
-
.filter_map(|(uri, feedgen)| {
71
+
.filter_map(|(uri, (feedgen, likes))| {
71
72
let creator = creators.get(&feedgen.owner)?;
72
73
let labels = labels.get(&uri).cloned().unwrap_or_default();
73
74
74
-
Some((uri, build_feedgen(feedgen, creator.to_owned(), labels)))
75
+
Some((
76
+
uri,
77
+
build_feedgen(feedgen, creator.to_owned(), labels, likes),
78
+
))
75
79
})
76
80
.collect()
77
81
}
+17
-12
parakeet/src/hydration/labeler.rs
+17
-12
parakeet/src/hydration/labeler.rs
···
12
12
labeler: models::LabelerService,
13
13
creator: ProfileView,
14
14
labels: Vec<models::Label>,
15
+
likes: Option<i32>,
15
16
) -> LabelerView {
16
17
LabelerView {
17
18
uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
18
19
cid: labeler.cid,
19
20
creator,
20
-
like_count: 0,
21
+
like_count: likes.unwrap_or_default() as i64,
21
22
labels: map_labels(labels),
22
23
indexed_at: labeler.indexed_at,
23
24
}
···
28
29
defs: Vec<models::LabelDefinition>,
29
30
creator: ProfileView,
30
31
labels: Vec<models::Label>,
32
+
likes: Option<i32>,
31
33
) -> LabelerViewDetailed {
32
34
let reason_types = labeler.reasons.map(|v| {
33
35
v.into_iter()
···
66
68
uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
67
69
cid: labeler.cid,
68
70
creator,
69
-
like_count: 0,
71
+
like_count: likes.unwrap_or_default() as i64,
70
72
policies: LabelerPolicy {
71
73
label_values,
72
74
label_value_definitions,
···
85
87
apply_labelers: &[LabelConfigItem],
86
88
) -> Option<LabelerView> {
87
89
let labels = loaders.label.load(&labeler, apply_labelers).await;
88
-
let (labeler, _) = loaders.labeler.load(labeler).await?;
90
+
let (labeler, _, likes) = loaders.labeler.load(labeler).await?;
89
91
let creator = hydrate_profile(loaders, labeler.did.clone(), apply_labelers).await?;
90
92
91
-
Some(build_view(labeler, creator, labels))
93
+
Some(build_view(labeler, creator, labels, likes))
92
94
}
93
95
94
96
pub async fn hydrate_labelers(
···
101
103
102
104
let creators = labelers
103
105
.values()
104
-
.map(|(labeler, _)| labeler.did.clone())
106
+
.map(|(labeler, _, _)| labeler.did.clone())
105
107
.collect();
106
108
let creators = hydrate_profiles(loaders, creators, apply_labelers).await;
107
109
108
110
labelers
109
111
.into_iter()
110
-
.filter_map(|(k, (labeler, _))| {
112
+
.filter_map(|(k, (labeler, _, likes))| {
111
113
let creator = creators.get(&labeler.did).cloned()?;
112
114
let labels = labels.get(&k).cloned().unwrap_or_default();
113
115
114
-
Some((k, build_view(labeler, creator, labels)))
116
+
Some((k, build_view(labeler, creator, labels, likes)))
115
117
})
116
118
.collect()
117
119
}
···
122
124
apply_labelers: &[LabelConfigItem],
123
125
) -> Option<LabelerViewDetailed> {
124
126
let labels = loaders.label.load(&labeler, apply_labelers).await;
125
-
let (labeler, defs) = loaders.labeler.load(labeler).await?;
127
+
let (labeler, defs, likes) = loaders.labeler.load(labeler).await?;
126
128
let creator = hydrate_profile(loaders, labeler.did.clone(), apply_labelers).await?;
127
129
128
-
Some(build_view_detailed(labeler, defs, creator, labels))
130
+
Some(build_view_detailed(labeler, defs, creator, labels, likes))
129
131
}
130
132
131
133
pub async fn hydrate_labelers_detailed(
···
138
140
139
141
let creators = labelers
140
142
.values()
141
-
.map(|(labeler, _)| labeler.did.clone())
143
+
.map(|(labeler, _, _)| labeler.did.clone())
142
144
.collect();
143
145
let creators = hydrate_profiles(loaders, creators, apply_labelers).await;
144
146
145
147
labelers
146
148
.into_iter()
147
-
.filter_map(|(k, (labeler, defs))| {
149
+
.filter_map(|(k, (labeler, defs, likes))| {
148
150
let creator = creators.get(&labeler.did).cloned()?;
149
151
let labels = labels.get(&k).cloned().unwrap_or_default();
150
152
151
-
Some((k, build_view_detailed(labeler, defs, creator, labels)))
153
+
Some((
154
+
k,
155
+
build_view_detailed(labeler, defs, creator, labels, likes),
156
+
))
152
157
})
153
158
.collect()
154
159
}
+25
-11
parakeet/src/hydration/posts.rs
+25
-11
parakeet/src/hydration/posts.rs
···
7
7
use lexica::app_bsky::embed::{AspectRatio, Embed};
8
8
use lexica::app_bsky::feed::{FeedViewPost, PostView, ReplyRef, ReplyRefPost, ThreadgateView};
9
9
use lexica::app_bsky::graph::ListViewBasic;
10
+
use lexica::app_bsky::RecordStats;
10
11
use parakeet_db::models;
12
+
use parakeet_index::PostStats;
11
13
use std::collections::HashMap;
12
14
13
15
fn build_postview(
···
16
18
labels: Vec<models::Label>,
17
19
embed: Option<Embed>,
18
20
threadgate: Option<ThreadgateView>,
21
+
stats: Option<PostStats>,
19
22
) -> PostView {
23
+
let stats = stats
24
+
.map(|stats| RecordStats {
25
+
reply_count: stats.replies as i64,
26
+
repost_count: stats.reposts as i64,
27
+
like_count: stats.likes as i64,
28
+
quote_count: stats.quotes as i64,
29
+
})
30
+
.unwrap_or_default();
31
+
20
32
PostView {
21
33
uri: post.at_uri,
22
34
cid: post.cid,
23
35
author,
24
36
record: post.record,
25
37
embed,
26
-
stats: Default::default(),
38
+
stats,
27
39
labels: map_labels(labels),
28
40
threadgate,
29
41
indexed_at: post.created_at,
···
96
108
post: String,
97
109
apply_labelers: &[LabelConfigItem],
98
110
) -> Option<PostView> {
99
-
let (post, threadgate) = loaders.posts.load(post).await?;
111
+
let (post, threadgate, stats) = loaders.posts.load(post).await?;
100
112
let embed = hydrate_embed(loaders, post.at_uri.clone(), apply_labelers).await;
101
113
let author = hydrate_profile_basic(loaders, post.did.clone(), apply_labelers).await?;
102
114
let threadgate = hydrate_threadgate(loaders, threadgate, apply_labelers).await;
103
115
let labels = loaders.label.load(&post.at_uri, apply_labelers).await;
104
116
105
-
Some(build_postview(post, author, labels, embed, threadgate))
117
+
Some(build_postview(
118
+
post, author, labels, embed, threadgate, stats,
119
+
))
106
120
}
107
121
108
122
pub async fn hydrate_posts(
···
114
128
115
129
let (authors, post_uris) = posts
116
130
.values()
117
-
.map(|(post, _)| (post.did.clone(), post.at_uri.clone()))
131
+
.map(|(post, _, _)| (post.did.clone(), post.at_uri.clone()))
118
132
.unzip::<_, _, Vec<_>, Vec<_>>();
119
133
let authors = hydrate_profiles_basic(loaders, authors, apply_labelers).await;
120
134
···
122
136
123
137
let threadgates = posts
124
138
.values()
125
-
.filter_map(|(_, threadgate)| threadgate.clone())
139
+
.filter_map(|(_, threadgate, _)| threadgate.clone())
126
140
.collect();
127
141
let threadgates = hydrate_threadgates(loaders, threadgates, apply_labelers).await;
128
142
···
130
144
131
145
posts
132
146
.into_iter()
133
-
.filter_map(|(uri, (post, threadgate))| {
147
+
.filter_map(|(uri, (post, threadgate, stats))| {
134
148
let author = authors.get(&post.did)?;
135
149
let embed = embeds.get(&uri).cloned();
136
150
let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned());
···
138
152
139
153
Some((
140
154
uri,
141
-
build_postview(post, author.to_owned(), labels, embed, threadgate),
155
+
build_postview(post, author.to_owned(), labels, embed, threadgate, stats),
142
156
))
143
157
})
144
158
.collect()
···
153
167
154
168
let (authors, post_uris) = posts
155
169
.values()
156
-
.map(|(post, _)| (post.did.clone(), post.at_uri.clone()))
170
+
.map(|(post, _, _)| (post.did.clone(), post.at_uri.clone()))
157
171
.unzip::<_, _, Vec<_>, Vec<_>>();
158
172
let authors = hydrate_profiles_basic(loaders, authors, apply_labelers).await;
159
173
···
163
177
164
178
let reply_refs = posts
165
179
.values()
166
-
.flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()])
180
+
.flat_map(|(post, _, _)| [post.parent_uri.clone(), post.root_uri.clone()])
167
181
.flatten()
168
182
.collect::<Vec<_>>();
169
183
···
171
185
172
186
posts
173
187
.into_iter()
174
-
.filter_map(|(post_uri, (post, threadgate))| {
188
+
.filter_map(|(post_uri, (post, threadgate, stats))| {
175
189
let author = authors.get(&post.did)?;
176
190
177
191
let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri));
···
203
217
204
218
let embed = embeds.get(&post_uri).cloned();
205
219
let labels = post_labels.get(&post_uri).cloned().unwrap_or_default();
206
-
let post = build_postview(post, author.to_owned(), labels, embed, None);
220
+
let post = build_postview(post, author.to_owned(), labels, embed, None, stats);
207
221
208
222
Some((
209
223
post_uri,
+32
-43
parakeet/src/hydration/profile.rs
+32
-43
parakeet/src/hydration/profile.rs
···
2
2
use crate::loaders::Dataloaders;
3
3
use lexica::app_bsky::actor::*;
4
4
use parakeet_db::models;
5
+
use parakeet_index::ProfileStats;
5
6
use std::collections::HashMap;
6
7
use std::sync::OnceLock;
7
8
8
9
pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new();
9
10
10
-
fn build_associated(chat: Option<ChatAllowIncoming>, labeler: bool) -> Option<ProfileAssociated> {
11
-
if chat.is_some() || labeler {
11
+
fn build_associated(
12
+
chat: Option<ChatAllowIncoming>,
13
+
labeler: bool,
14
+
stats: Option<ProfileStats>,
15
+
) -> Option<ProfileAssociated> {
16
+
if chat.is_some() || labeler || stats.is_some() {
17
+
let stats = stats.unwrap_or_default();
18
+
12
19
Some(ProfileAssociated {
13
-
lists: 0,
14
-
feedgens: 0,
15
-
starter_packs: 0,
20
+
lists: stats.lists as i64,
21
+
feedgens: stats.feeds as i64,
22
+
starter_packs: stats.starterpacks as i64,
16
23
labeler,
17
24
chat: chat.map(|v| ProfileAssociatedChat { allow_incoming: v }),
18
25
})
···
110
117
is_labeler: bool,
111
118
labels: Vec<models::Label>,
112
119
verifications: Option<Vec<models::VerificationEntry>>,
120
+
stats: Option<ProfileStats>,
113
121
) -> ProfileViewBasic {
114
-
let associated = build_associated(chat_decl, is_labeler);
122
+
let associated = build_associated(chat_decl, is_labeler, stats);
115
123
let verification = build_verification(&profile, &handle, verifications);
116
124
117
125
ProfileViewBasic {
···
135
143
is_labeler: bool,
136
144
labels: Vec<models::Label>,
137
145
verifications: Option<Vec<models::VerificationEntry>>,
146
+
stats: Option<ProfileStats>,
138
147
) -> ProfileView {
139
-
let associated = build_associated(chat_decl, is_labeler);
148
+
let associated = build_associated(chat_decl, is_labeler, stats);
140
149
let verification = build_verification(&profile, &handle, verifications);
141
150
142
151
ProfileView {
···
158
167
fn build_detailed(
159
168
handle: Option<String>,
160
169
profile: models::Profile,
161
-
follow_stats: Option<models::FollowStats>,
162
170
chat_decl: Option<ChatAllowIncoming>,
163
171
is_labeler: bool,
164
172
labels: Vec<models::Label>,
165
173
verifications: Option<Vec<models::VerificationEntry>>,
174
+
stats: Option<ProfileStats>,
166
175
) -> ProfileViewDetailed {
167
-
let associated = build_associated(chat_decl, is_labeler);
176
+
let associated = build_associated(chat_decl, is_labeler, stats);
168
177
let verification = build_verification(&profile, &handle, verifications);
169
178
170
179
ProfileViewDetailed {
···
178
187
banner: profile
179
188
.banner_cid
180
189
.map(|v| format!("https://localhost/banner/{v}")),
181
-
followers_count: follow_stats
182
-
.as_ref()
183
-
.map(|v| v.followers as i64)
184
-
.unwrap_or_default(),
185
-
follows_count: follow_stats
186
-
.as_ref()
187
-
.map(|v| v.following as i64)
188
-
.unwrap_or_default(),
190
+
followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(),
191
+
follows_count: stats.map(|v| v.following as i64).unwrap_or_default(),
189
192
associated,
190
193
labels: map_labels(labels),
191
194
verification,
···
201
204
) -> Option<ProfileViewBasic> {
202
205
let labels = loaders.label.load(&did, apply_labelers).await;
203
206
let verif = loaders.verification.load(did.clone()).await;
204
-
let (handle, profile, _, chat_decl, labeler) = loaders.profile.load(did).await?;
207
+
let (handle, profile, chat_decl, labeler, stats) = loaders.profile.load(did).await?;
205
208
206
209
Some(build_basic(
207
-
handle, profile, chat_decl, labeler, labels, verif,
210
+
handle, profile, chat_decl, labeler, labels, verif, stats,
208
211
))
209
212
}
210
213
···
219
222
220
223
profiles
221
224
.into_iter()
222
-
.map(|(k, (handle, profile, _, chat_decl, labeler))| {
225
+
.map(|(k, (handle, profile, chat_decl, labeler, stats))| {
223
226
let labels = labels.get(&k).cloned().unwrap_or_default();
224
227
let verif = verif.get(&k).cloned();
225
228
226
229
(
227
230
k,
228
-
build_basic(handle, profile, chat_decl, labeler, labels, verif),
231
+
build_basic(handle, profile, chat_decl, labeler, labels, verif, stats),
229
232
)
230
233
})
231
234
.collect()
···
238
241
) -> Option<ProfileView> {
239
242
let labels = loaders.label.load(&did, apply_labelers).await;
240
243
let verif = loaders.verification.load(did.clone()).await;
241
-
let (handle, profile, _, chat_decl, labeler) = loaders.profile.load(did).await?;
244
+
let (handle, profile, chat_decl, labeler, stats) = loaders.profile.load(did).await?;
242
245
243
246
Some(build_profile(
244
-
handle, profile, chat_decl, labeler, labels, verif,
247
+
handle, profile, chat_decl, labeler, labels, verif, stats,
245
248
))
246
249
}
247
250
···
256
259
257
260
profiles
258
261
.into_iter()
259
-
.map(|(k, (handle, profile, _, chat_decl, labeler))| {
262
+
.map(|(k, (handle, profile, chat_decl, labeler, stats))| {
260
263
let labels = labels.get(&k).cloned().unwrap_or_default();
261
264
let verif = verif.get(&k).cloned();
262
265
263
266
(
264
267
k,
265
-
build_profile(handle, profile, chat_decl, labeler, labels, verif),
268
+
build_profile(handle, profile, chat_decl, labeler, labels, verif, stats),
266
269
)
267
270
})
268
271
.collect()
···
275
278
) -> Option<ProfileViewDetailed> {
276
279
let labels = loaders.label.load(&did, apply_labelers).await;
277
280
let verif = loaders.verification.load(did.clone()).await;
278
-
let (handle, profile, follow_stats, chat_decl, labeler) = loaders.profile.load(did).await?;
281
+
let (handle, profile, chat_decl, labeler, stats) = loaders.profile.load(did).await?;
279
282
280
283
Some(build_detailed(
281
-
handle,
282
-
profile,
283
-
follow_stats,
284
-
chat_decl,
285
-
labeler,
286
-
labels,
287
-
verif,
284
+
handle, profile, chat_decl, labeler, labels, verif, stats,
288
285
))
289
286
}
290
287
···
299
296
300
297
profiles
301
298
.into_iter()
302
-
.map(|(k, (handle, profile, follow_stats, chat_decl, labeler))| {
299
+
.map(|(k, (handle, profile, chat_decl, labeler, stats))| {
303
300
let labels = labels.get(&k).cloned().unwrap_or_default();
304
301
let verif = verif.get(&k).cloned();
305
302
306
303
(
307
304
k,
308
-
build_detailed(
309
-
handle,
310
-
profile,
311
-
follow_stats,
312
-
chat_decl,
313
-
labeler,
314
-
labels,
315
-
verif,
316
-
),
305
+
build_detailed(handle, profile, chat_decl, labeler, labels, verif, stats),
317
306
)
318
307
})
319
308
.collect()
+90
-26
parakeet/src/loaders.rs
+90
-26
parakeet/src/loaders.rs
···
26
26
impl Dataloaders {
27
27
// for the moment, we set up memory cached loaders
28
28
// we should build a redis/valkey backend at some point in the future.
29
-
pub fn new(pool: Pool<AsyncPgConnection>) -> Dataloaders {
29
+
pub fn new(pool: Pool<AsyncPgConnection>, idxc: parakeet_index::Client) -> Dataloaders {
30
30
Dataloaders {
31
31
embed: Loader::new(EmbedLoader(pool.clone())),
32
-
feedgen: Loader::new(FeedGenLoader(pool.clone())),
32
+
feedgen: Loader::new(FeedGenLoader(pool.clone(), idxc.clone())),
33
33
handle: Loader::new(HandleLoader(pool.clone())),
34
34
label: LabelLoader(pool.clone()), // CARE: never cache this.
35
-
labeler: Loader::new(LabelServiceLoader(pool.clone())),
35
+
labeler: Loader::new(LabelServiceLoader(pool.clone(), idxc.clone())),
36
36
list: Loader::new(ListLoader(pool.clone())),
37
-
posts: Loader::new(PostLoader(pool.clone())),
38
-
profile: Loader::new(ProfileLoader(pool.clone())),
37
+
posts: Loader::new(PostLoader(pool.clone(), idxc.clone())),
38
+
profile: Loader::new(ProfileLoader(pool.clone(), idxc.clone())),
39
39
starterpacks: Loader::new(StarterPackLoader(pool.clone())),
40
40
verification: Loader::new(VerificationLoader(pool.clone())),
41
41
}
···
66
66
}
67
67
}
68
68
69
-
pub struct ProfileLoader(Pool<AsyncPgConnection>);
69
+
pub struct ProfileLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
70
70
type ProfileLoaderRet = (
71
71
Option<String>,
72
72
models::Profile,
73
-
Option<models::FollowStats>,
74
73
Option<ChatAllowIncoming>,
75
74
bool,
75
+
Option<parakeet_index::ProfileStats>,
76
76
);
77
77
impl BatchFn<String, ProfileLoaderRet> for ProfileLoader {
78
78
async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> {
···
91
91
schema::actors::did,
92
92
schema::actors::handle,
93
93
models::Profile::as_select(),
94
-
Option::<models::FollowStats>::as_select(),
95
94
schema::chat_decls::allow_incoming.nullable(),
96
95
schema::labelers::cid.nullable(),
97
96
))
···
100
99
String,
101
100
Option<String>,
102
101
models::Profile,
103
-
Option<models::FollowStats>,
104
102
Option<String>,
105
103
Option<String>,
106
104
)>(&mut conn)
107
105
.await;
108
106
107
+
let stats_req = parakeet_index::GetStatsManyReq {
108
+
uris: keys.to_vec(),
109
+
};
110
+
let mut stats = self
111
+
.1
112
+
.get_profile_stats_many(stats_req)
113
+
.await
114
+
.unwrap()
115
+
.into_inner()
116
+
.entries;
117
+
109
118
match res {
110
119
Ok(res) => HashMap::from_iter(res.into_iter().map(
111
-
|(did, handle, profile, follow_stats, chat_decl, labeler_cid)| {
120
+
|(did, handle, profile, chat_decl, labeler_cid)| {
112
121
let chat_decl = chat_decl.and_then(|v| ChatAllowIncoming::from_str(&v).ok());
113
122
let is_labeler = labeler_cid.is_some();
123
+
let maybe_stats = stats.remove(&did);
114
124
115
-
let val = (handle, profile, follow_stats, chat_decl, is_labeler);
125
+
let val = (handle, profile, chat_decl, is_labeler, maybe_stats);
116
126
117
127
(did, val)
118
128
},
···
157
167
}
158
168
}
159
169
160
-
pub struct FeedGenLoader(Pool<AsyncPgConnection>);
161
-
type FeedGenLoaderRet = models::FeedGen; //todo: when we have likes, we'll need the count here
170
+
pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
171
+
type FeedGenLoaderRet = (models::FeedGen, Option<i32>);
162
172
impl BatchFn<String, FeedGenLoaderRet> for FeedGenLoader {
163
173
async fn load(&mut self, keys: &[String]) -> HashMap<String, FeedGenLoaderRet> {
164
174
let mut conn = self.0.get().await.unwrap();
···
169
179
.load(&mut conn)
170
180
.await;
171
181
182
+
let stats_req = parakeet_index::GetStatsManyReq {
183
+
uris: keys.to_vec(),
184
+
};
185
+
let mut stats = self
186
+
.1
187
+
.get_like_count_many(stats_req)
188
+
.await
189
+
.unwrap()
190
+
.into_inner()
191
+
.entries;
192
+
172
193
match res {
173
-
Ok(res) => HashMap::from_iter(
174
-
res.into_iter()
175
-
.map(|feedgen| (feedgen.at_uri.clone(), feedgen)),
176
-
),
194
+
Ok(res) => HashMap::from_iter(res.into_iter().map(|feedgen| {
195
+
let likes = stats.remove(&feedgen.at_uri).map(|v| v.likes);
196
+
197
+
(feedgen.at_uri.clone(), (feedgen, likes))
198
+
})),
177
199
Err(e) => {
178
200
tracing::error!("feedgen load failed: {e}");
179
201
HashMap::new()
···
182
204
}
183
205
}
184
206
185
-
pub struct PostLoader(Pool<AsyncPgConnection>);
186
-
type PostLoaderRet = (models::Post, Option<models::Threadgate>);
207
+
pub struct PostLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
208
+
type PostLoaderRet = (
209
+
models::Post,
210
+
Option<models::Threadgate>,
211
+
Option<parakeet_index::PostStats>,
212
+
);
187
213
impl BatchFn<String, PostLoaderRet> for PostLoader {
188
214
async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> {
189
215
let mut conn = self.0.get().await.unwrap();
···
198
224
.load(&mut conn)
199
225
.await;
200
226
227
+
let stats_req = parakeet_index::GetStatsManyReq {
228
+
uris: keys.to_vec(),
229
+
};
230
+
let mut stats = self
231
+
.1
232
+
.get_post_stats_many(stats_req)
233
+
.await
234
+
.unwrap()
235
+
.into_inner()
236
+
.entries;
237
+
201
238
match res {
202
-
Ok(res) => HashMap::from_iter(
203
-
res.into_iter()
204
-
.map(|(post, threadgate)| (post.at_uri.clone(), (post, threadgate))),
205
-
),
239
+
Ok(res) => HashMap::from_iter(res.into_iter().map(|(post, threadgate)| {
240
+
let maybe_stats = stats.remove(&post.at_uri);
241
+
242
+
(post.at_uri.clone(), (post, threadgate, maybe_stats))
243
+
})),
206
244
Err(e) => {
207
245
tracing::error!("post load failed: {e}");
208
246
HashMap::new()
···
323
361
}
324
362
}
325
363
326
-
pub struct LabelServiceLoader(Pool<AsyncPgConnection>);
327
-
type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>);
364
+
pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
365
+
type LabelServiceLoaderRet = (
366
+
models::LabelerService,
367
+
Vec<models::LabelDefinition>,
368
+
Option<i32>,
369
+
);
328
370
impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader {
329
371
async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> {
330
372
let mut conn = self.0.get().await.unwrap();
···
343
385
344
386
let defs = defs.grouped_by(&labelers);
345
387
388
+
let uris = keys
389
+
.iter()
390
+
.map(|v| format!("at://{v}/app.bsky.labeler.service/self"))
391
+
.collect();
392
+
let stats_req = parakeet_index::GetStatsManyReq { uris };
393
+
let mut stats = self
394
+
.1
395
+
.get_like_count_many(stats_req)
396
+
.await
397
+
.unwrap()
398
+
.into_inner()
399
+
.entries;
400
+
346
401
labelers
347
402
.into_iter()
348
403
.zip(defs)
349
-
.map(|(labeler, defs)| (labeler.did.clone(), (labeler, defs)))
404
+
.map(|(labeler, defs)| {
405
+
let likes = stats
406
+
.remove(&format!(
407
+
"at://{}/app.bsky.labeler.service/self",
408
+
&labeler.did
409
+
))
410
+
.map(|v| v.likes);
411
+
412
+
(labeler.did.clone(), (labeler, defs, likes))
413
+
})
350
414
.collect()
351
415
}
352
416
}
+12
-2
parakeet/src/main.rs
+12
-2
parakeet/src/main.rs
···
14
14
pub struct GlobalState {
15
15
pub pool: Pool<AsyncPgConnection>,
16
16
pub dataloaders: Arc<loaders::Dataloaders>,
17
+
pub index_client: parakeet_index::Client,
17
18
}
18
19
19
20
#[tokio::main]
···
25
26
let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url);
26
27
let pool = Pool::builder(db_mgr).build()?;
27
28
28
-
let dataloaders = Arc::new(loaders::Dataloaders::new(pool.clone()));
29
+
let index_client = parakeet_index::Client::connect(conf.index_uri).await?;
30
+
31
+
let dataloaders = Arc::new(loaders::Dataloaders::new(
32
+
pool.clone(),
33
+
index_client.clone(),
34
+
));
29
35
30
36
#[allow(unused)]
31
37
hydration::TRUSTED_VERIFIERS.set(conf.trusted_verifiers);
···
44
50
)
45
51
.layer(TraceLayer::new_for_http())
46
52
.layer(cors)
47
-
.with_state(GlobalState { pool, dataloaders });
53
+
.with_state(GlobalState {
54
+
pool,
55
+
dataloaders,
56
+
index_client,
57
+
});
48
58
49
59
let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
50
60
let listener = tokio::net::TcpListener::bind(addr).await?;