Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client.

Compare changes

Choose any two refs to compare.

+364 -12
Cargo.lock
··· 286 286 ] 287 287 288 288 [[package]] 289 + name = "axum-tracing-opentelemetry" 290 + version = "0.32.1" 291 + source = "registry+https://github.com/rust-lang/crates.io-index" 292 + checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1" 293 + dependencies = [ 294 + "axum", 295 + "futures-core", 296 + "futures-util", 297 + "http", 298 + "opentelemetry", 299 + "opentelemetry-semantic-conventions", 300 + "pin-project-lite", 301 + "tower", 302 + "tracing", 303 + "tracing-opentelemetry", 304 + "tracing-opentelemetry-instrumentation-sdk", 305 + ] 306 + 307 + [[package]] 289 308 name = "backtrace" 290 309 version = "0.3.74" 291 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 347 366 "proc-macro2", 348 367 "quote", 349 368 "regex", 350 - "rustc-hash", 369 + "rustc-hash 1.1.0", 351 370 "shlex", 352 371 "syn", 353 372 "which", ··· 465 484 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 466 485 467 486 [[package]] 487 + name = "cfg_aliases" 488 + version = "0.2.1" 489 + source = "registry+https://github.com/rust-lang/crates.io-index" 490 + checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 491 + 492 + [[package]] 468 493 name = "chrono" 469 494 version = "0.4.41" 470 495 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1395 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1371 1396 dependencies = [ 1372 1397 "cfg-if", 1398 + "js-sys", 1373 1399 "libc", 1374 1400 "r-efi", 1375 1401 "wasi 0.14.2+wasi-0.2.4", 1402 + "wasm-bindgen", 1376 1403 ] 1377 1404 1378 1405 [[package]] ··· 2169 2196 ] 2170 2197 2171 2198 [[package]] 2199 + name = "lru-slab" 2200 + version = "0.1.2" 2201 + source = "registry+https://github.com/rust-lang/crates.io-index" 2202 + checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 2203 + 2204 + [[package]] 2172 2205 name = "lz4-sys" 2173 2206 version = "1.11.1+lz4-1.10.0" 
2174 2207 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2185 2218 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2186 2219 2187 2220 [[package]] 2221 + name = "matchers" 2222 + version = "0.1.0" 2223 + source = "registry+https://github.com/rust-lang/crates.io-index" 2224 + checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 2225 + dependencies = [ 2226 + "regex-automata 0.1.10", 2227 + ] 2228 + 2229 + [[package]] 2188 2230 name = "matchit" 2189 2231 version = "0.8.4" 2190 2232 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2513 2555 ] 2514 2556 2515 2557 [[package]] 2558 + name = "opentelemetry" 2559 + version = "0.31.0" 2560 + source = "registry+https://github.com/rust-lang/crates.io-index" 2561 + checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" 2562 + dependencies = [ 2563 + "futures-core", 2564 + "futures-sink", 2565 + "js-sys", 2566 + "pin-project-lite", 2567 + "thiserror 2.0.12", 2568 + "tracing", 2569 + ] 2570 + 2571 + [[package]] 2572 + name = "opentelemetry-http" 2573 + version = "0.31.0" 2574 + source = "registry+https://github.com/rust-lang/crates.io-index" 2575 + checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" 2576 + dependencies = [ 2577 + "async-trait", 2578 + "bytes", 2579 + "http", 2580 + "opentelemetry", 2581 + "reqwest", 2582 + ] 2583 + 2584 + [[package]] 2585 + name = "opentelemetry-otlp" 2586 + version = "0.31.0" 2587 + source = "registry+https://github.com/rust-lang/crates.io-index" 2588 + checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" 2589 + dependencies = [ 2590 + "http", 2591 + "opentelemetry", 2592 + "opentelemetry-http", 2593 + "opentelemetry-proto", 2594 + "opentelemetry_sdk", 2595 + "prost 0.14.1", 2596 + "reqwest", 2597 + "thiserror 2.0.12", 2598 + "tracing", 2599 + ] 2600 + 2601 + [[package]] 2602 + name = "opentelemetry-proto" 2603 + version = 
"0.31.0" 2604 + source = "registry+https://github.com/rust-lang/crates.io-index" 2605 + checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" 2606 + dependencies = [ 2607 + "opentelemetry", 2608 + "opentelemetry_sdk", 2609 + "prost 0.14.1", 2610 + "tonic 0.14.2", 2611 + "tonic-prost", 2612 + ] 2613 + 2614 + [[package]] 2615 + name = "opentelemetry-semantic-conventions" 2616 + version = "0.31.0" 2617 + source = "registry+https://github.com/rust-lang/crates.io-index" 2618 + checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" 2619 + 2620 + [[package]] 2621 + name = "opentelemetry_sdk" 2622 + version = "0.31.0" 2623 + source = "registry+https://github.com/rust-lang/crates.io-index" 2624 + checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" 2625 + dependencies = [ 2626 + "futures-channel", 2627 + "futures-executor", 2628 + "futures-util", 2629 + "opentelemetry", 2630 + "percent-encoding", 2631 + "rand 0.9.1", 2632 + "thiserror 2.0.12", 2633 + ] 2634 + 2635 + [[package]] 2516 2636 name = "overload" 2517 2637 version = "0.1.1" 2518 2638 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2549 2669 "async-recursion", 2550 2670 "axum", 2551 2671 "axum-extra", 2672 + "axum-tracing-opentelemetry", 2552 2673 "base64 0.22.1", 2553 2674 "chrono", 2554 2675 "dataloader", ··· 2563 2684 "jsonwebtoken", 2564 2685 "lexica", 2565 2686 "multibase", 2687 + "opentelemetry", 2688 + "opentelemetry-otlp", 2689 + "opentelemetry_sdk", 2566 2690 "parakeet-db", 2567 2691 "parakeet-index", 2568 2692 "redis", ··· 2571 2695 "serde_ipld_dagcbor", 2572 2696 "serde_json", 2573 2697 "tokio", 2698 + "tower", 2574 2699 "tower-http", 2575 2700 "tracing", 2701 + "tracing-opentelemetry", 2576 2702 "tracing-subscriber", 2577 2703 ] 2578 2704 ··· 2594 2720 "eyre", 2595 2721 "figment", 2596 2722 "itertools 0.14.0", 2597 - "prost", 2723 + "opentelemetry", 2724 + "opentelemetry-otlp", 2725 + "opentelemetry_sdk", 2726 + 
"prost 0.13.5", 2598 2727 "rocksdb", 2599 2728 "serde", 2600 2729 "tokio", 2601 - "tonic", 2730 + "tonic 0.13.1", 2602 2731 "tonic-build", 2603 2732 "tonic-health", 2733 + "tonic-tracing-opentelemetry", 2734 + "tower", 2604 2735 "tracing", 2736 + "tracing-opentelemetry", 2605 2737 "tracing-subscriber", 2606 2738 ] 2607 2739 ··· 2907 3039 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 2908 3040 dependencies = [ 2909 3041 "bytes", 2910 - "prost-derive", 3042 + "prost-derive 0.13.5", 3043 + ] 3044 + 3045 + [[package]] 3046 + name = "prost" 3047 + version = "0.14.1" 3048 + source = "registry+https://github.com/rust-lang/crates.io-index" 3049 + checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" 3050 + dependencies = [ 3051 + "bytes", 3052 + "prost-derive 0.14.1", 2911 3053 ] 2912 3054 2913 3055 [[package]] ··· 2923 3065 "once_cell", 2924 3066 "petgraph", 2925 3067 "prettyplease", 2926 - "prost", 3068 + "prost 0.13.5", 2927 3069 "prost-types", 2928 3070 "regex", 2929 3071 "syn", ··· 2944 3086 ] 2945 3087 2946 3088 [[package]] 3089 + name = "prost-derive" 3090 + version = "0.14.1" 3091 + source = "registry+https://github.com/rust-lang/crates.io-index" 3092 + checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" 3093 + dependencies = [ 3094 + "anyhow", 3095 + "itertools 0.14.0", 3096 + "proc-macro2", 3097 + "quote", 3098 + "syn", 3099 + ] 3100 + 3101 + [[package]] 2947 3102 name = "prost-types" 2948 3103 version = "0.13.5" 2949 3104 source = "registry+https://github.com/rust-lang/crates.io-index" 2950 3105 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 2951 3106 dependencies = [ 2952 - "prost", 3107 + "prost 0.13.5", 2953 3108 ] 2954 3109 2955 3110 [[package]] ··· 2974 3129 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 2975 3130 2976 3131 [[package]] 3132 + name = "quinn" 3133 + version = "0.11.9" 3134 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 3135 + checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 3136 + dependencies = [ 3137 + "bytes", 3138 + "cfg_aliases", 3139 + "pin-project-lite", 3140 + "quinn-proto", 3141 + "quinn-udp", 3142 + "rustc-hash 2.1.1", 3143 + "rustls", 3144 + "socket2 0.6.0", 3145 + "thiserror 2.0.12", 3146 + "tokio", 3147 + "tracing", 3148 + "web-time", 3149 + ] 3150 + 3151 + [[package]] 3152 + name = "quinn-proto" 3153 + version = "0.11.13" 3154 + source = "registry+https://github.com/rust-lang/crates.io-index" 3155 + checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 3156 + dependencies = [ 3157 + "bytes", 3158 + "getrandom 0.3.3", 3159 + "lru-slab", 3160 + "rand 0.9.1", 3161 + "ring", 3162 + "rustc-hash 2.1.1", 3163 + "rustls", 3164 + "rustls-pki-types", 3165 + "slab", 3166 + "thiserror 2.0.12", 3167 + "tinyvec", 3168 + "tracing", 3169 + "web-time", 3170 + ] 3171 + 3172 + [[package]] 3173 + name = "quinn-udp" 3174 + version = "0.5.14" 3175 + source = "registry+https://github.com/rust-lang/crates.io-index" 3176 + checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 3177 + dependencies = [ 3178 + "cfg_aliases", 3179 + "libc", 3180 + "once_cell", 3181 + "socket2 0.6.0", 3182 + "tracing", 3183 + "windows-sys 0.59.0", 3184 + ] 3185 + 3186 + [[package]] 2977 3187 name = "quote" 2978 3188 version = "1.0.38" 2979 3189 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3135 3345 dependencies = [ 3136 3346 "aho-corasick", 3137 3347 "memchr", 3138 - "regex-automata", 3139 - "regex-syntax", 3348 + "regex-automata 0.4.9", 3349 + "regex-syntax 0.8.5", 3350 + ] 3351 + 3352 + [[package]] 3353 + name = "regex-automata" 3354 + version = "0.1.10" 3355 + source = "registry+https://github.com/rust-lang/crates.io-index" 3356 + checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 3357 + dependencies = [ 3358 + "regex-syntax 
0.6.29", 3140 3359 ] 3141 3360 3142 3361 [[package]] ··· 3147 3366 dependencies = [ 3148 3367 "aho-corasick", 3149 3368 "memchr", 3150 - "regex-syntax", 3369 + "regex-syntax 0.8.5", 3151 3370 ] 3152 3371 3153 3372 [[package]] 3154 3373 name = "regex-syntax" 3374 + version = "0.6.29" 3375 + source = "registry+https://github.com/rust-lang/crates.io-index" 3376 + checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 3377 + 3378 + [[package]] 3379 + name = "regex-syntax" 3155 3380 version = "0.8.5" 3156 3381 source = "registry+https://github.com/rust-lang/crates.io-index" 3157 3382 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3166 3391 "base64 0.22.1", 3167 3392 "bytes", 3168 3393 "encoding_rs", 3394 + "futures-channel", 3169 3395 "futures-core", 3170 3396 "futures-util", 3171 3397 "h2", ··· 3184 3410 "once_cell", 3185 3411 "percent-encoding", 3186 3412 "pin-project-lite", 3413 + "quinn", 3414 + "rustls", 3415 + "rustls-native-certs", 3187 3416 "rustls-pemfile", 3417 + "rustls-pki-types", 3188 3418 "serde", 3189 3419 "serde_json", 3190 3420 "serde_urlencoded", ··· 3192 3422 "system-configuration", 3193 3423 "tokio", 3194 3424 "tokio-native-tls", 3425 + "tokio-rustls", 3195 3426 "tokio-util", 3196 3427 "tower", 3197 3428 "tower-service", ··· 3281 3512 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3282 3513 3283 3514 [[package]] 3515 + name = "rustc-hash" 3516 + version = "2.1.1" 3517 + source = "registry+https://github.com/rust-lang/crates.io-index" 3518 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 3519 + 3520 + [[package]] 3284 3521 name = "rustc_version" 3285 3522 version = "0.4.1" 3286 3523 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3310 3547 dependencies = [ 3311 3548 "aws-lc-rs", 3312 3549 "once_cell", 3550 + "ring", 3313 3551 "rustls-pki-types", 3314 3552 "rustls-webpki", 3315 3553 "subtle", ··· 3342 3580 version 
= "1.11.0" 3343 3581 source = "registry+https://github.com/rust-lang/crates.io-index" 3344 3582 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3583 + dependencies = [ 3584 + "web-time", 3585 + ] 3345 3586 3346 3587 [[package]] 3347 3588 name = "rustls-webpki" ··· 4185 4426 "hyper-util", 4186 4427 "percent-encoding", 4187 4428 "pin-project", 4188 - "prost", 4429 + "prost 0.13.5", 4189 4430 "socket2 0.5.8", 4190 4431 "tokio", 4191 4432 "tokio-stream", 4192 4433 "tower", 4434 + "tower-layer", 4435 + "tower-service", 4436 + "tracing", 4437 + ] 4438 + 4439 + [[package]] 4440 + name = "tonic" 4441 + version = "0.14.2" 4442 + source = "registry+https://github.com/rust-lang/crates.io-index" 4443 + checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 4444 + dependencies = [ 4445 + "async-trait", 4446 + "base64 0.22.1", 4447 + "bytes", 4448 + "http", 4449 + "http-body", 4450 + "http-body-util", 4451 + "percent-encoding", 4452 + "pin-project", 4453 + "sync_wrapper", 4454 + "tokio-stream", 4193 4455 "tower-layer", 4194 4456 "tower-service", 4195 4457 "tracing", ··· 4215 4477 source = "registry+https://github.com/rust-lang/crates.io-index" 4216 4478 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4217 4479 dependencies = [ 4218 - "prost", 4480 + "prost 0.13.5", 4219 4481 "tokio", 4220 4482 "tokio-stream", 4221 - "tonic", 4483 + "tonic 0.13.1", 4484 + ] 4485 + 4486 + [[package]] 4487 + name = "tonic-prost" 4488 + version = "0.14.2" 4489 + source = "registry+https://github.com/rust-lang/crates.io-index" 4490 + checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" 4491 + dependencies = [ 4492 + "bytes", 4493 + "prost 0.14.1", 4494 + "tonic 0.14.2", 4495 + ] 4496 + 4497 + [[package]] 4498 + name = "tonic-tracing-opentelemetry" 4499 + version = "0.32.0" 4500 + source = "registry+https://github.com/rust-lang/crates.io-index" 4501 + checksum = 
"31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3" 4502 + dependencies = [ 4503 + "futures-core", 4504 + "futures-util", 4505 + "http", 4506 + "http-body", 4507 + "hyper", 4508 + "opentelemetry", 4509 + "pin-project-lite", 4510 + "tonic 0.14.2", 4511 + "tower", 4512 + "tracing", 4513 + "tracing-opentelemetry", 4514 + "tracing-opentelemetry-instrumentation-sdk", 4222 4515 ] 4223 4516 4224 4517 [[package]] ··· 4313 4606 ] 4314 4607 4315 4608 [[package]] 4609 + name = "tracing-opentelemetry" 4610 + version = "0.32.0" 4611 + source = "registry+https://github.com/rust-lang/crates.io-index" 4612 + checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" 4613 + dependencies = [ 4614 + "js-sys", 4615 + "opentelemetry", 4616 + "opentelemetry_sdk", 4617 + "rustversion", 4618 + "smallvec", 4619 + "thiserror 2.0.12", 4620 + "tracing", 4621 + "tracing-core", 4622 + "tracing-log", 4623 + "tracing-subscriber", 4624 + "web-time", 4625 + ] 4626 + 4627 + [[package]] 4628 + name = "tracing-opentelemetry-instrumentation-sdk" 4629 + version = "0.32.1" 4630 + source = "registry+https://github.com/rust-lang/crates.io-index" 4631 + checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416" 4632 + dependencies = [ 4633 + "http", 4634 + "opentelemetry", 4635 + "opentelemetry-semantic-conventions", 4636 + "tracing", 4637 + "tracing-opentelemetry", 4638 + ] 4639 + 4640 + [[package]] 4641 + name = "tracing-serde" 4642 + version = "0.2.0" 4643 + source = "registry+https://github.com/rust-lang/crates.io-index" 4644 + checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" 4645 + dependencies = [ 4646 + "serde", 4647 + "tracing-core", 4648 + ] 4649 + 4650 + [[package]] 4316 4651 name = "tracing-subscriber" 4317 4652 version = "0.3.19" 4318 4653 source = "registry+https://github.com/rust-lang/crates.io-index" 4319 4654 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4320 4655 dependencies = 
[ 4656 + "matchers", 4321 4657 "nu-ansi-term", 4658 + "once_cell", 4659 + "regex", 4660 + "serde", 4661 + "serde_json", 4322 4662 "sharded-slab", 4323 4663 "smallvec", 4324 4664 "thread_local", 4665 + "tracing", 4325 4666 "tracing-core", 4326 4667 "tracing-log", 4668 + "tracing-serde", 4327 4669 ] 4328 4670 4329 4671 [[package]] ··· 4592 4934 version = "0.3.77" 4593 4935 source = "registry+https://github.com/rust-lang/crates.io-index" 4594 4936 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4937 + dependencies = [ 4938 + "js-sys", 4939 + "wasm-bindgen", 4940 + ] 4941 + 4942 + [[package]] 4943 + name = "web-time" 4944 + version = "1.1.0" 4945 + source = "registry+https://github.com/rust-lang/crates.io-index" 4946 + checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 4595 4947 dependencies = [ 4596 4948 "js-sys", 4597 4949 "wasm-bindgen",
+1 -1
consumer/Cargo.toml
··· 36 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 38 tracing = "0.1.40" 39 - tracing-subscriber = "0.3.18" 39 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
+3 -3
consumer/src/backfill/downloader.rs
··· 84 84 85 85 // has the repo already been downloaded? 86 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 - tracing::warn!("skipping duplicate repo {did}"); 87 + tracing::info!("skipping duplicate repo {did}"); 88 88 continue; 89 89 } 90 90 ··· 92 92 match db::actor_get_statuses(&mut conn, &did).await { 93 93 Ok(Some((_, state))) => { 94 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 - tracing::warn!("skipping duplicate repo {did}"); 95 + tracing::info!("skipping duplicate repo {did}"); 96 96 continue; 97 97 } 98 98 } ··· 206 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 208 } 209 - Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 209 + Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."), 210 210 Err(e) => { 211 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 212 continue;
+4 -5
consumer/src/backfill/repo.rs
··· 53 53 54 54 match block { 55 55 CarEntry::Commit(_) => { 56 - tracing::warn!("got commit entry that was not in root") 56 + tracing::debug!("got commit entry that was not in root") 57 57 } 58 58 CarEntry::Record(CarRecordEntry::Known(record)) => { 59 59 if let Some(path) = mst_nodes.remove(&cid) { ··· 96 96 } 97 97 } 98 98 99 - let commit = commit.unwrap(); 99 + let Some(commit) = commit else { 100 + eyre::bail!("repo contained no commit?"); 101 + }; 100 102 101 103 Ok((commit, deltas, copies)) 102 104 } ··· 169 171 } 170 172 RecordTypes::AppBskyFeedThreadgate(record) => { 171 173 if !at_uri_is_by(&record.post, did) { 172 - tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 174 return Ok(()); 174 175 } 175 176 ··· 194 195 RecordTypes::AppBskyGraphListItem(rec) => { 195 196 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 196 197 if did != split_aturi[2] { 197 - // it's also probably a bad idea to log *all* the attempts to do this... 198 - tracing::warn!("tried to create a listitem on a list we don't control!"); 199 198 return Ok(()); 200 199 } 201 200
+8
consumer/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 16 18 pub index_uri: String, 17 19 pub database: deadpool_postgres::Config, 18 20 pub redis_uri: String, ··· 27 29 pub indexer: Option<IndexerConfig>, 28 30 /// Configuration items specific to backfill 29 31 pub backfill: Option<BackfillConfig>, 32 + } 33 + 34 + #[derive(Debug, Deserialize)] 35 + pub struct ConfigInstruments { 36 + #[serde(default)] 37 + pub log_json: bool, 30 38 } 31 39 32 40 #[derive(Debug, Deserialize)]
+3 -3
consumer/src/db/actor.rs
··· 69 69 ) 70 70 .await?; 71 71 72 - Ok(res.map(|v| (v.get(0), v.get(1)))) 72 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 73 73 } 74 74 75 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 83 ) 84 84 .await?; 85 85 86 - Ok(res.map(|v| (v.get(0), v.get(1)))) 86 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 87 87 } 88 88 89 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 97 ) 98 98 .await?; 99 99 100 - Ok(res.map(|v| (v.get(0), v.get(1)))) 100 + res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 101 101 }
+10 -9
consumer/src/db/backfill.rs
··· 51 51 ) 52 52 .await?; 53 53 54 - Ok(res 55 - .into_iter() 56 - .map(|row| BackfillRow { 57 - repo: row.get(0), 58 - repo_ver: row.get(1), 59 - cid: row.get(2), 60 - data: row.get(3), 61 - indexed_at: row.get(4), 54 + res.into_iter() 55 + .map(|row| { 56 + Ok(BackfillRow { 57 + repo: row.try_get(0)?, 58 + repo_ver: row.try_get(1)?, 59 + cid: row.try_get(2)?, 60 + data: row.try_get(3)?, 61 + indexed_at: row.try_get(4)?, 62 + }) 62 63 }) 63 - .collect()) 64 + .collect() 64 65 } 65 66 66 67 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1 -1
consumer/src/db/copy.rs
··· 205 205 ) 206 206 .await? 207 207 .into_iter() 208 - .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 208 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?; 209 209 210 210 for (root, post, created_at) in threadgated { 211 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+18 -13
consumer/src/db/gates.rs
··· 47 47 &[&root_author, &post_author], 48 48 ) 49 49 .await? 50 - .map(|v| (v.get(0), v.get(1))); 50 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?; 51 51 52 52 if let Some((following, followed)) = profile_state { 53 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 65 let mentions: Vec<String> = conn 66 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 67 .await? 68 - .map(|r| r.get(0)) 68 + .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 69 + .transpose()? 69 70 .unwrap_or_default(); 70 71 71 72 if mentions.contains(&post_author.to_owned()) { ··· 84 85 &[&allow_lists, &post_author], 85 86 ) 86 87 .await? 87 - .get(0); 88 + .try_get(0)?; 88 89 if count != 0 { 89 90 return Ok(false); 90 91 } ··· 136 137 ) 137 138 .await? 138 139 .into_iter() 139 - .map(|row| row.get(0)) 140 - .collect(); 140 + .map(|row| row.try_get(0)) 141 + .collect::<Result<_, _>>()?; 141 142 142 143 // this will be empty if there are no replies. 143 144 if dids.is_empty() { ··· 152 153 }) 153 154 .collect::<Vec<_>>(); 154 155 155 - let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 + let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str())); 156 157 157 158 if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 159 let current_dids: Vec<_> = dids.iter().collect(); ··· 160 161 let res = conn.query( 161 162 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 163 &[&root_author, &current_dids] 163 - ).await?; 164 + ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?; 164 165 165 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 + dids = &dids - &res; 166 167 } 167 168 168 169 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 171 172 let res = conn.query( 172 173 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND 
following IS NOT NULL", 173 174 &[&root_author, &current_dids] 174 - ).await?; 175 + ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?; 175 176 176 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 + dids = &dids - &res; 177 178 } 178 179 179 180 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 181 let mentions: Vec<String> = conn 181 182 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 183 .await? 183 - .map(|r| r.get(0)) 184 + .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 185 + .transpose()? 184 186 .unwrap_or_default(); 185 187 186 188 dids = &dids - &HashSet::from_iter(mentions); ··· 194 196 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 197 &[&allowed_lists, &current_dids], 196 198 ) 197 - .await?; 199 + .await? 200 + .into_iter() 201 + .map(|row| row.try_get(0)) 202 + .collect::<Result<_, _>>()?; 198 203 199 - dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 204 + dids = &dids - &res; 200 205 } 201 206 202 207 let dids = dids.into_iter().collect::<Vec<_>>();
+17 -20
consumer/src/db/record.rs
··· 127 127 ], 128 128 ) 129 129 .await 130 - .map(|r| r.get::<_, i32>(0) == 0) 130 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 131 131 } 132 132 133 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 159 ) 160 160 .await?; 161 161 162 - Ok(res.map(|v| v.get(0))) 162 + res.map(|v| v.try_get(0)).transpose() 163 163 } 164 164 165 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 224 ) 225 225 .await?; 226 226 227 - Ok(res.map(|v| v.get(0))) 227 + res.map(|v| v.try_get(0)).transpose() 228 228 } 229 229 230 230 pub async fn list_upsert<C: GenericClient>( ··· 255 255 ], 256 256 ) 257 257 .await 258 - .map(|r| r.get::<_, i32>(0) == 0) 258 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 259 259 } 260 260 261 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 391 ) 392 392 .await?; 393 393 394 - Ok(res.map(|row| (row.get(0), row.get(1)))) 394 + res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?))) 395 + .transpose() 395 396 } 396 397 397 398 pub async fn post_embed_insert<C: GenericClient>( ··· 536 537 conn: &mut C, 537 538 post: &str, 538 539 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 539 - let res = conn 540 - .query_opt( 541 - "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 - &[&post], 543 - ) 544 - .await? 545 - .map(|v| (v.get(0), v.get(1), v.get(2))); 546 - 547 - Ok(res) 540 + conn.query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 
545 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))) 546 + .transpose() 548 547 } 549 548 550 549 pub async fn postgate_upsert<C: GenericClient>( ··· 651 650 ) 652 651 .await?; 653 652 654 - Ok(res.map(|v| v.get(0))) 653 + res.map(|v| v.try_get(0)).transpose() 655 654 } 656 655 657 656 pub async fn starter_pack_upsert<C: GenericClient>( ··· 686 685 ], 687 686 ) 688 687 .await 689 - .map(|r| r.get::<_, i32>(0) == 0) 688 + .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 690 689 } 691 690 692 691 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 731 730 conn: &mut C, 732 731 post: &str, 733 732 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 734 - let res = conn 733 + conn 735 734 .query_opt( 736 735 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 737 736 &[&post], 738 737 ) 739 738 .await? 740 - .map(|v| (v.get(0), v.get(1), v.get(2))); 741 - 742 - Ok(res) 739 + .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose() 743 740 } 744 741 745 742 pub async fn threadgate_upsert<C: GenericClient>(
-4
consumer/src/indexer/mod.rs
··· 640 640 } 641 641 RecordTypes::AppBskyFeedPostgate(record) => { 642 642 if !at_uri_is_by(&record.post, repo) { 643 - tracing::warn!("tried to create a postgate on a post we don't control!"); 644 643 return Ok(()); 645 644 } 646 645 ··· 670 669 } 671 670 RecordTypes::AppBskyFeedThreadgate(record) => { 672 671 if !at_uri_is_by(&record.post, repo) { 673 - tracing::warn!("tried to create a threadgate on a post we don't control!"); 674 672 return Ok(()); 675 673 } 676 674 ··· 710 708 } 711 709 RecordTypes::AppBskyGraphListItem(record) => { 712 710 if !at_uri_is_by(&record.list, repo) { 713 - // it's also probably a bad idea to log *all* the attempts to do this... 714 - tracing::warn!("tried to create a listitem on a list we don't control!"); 715 711 return Ok(()); 716 712 } 717 713
+25
consumer/src/instrumentation.rs
··· 1 + use tracing::Subscriber; 2 + use tracing_subscriber::filter::Filtered; 3 + use tracing_subscriber::layer::SubscriberExt; 4 + use tracing_subscriber::registry::LookupSpan; 5 + use tracing_subscriber::util::SubscriberInitExt; 6 + use tracing_subscriber::{EnvFilter, Layer}; 7 + 8 + pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 9 + let log_layer = init_log(cfg.log_json); 10 + 11 + tracing_subscriber::registry().with(log_layer).init(); 12 + } 13 + 14 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 15 + where 16 + S: Subscriber + for<'span> LookupSpan<'span>, 17 + { 18 + let stdout_filter = EnvFilter::from_default_env(); 19 + 20 + match json { 21 + true => tracing_subscriber::fmt::layer().json().boxed(), 22 + false => tracing_subscriber::fmt::layer().boxed(), 23 + } 24 + .with_filter(stdout_filter) 25 + }
+2 -1
consumer/src/main.rs
··· 12 12 mod db; 13 13 mod firehose; 14 14 mod indexer; 15 + mod instrumentation; 15 16 mod label_indexer; 16 17 mod utils; 17 18 18 19 #[tokio::main] 19 20 async fn main() -> eyre::Result<()> { 20 - tracing_subscriber::fmt::init(); 21 21 PrometheusBuilder::new().install()?; 22 22 23 23 let cli = cmd::parse(); 24 24 let conf = config::load_config()?; 25 25 26 + instrumentation::init_instruments(&conf.instruments); 26 27 let user_agent = build_ua(&conf.ua_contact); 27 28 28 29 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+1
lexica/src/app_bsky/mod.rs
··· 7 7 pub mod graph; 8 8 pub mod labeler; 9 9 pub mod richtext; 10 + pub mod unspecced; 10 11 11 12 #[derive(Clone, Default, Debug, Serialize)] 12 13 #[serde(rename_all = "camelCase")]
+33
lexica/src/app_bsky/unspecced.rs
use crate::app_bsky::feed::{BlockedAuthor, PostView};
use serde::Serialize;

/// A single thread entry: the item's URI, its depth, and a typed payload.
///
/// No `rename_all` is applied, so the fields serialize under their Rust names
/// (`uri`, `depth`, `value`).
// NOTE(review): presumably `depth` is relative to the requested (anchor) post,
// as in the `app.bsky.unspecced` thread lexicon — confirm against the caller.
#[derive(Clone, Debug, Serialize)]
pub struct ThreadV2Item {
    pub uri: String,
    pub depth: i32,
    pub value: ThreadV2ItemType,
}

/// Payload of a [`ThreadV2Item`].
///
/// Serialized as an internally tagged enum: the variant is identified by a
/// `$type` field holding the lexicon reference given in each `rename`.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "$type")]
pub enum ThreadV2ItemType {
    /// A post, carried together with its thread flags.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
    Post(ThreadItemPost),
    /// Carries no data; serializes with only the `$type` tag.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
    NoUnauthenticated {},
    /// Carries no data; serializes with only the `$type` tag.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
    NotFound {},
    /// A blocked item; only the blocked author's details are carried.
    #[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
    Blocked { author: BlockedAuthor },
}

/// Post payload for [`ThreadV2ItemType::Post`].
///
/// Field names are serialized in camelCase (`moreParents`, `moreReplies`,
/// `opThread`, `hiddenByThreadgate`, `mutedByViewer`).
// NOTE(review): the meaning of each flag is not visible in this file —
// presumably they mirror the `threadItemPost` lexicon definition; confirm there.
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ThreadItemPost {
    pub post: PostView,
    pub more_parents: bool,
    pub more_replies: i32,
    pub op_thread: bool,
    pub hidden_by_threadgate: bool,
    pub muted_by_viewer: bool,
}
+2 -2
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 34 34 language plpgsql as 35 35 $$ 36 36 begin 37 - delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 37 + delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post'; 38 38 return OLD; 39 39 end; 40 40 $$; ··· 67 67 language plpgsql as 68 68 $$ 69 69 begin 70 - delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 70 + delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost'; 71 71 return OLD; 72 72 end; 73 73 $$;
+8 -2
parakeet/Cargo.toml
··· 6 6 [dependencies] 7 7 async-recursion = "1.1.1" 8 8 axum = { version = "0.8", features = ["json"] } 9 + axum-tracing-opentelemetry = "0.32" 9 10 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 10 11 base64 = "0.22" 11 12 chrono = { version = "0.4.39", features = ["serde"] } ··· 21 22 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 22 23 lexica = { path = "../lexica" } 23 24 multibase = "0.9.1" 25 + opentelemetry = "0.31.0" 26 + opentelemetry-otlp = "0.31.0" 27 + opentelemetry_sdk = "0.31.0" 24 28 parakeet-db = { path = "../parakeet-db" } 25 - parakeet-index = { path = "../parakeet-index" } 29 + parakeet-index = { path = "../parakeet-index", features = ["otel"] } 26 30 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 27 31 reqwest = { version = "0.12", features = ["json"] } 28 32 serde = { version = "1.0.217", features = ["derive"] } 29 33 serde_ipld_dagcbor = "0.6.1" 30 34 serde_json = "1.0.134" 31 35 tokio = { version = "1.42.0", features = ["full"] } 36 + tower = "0.5" 32 37 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 33 38 tracing = "0.1.40" 34 - tracing-subscriber = "0.3.18" 39 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 40 + tracing-opentelemetry = "0.32"
+2 -2
parakeet/src/cache.rs
··· 29 29 type Val = V; 30 30 31 31 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 - let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?; 33 33 34 34 match serde_ipld_dagcbor::from_slice(&res?) { 35 35 Ok(v) => Some(v), ··· 57 57 } 58 58 59 59 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 - let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key) 61 61 .await 62 62 .ok()?; 63 63
+10
parakeet/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 16 18 pub index_uri: String, 17 19 pub database_url: String, 18 20 pub redis_uri: String, ··· 27 29 pub did_allowlist: Option<Vec<String>>, 28 30 #[serde(default)] 29 31 pub migrate: bool, 32 + } 33 + 34 + #[derive(Debug, Deserialize)] 35 + pub struct ConfigInstruments { 36 + #[serde(default)] 37 + pub otel_enable: bool, 38 + #[serde(default)] 39 + pub log_json: bool, 30 40 } 31 41 32 42 #[derive(Debug, Deserialize)]
+115 -1
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 - use diesel::sql_types::{Array, Bool, Nullable, Text}; 2 + use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 + use parakeet_db::models::TextArray; 4 5 use parakeet_db::{schema, types}; 6 + use tracing::instrument; 5 7 8 + #[instrument(skip_all)] 6 9 pub async fn get_actor_status( 7 10 conn: &mut AsyncPgConnection, 8 11 did: &str, ··· 37 40 #[diesel(sql_type = Nullable<Text>)] 38 41 pub list_mute: Option<String>, 39 42 } 43 + 44 + #[instrument(skip_all)] 40 45 pub async fn get_profile_state( 41 46 conn: &mut AsyncPgConnection, 42 47 did: &str, ··· 49 54 .await 50 55 .optional() 51 56 } 57 + 58 + #[instrument(skip_all)] 52 59 pub async fn get_profile_states( 53 60 conn: &mut AsyncPgConnection, 54 61 did: &str, ··· 83 90 #[diesel(sql_type = diesel::sql_types::Bool)] 84 91 pub pinned: bool, 85 92 } 93 + 94 + #[instrument(skip_all)] 86 95 pub async fn get_post_state( 87 96 conn: &mut AsyncPgConnection, 88 97 did: &str, ··· 96 105 .optional() 97 106 } 98 107 108 + #[instrument(skip_all)] 99 109 pub async fn get_post_states( 100 110 conn: &mut AsyncPgConnection, 101 111 did: &str, ··· 119 129 pub block: Option<String>, 120 130 } 121 131 132 + #[instrument(skip_all)] 122 133 pub async fn get_list_state( 123 134 conn: &mut AsyncPgConnection, 124 135 did: &str, ··· 132 143 .optional() 133 144 } 134 145 146 + #[instrument(skip_all)] 135 147 pub async fn get_list_states( 136 148 conn: &mut AsyncPgConnection, 137 149 did: &str, ··· 144 156 .await 145 157 } 146 158 159 + #[instrument(skip_all)] 147 160 pub async fn get_like_state( 148 161 conn: &mut AsyncPgConnection, 149 162 did: &str, ··· 161 174 .optional() 162 175 } 163 176 177 + #[instrument(skip_all)] 164 178 pub async fn get_like_states( 165 179 conn: &mut AsyncPgConnection, 166 180 did: &str, ··· 181 195 .await 182 196 } 183 197 198 + #[instrument(skip_all)] 184 199 pub async fn get_pinned_post_uri( 185 200 conn: 
&mut AsyncPgConnection, 186 201 did: &str, ··· 196 211 .await 197 212 .optional() 198 213 } 214 + 215 + #[derive(Debug, QueryableByName)] 216 + #[diesel(check_for_backend(diesel::pg::Pg))] 217 + #[allow(unused)] 218 + pub struct ThreadItem { 219 + #[diesel(sql_type = Text)] 220 + pub at_uri: String, 221 + #[diesel(sql_type = Nullable<Text>)] 222 + pub parent_uri: Option<String>, 223 + #[diesel(sql_type = Nullable<Text>)] 224 + pub root_uri: Option<String>, 225 + #[diesel(sql_type = Integer)] 226 + pub depth: i32, 227 + } 228 + 229 + #[instrument(skip_all)] 230 + pub async fn get_thread_children( 231 + conn: &mut AsyncPgConnection, 232 + uri: &str, 233 + depth: i32, 234 + ) -> QueryResult<Vec<ThreadItem>> { 235 + diesel::sql_query(include_str!("sql/thread.sql")) 236 + .bind::<Text, _>(uri) 237 + .bind::<Integer, _>(depth) 238 + .load(conn) 239 + .await 240 + } 241 + 242 + #[instrument(skip_all)] 243 + pub async fn get_thread_children_branching( 244 + conn: &mut AsyncPgConnection, 245 + uri: &str, 246 + depth: i32, 247 + branching_factor: i32, 248 + ) -> QueryResult<Vec<ThreadItem>> { 249 + diesel::sql_query(include_str!("sql/thread_branching.sql")) 250 + .bind::<Text, _>(uri) 251 + .bind::<Integer, _>(depth) 252 + .bind::<Integer, _>(branching_factor) 253 + .load(conn) 254 + .await 255 + } 256 + 257 + #[derive(Debug, QueryableByName)] 258 + #[diesel(check_for_backend(diesel::pg::Pg))] 259 + pub struct HiddenThreadChildItem { 260 + #[diesel(sql_type = Text)] 261 + pub at_uri: String, 262 + } 263 + 264 + #[instrument(skip_all)] 265 + pub async fn get_thread_children_hidden( 266 + conn: &mut AsyncPgConnection, 267 + uri: &str, 268 + root: &str, 269 + ) -> QueryResult<Vec<HiddenThreadChildItem>> { 270 + diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql")) 271 + .bind::<Text, _>(uri) 272 + .bind::<Text, _>(root) 273 + .load(conn) 274 + .await 275 + } 276 + 277 + #[instrument(skip_all)] 278 + pub async fn get_thread_parents( 279 + conn: &mut 
AsyncPgConnection, 280 + uri: &str, 281 + height: i32, 282 + ) -> QueryResult<Vec<ThreadItem>> { 283 + diesel::sql_query(include_str!("sql/thread_parent.sql")) 284 + .bind::<Text, _>(uri) 285 + .bind::<Integer, _>(height) 286 + .load(conn) 287 + .await 288 + } 289 + 290 + #[instrument(skip_all)] 291 + pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 292 + schema::posts::table 293 + .select(schema::posts::root_uri) 294 + .find(&uri) 295 + .get_result(conn) 296 + .await 297 + .optional() 298 + .map(|v| v.flatten()) 299 + } 300 + 301 + #[instrument(skip_all)] 302 + pub async fn get_threadgate_hiddens( 303 + conn: &mut AsyncPgConnection, 304 + uri: &str, 305 + ) -> QueryResult<Option<TextArray>> { 306 + schema::threadgates::table 307 + .select(schema::threadgates::hidden_replies) 308 + .find(&uri) 309 + .get_result(conn) 310 + .await 311 + .optional() 312 + }
+3
parakeet/src/hydration/embed.rs
··· 8 8 use lexica::app_bsky::feed::PostView; 9 9 use parakeet_db::models; 10 10 use std::collections::HashMap; 11 + use tracing::instrument; 11 12 12 13 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 13 14 height ··· 176 177 out 177 178 } 178 179 180 + #[instrument(skip_all)] 179 181 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 180 182 let (embed, author) = self.loaders.embed.load(post).await?; 181 183 ··· 195 197 } 196 198 } 197 199 200 + #[instrument(skip_all)] 198 201 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 199 202 let embeds = self.loaders.embed.load_many(posts).await; 200 203
+5
parakeet/src/hydration/feedgen.rs
··· 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 + use tracing::instrument; 8 9 9 10 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 10 11 GeneratorViewerState { ··· 49 50 } 50 51 51 52 impl super::StatefulHydrator<'_> { 53 + #[instrument(skip_all)] 52 54 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 53 55 let labels = self.get_label(&feedgen).await; 54 56 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 61 63 )) 62 64 } 63 65 66 + #[instrument(skip_all)] 64 67 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 65 68 let labels = self.get_label_many(&feedgens).await; 66 69 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 90 93 .collect() 91 94 } 92 95 96 + #[instrument(skip_all)] 93 97 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 94 98 if let Some(viewer) = &self.current_actor { 95 99 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 100 104 } 101 105 } 102 106 107 + #[instrument(skip_all)] 103 108 async fn get_feedgen_viewer_states( 104 109 &self, 105 110 subjects: &[String],
+12 -9
parakeet/src/hydration/labeler.rs
··· 8 8 use parakeet_db::models; 9 9 use std::collections::HashMap; 10 10 use std::str::FromStr; 11 + use tracing::instrument; 11 12 12 13 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 13 14 LabelerViewerState { ··· 42 43 likes: Option<i32>, 43 44 ) -> LabelerViewDetailed { 44 45 let reason_types = labeler.reasons.map(|v| { 45 - v.into_iter() 46 - .flatten() 47 - .filter_map(|v| ReasonType::from_str(&v).ok()) 46 + v.iter() 47 + .filter_map(|v| ReasonType::from_str(v).ok()) 48 48 .collect() 49 49 }); 50 50 ··· 74 74 }) 75 75 .collect(); 76 76 let subject_types = labeler.subject_types.map(|v| { 77 - v.into_iter() 78 - .flatten() 79 - .filter_map(|v| SubjectType::from_str(&v).ok()) 77 + v.iter() 78 + .filter_map(|v| SubjectType::from_str(v).ok()) 80 79 .collect() 81 80 }); 82 - let subject_collections = labeler 83 - .subject_collections 84 - .map(|v| v.into_iter().flatten().collect()); 81 + let subject_collections = labeler.subject_collections.map(Vec::from); 85 82 86 83 LabelerViewDetailed { 87 84 uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did), ··· 102 99 } 103 100 104 101 impl StatefulHydrator<'_> { 102 + #[instrument(skip_all)] 105 103 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 106 104 let labels = self.get_label(&labeler).await; 107 105 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 112 110 Some(build_view(labeler, creator, labels, viewer, likes)) 113 111 } 114 112 113 + #[instrument(skip_all)] 115 114 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 116 115 let labels = self.get_label_many(&labelers).await; 117 116 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 137 136 .collect() 138 137 } 139 138 139 + #[instrument(skip_all)] 140 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 141 141 let labels = self.get_label(&labeler).await; 142 142 let viewer = 
self.get_labeler_viewer_state(&labeler).await; ··· 149 149 )) 150 150 } 151 151 152 + #[instrument(skip_all)] 152 153 pub async fn hydrate_labelers_detailed( 153 154 &self, 154 155 labelers: Vec<String>, ··· 179 180 .collect() 180 181 } 181 182 183 + #[instrument(skip_all)] 182 184 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 183 185 if let Some(viewer) = &self.current_actor { 184 186 let data = self ··· 193 195 } 194 196 } 195 197 198 + #[instrument(skip_all)] 196 199 async fn get_labeler_viewer_states( 197 200 &self, 198 201 subjects: &[String],
+7
parakeet/src/hydration/list.rs
··· 6 6 use parakeet_db::models; 7 7 use std::collections::HashMap; 8 8 use std::str::FromStr; 9 + use tracing::instrument; 9 10 10 11 fn build_viewer(data: ListStateRet) -> ListViewerState { 11 12 ListViewerState { ··· 69 70 } 70 71 71 72 impl StatefulHydrator<'_> { 73 + #[instrument(skip_all)] 72 74 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 73 75 let labels = self.get_label(&list).await; 74 76 let viewer = self.get_list_viewer_state(&list).await; ··· 77 79 build_basic(list, count, labels, viewer, &self.cdn) 78 80 } 79 81 82 + #[instrument(skip_all)] 80 83 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 81 84 if lists.is_empty() { 82 85 return HashMap::new(); ··· 97 100 .collect() 98 101 } 99 102 103 + #[instrument(skip_all)] 100 104 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 101 105 let labels = self.get_label(&list).await; 102 106 let viewer = self.get_list_viewer_state(&list).await; ··· 106 110 build_listview(list, count, profile, labels, viewer, &self.cdn) 107 111 } 108 112 113 + #[instrument(skip_all)] 109 114 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 110 115 if lists.is_empty() { 111 116 return HashMap::new(); ··· 131 136 .collect() 132 137 } 133 138 139 + #[instrument(skip_all)] 134 140 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 135 141 if let Some(viewer) = &self.current_actor { 136 142 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 141 147 } 142 148 } 143 149 150 + #[instrument(skip_all)] 144 151 async fn get_list_viewer_states( 145 152 &self, 146 153 subjects: &[String],
+4
parakeet/src/hydration/mod.rs
··· 63 63 } 64 64 } 65 65 66 + #[tracing::instrument(skip_all)] 66 67 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 67 68 self.loaders.label.load(uri, self.accept_labelers).await 68 69 } 69 70 71 + #[tracing::instrument(skip_all)] 70 72 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 71 73 let uris = &[ 72 74 did.to_string(), ··· 80 82 .collect() 81 83 } 82 84 85 + #[tracing::instrument(skip_all)] 83 86 async fn get_label_many( 84 87 &self, 85 88 uris: &[String], ··· 90 93 .await 91 94 } 92 95 96 + #[tracing::instrument(skip_all)] 93 97 async fn get_profile_label_many( 94 98 &self, 95 99 uris: &[String],
+12 -3
parakeet/src/hydration/posts.rs
··· 11 11 use parakeet_db::models; 12 12 use parakeet_index::PostStats; 13 13 use std::collections::HashMap; 14 + use tracing::instrument; 14 15 15 16 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 16 17 let is_me = did == data.did; ··· 82 83 } 83 84 84 85 impl StatefulHydrator<'_> { 86 + #[instrument(skip_all)] 85 87 async fn hydrate_threadgate( 86 88 &self, 87 89 threadgate: Option<models::Threadgate>, ··· 89 91 let threadgate = threadgate?; 90 92 91 93 let lists = match threadgate.allowed_lists.as_ref() { 92 - Some(allowed_lists) => allowed_lists.iter().flatten().cloned().collect(), 94 + Some(allowed_lists) => allowed_lists.clone().into(), 93 95 None => Vec::new(), 94 96 }; 95 97 let lists = self.hydrate_lists_basic(lists).await; ··· 100 102 )) 101 103 } 102 104 105 + #[instrument(skip_all)] 103 106 async fn hydrate_threadgates( 104 107 &self, 105 108 threadgates: Vec<models::Threadgate>, 106 109 ) -> HashMap<String, ThreadgateView> { 107 110 let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| { 108 111 if let Some(lists) = &c.allowed_lists { 109 - acc.extend(lists.iter().flatten().cloned()); 112 + acc.extend(lists.clone().0); 110 113 } 111 114 acc 112 115 }); ··· 118 121 let this_lists = match &threadgate.allowed_lists { 119 122 Some(allowed_lists) => allowed_lists 120 123 .iter() 121 - .filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned())) 124 + .filter_map(|v| lists.get(v).cloned()) 122 125 .collect(), 123 126 None => Vec::new(), 124 127 }; ··· 131 134 .collect() 132 135 } 133 136 137 + #[instrument(skip_all)] 134 138 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 135 139 let stats = self.loaders.post_stats.load(post.clone()).await; 136 140 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 145 149 ))) 146 150 } 147 151 152 + #[instrument(skip_all)] 148 153 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 149 154 let stats = 
self.loaders.post_stats.load_many(posts.clone()).await; 150 155 let posts = self.loaders.posts.load_many(posts).await; ··· 184 189 .collect() 185 190 } 186 191 192 + #[instrument(skip_all)] 187 193 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 188 194 self.hydrate_posts_inner(posts) 189 195 .await ··· 192 198 .collect() 193 199 } 194 200 201 + #[instrument(skip_all)] 195 202 pub async fn hydrate_feed_posts( 196 203 &self, 197 204 posts: Vec<RawFeedItem>, ··· 295 302 .collect() 296 303 } 297 304 305 + #[instrument(skip_all)] 298 306 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 299 307 if let Some(viewer) = &self.current_actor { 300 308 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 305 313 } 306 314 } 307 315 316 + #[instrument(skip_all)] 308 317 async fn get_post_viewer_states( 309 318 &self, 310 319 subjects: &[String],
+12 -1
parakeet/src/hydration/profile.rs
··· 12 12 use std::collections::HashMap; 13 13 use std::str::FromStr; 14 14 use std::sync::OnceLock; 15 + use tracing::instrument; 15 16 16 17 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 17 18 ··· 51 52 .followed 52 53 .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 53 54 54 - let blocking = data.list_block.or(data.blocking); 55 + let blocking = data.list_block.or(data 56 + .blocking 57 + .map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did))); 55 58 56 59 ProfileViewerState { 57 60 muted: data.muting.unwrap_or_default(), ··· 272 275 } 273 276 274 277 impl super::StatefulHydrator<'_> { 278 + #[instrument(skip_all)] 275 279 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 276 280 let labels = self.get_profile_label(&did).await; 277 281 let viewer = self.get_profile_viewer_state(&did).await; ··· 289 293 )) 290 294 } 291 295 296 + #[instrument(skip_all)] 292 297 pub async fn hydrate_profiles_basic( 293 298 &self, 294 299 dids: Vec<String>, ··· 313 318 .collect() 314 319 } 315 320 321 + #[instrument(skip_all)] 316 322 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 317 323 let labels = self.get_profile_label(&did).await; 318 324 let viewer = self.get_profile_viewer_state(&did).await; ··· 330 336 )) 331 337 } 332 338 339 + #[instrument(skip_all)] 333 340 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 334 341 let labels = self.get_profile_label_many(&dids).await; 335 342 let viewers = self.get_profile_viewer_states(&dids).await; ··· 351 358 .collect() 352 359 } 353 360 361 + #[instrument(skip_all)] 354 362 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 355 363 let labels = self.get_profile_label(&did).await; 356 364 let viewer = self.get_profile_viewer_state(&did).await; ··· 368 376 )) 369 377 } 370 378 379 + #[instrument(skip_all)] 371 380 pub async fn 
hydrate_profiles_detailed( 372 381 &self, 373 382 dids: Vec<String>, ··· 392 401 .collect() 393 402 } 394 403 404 + #[instrument(skip_all)] 395 405 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 396 406 if let Some(viewer) = &self.current_actor { 397 407 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 411 421 } 412 422 } 413 423 424 + #[instrument(skip_all)] 414 425 async fn get_profile_viewer_states( 415 426 &self, 416 427 dids: &[String],
+11 -9
parakeet/src/hydration/starter_packs.rs
··· 4 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 + use tracing::instrument; 7 8 8 9 fn build_basic( 9 10 starter_pack: models::StaterPack, ··· 50 51 } 51 52 52 53 impl StatefulHydrator<'_> { 54 + #[instrument(skip_all)] 53 55 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 54 56 let labels = self.get_label(&pack).await; 55 57 let sp = self.loaders.starterpacks.load(pack).await?; ··· 59 61 Some(build_basic(sp, creator, labels, list_item_count)) 60 62 } 61 63 64 + #[instrument(skip_all)] 62 65 pub async fn hydrate_starterpacks_basic( 63 66 &self, 64 67 packs: Vec<String>, ··· 86 89 .collect() 87 90 } 88 91 92 + #[instrument(skip_all)] 89 93 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 90 94 let labels = self.get_label(&pack).await; 91 95 let sp = self.loaders.starterpacks.load(pack).await?; ··· 93 97 let creator = self.hydrate_profile_basic(sp.owner.clone()).await?; 94 98 let list = self.hydrate_list_basic(sp.list.clone()).await; 95 99 96 - let feeds = sp 97 - .feeds 98 - .clone() 99 - .unwrap_or_default() 100 - .into_iter() 101 - .flatten() 100 + let feeds = sp.feeds.clone().unwrap_or_default(); 101 + let feeds = self 102 + .hydrate_feedgens(feeds.into()) 103 + .await 104 + .into_values() 102 105 .collect(); 103 - let feeds = self.hydrate_feedgens(feeds).await.into_values().collect(); 104 106 105 107 Some(build_spview(sp, creator, labels, list, feeds)) 106 108 } 107 109 110 + #[instrument(skip_all)] 108 111 pub async fn hydrate_starterpacks( 109 112 &self, 110 113 packs: Vec<String>, ··· 119 122 let feeds = packs 120 123 .values() 121 124 .filter_map(|pack| pack.feeds.clone()) 122 - .flat_map(|feeds| feeds.into_iter().flatten()) 125 + .flat_map(Vec::from) 123 126 .collect(); 124 127 125 128 let creators = self.hydrate_profiles_basic(creators).await; ··· 133 136 let list = 
lists.get(&pack.list).cloned(); 134 137 let feeds = pack.feeds.as_ref().map(|v| { 135 138 v.iter() 136 - .flatten() 137 139 .filter_map(|feed| feeds.get(feed).cloned()) 138 140 .collect() 139 141 });
+57
parakeet/src/instrumentation.rs
··· 1 + use opentelemetry::trace::TracerProvider; 2 + use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::filter::Filtered; 7 + use tracing_subscriber::layer::SubscriberExt; 8 + use tracing_subscriber::registry::LookupSpan; 9 + use tracing_subscriber::util::SubscriberInitExt; 10 + use tracing_subscriber::{EnvFilter, Layer}; 11 + 12 + pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 13 + let otel_layer = cfg.otel_enable.then(init_otel); 14 + let log_layer = init_log(cfg.log_json); 15 + 16 + tracing_subscriber::registry() 17 + .with(log_layer) 18 + .with(otel_layer) 19 + .init(); 20 + } 21 + 22 + fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 + where 24 + S: Subscriber + for<'span> LookupSpan<'span>, 25 + { 26 + let span_exporter = SpanExporter::builder() 27 + .with_http() 28 + .with_protocol(Protocol::HttpBinary) 29 + .build() 30 + .unwrap(); 31 + 32 + let tracer_provider = SdkTracerProvider::builder() 33 + .with_batch_exporter(span_exporter) 34 + .with_sampler(Sampler::AlwaysOn) 35 + .build(); 36 + 37 + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 + 39 + let tracer = tracer_provider.tracer("parakeet"); 40 + let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off"); 41 + 42 + OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 + } 44 + 45 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 + where 47 + S: Subscriber + for<'span> LookupSpan<'span>, 48 + { 49 + let stdout_filter = 50 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 + 52 + match json { 53 + true => tracing_subscriber::fmt::layer().json().boxed(), 54 + false => tracing_subscriber::fmt::layer().boxed(), 55 + } 56 + 
.with_filter(stdout_filter) 57 + }
+31 -5
parakeet/src/loaders.rs
··· 4 4 use dataloader::async_cached::Loader; 5 5 use dataloader::non_cached::Loader as NonCachedLoader; 6 6 use dataloader::BatchFn; 7 + use diesel::dsl::sql; 7 8 use diesel::prelude::*; 8 9 use diesel_async::pooled_connection::deadpool::Pool; 9 10 use diesel_async::{AsyncPgConnection, RunQueryDsl}; ··· 14 15 use serde::{Deserialize, Serialize}; 15 16 use std::collections::HashMap; 16 17 use std::str::FromStr; 18 + use tracing::instrument; 17 19 18 20 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 19 21 ··· 62 64 ) -> Dataloaders { 63 65 Dataloaders { 64 66 embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 65 - feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 67 + feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600), 66 68 handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 67 69 label: LabelLoader(pool.clone()), // CARE: never cache this. 68 - labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 70 + labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600), 69 71 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 70 72 like_state: LikeRecordLoader(pool.clone()), 71 73 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), ··· 84 86 85 87 pub struct LikeLoader(parakeet_index::Client); 86 88 impl BatchFn<String, i32> for LikeLoader { 89 + #[instrument(name = "LikeLoader", skip_all)] 87 90 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 88 91 let res = self 89 92 .0 ··· 106 109 107 110 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 108 111 impl LikeRecordLoader { 112 + #[instrument(name = "LikeRecordLoader::get", skip_all)] 109 113 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 110 114 let mut conn = self.0.get().await.unwrap(); 111 115 ··· 117 121 }) 118 122 } 119 123 124 + #[instrument(name = 
"LikeRecordLoader::get_many", skip_all)] 120 125 pub async fn get_many( 121 126 &self, 122 127 did: &str, ··· 138 143 139 144 pub struct HandleLoader(Pool<AsyncPgConnection>); 140 145 impl BatchFn<String, String> for HandleLoader { 146 + #[instrument(name = "HandleLoader", skip_all)] 141 147 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 142 148 let mut conn = self.0.get().await.unwrap(); 143 149 ··· 170 176 Option<ProfileAllowSubscriptions>, 171 177 ); 172 178 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 179 + #[instrument(name = "ProfileLoader", skip_all)] 173 180 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 174 181 let mut conn = self.0.get().await.unwrap(); 175 182 ··· 231 238 232 239 pub struct ProfileStatsLoader(parakeet_index::Client); 233 240 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 241 + #[instrument(name = "ProfileStatsLoader", skip_all)] 234 242 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 235 243 let stats_req = parakeet_index::GetStatsManyReq { 236 244 uris: keys.to_vec(), ··· 247 255 248 256 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 249 257 impl ProfileStateLoader { 258 + #[instrument(name = "ProfileStateLoader::get", skip_all)] 250 259 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 251 260 let mut conn = self.0.get().await.unwrap(); 252 261 ··· 258 267 }) 259 268 } 260 269 270 + #[instrument(name = "ProfileStateLoader::get_many", skip_all)] 261 271 pub async fn get_many( 262 272 &self, 263 273 did: &str, ··· 278 288 pub struct ListLoader(Pool<AsyncPgConnection>); 279 289 type ListLoaderRet = (models::List, i64); 280 290 impl BatchFn<String, ListLoaderRet> for ListLoader { 291 + #[instrument(name = "ListLoaderRet", skip_all)] 281 292 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 282 293 let mut conn = 
self.0.get().await.unwrap(); 283 294 ··· 309 320 310 321 pub struct ListStateLoader(Pool<AsyncPgConnection>); 311 322 impl ListStateLoader { 323 + #[instrument(name = "ListStateLoader::get", skip_all)] 312 324 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 313 325 let mut conn = self.0.get().await.unwrap(); 314 326 ··· 320 332 }) 321 333 } 322 334 335 + #[instrument(name = "ListStateLoader::get_many", skip_all)] 323 336 pub async fn get_many( 324 337 &self, 325 338 did: &str, ··· 337 350 } 338 351 } 339 352 340 - pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 353 + pub struct FeedGenLoader(Pool<AsyncPgConnection>); 341 354 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 355 + #[instrument(name = "FeedGenLoader", skip_all)] 342 356 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 343 357 let mut conn = self.0.get().await.unwrap(); 344 358 ··· 364 378 pub struct PostLoader(Pool<AsyncPgConnection>); 365 379 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 366 380 impl BatchFn<String, PostLoaderRet> for PostLoader { 381 + #[instrument(name = "PostLoader", skip_all)] 367 382 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 368 383 let mut conn = self.0.get().await.unwrap(); 369 384 370 385 let res = schema::posts::table 371 - .left_join(schema::threadgates::table) 386 + .left_join(schema::threadgates::table.on( 387 + schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")), 388 + )) 372 389 .select(( 373 390 models::Post::as_select(), 374 391 Option::<models::Threadgate>::as_select(), ··· 392 409 393 410 pub struct PostStatsLoader(parakeet_index::Client); 394 411 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 412 + #[instrument(name = "PostStatsLoader", skip_all)] 395 413 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 396 414 let 
stats_req = parakeet_index::GetStatsManyReq { 397 415 uris: keys.to_vec(), ··· 408 426 409 427 pub struct PostStateLoader(Pool<AsyncPgConnection>); 410 428 impl PostStateLoader { 429 + #[instrument(name = "PostStateLoader::get", skip_all)] 411 430 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 412 431 let mut conn = self.0.get().await.unwrap(); 413 432 ··· 419 438 }) 420 439 } 421 440 441 + #[instrument(name = "PostStateLoader::get_many", skip_all)] 422 442 pub async fn get_many( 423 443 &self, 424 444 did: &str, ··· 446 466 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 447 467 } 448 468 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 469 + #[instrument(name = "EmbedLoader", skip_all)] 449 470 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 450 471 let mut conn = self.0.get().await.unwrap(); 451 472 ··· 528 549 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 529 550 type StarterPackLoaderRet = models::StaterPack; 530 551 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 552 + #[instrument(name = "StarterPackLoader", skip_all)] 531 553 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 532 554 let mut conn = self.0.get().await.unwrap(); 533 555 ··· 550 572 } 551 573 } 552 574 553 - pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 575 + pub struct LabelServiceLoader(Pool<AsyncPgConnection>); 554 576 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 555 577 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 578 + #[instrument(name = "LabelServiceLoader", skip_all)] 556 579 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 557 580 let mut conn = self.0.get().await.unwrap(); 558 581 ··· 591 614 // but it should live here anyway 592 615 pub struct LabelLoader(Pool<AsyncPgConnection>); 593 
616 impl LabelLoader { 617 + #[instrument(name = "LabelLoader::load", skip_all)] 594 618 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 595 619 let mut conn = self.0.get().await.unwrap(); 596 620 ··· 610 634 }) 611 635 } 612 636 637 + #[instrument(name = "LabelLoader::load_many", skip_all)] 613 638 pub async fn load_many( 614 639 &self, 615 640 uris: &[String], ··· 644 669 645 670 pub struct VerificationLoader(Pool<AsyncPgConnection>); 646 671 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 672 + #[instrument(name = "VerificationLoader", skip_all)] 647 673 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 648 674 let mut conn = self.0.get().await.unwrap(); 649 675
+14 -5
parakeet/src/main.rs
··· 1 + use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; 1 2 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 2 3 use diesel_async::pooled_connection::deadpool::Pool; 3 4 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 14 15 mod config; 15 16 mod db; 16 17 mod hydration; 18 + mod instrumentation; 17 19 mod loaders; 18 20 mod xrpc; 19 21 ··· 31 33 32 34 #[tokio::main] 33 35 async fn main() -> eyre::Result<()> { 34 - tracing_subscriber::fmt::init(); 35 - 36 36 let conf = config::load_config()?; 37 37 38 + instrumentation::init_instruments(&conf.instruments); 39 + 38 40 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 39 41 let pool = Pool::builder(db_mgr).build()?; 40 42 ··· 52 54 let redis_client = redis::Client::open(conf.redis_uri)?; 53 55 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 54 56 55 - let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 57 + let index_client = parakeet_index::connect_with_otel(conf.index_uri) 58 + .await 59 + .map_err(|e| eyre::eyre!(e))?; 56 60 57 61 let dataloaders = Arc::new(loaders::Dataloaders::new( 58 62 pool.clone(), ··· 79 83 80 84 let did_doc = did_web_doc(&conf.service); 81 85 86 + let mw = tower::ServiceBuilder::new() 87 + .option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default)) 88 + .option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default)) 89 + .layer(TraceLayer::new_for_http()) 90 + .layer(cors); 91 + 82 92 let app = axum::Router::new() 83 93 .nest("/xrpc", xrpc::xrpc_routes()) 84 94 .route( 85 95 "/.well-known/did.json", 86 96 axum::routing::get(async || axum::Json(did_doc)), 87 97 ) 88 - .layer(TraceLayer::new_for_http()) 89 - .layer(cors) 98 + .layer(mw) 90 99 .with_state(GlobalState { 91 100 pool, 92 101 redis_mp,
+1 -1
parakeet/src/sql/thread.sql
··· 1 - with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 1 + with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth 2 2 from posts 3 3 where parent_uri = $1 and violates_threadgate=FALSE 4 4 union all
+13
parakeet/src/sql/thread_branching.sql
-- Collects the replies underneath a post, one recursion level at a time.
--
-- Parameters:
--   1: at-uri of the post whose replies are wanted
--   2: depth bound; a row is only expanded while its depth is <= this value
--   3: row limit applied to the recursive step
--
-- Replies flagged with violates_threadgate are left out at every level.
-- Direct replies are reported with depth 1.
--
-- NOTE(review): the LIMIT sits on the recursive term as a whole, so it looks
-- like it caps the rows produced per recursion step rather than per parent
-- post, and without an ORDER BY the rows kept are unspecified - confirm this
-- is the intended "branching factor" behaviour.
WITH RECURSIVE thread AS (
    SELECT at_uri, parent_uri, root_uri, 1 AS depth
    FROM posts
    WHERE parent_uri = $1
      AND violates_threadgate = FALSE
    UNION ALL
    (
        SELECT p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
        FROM posts p
        JOIN thread ON p.parent_uri = thread.at_uri
        WHERE thread.depth <= $2
          AND p.violates_threadgate = FALSE
        LIMIT $3
    )
)
SELECT *
FROM thread;
+6
parakeet/src/sql/thread_v2_hidden_children.sql
-- Lists the direct replies to a post that appear in a threadgate's
-- hidden_replies array.
--
-- Parameters:
--   1: at-uri of the parent post whose replies are inspected
--   2: post_uri of the threadgate row(s) supplying hidden_replies
SELECT at_uri
FROM posts
WHERE parent_uri = $1
  AND at_uri = ANY (
      SELECT unnest(hidden_replies)
      FROM threadgates
      WHERE post_uri = $2
  )
+1 -1
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 36 36 rkey: None, 37 37 subject: &form.uri, 38 38 subject_cid: Some(form.cid), 39 - subject_type: &parts[1], 39 + subject_type: parts[1], 40 40 tags: vec![], 41 41 }; 42 42
+8 -26
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 24 24 use reqwest::Url; 25 25 use serde::{Deserialize, Serialize}; 26 26 use std::collections::HashMap; 27 + use tracing::instrument; 27 28 28 29 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 29 30 ··· 160 161 161 162 #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 162 163 #[serde(rename_all = "snake_case")] 164 + #[allow(clippy::enum_variant_names)] 163 165 pub enum GetAuthorFeedFilter { 164 166 #[default] 165 167 PostsWithReplies, ··· 198 200 if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 199 201 if psr.blocked.unwrap_or_default() { 200 202 // they block us 201 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 203 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 202 204 } else if psr.blocking.is_some() { 203 205 // we block them 204 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 206 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 205 207 } 206 208 } 207 209 } ··· 361 363 pub threadgate: Option<ThreadgateView>, 362 364 } 363 365 364 - #[derive(Debug, QueryableByName)] 365 - #[diesel(check_for_backend(diesel::pg::Pg))] 366 - struct ThreadItem { 367 - #[diesel(sql_type = diesel::sql_types::Text)] 368 - at_uri: String, 369 - #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 370 - parent_uri: Option<String>, 371 - // #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 372 - // root_uri: Option<String>, 373 - #[diesel(sql_type = diesel::sql_types::Integer)] 374 - depth: i32, 375 - } 376 - 377 366 pub async fn get_post_thread( 378 367 State(state): State<GlobalState>, 379 368 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 409 398 } 410 399 } 411 400 412 - let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 413 - .bind::<diesel::sql_types::Text, _>(&uri) 414 - .bind::<diesel::sql_types::Integer, _>(depth as i32) 415 - .load::<ThreadItem>(&mut conn) 416 - 
.await?; 417 - 418 - let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql")) 419 - .bind::<diesel::sql_types::Text, _>(&uri) 420 - .bind::<diesel::sql_types::Integer, _>(parent_height as i32) 421 - .load::<ThreadItem>(&mut conn) 422 - .await?; 401 + let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?; 402 + let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?; 423 403 424 404 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 425 405 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect(); ··· 634 614 .or(schema::posts::embed_subtype.eq_any(filter)) 635 615 } 636 616 617 + #[instrument(skip_all)] 637 618 async fn get_feed_skeleton( 638 619 feed: &str, 639 620 service: &str, ··· 675 656 } 676 657 } 677 658 659 + #[instrument(skip_all)] 678 660 async fn get_skeleton_repost_data( 679 661 conn: &mut AsyncPgConnection, 680 662 reposts: Vec<String>,
+3
parakeet/src/xrpc/app_bsky/mod.rs
··· 6 6 mod feed; 7 7 mod graph; 8 8 mod labeler; 9 + mod unspecced; 9 10 10 11 #[rustfmt::skip] 11 12 pub fn routes() -> Router<crate::GlobalState> { ··· 64 65 // TODO: app.bsky.notification.putActivitySubscriptions 65 66 // TODO: app.bsky.notification.putPreferences 66 67 // TODO: app.bsky.notification.putPreferencesV2 68 + .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 69 + .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 67 70 } 68 71 69 72 async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 + pub mod thread_v2;
+379
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 + use crate::db::ThreadItem; 2 + use crate::hydration::StatefulHydrator; 3 + use crate::xrpc::error::{Error, XrpcResult}; 4 + use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 + use crate::xrpc::normalise_at_uri; 6 + use crate::GlobalState; 7 + use axum::extract::{Query, State}; 8 + use axum::Json; 9 + use itertools::Itertools; 10 + use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView}; 11 + use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 12 + use serde::{Deserialize, Serialize}; 13 + use std::cmp::Ordering; 14 + use std::collections::{HashMap, HashSet}; 15 + 16 + const THREAD_PARENTS: usize = 50; 17 + const DEFAULT_BRANCHING: u32 = 10; 18 + const DEFAULT_DEPTH: u32 = 6; 19 + 20 + #[derive(Copy, Clone, Debug, Default, Deserialize)] 21 + #[serde(rename_all = "lowercase")] 22 + pub enum PostThreadSort { 23 + Newest, 24 + #[default] 25 + Oldest, 26 + Top, 27 + } 28 + 29 + #[derive(Debug, Deserialize)] 30 + #[serde(rename_all = "camelCase")] 31 + pub struct GetPostThreadV2Req { 32 + pub anchor: String, 33 + pub above: Option<bool>, 34 + pub below: Option<u32>, 35 + pub branching_factor: Option<u32>, 36 + #[serde(default)] 37 + pub sort: PostThreadSort, 38 + } 39 + 40 + #[derive(Debug, Serialize)] 41 + #[serde(rename_all = "camelCase")] 42 + pub struct GetPostThreadV2Res { 43 + pub thread: Vec<ThreadV2Item>, 44 + #[serde(skip_serializing_if = "Option::is_none")] 45 + pub threadgate: Option<ThreadgateView>, 46 + pub has_other_replies: bool, 47 + } 48 + 49 + pub async fn get_post_thread_v2( 50 + State(state): State<GlobalState>, 51 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 52 + maybe_auth: Option<AtpAuth>, 53 + Query(query): Query<GetPostThreadV2Req>, 54 + ) -> XrpcResult<Json<GetPostThreadV2Res>> { 55 + let mut conn = state.pool.get().await?; 56 + let maybe_did = maybe_auth.clone().map(|v| v.0); 57 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 58 + 59 + 
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 60 + let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32; 61 + let branching_factor = query 62 + .branching_factor 63 + .unwrap_or(DEFAULT_BRANCHING) 64 + .clamp(0, 100) as i32; 65 + 66 + let anchor = hyd 67 + .hydrate_post(uri.clone()) 68 + .await 69 + .ok_or(Error::not_found())?; 70 + 71 + if let Some(v) = &anchor.author.viewer { 72 + if v.blocked_by || v.blocking.is_some() { 73 + let block = ThreadV2ItemType::Blocked { 74 + author: BlockedAuthor { 75 + did: anchor.author.did, 76 + viewer: anchor.author.viewer, 77 + }, 78 + }; 79 + 80 + return Ok(Json(GetPostThreadV2Res { 81 + thread: vec![ThreadV2Item { 82 + uri, 83 + depth: 0, 84 + value: block, 85 + }], 86 + threadgate: anchor.threadgate, 87 + has_other_replies: false, 88 + })); 89 + } 90 + } 91 + 92 + // get the root post URI (if there is one) and return its author's DID. 93 + let root_uri = crate::db::get_root_post(&mut conn, &uri) 94 + .await? 95 + .unwrap_or(uri.clone()); 96 + let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0]; 97 + 98 + let replies = 99 + crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1) 100 + .await?; 101 + let reply_uris = replies 102 + .iter() 103 + .map(|item| item.at_uri.clone()) 104 + .collect::<Vec<_>>(); 105 + 106 + // bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents. 
107 + let parents = match query.above.unwrap_or(true) { 108 + true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?, 109 + false => vec![], 110 + }; 111 + let parent_uris = parents 112 + .iter() 113 + .map(|item| item.at_uri.clone()) 114 + .collect::<Vec<_>>(); 115 + 116 + let (mut replies_hyd, mut parents_hyd) = tokio::join!( 117 + hyd.hydrate_posts(reply_uris), 118 + hyd.hydrate_posts(parent_uris), 119 + ); 120 + 121 + let threadgate = anchor.threadgate.clone(); 122 + let hidden: HashSet<_, std::hash::RandomState> = match &threadgate { 123 + Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?, 124 + None => None, 125 + } 126 + .map(|hiddens| HashSet::from_iter(Vec::from(hiddens))) 127 + .unwrap_or_default(); 128 + 129 + let root_has_more = parents.len() > THREAD_PARENTS; 130 + let mut is_op_thread = true; 131 + 132 + let mut thread = Vec::with_capacity(1 + replies.len() + parents.len()); 133 + 134 + thread.extend( 135 + parents 136 + .into_iter() 137 + .tail(THREAD_PARENTS) 138 + .enumerate() 139 + .map(|(idx, item)| { 140 + let value = parents_hyd 141 + .remove(&item.at_uri) 142 + .map(|post| { 143 + if let Some(v) = &post.author.viewer { 144 + if v.blocked_by || v.blocking.is_some() { 145 + return ThreadV2ItemType::Blocked { 146 + author: BlockedAuthor { 147 + did: post.author.did, 148 + viewer: post.author.viewer, 149 + }, 150 + }; 151 + } 152 + } 153 + 154 + let op_thread = (is_op_thread 155 + || item.root_uri.is_none() && item.parent_uri.is_none()) 156 + && post.author.did == root_did; 157 + 158 + ThreadV2ItemType::Post(ThreadItemPost { 159 + post, 160 + more_parents: idx == 0 && root_has_more, 161 + more_replies: 0, 162 + op_thread, 163 + hidden_by_threadgate: false, 164 + muted_by_viewer: false, 165 + }) 166 + }) 167 + .unwrap_or(ThreadV2ItemType::NotFound {}); 168 + 169 + ThreadV2Item { 170 + uri: item.at_uri, 171 + depth: -item.depth - 1, 172 + value, 173 + } 174 + }), 175 + ); 176 + 177 + 
is_op_thread = is_op_thread && anchor.author.did == root_did; 178 + thread.push(ThreadV2Item { 179 + uri: uri.clone(), 180 + depth: 0, 181 + value: ThreadV2ItemType::Post(ThreadItemPost { 182 + post: anchor, 183 + more_parents: false, 184 + more_replies: 0, 185 + op_thread: is_op_thread, 186 + hidden_by_threadgate: false, 187 + muted_by_viewer: false, 188 + }), 189 + }); 190 + 191 + let mut replies_grouped = replies 192 + .into_iter() 193 + .into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default()); 194 + 195 + // start with the anchor 196 + let (children, has_other_replies) = build_thread_children( 197 + &mut replies_grouped, 198 + &mut replies_hyd, 199 + &hidden, 200 + &uri, 201 + is_op_thread, 202 + 1, 203 + &BuildThreadChildrenOpts { 204 + root_did, 205 + sort: query.sort, 206 + maybe_did: &maybe_did, 207 + max_depth: depth, 208 + }, 209 + ); 210 + thread.extend(children); 211 + 212 + Ok(Json(GetPostThreadV2Res { 213 + thread, 214 + threadgate, 215 + has_other_replies, 216 + })) 217 + } 218 + 219 + #[derive(Debug, Deserialize)] 220 + #[serde(rename_all = "camelCase")] 221 + pub struct GetPostThreadOtherV2Req { 222 + pub anchor: String, 223 + } 224 + 225 + #[derive(Debug, Serialize)] 226 + #[serde(rename_all = "camelCase")] 227 + pub struct GetPostThreadOtherV2Res { 228 + pub thread: Vec<ThreadV2Item>, 229 + } 230 + 231 + pub async fn get_post_thread_other_v2( 232 + State(state): State<GlobalState>, 233 + AtpAcceptLabelers(labelers): AtpAcceptLabelers, 234 + maybe_auth: Option<AtpAuth>, 235 + Query(query): Query<GetPostThreadOtherV2Req>, 236 + ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 237 + let mut conn = state.pool.get().await?; 238 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 239 + 240 + let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 241 + 242 + let root = crate::db::get_root_post(&mut conn, &uri) 243 + .await? 
244 + .unwrap_or(uri.clone()); 245 + 246 + // this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE 247 + let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?; 248 + let reply_uris = replies 249 + .into_iter() 250 + .map(|item| item.at_uri) 251 + .collect::<Vec<_>>(); 252 + let thread = hyd 253 + .hydrate_posts(reply_uris) 254 + .await 255 + .into_iter() 256 + .filter(|(_, post)| matches!(&post.author.viewer, Some(viewer) if viewer.blocked_by || viewer.blocking.is_some())) 257 + .map(|(uri, post)| { 258 + let post = ThreadItemPost { 259 + post, 260 + more_parents: false, 261 + more_replies: 0, 262 + op_thread: false, 263 + hidden_by_threadgate: true, 264 + muted_by_viewer: false, 265 + }; 266 + 267 + ThreadV2Item { 268 + uri, 269 + depth: 1, 270 + value: ThreadV2ItemType::Post(post), 271 + } 272 + }) 273 + .collect(); 274 + 275 + Ok(Json(GetPostThreadOtherV2Res { thread })) 276 + } 277 + 278 + #[derive(Debug)] 279 + struct BuildThreadChildrenOpts<'a> { 280 + root_did: &'a str, 281 + sort: PostThreadSort, 282 + maybe_did: &'a Option<String>, 283 + max_depth: i32, 284 + } 285 + 286 + fn build_thread_children( 287 + grouped_replies: &mut HashMap<String, Vec<ThreadItem>>, 288 + replies_hyd: &mut HashMap<String, PostView>, 289 + hidden: &HashSet<String>, 290 + parent: &str, 291 + is_op_thread: bool, 292 + depth: i32, 293 + opts: &BuildThreadChildrenOpts, 294 + ) -> (Vec<ThreadV2Item>, bool) { 295 + let mut has_other_replies = false; 296 + 297 + let Some(replies) = grouped_replies.remove(parent) else { 298 + return (Vec::default(), has_other_replies); 299 + }; 300 + 301 + let replies = replies 302 + .into_iter() 303 + .filter_map(|item| replies_hyd.remove(&item.at_uri)) 304 + .sorted_by(sort_replies(&opts.sort)); 305 + 306 + let mut out = Vec::new(); 307 + 308 + for post in replies { 309 + let reply_count = grouped_replies 310 + .get(&post.uri) 311 + .map(|v| v.len()) 312 + .unwrap_or_default(); 313 + let at_max = 
depth == opts.max_depth; 314 + let more_replies = if at_max { reply_count } else { 0 }; 315 + let op_thread = is_op_thread && post.author.did == opts.root_did; 316 + 317 + // shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies... 318 + if let Some(v) = &post.author.viewer { 319 + if v.blocked_by || v.blocking.is_some() { 320 + continue; 321 + } 322 + } 323 + 324 + // check if the post is hidden AND we're NOT the author (hidden posts still show for their author) 325 + if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) { 326 + // post is hidden - do not ~pass go~ push to the thread. 327 + if depth == 1 { 328 + has_other_replies = true; 329 + } 330 + continue; 331 + } 332 + 333 + let uri = post.uri.clone(); 334 + out.push(ThreadV2Item { 335 + uri: post.uri.clone(), 336 + depth, 337 + value: ThreadV2ItemType::Post(ThreadItemPost { 338 + post, 339 + more_parents: false, 340 + more_replies: more_replies as i32, 341 + op_thread, 342 + hidden_by_threadgate: false, 343 + muted_by_viewer: false, 344 + }), 345 + }); 346 + 347 + if !at_max { 348 + // we don't care about has_other_replies when recursing 349 + let (children, _) = build_thread_children( 350 + grouped_replies, 351 + replies_hyd, 352 + hidden, 353 + &uri, 354 + op_thread, 355 + depth + 1, 356 + opts, 357 + ); 358 + 359 + out.extend(children); 360 + } 361 + } 362 + 363 + (out, has_other_replies) 364 + } 365 + 366 + fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> { 367 + move |a: &PostView, b: &PostView| match sort { 368 + PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at), 369 + PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at), 370 + PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count), 371 + } 372 + } 373 + 374 + fn did_is_cur(cur: &Option<String>, did: &String) -> bool { 375 + match cur { 376 + Some(cur) => did == cur, 377 + None => false, 378 + } 379 + }
+1 -1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 60 60 .into_iter() 61 61 .map(|bookmark| Bookmark { 62 62 subject: bookmark.subject, 63 - tags: bookmark.tags.into_iter().flatten().collect(), 63 + tags: bookmark.tags.into(), 64 64 created_at: bookmark.created_at, 65 65 }) 66 66 .collect();
+3
parakeet/src/xrpc/jwt.rs
··· 4 4 use std::collections::HashMap; 5 5 use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 + use tracing::instrument; 7 8 8 9 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 9 10 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 38 39 } 39 40 } 40 41 42 + #[instrument(skip_all)] 41 43 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 42 44 // first we need to decode without verifying, to get iss. 43 45 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 56 58 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 57 59 } 58 60 61 + #[instrument(skip_all)] 59 62 async fn resolve_key(&self, did: &str) -> Option<String> { 60 63 tracing::trace!("resolving multikey for {did}"); 61 64 let did_doc = self.resolver.resolve_did(did).await.ok()??;
+55 -13
parakeet-db/src/models.rs
··· 137 137 138 138 pub content: String, 139 139 pub facets: Option<serde_json::Value>, 140 - pub languages: Vec<Option<String>>, 141 - pub tags: Vec<Option<String>>, 140 + pub languages: not_null_vec::TextArray, 141 + pub tags: not_null_vec::TextArray, 142 142 143 143 pub parent_uri: Option<String>, 144 144 pub parent_cid: Option<String>, ··· 148 148 pub embed: Option<String>, 149 149 pub embed_subtype: Option<String>, 150 150 151 - pub mentions: Option<Vec<Option<String>>>, 151 + pub mentions: Option<not_null_vec::TextArray>, 152 152 pub violates_threadgate: bool, 153 153 154 154 pub created_at: DateTime<Utc>, ··· 236 236 pub cid: String, 237 237 pub post_uri: String, 238 238 239 - pub detached: Vec<Option<String>>, 240 - pub rules: Vec<Option<String>>, 239 + pub detached: not_null_vec::TextArray, 240 + pub rules: not_null_vec::TextArray, 241 241 242 242 pub created_at: DateTime<Utc>, 243 243 pub indexed_at: NaiveDateTime, ··· 252 252 pub cid: String, 253 253 pub post_uri: String, 254 254 255 - pub hidden_replies: Vec<Option<String>>, 256 - pub allow: Option<Vec<Option<String>>>, 257 - pub allowed_lists: Option<Vec<Option<String>>>, 255 + pub hidden_replies: not_null_vec::TextArray, 256 + pub allow: Option<not_null_vec::TextArray>, 257 + pub allowed_lists: Option<not_null_vec::TextArray>, 258 258 259 259 pub record: serde_json::Value, 260 260 ··· 276 276 pub description: Option<String>, 277 277 pub description_facets: Option<serde_json::Value>, 278 278 pub list: String, 279 - pub feeds: Option<Vec<Option<String>>>, 279 + pub feeds: Option<not_null_vec::TextArray>, 280 280 281 281 pub created_at: DateTime<Utc>, 282 282 pub indexed_at: NaiveDateTime, ··· 290 290 pub did: String, 291 291 pub cid: String, 292 292 293 - pub reasons: Option<Vec<Option<String>>>, 294 - pub subject_types: Option<Vec<Option<String>>>, 295 - pub subject_collections: Option<Vec<Option<String>>>, 293 + pub reasons: Option<not_null_vec::TextArray>, 294 + pub subject_types: 
Option<not_null_vec::TextArray>, 295 + pub subject_collections: Option<not_null_vec::TextArray>, 296 296 297 297 pub created_at: NaiveDateTime, 298 298 pub indexed_at: NaiveDateTime, ··· 402 402 pub subject: String, 403 403 pub subject_cid: Option<String>, 404 404 pub subject_type: String, 405 - pub tags: Vec<Option<String>>, 405 + pub tags: not_null_vec::TextArray, 406 406 pub created_at: DateTime<Utc>, 407 407 } 408 408 ··· 430 430 pub typ: String, 431 431 pub sort_at: DateTime<Utc>, 432 432 } 433 + 434 + pub use not_null_vec::TextArray; 435 + mod not_null_vec { 436 + use diesel::deserialize::FromSql; 437 + use diesel::pg::Pg; 438 + use diesel::sql_types::{Array, Nullable, Text}; 439 + use diesel::{deserialize, FromSqlRow}; 440 + use serde::{Deserialize, Serialize}; 441 + use std::ops::{Deref, DerefMut}; 442 + 443 + #[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)] 444 + #[diesel(sql_type = Array<Nullable<Text>>)] 445 + pub struct TextArray(pub Vec<String>); 446 + 447 + impl FromSql<Array<Nullable<Text>>, Pg> for TextArray { 448 + fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> { 449 + let vec_with_nulls = 450 + <Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?; 451 + Ok(TextArray(vec_with_nulls.into_iter().flatten().collect())) 452 + } 453 + } 454 + 455 + impl Deref for TextArray { 456 + type Target = Vec<String>; 457 + 458 + fn deref(&self) -> &Self::Target { 459 + &self.0 460 + } 461 + } 462 + 463 + impl DerefMut for TextArray { 464 + fn deref_mut(&mut self) -> &mut Self::Target { 465 + &mut self.0 466 + } 467 + } 468 + 469 + impl From<TextArray> for Vec<String> { 470 + fn from(v: TextArray) -> Vec<String> { 471 + v.0 472 + } 473 + } 474 + }
+24 -2
parakeet-index/Cargo.toml
··· 10 10 [dependencies] 11 11 tonic = "0.13.0" 12 12 prost = "0.13.5" 13 + tonic-tracing-opentelemetry = { version = "0.32", optional = true } 14 + tower = { version = "0.5", optional = true } 13 15 14 16 eyre = { version = "0.6.12", optional = true } 15 17 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 16 18 itertools = { version = "0.14.0", optional = true } 19 + opentelemetry = { version = "0.31.0", optional = true } 20 + opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true } 21 + opentelemetry_sdk = { version = "0.31.0", optional = true } 17 22 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 18 23 serde = { version = "1.0.217", features = ["derive"], optional = true } 19 24 tokio = { version = "1.42.0", features = ["full"], optional = true } 20 25 tonic-health = { version = "0.13.0", optional = true } 21 26 tracing = { version = "0.1.40", optional = true } 22 - tracing-subscriber = { version = "0.3.18", optional = true } 27 + tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } 28 + tracing-opentelemetry = { version = "0.32", optional = true } 23 29 24 30 [build-dependencies] 25 31 tonic-build = "0.13.0" 26 32 27 33 [features] 28 - server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"] 34 + otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"] 35 + server = [ 36 + "dep:eyre", 37 + "dep:figment", 38 + "dep:itertools", 39 + "dep:opentelemetry", 40 + "dep:opentelemetry-otlp", 41 + "dep:opentelemetry_sdk", 42 + "dep:rocksdb", 43 + "dep:serde", 44 + "dep:tokio", 45 + "dep:tonic-health", 46 + "otel", 47 + "dep:tracing", 48 + "dep:tracing-subscriber", 49 + "dep:tracing-opentelemetry" 50 + ]
+20 -1
parakeet-index/src/lib.rs
··· 1 + use tonic::transport::Channel; 2 + 1 3 #[allow(clippy::all)] 2 4 pub mod index { 3 5 tonic::include_proto!("parakeet"); 4 6 } 5 7 6 8 pub use index::*; 7 - pub type Client = index_client::IndexClient<tonic::transport::Channel>; 9 + #[cfg(not(feature = "otel"))] 10 + pub type Client = index_client::IndexClient<Channel>; 11 + #[cfg(feature = "otel")] 12 + pub type Client = index_client::IndexClient< 13 + tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>, 14 + >; 8 15 9 16 #[cfg(feature = "server")] 10 17 pub mod server; 18 + 19 + #[cfg(feature = "otel")] 20 + pub async fn connect_with_otel( 21 + uri: String, 22 + ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { 23 + let channel = Channel::from_shared(uri)?.connect().await?; 24 + let channel = tower::ServiceBuilder::new() 25 + .layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer) 26 + .service(channel); 27 + 28 + Ok(index_client::IndexClient::new(channel)) 29 + }
+9 -3
parakeet-index/src/main.rs
··· 1 1 use parakeet_index::index_server::IndexServer; 2 2 use parakeet_index::server::service::Service; 3 - use parakeet_index::server::{GlobalState, config}; 3 + use parakeet_index::server::{GlobalState, config, instrumentation}; 4 4 use std::sync::Arc; 5 5 use tonic::transport::Server; 6 + use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer; 6 7 7 8 #[tokio::main] 8 9 async fn main() -> eyre::Result<()> { 9 - tracing_subscriber::fmt::init(); 10 - 11 10 let conf = config::load_config()?; 12 11 12 + instrumentation::init_instruments(&conf.instruments); 13 + 13 14 let db_root = conf.index_db_path.parse()?; 14 15 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); 15 16 let state = Arc::new(GlobalState::new(db_root)?); ··· 18 19 reporter.set_serving::<IndexServer<Service>>().await; 19 20 20 21 let service = Service::new(state.clone()); 22 + 23 + let mw = tower::ServiceBuilder::new() 24 + .option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default)); 25 + 21 26 Server::builder() 27 + .layer(mw) 22 28 .add_service(health_service) 23 29 .add_service(IndexServer::new(service)) 24 30 .serve(addr)
+10
parakeet-index/src/server/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(flatten)] 17 + pub instruments: ConfigInstruments, 16 18 pub database_url: String, 17 19 pub index_db_path: String, 18 20 #[serde(default)] 19 21 pub server: ConfigServer, 22 + } 23 + 24 + #[derive(Debug, Deserialize)] 25 + pub struct ConfigInstruments { 26 + #[serde(default)] 27 + pub otel_enable: bool, 28 + #[serde(default)] 29 + pub log_json: bool, 20 30 } 21 31 22 32 #[derive(Debug, Deserialize)]
+57
parakeet-index/src/server/instrumentation.rs
··· 1 + use opentelemetry::trace::TracerProvider; 2 + use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::filter::Filtered; 7 + use tracing_subscriber::layer::SubscriberExt; 8 + use tracing_subscriber::registry::LookupSpan; 9 + use tracing_subscriber::util::SubscriberInitExt; 10 + use tracing_subscriber::{EnvFilter, Layer}; 11 + 12 + pub fn init_instruments(cfg: &super::config::ConfigInstruments) { 13 + let otel_layer = cfg.otel_enable.then(init_otel); 14 + let log_layer = init_log(cfg.log_json); 15 + 16 + tracing_subscriber::registry() 17 + .with(log_layer) 18 + .with(otel_layer) 19 + .init(); 20 + } 21 + 22 + fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 + where 24 + S: Subscriber + for<'span> LookupSpan<'span>, 25 + { 26 + let span_exporter = SpanExporter::builder() 27 + .with_http() 28 + .with_protocol(Protocol::HttpBinary) 29 + .build() 30 + .unwrap(); 31 + 32 + let tracer_provider = SdkTracerProvider::builder() 33 + .with_batch_exporter(span_exporter) 34 + .with_sampler(Sampler::AlwaysOn) 35 + .build(); 36 + 37 + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 + 39 + let tracer = tracer_provider.tracer("parakeet"); 40 + let otel_filter = EnvFilter::new("info,otel::tracing=trace"); 41 + 42 + OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 + } 44 + 45 + fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 + where 47 + S: Subscriber + for<'span> LookupSpan<'span>, 48 + { 49 + let stdout_filter = 50 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 + 52 + match json { 53 + true => tracing_subscriber::fmt::layer().json().boxed(), 54 + false => tracing_subscriber::fmt::layer().boxed(), 55 + } 56 + .with_filter(stdout_filter) 57 
+ }
+1
parakeet-index/src/server/mod.rs
··· 2 2 3 3 pub mod config; 4 4 pub mod db; 5 + pub mod instrumentation; 5 6 pub mod service; 6 7 mod utils; 7 8