+364
-12
Cargo.lock
+364
-12
Cargo.lock
···
286
286
]
287
287
288
288
[[package]]
289
+
name = "axum-tracing-opentelemetry"
290
+
version = "0.32.1"
291
+
source = "registry+https://github.com/rust-lang/crates.io-index"
292
+
checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1"
293
+
dependencies = [
294
+
"axum",
295
+
"futures-core",
296
+
"futures-util",
297
+
"http",
298
+
"opentelemetry",
299
+
"opentelemetry-semantic-conventions",
300
+
"pin-project-lite",
301
+
"tower",
302
+
"tracing",
303
+
"tracing-opentelemetry",
304
+
"tracing-opentelemetry-instrumentation-sdk",
305
+
]
306
+
307
+
[[package]]
289
308
name = "backtrace"
290
309
version = "0.3.74"
291
310
source = "registry+https://github.com/rust-lang/crates.io-index"
···
347
366
"proc-macro2",
348
367
"quote",
349
368
"regex",
350
-
"rustc-hash",
369
+
"rustc-hash 1.1.0",
351
370
"shlex",
352
371
"syn",
353
372
"which",
···
465
484
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
466
485
467
486
[[package]]
487
+
name = "cfg_aliases"
488
+
version = "0.2.1"
489
+
source = "registry+https://github.com/rust-lang/crates.io-index"
490
+
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
491
+
492
+
[[package]]
468
493
name = "chrono"
469
494
version = "0.4.41"
470
495
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1370
1395
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
1371
1396
dependencies = [
1372
1397
"cfg-if",
1398
+
"js-sys",
1373
1399
"libc",
1374
1400
"r-efi",
1375
1401
"wasi 0.14.2+wasi-0.2.4",
1402
+
"wasm-bindgen",
1376
1403
]
1377
1404
1378
1405
[[package]]
···
2169
2196
]
2170
2197
2171
2198
[[package]]
2199
+
name = "lru-slab"
2200
+
version = "0.1.2"
2201
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2202
+
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
2203
+
2204
+
[[package]]
2172
2205
name = "lz4-sys"
2173
2206
version = "1.11.1+lz4-1.10.0"
2174
2207
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2185
2218
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
2186
2219
2187
2220
[[package]]
2221
+
name = "matchers"
2222
+
version = "0.1.0"
2223
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2224
+
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
2225
+
dependencies = [
2226
+
"regex-automata 0.1.10",
2227
+
]
2228
+
2229
+
[[package]]
2188
2230
name = "matchit"
2189
2231
version = "0.8.4"
2190
2232
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2513
2555
]
2514
2556
2515
2557
[[package]]
2558
+
name = "opentelemetry"
2559
+
version = "0.31.0"
2560
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2561
+
checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0"
2562
+
dependencies = [
2563
+
"futures-core",
2564
+
"futures-sink",
2565
+
"js-sys",
2566
+
"pin-project-lite",
2567
+
"thiserror 2.0.12",
2568
+
"tracing",
2569
+
]
2570
+
2571
+
[[package]]
2572
+
name = "opentelemetry-http"
2573
+
version = "0.31.0"
2574
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2575
+
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
2576
+
dependencies = [
2577
+
"async-trait",
2578
+
"bytes",
2579
+
"http",
2580
+
"opentelemetry",
2581
+
"reqwest",
2582
+
]
2583
+
2584
+
[[package]]
2585
+
name = "opentelemetry-otlp"
2586
+
version = "0.31.0"
2587
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2588
+
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
2589
+
dependencies = [
2590
+
"http",
2591
+
"opentelemetry",
2592
+
"opentelemetry-http",
2593
+
"opentelemetry-proto",
2594
+
"opentelemetry_sdk",
2595
+
"prost 0.14.1",
2596
+
"reqwest",
2597
+
"thiserror 2.0.12",
2598
+
"tracing",
2599
+
]
2600
+
2601
+
[[package]]
2602
+
name = "opentelemetry-proto"
2603
+
version = "0.31.0"
2604
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2605
+
checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
2606
+
dependencies = [
2607
+
"opentelemetry",
2608
+
"opentelemetry_sdk",
2609
+
"prost 0.14.1",
2610
+
"tonic 0.14.2",
2611
+
"tonic-prost",
2612
+
]
2613
+
2614
+
[[package]]
2615
+
name = "opentelemetry-semantic-conventions"
2616
+
version = "0.31.0"
2617
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2618
+
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
2619
+
2620
+
[[package]]
2621
+
name = "opentelemetry_sdk"
2622
+
version = "0.31.0"
2623
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2624
+
checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd"
2625
+
dependencies = [
2626
+
"futures-channel",
2627
+
"futures-executor",
2628
+
"futures-util",
2629
+
"opentelemetry",
2630
+
"percent-encoding",
2631
+
"rand 0.9.1",
2632
+
"thiserror 2.0.12",
2633
+
]
2634
+
2635
+
[[package]]
2516
2636
name = "overload"
2517
2637
version = "0.1.1"
2518
2638
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2549
2669
"async-recursion",
2550
2670
"axum",
2551
2671
"axum-extra",
2672
+
"axum-tracing-opentelemetry",
2552
2673
"base64 0.22.1",
2553
2674
"chrono",
2554
2675
"dataloader",
···
2563
2684
"jsonwebtoken",
2564
2685
"lexica",
2565
2686
"multibase",
2687
+
"opentelemetry",
2688
+
"opentelemetry-otlp",
2689
+
"opentelemetry_sdk",
2566
2690
"parakeet-db",
2567
2691
"parakeet-index",
2568
2692
"redis",
···
2571
2695
"serde_ipld_dagcbor",
2572
2696
"serde_json",
2573
2697
"tokio",
2698
+
"tower",
2574
2699
"tower-http",
2575
2700
"tracing",
2701
+
"tracing-opentelemetry",
2576
2702
"tracing-subscriber",
2577
2703
]
2578
2704
···
2594
2720
"eyre",
2595
2721
"figment",
2596
2722
"itertools 0.14.0",
2597
-
"prost",
2723
+
"opentelemetry",
2724
+
"opentelemetry-otlp",
2725
+
"opentelemetry_sdk",
2726
+
"prost 0.13.5",
2598
2727
"rocksdb",
2599
2728
"serde",
2600
2729
"tokio",
2601
-
"tonic",
2730
+
"tonic 0.13.1",
2602
2731
"tonic-build",
2603
2732
"tonic-health",
2733
+
"tonic-tracing-opentelemetry",
2734
+
"tower",
2604
2735
"tracing",
2736
+
"tracing-opentelemetry",
2605
2737
"tracing-subscriber",
2606
2738
]
2607
2739
···
2907
3039
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
2908
3040
dependencies = [
2909
3041
"bytes",
2910
-
"prost-derive",
3042
+
"prost-derive 0.13.5",
3043
+
]
3044
+
3045
+
[[package]]
3046
+
name = "prost"
3047
+
version = "0.14.1"
3048
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3049
+
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
3050
+
dependencies = [
3051
+
"bytes",
3052
+
"prost-derive 0.14.1",
2911
3053
]
2912
3054
2913
3055
[[package]]
···
2923
3065
"once_cell",
2924
3066
"petgraph",
2925
3067
"prettyplease",
2926
-
"prost",
3068
+
"prost 0.13.5",
2927
3069
"prost-types",
2928
3070
"regex",
2929
3071
"syn",
···
2944
3086
]
2945
3087
2946
3088
[[package]]
3089
+
name = "prost-derive"
3090
+
version = "0.14.1"
3091
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3092
+
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
3093
+
dependencies = [
3094
+
"anyhow",
3095
+
"itertools 0.14.0",
3096
+
"proc-macro2",
3097
+
"quote",
3098
+
"syn",
3099
+
]
3100
+
3101
+
[[package]]
2947
3102
name = "prost-types"
2948
3103
version = "0.13.5"
2949
3104
source = "registry+https://github.com/rust-lang/crates.io-index"
2950
3105
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
2951
3106
dependencies = [
2952
-
"prost",
3107
+
"prost 0.13.5",
2953
3108
]
2954
3109
2955
3110
[[package]]
···
2974
3129
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
2975
3130
2976
3131
[[package]]
3132
+
name = "quinn"
3133
+
version = "0.11.9"
3134
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3135
+
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
3136
+
dependencies = [
3137
+
"bytes",
3138
+
"cfg_aliases",
3139
+
"pin-project-lite",
3140
+
"quinn-proto",
3141
+
"quinn-udp",
3142
+
"rustc-hash 2.1.1",
3143
+
"rustls",
3144
+
"socket2 0.6.0",
3145
+
"thiserror 2.0.12",
3146
+
"tokio",
3147
+
"tracing",
3148
+
"web-time",
3149
+
]
3150
+
3151
+
[[package]]
3152
+
name = "quinn-proto"
3153
+
version = "0.11.13"
3154
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3155
+
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
3156
+
dependencies = [
3157
+
"bytes",
3158
+
"getrandom 0.3.3",
3159
+
"lru-slab",
3160
+
"rand 0.9.1",
3161
+
"ring",
3162
+
"rustc-hash 2.1.1",
3163
+
"rustls",
3164
+
"rustls-pki-types",
3165
+
"slab",
3166
+
"thiserror 2.0.12",
3167
+
"tinyvec",
3168
+
"tracing",
3169
+
"web-time",
3170
+
]
3171
+
3172
+
[[package]]
3173
+
name = "quinn-udp"
3174
+
version = "0.5.14"
3175
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3176
+
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
3177
+
dependencies = [
3178
+
"cfg_aliases",
3179
+
"libc",
3180
+
"once_cell",
3181
+
"socket2 0.6.0",
3182
+
"tracing",
3183
+
"windows-sys 0.59.0",
3184
+
]
3185
+
3186
+
[[package]]
2977
3187
name = "quote"
2978
3188
version = "1.0.38"
2979
3189
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3135
3345
dependencies = [
3136
3346
"aho-corasick",
3137
3347
"memchr",
3138
-
"regex-automata",
3139
-
"regex-syntax",
3348
+
"regex-automata 0.4.9",
3349
+
"regex-syntax 0.8.5",
3350
+
]
3351
+
3352
+
[[package]]
3353
+
name = "regex-automata"
3354
+
version = "0.1.10"
3355
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3356
+
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
3357
+
dependencies = [
3358
+
"regex-syntax 0.6.29",
3140
3359
]
3141
3360
3142
3361
[[package]]
···
3147
3366
dependencies = [
3148
3367
"aho-corasick",
3149
3368
"memchr",
3150
-
"regex-syntax",
3369
+
"regex-syntax 0.8.5",
3151
3370
]
3152
3371
3153
3372
[[package]]
3154
3373
name = "regex-syntax"
3374
+
version = "0.6.29"
3375
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3376
+
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
3377
+
3378
+
[[package]]
3379
+
name = "regex-syntax"
3155
3380
version = "0.8.5"
3156
3381
source = "registry+https://github.com/rust-lang/crates.io-index"
3157
3382
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
···
3166
3391
"base64 0.22.1",
3167
3392
"bytes",
3168
3393
"encoding_rs",
3394
+
"futures-channel",
3169
3395
"futures-core",
3170
3396
"futures-util",
3171
3397
"h2",
···
3184
3410
"once_cell",
3185
3411
"percent-encoding",
3186
3412
"pin-project-lite",
3413
+
"quinn",
3414
+
"rustls",
3415
+
"rustls-native-certs",
3187
3416
"rustls-pemfile",
3417
+
"rustls-pki-types",
3188
3418
"serde",
3189
3419
"serde_json",
3190
3420
"serde_urlencoded",
···
3192
3422
"system-configuration",
3193
3423
"tokio",
3194
3424
"tokio-native-tls",
3425
+
"tokio-rustls",
3195
3426
"tokio-util",
3196
3427
"tower",
3197
3428
"tower-service",
···
3281
3512
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
3282
3513
3283
3514
[[package]]
3515
+
name = "rustc-hash"
3516
+
version = "2.1.1"
3517
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3518
+
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
3519
+
3520
+
[[package]]
3284
3521
name = "rustc_version"
3285
3522
version = "0.4.1"
3286
3523
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3310
3547
dependencies = [
3311
3548
"aws-lc-rs",
3312
3549
"once_cell",
3550
+
"ring",
3313
3551
"rustls-pki-types",
3314
3552
"rustls-webpki",
3315
3553
"subtle",
···
3342
3580
version = "1.11.0"
3343
3581
source = "registry+https://github.com/rust-lang/crates.io-index"
3344
3582
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
3583
+
dependencies = [
3584
+
"web-time",
3585
+
]
3345
3586
3346
3587
[[package]]
3347
3588
name = "rustls-webpki"
···
4185
4426
"hyper-util",
4186
4427
"percent-encoding",
4187
4428
"pin-project",
4188
-
"prost",
4429
+
"prost 0.13.5",
4189
4430
"socket2 0.5.8",
4190
4431
"tokio",
4191
4432
"tokio-stream",
4192
4433
"tower",
4434
+
"tower-layer",
4435
+
"tower-service",
4436
+
"tracing",
4437
+
]
4438
+
4439
+
[[package]]
4440
+
name = "tonic"
4441
+
version = "0.14.2"
4442
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4443
+
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
4444
+
dependencies = [
4445
+
"async-trait",
4446
+
"base64 0.22.1",
4447
+
"bytes",
4448
+
"http",
4449
+
"http-body",
4450
+
"http-body-util",
4451
+
"percent-encoding",
4452
+
"pin-project",
4453
+
"sync_wrapper",
4454
+
"tokio-stream",
4193
4455
"tower-layer",
4194
4456
"tower-service",
4195
4457
"tracing",
···
4215
4477
source = "registry+https://github.com/rust-lang/crates.io-index"
4216
4478
checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b"
4217
4479
dependencies = [
4218
-
"prost",
4480
+
"prost 0.13.5",
4219
4481
"tokio",
4220
4482
"tokio-stream",
4221
-
"tonic",
4483
+
"tonic 0.13.1",
4484
+
]
4485
+
4486
+
[[package]]
4487
+
name = "tonic-prost"
4488
+
version = "0.14.2"
4489
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4490
+
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
4491
+
dependencies = [
4492
+
"bytes",
4493
+
"prost 0.14.1",
4494
+
"tonic 0.14.2",
4495
+
]
4496
+
4497
+
[[package]]
4498
+
name = "tonic-tracing-opentelemetry"
4499
+
version = "0.32.0"
4500
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4501
+
checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3"
4502
+
dependencies = [
4503
+
"futures-core",
4504
+
"futures-util",
4505
+
"http",
4506
+
"http-body",
4507
+
"hyper",
4508
+
"opentelemetry",
4509
+
"pin-project-lite",
4510
+
"tonic 0.14.2",
4511
+
"tower",
4512
+
"tracing",
4513
+
"tracing-opentelemetry",
4514
+
"tracing-opentelemetry-instrumentation-sdk",
4222
4515
]
4223
4516
4224
4517
[[package]]
···
4313
4606
]
4314
4607
4315
4608
[[package]]
4609
+
name = "tracing-opentelemetry"
4610
+
version = "0.32.0"
4611
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4612
+
checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e"
4613
+
dependencies = [
4614
+
"js-sys",
4615
+
"opentelemetry",
4616
+
"opentelemetry_sdk",
4617
+
"rustversion",
4618
+
"smallvec",
4619
+
"thiserror 2.0.12",
4620
+
"tracing",
4621
+
"tracing-core",
4622
+
"tracing-log",
4623
+
"tracing-subscriber",
4624
+
"web-time",
4625
+
]
4626
+
4627
+
[[package]]
4628
+
name = "tracing-opentelemetry-instrumentation-sdk"
4629
+
version = "0.32.1"
4630
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4631
+
checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416"
4632
+
dependencies = [
4633
+
"http",
4634
+
"opentelemetry",
4635
+
"opentelemetry-semantic-conventions",
4636
+
"tracing",
4637
+
"tracing-opentelemetry",
4638
+
]
4639
+
4640
+
[[package]]
4641
+
name = "tracing-serde"
4642
+
version = "0.2.0"
4643
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4644
+
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
4645
+
dependencies = [
4646
+
"serde",
4647
+
"tracing-core",
4648
+
]
4649
+
4650
+
[[package]]
4316
4651
name = "tracing-subscriber"
4317
4652
version = "0.3.19"
4318
4653
source = "registry+https://github.com/rust-lang/crates.io-index"
4319
4654
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
4320
4655
dependencies = [
4656
+
"matchers",
4321
4657
"nu-ansi-term",
4658
+
"once_cell",
4659
+
"regex",
4660
+
"serde",
4661
+
"serde_json",
4322
4662
"sharded-slab",
4323
4663
"smallvec",
4324
4664
"thread_local",
4665
+
"tracing",
4325
4666
"tracing-core",
4326
4667
"tracing-log",
4668
+
"tracing-serde",
4327
4669
]
4328
4670
4329
4671
[[package]]
···
4592
4934
version = "0.3.77"
4593
4935
source = "registry+https://github.com/rust-lang/crates.io-index"
4594
4936
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
4937
+
dependencies = [
4938
+
"js-sys",
4939
+
"wasm-bindgen",
4940
+
]
4941
+
4942
+
[[package]]
4943
+
name = "web-time"
4944
+
version = "1.1.0"
4945
+
source = "registry+https://github.com/rust-lang/crates.io-index"
4946
+
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
4595
4947
dependencies = [
4596
4948
"js-sys",
4597
4949
"wasm-bindgen",
+1
-1
consumer/Cargo.toml
+1
-1
consumer/Cargo.toml
+3
-3
consumer/src/backfill/downloader.rs
+3
-3
consumer/src/backfill/downloader.rs
···
84
84
85
85
// has the repo already been downloaded?
86
86
if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
87
-
tracing::warn!("skipping duplicate repo {did}");
87
+
tracing::info!("skipping duplicate repo {did}");
88
88
continue;
89
89
}
90
90
···
92
92
match db::actor_get_statuses(&mut conn, &did).await {
93
93
Ok(Some((_, state))) => {
94
94
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
95
-
tracing::warn!("skipping duplicate repo {did}");
95
+
tracing::info!("skipping duplicate repo {did}");
96
96
continue;
97
97
}
98
98
}
···
206
206
let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
207
207
let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
208
208
}
209
-
Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."),
209
+
Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."),
210
210
Err(e) => {
211
211
tracing::error!(pds, did, "failed to download repo: {e}");
212
212
continue;
+4
-5
consumer/src/backfill/repo.rs
+4
-5
consumer/src/backfill/repo.rs
···
53
53
54
54
match block {
55
55
CarEntry::Commit(_) => {
56
-
tracing::warn!("got commit entry that was not in root")
56
+
tracing::debug!("got commit entry that was not in root")
57
57
}
58
58
CarEntry::Record(CarRecordEntry::Known(record)) => {
59
59
if let Some(path) = mst_nodes.remove(&cid) {
···
96
96
}
97
97
}
98
98
99
-
let commit = commit.unwrap();
99
+
let Some(commit) = commit else {
100
+
eyre::bail!("repo contained no commit?");
101
+
};
100
102
101
103
Ok((commit, deltas, copies))
102
104
}
···
169
171
}
170
172
RecordTypes::AppBskyFeedThreadgate(record) => {
171
173
if !at_uri_is_by(&record.post, did) {
172
-
tracing::warn!("tried to create a threadgate on a post we don't control!");
173
174
return Ok(());
174
175
}
175
176
···
194
195
RecordTypes::AppBskyGraphListItem(rec) => {
195
196
let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>();
196
197
if did != split_aturi[2] {
197
-
// it's also probably a bad idea to log *all* the attempts to do this...
198
-
tracing::warn!("tried to create a listitem on a list we don't control!");
199
198
return Ok(());
200
199
}
201
200
+8
consumer/src/config.rs
+8
consumer/src/config.rs
···
13
13
14
14
#[derive(Debug, Deserialize)]
15
15
pub struct Config {
16
+
#[serde(flatten)]
17
+
pub instruments: ConfigInstruments,
16
18
pub index_uri: String,
17
19
pub database: deadpool_postgres::Config,
18
20
pub redis_uri: String,
···
27
29
pub indexer: Option<IndexerConfig>,
28
30
/// Configuration items specific to backfill
29
31
pub backfill: Option<BackfillConfig>,
32
+
}
33
+
34
+
#[derive(Debug, Deserialize)]
35
+
pub struct ConfigInstruments {
36
+
#[serde(default)]
37
+
pub log_json: bool,
30
38
}
31
39
32
40
#[derive(Debug, Deserialize)]
+3
-3
consumer/src/db/actor.rs
+3
-3
consumer/src/db/actor.rs
···
69
69
)
70
70
.await?;
71
71
72
-
Ok(res.map(|v| (v.get(0), v.get(1))))
72
+
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
73
73
}
74
74
75
75
pub async fn actor_get_repo_status<C: GenericClient>(
···
83
83
)
84
84
.await?;
85
85
86
-
Ok(res.map(|v| (v.get(0), v.get(1))))
86
+
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
87
87
}
88
88
89
89
pub async fn actor_get_statuses<C: GenericClient>(
···
97
97
)
98
98
.await?;
99
99
100
-
Ok(res.map(|v| (v.get(0), v.get(1))))
100
+
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
101
101
}
+10
-9
consumer/src/db/backfill.rs
+10
-9
consumer/src/db/backfill.rs
···
51
51
)
52
52
.await?;
53
53
54
-
Ok(res
55
-
.into_iter()
56
-
.map(|row| BackfillRow {
57
-
repo: row.get(0),
58
-
repo_ver: row.get(1),
59
-
cid: row.get(2),
60
-
data: row.get(3),
61
-
indexed_at: row.get(4),
54
+
res.into_iter()
55
+
.map(|row| {
56
+
Ok(BackfillRow {
57
+
repo: row.try_get(0)?,
58
+
repo_ver: row.try_get(1)?,
59
+
cid: row.try_get(2)?,
60
+
data: row.try_get(3)?,
61
+
indexed_at: row.try_get(4)?,
62
+
})
62
63
})
63
-
.collect())
64
+
.collect()
64
65
}
65
66
66
67
pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1
-1
consumer/src/db/copy.rs
+1
-1
consumer/src/db/copy.rs
···
205
205
)
206
206
.await?
207
207
.into_iter()
208
-
.map(|v| (v.get(0), v.get(1), v.get(2))).collect();
208
+
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?;
209
209
210
210
for (root, post, created_at) in threadgated {
211
211
match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+18
-13
consumer/src/db/gates.rs
+18
-13
consumer/src/db/gates.rs
···
47
47
&[&root_author, &post_author],
48
48
)
49
49
.await?
50
-
.map(|v| (v.get(0), v.get(1)));
50
+
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?;
51
51
52
52
if let Some((following, followed)) = profile_state {
53
53
if allow.contains(THREADGATE_RULE_FOLLOWER) && followed {
···
65
65
let mentions: Vec<String> = conn
66
66
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
67
67
.await?
68
-
.map(|r| r.get(0))
68
+
.and_then(|r| r.try_get::<_, Option<_>>(0).transpose())
69
+
.transpose()?
69
70
.unwrap_or_default();
70
71
71
72
if mentions.contains(&post_author.to_owned()) {
···
84
85
&[&allow_lists, &post_author],
85
86
)
86
87
.await?
87
-
.get(0);
88
+
.try_get(0)?;
88
89
if count != 0 {
89
90
return Ok(false);
90
91
}
···
136
137
)
137
138
.await?
138
139
.into_iter()
139
-
.map(|row| row.get(0))
140
-
.collect();
140
+
.map(|row| row.try_get(0))
141
+
.collect::<Result<_, _>>()?;
141
142
142
143
// this will be empty if there are no replies.
143
144
if dids.is_empty() {
···
152
153
})
153
154
.collect::<Vec<_>>();
154
155
155
-
let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str()));
156
+
let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str()));
156
157
157
158
if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
158
159
let current_dids: Vec<_> = dids.iter().collect();
···
160
161
let res = conn.query(
161
162
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
162
163
&[&root_author, &current_dids]
163
-
).await?;
164
+
).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?;
164
165
165
-
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
166
+
dids = &dids - &res;
166
167
}
167
168
168
169
if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
···
171
172
let res = conn.query(
172
173
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
173
174
&[&root_author, &current_dids]
174
-
).await?;
175
+
).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?;
175
176
176
-
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
177
+
dids = &dids - &res;
177
178
}
178
179
179
180
if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
180
181
let mentions: Vec<String> = conn
181
182
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
182
183
.await?
183
-
.map(|r| r.get(0))
184
+
.and_then(|r| r.try_get::<_, Option<_>>(0).transpose())
185
+
.transpose()?
184
186
.unwrap_or_default();
185
187
186
188
dids = &dids - &HashSet::from_iter(mentions);
···
194
196
"SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
195
197
&[&allowed_lists, &current_dids],
196
198
)
197
-
.await?;
199
+
.await?
200
+
.into_iter()
201
+
.map(|row| row.try_get(0))
202
+
.collect::<Result<_, _>>()?;
198
203
199
-
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
204
+
dids = &dids - &res;
200
205
}
201
206
202
207
let dids = dids.into_iter().collect::<Vec<_>>();
+17
-20
consumer/src/db/record.rs
+17
-20
consumer/src/db/record.rs
···
127
127
],
128
128
)
129
129
.await
130
-
.map(|r| r.get::<_, i32>(0) == 0)
130
+
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
131
131
}
132
132
133
133
pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
159
159
)
160
160
.await?;
161
161
162
-
Ok(res.map(|v| v.get(0)))
162
+
res.map(|v| v.try_get(0)).transpose()
163
163
}
164
164
165
165
pub async fn labeler_upsert<C: GenericClient>(
···
224
224
)
225
225
.await?;
226
226
227
-
Ok(res.map(|v| v.get(0)))
227
+
res.map(|v| v.try_get(0)).transpose()
228
228
}
229
229
230
230
pub async fn list_upsert<C: GenericClient>(
···
255
255
],
256
256
)
257
257
.await
258
-
.map(|r| r.get::<_, i32>(0) == 0)
258
+
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
259
259
}
260
260
261
261
pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
391
391
)
392
392
.await?;
393
393
394
-
Ok(res.map(|row| (row.get(0), row.get(1))))
394
+
res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?)))
395
+
.transpose()
395
396
}
396
397
397
398
pub async fn post_embed_insert<C: GenericClient>(
···
536
537
conn: &mut C,
537
538
post: &str,
538
539
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
539
-
let res = conn
540
-
.query_opt(
541
-
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542
-
&[&post],
543
-
)
544
-
.await?
545
-
.map(|v| (v.get(0), v.get(1), v.get(2)));
546
-
547
-
Ok(res)
540
+
conn.query_opt(
541
+
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542
+
&[&post],
543
+
)
544
+
.await?
545
+
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?)))
546
+
.transpose()
548
547
}
549
548
550
549
pub async fn postgate_upsert<C: GenericClient>(
···
651
650
)
652
651
.await?;
653
652
654
-
Ok(res.map(|v| v.get(0)))
653
+
res.map(|v| v.try_get(0)).transpose()
655
654
}
656
655
657
656
pub async fn starter_pack_upsert<C: GenericClient>(
···
686
685
],
687
686
)
688
687
.await
689
-
.map(|r| r.get::<_, i32>(0) == 0)
688
+
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
690
689
}
691
690
692
691
pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
731
730
conn: &mut C,
732
731
post: &str,
733
732
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
734
-
let res = conn
733
+
conn
735
734
.query_opt(
736
735
"SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
737
736
&[&post],
738
737
)
739
738
.await?
740
-
.map(|v| (v.get(0), v.get(1), v.get(2)));
741
-
742
-
Ok(res)
739
+
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose()
743
740
}
744
741
745
742
pub async fn threadgate_upsert<C: GenericClient>(
-4
consumer/src/indexer/mod.rs
-4
consumer/src/indexer/mod.rs
···
640
640
}
641
641
RecordTypes::AppBskyFeedPostgate(record) => {
642
642
if !at_uri_is_by(&record.post, repo) {
643
-
tracing::warn!("tried to create a postgate on a post we don't control!");
644
643
return Ok(());
645
644
}
646
645
···
670
669
}
671
670
RecordTypes::AppBskyFeedThreadgate(record) => {
672
671
if !at_uri_is_by(&record.post, repo) {
673
-
tracing::warn!("tried to create a threadgate on a post we don't control!");
674
672
return Ok(());
675
673
}
676
674
···
710
708
}
711
709
RecordTypes::AppBskyGraphListItem(record) => {
712
710
if !at_uri_is_by(&record.list, repo) {
713
-
// it's also probably a bad idea to log *all* the attempts to do this...
714
-
tracing::warn!("tried to create a listitem on a list we don't control!");
715
711
return Ok(());
716
712
}
717
713
+25
consumer/src/instrumentation.rs
+25
consumer/src/instrumentation.rs
···
1
+
use tracing::Subscriber;
2
+
use tracing_subscriber::filter::Filtered;
3
+
use tracing_subscriber::layer::SubscriberExt;
4
+
use tracing_subscriber::registry::LookupSpan;
5
+
use tracing_subscriber::util::SubscriberInitExt;
6
+
use tracing_subscriber::{EnvFilter, Layer};
7
+
8
+
pub fn init_instruments(cfg: &crate::config::ConfigInstruments) {
9
+
let log_layer = init_log(cfg.log_json);
10
+
11
+
tracing_subscriber::registry().with(log_layer).init();
12
+
}
13
+
14
+
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
15
+
where
16
+
S: Subscriber + for<'span> LookupSpan<'span>,
17
+
{
18
+
let stdout_filter = EnvFilter::from_default_env();
19
+
20
+
match json {
21
+
true => tracing_subscriber::fmt::layer().json().boxed(),
22
+
false => tracing_subscriber::fmt::layer().boxed(),
23
+
}
24
+
.with_filter(stdout_filter)
25
+
}
+2
-1
consumer/src/main.rs
+2
-1
consumer/src/main.rs
···
12
12
mod db;
13
13
mod firehose;
14
14
mod indexer;
15
+
mod instrumentation;
15
16
mod label_indexer;
16
17
mod utils;
17
18
18
19
#[tokio::main]
19
20
async fn main() -> eyre::Result<()> {
20
-
tracing_subscriber::fmt::init();
21
21
PrometheusBuilder::new().install()?;
22
22
23
23
let cli = cmd::parse();
24
24
let conf = config::load_config()?;
25
25
26
+
instrumentation::init_instruments(&conf.instruments);
26
27
let user_agent = build_ua(&conf.ua_contact);
27
28
28
29
let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+1
lexica/src/app_bsky/mod.rs
+1
lexica/src/app_bsky/mod.rs
+33
lexica/src/app_bsky/unspecced.rs
+33
lexica/src/app_bsky/unspecced.rs
···
1
+
use crate::app_bsky::feed::{BlockedAuthor, PostView};
2
+
use serde::Serialize;
3
+
4
+
#[derive(Clone, Debug, Serialize)]
5
+
pub struct ThreadV2Item {
6
+
pub uri: String,
7
+
pub depth: i32,
8
+
pub value: ThreadV2ItemType,
9
+
}
10
+
11
+
#[derive(Clone, Debug, Serialize)]
12
+
#[serde(tag = "$type")]
13
+
pub enum ThreadV2ItemType {
14
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
15
+
Post(ThreadItemPost),
16
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
17
+
NoUnauthenticated {},
18
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
19
+
NotFound {},
20
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
21
+
Blocked { author: BlockedAuthor },
22
+
}
23
+
24
+
#[derive(Clone, Debug, Serialize)]
25
+
#[serde(rename_all = "camelCase")]
26
+
pub struct ThreadItemPost {
27
+
pub post: PostView,
28
+
pub more_parents: bool,
29
+
pub more_replies: i32,
30
+
pub op_thread: bool,
31
+
pub hidden_by_threadgate: bool,
32
+
pub muted_by_viewer: bool,
33
+
}
+2
-2
migrations/2025-09-27-171241_post-tweaks/up.sql
+2
-2
migrations/2025-09-27-171241_post-tweaks/up.sql
···
34
34
language plpgsql as
35
35
$$
36
36
begin
37
-
delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post';
37
+
delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post';
38
38
return OLD;
39
39
end;
40
40
$$;
···
67
67
language plpgsql as
68
68
$$
69
69
begin
70
-
delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost';
70
+
delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost';
71
71
return OLD;
72
72
end;
73
73
$$;
+8
-2
parakeet/Cargo.toml
+8
-2
parakeet/Cargo.toml
···
6
6
[dependencies]
7
7
async-recursion = "1.1.1"
8
8
axum = { version = "0.8", features = ["json"] }
9
+
axum-tracing-opentelemetry = "0.32"
9
10
axum-extra = { version = "0.10.0", features = ["query", "typed-header"] }
10
11
base64 = "0.22"
11
12
chrono = { version = "0.4.39", features = ["serde"] }
···
21
22
jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" }
22
23
lexica = { path = "../lexica" }
23
24
multibase = "0.9.1"
25
+
opentelemetry = "0.31.0"
26
+
opentelemetry-otlp = "0.31.0"
27
+
opentelemetry_sdk = "0.31.0"
24
28
parakeet-db = { path = "../parakeet-db" }
25
-
parakeet-index = { path = "../parakeet-index" }
29
+
parakeet-index = { path = "../parakeet-index", features = ["otel"] }
26
30
redis = { version = "0.32", features = ["tokio-native-tls-comp"] }
27
31
reqwest = { version = "0.12", features = ["json"] }
28
32
serde = { version = "1.0.217", features = ["derive"] }
29
33
serde_ipld_dagcbor = "0.6.1"
30
34
serde_json = "1.0.134"
31
35
tokio = { version = "1.42.0", features = ["full"] }
36
+
tower = "0.5"
32
37
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
33
38
tracing = "0.1.40"
34
-
tracing-subscriber = "0.3.18"
39
+
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
40
+
tracing-opentelemetry = "0.32"
+2
-2
parakeet/src/cache.rs
+2
-2
parakeet/src/cache.rs
···
29
29
type Val = V;
30
30
31
31
async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> {
32
-
let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?;
32
+
let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?;
33
33
34
34
match serde_ipld_dagcbor::from_slice(&res?) {
35
35
Ok(v) => Some(v),
···
57
57
}
58
58
59
59
async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> {
60
-
let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key)
60
+
let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key)
61
61
.await
62
62
.ok()?;
63
63
+10
parakeet/src/config.rs
+10
parakeet/src/config.rs
···
13
13
14
14
#[derive(Debug, Deserialize)]
15
15
pub struct Config {
16
+
#[serde(flatten)]
17
+
pub instruments: ConfigInstruments,
16
18
pub index_uri: String,
17
19
pub database_url: String,
18
20
pub redis_uri: String,
···
27
29
pub did_allowlist: Option<Vec<String>>,
28
30
#[serde(default)]
29
31
pub migrate: bool,
32
+
}
33
+
34
+
/// Instrumentation (tracing/logging) settings.
///
/// Embedded in `Config` through `#[serde(flatten)]`, so these keys are read
/// from the same level as the other top-level configuration values. Both
/// flags fall back to `false` when absent (`#[serde(default)]`).
#[derive(Debug, Deserialize)]
35
+
pub struct ConfigInstruments {
36
+
/// When `true`, `init_instruments` installs the OpenTelemetry tracing layer.
#[serde(default)]
37
+
pub otel_enable: bool,
38
+
/// When `true`, the stdout log layer emits JSON instead of the default text format.
#[serde(default)]
39
+
pub log_json: bool,
30
40
}
31
41
32
42
#[derive(Debug, Deserialize)]
+115
-1
parakeet/src/db.rs
+115
-1
parakeet/src/db.rs
···
1
1
use diesel::prelude::*;
2
-
use diesel::sql_types::{Array, Bool, Nullable, Text};
2
+
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
3
3
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
+
use parakeet_db::models::TextArray;
4
5
use parakeet_db::{schema, types};
6
+
use tracing::instrument;
5
7
8
+
#[instrument(skip_all)]
6
9
pub async fn get_actor_status(
7
10
conn: &mut AsyncPgConnection,
8
11
did: &str,
···
37
40
#[diesel(sql_type = Nullable<Text>)]
38
41
pub list_mute: Option<String>,
39
42
}
43
+
44
+
#[instrument(skip_all)]
40
45
pub async fn get_profile_state(
41
46
conn: &mut AsyncPgConnection,
42
47
did: &str,
···
49
54
.await
50
55
.optional()
51
56
}
57
+
58
+
#[instrument(skip_all)]
52
59
pub async fn get_profile_states(
53
60
conn: &mut AsyncPgConnection,
54
61
did: &str,
···
83
90
#[diesel(sql_type = diesel::sql_types::Bool)]
84
91
pub pinned: bool,
85
92
}
93
+
94
+
#[instrument(skip_all)]
86
95
pub async fn get_post_state(
87
96
conn: &mut AsyncPgConnection,
88
97
did: &str,
···
96
105
.optional()
97
106
}
98
107
108
+
#[instrument(skip_all)]
99
109
pub async fn get_post_states(
100
110
conn: &mut AsyncPgConnection,
101
111
did: &str,
···
119
129
pub block: Option<String>,
120
130
}
121
131
132
+
#[instrument(skip_all)]
122
133
pub async fn get_list_state(
123
134
conn: &mut AsyncPgConnection,
124
135
did: &str,
···
132
143
.optional()
133
144
}
134
145
146
+
#[instrument(skip_all)]
135
147
pub async fn get_list_states(
136
148
conn: &mut AsyncPgConnection,
137
149
did: &str,
···
144
156
.await
145
157
}
146
158
159
+
#[instrument(skip_all)]
147
160
pub async fn get_like_state(
148
161
conn: &mut AsyncPgConnection,
149
162
did: &str,
···
161
174
.optional()
162
175
}
163
176
177
+
#[instrument(skip_all)]
164
178
pub async fn get_like_states(
165
179
conn: &mut AsyncPgConnection,
166
180
did: &str,
···
181
195
.await
182
196
}
183
197
198
+
#[instrument(skip_all)]
184
199
pub async fn get_pinned_post_uri(
185
200
conn: &mut AsyncPgConnection,
186
201
did: &str,
···
196
211
.await
197
212
.optional()
198
213
}
214
+
215
+
/// One row produced by the raw-SQL thread queries (`get_thread_children`,
/// `get_thread_children_branching` and `get_thread_parents`).
///
/// Columns are matched by name (`QueryableByName`), so the SQL files must
/// return columns called exactly `at_uri`, `parent_uri`, `root_uri` and `depth`.
#[derive(Debug, QueryableByName)]
216
+
#[diesel(check_for_backend(diesel::pg::Pg))]
217
+
// NOTE(review): presumably silences dead-code warnings for fields callers do not read yet — confirm.
#[allow(unused)]
218
+
pub struct ThreadItem {
219
+
/// AT-URI of the post this row describes.
#[diesel(sql_type = Text)]
220
+
pub at_uri: String,
221
+
/// Nullable parent URI column.
#[diesel(sql_type = Nullable<Text>)]
222
+
pub parent_uri: Option<String>,
223
+
/// Nullable root URI column.
#[diesel(sql_type = Nullable<Text>)]
224
+
pub root_uri: Option<String>,
225
+
/// Level reported by the query; presumably the distance from the queried
/// post — TODO confirm against the SQL files.
#[diesel(sql_type = Integer)]
226
+
pub depth: i32,
227
+
}
228
+
229
+
/// Runs the raw query in `sql/thread.sql` and returns its rows as [`ThreadItem`]s.
///
/// Bind parameters, in order: `uri` (text) and `depth` (integer).
/// NOTE(review): `depth` presumably caps how many reply levels are walked —
/// confirm against `sql/thread.sql`.
///
/// # Errors
/// Returns the diesel error if the query fails or a row cannot be mapped.
#[instrument(skip_all)]
230
+
pub async fn get_thread_children(
231
+
conn: &mut AsyncPgConnection,
232
+
uri: &str,
233
+
depth: i32,
234
+
) -> QueryResult<Vec<ThreadItem>> {
235
+
// The SQL text is embedded at compile time, relative to this source file.
diesel::sql_query(include_str!("sql/thread.sql"))
236
+
.bind::<Text, _>(uri)
237
+
.bind::<Integer, _>(depth)
238
+
.load(conn)
239
+
.await
240
+
}
241
+
242
+
/// Runs the raw query in `sql/thread_branching.sql` and returns its rows as
/// [`ThreadItem`]s.
///
/// Bind parameters, in order: `uri` (text), `depth` (integer) and
/// `branching_factor` (integer).
/// NOTE(review): `branching_factor` presumably limits the number of replies
/// kept per parent — confirm against `sql/thread_branching.sql`.
///
/// # Errors
/// Returns the diesel error if the query fails or a row cannot be mapped.
#[instrument(skip_all)]
243
+
pub async fn get_thread_children_branching(
244
+
conn: &mut AsyncPgConnection,
245
+
uri: &str,
246
+
depth: i32,
247
+
branching_factor: i32,
248
+
) -> QueryResult<Vec<ThreadItem>> {
249
+
// The SQL text is embedded at compile time, relative to this source file.
diesel::sql_query(include_str!("sql/thread_branching.sql"))
250
+
.bind::<Text, _>(uri)
251
+
.bind::<Integer, _>(depth)
252
+
.bind::<Integer, _>(branching_factor)
253
+
.load(conn)
254
+
.await
255
+
}
256
+
257
+
/// One row produced by `get_thread_children_hidden`: just the post's AT-URI.
///
/// Mapped by column name, so the query must return a column called `at_uri`.
#[derive(Debug, QueryableByName)]
258
+
#[diesel(check_for_backend(diesel::pg::Pg))]
259
+
pub struct HiddenThreadChildItem {
260
+
/// AT-URI of the returned post.
#[diesel(sql_type = Text)]
261
+
pub at_uri: String,
262
+
}
263
+
264
+
/// Runs the raw query in `sql/thread_v2_hidden_children.sql` and returns the
/// matching post URIs.
///
/// Bind parameters, in order: `uri` (text) and `root` (text).
/// NOTE(review): presumably `uri` is the anchor post and `root` the thread
/// root whose threadgate decides which replies are hidden — confirm against
/// the SQL file.
///
/// # Errors
/// Returns the diesel error if the query fails or a row cannot be mapped.
#[instrument(skip_all)]
265
+
pub async fn get_thread_children_hidden(
266
+
conn: &mut AsyncPgConnection,
267
+
uri: &str,
268
+
root: &str,
269
+
) -> QueryResult<Vec<HiddenThreadChildItem>> {
270
+
// The SQL text is embedded at compile time, relative to this source file.
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql"))
271
+
.bind::<Text, _>(uri)
272
+
.bind::<Text, _>(root)
273
+
.load(conn)
274
+
.await
275
+
}
276
+
277
+
/// Runs the raw query in `sql/thread_parent.sql` and returns its rows as
/// [`ThreadItem`]s.
///
/// Bind parameters, in order: `uri` (text) and `height` (integer).
/// NOTE(review): `height` presumably caps how many ancestor levels are
/// walked upwards — confirm against `sql/thread_parent.sql`.
///
/// # Errors
/// Returns the diesel error if the query fails or a row cannot be mapped.
#[instrument(skip_all)]
278
+
pub async fn get_thread_parents(
279
+
conn: &mut AsyncPgConnection,
280
+
uri: &str,
281
+
height: i32,
282
+
) -> QueryResult<Vec<ThreadItem>> {
283
+
// The SQL text is embedded at compile time, relative to this source file.
diesel::sql_query(include_str!("sql/thread_parent.sql"))
284
+
.bind::<Text, _>(uri)
285
+
.bind::<Integer, _>(height)
286
+
.load(conn)
287
+
.await
288
+
}
289
+
290
+
/// Looks up the `root_uri` column of the post whose primary key is `uri`.
///
/// Returns `Ok(None)` both when no such post exists and when the post exists
/// but its `root_uri` is NULL; callers cannot tell the two cases apart.
///
/// # Errors
/// Returns any diesel error other than "not found".
#[instrument(skip_all)]
291
+
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
292
+
schema::posts::table
293
+
.select(schema::posts::root_uri)
294
+
.find(&uri)
295
+
.get_result(conn)
296
+
.await
297
+
// Turn a "not found" error into `Ok(None)`.
.optional()
298
+
// Collapse "row missing" and "column is NULL" into a single `None`.
.map(|v| v.flatten())
299
+
}
300
+
301
+
/// Fetches the `hidden_replies` column of the threadgate whose primary key
/// is `uri`.
///
/// Returns `Ok(None)` when no threadgate row exists for that key.
///
/// # Errors
/// Returns any diesel error other than "not found".
#[instrument(skip_all)]
302
+
pub async fn get_threadgate_hiddens(
303
+
conn: &mut AsyncPgConnection,
304
+
uri: &str,
305
+
) -> QueryResult<Option<TextArray>> {
306
+
schema::threadgates::table
307
+
.select(schema::threadgates::hidden_replies)
308
+
.find(&uri)
309
+
.get_result(conn)
310
+
.await
311
+
// Turn a "not found" error into `Ok(None)`.
.optional()
312
+
}
+3
parakeet/src/hydration/embed.rs
+3
parakeet/src/hydration/embed.rs
···
8
8
use lexica::app_bsky::feed::PostView;
9
9
use parakeet_db::models;
10
10
use std::collections::HashMap;
11
+
use tracing::instrument;
11
12
12
13
fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> {
13
14
height
···
176
177
out
177
178
}
178
179
180
+
#[instrument(skip_all)]
179
181
pub async fn hydrate_embed(&self, post: String) -> Option<Embed> {
180
182
let (embed, author) = self.loaders.embed.load(post).await?;
181
183
···
195
197
}
196
198
}
197
199
200
+
#[instrument(skip_all)]
198
201
pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> {
199
202
let embeds = self.loaders.embed.load_many(posts).await;
200
203
+5
parakeet/src/hydration/feedgen.rs
+5
parakeet/src/hydration/feedgen.rs
···
5
5
use parakeet_db::models;
6
6
use std::collections::HashMap;
7
7
use std::str::FromStr;
8
+
use tracing::instrument;
8
9
9
10
fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState {
10
11
GeneratorViewerState {
···
49
50
}
50
51
51
52
impl super::StatefulHydrator<'_> {
53
+
#[instrument(skip_all)]
52
54
pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> {
53
55
let labels = self.get_label(&feedgen).await;
54
56
let viewer = self.get_feedgen_viewer_state(&feedgen).await;
···
61
63
))
62
64
}
63
65
66
+
#[instrument(skip_all)]
64
67
pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> {
65
68
let labels = self.get_label_many(&feedgens).await;
66
69
let viewers = self.get_feedgen_viewer_states(&feedgens).await;
···
90
93
.collect()
91
94
}
92
95
96
+
#[instrument(skip_all)]
93
97
async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> {
94
98
if let Some(viewer) = &self.current_actor {
95
99
let data = self.loaders.like_state.get(viewer, subject).await?;
···
100
104
}
101
105
}
102
106
107
+
#[instrument(skip_all)]
103
108
async fn get_feedgen_viewer_states(
104
109
&self,
105
110
subjects: &[String],
+12
-9
parakeet/src/hydration/labeler.rs
+12
-9
parakeet/src/hydration/labeler.rs
···
8
8
use parakeet_db::models;
9
9
use std::collections::HashMap;
10
10
use std::str::FromStr;
11
+
use tracing::instrument;
11
12
12
13
fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState {
13
14
LabelerViewerState {
···
42
43
likes: Option<i32>,
43
44
) -> LabelerViewDetailed {
44
45
let reason_types = labeler.reasons.map(|v| {
45
-
v.into_iter()
46
-
.flatten()
47
-
.filter_map(|v| ReasonType::from_str(&v).ok())
46
+
v.iter()
47
+
.filter_map(|v| ReasonType::from_str(v).ok())
48
48
.collect()
49
49
});
50
50
···
74
74
})
75
75
.collect();
76
76
let subject_types = labeler.subject_types.map(|v| {
77
-
v.into_iter()
78
-
.flatten()
79
-
.filter_map(|v| SubjectType::from_str(&v).ok())
77
+
v.iter()
78
+
.filter_map(|v| SubjectType::from_str(v).ok())
80
79
.collect()
81
80
});
82
-
let subject_collections = labeler
83
-
.subject_collections
84
-
.map(|v| v.into_iter().flatten().collect());
81
+
let subject_collections = labeler.subject_collections.map(Vec::from);
85
82
86
83
LabelerViewDetailed {
87
84
uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did),
···
102
99
}
103
100
104
101
impl StatefulHydrator<'_> {
102
+
#[instrument(skip_all)]
105
103
pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> {
106
104
let labels = self.get_label(&labeler).await;
107
105
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
112
110
Some(build_view(labeler, creator, labels, viewer, likes))
113
111
}
114
112
113
+
#[instrument(skip_all)]
115
114
pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> {
116
115
let labels = self.get_label_many(&labelers).await;
117
116
let labelers = self.loaders.labeler.load_many(labelers).await;
···
137
136
.collect()
138
137
}
139
138
139
+
#[instrument(skip_all)]
140
140
pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> {
141
141
let labels = self.get_label(&labeler).await;
142
142
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
149
149
))
150
150
}
151
151
152
+
#[instrument(skip_all)]
152
153
pub async fn hydrate_labelers_detailed(
153
154
&self,
154
155
labelers: Vec<String>,
···
179
180
.collect()
180
181
}
181
182
183
+
#[instrument(skip_all)]
182
184
async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> {
183
185
if let Some(viewer) = &self.current_actor {
184
186
let data = self
···
193
195
}
194
196
}
195
197
198
+
#[instrument(skip_all)]
196
199
async fn get_labeler_viewer_states(
197
200
&self,
198
201
subjects: &[String],
+7
parakeet/src/hydration/list.rs
+7
parakeet/src/hydration/list.rs
···
6
6
use parakeet_db::models;
7
7
use std::collections::HashMap;
8
8
use std::str::FromStr;
9
+
use tracing::instrument;
9
10
10
11
fn build_viewer(data: ListStateRet) -> ListViewerState {
11
12
ListViewerState {
···
69
70
}
70
71
71
72
impl StatefulHydrator<'_> {
73
+
#[instrument(skip_all)]
72
74
pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> {
73
75
let labels = self.get_label(&list).await;
74
76
let viewer = self.get_list_viewer_state(&list).await;
···
77
79
build_basic(list, count, labels, viewer, &self.cdn)
78
80
}
79
81
82
+
#[instrument(skip_all)]
80
83
pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> {
81
84
if lists.is_empty() {
82
85
return HashMap::new();
···
97
100
.collect()
98
101
}
99
102
103
+
#[instrument(skip_all)]
100
104
pub async fn hydrate_list(&self, list: String) -> Option<ListView> {
101
105
let labels = self.get_label(&list).await;
102
106
let viewer = self.get_list_viewer_state(&list).await;
···
106
110
build_listview(list, count, profile, labels, viewer, &self.cdn)
107
111
}
108
112
113
+
#[instrument(skip_all)]
109
114
pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> {
110
115
if lists.is_empty() {
111
116
return HashMap::new();
···
131
136
.collect()
132
137
}
133
138
139
+
#[instrument(skip_all)]
134
140
async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> {
135
141
if let Some(viewer) = &self.current_actor {
136
142
let data = self.loaders.list_state.get(viewer, subject).await?;
···
141
147
}
142
148
}
143
149
150
+
#[instrument(skip_all)]
144
151
async fn get_list_viewer_states(
145
152
&self,
146
153
subjects: &[String],
+4
parakeet/src/hydration/mod.rs
+4
parakeet/src/hydration/mod.rs
···
63
63
}
64
64
}
65
65
66
+
#[tracing::instrument(skip_all)]
66
67
async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> {
67
68
self.loaders.label.load(uri, self.accept_labelers).await
68
69
}
69
70
71
+
#[tracing::instrument(skip_all)]
70
72
async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> {
71
73
let uris = &[
72
74
did.to_string(),
···
80
82
.collect()
81
83
}
82
84
85
+
#[tracing::instrument(skip_all)]
83
86
async fn get_label_many(
84
87
&self,
85
88
uris: &[String],
···
90
93
.await
91
94
}
92
95
96
+
#[tracing::instrument(skip_all)]
93
97
async fn get_profile_label_many(
94
98
&self,
95
99
uris: &[String],
+12
-3
parakeet/src/hydration/posts.rs
+12
-3
parakeet/src/hydration/posts.rs
···
11
11
use parakeet_db::models;
12
12
use parakeet_index::PostStats;
13
13
use std::collections::HashMap;
14
+
use tracing::instrument;
14
15
15
16
fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState {
16
17
let is_me = did == data.did;
···
82
83
}
83
84
84
85
impl StatefulHydrator<'_> {
86
+
#[instrument(skip_all)]
85
87
async fn hydrate_threadgate(
86
88
&self,
87
89
threadgate: Option<models::Threadgate>,
···
89
91
let threadgate = threadgate?;
90
92
91
93
let lists = match threadgate.allowed_lists.as_ref() {
92
-
Some(allowed_lists) => allowed_lists.iter().flatten().cloned().collect(),
94
+
Some(allowed_lists) => allowed_lists.clone().into(),
93
95
None => Vec::new(),
94
96
};
95
97
let lists = self.hydrate_lists_basic(lists).await;
···
100
102
))
101
103
}
102
104
105
+
#[instrument(skip_all)]
103
106
async fn hydrate_threadgates(
104
107
&self,
105
108
threadgates: Vec<models::Threadgate>,
106
109
) -> HashMap<String, ThreadgateView> {
107
110
let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| {
108
111
if let Some(lists) = &c.allowed_lists {
109
-
acc.extend(lists.iter().flatten().cloned());
112
+
acc.extend(lists.clone().0);
110
113
}
111
114
acc
112
115
});
···
118
121
let this_lists = match &threadgate.allowed_lists {
119
122
Some(allowed_lists) => allowed_lists
120
123
.iter()
121
-
.filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned()))
124
+
.filter_map(|v| lists.get(v).cloned())
122
125
.collect(),
123
126
None => Vec::new(),
124
127
};
···
131
134
.collect()
132
135
}
133
136
137
+
#[instrument(skip_all)]
134
138
pub async fn hydrate_post(&self, post: String) -> Option<PostView> {
135
139
let stats = self.loaders.post_stats.load(post.clone()).await;
136
140
let (post, threadgate) = self.loaders.posts.load(post).await?;
···
145
149
)))
146
150
}
147
151
152
+
#[instrument(skip_all)]
148
153
async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> {
149
154
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
150
155
let posts = self.loaders.posts.load_many(posts).await;
···
184
189
.collect()
185
190
}
186
191
192
+
#[instrument(skip_all)]
187
193
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
188
194
self.hydrate_posts_inner(posts)
189
195
.await
···
192
198
.collect()
193
199
}
194
200
201
+
#[instrument(skip_all)]
195
202
pub async fn hydrate_feed_posts(
196
203
&self,
197
204
posts: Vec<RawFeedItem>,
···
295
302
.collect()
296
303
}
297
304
305
+
#[instrument(skip_all)]
298
306
async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> {
299
307
if let Some(viewer) = &self.current_actor {
300
308
let data = self.loaders.post_state.get(viewer, subject).await?;
···
305
313
}
306
314
}
307
315
316
+
#[instrument(skip_all)]
308
317
async fn get_post_viewer_states(
309
318
&self,
310
319
subjects: &[String],
+12
-1
parakeet/src/hydration/profile.rs
+12
-1
parakeet/src/hydration/profile.rs
···
12
12
use std::collections::HashMap;
13
13
use std::str::FromStr;
14
14
use std::sync::OnceLock;
15
+
use tracing::instrument;
15
16
16
17
pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new();
17
18
···
51
52
.followed
52
53
.map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject));
53
54
54
-
let blocking = data.list_block.or(data.blocking);
55
+
let blocking = data.list_block.or(data
56
+
.blocking
57
+
.map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did)));
55
58
56
59
ProfileViewerState {
57
60
muted: data.muting.unwrap_or_default(),
···
272
275
}
273
276
274
277
impl super::StatefulHydrator<'_> {
278
+
#[instrument(skip_all)]
275
279
pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> {
276
280
let labels = self.get_profile_label(&did).await;
277
281
let viewer = self.get_profile_viewer_state(&did).await;
···
289
293
))
290
294
}
291
295
296
+
#[instrument(skip_all)]
292
297
pub async fn hydrate_profiles_basic(
293
298
&self,
294
299
dids: Vec<String>,
···
313
318
.collect()
314
319
}
315
320
321
+
#[instrument(skip_all)]
316
322
pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> {
317
323
let labels = self.get_profile_label(&did).await;
318
324
let viewer = self.get_profile_viewer_state(&did).await;
···
330
336
))
331
337
}
332
338
339
+
#[instrument(skip_all)]
333
340
pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> {
334
341
let labels = self.get_profile_label_many(&dids).await;
335
342
let viewers = self.get_profile_viewer_states(&dids).await;
···
351
358
.collect()
352
359
}
353
360
361
+
#[instrument(skip_all)]
354
362
pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> {
355
363
let labels = self.get_profile_label(&did).await;
356
364
let viewer = self.get_profile_viewer_state(&did).await;
···
368
376
))
369
377
}
370
378
379
+
#[instrument(skip_all)]
371
380
pub async fn hydrate_profiles_detailed(
372
381
&self,
373
382
dids: Vec<String>,
···
392
401
.collect()
393
402
}
394
403
404
+
#[instrument(skip_all)]
395
405
async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> {
396
406
if let Some(viewer) = &self.current_actor {
397
407
let data = self.loaders.profile_state.get(viewer, subject).await?;
···
411
421
}
412
422
}
413
423
424
+
#[instrument(skip_all)]
414
425
async fn get_profile_viewer_states(
415
426
&self,
416
427
dids: &[String],
+11
-9
parakeet/src/hydration/starter_packs.rs
+11
-9
parakeet/src/hydration/starter_packs.rs
···
4
4
use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic};
5
5
use parakeet_db::models;
6
6
use std::collections::HashMap;
7
+
use tracing::instrument;
7
8
8
9
fn build_basic(
9
10
starter_pack: models::StaterPack,
···
50
51
}
51
52
52
53
impl StatefulHydrator<'_> {
54
+
#[instrument(skip_all)]
53
55
pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> {
54
56
let labels = self.get_label(&pack).await;
55
57
let sp = self.loaders.starterpacks.load(pack).await?;
···
59
61
Some(build_basic(sp, creator, labels, list_item_count))
60
62
}
61
63
64
+
#[instrument(skip_all)]
62
65
pub async fn hydrate_starterpacks_basic(
63
66
&self,
64
67
packs: Vec<String>,
···
86
89
.collect()
87
90
}
88
91
92
+
#[instrument(skip_all)]
89
93
pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> {
90
94
let labels = self.get_label(&pack).await;
91
95
let sp = self.loaders.starterpacks.load(pack).await?;
···
93
97
let creator = self.hydrate_profile_basic(sp.owner.clone()).await?;
94
98
let list = self.hydrate_list_basic(sp.list.clone()).await;
95
99
96
-
let feeds = sp
97
-
.feeds
98
-
.clone()
99
-
.unwrap_or_default()
100
-
.into_iter()
101
-
.flatten()
100
+
let feeds = sp.feeds.clone().unwrap_or_default();
101
+
let feeds = self
102
+
.hydrate_feedgens(feeds.into())
103
+
.await
104
+
.into_values()
102
105
.collect();
103
-
let feeds = self.hydrate_feedgens(feeds).await.into_values().collect();
104
106
105
107
Some(build_spview(sp, creator, labels, list, feeds))
106
108
}
107
109
110
+
#[instrument(skip_all)]
108
111
pub async fn hydrate_starterpacks(
109
112
&self,
110
113
packs: Vec<String>,
···
119
122
let feeds = packs
120
123
.values()
121
124
.filter_map(|pack| pack.feeds.clone())
122
-
.flat_map(|feeds| feeds.into_iter().flatten())
125
+
.flat_map(Vec::from)
123
126
.collect();
124
127
125
128
let creators = self.hydrate_profiles_basic(creators).await;
···
133
136
let list = lists.get(&pack.list).cloned();
134
137
let feeds = pack.feeds.as_ref().map(|v| {
135
138
v.iter()
136
-
.flatten()
137
139
.filter_map(|feed| feeds.get(feed).cloned())
138
140
.collect()
139
141
});
+57
parakeet/src/instrumentation.rs
+57
parakeet/src/instrumentation.rs
···
1
+
use opentelemetry::trace::TracerProvider;
2
+
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
3
+
use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider};
4
+
use tracing::Subscriber;
5
+
use tracing_opentelemetry::OpenTelemetryLayer;
6
+
use tracing_subscriber::filter::Filtered;
7
+
use tracing_subscriber::layer::SubscriberExt;
8
+
use tracing_subscriber::registry::LookupSpan;
9
+
use tracing_subscriber::util::SubscriberInitExt;
10
+
use tracing_subscriber::{EnvFilter, Layer};
11
+
12
+
/// Installs the process-wide `tracing` subscriber.
///
/// The stdout log layer is always installed; the OpenTelemetry layer is only
/// built (and its exporter only created) when `cfg.otel_enable` is set.
///
/// NOTE(review): `.init()` is expected to panic if a global subscriber has
/// already been set, so this should be called once at startup — confirm
/// against the tracing-subscriber docs.
pub fn init_instruments(cfg: &crate::config::ConfigInstruments) {
13
+
// `bool::then` only runs `init_otel` when the flag is true, yielding `None` otherwise.
let otel_layer = cfg.otel_enable.then(init_otel);
14
+
let log_layer = init_log(cfg.log_json);
15
+
16
+
tracing_subscriber::registry()
17
+
.with(log_layer)
18
+
.with(otel_layer)
19
+
.init();
20
+
}
21
+
22
+
/// Builds the OpenTelemetry tracing layer and registers its tracer provider
/// as the global one.
///
/// Spans are exported through an OTLP span exporter configured for HTTP with
/// `Protocol::HttpBinary`, using a batch exporter and an always-on sampler.
/// No endpoint is set here.
/// NOTE(review): the endpoint presumably comes from the exporter's defaults
/// or the standard `OTEL_EXPORTER_OTLP_*` environment variables — confirm.
///
/// # Panics
/// Panics if the span exporter cannot be built (`unwrap`).
fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S>
23
+
where
24
+
S: Subscriber + for<'span> LookupSpan<'span>,
25
+
{
26
+
let span_exporter = SpanExporter::builder()
27
+
.with_http()
28
+
.with_protocol(Protocol::HttpBinary)
29
+
.build()
30
+
.unwrap();
31
+
32
+
let tracer_provider = SdkTracerProvider::builder()
33
+
.with_batch_exporter(span_exporter)
34
+
.with_sampler(Sampler::AlwaysOn)
35
+
.build();
36
+
37
+
// Register a clone globally; the original is still needed below to create the tracer.
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
38
+
39
+
let tracer = tracer_provider.tracer("parakeet");
// Filter for exported spans: `info` by default, everything from the
// `otel::tracing` target, nothing from `tower_http`.
40
+
let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off");
41
+
42
+
OpenTelemetryLayer::new(tracer).with_filter(otel_filter)
43
+
}
44
+
45
+
/// Builds the stdout logging layer.
///
/// `json` selects the JSON formatter; otherwise the default `fmt` layer is
/// used. The filter is read from the default environment variable and then
/// extended with `otel::tracing=off`, so that target is never written to the log.
///
/// # Panics
/// The `unwrap` only guards parsing of the constant `otel::tracing=off` directive.
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
46
+
where
47
+
S: Subscriber + for<'span> LookupSpan<'span>,
48
+
{
49
+
let stdout_filter =
50
+
EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap());
51
+
52
+
// The two formatters are different types; boxing gives both arms one type.
match json {
53
+
true => tracing_subscriber::fmt::layer().json().boxed(),
54
+
false => tracing_subscriber::fmt::layer().boxed(),
55
+
}
56
+
.with_filter(stdout_filter)
57
+
}
+31
-5
parakeet/src/loaders.rs
+31
-5
parakeet/src/loaders.rs
···
4
4
use dataloader::async_cached::Loader;
5
5
use dataloader::non_cached::Loader as NonCachedLoader;
6
6
use dataloader::BatchFn;
7
+
use diesel::dsl::sql;
7
8
use diesel::prelude::*;
8
9
use diesel_async::pooled_connection::deadpool::Pool;
9
10
use diesel_async::{AsyncPgConnection, RunQueryDsl};
···
14
15
use serde::{Deserialize, Serialize};
15
16
use std::collections::HashMap;
16
17
use std::str::FromStr;
18
+
use tracing::instrument;
17
19
18
20
type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>;
19
21
···
62
64
) -> Dataloaders {
63
65
Dataloaders {
64
66
embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600),
65
-
feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600),
67
+
feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600),
66
68
handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60),
67
69
label: LabelLoader(pool.clone()), // CARE: never cache this.
68
-
labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600),
70
+
labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600),
69
71
like: NonCachedLoader::new(LikeLoader(idxc.clone())),
70
72
like_state: LikeRecordLoader(pool.clone()),
71
73
list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600),
···
84
86
85
87
pub struct LikeLoader(parakeet_index::Client);
86
88
impl BatchFn<String, i32> for LikeLoader {
89
+
#[instrument(name = "LikeLoader", skip_all)]
87
90
async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> {
88
91
let res = self
89
92
.0
···
106
109
107
110
pub struct LikeRecordLoader(Pool<AsyncPgConnection>);
108
111
impl LikeRecordLoader {
112
+
#[instrument(name = "LikeRecordLoader::get", skip_all)]
109
113
pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> {
110
114
let mut conn = self.0.get().await.unwrap();
111
115
···
117
121
})
118
122
}
119
123
124
+
#[instrument(name = "LikeRecordLoader::get_many", skip_all)]
120
125
pub async fn get_many(
121
126
&self,
122
127
did: &str,
···
138
143
139
144
pub struct HandleLoader(Pool<AsyncPgConnection>);
140
145
impl BatchFn<String, String> for HandleLoader {
146
+
#[instrument(name = "HandleLoader", skip_all)]
141
147
async fn load(&mut self, keys: &[String]) -> HashMap<String, String> {
142
148
let mut conn = self.0.get().await.unwrap();
143
149
···
170
176
Option<ProfileAllowSubscriptions>,
171
177
);
172
178
impl BatchFn<String, ProfileLoaderRet> for ProfileLoader {
179
+
#[instrument(name = "ProfileLoader", skip_all)]
173
180
async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> {
174
181
let mut conn = self.0.get().await.unwrap();
175
182
···
231
238
232
239
pub struct ProfileStatsLoader(parakeet_index::Client);
233
240
impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader {
241
+
#[instrument(name = "ProfileStatsLoader", skip_all)]
234
242
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> {
235
243
let stats_req = parakeet_index::GetStatsManyReq {
236
244
uris: keys.to_vec(),
···
247
255
248
256
pub struct ProfileStateLoader(Pool<AsyncPgConnection>);
249
257
impl ProfileStateLoader {
258
+
#[instrument(name = "ProfileStateLoader::get", skip_all)]
250
259
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> {
251
260
let mut conn = self.0.get().await.unwrap();
252
261
···
258
267
})
259
268
}
260
269
270
+
#[instrument(name = "ProfileStateLoader::get_many", skip_all)]
261
271
pub async fn get_many(
262
272
&self,
263
273
did: &str,
···
278
288
pub struct ListLoader(Pool<AsyncPgConnection>);
279
289
type ListLoaderRet = (models::List, i64);
280
290
impl BatchFn<String, ListLoaderRet> for ListLoader {
291
+
// Span is named after the loader struct, like the other `BatchFn` loaders,
// rather than after the `ListLoaderRet` return-type alias.
#[instrument(name = "ListLoader", skip_all)]
281
292
async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> {
282
293
let mut conn = self.0.get().await.unwrap();
283
294
···
309
320
310
321
pub struct ListStateLoader(Pool<AsyncPgConnection>);
311
322
impl ListStateLoader {
323
+
#[instrument(name = "ListStateLoader::get", skip_all)]
312
324
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> {
313
325
let mut conn = self.0.get().await.unwrap();
314
326
···
320
332
})
321
333
}
322
334
335
+
#[instrument(name = "ListStateLoader::get_many", skip_all)]
323
336
pub async fn get_many(
324
337
&self,
325
338
did: &str,
···
337
350
}
338
351
}
339
352
340
-
pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
353
+
pub struct FeedGenLoader(Pool<AsyncPgConnection>);
341
354
impl BatchFn<String, models::FeedGen> for FeedGenLoader {
355
+
#[instrument(name = "FeedGenLoader", skip_all)]
342
356
async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> {
343
357
let mut conn = self.0.get().await.unwrap();
344
358
···
364
378
pub struct PostLoader(Pool<AsyncPgConnection>);
365
379
type PostLoaderRet = (models::Post, Option<models::Threadgate>);
366
380
impl BatchFn<String, PostLoaderRet> for PostLoader {
381
+
#[instrument(name = "PostLoader", skip_all)]
367
382
async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> {
368
383
let mut conn = self.0.get().await.unwrap();
369
384
370
385
let res = schema::posts::table
371
-
.left_join(schema::threadgates::table)
386
+
.left_join(schema::threadgates::table.on(
387
+
schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")),
388
+
))
372
389
.select((
373
390
models::Post::as_select(),
374
391
Option::<models::Threadgate>::as_select(),
···
392
409
393
410
pub struct PostStatsLoader(parakeet_index::Client);
394
411
impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader {
412
+
#[instrument(name = "PostStatsLoader", skip_all)]
395
413
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> {
396
414
let stats_req = parakeet_index::GetStatsManyReq {
397
415
uris: keys.to_vec(),
···
408
426
409
427
pub struct PostStateLoader(Pool<AsyncPgConnection>);
410
428
impl PostStateLoader {
429
+
#[instrument(name = "PostStateLoader::get", skip_all)]
411
430
pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> {
412
431
let mut conn = self.0.get().await.unwrap();
413
432
···
419
438
})
420
439
}
421
440
441
+
#[instrument(name = "PostStateLoader::get_many", skip_all)]
422
442
pub async fn get_many(
423
443
&self,
424
444
did: &str,
···
446
466
RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>),
447
467
}
448
468
impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader {
469
+
#[instrument(name = "EmbedLoader", skip_all)]
449
470
async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> {
450
471
let mut conn = self.0.get().await.unwrap();
451
472
···
528
549
pub struct StarterPackLoader(Pool<AsyncPgConnection>);
529
550
type StarterPackLoaderRet = models::StaterPack;
530
551
impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader {
552
+
#[instrument(name = "StarterPackLoader", skip_all)]
531
553
async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> {
532
554
let mut conn = self.0.get().await.unwrap();
533
555
···
550
572
}
551
573
}
552
574
553
-
pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
575
+
pub struct LabelServiceLoader(Pool<AsyncPgConnection>);
554
576
type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>);
555
577
impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader {
578
+
#[instrument(name = "LabelServiceLoader", skip_all)]
556
579
async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> {
557
580
let mut conn = self.0.get().await.unwrap();
558
581
···
591
614
// but it should live here anyway
592
615
pub struct LabelLoader(Pool<AsyncPgConnection>);
593
616
impl LabelLoader {
617
+
#[instrument(name = "LabelLoader::load", skip_all)]
594
618
pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> {
595
619
let mut conn = self.0.get().await.unwrap();
596
620
···
610
634
})
611
635
}
612
636
637
+
#[instrument(name = "LabelLoader::load_many", skip_all)]
613
638
pub async fn load_many(
614
639
&self,
615
640
uris: &[String],
···
644
669
645
670
pub struct VerificationLoader(Pool<AsyncPgConnection>);
646
671
impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader {
672
+
#[instrument(name = "VerificationLoader", skip_all)]
647
673
async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> {
648
674
let mut conn = self.0.get().await.unwrap();
649
675
+14
-5
parakeet/src/main.rs
+14
-5
parakeet/src/main.rs
···
1
+
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
1
2
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
2
3
use diesel_async::pooled_connection::deadpool::Pool;
3
4
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
···
14
15
mod config;
15
16
mod db;
16
17
mod hydration;
18
+
mod instrumentation;
17
19
mod loaders;
18
20
mod xrpc;
19
21
···
31
33
32
34
#[tokio::main]
33
35
async fn main() -> eyre::Result<()> {
34
-
tracing_subscriber::fmt::init();
35
-
36
36
let conf = config::load_config()?;
37
37
38
+
instrumentation::init_instruments(&conf.instruments);
39
+
38
40
let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url);
39
41
let pool = Pool::builder(db_mgr).build()?;
40
42
···
52
54
let redis_client = redis::Client::open(conf.redis_uri)?;
53
55
let redis_mp = redis_client.get_multiplexed_tokio_connection().await?;
54
56
55
-
let index_client = parakeet_index::Client::connect(conf.index_uri).await?;
57
+
let index_client = parakeet_index::connect_with_otel(conf.index_uri)
58
+
.await
59
+
.map_err(|e| eyre::eyre!(e))?;
56
60
57
61
let dataloaders = Arc::new(loaders::Dataloaders::new(
58
62
pool.clone(),
···
79
83
80
84
let did_doc = did_web_doc(&conf.service);
81
85
86
+
let mw = tower::ServiceBuilder::new()
87
+
.option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default))
88
+
.option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default))
89
+
.layer(TraceLayer::new_for_http())
90
+
.layer(cors);
91
+
82
92
let app = axum::Router::new()
83
93
.nest("/xrpc", xrpc::xrpc_routes())
84
94
.route(
85
95
"/.well-known/did.json",
86
96
axum::routing::get(async || axum::Json(did_doc)),
87
97
)
88
-
.layer(TraceLayer::new_for_http())
89
-
.layer(cors)
98
+
.layer(mw)
90
99
.with_state(GlobalState {
91
100
pool,
92
101
redis_mp,
+1
-1
parakeet/src/sql/thread.sql
+1
-1
parakeet/src/sql/thread.sql
+13
parakeet/src/sql/thread_branching.sql
+13
parakeet/src/sql/thread_branching.sql
···
1
+
-- Walks the reply tree below the post whose URI is bound to $1.
--   $1: at-uri of the post whose replies are wanted
--   $2: depth bound; a row is expanded while its depth is <= $2, so rows one
--       level beyond $2 are also returned
--   $3: row cap applied to the recursive term
-- Replies flagged with violates_threadgate are never returned.
WITH RECURSIVE thread AS (
    -- direct replies to the requested post
    SELECT at_uri, parent_uri, root_uri, 1 AS depth
    FROM posts
    WHERE parent_uri = $1
      AND violates_threadgate = FALSE

    UNION ALL

    -- replies to anything already collected
    -- NOTE(review): the LIMIT is attached to the whole parenthesised recursive
    -- term rather than to each parent; confirm this is the intended cap.
    (SELECT p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
     FROM posts p
              JOIN thread ON p.parent_uri = thread.at_uri
     WHERE thread.depth <= $2
       AND violates_threadgate = FALSE
     LIMIT $3)
)
SELECT *
FROM thread;
+1
-1
parakeet/src/xrpc/app_bsky/bookmark.rs
+1
-1
parakeet/src/xrpc/app_bsky/bookmark.rs
+8
-26
parakeet/src/xrpc/app_bsky/feed/posts.rs
+8
-26
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
24
24
use reqwest::Url;
25
25
use serde::{Deserialize, Serialize};
26
26
use std::collections::HashMap;
27
+
use tracing::instrument;
27
28
28
29
const FEEDGEN_SERVICE_ID: &str = "#bsky_fg";
29
30
···
160
161
161
162
#[derive(Debug, Default, Eq, PartialEq, Deserialize)]
162
163
#[serde(rename_all = "snake_case")]
164
+
#[allow(clippy::enum_variant_names)]
163
165
pub enum GetAuthorFeedFilter {
164
166
#[default]
165
167
PostsWithReplies,
···
198
200
if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? {
199
201
if psr.blocked.unwrap_or_default() {
200
202
// they block us
201
-
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None))
203
+
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None));
202
204
} else if psr.blocking.is_some() {
203
205
// we block them
204
-
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None))
206
+
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None));
205
207
}
206
208
}
207
209
}
···
361
363
pub threadgate: Option<ThreadgateView>,
362
364
}
363
365
364
-
#[derive(Debug, QueryableByName)]
365
-
#[diesel(check_for_backend(diesel::pg::Pg))]
366
-
struct ThreadItem {
367
-
#[diesel(sql_type = diesel::sql_types::Text)]
368
-
at_uri: String,
369
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
370
-
parent_uri: Option<String>,
371
-
// #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
372
-
// root_uri: Option<String>,
373
-
#[diesel(sql_type = diesel::sql_types::Integer)]
374
-
depth: i32,
375
-
}
376
-
377
366
pub async fn get_post_thread(
378
367
State(state): State<GlobalState>,
379
368
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
···
409
398
}
410
399
}
411
400
412
-
let replies = diesel::sql_query(include_str!("../../../sql/thread.sql"))
413
-
.bind::<diesel::sql_types::Text, _>(&uri)
414
-
.bind::<diesel::sql_types::Integer, _>(depth as i32)
415
-
.load::<ThreadItem>(&mut conn)
416
-
.await?;
417
-
418
-
let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql"))
419
-
.bind::<diesel::sql_types::Text, _>(&uri)
420
-
.bind::<diesel::sql_types::Integer, _>(parent_height as i32)
421
-
.load::<ThreadItem>(&mut conn)
422
-
.await?;
401
+
let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?;
402
+
let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?;
423
403
424
404
let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect();
425
405
let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
···
634
614
.or(schema::posts::embed_subtype.eq_any(filter))
635
615
}
636
616
617
+
#[instrument(skip_all)]
637
618
async fn get_feed_skeleton(
638
619
feed: &str,
639
620
service: &str,
···
675
656
}
676
657
}
677
658
659
+
#[instrument(skip_all)]
678
660
async fn get_skeleton_repost_data(
679
661
conn: &mut AsyncPgConnection,
680
662
reposts: Vec<String>,
+3
parakeet/src/xrpc/app_bsky/mod.rs
+3
parakeet/src/xrpc/app_bsky/mod.rs
···
6
6
mod feed;
7
7
mod graph;
8
8
mod labeler;
9
+
mod unspecced;
9
10
10
11
#[rustfmt::skip]
11
12
pub fn routes() -> Router<crate::GlobalState> {
···
64
65
// TODO: app.bsky.notification.putActivitySubscriptions
65
66
// TODO: app.bsky.notification.putPreferences
66
67
// TODO: app.bsky.notification.putPreferencesV2
68
+
.route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2))
69
+
.route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2))
67
70
}
68
71
69
72
async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
···
1
+
pub mod thread_v2;
+379
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
+379
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
···
1
+
use crate::db::ThreadItem;
2
+
use crate::hydration::StatefulHydrator;
3
+
use crate::xrpc::error::{Error, XrpcResult};
4
+
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
5
+
use crate::xrpc::normalise_at_uri;
6
+
use crate::GlobalState;
7
+
use axum::extract::{Query, State};
8
+
use axum::Json;
9
+
use itertools::Itertools;
10
+
use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView};
11
+
use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType};
12
+
use serde::{Deserialize, Serialize};
13
+
use std::cmp::Ordering;
14
+
use std::collections::{HashMap, HashSet};
15
+
16
+
/// Maximum number of parent posts placed above the anchor in a thread response.
/// One more than this is requested from the database so the handler can tell
/// whether further parents exist.
const THREAD_PARENTS: usize = 50;
/// Branching factor used when the request does not supply `branchingFactor`.
const DEFAULT_BRANCHING: u32 = 10;
/// Reply depth used when the request does not supply `below`.
const DEFAULT_DEPTH: u32 = 6;
19
+
20
+
#[derive(Copy, Clone, Debug, Default, Deserialize)]
21
+
#[serde(rename_all = "lowercase")]
22
+
pub enum PostThreadSort {
23
+
Newest,
24
+
#[default]
25
+
Oldest,
26
+
Top,
27
+
}
28
+
29
+
#[derive(Debug, Deserialize)]
30
+
#[serde(rename_all = "camelCase")]
31
+
pub struct GetPostThreadV2Req {
32
+
pub anchor: String,
33
+
pub above: Option<bool>,
34
+
pub below: Option<u32>,
35
+
pub branching_factor: Option<u32>,
36
+
#[serde(default)]
37
+
pub sort: PostThreadSort,
38
+
}
39
+
40
+
#[derive(Debug, Serialize)]
41
+
#[serde(rename_all = "camelCase")]
42
+
pub struct GetPostThreadV2Res {
43
+
pub thread: Vec<ThreadV2Item>,
44
+
#[serde(skip_serializing_if = "Option::is_none")]
45
+
pub threadgate: Option<ThreadgateView>,
46
+
pub has_other_replies: bool,
47
+
}
48
+
49
+
pub async fn get_post_thread_v2(
50
+
State(state): State<GlobalState>,
51
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
52
+
maybe_auth: Option<AtpAuth>,
53
+
Query(query): Query<GetPostThreadV2Req>,
54
+
) -> XrpcResult<Json<GetPostThreadV2Res>> {
55
+
let mut conn = state.pool.get().await?;
56
+
let maybe_did = maybe_auth.clone().map(|v| v.0);
57
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
58
+
59
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
60
+
let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32;
61
+
let branching_factor = query
62
+
.branching_factor
63
+
.unwrap_or(DEFAULT_BRANCHING)
64
+
.clamp(0, 100) as i32;
65
+
66
+
let anchor = hyd
67
+
.hydrate_post(uri.clone())
68
+
.await
69
+
.ok_or(Error::not_found())?;
70
+
71
+
if let Some(v) = &anchor.author.viewer {
72
+
if v.blocked_by || v.blocking.is_some() {
73
+
let block = ThreadV2ItemType::Blocked {
74
+
author: BlockedAuthor {
75
+
did: anchor.author.did,
76
+
viewer: anchor.author.viewer,
77
+
},
78
+
};
79
+
80
+
return Ok(Json(GetPostThreadV2Res {
81
+
thread: vec![ThreadV2Item {
82
+
uri,
83
+
depth: 0,
84
+
value: block,
85
+
}],
86
+
threadgate: anchor.threadgate,
87
+
has_other_replies: false,
88
+
}));
89
+
}
90
+
}
91
+
92
+
// get the root post URI (if there is one) and return its author's DID.
93
+
let root_uri = crate::db::get_root_post(&mut conn, &uri)
94
+
.await?
95
+
.unwrap_or(uri.clone());
96
+
let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0];
97
+
98
+
let replies =
99
+
crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1)
100
+
.await?;
101
+
let reply_uris = replies
102
+
.iter()
103
+
.map(|item| item.at_uri.clone())
104
+
.collect::<Vec<_>>();
105
+
106
+
// bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents.
107
+
let parents = match query.above.unwrap_or(true) {
108
+
true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?,
109
+
false => vec![],
110
+
};
111
+
let parent_uris = parents
112
+
.iter()
113
+
.map(|item| item.at_uri.clone())
114
+
.collect::<Vec<_>>();
115
+
116
+
let (mut replies_hyd, mut parents_hyd) = tokio::join!(
117
+
hyd.hydrate_posts(reply_uris),
118
+
hyd.hydrate_posts(parent_uris),
119
+
);
120
+
121
+
let threadgate = anchor.threadgate.clone();
122
+
let hidden: HashSet<_, std::hash::RandomState> = match &threadgate {
123
+
Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?,
124
+
None => None,
125
+
}
126
+
.map(|hiddens| HashSet::from_iter(Vec::from(hiddens)))
127
+
.unwrap_or_default();
128
+
129
+
let root_has_more = parents.len() > THREAD_PARENTS;
130
+
let mut is_op_thread = true;
131
+
132
+
let mut thread = Vec::with_capacity(1 + replies.len() + parents.len());
133
+
134
+
thread.extend(
135
+
parents
136
+
.into_iter()
137
+
.tail(THREAD_PARENTS)
138
+
.enumerate()
139
+
.map(|(idx, item)| {
140
+
let value = parents_hyd
141
+
.remove(&item.at_uri)
142
+
.map(|post| {
143
+
if let Some(v) = &post.author.viewer {
144
+
if v.blocked_by || v.blocking.is_some() {
145
+
return ThreadV2ItemType::Blocked {
146
+
author: BlockedAuthor {
147
+
did: post.author.did,
148
+
viewer: post.author.viewer,
149
+
},
150
+
};
151
+
}
152
+
}
153
+
154
+
let op_thread = (is_op_thread
155
+
|| item.root_uri.is_none() && item.parent_uri.is_none())
156
+
&& post.author.did == root_did;
157
+
158
+
ThreadV2ItemType::Post(ThreadItemPost {
159
+
post,
160
+
more_parents: idx == 0 && root_has_more,
161
+
more_replies: 0,
162
+
op_thread,
163
+
hidden_by_threadgate: false,
164
+
muted_by_viewer: false,
165
+
})
166
+
})
167
+
.unwrap_or(ThreadV2ItemType::NotFound {});
168
+
169
+
ThreadV2Item {
170
+
uri: item.at_uri,
171
+
depth: -item.depth - 1,
172
+
value,
173
+
}
174
+
}),
175
+
);
176
+
177
+
is_op_thread = is_op_thread && anchor.author.did == root_did;
178
+
thread.push(ThreadV2Item {
179
+
uri: uri.clone(),
180
+
depth: 0,
181
+
value: ThreadV2ItemType::Post(ThreadItemPost {
182
+
post: anchor,
183
+
more_parents: false,
184
+
more_replies: 0,
185
+
op_thread: is_op_thread,
186
+
hidden_by_threadgate: false,
187
+
muted_by_viewer: false,
188
+
}),
189
+
});
190
+
191
+
let mut replies_grouped = replies
192
+
.into_iter()
193
+
.into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default());
194
+
195
+
// start with the anchor
196
+
let (children, has_other_replies) = build_thread_children(
197
+
&mut replies_grouped,
198
+
&mut replies_hyd,
199
+
&hidden,
200
+
&uri,
201
+
is_op_thread,
202
+
1,
203
+
&BuildThreadChildrenOpts {
204
+
root_did,
205
+
sort: query.sort,
206
+
maybe_did: &maybe_did,
207
+
max_depth: depth,
208
+
},
209
+
);
210
+
thread.extend(children);
211
+
212
+
Ok(Json(GetPostThreadV2Res {
213
+
thread,
214
+
threadgate,
215
+
has_other_replies,
216
+
}))
217
+
}
218
+
219
+
#[derive(Debug, Deserialize)]
220
+
#[serde(rename_all = "camelCase")]
221
+
pub struct GetPostThreadOtherV2Req {
222
+
pub anchor: String,
223
+
}
224
+
225
+
#[derive(Debug, Serialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
pub struct GetPostThreadOtherV2Res {
228
+
pub thread: Vec<ThreadV2Item>,
229
+
}
230
+
231
+
pub async fn get_post_thread_other_v2(
232
+
State(state): State<GlobalState>,
233
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
234
+
maybe_auth: Option<AtpAuth>,
235
+
Query(query): Query<GetPostThreadOtherV2Req>,
236
+
) -> XrpcResult<Json<GetPostThreadOtherV2Res>> {
237
+
let mut conn = state.pool.get().await?;
238
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
239
+
240
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
241
+
242
+
let root = crate::db::get_root_post(&mut conn, &uri)
243
+
.await?
244
+
.unwrap_or(uri.clone());
245
+
246
+
// this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE
247
+
let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?;
248
+
let reply_uris = replies
249
+
.into_iter()
250
+
.map(|item| item.at_uri)
251
+
.collect::<Vec<_>>();
252
+
let thread = hyd
253
+
.hydrate_posts(reply_uris)
254
+
.await
255
+
.into_iter()
256
+
.filter(|(_, post)| matches!(&post.author.viewer, Some(viewer) if viewer.blocked_by || viewer.blocking.is_some()))
257
+
.map(|(uri, post)| {
258
+
let post = ThreadItemPost {
259
+
post,
260
+
more_parents: false,
261
+
more_replies: 0,
262
+
op_thread: false,
263
+
hidden_by_threadgate: true,
264
+
muted_by_viewer: false,
265
+
};
266
+
267
+
ThreadV2Item {
268
+
uri,
269
+
depth: 1,
270
+
value: ThreadV2ItemType::Post(post),
271
+
}
272
+
})
273
+
.collect();
274
+
275
+
Ok(Json(GetPostThreadOtherV2Res { thread }))
276
+
}
277
+
278
+
#[derive(Debug)]
279
+
struct BuildThreadChildrenOpts<'a> {
280
+
root_did: &'a str,
281
+
sort: PostThreadSort,
282
+
maybe_did: &'a Option<String>,
283
+
max_depth: i32,
284
+
}
285
+
286
+
fn build_thread_children(
287
+
grouped_replies: &mut HashMap<String, Vec<ThreadItem>>,
288
+
replies_hyd: &mut HashMap<String, PostView>,
289
+
hidden: &HashSet<String>,
290
+
parent: &str,
291
+
is_op_thread: bool,
292
+
depth: i32,
293
+
opts: &BuildThreadChildrenOpts,
294
+
) -> (Vec<ThreadV2Item>, bool) {
295
+
let mut has_other_replies = false;
296
+
297
+
let Some(replies) = grouped_replies.remove(parent) else {
298
+
return (Vec::default(), has_other_replies);
299
+
};
300
+
301
+
let replies = replies
302
+
.into_iter()
303
+
.filter_map(|item| replies_hyd.remove(&item.at_uri))
304
+
.sorted_by(sort_replies(&opts.sort));
305
+
306
+
let mut out = Vec::new();
307
+
308
+
for post in replies {
309
+
let reply_count = grouped_replies
310
+
.get(&post.uri)
311
+
.map(|v| v.len())
312
+
.unwrap_or_default();
313
+
let at_max = depth == opts.max_depth;
314
+
let more_replies = if at_max { reply_count } else { 0 };
315
+
let op_thread = is_op_thread && post.author.did == opts.root_did;
316
+
317
+
// shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies...
318
+
if let Some(v) = &post.author.viewer {
319
+
if v.blocked_by || v.blocking.is_some() {
320
+
continue;
321
+
}
322
+
}
323
+
324
+
// check if the post is hidden AND we're NOT the author (hidden posts still show for their author)
325
+
if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) {
326
+
// post is hidden - do not ~pass go~ push to the thread.
327
+
if depth == 1 {
328
+
has_other_replies = true;
329
+
}
330
+
continue;
331
+
}
332
+
333
+
let uri = post.uri.clone();
334
+
out.push(ThreadV2Item {
335
+
uri: post.uri.clone(),
336
+
depth,
337
+
value: ThreadV2ItemType::Post(ThreadItemPost {
338
+
post,
339
+
more_parents: false,
340
+
more_replies: more_replies as i32,
341
+
op_thread,
342
+
hidden_by_threadgate: false,
343
+
muted_by_viewer: false,
344
+
}),
345
+
});
346
+
347
+
if !at_max {
348
+
// we don't care about has_other_replies when recursing
349
+
let (children, _) = build_thread_children(
350
+
grouped_replies,
351
+
replies_hyd,
352
+
hidden,
353
+
&uri,
354
+
op_thread,
355
+
depth + 1,
356
+
opts,
357
+
);
358
+
359
+
out.extend(children);
360
+
}
361
+
}
362
+
363
+
(out, has_other_replies)
364
+
}
365
+
366
+
fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> {
367
+
move |a: &PostView, b: &PostView| match sort {
368
+
PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at),
369
+
PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at),
370
+
PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count),
371
+
}
372
+
}
373
+
374
+
/// Returns `true` when `did` is the DID of the current viewer.
///
/// `cur` is the viewer's DID, or `None` for an unauthenticated request, in
/// which case the result is always `false`.
fn did_is_cur(cur: &Option<String>, did: &str) -> bool {
    cur.as_deref() == Some(did)
}
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+1
-1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
+3
parakeet/src/xrpc/jwt.rs
+3
parakeet/src/xrpc/jwt.rs
···
4
4
use std::collections::HashMap;
5
5
use std::sync::{Arc, LazyLock};
6
6
use tokio::sync::RwLock;
7
+
use tracing::instrument;
7
8
8
9
static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[]));
9
10
static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| {
···
38
39
}
39
40
}
40
41
42
+
#[instrument(skip_all)]
41
43
pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> {
42
44
// first we need to decode without verifying, to get iss.
43
45
let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?;
···
56
58
self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud)
57
59
}
58
60
61
+
#[instrument(skip_all)]
59
62
async fn resolve_key(&self, did: &str) -> Option<String> {
60
63
tracing::trace!("resolving multikey for {did}");
61
64
let did_doc = self.resolver.resolve_did(did).await.ok()??;
+55
-13
parakeet-db/src/models.rs
+55
-13
parakeet-db/src/models.rs
···
137
137
138
138
pub content: String,
139
139
pub facets: Option<serde_json::Value>,
140
-
pub languages: Vec<Option<String>>,
141
-
pub tags: Vec<Option<String>>,
140
+
pub languages: not_null_vec::TextArray,
141
+
pub tags: not_null_vec::TextArray,
142
142
143
143
pub parent_uri: Option<String>,
144
144
pub parent_cid: Option<String>,
···
148
148
pub embed: Option<String>,
149
149
pub embed_subtype: Option<String>,
150
150
151
-
pub mentions: Option<Vec<Option<String>>>,
151
+
pub mentions: Option<not_null_vec::TextArray>,
152
152
pub violates_threadgate: bool,
153
153
154
154
pub created_at: DateTime<Utc>,
···
236
236
pub cid: String,
237
237
pub post_uri: String,
238
238
239
-
pub detached: Vec<Option<String>>,
240
-
pub rules: Vec<Option<String>>,
239
+
pub detached: not_null_vec::TextArray,
240
+
pub rules: not_null_vec::TextArray,
241
241
242
242
pub created_at: DateTime<Utc>,
243
243
pub indexed_at: NaiveDateTime,
···
252
252
pub cid: String,
253
253
pub post_uri: String,
254
254
255
-
pub hidden_replies: Vec<Option<String>>,
256
-
pub allow: Option<Vec<Option<String>>>,
257
-
pub allowed_lists: Option<Vec<Option<String>>>,
255
+
pub hidden_replies: not_null_vec::TextArray,
256
+
pub allow: Option<not_null_vec::TextArray>,
257
+
pub allowed_lists: Option<not_null_vec::TextArray>,
258
258
259
259
pub record: serde_json::Value,
260
260
···
276
276
pub description: Option<String>,
277
277
pub description_facets: Option<serde_json::Value>,
278
278
pub list: String,
279
-
pub feeds: Option<Vec<Option<String>>>,
279
+
pub feeds: Option<not_null_vec::TextArray>,
280
280
281
281
pub created_at: DateTime<Utc>,
282
282
pub indexed_at: NaiveDateTime,
···
290
290
pub did: String,
291
291
pub cid: String,
292
292
293
-
pub reasons: Option<Vec<Option<String>>>,
294
-
pub subject_types: Option<Vec<Option<String>>>,
295
-
pub subject_collections: Option<Vec<Option<String>>>,
293
+
pub reasons: Option<not_null_vec::TextArray>,
294
+
pub subject_types: Option<not_null_vec::TextArray>,
295
+
pub subject_collections: Option<not_null_vec::TextArray>,
296
296
297
297
pub created_at: NaiveDateTime,
298
298
pub indexed_at: NaiveDateTime,
···
402
402
pub subject: String,
403
403
pub subject_cid: Option<String>,
404
404
pub subject_type: String,
405
-
pub tags: Vec<Option<String>>,
405
+
pub tags: not_null_vec::TextArray,
406
406
pub created_at: DateTime<Utc>,
407
407
}
408
408
···
430
430
pub typ: String,
431
431
pub sort_at: DateTime<Utc>,
432
432
}
433
+
434
+
pub use not_null_vec::TextArray;
435
+
mod not_null_vec {
436
+
use diesel::deserialize::FromSql;
437
+
use diesel::pg::Pg;
438
+
use diesel::sql_types::{Array, Nullable, Text};
439
+
use diesel::{deserialize, FromSqlRow};
440
+
use serde::{Deserialize, Serialize};
441
+
use std::ops::{Deref, DerefMut};
442
+
443
+
#[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)]
444
+
#[diesel(sql_type = Array<Nullable<Text>>)]
445
+
pub struct TextArray(pub Vec<String>);
446
+
447
+
impl FromSql<Array<Nullable<Text>>, Pg> for TextArray {
448
+
fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> {
449
+
let vec_with_nulls =
450
+
<Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?;
451
+
Ok(TextArray(vec_with_nulls.into_iter().flatten().collect()))
452
+
}
453
+
}
454
+
455
+
impl Deref for TextArray {
456
+
type Target = Vec<String>;
457
+
458
+
fn deref(&self) -> &Self::Target {
459
+
&self.0
460
+
}
461
+
}
462
+
463
+
impl DerefMut for TextArray {
464
+
fn deref_mut(&mut self) -> &mut Self::Target {
465
+
&mut self.0
466
+
}
467
+
}
468
+
469
+
impl From<TextArray> for Vec<String> {
470
+
fn from(v: TextArray) -> Vec<String> {
471
+
v.0
472
+
}
473
+
}
474
+
}
+24
-2
parakeet-index/Cargo.toml
+24
-2
parakeet-index/Cargo.toml
···
10
10
[dependencies]
11
11
tonic = "0.13.0"
12
12
prost = "0.13.5"
13
+
tonic-tracing-opentelemetry = { version = "0.32", optional = true }
14
+
tower = { version = "0.5", optional = true }
13
15
14
16
eyre = { version = "0.6.12", optional = true }
15
17
figment = { version = "0.10.19", features = ["env", "toml"], optional = true }
16
18
itertools = { version = "0.14.0", optional = true }
19
+
opentelemetry = { version = "0.31.0", optional = true }
20
+
opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true }
21
+
opentelemetry_sdk = { version = "0.31.0", optional = true }
17
22
rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true }
18
23
serde = { version = "1.0.217", features = ["derive"], optional = true }
19
24
tokio = { version = "1.42.0", features = ["full"], optional = true }
20
25
tonic-health = { version = "0.13.0", optional = true }
21
26
tracing = { version = "0.1.40", optional = true }
22
-
tracing-subscriber = { version = "0.3.18", optional = true }
27
+
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true }
28
+
tracing-opentelemetry = { version = "0.32", optional = true }
23
29
24
30
[build-dependencies]
25
31
tonic-build = "0.13.0"
26
32
27
33
[features]
28
-
server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
34
+
otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"]
35
+
server = [
36
+
"dep:eyre",
37
+
"dep:figment",
38
+
"dep:itertools",
39
+
"dep:opentelemetry",
40
+
"dep:opentelemetry-otlp",
41
+
"dep:opentelemetry_sdk",
42
+
"dep:rocksdb",
43
+
"dep:serde",
44
+
"dep:tokio",
45
+
"dep:tonic-health",
46
+
"otel",
47
+
"dep:tracing",
48
+
"dep:tracing-subscriber",
49
+
"dep:tracing-opentelemetry"
50
+
]
+20
-1
parakeet-index/src/lib.rs
+20
-1
parakeet-index/src/lib.rs
···
1
+
use tonic::transport::Channel;
2
+
1
3
#[allow(clippy::all)]
2
4
pub mod index {
3
5
tonic::include_proto!("parakeet");
4
6
}
5
7
6
8
pub use index::*;
7
-
pub type Client = index_client::IndexClient<tonic::transport::Channel>;
9
+
#[cfg(not(feature = "otel"))]
10
+
pub type Client = index_client::IndexClient<Channel>;
11
+
#[cfg(feature = "otel")]
12
+
pub type Client = index_client::IndexClient<
13
+
tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>,
14
+
>;
8
15
9
16
#[cfg(feature = "server")]
10
17
pub mod server;
18
+
19
+
/// Connects to the index service at `uri` and returns a client whose channel
/// is wrapped in the OpenTelemetry gRPC client layer.
///
/// # Errors
///
/// Returns an error when `uri` is not accepted by `Channel::from_shared` or
/// when the connection attempt fails.
#[cfg(feature = "otel")]
pub async fn connect_with_otel(
    uri: String,
) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
    use tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer;

    let endpoint = Channel::from_shared(uri)?;
    let transport = endpoint.connect().await?;

    // Wrap the raw channel in the OTel layer before handing it to the client.
    let traced = tower::ServiceBuilder::new()
        .layer(OtelGrpcLayer)
        .service(transport);

    Ok(index_client::IndexClient::new(traced))
}
+9
-3
parakeet-index/src/main.rs
+9
-3
parakeet-index/src/main.rs
···
1
1
use parakeet_index::index_server::IndexServer;
2
2
use parakeet_index::server::service::Service;
3
-
use parakeet_index::server::{GlobalState, config};
3
+
use parakeet_index::server::{GlobalState, config, instrumentation};
4
4
use std::sync::Arc;
5
5
use tonic::transport::Server;
6
+
use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer;
6
7
7
8
#[tokio::main]
8
9
async fn main() -> eyre::Result<()> {
9
-
tracing_subscriber::fmt::init();
10
-
11
10
let conf = config::load_config()?;
12
11
12
+
instrumentation::init_instruments(&conf.instruments);
13
+
13
14
let db_root = conf.index_db_path.parse()?;
14
15
let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
15
16
let state = Arc::new(GlobalState::new(db_root)?);
···
18
19
reporter.set_serving::<IndexServer<Service>>().await;
19
20
20
21
let service = Service::new(state.clone());
22
+
23
+
let mw = tower::ServiceBuilder::new()
24
+
.option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default));
25
+
21
26
Server::builder()
27
+
.layer(mw)
22
28
.add_service(health_service)
23
29
.add_service(IndexServer::new(service))
24
30
.serve(addr)
+10
parakeet-index/src/server/config.rs
+10
parakeet-index/src/server/config.rs
···
13
13
14
14
#[derive(Debug, Deserialize)]
15
15
pub struct Config {
16
+
#[serde(flatten)]
17
+
pub instruments: ConfigInstruments,
16
18
pub database_url: String,
17
19
pub index_db_path: String,
18
20
#[serde(default)]
19
21
pub server: ConfigServer,
22
+
}
23
+
24
+
#[derive(Debug, Deserialize)]
25
+
pub struct ConfigInstruments {
26
+
#[serde(default)]
27
+
pub otel_enable: bool,
28
+
#[serde(default)]
29
+
pub log_json: bool,
20
30
}
21
31
22
32
#[derive(Debug, Deserialize)]
+57
parakeet-index/src/server/instrumentation.rs
+57
parakeet-index/src/server/instrumentation.rs
···
1
+
use opentelemetry::trace::TracerProvider;
2
+
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
3
+
use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider};
4
+
use tracing::Subscriber;
5
+
use tracing_opentelemetry::OpenTelemetryLayer;
6
+
use tracing_subscriber::filter::Filtered;
7
+
use tracing_subscriber::layer::SubscriberExt;
8
+
use tracing_subscriber::registry::LookupSpan;
9
+
use tracing_subscriber::util::SubscriberInitExt;
10
+
use tracing_subscriber::{EnvFilter, Layer};
11
+
12
+
pub fn init_instruments(cfg: &super::config::ConfigInstruments) {
13
+
let otel_layer = cfg.otel_enable.then(init_otel);
14
+
let log_layer = init_log(cfg.log_json);
15
+
16
+
tracing_subscriber::registry()
17
+
.with(log_layer)
18
+
.with(otel_layer)
19
+
.init();
20
+
}
21
+
22
+
fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S>
23
+
where
24
+
S: Subscriber + for<'span> LookupSpan<'span>,
25
+
{
26
+
let span_exporter = SpanExporter::builder()
27
+
.with_http()
28
+
.with_protocol(Protocol::HttpBinary)
29
+
.build()
30
+
.unwrap();
31
+
32
+
let tracer_provider = SdkTracerProvider::builder()
33
+
.with_batch_exporter(span_exporter)
34
+
.with_sampler(Sampler::AlwaysOn)
35
+
.build();
36
+
37
+
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
38
+
39
+
let tracer = tracer_provider.tracer("parakeet");
40
+
let otel_filter = EnvFilter::new("info,otel::tracing=trace");
41
+
42
+
OpenTelemetryLayer::new(tracer).with_filter(otel_filter)
43
+
}
44
+
45
+
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
46
+
where
47
+
S: Subscriber + for<'span> LookupSpan<'span>,
48
+
{
49
+
let stdout_filter =
50
+
EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap());
51
+
52
+
match json {
53
+
true => tracing_subscriber::fmt::layer().json().boxed(),
54
+
false => tracing_subscriber::fmt::layer().boxed(),
55
+
}
56
+
.with_filter(stdout_filter)
57
+
}