+12
-364
Cargo.lock
+12
-364
Cargo.lock
···
286
]
287
288
[[package]]
289
-
name = "axum-tracing-opentelemetry"
290
-
version = "0.32.1"
291
-
source = "registry+https://github.com/rust-lang/crates.io-index"
292
-
checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1"
293
-
dependencies = [
294
-
"axum",
295
-
"futures-core",
296
-
"futures-util",
297
-
"http",
298
-
"opentelemetry",
299
-
"opentelemetry-semantic-conventions",
300
-
"pin-project-lite",
301
-
"tower",
302
-
"tracing",
303
-
"tracing-opentelemetry",
304
-
"tracing-opentelemetry-instrumentation-sdk",
305
-
]
306
-
307
-
[[package]]
308
name = "backtrace"
309
version = "0.3.74"
310
source = "registry+https://github.com/rust-lang/crates.io-index"
···
366
"proc-macro2",
367
"quote",
368
"regex",
369
-
"rustc-hash 1.1.0",
370
"shlex",
371
"syn",
372
"which",
···
484
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
485
486
[[package]]
487
-
name = "cfg_aliases"
488
-
version = "0.2.1"
489
-
source = "registry+https://github.com/rust-lang/crates.io-index"
490
-
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
491
-
492
-
[[package]]
493
name = "chrono"
494
version = "0.4.41"
495
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1395
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
1396
dependencies = [
1397
"cfg-if",
1398
-
"js-sys",
1399
"libc",
1400
"r-efi",
1401
"wasi 0.14.2+wasi-0.2.4",
1402
-
"wasm-bindgen",
1403
]
1404
1405
[[package]]
···
2196
]
2197
2198
[[package]]
2199
-
name = "lru-slab"
2200
-
version = "0.1.2"
2201
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2202
-
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
2203
-
2204
-
[[package]]
2205
name = "lz4-sys"
2206
version = "1.11.1+lz4-1.10.0"
2207
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2218
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
2219
2220
[[package]]
2221
-
name = "matchers"
2222
-
version = "0.1.0"
2223
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2224
-
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
2225
-
dependencies = [
2226
-
"regex-automata 0.1.10",
2227
-
]
2228
-
2229
-
[[package]]
2230
name = "matchit"
2231
version = "0.8.4"
2232
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2555
]
2556
2557
[[package]]
2558
-
name = "opentelemetry"
2559
-
version = "0.31.0"
2560
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2561
-
checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0"
2562
-
dependencies = [
2563
-
"futures-core",
2564
-
"futures-sink",
2565
-
"js-sys",
2566
-
"pin-project-lite",
2567
-
"thiserror 2.0.12",
2568
-
"tracing",
2569
-
]
2570
-
2571
-
[[package]]
2572
-
name = "opentelemetry-http"
2573
-
version = "0.31.0"
2574
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2575
-
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
2576
-
dependencies = [
2577
-
"async-trait",
2578
-
"bytes",
2579
-
"http",
2580
-
"opentelemetry",
2581
-
"reqwest",
2582
-
]
2583
-
2584
-
[[package]]
2585
-
name = "opentelemetry-otlp"
2586
-
version = "0.31.0"
2587
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2588
-
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
2589
-
dependencies = [
2590
-
"http",
2591
-
"opentelemetry",
2592
-
"opentelemetry-http",
2593
-
"opentelemetry-proto",
2594
-
"opentelemetry_sdk",
2595
-
"prost 0.14.1",
2596
-
"reqwest",
2597
-
"thiserror 2.0.12",
2598
-
"tracing",
2599
-
]
2600
-
2601
-
[[package]]
2602
-
name = "opentelemetry-proto"
2603
-
version = "0.31.0"
2604
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2605
-
checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
2606
-
dependencies = [
2607
-
"opentelemetry",
2608
-
"opentelemetry_sdk",
2609
-
"prost 0.14.1",
2610
-
"tonic 0.14.2",
2611
-
"tonic-prost",
2612
-
]
2613
-
2614
-
[[package]]
2615
-
name = "opentelemetry-semantic-conventions"
2616
-
version = "0.31.0"
2617
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2618
-
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
2619
-
2620
-
[[package]]
2621
-
name = "opentelemetry_sdk"
2622
-
version = "0.31.0"
2623
-
source = "registry+https://github.com/rust-lang/crates.io-index"
2624
-
checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd"
2625
-
dependencies = [
2626
-
"futures-channel",
2627
-
"futures-executor",
2628
-
"futures-util",
2629
-
"opentelemetry",
2630
-
"percent-encoding",
2631
-
"rand 0.9.1",
2632
-
"thiserror 2.0.12",
2633
-
]
2634
-
2635
-
[[package]]
2636
name = "overload"
2637
version = "0.1.1"
2638
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2669
"async-recursion",
2670
"axum",
2671
"axum-extra",
2672
-
"axum-tracing-opentelemetry",
2673
"base64 0.22.1",
2674
"chrono",
2675
"dataloader",
···
2684
"jsonwebtoken",
2685
"lexica",
2686
"multibase",
2687
-
"opentelemetry",
2688
-
"opentelemetry-otlp",
2689
-
"opentelemetry_sdk",
2690
"parakeet-db",
2691
"parakeet-index",
2692
"redis",
···
2695
"serde_ipld_dagcbor",
2696
"serde_json",
2697
"tokio",
2698
-
"tower",
2699
"tower-http",
2700
"tracing",
2701
-
"tracing-opentelemetry",
2702
"tracing-subscriber",
2703
]
2704
···
2720
"eyre",
2721
"figment",
2722
"itertools 0.14.0",
2723
-
"opentelemetry",
2724
-
"opentelemetry-otlp",
2725
-
"opentelemetry_sdk",
2726
-
"prost 0.13.5",
2727
"rocksdb",
2728
"serde",
2729
"tokio",
2730
-
"tonic 0.13.1",
2731
"tonic-build",
2732
"tonic-health",
2733
-
"tonic-tracing-opentelemetry",
2734
-
"tower",
2735
"tracing",
2736
-
"tracing-opentelemetry",
2737
"tracing-subscriber",
2738
]
2739
···
3039
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
3040
dependencies = [
3041
"bytes",
3042
-
"prost-derive 0.13.5",
3043
-
]
3044
-
3045
-
[[package]]
3046
-
name = "prost"
3047
-
version = "0.14.1"
3048
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3049
-
checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d"
3050
-
dependencies = [
3051
-
"bytes",
3052
-
"prost-derive 0.14.1",
3053
]
3054
3055
[[package]]
···
3065
"once_cell",
3066
"petgraph",
3067
"prettyplease",
3068
-
"prost 0.13.5",
3069
"prost-types",
3070
"regex",
3071
"syn",
···
3086
]
3087
3088
[[package]]
3089
-
name = "prost-derive"
3090
-
version = "0.14.1"
3091
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3092
-
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
3093
-
dependencies = [
3094
-
"anyhow",
3095
-
"itertools 0.14.0",
3096
-
"proc-macro2",
3097
-
"quote",
3098
-
"syn",
3099
-
]
3100
-
3101
-
[[package]]
3102
name = "prost-types"
3103
version = "0.13.5"
3104
source = "registry+https://github.com/rust-lang/crates.io-index"
3105
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
3106
dependencies = [
3107
-
"prost 0.13.5",
3108
]
3109
3110
[[package]]
···
3129
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
3130
3131
[[package]]
3132
-
name = "quinn"
3133
-
version = "0.11.9"
3134
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3135
-
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
3136
-
dependencies = [
3137
-
"bytes",
3138
-
"cfg_aliases",
3139
-
"pin-project-lite",
3140
-
"quinn-proto",
3141
-
"quinn-udp",
3142
-
"rustc-hash 2.1.1",
3143
-
"rustls",
3144
-
"socket2 0.6.0",
3145
-
"thiserror 2.0.12",
3146
-
"tokio",
3147
-
"tracing",
3148
-
"web-time",
3149
-
]
3150
-
3151
-
[[package]]
3152
-
name = "quinn-proto"
3153
-
version = "0.11.13"
3154
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3155
-
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
3156
-
dependencies = [
3157
-
"bytes",
3158
-
"getrandom 0.3.3",
3159
-
"lru-slab",
3160
-
"rand 0.9.1",
3161
-
"ring",
3162
-
"rustc-hash 2.1.1",
3163
-
"rustls",
3164
-
"rustls-pki-types",
3165
-
"slab",
3166
-
"thiserror 2.0.12",
3167
-
"tinyvec",
3168
-
"tracing",
3169
-
"web-time",
3170
-
]
3171
-
3172
-
[[package]]
3173
-
name = "quinn-udp"
3174
-
version = "0.5.14"
3175
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3176
-
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
3177
-
dependencies = [
3178
-
"cfg_aliases",
3179
-
"libc",
3180
-
"once_cell",
3181
-
"socket2 0.6.0",
3182
-
"tracing",
3183
-
"windows-sys 0.59.0",
3184
-
]
3185
-
3186
-
[[package]]
3187
name = "quote"
3188
version = "1.0.38"
3189
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3345
dependencies = [
3346
"aho-corasick",
3347
"memchr",
3348
-
"regex-automata 0.4.9",
3349
-
"regex-syntax 0.8.5",
3350
-
]
3351
-
3352
-
[[package]]
3353
-
name = "regex-automata"
3354
-
version = "0.1.10"
3355
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3356
-
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
3357
-
dependencies = [
3358
-
"regex-syntax 0.6.29",
3359
]
3360
3361
[[package]]
···
3366
dependencies = [
3367
"aho-corasick",
3368
"memchr",
3369
-
"regex-syntax 0.8.5",
3370
]
3371
3372
[[package]]
3373
name = "regex-syntax"
3374
-
version = "0.6.29"
3375
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3376
-
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
3377
-
3378
-
[[package]]
3379
-
name = "regex-syntax"
3380
version = "0.8.5"
3381
source = "registry+https://github.com/rust-lang/crates.io-index"
3382
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
···
3391
"base64 0.22.1",
3392
"bytes",
3393
"encoding_rs",
3394
-
"futures-channel",
3395
"futures-core",
3396
"futures-util",
3397
"h2",
···
3410
"once_cell",
3411
"percent-encoding",
3412
"pin-project-lite",
3413
-
"quinn",
3414
-
"rustls",
3415
-
"rustls-native-certs",
3416
"rustls-pemfile",
3417
-
"rustls-pki-types",
3418
"serde",
3419
"serde_json",
3420
"serde_urlencoded",
···
3422
"system-configuration",
3423
"tokio",
3424
"tokio-native-tls",
3425
-
"tokio-rustls",
3426
"tokio-util",
3427
"tower",
3428
"tower-service",
···
3512
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
3513
3514
[[package]]
3515
-
name = "rustc-hash"
3516
-
version = "2.1.1"
3517
-
source = "registry+https://github.com/rust-lang/crates.io-index"
3518
-
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
3519
-
3520
-
[[package]]
3521
name = "rustc_version"
3522
version = "0.4.1"
3523
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3547
dependencies = [
3548
"aws-lc-rs",
3549
"once_cell",
3550
-
"ring",
3551
"rustls-pki-types",
3552
"rustls-webpki",
3553
"subtle",
···
3580
version = "1.11.0"
3581
source = "registry+https://github.com/rust-lang/crates.io-index"
3582
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
3583
-
dependencies = [
3584
-
"web-time",
3585
-
]
3586
3587
[[package]]
3588
name = "rustls-webpki"
···
4426
"hyper-util",
4427
"percent-encoding",
4428
"pin-project",
4429
-
"prost 0.13.5",
4430
"socket2 0.5.8",
4431
"tokio",
4432
"tokio-stream",
4433
"tower",
4434
-
"tower-layer",
4435
-
"tower-service",
4436
-
"tracing",
4437
-
]
4438
-
4439
-
[[package]]
4440
-
name = "tonic"
4441
-
version = "0.14.2"
4442
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4443
-
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
4444
-
dependencies = [
4445
-
"async-trait",
4446
-
"base64 0.22.1",
4447
-
"bytes",
4448
-
"http",
4449
-
"http-body",
4450
-
"http-body-util",
4451
-
"percent-encoding",
4452
-
"pin-project",
4453
-
"sync_wrapper",
4454
-
"tokio-stream",
4455
"tower-layer",
4456
"tower-service",
4457
"tracing",
···
4477
source = "registry+https://github.com/rust-lang/crates.io-index"
4478
checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b"
4479
dependencies = [
4480
-
"prost 0.13.5",
4481
"tokio",
4482
"tokio-stream",
4483
-
"tonic 0.13.1",
4484
-
]
4485
-
4486
-
[[package]]
4487
-
name = "tonic-prost"
4488
-
version = "0.14.2"
4489
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4490
-
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
4491
-
dependencies = [
4492
-
"bytes",
4493
-
"prost 0.14.1",
4494
-
"tonic 0.14.2",
4495
-
]
4496
-
4497
-
[[package]]
4498
-
name = "tonic-tracing-opentelemetry"
4499
-
version = "0.32.0"
4500
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4501
-
checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3"
4502
-
dependencies = [
4503
-
"futures-core",
4504
-
"futures-util",
4505
-
"http",
4506
-
"http-body",
4507
-
"hyper",
4508
-
"opentelemetry",
4509
-
"pin-project-lite",
4510
-
"tonic 0.14.2",
4511
-
"tower",
4512
-
"tracing",
4513
-
"tracing-opentelemetry",
4514
-
"tracing-opentelemetry-instrumentation-sdk",
4515
]
4516
4517
[[package]]
···
4606
]
4607
4608
[[package]]
4609
-
name = "tracing-opentelemetry"
4610
-
version = "0.32.0"
4611
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4612
-
checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e"
4613
-
dependencies = [
4614
-
"js-sys",
4615
-
"opentelemetry",
4616
-
"opentelemetry_sdk",
4617
-
"rustversion",
4618
-
"smallvec",
4619
-
"thiserror 2.0.12",
4620
-
"tracing",
4621
-
"tracing-core",
4622
-
"tracing-log",
4623
-
"tracing-subscriber",
4624
-
"web-time",
4625
-
]
4626
-
4627
-
[[package]]
4628
-
name = "tracing-opentelemetry-instrumentation-sdk"
4629
-
version = "0.32.1"
4630
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4631
-
checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416"
4632
-
dependencies = [
4633
-
"http",
4634
-
"opentelemetry",
4635
-
"opentelemetry-semantic-conventions",
4636
-
"tracing",
4637
-
"tracing-opentelemetry",
4638
-
]
4639
-
4640
-
[[package]]
4641
-
name = "tracing-serde"
4642
-
version = "0.2.0"
4643
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4644
-
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
4645
-
dependencies = [
4646
-
"serde",
4647
-
"tracing-core",
4648
-
]
4649
-
4650
-
[[package]]
4651
name = "tracing-subscriber"
4652
version = "0.3.19"
4653
source = "registry+https://github.com/rust-lang/crates.io-index"
4654
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
4655
dependencies = [
4656
-
"matchers",
4657
"nu-ansi-term",
4658
-
"once_cell",
4659
-
"regex",
4660
-
"serde",
4661
-
"serde_json",
4662
"sharded-slab",
4663
"smallvec",
4664
"thread_local",
4665
-
"tracing",
4666
"tracing-core",
4667
"tracing-log",
4668
-
"tracing-serde",
4669
]
4670
4671
[[package]]
···
4934
version = "0.3.77"
4935
source = "registry+https://github.com/rust-lang/crates.io-index"
4936
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
4937
-
dependencies = [
4938
-
"js-sys",
4939
-
"wasm-bindgen",
4940
-
]
4941
-
4942
-
[[package]]
4943
-
name = "web-time"
4944
-
version = "1.1.0"
4945
-
source = "registry+https://github.com/rust-lang/crates.io-index"
4946
-
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
4947
dependencies = [
4948
"js-sys",
4949
"wasm-bindgen",
···
286
]
287
288
[[package]]
289
name = "backtrace"
290
version = "0.3.74"
291
source = "registry+https://github.com/rust-lang/crates.io-index"
···
347
"proc-macro2",
348
"quote",
349
"regex",
350
+
"rustc-hash",
351
"shlex",
352
"syn",
353
"which",
···
465
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
466
467
[[package]]
468
name = "chrono"
469
version = "0.4.41"
470
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1370
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
1371
dependencies = [
1372
"cfg-if",
1373
"libc",
1374
"r-efi",
1375
"wasi 0.14.2+wasi-0.2.4",
1376
]
1377
1378
[[package]]
···
2169
]
2170
2171
[[package]]
2172
name = "lz4-sys"
2173
version = "1.11.1+lz4-1.10.0"
2174
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2185
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
2186
2187
[[package]]
2188
name = "matchit"
2189
version = "0.8.4"
2190
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2513
]
2514
2515
[[package]]
2516
name = "overload"
2517
version = "0.1.1"
2518
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2549
"async-recursion",
2550
"axum",
2551
"axum-extra",
2552
"base64 0.22.1",
2553
"chrono",
2554
"dataloader",
···
2563
"jsonwebtoken",
2564
"lexica",
2565
"multibase",
2566
"parakeet-db",
2567
"parakeet-index",
2568
"redis",
···
2571
"serde_ipld_dagcbor",
2572
"serde_json",
2573
"tokio",
2574
"tower-http",
2575
"tracing",
2576
"tracing-subscriber",
2577
]
2578
···
2594
"eyre",
2595
"figment",
2596
"itertools 0.14.0",
2597
+
"prost",
2598
"rocksdb",
2599
"serde",
2600
"tokio",
2601
+
"tonic",
2602
"tonic-build",
2603
"tonic-health",
2604
"tracing",
2605
"tracing-subscriber",
2606
]
2607
···
2907
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
2908
dependencies = [
2909
"bytes",
2910
+
"prost-derive",
2911
]
2912
2913
[[package]]
···
2923
"once_cell",
2924
"petgraph",
2925
"prettyplease",
2926
+
"prost",
2927
"prost-types",
2928
"regex",
2929
"syn",
···
2944
]
2945
2946
[[package]]
2947
name = "prost-types"
2948
version = "0.13.5"
2949
source = "registry+https://github.com/rust-lang/crates.io-index"
2950
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
2951
dependencies = [
2952
+
"prost",
2953
]
2954
2955
[[package]]
···
2974
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
2975
2976
[[package]]
2977
name = "quote"
2978
version = "1.0.38"
2979
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3135
dependencies = [
3136
"aho-corasick",
3137
"memchr",
3138
+
"regex-automata",
3139
+
"regex-syntax",
3140
]
3141
3142
[[package]]
···
3147
dependencies = [
3148
"aho-corasick",
3149
"memchr",
3150
+
"regex-syntax",
3151
]
3152
3153
[[package]]
3154
name = "regex-syntax"
3155
version = "0.8.5"
3156
source = "registry+https://github.com/rust-lang/crates.io-index"
3157
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
···
3166
"base64 0.22.1",
3167
"bytes",
3168
"encoding_rs",
3169
"futures-core",
3170
"futures-util",
3171
"h2",
···
3184
"once_cell",
3185
"percent-encoding",
3186
"pin-project-lite",
3187
"rustls-pemfile",
3188
"serde",
3189
"serde_json",
3190
"serde_urlencoded",
···
3192
"system-configuration",
3193
"tokio",
3194
"tokio-native-tls",
3195
"tokio-util",
3196
"tower",
3197
"tower-service",
···
3281
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
3282
3283
[[package]]
3284
name = "rustc_version"
3285
version = "0.4.1"
3286
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3310
dependencies = [
3311
"aws-lc-rs",
3312
"once_cell",
3313
"rustls-pki-types",
3314
"rustls-webpki",
3315
"subtle",
···
3342
version = "1.11.0"
3343
source = "registry+https://github.com/rust-lang/crates.io-index"
3344
checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c"
3345
3346
[[package]]
3347
name = "rustls-webpki"
···
4185
"hyper-util",
4186
"percent-encoding",
4187
"pin-project",
4188
+
"prost",
4189
"socket2 0.5.8",
4190
"tokio",
4191
"tokio-stream",
4192
"tower",
4193
"tower-layer",
4194
"tower-service",
4195
"tracing",
···
4215
source = "registry+https://github.com/rust-lang/crates.io-index"
4216
checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b"
4217
dependencies = [
4218
+
"prost",
4219
"tokio",
4220
"tokio-stream",
4221
+
"tonic",
4222
]
4223
4224
[[package]]
···
4313
]
4314
4315
[[package]]
4316
name = "tracing-subscriber"
4317
version = "0.3.19"
4318
source = "registry+https://github.com/rust-lang/crates.io-index"
4319
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
4320
dependencies = [
4321
"nu-ansi-term",
4322
"sharded-slab",
4323
"smallvec",
4324
"thread_local",
4325
"tracing-core",
4326
"tracing-log",
4327
]
4328
4329
[[package]]
···
4592
version = "0.3.77"
4593
source = "registry+https://github.com/rust-lang/crates.io-index"
4594
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
4595
dependencies = [
4596
"js-sys",
4597
"wasm-bindgen",
+1
-1
consumer/Cargo.toml
+1
-1
consumer/Cargo.toml
+3
-3
consumer/src/backfill/downloader.rs
+3
-3
consumer/src/backfill/downloader.rs
···
84
85
// has the repo already been downloaded?
86
if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
87
-
tracing::info!("skipping duplicate repo {did}");
88
continue;
89
}
90
···
92
match db::actor_get_statuses(&mut conn, &did).await {
93
Ok(Some((_, state))) => {
94
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
95
-
tracing::info!("skipping duplicate repo {did}");
96
continue;
97
}
98
}
···
206
let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
207
let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
208
}
209
-
Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."),
210
Err(e) => {
211
tracing::error!(pds, did, "failed to download repo: {e}");
212
continue;
···
84
85
// has the repo already been downloaded?
86
if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() {
87
+
tracing::warn!("skipping duplicate repo {did}");
88
continue;
89
}
90
···
92
match db::actor_get_statuses(&mut conn, &did).await {
93
Ok(Some((_, state))) => {
94
if state == ActorSyncState::Synced || state == ActorSyncState::Processing {
95
+
tracing::warn!("skipping duplicate repo {did}");
96
continue;
97
}
98
}
···
206
let _ = rc.zadd(BF_REM_KEY, &pds, rem).await;
207
let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await;
208
}
209
+
Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."),
210
Err(e) => {
211
tracing::error!(pds, did, "failed to download repo: {e}");
212
continue;
+5
-4
consumer/src/backfill/repo.rs
+5
-4
consumer/src/backfill/repo.rs
···
53
54
match block {
55
CarEntry::Commit(_) => {
56
-
tracing::debug!("got commit entry that was not in root")
57
}
58
CarEntry::Record(CarRecordEntry::Known(record)) => {
59
if let Some(path) = mst_nodes.remove(&cid) {
···
96
}
97
}
98
99
-
let Some(commit) = commit else {
100
-
eyre::bail!("repo contained no commit?");
101
-
};
102
103
Ok((commit, deltas, copies))
104
}
···
171
}
172
RecordTypes::AppBskyFeedThreadgate(record) => {
173
if !at_uri_is_by(&record.post, did) {
174
return Ok(());
175
}
176
···
195
RecordTypes::AppBskyGraphListItem(rec) => {
196
let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>();
197
if did != split_aturi[2] {
198
return Ok(());
199
}
200
···
53
54
match block {
55
CarEntry::Commit(_) => {
56
+
tracing::warn!("got commit entry that was not in root")
57
}
58
CarEntry::Record(CarRecordEntry::Known(record)) => {
59
if let Some(path) = mst_nodes.remove(&cid) {
···
96
}
97
}
98
99
+
let commit = commit.unwrap();
100
101
Ok((commit, deltas, copies))
102
}
···
169
}
170
RecordTypes::AppBskyFeedThreadgate(record) => {
171
if !at_uri_is_by(&record.post, did) {
172
+
tracing::warn!("tried to create a threadgate on a post we don't control!");
173
return Ok(());
174
}
175
···
194
RecordTypes::AppBskyGraphListItem(rec) => {
195
let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>();
196
if did != split_aturi[2] {
197
+
// it's also probably a bad idea to log *all* the attempts to do this...
198
+
tracing::warn!("tried to create a listitem on a list we don't control!");
199
return Ok(());
200
}
201
-8
consumer/src/config.rs
-8
consumer/src/config.rs
···
13
14
#[derive(Debug, Deserialize)]
15
pub struct Config {
16
-
#[serde(flatten)]
17
-
pub instruments: ConfigInstruments,
18
pub index_uri: String,
19
pub database: deadpool_postgres::Config,
20
pub redis_uri: String,
···
29
pub indexer: Option<IndexerConfig>,
30
/// Configuration items specific to backfill
31
pub backfill: Option<BackfillConfig>,
32
-
}
33
-
34
-
#[derive(Debug, Deserialize)]
35
-
pub struct ConfigInstruments {
36
-
#[serde(default)]
37
-
pub log_json: bool,
38
}
39
40
#[derive(Debug, Deserialize)]
···
13
14
#[derive(Debug, Deserialize)]
15
pub struct Config {
16
pub index_uri: String,
17
pub database: deadpool_postgres::Config,
18
pub redis_uri: String,
···
27
pub indexer: Option<IndexerConfig>,
28
/// Configuration items specific to backfill
29
pub backfill: Option<BackfillConfig>,
30
}
31
32
#[derive(Debug, Deserialize)]
+3
-3
consumer/src/db/actor.rs
+3
-3
consumer/src/db/actor.rs
···
69
)
70
.await?;
71
72
-
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
73
}
74
75
pub async fn actor_get_repo_status<C: GenericClient>(
···
83
)
84
.await?;
85
86
-
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
87
}
88
89
pub async fn actor_get_statuses<C: GenericClient>(
···
97
)
98
.await?;
99
100
-
res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()
101
}
···
69
)
70
.await?;
71
72
+
Ok(res.map(|v| (v.get(0), v.get(1))))
73
}
74
75
pub async fn actor_get_repo_status<C: GenericClient>(
···
83
)
84
.await?;
85
86
+
Ok(res.map(|v| (v.get(0), v.get(1))))
87
}
88
89
pub async fn actor_get_statuses<C: GenericClient>(
···
97
)
98
.await?;
99
100
+
Ok(res.map(|v| (v.get(0), v.get(1))))
101
}
+9
-10
consumer/src/db/backfill.rs
+9
-10
consumer/src/db/backfill.rs
···
51
)
52
.await?;
53
54
-
res.into_iter()
55
-
.map(|row| {
56
-
Ok(BackfillRow {
57
-
repo: row.try_get(0)?,
58
-
repo_ver: row.try_get(1)?,
59
-
cid: row.try_get(2)?,
60
-
data: row.try_get(3)?,
61
-
indexed_at: row.try_get(4)?,
62
-
})
63
})
64
-
.collect()
65
}
66
67
pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
···
51
)
52
.await?;
53
54
+
Ok(res
55
+
.into_iter()
56
+
.map(|row| BackfillRow {
57
+
repo: row.get(0),
58
+
repo_ver: row.get(1),
59
+
cid: row.get(2),
60
+
data: row.get(3),
61
+
indexed_at: row.get(4),
62
})
63
+
.collect())
64
}
65
66
pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1
-1
consumer/src/db/copy.rs
+1
-1
consumer/src/db/copy.rs
+13
-18
consumer/src/db/gates.rs
+13
-18
consumer/src/db/gates.rs
···
47
&[&root_author, &post_author],
48
)
49
.await?
50
-
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?;
51
52
if let Some((following, followed)) = profile_state {
53
if allow.contains(THREADGATE_RULE_FOLLOWER) && followed {
···
65
let mentions: Vec<String> = conn
66
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
67
.await?
68
-
.and_then(|r| r.try_get::<_, Option<_>>(0).transpose())
69
-
.transpose()?
70
.unwrap_or_default();
71
72
if mentions.contains(&post_author.to_owned()) {
···
85
&[&allow_lists, &post_author],
86
)
87
.await?
88
-
.try_get(0)?;
89
if count != 0 {
90
return Ok(false);
91
}
···
137
)
138
.await?
139
.into_iter()
140
-
.map(|row| row.try_get(0))
141
-
.collect::<Result<_, _>>()?;
142
143
// this will be empty if there are no replies.
144
if dids.is_empty() {
···
153
})
154
.collect::<Vec<_>>();
155
156
-
let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str()));
157
158
if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
159
let current_dids: Vec<_> = dids.iter().collect();
···
161
let res = conn.query(
162
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
163
&[&root_author, ¤t_dids]
164
-
).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?;
165
166
-
dids = &dids - &res;
167
}
168
169
if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
···
172
let res = conn.query(
173
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
174
&[&root_author, ¤t_dids]
175
-
).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?;
176
177
-
dids = &dids - &res;
178
}
179
180
if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
181
let mentions: Vec<String> = conn
182
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
183
.await?
184
-
.and_then(|r| r.try_get::<_, Option<_>>(0).transpose())
185
-
.transpose()?
186
.unwrap_or_default();
187
188
dids = &dids - &HashSet::from_iter(mentions);
···
196
"SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
197
&[&allowed_lists, ¤t_dids],
198
)
199
-
.await?
200
-
.into_iter()
201
-
.map(|row| row.try_get(0))
202
-
.collect::<Result<_, _>>()?;
203
204
-
dids = &dids - &res;
205
}
206
207
let dids = dids.into_iter().collect::<Vec<_>>();
···
47
&[&root_author, &post_author],
48
)
49
.await?
50
+
.map(|v| (v.get(0), v.get(1)));
51
52
if let Some((following, followed)) = profile_state {
53
if allow.contains(THREADGATE_RULE_FOLLOWER) && followed {
···
65
let mentions: Vec<String> = conn
66
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
67
.await?
68
+
.map(|r| r.get(0))
69
.unwrap_or_default();
70
71
if mentions.contains(&post_author.to_owned()) {
···
84
&[&allow_lists, &post_author],
85
)
86
.await?
87
+
.get(0);
88
if count != 0 {
89
return Ok(false);
90
}
···
136
)
137
.await?
138
.into_iter()
139
+
.map(|row| row.get(0))
140
+
.collect();
141
142
// this will be empty if there are no replies.
143
if dids.is_empty() {
···
152
})
153
.collect::<Vec<_>>();
154
155
+
let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str()));
156
157
if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
158
let current_dids: Vec<_> = dids.iter().collect();
···
160
let res = conn.query(
161
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
162
&[&root_author, ¤t_dids]
163
+
).await?;
164
165
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
166
}
167
168
if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
···
171
let res = conn.query(
172
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
173
&[&root_author, ¤t_dids]
174
+
).await?;
175
176
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
177
}
178
179
if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
180
let mentions: Vec<String> = conn
181
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
182
.await?
183
+
.map(|r| r.get(0))
184
.unwrap_or_default();
185
186
dids = &dids - &HashSet::from_iter(mentions);
···
194
"SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
195
&[&allowed_lists, ¤t_dids],
196
)
197
+
.await?;
198
199
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
200
}
201
202
let dids = dids.into_iter().collect::<Vec<_>>();
+20
-17
consumer/src/db/record.rs
+20
-17
consumer/src/db/record.rs
···
127
],
128
)
129
.await
130
-
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
131
}
132
133
pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
159
)
160
.await?;
161
162
-
res.map(|v| v.try_get(0)).transpose()
163
}
164
165
pub async fn labeler_upsert<C: GenericClient>(
···
224
)
225
.await?;
226
227
-
res.map(|v| v.try_get(0)).transpose()
228
}
229
230
pub async fn list_upsert<C: GenericClient>(
···
255
],
256
)
257
.await
258
-
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
259
}
260
261
pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
391
)
392
.await?;
393
394
-
res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?)))
395
-
.transpose()
396
}
397
398
pub async fn post_embed_insert<C: GenericClient>(
···
537
conn: &mut C,
538
post: &str,
539
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
540
-
conn.query_opt(
541
-
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542
-
&[&post],
543
-
)
544
-
.await?
545
-
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?)))
546
-
.transpose()
547
}
548
549
pub async fn postgate_upsert<C: GenericClient>(
···
650
)
651
.await?;
652
653
-
res.map(|v| v.try_get(0)).transpose()
654
}
655
656
pub async fn starter_pack_upsert<C: GenericClient>(
···
685
],
686
)
687
.await
688
-
.and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0))
689
}
690
691
pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
730
conn: &mut C,
731
post: &str,
732
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
733
-
conn
734
.query_opt(
735
"SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
736
&[&post],
737
)
738
.await?
739
-
.map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose()
740
}
741
742
pub async fn threadgate_upsert<C: GenericClient>(
···
127
],
128
)
129
.await
130
+
.map(|r| r.get::<_, i32>(0) == 0)
131
}
132
133
pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
159
)
160
.await?;
161
162
+
Ok(res.map(|v| v.get(0)))
163
}
164
165
pub async fn labeler_upsert<C: GenericClient>(
···
224
)
225
.await?;
226
227
+
Ok(res.map(|v| v.get(0)))
228
}
229
230
pub async fn list_upsert<C: GenericClient>(
···
255
],
256
)
257
.await
258
+
.map(|r| r.get::<_, i32>(0) == 0)
259
}
260
261
pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
391
)
392
.await?;
393
394
+
Ok(res.map(|row| (row.get(0), row.get(1))))
395
}
396
397
pub async fn post_embed_insert<C: GenericClient>(
···
536
conn: &mut C,
537
post: &str,
538
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
539
+
let res = conn
540
+
.query_opt(
541
+
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542
+
&[&post],
543
+
)
544
+
.await?
545
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
546
+
547
+
Ok(res)
548
}
549
550
pub async fn postgate_upsert<C: GenericClient>(
···
651
)
652
.await?;
653
654
+
Ok(res.map(|v| v.get(0)))
655
}
656
657
pub async fn starter_pack_upsert<C: GenericClient>(
···
686
],
687
)
688
.await
689
+
.map(|r| r.get::<_, i32>(0) == 0)
690
}
691
692
pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
···
731
conn: &mut C,
732
post: &str,
733
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
734
+
let res = conn
735
.query_opt(
736
"SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
737
&[&post],
738
)
739
.await?
740
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
741
+
742
+
Ok(res)
743
}
744
745
pub async fn threadgate_upsert<C: GenericClient>(
+4
consumer/src/indexer/mod.rs
+4
consumer/src/indexer/mod.rs
···
640
}
641
RecordTypes::AppBskyFeedPostgate(record) => {
642
if !at_uri_is_by(&record.post, repo) {
643
return Ok(());
644
}
645
···
669
}
670
RecordTypes::AppBskyFeedThreadgate(record) => {
671
if !at_uri_is_by(&record.post, repo) {
672
return Ok(());
673
}
674
···
708
}
709
RecordTypes::AppBskyGraphListItem(record) => {
710
if !at_uri_is_by(&record.list, repo) {
711
return Ok(());
712
}
713
···
640
}
641
RecordTypes::AppBskyFeedPostgate(record) => {
642
if !at_uri_is_by(&record.post, repo) {
643
+
tracing::warn!("tried to create a postgate on a post we don't control!");
644
return Ok(());
645
}
646
···
670
}
671
RecordTypes::AppBskyFeedThreadgate(record) => {
672
if !at_uri_is_by(&record.post, repo) {
673
+
tracing::warn!("tried to create a threadgate on a post we don't control!");
674
return Ok(());
675
}
676
···
710
}
711
RecordTypes::AppBskyGraphListItem(record) => {
712
if !at_uri_is_by(&record.list, repo) {
713
+
// it's also probably a bad idea to log *all* the attempts to do this...
714
+
tracing::warn!("tried to create a listitem on a list we don't control!");
715
return Ok(());
716
}
717
-25
consumer/src/instrumentation.rs
-25
consumer/src/instrumentation.rs
···
1
-
use tracing::Subscriber;
2
-
use tracing_subscriber::filter::Filtered;
3
-
use tracing_subscriber::layer::SubscriberExt;
4
-
use tracing_subscriber::registry::LookupSpan;
5
-
use tracing_subscriber::util::SubscriberInitExt;
6
-
use tracing_subscriber::{EnvFilter, Layer};
7
-
8
-
pub fn init_instruments(cfg: &crate::config::ConfigInstruments) {
9
-
let log_layer = init_log(cfg.log_json);
10
-
11
-
tracing_subscriber::registry().with(log_layer).init();
12
-
}
13
-
14
-
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
15
-
where
16
-
S: Subscriber + for<'span> LookupSpan<'span>,
17
-
{
18
-
let stdout_filter = EnvFilter::from_default_env();
19
-
20
-
match json {
21
-
true => tracing_subscriber::fmt::layer().json().boxed(),
22
-
false => tracing_subscriber::fmt::layer().boxed(),
23
-
}
24
-
.with_filter(stdout_filter)
25
-
}
···
+1
-2
consumer/src/main.rs
+1
-2
consumer/src/main.rs
···
12
mod db;
13
mod firehose;
14
mod indexer;
15
-
mod instrumentation;
16
mod label_indexer;
17
mod utils;
18
19
#[tokio::main]
20
async fn main() -> eyre::Result<()> {
21
PrometheusBuilder::new().install()?;
22
23
let cli = cmd::parse();
24
let conf = config::load_config()?;
25
26
-
instrumentation::init_instruments(&conf.instruments);
27
let user_agent = build_ua(&conf.ua_contact);
28
29
let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
···
12
mod db;
13
mod firehose;
14
mod indexer;
15
mod label_indexer;
16
mod utils;
17
18
#[tokio::main]
19
async fn main() -> eyre::Result<()> {
20
+
tracing_subscriber::fmt::init();
21
PrometheusBuilder::new().install()?;
22
23
let cli = cmd::parse();
24
let conf = config::load_config()?;
25
26
let user_agent = build_ua(&conf.ua_contact);
27
28
let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+2
-2
migrations/2025-09-27-171241_post-tweaks/up.sql
+2
-2
migrations/2025-09-27-171241_post-tweaks/up.sql
···
34
language plpgsql as
35
$$
36
begin
37
-
delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post';
38
return OLD;
39
end;
40
$$;
···
67
language plpgsql as
68
$$
69
begin
70
-
delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost';
71
return OLD;
72
end;
73
$$;
···
34
language plpgsql as
35
$$
36
begin
37
+
delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post';
38
return OLD;
39
end;
40
$$;
···
67
language plpgsql as
68
$$
69
begin
70
+
delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost';
71
return OLD;
72
end;
73
$$;
+2
-8
parakeet/Cargo.toml
+2
-8
parakeet/Cargo.toml
···
6
[dependencies]
7
async-recursion = "1.1.1"
8
axum = { version = "0.8", features = ["json"] }
9
-
axum-tracing-opentelemetry = "0.32"
10
axum-extra = { version = "0.10.0", features = ["query", "typed-header"] }
11
base64 = "0.22"
12
chrono = { version = "0.4.39", features = ["serde"] }
···
22
jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" }
23
lexica = { path = "../lexica" }
24
multibase = "0.9.1"
25
-
opentelemetry = "0.31.0"
26
-
opentelemetry-otlp = "0.31.0"
27
-
opentelemetry_sdk = "0.31.0"
28
parakeet-db = { path = "../parakeet-db" }
29
-
parakeet-index = { path = "../parakeet-index", features = ["otel"] }
30
redis = { version = "0.32", features = ["tokio-native-tls-comp"] }
31
reqwest = { version = "0.12", features = ["json"] }
32
serde = { version = "1.0.217", features = ["derive"] }
33
serde_ipld_dagcbor = "0.6.1"
34
serde_json = "1.0.134"
35
tokio = { version = "1.42.0", features = ["full"] }
36
-
tower = "0.5"
37
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
38
tracing = "0.1.40"
39
-
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
40
-
tracing-opentelemetry = "0.32"
···
6
[dependencies]
7
async-recursion = "1.1.1"
8
axum = { version = "0.8", features = ["json"] }
9
axum-extra = { version = "0.10.0", features = ["query", "typed-header"] }
10
base64 = "0.22"
11
chrono = { version = "0.4.39", features = ["serde"] }
···
21
jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" }
22
lexica = { path = "../lexica" }
23
multibase = "0.9.1"
24
parakeet-db = { path = "../parakeet-db" }
25
+
parakeet-index = { path = "../parakeet-index" }
26
redis = { version = "0.32", features = ["tokio-native-tls-comp"] }
27
reqwest = { version = "0.12", features = ["json"] }
28
serde = { version = "1.0.217", features = ["derive"] }
29
serde_ipld_dagcbor = "0.6.1"
30
serde_json = "1.0.134"
31
tokio = { version = "1.42.0", features = ["full"] }
32
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
33
tracing = "0.1.40"
34
+
tracing-subscriber = "0.3.18"
+2
-2
parakeet/src/cache.rs
+2
-2
parakeet/src/cache.rs
···
29
type Val = V;
30
31
async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> {
32
-
let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?;
33
34
match serde_ipld_dagcbor::from_slice(&res?) {
35
Ok(v) => Some(v),
···
57
}
58
59
async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> {
60
-
let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key)
61
.await
62
.ok()?;
63
···
29
type Val = V;
30
31
async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> {
32
+
let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?;
33
34
match serde_ipld_dagcbor::from_slice(&res?) {
35
Ok(v) => Some(v),
···
57
}
58
59
async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> {
60
+
let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key)
61
.await
62
.ok()?;
63
-10
parakeet/src/config.rs
-10
parakeet/src/config.rs
···
13
14
#[derive(Debug, Deserialize)]
15
pub struct Config {
16
-
#[serde(flatten)]
17
-
pub instruments: ConfigInstruments,
18
pub index_uri: String,
19
pub database_url: String,
20
pub redis_uri: String,
···
29
pub did_allowlist: Option<Vec<String>>,
30
#[serde(default)]
31
pub migrate: bool,
32
-
}
33
-
34
-
#[derive(Debug, Deserialize)]
35
-
pub struct ConfigInstruments {
36
-
#[serde(default)]
37
-
pub otel_enable: bool,
38
-
#[serde(default)]
39
-
pub log_json: bool,
40
}
41
42
#[derive(Debug, Deserialize)]
+1
-21
parakeet/src/db.rs
+1
-21
parakeet/src/db.rs
···
1
use diesel::prelude::*;
2
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
3
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
-
use parakeet_db::models::TextArray;
5
use parakeet_db::{schema, types};
6
-
use tracing::instrument;
7
8
-
#[instrument(skip_all)]
9
pub async fn get_actor_status(
10
conn: &mut AsyncPgConnection,
11
did: &str,
···
40
#[diesel(sql_type = Nullable<Text>)]
41
pub list_mute: Option<String>,
42
}
43
-
44
-
#[instrument(skip_all)]
45
pub async fn get_profile_state(
46
conn: &mut AsyncPgConnection,
47
did: &str,
···
54
.await
55
.optional()
56
}
57
-
58
-
#[instrument(skip_all)]
59
pub async fn get_profile_states(
60
conn: &mut AsyncPgConnection,
61
did: &str,
···
90
#[diesel(sql_type = diesel::sql_types::Bool)]
91
pub pinned: bool,
92
}
93
-
94
-
#[instrument(skip_all)]
95
pub async fn get_post_state(
96
conn: &mut AsyncPgConnection,
97
did: &str,
···
105
.optional()
106
}
107
108
-
#[instrument(skip_all)]
109
pub async fn get_post_states(
110
conn: &mut AsyncPgConnection,
111
did: &str,
···
129
pub block: Option<String>,
130
}
131
132
-
#[instrument(skip_all)]
133
pub async fn get_list_state(
134
conn: &mut AsyncPgConnection,
135
did: &str,
···
143
.optional()
144
}
145
146
-
#[instrument(skip_all)]
147
pub async fn get_list_states(
148
conn: &mut AsyncPgConnection,
149
did: &str,
···
156
.await
157
}
158
159
-
#[instrument(skip_all)]
160
pub async fn get_like_state(
161
conn: &mut AsyncPgConnection,
162
did: &str,
···
174
.optional()
175
}
176
177
-
#[instrument(skip_all)]
178
pub async fn get_like_states(
179
conn: &mut AsyncPgConnection,
180
did: &str,
···
195
.await
196
}
197
198
-
#[instrument(skip_all)]
199
pub async fn get_pinned_post_uri(
200
conn: &mut AsyncPgConnection,
201
did: &str,
···
226
pub depth: i32,
227
}
228
229
-
#[instrument(skip_all)]
230
pub async fn get_thread_children(
231
conn: &mut AsyncPgConnection,
232
uri: &str,
···
239
.await
240
}
241
242
-
#[instrument(skip_all)]
243
pub async fn get_thread_children_branching(
244
conn: &mut AsyncPgConnection,
245
uri: &str,
···
261
pub at_uri: String,
262
}
263
264
-
#[instrument(skip_all)]
265
pub async fn get_thread_children_hidden(
266
conn: &mut AsyncPgConnection,
267
uri: &str,
···
274
.await
275
}
276
277
-
#[instrument(skip_all)]
278
pub async fn get_thread_parents(
279
conn: &mut AsyncPgConnection,
280
uri: &str,
···
287
.await
288
}
289
290
-
#[instrument(skip_all)]
291
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
292
schema::posts::table
293
.select(schema::posts::root_uri)
···
298
.map(|v| v.flatten())
299
}
300
301
-
#[instrument(skip_all)]
302
pub async fn get_threadgate_hiddens(
303
conn: &mut AsyncPgConnection,
304
uri: &str,
···
1
use diesel::prelude::*;
2
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
3
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4
use parakeet_db::{schema, types};
5
+
use parakeet_db::models::TextArray;
6
7
pub async fn get_actor_status(
8
conn: &mut AsyncPgConnection,
9
did: &str,
···
38
#[diesel(sql_type = Nullable<Text>)]
39
pub list_mute: Option<String>,
40
}
41
pub async fn get_profile_state(
42
conn: &mut AsyncPgConnection,
43
did: &str,
···
50
.await
51
.optional()
52
}
53
pub async fn get_profile_states(
54
conn: &mut AsyncPgConnection,
55
did: &str,
···
84
#[diesel(sql_type = diesel::sql_types::Bool)]
85
pub pinned: bool,
86
}
87
pub async fn get_post_state(
88
conn: &mut AsyncPgConnection,
89
did: &str,
···
97
.optional()
98
}
99
100
pub async fn get_post_states(
101
conn: &mut AsyncPgConnection,
102
did: &str,
···
120
pub block: Option<String>,
121
}
122
123
pub async fn get_list_state(
124
conn: &mut AsyncPgConnection,
125
did: &str,
···
133
.optional()
134
}
135
136
pub async fn get_list_states(
137
conn: &mut AsyncPgConnection,
138
did: &str,
···
145
.await
146
}
147
148
pub async fn get_like_state(
149
conn: &mut AsyncPgConnection,
150
did: &str,
···
162
.optional()
163
}
164
165
pub async fn get_like_states(
166
conn: &mut AsyncPgConnection,
167
did: &str,
···
182
.await
183
}
184
185
pub async fn get_pinned_post_uri(
186
conn: &mut AsyncPgConnection,
187
did: &str,
···
212
pub depth: i32,
213
}
214
215
pub async fn get_thread_children(
216
conn: &mut AsyncPgConnection,
217
uri: &str,
···
224
.await
225
}
226
227
pub async fn get_thread_children_branching(
228
conn: &mut AsyncPgConnection,
229
uri: &str,
···
245
pub at_uri: String,
246
}
247
248
pub async fn get_thread_children_hidden(
249
conn: &mut AsyncPgConnection,
250
uri: &str,
···
257
.await
258
}
259
260
pub async fn get_thread_parents(
261
conn: &mut AsyncPgConnection,
262
uri: &str,
···
269
.await
270
}
271
272
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
273
schema::posts::table
274
.select(schema::posts::root_uri)
···
279
.map(|v| v.flatten())
280
}
281
282
pub async fn get_threadgate_hiddens(
283
conn: &mut AsyncPgConnection,
284
uri: &str,
-3
parakeet/src/hydration/embed.rs
-3
parakeet/src/hydration/embed.rs
···
8
use lexica::app_bsky::feed::PostView;
9
use parakeet_db::models;
10
use std::collections::HashMap;
11
-
use tracing::instrument;
12
13
fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> {
14
height
···
177
out
178
}
179
180
-
#[instrument(skip_all)]
181
pub async fn hydrate_embed(&self, post: String) -> Option<Embed> {
182
let (embed, author) = self.loaders.embed.load(post).await?;
183
···
197
}
198
}
199
200
-
#[instrument(skip_all)]
201
pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> {
202
let embeds = self.loaders.embed.load_many(posts).await;
203
···
8
use lexica::app_bsky::feed::PostView;
9
use parakeet_db::models;
10
use std::collections::HashMap;
11
12
fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> {
13
height
···
176
out
177
}
178
179
pub async fn hydrate_embed(&self, post: String) -> Option<Embed> {
180
let (embed, author) = self.loaders.embed.load(post).await?;
181
···
195
}
196
}
197
198
pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> {
199
let embeds = self.loaders.embed.load_many(posts).await;
200
-5
parakeet/src/hydration/feedgen.rs
-5
parakeet/src/hydration/feedgen.rs
···
5
use parakeet_db::models;
6
use std::collections::HashMap;
7
use std::str::FromStr;
8
-
use tracing::instrument;
9
10
fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState {
11
GeneratorViewerState {
···
50
}
51
52
impl super::StatefulHydrator<'_> {
53
-
#[instrument(skip_all)]
54
pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> {
55
let labels = self.get_label(&feedgen).await;
56
let viewer = self.get_feedgen_viewer_state(&feedgen).await;
···
63
))
64
}
65
66
-
#[instrument(skip_all)]
67
pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> {
68
let labels = self.get_label_many(&feedgens).await;
69
let viewers = self.get_feedgen_viewer_states(&feedgens).await;
···
93
.collect()
94
}
95
96
-
#[instrument(skip_all)]
97
async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> {
98
if let Some(viewer) = &self.current_actor {
99
let data = self.loaders.like_state.get(viewer, subject).await?;
···
104
}
105
}
106
107
-
#[instrument(skip_all)]
108
async fn get_feedgen_viewer_states(
109
&self,
110
subjects: &[String],
···
5
use parakeet_db::models;
6
use std::collections::HashMap;
7
use std::str::FromStr;
8
9
fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState {
10
GeneratorViewerState {
···
49
}
50
51
impl super::StatefulHydrator<'_> {
52
pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> {
53
let labels = self.get_label(&feedgen).await;
54
let viewer = self.get_feedgen_viewer_state(&feedgen).await;
···
61
))
62
}
63
64
pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> {
65
let labels = self.get_label_many(&feedgens).await;
66
let viewers = self.get_feedgen_viewer_states(&feedgens).await;
···
90
.collect()
91
}
92
93
async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> {
94
if let Some(viewer) = &self.current_actor {
95
let data = self.loaders.like_state.get(viewer, subject).await?;
···
100
}
101
}
102
103
async fn get_feedgen_viewer_states(
104
&self,
105
subjects: &[String],
-7
parakeet/src/hydration/labeler.rs
-7
parakeet/src/hydration/labeler.rs
···
8
use parakeet_db::models;
9
use std::collections::HashMap;
10
use std::str::FromStr;
11
-
use tracing::instrument;
12
13
fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState {
14
LabelerViewerState {
···
99
}
100
101
impl StatefulHydrator<'_> {
102
-
#[instrument(skip_all)]
103
pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> {
104
let labels = self.get_label(&labeler).await;
105
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
110
Some(build_view(labeler, creator, labels, viewer, likes))
111
}
112
113
-
#[instrument(skip_all)]
114
pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> {
115
let labels = self.get_label_many(&labelers).await;
116
let labelers = self.loaders.labeler.load_many(labelers).await;
···
136
.collect()
137
}
138
139
-
#[instrument(skip_all)]
140
pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> {
141
let labels = self.get_label(&labeler).await;
142
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
149
))
150
}
151
152
-
#[instrument(skip_all)]
153
pub async fn hydrate_labelers_detailed(
154
&self,
155
labelers: Vec<String>,
···
180
.collect()
181
}
182
183
-
#[instrument(skip_all)]
184
async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> {
185
if let Some(viewer) = &self.current_actor {
186
let data = self
···
195
}
196
}
197
198
-
#[instrument(skip_all)]
199
async fn get_labeler_viewer_states(
200
&self,
201
subjects: &[String],
···
8
use parakeet_db::models;
9
use std::collections::HashMap;
10
use std::str::FromStr;
11
12
fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState {
13
LabelerViewerState {
···
98
}
99
100
impl StatefulHydrator<'_> {
101
pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> {
102
let labels = self.get_label(&labeler).await;
103
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
108
Some(build_view(labeler, creator, labels, viewer, likes))
109
}
110
111
pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> {
112
let labels = self.get_label_many(&labelers).await;
113
let labelers = self.loaders.labeler.load_many(labelers).await;
···
133
.collect()
134
}
135
136
pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> {
137
let labels = self.get_label(&labeler).await;
138
let viewer = self.get_labeler_viewer_state(&labeler).await;
···
145
))
146
}
147
148
pub async fn hydrate_labelers_detailed(
149
&self,
150
labelers: Vec<String>,
···
175
.collect()
176
}
177
178
async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> {
179
if let Some(viewer) = &self.current_actor {
180
let data = self
···
189
}
190
}
191
192
async fn get_labeler_viewer_states(
193
&self,
194
subjects: &[String],
-7
parakeet/src/hydration/list.rs
-7
parakeet/src/hydration/list.rs
···
6
use parakeet_db::models;
7
use std::collections::HashMap;
8
use std::str::FromStr;
9
-
use tracing::instrument;
10
11
fn build_viewer(data: ListStateRet) -> ListViewerState {
12
ListViewerState {
···
70
}
71
72
impl StatefulHydrator<'_> {
73
-
#[instrument(skip_all)]
74
pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> {
75
let labels = self.get_label(&list).await;
76
let viewer = self.get_list_viewer_state(&list).await;
···
79
build_basic(list, count, labels, viewer, &self.cdn)
80
}
81
82
-
#[instrument(skip_all)]
83
pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> {
84
if lists.is_empty() {
85
return HashMap::new();
···
100
.collect()
101
}
102
103
-
#[instrument(skip_all)]
104
pub async fn hydrate_list(&self, list: String) -> Option<ListView> {
105
let labels = self.get_label(&list).await;
106
let viewer = self.get_list_viewer_state(&list).await;
···
110
build_listview(list, count, profile, labels, viewer, &self.cdn)
111
}
112
113
-
#[instrument(skip_all)]
114
pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> {
115
if lists.is_empty() {
116
return HashMap::new();
···
136
.collect()
137
}
138
139
-
#[instrument(skip_all)]
140
async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> {
141
if let Some(viewer) = &self.current_actor {
142
let data = self.loaders.list_state.get(viewer, subject).await?;
···
147
}
148
}
149
150
-
#[instrument(skip_all)]
151
async fn get_list_viewer_states(
152
&self,
153
subjects: &[String],
···
6
use parakeet_db::models;
7
use std::collections::HashMap;
8
use std::str::FromStr;
9
10
fn build_viewer(data: ListStateRet) -> ListViewerState {
11
ListViewerState {
···
69
}
70
71
impl StatefulHydrator<'_> {
72
pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> {
73
let labels = self.get_label(&list).await;
74
let viewer = self.get_list_viewer_state(&list).await;
···
77
build_basic(list, count, labels, viewer, &self.cdn)
78
}
79
80
pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> {
81
if lists.is_empty() {
82
return HashMap::new();
···
97
.collect()
98
}
99
100
pub async fn hydrate_list(&self, list: String) -> Option<ListView> {
101
let labels = self.get_label(&list).await;
102
let viewer = self.get_list_viewer_state(&list).await;
···
106
build_listview(list, count, profile, labels, viewer, &self.cdn)
107
}
108
109
pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> {
110
if lists.is_empty() {
111
return HashMap::new();
···
131
.collect()
132
}
133
134
async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> {
135
if let Some(viewer) = &self.current_actor {
136
let data = self.loaders.list_state.get(viewer, subject).await?;
···
141
}
142
}
143
144
async fn get_list_viewer_states(
145
&self,
146
subjects: &[String],
-4
parakeet/src/hydration/mod.rs
-4
parakeet/src/hydration/mod.rs
···
63
}
64
}
65
66
-
#[tracing::instrument(skip_all)]
67
async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> {
68
self.loaders.label.load(uri, self.accept_labelers).await
69
}
70
71
-
#[tracing::instrument(skip_all)]
72
async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> {
73
let uris = &[
74
did.to_string(),
···
82
.collect()
83
}
84
85
-
#[tracing::instrument(skip_all)]
86
async fn get_label_many(
87
&self,
88
uris: &[String],
···
93
.await
94
}
95
96
-
#[tracing::instrument(skip_all)]
97
async fn get_profile_label_many(
98
&self,
99
uris: &[String],
···
63
}
64
}
65
66
async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> {
67
self.loaders.label.load(uri, self.accept_labelers).await
68
}
69
70
async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> {
71
let uris = &[
72
did.to_string(),
···
80
.collect()
81
}
82
83
async fn get_label_many(
84
&self,
85
uris: &[String],
···
90
.await
91
}
92
93
async fn get_profile_label_many(
94
&self,
95
uris: &[String],
-9
parakeet/src/hydration/posts.rs
-9
parakeet/src/hydration/posts.rs
···
11
use parakeet_db::models;
12
use parakeet_index::PostStats;
13
use std::collections::HashMap;
14
-
use tracing::instrument;
15
16
fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState {
17
let is_me = did == data.did;
···
83
}
84
85
impl StatefulHydrator<'_> {
86
-
#[instrument(skip_all)]
87
async fn hydrate_threadgate(
88
&self,
89
threadgate: Option<models::Threadgate>,
···
102
))
103
}
104
105
-
#[instrument(skip_all)]
106
async fn hydrate_threadgates(
107
&self,
108
threadgates: Vec<models::Threadgate>,
···
134
.collect()
135
}
136
137
-
#[instrument(skip_all)]
138
pub async fn hydrate_post(&self, post: String) -> Option<PostView> {
139
let stats = self.loaders.post_stats.load(post.clone()).await;
140
let (post, threadgate) = self.loaders.posts.load(post).await?;
···
149
)))
150
}
151
152
-
#[instrument(skip_all)]
153
async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> {
154
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
155
let posts = self.loaders.posts.load_many(posts).await;
···
189
.collect()
190
}
191
192
-
#[instrument(skip_all)]
193
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
194
self.hydrate_posts_inner(posts)
195
.await
···
198
.collect()
199
}
200
201
-
#[instrument(skip_all)]
202
pub async fn hydrate_feed_posts(
203
&self,
204
posts: Vec<RawFeedItem>,
···
302
.collect()
303
}
304
305
-
#[instrument(skip_all)]
306
async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> {
307
if let Some(viewer) = &self.current_actor {
308
let data = self.loaders.post_state.get(viewer, subject).await?;
···
313
}
314
}
315
316
-
#[instrument(skip_all)]
317
async fn get_post_viewer_states(
318
&self,
319
subjects: &[String],
···
11
use parakeet_db::models;
12
use parakeet_index::PostStats;
13
use std::collections::HashMap;
14
15
fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState {
16
let is_me = did == data.did;
···
82
}
83
84
impl StatefulHydrator<'_> {
85
async fn hydrate_threadgate(
86
&self,
87
threadgate: Option<models::Threadgate>,
···
100
))
101
}
102
103
async fn hydrate_threadgates(
104
&self,
105
threadgates: Vec<models::Threadgate>,
···
131
.collect()
132
}
133
134
pub async fn hydrate_post(&self, post: String) -> Option<PostView> {
135
let stats = self.loaders.post_stats.load(post.clone()).await;
136
let (post, threadgate) = self.loaders.posts.load(post).await?;
···
145
)))
146
}
147
148
async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> {
149
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
150
let posts = self.loaders.posts.load_many(posts).await;
···
184
.collect()
185
}
186
187
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
188
self.hydrate_posts_inner(posts)
189
.await
···
192
.collect()
193
}
194
195
pub async fn hydrate_feed_posts(
196
&self,
197
posts: Vec<RawFeedItem>,
···
295
.collect()
296
}
297
298
async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> {
299
if let Some(viewer) = &self.current_actor {
300
let data = self.loaders.post_state.get(viewer, subject).await?;
···
305
}
306
}
307
308
async fn get_post_viewer_states(
309
&self,
310
subjects: &[String],
+1
-12
parakeet/src/hydration/profile.rs
+1
-12
parakeet/src/hydration/profile.rs
···
12
use std::collections::HashMap;
13
use std::str::FromStr;
14
use std::sync::OnceLock;
15
-
use tracing::instrument;
16
17
pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new();
18
···
52
.followed
53
.map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject));
54
55
-
let blocking = data.list_block.or(data
56
-
.blocking
57
-
.map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did)));
58
59
ProfileViewerState {
60
muted: data.muting.unwrap_or_default(),
···
275
}
276
277
impl super::StatefulHydrator<'_> {
278
-
#[instrument(skip_all)]
279
pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> {
280
let labels = self.get_profile_label(&did).await;
281
let viewer = self.get_profile_viewer_state(&did).await;
···
293
))
294
}
295
296
-
#[instrument(skip_all)]
297
pub async fn hydrate_profiles_basic(
298
&self,
299
dids: Vec<String>,
···
318
.collect()
319
}
320
321
-
#[instrument(skip_all)]
322
pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> {
323
let labels = self.get_profile_label(&did).await;
324
let viewer = self.get_profile_viewer_state(&did).await;
···
336
))
337
}
338
339
-
#[instrument(skip_all)]
340
pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> {
341
let labels = self.get_profile_label_many(&dids).await;
342
let viewers = self.get_profile_viewer_states(&dids).await;
···
358
.collect()
359
}
360
361
-
#[instrument(skip_all)]
362
pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> {
363
let labels = self.get_profile_label(&did).await;
364
let viewer = self.get_profile_viewer_state(&did).await;
···
376
))
377
}
378
379
-
#[instrument(skip_all)]
380
pub async fn hydrate_profiles_detailed(
381
&self,
382
dids: Vec<String>,
···
401
.collect()
402
}
403
404
-
#[instrument(skip_all)]
405
async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> {
406
if let Some(viewer) = &self.current_actor {
407
let data = self.loaders.profile_state.get(viewer, subject).await?;
···
421
}
422
}
423
424
-
#[instrument(skip_all)]
425
async fn get_profile_viewer_states(
426
&self,
427
dids: &[String],
···
12
use std::collections::HashMap;
13
use std::str::FromStr;
14
use std::sync::OnceLock;
15
16
pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new();
17
···
51
.followed
52
.map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject));
53
54
+
let blocking = data.list_block.or(data.blocking);
55
56
ProfileViewerState {
57
muted: data.muting.unwrap_or_default(),
···
272
}
273
274
impl super::StatefulHydrator<'_> {
275
pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> {
276
let labels = self.get_profile_label(&did).await;
277
let viewer = self.get_profile_viewer_state(&did).await;
···
289
))
290
}
291
292
pub async fn hydrate_profiles_basic(
293
&self,
294
dids: Vec<String>,
···
313
.collect()
314
}
315
316
pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> {
317
let labels = self.get_profile_label(&did).await;
318
let viewer = self.get_profile_viewer_state(&did).await;
···
330
))
331
}
332
333
pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> {
334
let labels = self.get_profile_label_many(&dids).await;
335
let viewers = self.get_profile_viewer_states(&dids).await;
···
351
.collect()
352
}
353
354
pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> {
355
let labels = self.get_profile_label(&did).await;
356
let viewer = self.get_profile_viewer_state(&did).await;
···
368
))
369
}
370
371
pub async fn hydrate_profiles_detailed(
372
&self,
373
dids: Vec<String>,
···
392
.collect()
393
}
394
395
async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> {
396
if let Some(viewer) = &self.current_actor {
397
let data = self.loaders.profile_state.get(viewer, subject).await?;
···
411
}
412
}
413
414
async fn get_profile_viewer_states(
415
&self,
416
dids: &[String],
+5
-11
parakeet/src/hydration/starter_packs.rs
+5
-11
parakeet/src/hydration/starter_packs.rs
···
4
use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic};
5
use parakeet_db::models;
6
use std::collections::HashMap;
7
-
use tracing::instrument;
8
9
fn build_basic(
10
starter_pack: models::StaterPack,
···
51
}
52
53
impl StatefulHydrator<'_> {
54
-
#[instrument(skip_all)]
55
pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> {
56
let labels = self.get_label(&pack).await;
57
let sp = self.loaders.starterpacks.load(pack).await?;
···
61
Some(build_basic(sp, creator, labels, list_item_count))
62
}
63
64
-
#[instrument(skip_all)]
65
pub async fn hydrate_starterpacks_basic(
66
&self,
67
packs: Vec<String>,
···
89
.collect()
90
}
91
92
-
#[instrument(skip_all)]
93
pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> {
94
let labels = self.get_label(&pack).await;
95
let sp = self.loaders.starterpacks.load(pack).await?;
···
97
let creator = self.hydrate_profile_basic(sp.owner.clone()).await?;
98
let list = self.hydrate_list_basic(sp.list.clone()).await;
99
100
-
let feeds = sp.feeds.clone().unwrap_or_default();
101
-
let feeds = self
102
-
.hydrate_feedgens(feeds.into())
103
-
.await
104
-
.into_values()
105
-
.collect();
106
107
Some(build_spview(sp, creator, labels, list, feeds))
108
}
109
110
-
#[instrument(skip_all)]
111
pub async fn hydrate_starterpacks(
112
&self,
113
packs: Vec<String>,
···
4
use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic};
5
use parakeet_db::models;
6
use std::collections::HashMap;
7
8
fn build_basic(
9
starter_pack: models::StaterPack,
···
50
}
51
52
impl StatefulHydrator<'_> {
53
pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> {
54
let labels = self.get_label(&pack).await;
55
let sp = self.loaders.starterpacks.load(pack).await?;
···
59
Some(build_basic(sp, creator, labels, list_item_count))
60
}
61
62
pub async fn hydrate_starterpacks_basic(
63
&self,
64
packs: Vec<String>,
···
86
.collect()
87
}
88
89
pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> {
90
let labels = self.get_label(&pack).await;
91
let sp = self.loaders.starterpacks.load(pack).await?;
···
93
let creator = self.hydrate_profile_basic(sp.owner.clone()).await?;
94
let list = self.hydrate_list_basic(sp.list.clone()).await;
95
96
+
let feeds = sp
97
+
.feeds
98
+
.clone()
99
+
.unwrap_or_default();
100
+
let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect();
101
102
Some(build_spview(sp, creator, labels, list, feeds))
103
}
104
105
pub async fn hydrate_starterpacks(
106
&self,
107
packs: Vec<String>,
-57
parakeet/src/instrumentation.rs
-57
parakeet/src/instrumentation.rs
···
1
-
use opentelemetry::trace::TracerProvider;
2
-
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
3
-
use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider};
4
-
use tracing::Subscriber;
5
-
use tracing_opentelemetry::OpenTelemetryLayer;
6
-
use tracing_subscriber::filter::Filtered;
7
-
use tracing_subscriber::layer::SubscriberExt;
8
-
use tracing_subscriber::registry::LookupSpan;
9
-
use tracing_subscriber::util::SubscriberInitExt;
10
-
use tracing_subscriber::{EnvFilter, Layer};
11
-
12
-
pub fn init_instruments(cfg: &crate::config::ConfigInstruments) {
13
-
let otel_layer = cfg.otel_enable.then(init_otel);
14
-
let log_layer = init_log(cfg.log_json);
15
-
16
-
tracing_subscriber::registry()
17
-
.with(log_layer)
18
-
.with(otel_layer)
19
-
.init();
20
-
}
21
-
22
-
fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S>
23
-
where
24
-
S: Subscriber + for<'span> LookupSpan<'span>,
25
-
{
26
-
let span_exporter = SpanExporter::builder()
27
-
.with_http()
28
-
.with_protocol(Protocol::HttpBinary)
29
-
.build()
30
-
.unwrap();
31
-
32
-
let tracer_provider = SdkTracerProvider::builder()
33
-
.with_batch_exporter(span_exporter)
34
-
.with_sampler(Sampler::AlwaysOn)
35
-
.build();
36
-
37
-
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
38
-
39
-
let tracer = tracer_provider.tracer("parakeet");
40
-
let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off");
41
-
42
-
OpenTelemetryLayer::new(tracer).with_filter(otel_filter)
43
-
}
44
-
45
-
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
46
-
where
47
-
S: Subscriber + for<'span> LookupSpan<'span>,
48
-
{
49
-
let stdout_filter =
50
-
EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap());
51
-
52
-
match json {
53
-
true => tracing_subscriber::fmt::layer().json().boxed(),
54
-
false => tracing_subscriber::fmt::layer().boxed(),
55
-
}
56
-
.with_filter(stdout_filter)
57
-
}
···
+4
-27
parakeet/src/loaders.rs
+4
-27
parakeet/src/loaders.rs
···
15
use serde::{Deserialize, Serialize};
16
use std::collections::HashMap;
17
use std::str::FromStr;
18
-
use tracing::instrument;
19
20
type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>;
21
···
64
) -> Dataloaders {
65
Dataloaders {
66
embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600),
67
-
feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600),
68
handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60),
69
label: LabelLoader(pool.clone()), // CARE: never cache this.
70
-
labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600),
71
like: NonCachedLoader::new(LikeLoader(idxc.clone())),
72
like_state: LikeRecordLoader(pool.clone()),
73
list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600),
···
86
87
pub struct LikeLoader(parakeet_index::Client);
88
impl BatchFn<String, i32> for LikeLoader {
89
-
#[instrument(name = "LikeLoader", skip_all)]
90
async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> {
91
let res = self
92
.0
···
109
110
pub struct LikeRecordLoader(Pool<AsyncPgConnection>);
111
impl LikeRecordLoader {
112
-
#[instrument(name = "LikeRecordLoader::get", skip_all)]
113
pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> {
114
let mut conn = self.0.get().await.unwrap();
115
···
121
})
122
}
123
124
-
#[instrument(name = "LikeRecordLoader::get_many", skip_all)]
125
pub async fn get_many(
126
&self,
127
did: &str,
···
143
144
pub struct HandleLoader(Pool<AsyncPgConnection>);
145
impl BatchFn<String, String> for HandleLoader {
146
-
#[instrument(name = "HandleLoader", skip_all)]
147
async fn load(&mut self, keys: &[String]) -> HashMap<String, String> {
148
let mut conn = self.0.get().await.unwrap();
149
···
176
Option<ProfileAllowSubscriptions>,
177
);
178
impl BatchFn<String, ProfileLoaderRet> for ProfileLoader {
179
-
#[instrument(name = "ProfileLoader", skip_all)]
180
async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> {
181
let mut conn = self.0.get().await.unwrap();
182
···
238
239
pub struct ProfileStatsLoader(parakeet_index::Client);
240
impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader {
241
-
#[instrument(name = "ProfileStatsLoader", skip_all)]
242
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> {
243
let stats_req = parakeet_index::GetStatsManyReq {
244
uris: keys.to_vec(),
···
255
256
pub struct ProfileStateLoader(Pool<AsyncPgConnection>);
257
impl ProfileStateLoader {
258
-
#[instrument(name = "ProfileStateLoader::get", skip_all)]
259
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> {
260
let mut conn = self.0.get().await.unwrap();
261
···
267
})
268
}
269
270
-
#[instrument(name = "ProfileStateLoader::get_many", skip_all)]
271
pub async fn get_many(
272
&self,
273
did: &str,
···
288
pub struct ListLoader(Pool<AsyncPgConnection>);
289
type ListLoaderRet = (models::List, i64);
290
impl BatchFn<String, ListLoaderRet> for ListLoader {
291
-
#[instrument(name = "ListLoaderRet", skip_all)]
292
async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> {
293
let mut conn = self.0.get().await.unwrap();
294
···
320
321
pub struct ListStateLoader(Pool<AsyncPgConnection>);
322
impl ListStateLoader {
323
-
#[instrument(name = "ListStateLoader::get", skip_all)]
324
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> {
325
let mut conn = self.0.get().await.unwrap();
326
···
332
})
333
}
334
335
-
#[instrument(name = "ListStateLoader::get_many", skip_all)]
336
pub async fn get_many(
337
&self,
338
did: &str,
···
350
}
351
}
352
353
-
pub struct FeedGenLoader(Pool<AsyncPgConnection>);
354
impl BatchFn<String, models::FeedGen> for FeedGenLoader {
355
-
#[instrument(name = "FeedGenLoader", skip_all)]
356
async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> {
357
let mut conn = self.0.get().await.unwrap();
358
···
378
pub struct PostLoader(Pool<AsyncPgConnection>);
379
type PostLoaderRet = (models::Post, Option<models::Threadgate>);
380
impl BatchFn<String, PostLoaderRet> for PostLoader {
381
-
#[instrument(name = "PostLoader", skip_all)]
382
async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> {
383
let mut conn = self.0.get().await.unwrap();
384
···
409
410
pub struct PostStatsLoader(parakeet_index::Client);
411
impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader {
412
-
#[instrument(name = "PostStatsLoader", skip_all)]
413
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> {
414
let stats_req = parakeet_index::GetStatsManyReq {
415
uris: keys.to_vec(),
···
426
427
pub struct PostStateLoader(Pool<AsyncPgConnection>);
428
impl PostStateLoader {
429
-
#[instrument(name = "PostStateLoader::get", skip_all)]
430
pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> {
431
let mut conn = self.0.get().await.unwrap();
432
···
438
})
439
}
440
441
-
#[instrument(name = "PostStateLoader::get_many", skip_all)]
442
pub async fn get_many(
443
&self,
444
did: &str,
···
466
RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>),
467
}
468
impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader {
469
-
#[instrument(name = "EmbedLoader", skip_all)]
470
async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> {
471
let mut conn = self.0.get().await.unwrap();
472
···
549
pub struct StarterPackLoader(Pool<AsyncPgConnection>);
550
type StarterPackLoaderRet = models::StaterPack;
551
impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader {
552
-
#[instrument(name = "StarterPackLoader", skip_all)]
553
async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> {
554
let mut conn = self.0.get().await.unwrap();
555
···
572
}
573
}
574
575
-
pub struct LabelServiceLoader(Pool<AsyncPgConnection>);
576
type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>);
577
impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader {
578
-
#[instrument(name = "LabelServiceLoader", skip_all)]
579
async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> {
580
let mut conn = self.0.get().await.unwrap();
581
···
614
// but it should live here anyway
615
pub struct LabelLoader(Pool<AsyncPgConnection>);
616
impl LabelLoader {
617
-
#[instrument(name = "LabelLoader::load", skip_all)]
618
pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> {
619
let mut conn = self.0.get().await.unwrap();
620
···
634
})
635
}
636
637
-
#[instrument(name = "LabelLoader::load_many", skip_all)]
638
pub async fn load_many(
639
&self,
640
uris: &[String],
···
669
670
pub struct VerificationLoader(Pool<AsyncPgConnection>);
671
impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader {
672
-
#[instrument(name = "VerificationLoader", skip_all)]
673
async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> {
674
let mut conn = self.0.get().await.unwrap();
675
···
15
use serde::{Deserialize, Serialize};
16
use std::collections::HashMap;
17
use std::str::FromStr;
18
19
type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>;
20
···
63
) -> Dataloaders {
64
Dataloaders {
65
embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600),
66
+
feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600),
67
handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60),
68
label: LabelLoader(pool.clone()), // CARE: never cache this.
69
+
labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600),
70
like: NonCachedLoader::new(LikeLoader(idxc.clone())),
71
like_state: LikeRecordLoader(pool.clone()),
72
list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600),
···
85
86
pub struct LikeLoader(parakeet_index::Client);
87
impl BatchFn<String, i32> for LikeLoader {
88
async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> {
89
let res = self
90
.0
···
107
108
pub struct LikeRecordLoader(Pool<AsyncPgConnection>);
109
impl LikeRecordLoader {
110
pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> {
111
let mut conn = self.0.get().await.unwrap();
112
···
118
})
119
}
120
121
pub async fn get_many(
122
&self,
123
did: &str,
···
139
140
pub struct HandleLoader(Pool<AsyncPgConnection>);
141
impl BatchFn<String, String> for HandleLoader {
142
async fn load(&mut self, keys: &[String]) -> HashMap<String, String> {
143
let mut conn = self.0.get().await.unwrap();
144
···
171
Option<ProfileAllowSubscriptions>,
172
);
173
impl BatchFn<String, ProfileLoaderRet> for ProfileLoader {
174
async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> {
175
let mut conn = self.0.get().await.unwrap();
176
···
232
233
pub struct ProfileStatsLoader(parakeet_index::Client);
234
impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader {
235
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> {
236
let stats_req = parakeet_index::GetStatsManyReq {
237
uris: keys.to_vec(),
···
248
249
pub struct ProfileStateLoader(Pool<AsyncPgConnection>);
250
impl ProfileStateLoader {
251
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> {
252
let mut conn = self.0.get().await.unwrap();
253
···
259
})
260
}
261
262
pub async fn get_many(
263
&self,
264
did: &str,
···
279
pub struct ListLoader(Pool<AsyncPgConnection>);
280
type ListLoaderRet = (models::List, i64);
281
impl BatchFn<String, ListLoaderRet> for ListLoader {
282
async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> {
283
let mut conn = self.0.get().await.unwrap();
284
···
310
311
pub struct ListStateLoader(Pool<AsyncPgConnection>);
312
impl ListStateLoader {
313
pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> {
314
let mut conn = self.0.get().await.unwrap();
315
···
321
})
322
}
323
324
pub async fn get_many(
325
&self,
326
did: &str,
···
338
}
339
}
340
341
+
pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
342
impl BatchFn<String, models::FeedGen> for FeedGenLoader {
343
async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> {
344
let mut conn = self.0.get().await.unwrap();
345
···
365
pub struct PostLoader(Pool<AsyncPgConnection>);
366
type PostLoaderRet = (models::Post, Option<models::Threadgate>);
367
impl BatchFn<String, PostLoaderRet> for PostLoader {
368
async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> {
369
let mut conn = self.0.get().await.unwrap();
370
···
395
396
pub struct PostStatsLoader(parakeet_index::Client);
397
impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader {
398
async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> {
399
let stats_req = parakeet_index::GetStatsManyReq {
400
uris: keys.to_vec(),
···
411
412
pub struct PostStateLoader(Pool<AsyncPgConnection>);
413
impl PostStateLoader {
414
pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> {
415
let mut conn = self.0.get().await.unwrap();
416
···
422
})
423
}
424
425
pub async fn get_many(
426
&self,
427
did: &str,
···
449
RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>),
450
}
451
impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader {
452
async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> {
453
let mut conn = self.0.get().await.unwrap();
454
···
531
pub struct StarterPackLoader(Pool<AsyncPgConnection>);
532
type StarterPackLoaderRet = models::StaterPack;
533
impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader {
534
async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> {
535
let mut conn = self.0.get().await.unwrap();
536
···
553
}
554
}
555
556
+
pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client);
557
type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>);
558
impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader {
559
async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> {
560
let mut conn = self.0.get().await.unwrap();
561
···
594
// but it should live here anyway
595
pub struct LabelLoader(Pool<AsyncPgConnection>);
596
impl LabelLoader {
597
pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> {
598
let mut conn = self.0.get().await.unwrap();
599
···
613
})
614
}
615
616
pub async fn load_many(
617
&self,
618
uris: &[String],
···
647
648
pub struct VerificationLoader(Pool<AsyncPgConnection>);
649
impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader {
650
async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> {
651
let mut conn = self.0.get().await.unwrap();
652
+5
-14
parakeet/src/main.rs
+5
-14
parakeet/src/main.rs
···
1
-
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
2
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
3
use diesel_async::pooled_connection::deadpool::Pool;
4
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
···
15
mod config;
16
mod db;
17
mod hydration;
18
-
mod instrumentation;
19
mod loaders;
20
mod xrpc;
21
···
33
34
#[tokio::main]
35
async fn main() -> eyre::Result<()> {
36
-
let conf = config::load_config()?;
37
38
-
instrumentation::init_instruments(&conf.instruments);
39
40
let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url);
41
let pool = Pool::builder(db_mgr).build()?;
···
54
let redis_client = redis::Client::open(conf.redis_uri)?;
55
let redis_mp = redis_client.get_multiplexed_tokio_connection().await?;
56
57
-
let index_client = parakeet_index::connect_with_otel(conf.index_uri)
58
-
.await
59
-
.map_err(|e| eyre::eyre!(e))?;
60
61
let dataloaders = Arc::new(loaders::Dataloaders::new(
62
pool.clone(),
···
83
84
let did_doc = did_web_doc(&conf.service);
85
86
-
let mw = tower::ServiceBuilder::new()
87
-
.option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default))
88
-
.option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default))
89
-
.layer(TraceLayer::new_for_http())
90
-
.layer(cors);
91
-
92
let app = axum::Router::new()
93
.nest("/xrpc", xrpc::xrpc_routes())
94
.route(
95
"/.well-known/did.json",
96
axum::routing::get(async || axum::Json(did_doc)),
97
)
98
-
.layer(mw)
99
.with_state(GlobalState {
100
pool,
101
redis_mp,
···
1
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
2
use diesel_async::pooled_connection::deadpool::Pool;
3
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
···
14
mod config;
15
mod db;
16
mod hydration;
17
mod loaders;
18
mod xrpc;
19
···
31
32
#[tokio::main]
33
async fn main() -> eyre::Result<()> {
34
+
tracing_subscriber::fmt::init();
35
36
+
let conf = config::load_config()?;
37
38
let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url);
39
let pool = Pool::builder(db_mgr).build()?;
···
52
let redis_client = redis::Client::open(conf.redis_uri)?;
53
let redis_mp = redis_client.get_multiplexed_tokio_connection().await?;
54
55
+
let index_client = parakeet_index::Client::connect(conf.index_uri).await?;
56
57
let dataloaders = Arc::new(loaders::Dataloaders::new(
58
pool.clone(),
···
79
80
let did_doc = did_web_doc(&conf.service);
81
82
let app = axum::Router::new()
83
.nest("/xrpc", xrpc::xrpc_routes())
84
.route(
85
"/.well-known/did.json",
86
axum::routing::get(async || axum::Json(did_doc)),
87
)
88
+
.layer(TraceLayer::new_for_http())
89
+
.layer(cors)
90
.with_state(GlobalState {
91
pool,
92
redis_mp,
+1
-1
parakeet/src/xrpc/app_bsky/bookmark.rs
+1
-1
parakeet/src/xrpc/app_bsky/bookmark.rs
+2
-6
parakeet/src/xrpc/app_bsky/feed/posts.rs
+2
-6
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
24
use reqwest::Url;
25
use serde::{Deserialize, Serialize};
26
use std::collections::HashMap;
27
-
use tracing::instrument;
28
29
const FEEDGEN_SERVICE_ID: &str = "#bsky_fg";
30
···
161
162
#[derive(Debug, Default, Eq, PartialEq, Deserialize)]
163
#[serde(rename_all = "snake_case")]
164
-
#[allow(clippy::enum_variant_names)]
165
pub enum GetAuthorFeedFilter {
166
#[default]
167
PostsWithReplies,
···
200
if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? {
201
if psr.blocked.unwrap_or_default() {
202
// they block us
203
-
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None));
204
} else if psr.blocking.is_some() {
205
// we block them
206
-
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None));
207
}
208
}
209
}
···
614
.or(schema::posts::embed_subtype.eq_any(filter))
615
}
616
617
-
#[instrument(skip_all)]
618
async fn get_feed_skeleton(
619
feed: &str,
620
service: &str,
···
656
}
657
}
658
659
-
#[instrument(skip_all)]
660
async fn get_skeleton_repost_data(
661
conn: &mut AsyncPgConnection,
662
reposts: Vec<String>,
···
24
use reqwest::Url;
25
use serde::{Deserialize, Serialize};
26
use std::collections::HashMap;
27
28
const FEEDGEN_SERVICE_ID: &str = "#bsky_fg";
29
···
160
161
#[derive(Debug, Default, Eq, PartialEq, Deserialize)]
162
#[serde(rename_all = "snake_case")]
163
pub enum GetAuthorFeedFilter {
164
#[default]
165
PostsWithReplies,
···
198
if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? {
199
if psr.blocked.unwrap_or_default() {
200
// they block us
201
+
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None))
202
} else if psr.blocking.is_some() {
203
// we block them
204
+
return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None))
205
}
206
}
207
}
···
612
.or(schema::posts::embed_subtype.eq_any(filter))
613
}
614
615
async fn get_feed_skeleton(
616
feed: &str,
617
service: &str,
···
653
}
654
}
655
656
async fn get_skeleton_repost_data(
657
conn: &mut AsyncPgConnection,
658
reposts: Vec<String>,
+4
-1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
+4
-1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
-3
parakeet/src/xrpc/jwt.rs
-3
parakeet/src/xrpc/jwt.rs
···
4
use std::collections::HashMap;
5
use std::sync::{Arc, LazyLock};
6
use tokio::sync::RwLock;
7
-
use tracing::instrument;
8
9
static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[]));
10
static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| {
···
39
}
40
}
41
42
-
#[instrument(skip_all)]
43
pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> {
44
// first we need to decode without verifying, to get iss.
45
let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?;
···
58
self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud)
59
}
60
61
-
#[instrument(skip_all)]
62
async fn resolve_key(&self, did: &str) -> Option<String> {
63
tracing::trace!("resolving multikey for {did}");
64
let did_doc = self.resolver.resolve_did(did).await.ok()??;
···
4
use std::collections::HashMap;
5
use std::sync::{Arc, LazyLock};
6
use tokio::sync::RwLock;
7
8
static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[]));
9
static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| {
···
38
}
39
}
40
41
pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> {
42
// first we need to decode without verifying, to get iss.
43
let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?;
···
56
self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud)
57
}
58
59
async fn resolve_key(&self, did: &str) -> Option<String> {
60
tracing::trace!("resolving multikey for {did}");
61
let did_doc = self.resolver.resolve_did(did).await.ok()??;
+2
-24
parakeet-index/Cargo.toml
+2
-24
parakeet-index/Cargo.toml
···
10
[dependencies]
11
tonic = "0.13.0"
12
prost = "0.13.5"
13
-
tonic-tracing-opentelemetry = { version = "0.32", optional = true }
14
-
tower = { version = "0.5", optional = true }
15
16
eyre = { version = "0.6.12", optional = true }
17
figment = { version = "0.10.19", features = ["env", "toml"], optional = true }
18
itertools = { version = "0.14.0", optional = true }
19
-
opentelemetry = { version = "0.31.0", optional = true }
20
-
opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true }
21
-
opentelemetry_sdk = { version = "0.31.0", optional = true }
22
rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true }
23
serde = { version = "1.0.217", features = ["derive"], optional = true }
24
tokio = { version = "1.42.0", features = ["full"], optional = true }
25
tonic-health = { version = "0.13.0", optional = true }
26
tracing = { version = "0.1.40", optional = true }
27
-
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true }
28
-
tracing-opentelemetry = { version = "0.32", optional = true }
29
30
[build-dependencies]
31
tonic-build = "0.13.0"
32
33
[features]
34
-
otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"]
35
-
server = [
36
-
"dep:eyre",
37
-
"dep:figment",
38
-
"dep:itertools",
39
-
"dep:opentelemetry",
40
-
"dep:opentelemetry-otlp",
41
-
"dep:opentelemetry_sdk",
42
-
"dep:rocksdb",
43
-
"dep:serde",
44
-
"dep:tokio",
45
-
"dep:tonic-health",
46
-
"otel",
47
-
"dep:tracing",
48
-
"dep:tracing-subscriber",
49
-
"dep:tracing-opentelemetry"
50
-
]
···
10
[dependencies]
11
tonic = "0.13.0"
12
prost = "0.13.5"
13
14
eyre = { version = "0.6.12", optional = true }
15
figment = { version = "0.10.19", features = ["env", "toml"], optional = true }
16
itertools = { version = "0.14.0", optional = true }
17
rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true }
18
serde = { version = "1.0.217", features = ["derive"], optional = true }
19
tokio = { version = "1.42.0", features = ["full"], optional = true }
20
tonic-health = { version = "0.13.0", optional = true }
21
tracing = { version = "0.1.40", optional = true }
22
+
tracing-subscriber = { version = "0.3.18", optional = true }
23
24
[build-dependencies]
25
tonic-build = "0.13.0"
26
27
[features]
28
+
server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
+1
-20
parakeet-index/src/lib.rs
+1
-20
parakeet-index/src/lib.rs
···
1
-
use tonic::transport::Channel;
2
-
3
#[allow(clippy::all)]
4
pub mod index {
5
tonic::include_proto!("parakeet");
6
}
7
8
pub use index::*;
9
-
#[cfg(not(feature = "otel"))]
10
-
pub type Client = index_client::IndexClient<Channel>;
11
-
#[cfg(feature = "otel")]
12
-
pub type Client = index_client::IndexClient<
13
-
tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>,
14
-
>;
15
16
#[cfg(feature = "server")]
17
pub mod server;
18
-
19
-
#[cfg(feature = "otel")]
20
-
pub async fn connect_with_otel(
21
-
uri: String,
22
-
) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> {
23
-
let channel = Channel::from_shared(uri)?.connect().await?;
24
-
let channel = tower::ServiceBuilder::new()
25
-
.layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer)
26
-
.service(channel);
27
-
28
-
Ok(index_client::IndexClient::new(channel))
29
-
}
+3
-9
parakeet-index/src/main.rs
+3
-9
parakeet-index/src/main.rs
···
1
use parakeet_index::index_server::IndexServer;
2
use parakeet_index::server::service::Service;
3
-
use parakeet_index::server::{GlobalState, config, instrumentation};
4
use std::sync::Arc;
5
use tonic::transport::Server;
6
-
use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer;
7
8
#[tokio::main]
9
async fn main() -> eyre::Result<()> {
10
-
let conf = config::load_config()?;
11
12
-
instrumentation::init_instruments(&conf.instruments);
13
14
let db_root = conf.index_db_path.parse()?;
15
let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
···
19
reporter.set_serving::<IndexServer<Service>>().await;
20
21
let service = Service::new(state.clone());
22
-
23
-
let mw = tower::ServiceBuilder::new()
24
-
.option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default));
25
-
26
Server::builder()
27
-
.layer(mw)
28
.add_service(health_service)
29
.add_service(IndexServer::new(service))
30
.serve(addr)
···
1
use parakeet_index::index_server::IndexServer;
2
use parakeet_index::server::service::Service;
3
+
use parakeet_index::server::{GlobalState, config};
4
use std::sync::Arc;
5
use tonic::transport::Server;
6
7
#[tokio::main]
8
async fn main() -> eyre::Result<()> {
9
+
tracing_subscriber::fmt::init();
10
11
+
let conf = config::load_config()?;
12
13
let db_root = conf.index_db_path.parse()?;
14
let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port);
···
18
reporter.set_serving::<IndexServer<Service>>().await;
19
20
let service = Service::new(state.clone());
21
Server::builder()
22
.add_service(health_service)
23
.add_service(IndexServer::new(service))
24
.serve(addr)
-10
parakeet-index/src/server/config.rs
-10
parakeet-index/src/server/config.rs
···
13
14
#[derive(Debug, Deserialize)]
15
pub struct Config {
16
-
#[serde(flatten)]
17
-
pub instruments: ConfigInstruments,
18
pub database_url: String,
19
pub index_db_path: String,
20
#[serde(default)]
21
pub server: ConfigServer,
22
-
}
23
-
24
-
#[derive(Debug, Deserialize)]
25
-
pub struct ConfigInstruments {
26
-
#[serde(default)]
27
-
pub otel_enable: bool,
28
-
#[serde(default)]
29
-
pub log_json: bool,
30
}
31
32
#[derive(Debug, Deserialize)]
-57
parakeet-index/src/server/instrumentation.rs
-57
parakeet-index/src/server/instrumentation.rs
···
1
-
use opentelemetry::trace::TracerProvider;
2
-
use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
3
-
use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider};
4
-
use tracing::Subscriber;
5
-
use tracing_opentelemetry::OpenTelemetryLayer;
6
-
use tracing_subscriber::filter::Filtered;
7
-
use tracing_subscriber::layer::SubscriberExt;
8
-
use tracing_subscriber::registry::LookupSpan;
9
-
use tracing_subscriber::util::SubscriberInitExt;
10
-
use tracing_subscriber::{EnvFilter, Layer};
11
-
12
-
pub fn init_instruments(cfg: &super::config::ConfigInstruments) {
13
-
let otel_layer = cfg.otel_enable.then(init_otel);
14
-
let log_layer = init_log(cfg.log_json);
15
-
16
-
tracing_subscriber::registry()
17
-
.with(log_layer)
18
-
.with(otel_layer)
19
-
.init();
20
-
}
21
-
22
-
fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S>
23
-
where
24
-
S: Subscriber + for<'span> LookupSpan<'span>,
25
-
{
26
-
let span_exporter = SpanExporter::builder()
27
-
.with_http()
28
-
.with_protocol(Protocol::HttpBinary)
29
-
.build()
30
-
.unwrap();
31
-
32
-
let tracer_provider = SdkTracerProvider::builder()
33
-
.with_batch_exporter(span_exporter)
34
-
.with_sampler(Sampler::AlwaysOn)
35
-
.build();
36
-
37
-
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
38
-
39
-
let tracer = tracer_provider.tracer("parakeet");
40
-
let otel_filter = EnvFilter::new("info,otel::tracing=trace");
41
-
42
-
OpenTelemetryLayer::new(tracer).with_filter(otel_filter)
43
-
}
44
-
45
-
fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S>
46
-
where
47
-
S: Subscriber + for<'span> LookupSpan<'span>,
48
-
{
49
-
let stdout_filter =
50
-
EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap());
51
-
52
-
match json {
53
-
true => tracing_subscriber::fmt::layer().json().boxed(),
54
-
false => tracing_subscriber::fmt::layer().boxed(),
55
-
}
56
-
.with_filter(stdout_filter)
57
-
}
···