Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

Compare changes

Choose any two refs to compare.

+12 -364
Cargo.lock
··· 286 286 ] 287 287 288 288 [[package]] 289 - name = "axum-tracing-opentelemetry" 290 - version = "0.32.1" 291 - source = "registry+https://github.com/rust-lang/crates.io-index" 292 - checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1" 293 - dependencies = [ 294 - "axum", 295 - "futures-core", 296 - "futures-util", 297 - "http", 298 - "opentelemetry", 299 - "opentelemetry-semantic-conventions", 300 - "pin-project-lite", 301 - "tower", 302 - "tracing", 303 - "tracing-opentelemetry", 304 - "tracing-opentelemetry-instrumentation-sdk", 305 - ] 306 - 307 - [[package]] 308 289 name = "backtrace" 309 290 version = "0.3.74" 310 291 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 366 347 "proc-macro2", 367 348 "quote", 368 349 "regex", 369 - "rustc-hash 1.1.0", 350 + "rustc-hash", 370 351 "shlex", 371 352 "syn", 372 353 "which", ··· 484 465 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 485 466 486 467 [[package]] 487 - name = "cfg_aliases" 488 - version = "0.2.1" 489 - source = "registry+https://github.com/rust-lang/crates.io-index" 490 - checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 491 - 492 - [[package]] 493 468 name = "chrono" 494 469 version = "0.4.41" 495 470 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1395 1370 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1396 1371 dependencies = [ 1397 1372 "cfg-if", 1398 - "js-sys", 1399 1373 "libc", 1400 1374 "r-efi", 1401 1375 "wasi 0.14.2+wasi-0.2.4", 1402 - "wasm-bindgen", 1403 1376 ] 1404 1377 1405 1378 [[package]] ··· 2196 2169 ] 2197 2170 2198 2171 [[package]] 2199 - name = "lru-slab" 2200 - version = "0.1.2" 2201 - source = "registry+https://github.com/rust-lang/crates.io-index" 2202 - checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 2203 - 2204 - [[package]] 2205 2172 name = "lz4-sys" 2206 2173 version = "1.11.1+lz4-1.10.0" 2207 2174 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2218 2185 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2219 2186 2220 2187 [[package]] 2221 - name = "matchers" 2222 - version = "0.1.0" 2223 - source = "registry+https://github.com/rust-lang/crates.io-index" 2224 - checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 2225 - dependencies = [ 2226 - "regex-automata 0.1.10", 2227 - ] 2228 - 2229 - [[package]] 2230 2188 name = "matchit" 2231 2189 version = "0.8.4" 2232 2190 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2555 2513 ] 2556 2514 2557 2515 [[package]] 2558 - name = "opentelemetry" 2559 - version = "0.31.0" 2560 - source = "registry+https://github.com/rust-lang/crates.io-index" 2561 - checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" 2562 - dependencies = [ 2563 - "futures-core", 2564 - "futures-sink", 2565 - "js-sys", 2566 - "pin-project-lite", 2567 - "thiserror 2.0.12", 2568 - "tracing", 2569 - ] 2570 - 2571 - [[package]] 2572 - name = "opentelemetry-http" 2573 - version = "0.31.0" 2574 - source = "registry+https://github.com/rust-lang/crates.io-index" 2575 - checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" 2576 - dependencies = [ 2577 - "async-trait", 2578 - "bytes", 2579 - "http", 2580 - "opentelemetry", 2581 - "reqwest", 2582 - ] 2583 - 2584 - [[package]] 2585 - name = "opentelemetry-otlp" 2586 - version = "0.31.0" 2587 - source = "registry+https://github.com/rust-lang/crates.io-index" 2588 - checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" 2589 - dependencies = [ 2590 - "http", 2591 - "opentelemetry", 2592 - "opentelemetry-http", 2593 - "opentelemetry-proto", 2594 - "opentelemetry_sdk", 2595 - "prost 0.14.1", 2596 - "reqwest", 2597 - "thiserror 2.0.12", 2598 - "tracing", 2599 - ] 2600 - 2601 - [[package]] 2602 - name = "opentelemetry-proto" 2603 - version = "0.31.0" 2604 - source = "registry+https://github.com/rust-lang/crates.io-index" 2605 - checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" 2606 - dependencies = [ 2607 - "opentelemetry", 2608 - "opentelemetry_sdk", 2609 - "prost 0.14.1", 2610 - "tonic 0.14.2", 2611 - "tonic-prost", 2612 - ] 2613 - 2614 - [[package]] 2615 - name = "opentelemetry-semantic-conventions" 2616 - version = "0.31.0" 2617 - source = "registry+https://github.com/rust-lang/crates.io-index" 2618 - checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" 2619 - 2620 - [[package]] 2621 - name = "opentelemetry_sdk" 2622 - version = "0.31.0" 2623 - source = "registry+https://github.com/rust-lang/crates.io-index" 2624 - checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" 2625 - dependencies = [ 2626 - "futures-channel", 2627 - "futures-executor", 2628 - "futures-util", 2629 - "opentelemetry", 2630 - "percent-encoding", 2631 - "rand 0.9.1", 2632 - "thiserror 2.0.12", 2633 - ] 2634 - 2635 - [[package]] 2636 2516 name = "overload" 2637 2517 version = "0.1.1" 2638 2518 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2669 2549 "async-recursion", 2670 2550 "axum", 2671 2551 "axum-extra", 2672 - "axum-tracing-opentelemetry", 2673 2552 "base64 0.22.1", 2674 2553 "chrono", 2675 2554 "dataloader", ··· 2684 2563 "jsonwebtoken", 2685 2564 "lexica", 2686 2565 "multibase", 2687 - "opentelemetry", 2688 - "opentelemetry-otlp", 2689 - "opentelemetry_sdk", 2690 2566 "parakeet-db", 2691 2567 "parakeet-index", 2692 2568 "redis", ··· 2695 2571 "serde_ipld_dagcbor", 2696 2572 "serde_json", 2697 2573 "tokio", 2698 - "tower", 2699 2574 "tower-http", 2700 2575 "tracing", 2701 - "tracing-opentelemetry", 2702 2576 "tracing-subscriber", 2703 2577 ] 2704 2578 ··· 2720 2594 "eyre", 2721 2595 "figment", 2722 2596 "itertools 0.14.0", 2723 - "opentelemetry", 2724 - "opentelemetry-otlp", 2725 - "opentelemetry_sdk", 2726 - "prost 0.13.5", 2597 + "prost", 2727 2598 "rocksdb", 2728 2599 "serde", 2729 2600 "tokio", 2730 - "tonic 0.13.1", 2601 + "tonic", 2731 2602 "tonic-build", 2732 2603 "tonic-health", 2733 - "tonic-tracing-opentelemetry", 2734 - "tower", 2735 2604 "tracing", 2736 - "tracing-opentelemetry", 2737 2605 "tracing-subscriber", 2738 2606 ] 2739 2607 ··· 3039 2907 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 3040 2908 dependencies = [ 3041 2909 "bytes", 3042 - "prost-derive 0.13.5", 3043 - ] 3044 - 3045 - [[package]] 3046 - name = "prost" 3047 - version = "0.14.1" 3048 - source = "registry+https://github.com/rust-lang/crates.io-index" 3049 - checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" 3050 - dependencies = [ 3051 - "bytes", 3052 - "prost-derive 0.14.1", 2910 + "prost-derive", 3053 2911 ] 3054 2912 3055 2913 [[package]] ··· 3065 2923 "once_cell", 3066 2924 "petgraph", 3067 2925 "prettyplease", 3068 - "prost 0.13.5", 2926 + "prost", 3069 2927 "prost-types", 3070 2928 "regex", 3071 2929 "syn", ··· 3086 2944 ] 3087 2945 3088 2946 [[package]] 3089 - name = "prost-derive" 3090 - version = "0.14.1" 3091 - source = "registry+https://github.com/rust-lang/crates.io-index" 3092 - checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" 3093 - dependencies = [ 3094 - "anyhow", 3095 - "itertools 0.14.0", 3096 - "proc-macro2", 3097 - "quote", 3098 - "syn", 3099 - ] 3100 - 3101 - [[package]] 3102 2947 name = "prost-types" 3103 2948 version = "0.13.5" 3104 2949 source = "registry+https://github.com/rust-lang/crates.io-index" 3105 2950 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 3106 2951 dependencies = [ 3107 - "prost 0.13.5", 2952 + "prost", 3108 2953 ] 3109 2954 3110 2955 [[package]] ··· 3129 2974 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 3130 2975 3131 2976 [[package]] 3132 - name = "quinn" 3133 - version = "0.11.9" 3134 - source = "registry+https://github.com/rust-lang/crates.io-index" 3135 - checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 3136 - dependencies = [ 3137 - "bytes", 3138 - "cfg_aliases", 3139 - "pin-project-lite", 3140 - "quinn-proto", 3141 - "quinn-udp", 3142 - "rustc-hash 2.1.1", 3143 - "rustls", 3144 - "socket2 0.6.0", 3145 - "thiserror 2.0.12", 3146 - "tokio", 3147 - "tracing", 3148 - "web-time", 3149 - ] 3150 - 3151 - [[package]] 3152 - name = "quinn-proto" 3153 - version = "0.11.13" 3154 - source = "registry+https://github.com/rust-lang/crates.io-index" 3155 - checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 3156 - dependencies = [ 3157 - "bytes", 3158 - "getrandom 0.3.3", 3159 - "lru-slab", 3160 - "rand 0.9.1", 3161 - "ring", 3162 - "rustc-hash 2.1.1", 3163 - "rustls", 3164 - "rustls-pki-types", 3165 - "slab", 3166 - "thiserror 2.0.12", 3167 - "tinyvec", 3168 - "tracing", 3169 - "web-time", 3170 - ] 3171 - 3172 - [[package]] 3173 - name = "quinn-udp" 3174 - version = "0.5.14" 3175 - source = "registry+https://github.com/rust-lang/crates.io-index" 3176 - checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 3177 - dependencies = [ 3178 - "cfg_aliases", 3179 - "libc", 3180 - "once_cell", 3181 - "socket2 0.6.0", 3182 - "tracing", 3183 - "windows-sys 0.59.0", 3184 - ] 3185 - 3186 - [[package]] 3187 2977 name = "quote" 3188 2978 version = "1.0.38" 3189 2979 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3345 3135 dependencies = [ 3346 3136 "aho-corasick", 3347 3137 "memchr", 3348 - "regex-automata 0.4.9", 3349 - "regex-syntax 0.8.5", 3350 - ] 3351 - 3352 - [[package]] 3353 - name = "regex-automata" 3354 - version = "0.1.10" 3355 - source = "registry+https://github.com/rust-lang/crates.io-index" 3356 - checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 3357 - dependencies = [ 3358 - "regex-syntax 0.6.29", 3138 + "regex-automata", 3139 + "regex-syntax", 3359 3140 ] 3360 3141 3361 3142 [[package]] ··· 3366 3147 dependencies = [ 3367 3148 "aho-corasick", 3368 3149 "memchr", 3369 - "regex-syntax 0.8.5", 3150 + "regex-syntax", 3370 3151 ] 3371 3152 3372 3153 [[package]] 3373 3154 name = "regex-syntax" 3374 - version = "0.6.29" 3375 - source = "registry+https://github.com/rust-lang/crates.io-index" 3376 - checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 3377 - 3378 - [[package]] 3379 - name = "regex-syntax" 3380 3155 version = "0.8.5" 3381 3156 source = "registry+https://github.com/rust-lang/crates.io-index" 3382 3157 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3391 3166 "base64 0.22.1", 3392 3167 "bytes", 3393 3168 "encoding_rs", 3394 - "futures-channel", 3395 3169 "futures-core", 3396 3170 "futures-util", 3397 3171 "h2", ··· 3410 3184 "once_cell", 3411 3185 "percent-encoding", 3412 3186 "pin-project-lite", 3413 - "quinn", 3414 - "rustls", 3415 - "rustls-native-certs", 3416 3187 "rustls-pemfile", 3417 - "rustls-pki-types", 3418 3188 "serde", 3419 3189 "serde_json", 3420 3190 "serde_urlencoded", ··· 3422 3192 "system-configuration", 3423 3193 "tokio", 3424 3194 "tokio-native-tls", 3425 - "tokio-rustls", 3426 3195 "tokio-util", 3427 3196 "tower", 3428 3197 "tower-service", ··· 3512 3281 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3513 3282 3514 3283 [[package]] 3515 - name = "rustc-hash" 3516 - version = "2.1.1" 3517 - source = "registry+https://github.com/rust-lang/crates.io-index" 3518 - checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 3519 - 3520 - [[package]] 3521 3284 name = "rustc_version" 3522 3285 version = "0.4.1" 3523 3286 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3547 3310 dependencies = [ 3548 3311 "aws-lc-rs", 3549 3312 "once_cell", 3550 - "ring", 3551 3313 "rustls-pki-types", 3552 3314 "rustls-webpki", 3553 3315 "subtle", ··· 3580 3342 version = "1.11.0" 3581 3343 source = "registry+https://github.com/rust-lang/crates.io-index" 3582 3344 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3583 - dependencies = [ 3584 - "web-time", 3585 - ] 3586 3345 3587 3346 [[package]] 3588 3347 name = "rustls-webpki" ··· 4426 4185 "hyper-util", 4427 4186 "percent-encoding", 4428 4187 "pin-project", 4429 - "prost 0.13.5", 4188 + "prost", 4430 4189 "socket2 0.5.8", 4431 4190 "tokio", 4432 4191 "tokio-stream", 4433 4192 "tower", 4434 - "tower-layer", 4435 - "tower-service", 4436 - "tracing", 4437 - ] 4438 - 4439 - [[package]] 4440 - name = "tonic" 4441 - version = "0.14.2" 4442 - source = "registry+https://github.com/rust-lang/crates.io-index" 4443 - checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 4444 - dependencies = [ 4445 - "async-trait", 4446 - "base64 0.22.1", 4447 - "bytes", 4448 - "http", 4449 - "http-body", 4450 - "http-body-util", 4451 - "percent-encoding", 4452 - "pin-project", 4453 - "sync_wrapper", 4454 - "tokio-stream", 4455 4193 "tower-layer", 4456 4194 "tower-service", 4457 4195 "tracing", ··· 4477 4215 source = "registry+https://github.com/rust-lang/crates.io-index" 4478 4216 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4479 4217 dependencies = [ 4480 - "prost 0.13.5", 4218 + "prost", 4481 4219 "tokio", 4482 4220 "tokio-stream", 4483 - "tonic 0.13.1", 4484 - ] 4485 - 4486 - [[package]] 4487 - name = "tonic-prost" 4488 - version = "0.14.2" 4489 - source = "registry+https://github.com/rust-lang/crates.io-index" 4490 - checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" 4491 - dependencies = [ 4492 - "bytes", 4493 - "prost 0.14.1", 4494 - "tonic 0.14.2", 4495 - ] 4496 - 4497 - [[package]] 4498 - name = "tonic-tracing-opentelemetry" 4499 - version = "0.32.0" 4500 - source = "registry+https://github.com/rust-lang/crates.io-index" 4501 - checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3" 4502 - dependencies = [ 4503 - "futures-core", 4504 - "futures-util", 4505 - "http", 4506 - "http-body", 4507 - "hyper", 4508 - "opentelemetry", 4509 - "pin-project-lite", 4510 - "tonic 0.14.2", 4511 - "tower", 4512 - "tracing", 4513 - "tracing-opentelemetry", 4514 - "tracing-opentelemetry-instrumentation-sdk", 4221 + "tonic", 4515 4222 ] 4516 4223 4517 4224 [[package]] ··· 4606 4313 ] 4607 4314 4608 4315 [[package]] 4609 - name = "tracing-opentelemetry" 4610 - version = "0.32.0" 4611 - source = "registry+https://github.com/rust-lang/crates.io-index" 4612 - checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" 4613 - dependencies = [ 4614 - "js-sys", 4615 - "opentelemetry", 4616 - "opentelemetry_sdk", 4617 - "rustversion", 4618 - "smallvec", 4619 - "thiserror 2.0.12", 4620 - "tracing", 4621 - "tracing-core", 4622 - "tracing-log", 4623 - "tracing-subscriber", 4624 - "web-time", 4625 - ] 4626 - 4627 - [[package]] 4628 - name = "tracing-opentelemetry-instrumentation-sdk" 4629 - version = "0.32.1" 4630 - source = "registry+https://github.com/rust-lang/crates.io-index" 4631 - checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416" 4632 - dependencies = [ 4633 - "http", 4634 - "opentelemetry", 4635 - "opentelemetry-semantic-conventions", 4636 - "tracing", 4637 - "tracing-opentelemetry", 4638 - ] 4639 - 4640 - [[package]] 4641 - name = "tracing-serde" 4642 - version = "0.2.0" 4643 - source = "registry+https://github.com/rust-lang/crates.io-index" 4644 - checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" 4645 - dependencies = [ 4646 - "serde", 4647 - "tracing-core", 4648 - ] 4649 - 4650 - [[package]] 4651 4316 name = "tracing-subscriber" 4652 4317 version = "0.3.19" 4653 4318 source = "registry+https://github.com/rust-lang/crates.io-index" 4654 4319 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4655 4320 dependencies = [ 4656 - "matchers", 4657 4321 "nu-ansi-term", 4658 - "once_cell", 4659 - "regex", 4660 - "serde", 4661 - "serde_json", 4662 4322 "sharded-slab", 4663 4323 "smallvec", 4664 4324 "thread_local", 4665 - "tracing", 4666 4325 "tracing-core", 4667 4326 "tracing-log", 4668 - "tracing-serde", 4669 4327 ] 4670 4328 4671 4329 [[package]] ··· 4934 4592 version = "0.3.77" 4935 4593 source = "registry+https://github.com/rust-lang/crates.io-index" 4936 4594 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4937 - dependencies = [ 4938 - "js-sys", 4939 - "wasm-bindgen", 4940 - ] 4941 - 4942 - [[package]] 4943 - name = "web-time" 4944 - version = "1.1.0" 4945 - source = "registry+https://github.com/rust-lang/crates.io-index" 4946 - checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 4947 4595 dependencies = [ 4948 4596 "js-sys", 4949 4597 "wasm-bindgen",
+1 -1
consumer/Cargo.toml
··· 36 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 38 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 39 + tracing-subscriber = "0.3.18"
+3 -3
consumer/src/backfill/downloader.rs
··· 84 84 85 85 // has the repo already been downloaded? 86 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 - tracing::info!("skipping duplicate repo {did}"); 87 + tracing::warn!("skipping duplicate repo {did}"); 88 88 continue; 89 89 } 90 90 ··· 92 92 match db::actor_get_statuses(&mut conn, &did).await { 93 93 Ok(Some((_, state))) => { 94 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 - tracing::info!("skipping duplicate repo {did}"); 95 + tracing::warn!("skipping duplicate repo {did}"); 96 96 continue; 97 97 } 98 98 } ··· 206 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 208 } 209 - Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."), 209 + Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 210 210 Err(e) => { 211 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 212 continue;
+5 -4
consumer/src/backfill/repo.rs
··· 53 53 54 54 match block { 55 55 CarEntry::Commit(_) => { 56 - tracing::debug!("got commit entry that was not in root") 56 + tracing::warn!("got commit entry that was not in root") 57 57 } 58 58 CarEntry::Record(CarRecordEntry::Known(record)) => { 59 59 if let Some(path) = mst_nodes.remove(&cid) { ··· 96 96 } 97 97 } 98 98 99 - let Some(commit) = commit else { 100 - eyre::bail!("repo contained no commit?"); 101 - }; 99 + let commit = commit.unwrap(); 102 100 103 101 Ok((commit, deltas, copies)) 104 102 } ··· 171 169 } 172 170 RecordTypes::AppBskyFeedThreadgate(record) => { 173 171 if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 174 173 return Ok(()); 175 174 } 176 175 ··· 195 194 RecordTypes::AppBskyGraphListItem(rec) => { 196 195 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 197 196 if did != split_aturi[2] { 197 + // it's also probably a bad idea to log *all* the attempts to do this... 198 + tracing::warn!("tried to create a listitem on a list we don't control!"); 198 199 return Ok(()); 199 200 } 200 201
-8
consumer/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub index_uri: String, 19 17 pub database: deadpool_postgres::Config, 20 18 pub redis_uri: String, ··· 29 27 pub indexer: Option<IndexerConfig>, 30 28 /// Configuration items specific to backfill 31 29 pub backfill: Option<BackfillConfig>, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub log_json: bool, 38 30 } 39 31 40 32 #[derive(Debug, Deserialize)]
+3 -3
consumer/src/db/actor.rs
··· 69 69 ) 70 70 .await?; 71 71 72 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 72 + Ok(res.map(|v| (v.get(0), v.get(1)))) 73 73 } 74 74 75 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 83 ) 84 84 .await?; 85 85 86 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 86 + Ok(res.map(|v| (v.get(0), v.get(1)))) 87 87 } 88 88 89 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 97 ) 98 98 .await?; 99 99 100 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 100 + Ok(res.map(|v| (v.get(0), v.get(1)))) 101 101 }
+9 -10
consumer/src/db/backfill.rs
··· 51 51 ) 52 52 .await?; 53 53 54 - res.into_iter() 55 - .map(|row| { 56 - Ok(BackfillRow { 57 - repo: row.try_get(0)?, 58 - repo_ver: row.try_get(1)?, 59 - cid: row.try_get(2)?, 60 - data: row.try_get(3)?, 61 - indexed_at: row.try_get(4)?, 62 - }) 54 + Ok(res 55 + .into_iter() 56 + .map(|row| BackfillRow { 57 + repo: row.get(0), 58 + repo_ver: row.get(1), 59 + cid: row.get(2), 60 + data: row.get(3), 61 + indexed_at: row.get(4), 63 62 }) 64 - .collect() 63 + .collect()) 65 64 } 66 65 67 66 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1 -1
consumer/src/db/copy.rs
··· 205 205 ) 206 206 .await? 207 207 .into_iter() 208 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?; 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 209 210 210 for (root, post, created_at) in threadgated { 211 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+13 -18
consumer/src/db/gates.rs
··· 47 47 &[&root_author, &post_author], 48 48 ) 49 49 .await? 50 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?; 50 + .map(|v| (v.get(0), v.get(1))); 51 51 52 52 if let Some((following, followed)) = profile_state { 53 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 65 let mentions: Vec<String> = conn 66 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 67 .await? 68 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 69 - .transpose()? 68 + .map(|r| r.get(0)) 70 69 .unwrap_or_default(); 71 70 72 71 if mentions.contains(&post_author.to_owned()) { ··· 85 84 &[&allow_lists, &post_author], 86 85 ) 87 86 .await? 88 - .try_get(0)?; 87 + .get(0); 89 88 if count != 0 { 90 89 return Ok(false); 91 90 } ··· 137 136 ) 138 137 .await? 139 138 .into_iter() 140 - .map(|row| row.try_get(0)) 141 - .collect::<Result<_, _>>()?; 139 + .map(|row| row.get(0)) 140 + .collect(); 142 141 143 142 // this will be empty if there are no replies. 144 143 if dids.is_empty() { ··· 153 152 }) 154 153 .collect::<Vec<_>>(); 155 154 156 - let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str())); 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 157 156 158 157 if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 159 158 let current_dids: Vec<_> = dids.iter().collect(); ··· 161 160 let res = conn.query( 162 161 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 163 162 &[&root_author, &current_dids] 164 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?; 163 + ).await?; 165 164 166 - dids = &dids - &res; 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 167 166 } 168 167 169 168 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 172 171 let res = conn.query( 173 172 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 174 173 &[&root_author, &current_dids] 175 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?; 174 + ).await?; 176 175 177 - dids = &dids - &res; 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 178 177 } 179 178 180 179 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 181 180 let mentions: Vec<String> = conn 182 181 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 183 182 .await? 184 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 185 - .transpose()? 183 + .map(|r| r.get(0)) 186 184 .unwrap_or_default(); 187 185 188 186 dids = &dids - &HashSet::from_iter(mentions); ··· 196 194 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 197 195 &[&allowed_lists, &current_dids], 198 196 ) 199 - .await? 200 - .into_iter() 201 - .map(|row| row.try_get(0)) 202 - .collect::<Result<_, _>>()?; 197 + .await?; 203 198 204 - dids = &dids - &res; 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 205 200 } 206 201 207 202 let dids = dids.into_iter().collect::<Vec<_>>();
+20 -17
consumer/src/db/record.rs
··· 127 127 ], 128 128 ) 129 129 .await 130 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 130 + .map(|r| r.get::<_, i32>(0) == 0) 131 131 } 132 132 133 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 159 ) 160 160 .await?; 161 161 162 - res.map(|v| v.try_get(0)).transpose() 162 + Ok(res.map(|v| v.get(0))) 163 163 } 164 164 165 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 224 ) 225 225 .await?; 226 226 227 - res.map(|v| v.try_get(0)).transpose() 227 + Ok(res.map(|v| v.get(0))) 228 228 } 229 229 230 230 pub async fn list_upsert<C: GenericClient>( ··· 255 255 ], 256 256 ) 257 257 .await 258 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 258 + .map(|r| r.get::<_, i32>(0) == 0) 259 259 } 260 260 261 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 391 ) 392 392 .await?; 393 393 394 - res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?))) 395 - .transpose() 394 + Ok(res.map(|row| (row.get(0), row.get(1)))) 396 395 } 397 396 398 397 pub async fn post_embed_insert<C: GenericClient>( ··· 537 536 conn: &mut C, 538 537 post: &str, 539 538 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 540 - conn.query_opt( 541 - "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 - &[&post], 543 - ) 544 - .await? 545 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))) 546 - .transpose() 539 + let res = conn 540 + .query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 545 + .map(|v| (v.get(0), v.get(1), v.get(2))); 546 + 547 + Ok(res) 547 548 } 548 549 549 550 pub async fn postgate_upsert<C: GenericClient>( ··· 650 651 ) 651 652 .await?; 652 653 653 - res.map(|v| v.try_get(0)).transpose() 654 + Ok(res.map(|v| v.get(0))) 654 655 } 655 656 656 657 pub async fn starter_pack_upsert<C: GenericClient>( ··· 685 686 ], 686 687 ) 687 688 .await 688 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 689 + .map(|r| r.get::<_, i32>(0) == 0) 689 690 } 690 691 691 692 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 730 731 conn: &mut C, 731 732 post: &str, 732 733 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 733 - conn 734 + let res = conn 734 735 .query_opt( 735 736 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 736 737 &[&post], 737 738 ) 738 739 .await? 739 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose() 740 + .map(|v| (v.get(0), v.get(1), v.get(2))); 741 + 742 + Ok(res) 740 743 } 741 744 742 745 pub async fn threadgate_upsert<C: GenericClient>(
+4
consumer/src/indexer/mod.rs
··· 640 640 } 641 641 RecordTypes::AppBskyFeedPostgate(record) => { 642 642 if !at_uri_is_by(&record.post, repo) { 643 + tracing::warn!("tried to create a postgate on a post we don't control!"); 643 644 return Ok(()); 644 645 } 645 646 ··· 669 670 } 670 671 RecordTypes::AppBskyFeedThreadgate(record) => { 671 672 if !at_uri_is_by(&record.post, repo) { 673 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 672 674 return Ok(()); 673 675 } 674 676 ··· 708 710 } 709 711 RecordTypes::AppBskyGraphListItem(record) => { 710 712 if !at_uri_is_by(&record.list, repo) { 713 + // it's also probably a bad idea to log *all* the attempts to do this... 714 + tracing::warn!("tried to create a listitem on a list we don't control!"); 711 715 return Ok(()); 712 716 } 713 717
-25
consumer/src/instrumentation.rs
··· 1 - use tracing::Subscriber; 2 - use tracing_subscriber::filter::Filtered; 3 - use tracing_subscriber::layer::SubscriberExt; 4 - use tracing_subscriber::registry::LookupSpan; 5 - use tracing_subscriber::util::SubscriberInitExt; 6 - use tracing_subscriber::{EnvFilter, Layer}; 7 - 8 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 9 - let log_layer = init_log(cfg.log_json); 10 - 11 - tracing_subscriber::registry().with(log_layer).init(); 12 - } 13 - 14 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 15 - where 16 - S: Subscriber + for<'span> LookupSpan<'span>, 17 - { 18 - let stdout_filter = EnvFilter::from_default_env(); 19 - 20 - match json { 21 - true => tracing_subscriber::fmt::layer().json().boxed(), 22 - false => tracing_subscriber::fmt::layer().boxed(), 23 - } 24 - .with_filter(stdout_filter) 25 - }
+1 -2
consumer/src/main.rs
··· 12 12 mod db; 13 13 mod firehose; 14 14 mod indexer; 15 - mod instrumentation; 16 15 mod label_indexer; 17 16 mod utils; 18 17 19 18 #[tokio::main] 20 19 async fn main() -> eyre::Result<()> { 20 + tracing_subscriber::fmt::init(); 21 21 PrometheusBuilder::new().install()?; 22 22 23 23 let cli = cmd::parse(); 24 24 let conf = config::load_config()?; 25 25 26 - instrumentation::init_instruments(&conf.instruments); 27 26 let user_agent = build_ua(&conf.ua_contact); 28 27 29 28 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+2 -2
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 34 34 language plpgsql as 35 35 $$ 36 36 begin 37 - delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post'; 37 + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 38 38 return OLD; 39 39 end; 40 40 $$; ··· 67 67 language plpgsql as 68 68 $$ 69 69 begin 70 - delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost'; 70 + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 71 71 return OLD; 72 72 end; 73 73 $$;
+2 -8
parakeet/Cargo.toml
··· 6 6 [dependencies] 7 7 async-recursion = "1.1.1" 8 8 axum = { version = "0.8", features = ["json"] } 9 - axum-tracing-opentelemetry = "0.32" 10 9 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 11 10 base64 = "0.22" 12 11 chrono = { version = "0.4.39", features = ["serde"] } ··· 22 21 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 23 22 lexica = { path = "../lexica" } 24 23 multibase = "0.9.1" 25 - opentelemetry = "0.31.0" 26 - opentelemetry-otlp = "0.31.0" 27 - opentelemetry_sdk = "0.31.0" 28 24 parakeet-db = { path = "../parakeet-db" } 29 - parakeet-index = { path = "../parakeet-index", features = ["otel"] } 25 + parakeet-index = { path = "../parakeet-index" } 30 26 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 31 27 reqwest = { version = "0.12", features = ["json"] } 32 28 serde = { version = "1.0.217", features = ["derive"] } 33 29 serde_ipld_dagcbor = "0.6.1" 34 30 serde_json = "1.0.134" 35 31 tokio = { version = "1.42.0", features = ["full"] } 36 - tower = "0.5" 37 32 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 38 33 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 40 - tracing-opentelemetry = "0.32" 34 + tracing-subscriber = "0.3.18"
+2 -2
parakeet/src/cache.rs
··· 29 29 type Val = V; 30 30 31 31 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 - let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?; 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 33 33 34 34 match serde_ipld_dagcbor::from_slice(&res?) { 35 35 Ok(v) => Some(v), ··· 57 57 } 58 58 59 59 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 - let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key) 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 61 61 .await 62 62 .ok()?; 63 63
-10
parakeet/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub index_uri: String, 19 17 pub database_url: String, 20 18 pub redis_uri: String, ··· 29 27 pub did_allowlist: Option<Vec<String>>, 30 28 #[serde(default)] 31 29 pub migrate: bool, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub otel_enable: bool, 38 - #[serde(default)] 39 - pub log_json: bool, 40 30 } 41 31 42 32 #[derive(Debug, Deserialize)]
+1 -21
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 2 use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 - use parakeet_db::models::TextArray; 5 4 use parakeet_db::{schema, types}; 6 - use tracing::instrument; 5 + use parakeet_db::models::TextArray; 7 6 8 - #[instrument(skip_all)] 9 7 pub async fn get_actor_status( 10 8 conn: &mut AsyncPgConnection, 11 9 did: &str, ··· 40 38 #[diesel(sql_type = Nullable<Text>)] 41 39 pub list_mute: Option<String>, 42 40 } 43 - 44 - #[instrument(skip_all)] 45 41 pub async fn get_profile_state( 46 42 conn: &mut AsyncPgConnection, 47 43 did: &str, ··· 54 50 .await 55 51 .optional() 56 52 } 57 - 58 - #[instrument(skip_all)] 59 53 pub async fn get_profile_states( 60 54 conn: &mut AsyncPgConnection, 61 55 did: &str, ··· 90 84 #[diesel(sql_type = diesel::sql_types::Bool)] 91 85 pub pinned: bool, 92 86 } 93 - 94 - #[instrument(skip_all)] 95 87 pub async fn get_post_state( 96 88 conn: &mut AsyncPgConnection, 97 89 did: &str, ··· 105 97 .optional() 106 98 } 107 99 108 - #[instrument(skip_all)] 109 100 pub async fn get_post_states( 110 101 conn: &mut AsyncPgConnection, 111 102 did: &str, ··· 129 120 pub block: Option<String>, 130 121 } 131 122 132 - #[instrument(skip_all)] 133 123 pub async fn get_list_state( 134 124 conn: &mut AsyncPgConnection, 135 125 did: &str, ··· 143 133 .optional() 144 134 } 145 135 146 - #[instrument(skip_all)] 147 136 pub async fn get_list_states( 148 137 conn: &mut AsyncPgConnection, 149 138 did: &str, ··· 156 145 .await 157 146 } 158 147 159 - #[instrument(skip_all)] 160 148 pub async fn get_like_state( 161 149 conn: &mut AsyncPgConnection, 162 150 did: &str, ··· 174 162 .optional() 175 163 } 176 164 177 - #[instrument(skip_all)] 178 165 pub async fn get_like_states( 179 166 conn: &mut AsyncPgConnection, 180 167 did: &str, ··· 195 182 .await 196 183 } 197 184 198 - #[instrument(skip_all)] 199 185 pub async fn get_pinned_post_uri( 200 186 conn: &mut AsyncPgConnection, 201 187 did: &str, ··· 226 212 pub depth: i32, 227 213 } 228 214 229 - #[instrument(skip_all)] 230 215 pub async fn get_thread_children( 231 216 conn: &mut AsyncPgConnection, 232 217 uri: &str, ··· 239 224 .await 240 225 } 241 226 242 - #[instrument(skip_all)] 243 227 pub async fn get_thread_children_branching( 244 228 conn: &mut AsyncPgConnection, 245 229 uri: &str, ··· 261 245 pub at_uri: String, 262 246 } 263 247 264 - #[instrument(skip_all)] 265 248 pub async fn get_thread_children_hidden( 266 249 conn: &mut AsyncPgConnection, 267 250 uri: &str, ··· 274 257 .await 275 258 } 276 259 277 - #[instrument(skip_all)] 278 260 pub async fn get_thread_parents( 279 261 conn: &mut AsyncPgConnection, 280 262 uri: &str, ··· 287 269 .await 288 270 } 289 271 290 - #[instrument(skip_all)] 291 272 pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 292 273 schema::posts::table 293 274 .select(schema::posts::root_uri) ··· 298 279 .map(|v| v.flatten()) 299 280 } 300 281 301 - #[instrument(skip_all)] 302 282 pub async fn get_threadgate_hiddens( 303 283 conn: &mut AsyncPgConnection, 304 284 uri: &str,
-3
parakeet/src/hydration/embed.rs
··· 8 8 use lexica::app_bsky::feed::PostView; 9 9 use parakeet_db::models; 10 10 use std::collections::HashMap; 11 - use tracing::instrument; 12 11 13 12 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 14 13 height ··· 177 176 out 178 177 } 179 178 180 - #[instrument(skip_all)] 181 179 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 182 180 let (embed, author) = self.loaders.embed.load(post).await?; 183 181 ··· 197 195 } 198 196 } 199 197 200 - #[instrument(skip_all)] 201 198 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 202 199 let embeds = self.loaders.embed.load_many(posts).await; 203 200
-5
parakeet/src/hydration/feedgen.rs
··· 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 - use tracing::instrument; 9 8 10 9 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 11 10 GeneratorViewerState { ··· 50 49 } 51 50 52 51 impl super::StatefulHydrator<'_> { 53 - #[instrument(skip_all)] 54 52 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 55 53 let labels = self.get_label(&feedgen).await; 56 54 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 63 61 )) 64 62 } 65 63 66 - #[instrument(skip_all)] 67 64 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 68 65 let labels = self.get_label_many(&feedgens).await; 69 66 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 93 90 .collect() 94 91 } 95 92 96 - #[instrument(skip_all)] 97 93 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 98 94 if let Some(viewer) = &self.current_actor { 99 95 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 104 100 } 105 101 } 106 102 107 - #[instrument(skip_all)] 108 103 async fn get_feedgen_viewer_states( 109 104 &self, 110 105 subjects: &[String],
-7
parakeet/src/hydration/labeler.rs
··· 8 8 use parakeet_db::models; 9 9 use std::collections::HashMap; 10 10 use std::str::FromStr; 11 - use tracing::instrument; 12 11 13 12 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 14 13 LabelerViewerState { ··· 99 98 } 100 99 101 100 impl StatefulHydrator<'_> { 102 - #[instrument(skip_all)] 103 101 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 104 102 let labels = self.get_label(&labeler).await; 105 103 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 110 108 Some(build_view(labeler, creator, labels, viewer, likes)) 111 109 } 112 110 113 - #[instrument(skip_all)] 114 111 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 115 112 let labels = self.get_label_many(&labelers).await; 116 113 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 136 133 .collect() 137 134 } 138 135 139 - #[instrument(skip_all)] 140 136 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 141 137 let labels = self.get_label(&labeler).await; 142 138 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 149 145 )) 150 146 } 151 147 152 - #[instrument(skip_all)] 153 148 pub async fn hydrate_labelers_detailed( 154 149 &self, 155 150 labelers: Vec<String>, ··· 180 175 .collect() 181 176 } 182 177 183 - #[instrument(skip_all)] 184 178 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 185 179 if let Some(viewer) = &self.current_actor { 186 180 let data = self ··· 195 189 } 196 190 } 197 191 198 - #[instrument(skip_all)] 199 192 async fn get_labeler_viewer_states( 200 193 &self, 201 194 subjects: &[String],
-7
parakeet/src/hydration/list.rs
··· 6 6 use parakeet_db::models; 7 7 use std::collections::HashMap; 8 8 use std::str::FromStr; 9 - use tracing::instrument; 10 9 11 10 fn build_viewer(data: ListStateRet) -> ListViewerState { 12 11 ListViewerState { ··· 70 69 } 71 70 72 71 impl StatefulHydrator<'_> { 73 - #[instrument(skip_all)] 74 72 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 75 73 let labels = self.get_label(&list).await; 76 74 let viewer = self.get_list_viewer_state(&list).await; ··· 79 77 build_basic(list, count, labels, viewer, &self.cdn) 80 78 } 81 79 82 - #[instrument(skip_all)] 83 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 84 81 if lists.is_empty() { 85 82 return HashMap::new(); ··· 100 97 .collect() 101 98 } 102 99 103 - #[instrument(skip_all)] 104 100 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 105 101 let labels = self.get_label(&list).await; 106 102 let viewer = self.get_list_viewer_state(&list).await; ··· 110 106 build_listview(list, count, profile, labels, viewer, &self.cdn) 111 107 } 112 108 113 - #[instrument(skip_all)] 114 109 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 115 110 if lists.is_empty() { 116 111 return HashMap::new(); ··· 136 131 .collect() 137 132 } 138 133 139 - #[instrument(skip_all)] 140 134 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 141 135 if let Some(viewer) = &self.current_actor { 142 136 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 147 141 } 148 142 } 149 143 150 - #[instrument(skip_all)] 151 144 async fn get_list_viewer_states( 152 145 &self, 153 146 subjects: &[String],
-4
parakeet/src/hydration/mod.rs
··· 63 63 } 64 64 } 65 65 66 - #[tracing::instrument(skip_all)] 67 66 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 68 67 self.loaders.label.load(uri, self.accept_labelers).await 69 68 } 70 69 71 - #[tracing::instrument(skip_all)] 72 70 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 73 71 let uris = &[ 74 72 did.to_string(), ··· 82 80 .collect() 83 81 } 84 82 85 - #[tracing::instrument(skip_all)] 86 83 async fn get_label_many( 87 84 &self, 88 85 uris: &[String], ··· 93 90 .await 94 91 } 95 92 96 - #[tracing::instrument(skip_all)] 97 93 async fn get_profile_label_many( 98 94 &self, 99 95 uris: &[String],
-9
parakeet/src/hydration/posts.rs
··· 11 11 use parakeet_db::models; 12 12 use parakeet_index::PostStats; 13 13 use std::collections::HashMap; 14 - use tracing::instrument; 15 14 16 15 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 17 16 let is_me = did == data.did; ··· 83 82 } 84 83 85 84 impl StatefulHydrator<'_> { 86 - #[instrument(skip_all)] 87 85 async fn hydrate_threadgate( 88 86 &self, 89 87 threadgate: Option<models::Threadgate>, ··· 102 100 )) 103 101 } 104 102 105 - #[instrument(skip_all)] 106 103 async fn hydrate_threadgates( 107 104 &self, 108 105 threadgates: Vec<models::Threadgate>, ··· 134 131 .collect() 135 132 } 136 133 137 - #[instrument(skip_all)] 138 134 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 139 135 let stats = self.loaders.post_stats.load(post.clone()).await; 140 136 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 149 145 ))) 150 146 } 151 147 152 - #[instrument(skip_all)] 153 148 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 154 149 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 155 150 let posts = self.loaders.posts.load_many(posts).await; ··· 189 184 .collect() 190 185 } 191 186 192 - #[instrument(skip_all)] 193 187 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 194 188 self.hydrate_posts_inner(posts) 195 189 .await ··· 198 192 .collect() 199 193 } 200 194 201 - #[instrument(skip_all)] 202 195 pub async fn hydrate_feed_posts( 203 196 &self, 204 197 posts: Vec<RawFeedItem>, ··· 302 295 .collect() 303 296 } 304 297 305 - #[instrument(skip_all)] 306 298 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 307 299 if let Some(viewer) = &self.current_actor { 308 300 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 313 305 } 314 306 } 315 307 316 - #[instrument(skip_all)] 317 308 async fn get_post_viewer_states( 318 309 &self, 319 310 subjects: &[String],
+1 -12
parakeet/src/hydration/profile.rs
··· 12 12 use std::collections::HashMap; 13 13 use std::str::FromStr; 14 14 use std::sync::OnceLock; 15 - use tracing::instrument; 16 15 17 16 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 18 17 ··· 52 51 .followed 53 52 .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 54 53 55 - let blocking = data.list_block.or(data 56 - .blocking 57 - .map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did))); 54 + let blocking = data.list_block.or(data.blocking); 58 55 59 56 ProfileViewerState { 60 57 muted: data.muting.unwrap_or_default(), ··· 275 272 } 276 273 277 274 impl super::StatefulHydrator<'_> { 278 - #[instrument(skip_all)] 279 275 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 280 276 let labels = self.get_profile_label(&did).await; 281 277 let viewer = self.get_profile_viewer_state(&did).await; ··· 293 289 )) 294 290 } 295 291 296 - #[instrument(skip_all)] 297 292 pub async fn hydrate_profiles_basic( 298 293 &self, 299 294 dids: Vec<String>, ··· 318 313 .collect() 319 314 } 320 315 321 - #[instrument(skip_all)] 322 316 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 323 317 let labels = self.get_profile_label(&did).await; 324 318 let viewer = self.get_profile_viewer_state(&did).await; ··· 336 330 )) 337 331 } 338 332 339 - #[instrument(skip_all)] 340 333 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 341 334 let labels = self.get_profile_label_many(&dids).await; 342 335 let viewers = self.get_profile_viewer_states(&dids).await; ··· 358 351 .collect() 359 352 } 360 353 361 - #[instrument(skip_all)] 362 354 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 363 355 let labels = self.get_profile_label(&did).await; 364 356 let viewer = self.get_profile_viewer_state(&did).await; ··· 376 368 )) 377 369 } 378 370 379 - #[instrument(skip_all)] 380 371 pub async fn hydrate_profiles_detailed( 381 372 &self, 382 373 dids: Vec<String>, ··· 401 392 .collect() 402 393 } 403 394 404 - #[instrument(skip_all)] 405 395 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 406 396 if let Some(viewer) = &self.current_actor { 407 397 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 421 411 } 422 412 } 423 413 424 - #[instrument(skip_all)] 425 414 async fn get_profile_viewer_states( 426 415 &self, 427 416 dids: &[String],
+5 -11
parakeet/src/hydration/starter_packs.rs
··· 4 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 - use tracing::instrument; 8 7 9 8 fn build_basic( 10 9 starter_pack: models::StaterPack, ··· 51 50 } 52 51 53 52 impl StatefulHydrator<'_> { 54 - #[instrument(skip_all)] 55 53 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 56 54 let labels = self.get_label(&pack).await; 57 55 let sp = self.loaders.starterpacks.load(pack).await?; ··· 61 59 Some(build_basic(sp, creator, labels, list_item_count)) 62 60 } 63 61 64 - #[instrument(skip_all)] 65 62 pub async fn hydrate_starterpacks_basic( 66 63 &self, 67 64 packs: Vec<String>, ··· 89 86 .collect() 90 87 } 91 88 92 - #[instrument(skip_all)] 93 89 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 94 90 let labels = self.get_label(&pack).await; 95 91 let sp = self.loaders.starterpacks.load(pack).await?; ··· 97 93 let creator = self.hydrate_profile_basic(sp.owner.clone()).await?; 98 94 let list = self.hydrate_list_basic(sp.list.clone()).await; 99 95 100 - let feeds = sp.feeds.clone().unwrap_or_default(); 101 - let feeds = self 102 - .hydrate_feedgens(feeds.into()) 103 - .await 104 - .into_values() 105 - .collect(); 96 + let feeds = sp 97 + .feeds 98 + .clone() 99 + .unwrap_or_default(); 100 + let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect(); 106 101 107 102 Some(build_spview(sp, creator, labels, list, feeds)) 108 103 } 109 104 110 - #[instrument(skip_all)] 111 105 pub async fn hydrate_starterpacks( 112 106 &self, 113 107 packs: Vec<String>,
-57
parakeet/src/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
+4 -27
parakeet/src/loaders.rs
··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use std::collections::HashMap; 17 17 use std::str::FromStr; 18 - use tracing::instrument; 19 18 20 19 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 21 20 ··· 64 63 ) -> Dataloaders { 65 64 Dataloaders { 66 65 embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 67 - feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600), 66 + feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 68 67 handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 69 68 label: LabelLoader(pool.clone()), // CARE: never cache this. 70 - labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600), 69 + labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 71 70 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 72 71 like_state: LikeRecordLoader(pool.clone()), 73 72 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), ··· 86 85 87 86 pub struct LikeLoader(parakeet_index::Client); 88 87 impl BatchFn<String, i32> for LikeLoader { 89 - #[instrument(name = "LikeLoader", skip_all)] 90 88 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 91 89 let res = self 92 90 .0 ··· 109 107 110 108 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 111 109 impl LikeRecordLoader { 112 - #[instrument(name = "LikeRecordLoader::get", skip_all)] 113 110 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 114 111 let mut conn = self.0.get().await.unwrap(); 115 112 ··· 121 118 }) 122 119 } 123 120 124 - #[instrument(name = "LikeRecordLoader::get_many", skip_all)] 125 121 pub async fn get_many( 126 122 &self, 127 123 did: &str, ··· 143 139 144 140 pub struct HandleLoader(Pool<AsyncPgConnection>); 145 141 impl BatchFn<String, String> for HandleLoader { 146 - #[instrument(name = "HandleLoader", skip_all)] 147 142 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 148 143 let mut conn = self.0.get().await.unwrap(); 149 144 ··· 176 171 Option<ProfileAllowSubscriptions>, 177 172 ); 178 173 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 179 - #[instrument(name = "ProfileLoader", skip_all)] 180 174 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 181 175 let mut conn = self.0.get().await.unwrap(); 182 176 ··· 238 232 239 233 pub struct ProfileStatsLoader(parakeet_index::Client); 240 234 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 241 - #[instrument(name = "ProfileStatsLoader", skip_all)] 242 235 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 243 236 let stats_req = parakeet_index::GetStatsManyReq { 244 237 uris: keys.to_vec(), ··· 255 248 256 249 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 257 250 impl ProfileStateLoader { 258 - #[instrument(name = "ProfileStateLoader::get", skip_all)] 259 251 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 260 252 let mut conn = self.0.get().await.unwrap(); 261 253 ··· 267 259 }) 268 260 } 269 261 270 - #[instrument(name = "ProfileStateLoader::get_many", skip_all)] 271 262 pub async fn get_many( 272 263 &self, 273 264 did: &str, ··· 288 279 pub struct ListLoader(Pool<AsyncPgConnection>); 289 280 type ListLoaderRet = (models::List, i64); 290 281 impl BatchFn<String, ListLoaderRet> for ListLoader { 291 - #[instrument(name = "ListLoaderRet", skip_all)] 292 282 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 293 283 let mut conn = self.0.get().await.unwrap(); 294 284 ··· 320 310 321 311 pub struct ListStateLoader(Pool<AsyncPgConnection>); 322 312 impl ListStateLoader { 323 - #[instrument(name = "ListStateLoader::get", skip_all)] 324 313 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 325 314 let mut conn = self.0.get().await.unwrap(); 326 315 ··· 332 321 }) 333 322 } 334 323 335 - #[instrument(name = "ListStateLoader::get_many", skip_all)] 336 324 pub async fn get_many( 337 325 &self, 338 326 did: &str, ··· 350 338 } 351 339 } 352 340 353 - pub struct FeedGenLoader(Pool<AsyncPgConnection>); 341 + pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 354 342 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 355 - #[instrument(name = "FeedGenLoader", skip_all)] 356 343 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 357 344 let mut conn = self.0.get().await.unwrap(); 358 345 ··· 378 365 pub struct PostLoader(Pool<AsyncPgConnection>); 379 366 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 380 367 impl BatchFn<String, PostLoaderRet> for PostLoader { 381 - #[instrument(name = "PostLoader", skip_all)] 382 368 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 383 369 let mut conn = self.0.get().await.unwrap(); 384 370 ··· 409 395 410 396 pub struct PostStatsLoader(parakeet_index::Client); 411 397 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 412 - #[instrument(name = "PostStatsLoader", skip_all)] 413 398 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 414 399 let stats_req = parakeet_index::GetStatsManyReq { 415 400 uris: keys.to_vec(), ··· 426 411 427 412 pub struct PostStateLoader(Pool<AsyncPgConnection>); 428 413 impl PostStateLoader { 429 - #[instrument(name = "PostStateLoader::get", skip_all)] 430 414 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 431 415 let mut conn = self.0.get().await.unwrap(); 432 416 ··· 438 422 }) 439 423 } 440 424 441 - #[instrument(name = "PostStateLoader::get_many", skip_all)] 442 425 pub async fn get_many( 443 426 &self, 444 427 did: &str, ··· 466 449 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 467 450 } 468 451 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 469 - #[instrument(name = "EmbedLoader", skip_all)] 470 452 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 471 453 let mut conn = self.0.get().await.unwrap(); 472 454 ··· 549 531 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 550 532 type StarterPackLoaderRet = models::StaterPack; 551 533 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 552 - #[instrument(name = "StarterPackLoader", skip_all)] 553 534 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 554 535 let mut conn = self.0.get().await.unwrap(); 555 536 ··· 572 553 } 573 554 } 574 555 575 - pub struct LabelServiceLoader(Pool<AsyncPgConnection>); 556 + pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 576 557 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 577 558 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 578 - #[instrument(name = "LabelServiceLoader", skip_all)] 579 559 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 580 560 let mut conn = self.0.get().await.unwrap(); 581 561 ··· 614 594 // but it should live here anyway 615 595 pub struct LabelLoader(Pool<AsyncPgConnection>); 616 596 impl LabelLoader { 617 - #[instrument(name = "LabelLoader::load", skip_all)] 618 597 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 619 598 let mut conn = self.0.get().await.unwrap(); 620 599 ··· 634 613 }) 635 614 } 636 615 637 - #[instrument(name = "LabelLoader::load_many", skip_all)] 638 616 pub async fn load_many( 639 617 &self, 640 618 uris: &[String], ··· 669 647 670 648 pub struct VerificationLoader(Pool<AsyncPgConnection>); 671 649 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 672 - #[instrument(name = "VerificationLoader", skip_all)] 673 650 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 674 651 let mut conn = self.0.get().await.unwrap(); 675 652
+5 -14
parakeet/src/main.rs
··· 1 - use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; 2 1 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 3 2 use diesel_async::pooled_connection::deadpool::Pool; 4 3 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 15 14 mod config; 16 15 mod db; 17 16 mod hydration; 18 - mod instrumentation; 19 17 mod loaders; 20 18 mod xrpc; 21 19 ··· 33 31 34 32 #[tokio::main] 35 33 async fn main() -> eyre::Result<()> { 36 - let conf = config::load_config()?; 34 + tracing_subscriber::fmt::init(); 37 35 38 - instrumentation::init_instruments(&conf.instruments); 36 + let conf = config::load_config()?; 39 37 40 38 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 41 39 let pool = Pool::builder(db_mgr).build()?; ··· 54 52 let redis_client = redis::Client::open(conf.redis_uri)?; 55 53 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 56 54 57 - let index_client = parakeet_index::connect_with_otel(conf.index_uri) 58 - .await 59 - .map_err(|e| eyre::eyre!(e))?; 55 + let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 60 56 61 57 let dataloaders = Arc::new(loaders::Dataloaders::new( 62 58 pool.clone(), ··· 83 79 84 80 let did_doc = did_web_doc(&conf.service); 85 81 86 - let mw = tower::ServiceBuilder::new() 87 - .option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default)) 88 - .option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default)) 89 - .layer(TraceLayer::new_for_http()) 90 - .layer(cors); 91 - 92 82 let app = axum::Router::new() 93 83 .nest("/xrpc", xrpc::xrpc_routes()) 94 84 .route( 95 85 "/.well-known/did.json", 96 86 axum::routing::get(async || axum::Json(did_doc)), 97 87 ) 98 - .layer(mw) 88 + .layer(TraceLayer::new_for_http()) 89 + .layer(cors) 99 90 .with_state(GlobalState { 100 91 pool, 101 92 redis_mp,
+1 -1
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 36 36 rkey: None, 37 37 subject: &form.uri, 38 38 subject_cid: Some(form.cid), 39 - subject_type: parts[1], 39 + subject_type: &parts[1], 40 40 tags: vec![], 41 41 }; 42 42
+2 -6
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 24 24 use reqwest::Url; 25 25 use serde::{Deserialize, Serialize}; 26 26 use std::collections::HashMap; 27 - use tracing::instrument; 28 27 29 28 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 30 29 ··· 161 160 162 161 #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 163 162 #[serde(rename_all = "snake_case")] 164 - #[allow(clippy::enum_variant_names)] 165 163 pub enum GetAuthorFeedFilter { 166 164 #[default] 167 165 PostsWithReplies, ··· 200 198 if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 201 199 if psr.blocked.unwrap_or_default() { 202 200 // they block us 203 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 201 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 204 202 } else if psr.blocking.is_some() { 205 203 // we block them 206 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 204 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 207 205 } 208 206 } 209 207 } ··· 614 612 .or(schema::posts::embed_subtype.eq_any(filter)) 615 613 } 616 614 617 - #[instrument(skip_all)] 618 615 async fn get_feed_skeleton( 619 616 feed: &str, 620 617 service: &str, ··· 656 653 } 657 654 } 658 655 659 - #[instrument(skip_all)] 660 656 async fn get_skeleton_repost_data( 661 657 conn: &mut AsyncPgConnection, 662 658 reposts: Vec<String>,
+4 -1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 253 253 .hydrate_posts(reply_uris) 254 254 .await 255 255 .into_iter() 256 - .filter(|(_, post)| matches!(&post.author.viewer, Some(viewer) if viewer.blocked_by || viewer.blocking.is_some())) 256 + .filter(|(_, post)| match &post.author.viewer { 257 + Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false, 258 + _ => true, 259 + }) 257 260 .map(|(uri, post)| { 258 261 let post = ThreadItemPost { 259 262 post,
-3
parakeet/src/xrpc/jwt.rs
··· 4 4 use std::collections::HashMap; 5 5 use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 - use tracing::instrument; 8 7 9 8 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 10 9 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 39 38 } 40 39 } 41 40 42 - #[instrument(skip_all)] 43 41 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 44 42 // first we need to decode without verifying, to get iss. 45 43 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 58 56 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 59 57 } 60 58 61 - #[instrument(skip_all)] 62 59 async fn resolve_key(&self, did: &str) -> Option<String> { 63 60 tracing::trace!("resolving multikey for {did}"); 64 61 let did_doc = self.resolver.resolve_did(did).await.ok()??;
+2 -24
parakeet-index/Cargo.toml
··· 10 10 [dependencies] 11 11 tonic = "0.13.0" 12 12 prost = "0.13.5" 13 - tonic-tracing-opentelemetry = { version = "0.32", optional = true } 14 - tower = { version = "0.5", optional = true } 15 13 16 14 eyre = { version = "0.6.12", optional = true } 17 15 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 18 16 itertools = { version = "0.14.0", optional = true } 19 - opentelemetry = { version = "0.31.0", optional = true } 20 - opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true } 21 - opentelemetry_sdk = { version = "0.31.0", optional = true } 22 17 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 23 18 serde = { version = "1.0.217", features = ["derive"], optional = true } 24 19 tokio = { version = "1.42.0", features = ["full"], optional = true } 25 20 tonic-health = { version = "0.13.0", optional = true } 26 21 tracing = { version = "0.1.40", optional = true } 27 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } 28 - tracing-opentelemetry = { version = "0.32", optional = true } 22 + tracing-subscriber = { version = "0.3.18", optional = true } 29 23 30 24 [build-dependencies] 31 25 tonic-build = "0.13.0" 32 26 33 27 [features] 34 - otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"] 35 - server = [ 36 - "dep:eyre", 37 - "dep:figment", 38 - "dep:itertools", 39 - "dep:opentelemetry", 40 - "dep:opentelemetry-otlp", 41 - "dep:opentelemetry_sdk", 42 - "dep:rocksdb", 43 - "dep:serde", 44 - "dep:tokio", 45 - "dep:tonic-health", 46 - "otel", 47 - "dep:tracing", 48 - "dep:tracing-subscriber", 49 - "dep:tracing-opentelemetry" 50 - ] 28 + server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
+1 -20
parakeet-index/src/lib.rs
··· 1 - use tonic::transport::Channel; 2 - 3 1 #[allow(clippy::all)] 4 2 pub mod index { 5 3 tonic::include_proto!("parakeet"); 6 4 } 7 5 8 6 pub use index::*; 9 - #[cfg(not(feature = "otel"))] 10 - pub type Client = index_client::IndexClient<Channel>; 11 - #[cfg(feature = "otel")] 12 - pub type Client = index_client::IndexClient< 13 - tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>, 14 - >; 7 + pub type Client = index_client::IndexClient<tonic::transport::Channel>; 15 8 16 9 #[cfg(feature = "server")] 17 10 pub mod server; 18 - 19 - #[cfg(feature = "otel")] 20 - pub async fn connect_with_otel( 21 - uri: String, 22 - ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { 23 - let channel = Channel::from_shared(uri)?.connect().await?; 24 - let channel = tower::ServiceBuilder::new() 25 - .layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer) 26 - .service(channel); 27 - 28 - Ok(index_client::IndexClient::new(channel)) 29 - }
+3 -9
parakeet-index/src/main.rs
··· 1 1 use parakeet_index::index_server::IndexServer; 2 2 use parakeet_index::server::service::Service; 3 - use parakeet_index::server::{GlobalState, config, instrumentation}; 3 + use parakeet_index::server::{GlobalState, config}; 4 4 use std::sync::Arc; 5 5 use tonic::transport::Server; 6 - use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer; 7 6 8 7 #[tokio::main] 9 8 async fn main() -> eyre::Result<()> { 10 - let conf = config::load_config()?; 9 + tracing_subscriber::fmt::init(); 11 10 12 - instrumentation::init_instruments(&conf.instruments); 11 + let conf = config::load_config()?; 13 12 14 13 let db_root = conf.index_db_path.parse()?; 15 14 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); ··· 19 18 reporter.set_serving::<IndexServer<Service>>().await; 20 19 21 20 let service = Service::new(state.clone()); 22 - 23 - let mw = tower::ServiceBuilder::new() 24 - .option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default)); 25 - 26 21 Server::builder() 27 - .layer(mw) 28 22 .add_service(health_service) 29 23 .add_service(IndexServer::new(service)) 30 24 .serve(addr)
-10
parakeet-index/src/server/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub database_url: String, 19 17 pub index_db_path: String, 20 18 #[serde(default)] 21 19 pub server: ConfigServer, 22 - } 23 - 24 - #[derive(Debug, Deserialize)] 25 - pub struct ConfigInstruments { 26 - #[serde(default)] 27 - pub otel_enable: bool, 28 - #[serde(default)] 29 - pub log_json: bool, 30 20 } 31 21 32 22 #[derive(Debug, Deserialize)]
-57
parakeet-index/src/server/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &super::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
-1
parakeet-index/src/server/mod.rs
··· 2 2 3 3 pub mod config; 4 4 pub mod db; 5 - pub mod instrumentation; 6 5 pub mod service; 7 6 mod utils; 8 7