Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

Compare changes

Choose any two refs to compare.

+12 -364
Cargo.lock
··· 286 286 ] 287 287 288 288 [[package]] 289 - name = "axum-tracing-opentelemetry" 290 - version = "0.32.1" 291 - source = "registry+https://github.com/rust-lang/crates.io-index" 292 - checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1" 293 - dependencies = [ 294 - "axum", 295 - "futures-core", 296 - "futures-util", 297 - "http", 298 - "opentelemetry", 299 - "opentelemetry-semantic-conventions", 300 - "pin-project-lite", 301 - "tower", 302 - "tracing", 303 - "tracing-opentelemetry", 304 - "tracing-opentelemetry-instrumentation-sdk", 305 - ] 306 - 307 - [[package]] 308 289 name = "backtrace" 309 290 version = "0.3.74" 310 291 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 366 347 "proc-macro2", 367 348 "quote", 368 349 "regex", 369 - "rustc-hash 1.1.0", 350 + "rustc-hash", 370 351 "shlex", 371 352 "syn", 372 353 "which", ··· 484 465 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 485 466 486 467 [[package]] 487 - name = "cfg_aliases" 488 - version = "0.2.1" 489 - source = "registry+https://github.com/rust-lang/crates.io-index" 490 - checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 491 - 492 - [[package]] 493 468 name = "chrono" 494 469 version = "0.4.41" 495 470 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1395 1370 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1396 1371 dependencies = [ 1397 1372 "cfg-if", 1398 - "js-sys", 1399 1373 "libc", 1400 1374 "r-efi", 1401 1375 "wasi 0.14.2+wasi-0.2.4", 1402 - "wasm-bindgen", 1403 1376 ] 1404 1377 1405 1378 [[package]] ··· 2196 2169 ] 2197 2170 2198 2171 [[package]] 2199 - name = "lru-slab" 2200 - version = "0.1.2" 2201 - source = "registry+https://github.com/rust-lang/crates.io-index" 2202 - checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 2203 - 2204 - [[package]] 2205 2172 name = "lz4-sys" 2206 2173 version = "1.11.1+lz4-1.10.0" 2207 2174 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2218 2185 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2219 2186 2220 2187 [[package]] 2221 - name = "matchers" 2222 - version = "0.1.0" 2223 - source = "registry+https://github.com/rust-lang/crates.io-index" 2224 - checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 2225 - dependencies = [ 2226 - "regex-automata 0.1.10", 2227 - ] 2228 - 2229 - [[package]] 2230 2188 name = "matchit" 2231 2189 version = "0.8.4" 2232 2190 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2555 2513 ] 2556 2514 2557 2515 [[package]] 2558 - name = "opentelemetry" 2559 - version = "0.31.0" 2560 - source = "registry+https://github.com/rust-lang/crates.io-index" 2561 - checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" 2562 - dependencies = [ 2563 - "futures-core", 2564 - "futures-sink", 2565 - "js-sys", 2566 - "pin-project-lite", 2567 - "thiserror 2.0.12", 2568 - "tracing", 2569 - ] 2570 - 2571 - [[package]] 2572 - name = "opentelemetry-http" 2573 - version = "0.31.0" 2574 - source = "registry+https://github.com/rust-lang/crates.io-index" 2575 - checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" 2576 - dependencies = [ 2577 - "async-trait", 2578 - "bytes", 2579 - "http", 2580 - "opentelemetry", 2581 - "reqwest", 2582 - ] 2583 - 2584 - [[package]] 2585 - name = "opentelemetry-otlp" 2586 - version = "0.31.0" 2587 - source = "registry+https://github.com/rust-lang/crates.io-index" 2588 - checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" 2589 - dependencies = [ 2590 - "http", 2591 - "opentelemetry", 2592 - "opentelemetry-http", 2593 - "opentelemetry-proto", 2594 - "opentelemetry_sdk", 2595 - "prost 0.14.1", 2596 - "reqwest", 2597 - "thiserror 2.0.12", 2598 - "tracing", 2599 - ] 2600 - 2601 - [[package]] 2602 - name = "opentelemetry-proto" 2603 - version = "0.31.0" 2604 - source = "registry+https://github.com/rust-lang/crates.io-index" 2605 - checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" 2606 - dependencies = [ 2607 - "opentelemetry", 2608 - "opentelemetry_sdk", 2609 - "prost 0.14.1", 2610 - "tonic 0.14.2", 2611 - "tonic-prost", 2612 - ] 2613 - 2614 - [[package]] 2615 - name = "opentelemetry-semantic-conventions" 2616 - version = "0.31.0" 2617 - source = "registry+https://github.com/rust-lang/crates.io-index" 2618 - checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" 2619 - 2620 - [[package]] 2621 - name = "opentelemetry_sdk" 2622 - version = "0.31.0" 2623 - source = "registry+https://github.com/rust-lang/crates.io-index" 2624 - checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" 2625 - dependencies = [ 2626 - "futures-channel", 2627 - "futures-executor", 2628 - "futures-util", 2629 - "opentelemetry", 2630 - "percent-encoding", 2631 - "rand 0.9.1", 2632 - "thiserror 2.0.12", 2633 - ] 2634 - 2635 - [[package]] 2636 2516 name = "overload" 2637 2517 version = "0.1.1" 2638 2518 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2669 2549 "async-recursion", 2670 2550 "axum", 2671 2551 "axum-extra", 2672 - "axum-tracing-opentelemetry", 2673 2552 "base64 0.22.1", 2674 2553 "chrono", 2675 2554 "dataloader", ··· 2684 2563 "jsonwebtoken", 2685 2564 "lexica", 2686 2565 "multibase", 2687 - "opentelemetry", 2688 - "opentelemetry-otlp", 2689 - "opentelemetry_sdk", 2690 2566 "parakeet-db", 2691 2567 "parakeet-index", 2692 2568 "redis", ··· 2695 2571 "serde_ipld_dagcbor", 2696 2572 "serde_json", 2697 2573 "tokio", 2698 - "tower", 2699 2574 "tower-http", 2700 2575 "tracing", 2701 - "tracing-opentelemetry", 2702 2576 "tracing-subscriber", 2703 2577 ] 2704 2578 ··· 2720 2594 "eyre", 2721 2595 "figment", 2722 2596 "itertools 0.14.0", 2723 - "opentelemetry", 2724 - "opentelemetry-otlp", 2725 - "opentelemetry_sdk", 2726 - "prost 0.13.5", 2597 + "prost", 2727 2598 "rocksdb", 2728 2599 "serde", 2729 2600 "tokio", 2730 - "tonic 0.13.1", 2601 + "tonic", 2731 2602 "tonic-build", 2732 2603 "tonic-health", 2733 - "tonic-tracing-opentelemetry", 2734 - "tower", 2735 2604 "tracing", 2736 - "tracing-opentelemetry", 2737 2605 "tracing-subscriber", 2738 2606 ] 2739 2607 ··· 3039 2907 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 3040 2908 dependencies = [ 3041 2909 "bytes", 3042 - "prost-derive 0.13.5", 3043 - ] 3044 - 3045 - [[package]] 3046 - name = "prost" 3047 - version = "0.14.1" 3048 - source = "registry+https://github.com/rust-lang/crates.io-index" 3049 - checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" 3050 - dependencies = [ 3051 - "bytes", 3052 - "prost-derive 0.14.1", 2910 + "prost-derive", 3053 2911 ] 3054 2912 3055 2913 [[package]] ··· 3065 2923 "once_cell", 3066 2924 "petgraph", 3067 2925 "prettyplease", 3068 - "prost 0.13.5", 2926 + "prost", 3069 2927 "prost-types", 3070 2928 "regex", 3071 2929 "syn", ··· 3086 2944 ] 3087 2945 3088 2946 [[package]] 3089 - name = "prost-derive" 3090 - version = "0.14.1" 3091 - source = "registry+https://github.com/rust-lang/crates.io-index" 3092 - checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" 3093 - dependencies = [ 3094 - "anyhow", 3095 - "itertools 0.14.0", 3096 - "proc-macro2", 3097 - "quote", 3098 - "syn", 3099 - ] 3100 - 3101 - [[package]] 3102 2947 name = "prost-types" 3103 2948 version = "0.13.5" 3104 2949 source = "registry+https://github.com/rust-lang/crates.io-index" 3105 2950 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 3106 2951 dependencies = [ 3107 - "prost 0.13.5", 2952 + "prost", 3108 2953 ] 3109 2954 3110 2955 [[package]] ··· 3129 2974 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 3130 2975 3131 2976 [[package]] 3132 - name = "quinn" 3133 - version = "0.11.9" 3134 - source = "registry+https://github.com/rust-lang/crates.io-index" 3135 - checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 3136 - dependencies = [ 3137 - "bytes", 3138 - "cfg_aliases", 3139 - "pin-project-lite", 3140 - "quinn-proto", 3141 - "quinn-udp", 3142 - "rustc-hash 2.1.1", 3143 - "rustls", 3144 - "socket2 0.6.0", 3145 - "thiserror 2.0.12", 3146 - "tokio", 3147 - "tracing", 3148 - "web-time", 3149 - ] 3150 - 3151 - [[package]] 3152 - name = "quinn-proto" 3153 - version = "0.11.13" 3154 - source = "registry+https://github.com/rust-lang/crates.io-index" 3155 - checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 3156 - dependencies = [ 3157 - "bytes", 3158 - "getrandom 0.3.3", 3159 - "lru-slab", 3160 - "rand 0.9.1", 3161 - "ring", 3162 - "rustc-hash 2.1.1", 3163 - "rustls", 3164 - "rustls-pki-types", 3165 - "slab", 3166 - "thiserror 2.0.12", 3167 - "tinyvec", 3168 - "tracing", 3169 - "web-time", 3170 - ] 3171 - 3172 - [[package]] 3173 - name = "quinn-udp" 3174 - version = "0.5.14" 3175 - source = "registry+https://github.com/rust-lang/crates.io-index" 3176 - checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 3177 - dependencies = [ 3178 - "cfg_aliases", 3179 - "libc", 3180 - "once_cell", 3181 - "socket2 0.6.0", 3182 - "tracing", 3183 - "windows-sys 0.59.0", 3184 - ] 3185 - 3186 - [[package]] 3187 2977 name = "quote" 3188 2978 version = "1.0.38" 3189 2979 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3345 3135 dependencies = [ 3346 3136 "aho-corasick", 3347 3137 "memchr", 3348 - "regex-automata 0.4.9", 3349 - "regex-syntax 0.8.5", 3350 - ] 3351 - 3352 - [[package]] 3353 - name = "regex-automata" 3354 - version = "0.1.10" 3355 - source = "registry+https://github.com/rust-lang/crates.io-index" 3356 - checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 3357 - dependencies = [ 3358 - "regex-syntax 0.6.29", 3138 + "regex-automata", 3139 + "regex-syntax", 3359 3140 ] 3360 3141 3361 3142 [[package]] ··· 3366 3147 dependencies = [ 3367 3148 "aho-corasick", 3368 3149 "memchr", 3369 - "regex-syntax 0.8.5", 3150 + "regex-syntax", 3370 3151 ] 3371 3152 3372 3153 [[package]] 3373 3154 name = "regex-syntax" 3374 - version = "0.6.29" 3375 - source = "registry+https://github.com/rust-lang/crates.io-index" 3376 - checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 3377 - 3378 - [[package]] 3379 - name = "regex-syntax" 3380 3155 version = "0.8.5" 3381 3156 source = "registry+https://github.com/rust-lang/crates.io-index" 3382 3157 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3391 3166 "base64 0.22.1", 3392 3167 "bytes", 3393 3168 "encoding_rs", 3394 - "futures-channel", 3395 3169 "futures-core", 3396 3170 "futures-util", 3397 3171 "h2", ··· 3410 3184 "once_cell", 3411 3185 "percent-encoding", 3412 3186 "pin-project-lite", 3413 - "quinn", 3414 - "rustls", 3415 - "rustls-native-certs", 3416 3187 "rustls-pemfile", 3417 - "rustls-pki-types", 3418 3188 "serde", 3419 3189 "serde_json", 3420 3190 "serde_urlencoded", ··· 3422 3192 "system-configuration", 3423 3193 "tokio", 3424 3194 "tokio-native-tls", 3425 - "tokio-rustls", 3426 3195 "tokio-util", 3427 3196 "tower", 3428 3197 "tower-service", ··· 3512 3281 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3513 3282 3514 3283 [[package]] 3515 - name = "rustc-hash" 3516 - version = "2.1.1" 3517 - source = "registry+https://github.com/rust-lang/crates.io-index" 3518 - checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 3519 - 3520 - [[package]] 3521 3284 name = "rustc_version" 3522 3285 version = "0.4.1" 3523 3286 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3547 3310 dependencies = [ 3548 3311 "aws-lc-rs", 3549 3312 "once_cell", 3550 - "ring", 3551 3313 "rustls-pki-types", 3552 3314 "rustls-webpki", 3553 3315 "subtle", ··· 3580 3342 version = "1.11.0" 3581 3343 source = "registry+https://github.com/rust-lang/crates.io-index" 3582 3344 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3583 - dependencies = [ 3584 - "web-time", 3585 - ] 3586 3345 3587 3346 [[package]] 3588 3347 name = "rustls-webpki" ··· 4426 4185 "hyper-util", 4427 4186 "percent-encoding", 4428 4187 "pin-project", 4429 - "prost 0.13.5", 4188 + "prost", 4430 4189 "socket2 0.5.8", 4431 4190 "tokio", 4432 4191 "tokio-stream", 4433 4192 "tower", 4434 - "tower-layer", 4435 - "tower-service", 4436 - "tracing", 4437 - ] 4438 - 4439 - [[package]] 4440 - name = "tonic" 4441 - version = "0.14.2" 4442 - source = "registry+https://github.com/rust-lang/crates.io-index" 4443 - checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 4444 - dependencies = [ 4445 - "async-trait", 4446 - "base64 0.22.1", 4447 - "bytes", 4448 - "http", 4449 - "http-body", 4450 - "http-body-util", 4451 - "percent-encoding", 4452 - "pin-project", 4453 - "sync_wrapper", 4454 - "tokio-stream", 4455 4193 "tower-layer", 4456 4194 "tower-service", 4457 4195 "tracing", ··· 4477 4215 source = "registry+https://github.com/rust-lang/crates.io-index" 4478 4216 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4479 4217 dependencies = [ 4480 - "prost 0.13.5", 4218 + "prost", 4481 4219 "tokio", 4482 4220 "tokio-stream", 4483 - "tonic 0.13.1", 4484 - ] 4485 - 4486 - [[package]] 4487 - name = "tonic-prost" 4488 - version = "0.14.2" 4489 - source = "registry+https://github.com/rust-lang/crates.io-index" 4490 - checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" 4491 - dependencies = [ 4492 - "bytes", 4493 - "prost 0.14.1", 4494 - "tonic 0.14.2", 4495 - ] 4496 - 4497 - [[package]] 4498 - name = "tonic-tracing-opentelemetry" 4499 - version = "0.32.0" 4500 - source = "registry+https://github.com/rust-lang/crates.io-index" 4501 - checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3" 4502 - dependencies = [ 4503 - "futures-core", 4504 - "futures-util", 4505 - "http", 4506 - "http-body", 4507 - "hyper", 4508 - "opentelemetry", 4509 - "pin-project-lite", 4510 - "tonic 0.14.2", 4511 - "tower", 4512 - "tracing", 4513 - "tracing-opentelemetry", 4514 - "tracing-opentelemetry-instrumentation-sdk", 4221 + "tonic", 4515 4222 ] 4516 4223 4517 4224 [[package]] ··· 4606 4313 ] 4607 4314 4608 4315 [[package]] 4609 - name = "tracing-opentelemetry" 4610 - version = "0.32.0" 4611 - source = "registry+https://github.com/rust-lang/crates.io-index" 4612 - checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" 4613 - dependencies = [ 4614 - "js-sys", 4615 - "opentelemetry", 4616 - "opentelemetry_sdk", 4617 - "rustversion", 4618 - "smallvec", 4619 - "thiserror 2.0.12", 4620 - "tracing", 4621 - "tracing-core", 4622 - "tracing-log", 4623 - "tracing-subscriber", 4624 - "web-time", 4625 - ] 4626 - 4627 - [[package]] 4628 - name = "tracing-opentelemetry-instrumentation-sdk" 4629 - version = "0.32.1" 4630 - source = "registry+https://github.com/rust-lang/crates.io-index" 4631 - checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416" 4632 - dependencies = [ 4633 - "http", 4634 - "opentelemetry", 4635 - "opentelemetry-semantic-conventions", 4636 - "tracing", 4637 - "tracing-opentelemetry", 4638 - ] 4639 - 4640 - [[package]] 4641 - name = "tracing-serde" 4642 - version = "0.2.0" 4643 - source = "registry+https://github.com/rust-lang/crates.io-index" 4644 - checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" 4645 - dependencies = [ 4646 - "serde", 4647 - "tracing-core", 4648 - ] 4649 - 4650 - [[package]] 4651 4316 name = "tracing-subscriber" 4652 4317 version = "0.3.19" 4653 4318 source = "registry+https://github.com/rust-lang/crates.io-index" 4654 4319 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4655 4320 dependencies = [ 4656 - "matchers", 4657 4321 "nu-ansi-term", 4658 - "once_cell", 4659 - "regex", 4660 - "serde", 4661 - "serde_json", 4662 4322 "sharded-slab", 4663 4323 "smallvec", 4664 4324 "thread_local", 4665 - "tracing", 4666 4325 "tracing-core", 4667 4326 "tracing-log", 4668 - "tracing-serde", 4669 4327 ] 4670 4328 4671 4329 [[package]] ··· 4934 4592 version = "0.3.77" 4935 4593 source = "registry+https://github.com/rust-lang/crates.io-index" 4936 4594 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4937 - dependencies = [ 4938 - "js-sys", 4939 - "wasm-bindgen", 4940 - ] 4941 - 4942 - [[package]] 4943 - name = "web-time" 4944 - version = "1.1.0" 4945 - source = "registry+https://github.com/rust-lang/crates.io-index" 4946 - checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 4947 4595 dependencies = [ 4948 4596 "js-sys", 4949 4597 "wasm-bindgen",
+1 -1
consumer/Cargo.toml
··· 36 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 38 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 39 + tracing-subscriber = "0.3.18"
+3 -3
consumer/src/backfill/downloader.rs
··· 84 84 85 85 // has the repo already been downloaded? 86 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 - tracing::info!("skipping duplicate repo {did}"); 87 + tracing::warn!("skipping duplicate repo {did}"); 88 88 continue; 89 89 } 90 90 ··· 92 92 match db::actor_get_statuses(&mut conn, &did).await { 93 93 Ok(Some((_, state))) => { 94 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 - tracing::info!("skipping duplicate repo {did}"); 95 + tracing::warn!("skipping duplicate repo {did}"); 96 96 continue; 97 97 } 98 98 } ··· 206 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 208 } 209 - Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."), 209 + Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 210 210 Err(e) => { 211 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 212 continue;
+5 -4
consumer/src/backfill/repo.rs
··· 53 53 54 54 match block { 55 55 CarEntry::Commit(_) => { 56 - tracing::debug!("got commit entry that was not in root") 56 + tracing::warn!("got commit entry that was not in root") 57 57 } 58 58 CarEntry::Record(CarRecordEntry::Known(record)) => { 59 59 if let Some(path) = mst_nodes.remove(&cid) { ··· 96 96 } 97 97 } 98 98 99 - let Some(commit) = commit else { 100 - eyre::bail!("repo contained no commit?"); 101 - }; 99 + let commit = commit.unwrap(); 102 100 103 101 Ok((commit, deltas, copies)) 104 102 } ··· 171 169 } 172 170 RecordTypes::AppBskyFeedThreadgate(record) => { 173 171 if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 174 173 return Ok(()); 175 174 } 176 175 ··· 195 194 RecordTypes::AppBskyGraphListItem(rec) => { 196 195 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 197 196 if did != split_aturi[2] { 197 + // it's also probably a bad idea to log *all* the attempts to do this... 198 + tracing::warn!("tried to create a listitem on a list we don't control!"); 198 199 return Ok(()); 199 200 } 200 201
-8
consumer/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub index_uri: String, 19 17 pub database: deadpool_postgres::Config, 20 18 pub redis_uri: String, ··· 29 27 pub indexer: Option<IndexerConfig>, 30 28 /// Configuration items specific to backfill 31 29 pub backfill: Option<BackfillConfig>, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub log_json: bool, 38 30 } 39 31 40 32 #[derive(Debug, Deserialize)]
+3 -3
consumer/src/db/actor.rs
··· 69 69 ) 70 70 .await?; 71 71 72 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 72 + Ok(res.map(|v| (v.get(0), v.get(1)))) 73 73 } 74 74 75 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 83 ) 84 84 .await?; 85 85 86 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 86 + Ok(res.map(|v| (v.get(0), v.get(1)))) 87 87 } 88 88 89 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 97 ) 98 98 .await?; 99 99 100 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 100 + Ok(res.map(|v| (v.get(0), v.get(1)))) 101 101 }
+9 -10
consumer/src/db/backfill.rs
··· 51 51 ) 52 52 .await?; 53 53 54 - res.into_iter() 55 - .map(|row| { 56 - Ok(BackfillRow { 57 - repo: row.try_get(0)?, 58 - repo_ver: row.try_get(1)?, 59 - cid: row.try_get(2)?, 60 - data: row.try_get(3)?, 61 - indexed_at: row.try_get(4)?, 62 - }) 54 + Ok(res 55 + .into_iter() 56 + .map(|row| BackfillRow { 57 + repo: row.get(0), 58 + repo_ver: row.get(1), 59 + cid: row.get(2), 60 + data: row.get(3), 61 + indexed_at: row.get(4), 63 62 }) 64 - .collect() 63 + .collect()) 65 64 } 66 65 67 66 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1 -1
consumer/src/db/copy.rs
··· 205 205 ) 206 206 .await? 207 207 .into_iter() 208 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?; 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 209 210 210 for (root, post, created_at) in threadgated { 211 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+13 -18
consumer/src/db/gates.rs
··· 47 47 &[&root_author, &post_author], 48 48 ) 49 49 .await? 50 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?; 50 + .map(|v| (v.get(0), v.get(1))); 51 51 52 52 if let Some((following, followed)) = profile_state { 53 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 65 let mentions: Vec<String> = conn 66 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 67 .await? 68 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 69 - .transpose()? 68 + .map(|r| r.get(0)) 70 69 .unwrap_or_default(); 71 70 72 71 if mentions.contains(&post_author.to_owned()) { ··· 85 84 &[&allow_lists, &post_author], 86 85 ) 87 86 .await? 88 - .try_get(0)?; 87 + .get(0); 89 88 if count != 0 { 90 89 return Ok(false); 91 90 } ··· 137 136 ) 138 137 .await? 139 138 .into_iter() 140 - .map(|row| row.try_get(0)) 141 - .collect::<Result<_, _>>()?; 139 + .map(|row| row.get(0)) 140 + .collect(); 142 141 143 142 // this will be empty if there are no replies. 144 143 if dids.is_empty() { ··· 153 152 }) 154 153 .collect::<Vec<_>>(); 155 154 156 - let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str())); 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 157 156 158 157 if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 159 158 let current_dids: Vec<_> = dids.iter().collect(); ··· 161 160 let res = conn.query( 162 161 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 163 162 &[&root_author, &current_dids] 164 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?; 163 + ).await?; 165 164 166 - dids = &dids - &res; 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 167 166 } 168 167 169 168 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 172 171 let res = conn.query( 173 172 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 174 173 &[&root_author, &current_dids] 175 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?; 174 + ).await?; 176 175 177 - dids = &dids - &res; 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 178 177 } 179 178 180 179 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 181 180 let mentions: Vec<String> = conn 182 181 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 183 182 .await? 184 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 185 - .transpose()? 183 + .map(|r| r.get(0)) 186 184 .unwrap_or_default(); 187 185 188 186 dids = &dids - &HashSet::from_iter(mentions); ··· 196 194 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 197 195 &[&allowed_lists, &current_dids], 198 196 ) 199 - .await? 200 - .into_iter() 201 - .map(|row| row.try_get(0)) 202 - .collect::<Result<_, _>>()?; 197 + .await?; 203 198 204 - dids = &dids - &res; 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 205 200 } 206 201 207 202 let dids = dids.into_iter().collect::<Vec<_>>();
+20 -17
consumer/src/db/record.rs
··· 127 127 ], 128 128 ) 129 129 .await 130 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 130 + .map(|r| r.get::<_, i32>(0) == 0) 131 131 } 132 132 133 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 159 ) 160 160 .await?; 161 161 162 - res.map(|v| v.try_get(0)).transpose() 162 + Ok(res.map(|v| v.get(0))) 163 163 } 164 164 165 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 224 ) 225 225 .await?; 226 226 227 - res.map(|v| v.try_get(0)).transpose() 227 + Ok(res.map(|v| v.get(0))) 228 228 } 229 229 230 230 pub async fn list_upsert<C: GenericClient>( ··· 255 255 ], 256 256 ) 257 257 .await 258 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 258 + .map(|r| r.get::<_, i32>(0) == 0) 259 259 } 260 260 261 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 391 ) 392 392 .await?; 393 393 394 - res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?))) 395 - .transpose() 394 + Ok(res.map(|row| (row.get(0), row.get(1)))) 396 395 } 397 396 398 397 pub async fn post_embed_insert<C: GenericClient>( ··· 537 536 conn: &mut C, 538 537 post: &str, 539 538 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 540 - conn.query_opt( 541 - "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 - &[&post], 543 - ) 544 - .await? 545 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))) 546 - .transpose() 539 + let res = conn 540 + .query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 545 + .map(|v| (v.get(0), v.get(1), v.get(2))); 546 + 547 + Ok(res) 547 548 } 548 549 549 550 pub async fn postgate_upsert<C: GenericClient>( ··· 650 651 ) 651 652 .await?; 652 653 653 - res.map(|v| v.try_get(0)).transpose() 654 + Ok(res.map(|v| v.get(0))) 654 655 } 655 656 656 657 pub async fn starter_pack_upsert<C: GenericClient>( ··· 685 686 ], 686 687 ) 687 688 .await 688 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 689 + .map(|r| r.get::<_, i32>(0) == 0) 689 690 } 690 691 691 692 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 730 731 conn: &mut C, 731 732 post: &str, 732 733 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 733 - conn 734 + let res = conn 734 735 .query_opt( 735 736 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 736 737 &[&post], 737 738 ) 738 739 .await? 739 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose() 740 + .map(|v| (v.get(0), v.get(1), v.get(2))); 741 + 742 + Ok(res) 740 743 } 741 744 742 745 pub async fn threadgate_upsert<C: GenericClient>(
+4
consumer/src/indexer/mod.rs
··· 640 640 } 641 641 RecordTypes::AppBskyFeedPostgate(record) => { 642 642 if !at_uri_is_by(&record.post, repo) { 643 + tracing::warn!("tried to create a postgate on a post we don't control!"); 643 644 return Ok(()); 644 645 } 645 646 ··· 669 670 } 670 671 RecordTypes::AppBskyFeedThreadgate(record) => { 671 672 if !at_uri_is_by(&record.post, repo) { 673 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 672 674 return Ok(()); 673 675 } 674 676 ··· 708 710 } 709 711 RecordTypes::AppBskyGraphListItem(record) => { 710 712 if !at_uri_is_by(&record.list, repo) { 713 + // it's also probably a bad idea to log *all* the attempts to do this... 714 + tracing::warn!("tried to create a listitem on a list we don't control!"); 711 715 return Ok(()); 712 716 } 713 717
-25
consumer/src/instrumentation.rs
··· 1 - use tracing::Subscriber; 2 - use tracing_subscriber::filter::Filtered; 3 - use tracing_subscriber::layer::SubscriberExt; 4 - use tracing_subscriber::registry::LookupSpan; 5 - use tracing_subscriber::util::SubscriberInitExt; 6 - use tracing_subscriber::{EnvFilter, Layer}; 7 - 8 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 9 - let log_layer = init_log(cfg.log_json); 10 - 11 - tracing_subscriber::registry().with(log_layer).init(); 12 - } 13 - 14 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 15 - where 16 - S: Subscriber + for<'span> LookupSpan<'span>, 17 - { 18 - let stdout_filter = EnvFilter::from_default_env(); 19 - 20 - match json { 21 - true => tracing_subscriber::fmt::layer().json().boxed(), 22 - false => tracing_subscriber::fmt::layer().boxed(), 23 - } 24 - .with_filter(stdout_filter) 25 - }
+1 -2
consumer/src/main.rs
··· 12 12 mod db; 13 13 mod firehose; 14 14 mod indexer; 15 - mod instrumentation; 16 15 mod label_indexer; 17 16 mod utils; 18 17 19 18 #[tokio::main] 20 19 async fn main() -> eyre::Result<()> { 20 + tracing_subscriber::fmt::init(); 21 21 PrometheusBuilder::new().install()?; 22 22 23 23 let cli = cmd::parse(); 24 24 let conf = config::load_config()?; 25 25 26 - instrumentation::init_instruments(&conf.instruments); 27 26 let user_agent = build_ua(&conf.ua_contact); 28 27 29 28 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
-1
lexica/src/app_bsky/mod.rs
··· 7 7 pub mod graph; 8 8 pub mod labeler; 9 9 pub mod richtext; 10 - pub mod unspecced; 11 10 12 11 #[derive(Clone, Default, Debug, Serialize)] 13 12 #[serde(rename_all = "camelCase")]
-33
lexica/src/app_bsky/unspecced.rs
··· 1 - use crate::app_bsky::feed::{BlockedAuthor, PostView}; 2 - use serde::Serialize; 3 - 4 - #[derive(Clone, Debug, Serialize)] 5 - pub struct ThreadV2Item { 6 - pub uri: String, 7 - pub depth: i32, 8 - pub value: ThreadV2ItemType, 9 - } 10 - 11 - #[derive(Clone, Debug, Serialize)] 12 - #[serde(tag = "$type")] 13 - pub enum ThreadV2ItemType { 14 - #[serde(rename = "app.bsky.unspecced.defs#threadItemPost")] 15 - Post(ThreadItemPost), 16 - #[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")] 17 - NoUnauthenticated {}, 18 - #[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")] 19 - NotFound {}, 20 - #[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")] 21 - Blocked { author: BlockedAuthor }, 22 - } 23 - 24 - #[derive(Clone, Debug, Serialize)] 25 - #[serde(rename_all = "camelCase")] 26 - pub struct ThreadItemPost { 27 - pub post: PostView, 28 - pub more_parents: bool, 29 - pub more_replies: i32, 30 - pub op_thread: bool, 31 - pub hidden_by_threadgate: bool, 32 - pub muted_by_viewer: bool, 33 - }
+2 -2
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 34 34 language plpgsql as 35 35 $$ 36 36 begin 37 - delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post'; 37 + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 38 38 return OLD; 39 39 end; 40 40 $$; ··· 67 67 language plpgsql as 68 68 $$ 69 69 begin 70 - delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost'; 70 + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 71 71 return OLD; 72 72 end; 73 73 $$;
+2 -8
parakeet/Cargo.toml
··· 6 6 [dependencies] 7 7 async-recursion = "1.1.1" 8 8 axum = { version = "0.8", features = ["json"] } 9 - axum-tracing-opentelemetry = "0.32" 10 9 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 11 10 base64 = "0.22" 12 11 chrono = { version = "0.4.39", features = ["serde"] } ··· 22 21 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 23 22 lexica = { path = "../lexica" } 24 23 multibase = "0.9.1" 25 - opentelemetry = "0.31.0" 26 - opentelemetry-otlp = "0.31.0" 27 - opentelemetry_sdk = "0.31.0" 28 24 parakeet-db = { path = "../parakeet-db" } 29 - parakeet-index = { path = "../parakeet-index", features = ["otel"] } 25 + parakeet-index = { path = "../parakeet-index" } 30 26 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 31 27 reqwest = { version = "0.12", features = ["json"] } 32 28 serde = { version = "1.0.217", features = ["derive"] } 33 29 serde_ipld_dagcbor = "0.6.1" 34 30 serde_json = "1.0.134" 35 31 tokio = { version = "1.42.0", features = ["full"] } 36 - tower = "0.5" 37 32 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 38 33 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 40 - tracing-opentelemetry = "0.32" 34 + tracing-subscriber = "0.3.18"
+2 -2
parakeet/src/cache.rs
··· 29 29 type Val = V; 30 30 31 31 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 - let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?; 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 33 33 34 34 match serde_ipld_dagcbor::from_slice(&res?) { 35 35 Ok(v) => Some(v), ··· 57 57 } 58 58 59 59 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 - let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key) 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 61 61 .await 62 62 .ok()?; 63 63
-10
parakeet/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub index_uri: String, 19 17 pub database_url: String, 20 18 pub redis_uri: String, ··· 29 27 pub did_allowlist: Option<Vec<String>>, 30 28 #[serde(default)] 31 29 pub migrate: bool, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub otel_enable: bool, 38 - #[serde(default)] 39 - pub log_json: bool, 40 30 } 41 31 42 32 #[derive(Debug, Deserialize)]
+1 -115
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 - use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 2 + use diesel::sql_types::{Array, Bool, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 - use parakeet_db::models::TextArray; 5 4 use parakeet_db::{schema, types}; 6 - use tracing::instrument; 7 5 8 - #[instrument(skip_all)] 9 6 pub async fn get_actor_status( 10 7 conn: &mut AsyncPgConnection, 11 8 did: &str, ··· 40 37 #[diesel(sql_type = Nullable<Text>)] 41 38 pub list_mute: Option<String>, 42 39 } 43 - 44 - #[instrument(skip_all)] 45 40 pub async fn get_profile_state( 46 41 conn: &mut AsyncPgConnection, 47 42 did: &str, ··· 54 49 .await 55 50 .optional() 56 51 } 57 - 58 - #[instrument(skip_all)] 59 52 pub async fn get_profile_states( 60 53 conn: &mut AsyncPgConnection, 61 54 did: &str, ··· 90 83 #[diesel(sql_type = diesel::sql_types::Bool)] 91 84 pub pinned: bool, 92 85 } 93 - 94 - #[instrument(skip_all)] 95 86 pub async fn get_post_state( 96 87 conn: &mut AsyncPgConnection, 97 88 did: &str, ··· 105 96 .optional() 106 97 } 107 98 108 - #[instrument(skip_all)] 109 99 pub async fn get_post_states( 110 100 conn: &mut AsyncPgConnection, 111 101 did: &str, ··· 129 119 pub block: Option<String>, 130 120 } 131 121 132 - #[instrument(skip_all)] 133 122 pub async fn get_list_state( 134 123 conn: &mut AsyncPgConnection, 135 124 did: &str, ··· 143 132 .optional() 144 133 } 145 134 146 - #[instrument(skip_all)] 147 135 pub async fn get_list_states( 148 136 conn: &mut AsyncPgConnection, 149 137 did: &str, ··· 156 144 .await 157 145 } 158 146 159 - #[instrument(skip_all)] 160 147 pub async fn get_like_state( 161 148 conn: &mut AsyncPgConnection, 162 149 did: &str, ··· 174 161 .optional() 175 162 } 176 163 177 - #[instrument(skip_all)] 178 164 pub async fn get_like_states( 179 165 conn: &mut AsyncPgConnection, 180 166 did: &str, ··· 195 181 .await 196 182 } 197 183 198 - #[instrument(skip_all)] 199 184 pub async fn get_pinned_post_uri( 200 185 conn: &mut AsyncPgConnection, 201 186 did: &str, ··· 211 196 .await 212 197 .optional() 213 198 } 214 - 215 - #[derive(Debug, QueryableByName)] 216 - #[diesel(check_for_backend(diesel::pg::Pg))] 217 - #[allow(unused)] 218 - pub struct ThreadItem { 219 - #[diesel(sql_type = Text)] 220 - pub at_uri: String, 221 - #[diesel(sql_type = Nullable<Text>)] 222 - pub parent_uri: Option<String>, 223 - #[diesel(sql_type = Nullable<Text>)] 224 - pub root_uri: Option<String>, 225 - #[diesel(sql_type = Integer)] 226 - pub depth: i32, 227 - } 228 - 229 - #[instrument(skip_all)] 230 - pub async fn get_thread_children( 231 - conn: &mut AsyncPgConnection, 232 - uri: &str, 233 - depth: i32, 234 - ) -> QueryResult<Vec<ThreadItem>> { 235 - diesel::sql_query(include_str!("sql/thread.sql")) 236 - .bind::<Text, _>(uri) 237 - .bind::<Integer, _>(depth) 238 - .load(conn) 239 - .await 240 - } 241 - 242 - #[instrument(skip_all)] 243 - pub async fn get_thread_children_branching( 244 - conn: &mut AsyncPgConnection, 245 - uri: &str, 246 - depth: i32, 247 - branching_factor: i32, 248 - ) -> QueryResult<Vec<ThreadItem>> { 249 - diesel::sql_query(include_str!("sql/thread_branching.sql")) 250 - .bind::<Text, _>(uri) 251 - .bind::<Integer, _>(depth) 252 - .bind::<Integer, _>(branching_factor) 253 - .load(conn) 254 - .await 255 - } 256 - 257 - #[derive(Debug, QueryableByName)] 258 - #[diesel(check_for_backend(diesel::pg::Pg))] 259 - pub struct HiddenThreadChildItem { 260 - #[diesel(sql_type = Text)] 261 - pub at_uri: String, 262 - } 263 - 264 - #[instrument(skip_all)] 265 - pub async fn get_thread_children_hidden( 266 - conn: &mut AsyncPgConnection, 267 - uri: &str, 268 - root: &str, 269 - ) -> QueryResult<Vec<HiddenThreadChildItem>> { 270 - diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql")) 271 - .bind::<Text, _>(uri) 272 - .bind::<Text, _>(root) 273 - .load(conn) 274 - .await 275 - } 276 - 277 - #[instrument(skip_all)] 278 - pub async fn get_thread_parents( 279 - conn: &mut AsyncPgConnection, 280 - uri: &str, 281 - height: i32, 282 - ) -> QueryResult<Vec<ThreadItem>> { 283 - diesel::sql_query(include_str!("sql/thread_parent.sql")) 284 - .bind::<Text, _>(uri) 285 - .bind::<Integer, _>(height) 286 - .load(conn) 287 - .await 288 - } 289 - 290 - #[instrument(skip_all)] 291 - pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 292 - schema::posts::table 293 - .select(schema::posts::root_uri) 294 - .find(&uri) 295 - .get_result(conn) 296 - .await 297 - .optional() 298 - .map(|v| v.flatten()) 299 - } 300 - 301 - #[instrument(skip_all)] 302 - pub async fn get_threadgate_hiddens( 303 - conn: &mut AsyncPgConnection, 304 - uri: &str, 305 - ) -> QueryResult<Option<TextArray>> { 306 - schema::threadgates::table 307 - .select(schema::threadgates::hidden_replies) 308 - .find(&uri) 309 - .get_result(conn) 310 - .await 311 - .optional() 312 - }
-3
parakeet/src/hydration/embed.rs
··· 8 8 use lexica::app_bsky::feed::PostView; 9 9 use parakeet_db::models; 10 10 use std::collections::HashMap; 11 - use tracing::instrument; 12 11 13 12 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 14 13 height ··· 177 176 out 178 177 } 179 178 180 - #[instrument(skip_all)] 181 179 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 182 180 let (embed, author) = self.loaders.embed.load(post).await?; 183 181 ··· 197 195 } 198 196 } 199 197 200 - #[instrument(skip_all)] 201 198 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 202 199 let embeds = self.loaders.embed.load_many(posts).await; 203 200
-5
parakeet/src/hydration/feedgen.rs
··· 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 - use tracing::instrument; 9 8 10 9 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 11 10 GeneratorViewerState { ··· 50 49 } 51 50 52 51 impl super::StatefulHydrator<'_> { 53 - #[instrument(skip_all)] 54 52 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 55 53 let labels = self.get_label(&feedgen).await; 56 54 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 63 61 )) 64 62 } 65 63 66 - #[instrument(skip_all)] 67 64 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 68 65 let labels = self.get_label_many(&feedgens).await; 69 66 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 93 90 .collect() 94 91 } 95 92 96 - #[instrument(skip_all)] 97 93 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 98 94 if let Some(viewer) = &self.current_actor { 99 95 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 104 100 } 105 101 } 106 102 107 - #[instrument(skip_all)] 108 103 async fn get_feedgen_viewer_states( 109 104 &self, 110 105 subjects: &[String],
+9 -12
parakeet/src/hydration/labeler.rs
··· 8 8 use parakeet_db::models; 9 9 use std::collections::HashMap; 10 10 use std::str::FromStr; 11 - use tracing::instrument; 12 11 13 12 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 14 13 LabelerViewerState { ··· 43 42 likes: Option<i32>, 44 43 ) -> LabelerViewDetailed { 45 44 let reason_types = labeler.reasons.map(|v| { 46 - v.iter() 47 - .filter_map(|v| ReasonType::from_str(v).ok()) 45 + v.into_iter() 46 + .flatten() 47 + .filter_map(|v| ReasonType::from_str(&v).ok()) 48 48 .collect() 49 49 }); 50 50 ··· 74 74 }) 75 75 .collect(); 76 76 let subject_types = labeler.subject_types.map(|v| { 77 - v.iter() 78 - .filter_map(|v| SubjectType::from_str(v).ok()) 77 + v.into_iter() 78 + .flatten() 79 + .filter_map(|v| SubjectType::from_str(&v).ok()) 79 80 .collect() 80 81 }); 81 - let subject_collections = labeler.subject_collections.map(Vec::from); 82 + let subject_collections = labeler 83 + .subject_collections 84 + .map(|v| v.into_iter().flatten().collect()); 82 85 83 86 LabelerViewDetailed { 84 87 uri: format!("at://{}/app.bsky.labeler.service/self", labeler.did), ··· 99 102 } 100 103 101 104 impl StatefulHydrator<'_> { 102 - #[instrument(skip_all)] 103 105 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 104 106 let labels = self.get_label(&labeler).await; 105 107 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 110 112 Some(build_view(labeler, creator, labels, viewer, likes)) 111 113 } 112 114 113 - #[instrument(skip_all)] 114 115 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 115 116 let labels = self.get_label_many(&labelers).await; 116 117 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 136 137 .collect() 137 138 } 138 139 139 - #[instrument(skip_all)] 140 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 141 141 let labels = self.get_label(&labeler).await; 142 142 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 149 149 )) 150 150 } 151 151 152 - #[instrument(skip_all)] 153 152 pub async fn hydrate_labelers_detailed( 154 153 &self, 155 154 labelers: Vec<String>, ··· 180 179 .collect() 181 180 } 182 181 183 - #[instrument(skip_all)] 184 182 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 185 183 if let Some(viewer) = &self.current_actor { 186 184 let data = self ··· 195 193 } 196 194 } 197 195 198 - #[instrument(skip_all)] 199 196 async fn get_labeler_viewer_states( 200 197 &self, 201 198 subjects: &[String],
-7
parakeet/src/hydration/list.rs
··· 6 6 use parakeet_db::models; 7 7 use std::collections::HashMap; 8 8 use std::str::FromStr; 9 - use tracing::instrument; 10 9 11 10 fn build_viewer(data: ListStateRet) -> ListViewerState { 12 11 ListViewerState { ··· 70 69 } 71 70 72 71 impl StatefulHydrator<'_> { 73 - #[instrument(skip_all)] 74 72 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 75 73 let labels = self.get_label(&list).await; 76 74 let viewer = self.get_list_viewer_state(&list).await; ··· 79 77 build_basic(list, count, labels, viewer, &self.cdn) 80 78 } 81 79 82 - #[instrument(skip_all)] 83 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 84 81 if lists.is_empty() { 85 82 return HashMap::new(); ··· 100 97 .collect() 101 98 } 102 99 103 - #[instrument(skip_all)] 104 100 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 105 101 let labels = self.get_label(&list).await; 106 102 let viewer = self.get_list_viewer_state(&list).await; ··· 110 106 build_listview(list, count, profile, labels, viewer, &self.cdn) 111 107 } 112 108 113 - #[instrument(skip_all)] 114 109 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 115 110 if lists.is_empty() { 116 111 return HashMap::new(); ··· 136 131 .collect() 137 132 } 138 133 139 - #[instrument(skip_all)] 140 134 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 141 135 if let Some(viewer) = &self.current_actor { 142 136 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 147 141 } 148 142 } 149 143 150 - #[instrument(skip_all)] 151 144 async fn get_list_viewer_states( 152 145 &self, 153 146 subjects: &[String],
-4
parakeet/src/hydration/mod.rs
··· 63 63 } 64 64 } 65 65 66 - #[tracing::instrument(skip_all)] 67 66 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 68 67 self.loaders.label.load(uri, self.accept_labelers).await 69 68 } 70 69 71 - #[tracing::instrument(skip_all)] 72 70 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 73 71 let uris = &[ 74 72 did.to_string(), ··· 82 80 .collect() 83 81 } 84 82 85 - #[tracing::instrument(skip_all)] 86 83 async fn get_label_many( 87 84 &self, 88 85 uris: &[String], ··· 93 90 .await 94 91 } 95 92 96 - #[tracing::instrument(skip_all)] 97 93 async fn get_profile_label_many( 98 94 &self, 99 95 uris: &[String],
+3 -12
parakeet/src/hydration/posts.rs
··· 11 11 use parakeet_db::models; 12 12 use parakeet_index::PostStats; 13 13 use std::collections::HashMap; 14 - use tracing::instrument; 15 14 16 15 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 17 16 let is_me = did == data.did; ··· 83 82 } 84 83 85 84 impl StatefulHydrator<'_> { 86 - #[instrument(skip_all)] 87 85 async fn hydrate_threadgate( 88 86 &self, 89 87 threadgate: Option<models::Threadgate>, ··· 91 89 let threadgate = threadgate?; 92 90 93 91 let lists = match threadgate.allowed_lists.as_ref() { 94 - Some(allowed_lists) => allowed_lists.clone().into(), 92 + Some(allowed_lists) => allowed_lists.iter().flatten().cloned().collect(), 95 93 None => Vec::new(), 96 94 }; 97 95 let lists = self.hydrate_lists_basic(lists).await; ··· 102 100 )) 103 101 } 104 102 105 - #[instrument(skip_all)] 106 103 async fn hydrate_threadgates( 107 104 &self, 108 105 threadgates: Vec<models::Threadgate>, 109 106 ) -> HashMap<String, ThreadgateView> { 110 107 let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| { 111 108 if let Some(lists) = &c.allowed_lists { 112 - acc.extend(lists.clone().0); 109 + acc.extend(lists.iter().flatten().cloned()); 113 110 } 114 111 acc 115 112 }); ··· 121 118 let this_lists = match &threadgate.allowed_lists { 122 119 Some(allowed_lists) => allowed_lists 123 120 .iter() 124 - .filter_map(|v| lists.get(v).cloned()) 121 + .filter_map(|v| v.clone().and_then(|v| lists.get(&v).cloned())) 125 122 .collect(), 126 123 None => Vec::new(), 127 124 }; ··· 134 131 .collect() 135 132 } 136 133 137 - #[instrument(skip_all)] 138 134 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 139 135 let stats = self.loaders.post_stats.load(post.clone()).await; 140 136 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 149 145 ))) 150 146 } 151 147 152 - #[instrument(skip_all)] 153 148 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 154 149 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 155 150 let posts = self.loaders.posts.load_many(posts).await; ··· 189 184 .collect() 190 185 } 191 186 192 - #[instrument(skip_all)] 193 187 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 194 188 self.hydrate_posts_inner(posts) 195 189 .await ··· 198 192 .collect() 199 193 } 200 194 201 - #[instrument(skip_all)] 202 195 pub async fn hydrate_feed_posts( 203 196 &self, 204 197 posts: Vec<RawFeedItem>, ··· 302 295 .collect() 303 296 } 304 297 305 - #[instrument(skip_all)] 306 298 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 307 299 if let Some(viewer) = &self.current_actor { 308 300 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 313 305 } 314 306 } 315 307 316 - #[instrument(skip_all)] 317 308 async fn get_post_viewer_states( 318 309 &self, 319 310 subjects: &[String],
+1 -12
parakeet/src/hydration/profile.rs
··· 12 12 use std::collections::HashMap; 13 13 use std::str::FromStr; 14 14 use std::sync::OnceLock; 15 - use tracing::instrument; 16 15 17 16 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 18 17 ··· 52 51 .followed 53 52 .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 54 53 55 - let blocking = data.list_block.or(data 56 - .blocking 57 - .map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did))); 54 + let blocking = data.list_block.or(data.blocking); 58 55 59 56 ProfileViewerState { 60 57 muted: data.muting.unwrap_or_default(), ··· 275 272 } 276 273 277 274 impl super::StatefulHydrator<'_> { 278 - #[instrument(skip_all)] 279 275 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 280 276 let labels = self.get_profile_label(&did).await; 281 277 let viewer = self.get_profile_viewer_state(&did).await; ··· 293 289 )) 294 290 } 295 291 296 - #[instrument(skip_all)] 297 292 pub async fn hydrate_profiles_basic( 298 293 &self, 299 294 dids: Vec<String>, ··· 318 313 .collect() 319 314 } 320 315 321 - #[instrument(skip_all)] 322 316 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 323 317 let labels = self.get_profile_label(&did).await; 324 318 let viewer = self.get_profile_viewer_state(&did).await; ··· 336 330 )) 337 331 } 338 332 339 - #[instrument(skip_all)] 340 333 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 341 334 let labels = self.get_profile_label_many(&dids).await; 342 335 let viewers = self.get_profile_viewer_states(&dids).await; ··· 358 351 .collect() 359 352 } 360 353 361 - #[instrument(skip_all)] 362 354 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 363 355 let labels = self.get_profile_label(&did).await; 364 356 let viewer = self.get_profile_viewer_state(&did).await; ··· 376 368 )) 377 369 } 378 370 379 - #[instrument(skip_all)] 380 371 pub async fn hydrate_profiles_detailed( 381 372 &self, 382 373 dids: Vec<String>, ··· 401 392 .collect() 402 393 } 403 394 404 - #[instrument(skip_all)] 405 395 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 406 396 if let Some(viewer) = &self.current_actor { 407 397 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 421 411 } 422 412 } 423 413 424 - #[instrument(skip_all)] 425 414 async fn get_profile_viewer_states( 426 415 &self, 427 416 dids: &[String],
+9 -11
parakeet/src/hydration/starter_packs.rs
··· 4 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 - use tracing::instrument; 8 7 9 8 fn build_basic( 10 9 starter_pack: models::StaterPack, ··· 51 50 } 52 51 53 52 impl StatefulHydrator<'_> { 54 - #[instrument(skip_all)] 55 53 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 56 54 let labels = self.get_label(&pack).await; 57 55 let sp = self.loaders.starterpacks.load(pack).await?; ··· 61 59 Some(build_basic(sp, creator, labels, list_item_count)) 62 60 } 63 61 64 - #[instrument(skip_all)] 65 62 pub async fn hydrate_starterpacks_basic( 66 63 &self, 67 64 packs: Vec<String>, ··· 89 86 .collect() 90 87 } 91 88 92 - #[instrument(skip_all)] 93 89 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 94 90 let labels = self.get_label(&pack).await; 95 91 let sp = self.loaders.starterpacks.load(pack).await?; ··· 97 93 let creator = self.hydrate_profile_basic(sp.owner.clone()).await?; 98 94 let list = self.hydrate_list_basic(sp.list.clone()).await; 99 95 100 - let feeds = sp.feeds.clone().unwrap_or_default(); 101 - let feeds = self 102 - .hydrate_feedgens(feeds.into()) 103 - .await 104 - .into_values() 96 + let feeds = sp 97 + .feeds 98 + .clone() 99 + .unwrap_or_default() 100 + .into_iter() 101 + .flatten() 105 102 .collect(); 103 + let feeds = self.hydrate_feedgens(feeds).await.into_values().collect(); 106 104 107 105 Some(build_spview(sp, creator, labels, list, feeds)) 108 106 } 109 107 110 - #[instrument(skip_all)] 111 108 pub async fn hydrate_starterpacks( 112 109 &self, 113 110 packs: Vec<String>, ··· 122 119 let feeds = packs 123 120 .values() 124 121 .filter_map(|pack| pack.feeds.clone()) 125 - .flat_map(Vec::from) 122 + .flat_map(|feeds| feeds.into_iter().flatten()) 126 123 .collect(); 127 124 128 125 let creators = self.hydrate_profiles_basic(creators).await; ··· 136 133 let list = lists.get(&pack.list).cloned(); 137 134 let feeds = pack.feeds.as_ref().map(|v| { 138 135 v.iter() 136 + .flatten() 139 137 .filter_map(|feed| feeds.get(feed).cloned()) 140 138 .collect() 141 139 });
-57
parakeet/src/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
+5 -31
parakeet/src/loaders.rs
··· 4 4 use dataloader::async_cached::Loader; 5 5 use dataloader::non_cached::Loader as NonCachedLoader; 6 6 use dataloader::BatchFn; 7 - use diesel::dsl::sql; 8 7 use diesel::prelude::*; 9 8 use diesel_async::pooled_connection::deadpool::Pool; 10 9 use diesel_async::{AsyncPgConnection, RunQueryDsl}; ··· 15 14 use serde::{Deserialize, Serialize}; 16 15 use std::collections::HashMap; 17 16 use std::str::FromStr; 18 - use tracing::instrument; 19 17 20 18 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 21 19 ··· 64 62 ) -> Dataloaders { 65 63 Dataloaders { 66 64 embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 67 - feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600), 65 + feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 68 66 handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 69 67 label: LabelLoader(pool.clone()), // CARE: never cache this. 70 - labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600), 68 + labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 71 69 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 72 70 like_state: LikeRecordLoader(pool.clone()), 73 71 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), ··· 86 84 87 85 pub struct LikeLoader(parakeet_index::Client); 88 86 impl BatchFn<String, i32> for LikeLoader { 89 - #[instrument(name = "LikeLoader", skip_all)] 90 87 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 91 88 let res = self 92 89 .0 ··· 109 106 110 107 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 111 108 impl LikeRecordLoader { 112 - #[instrument(name = "LikeRecordLoader::get", skip_all)] 113 109 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 114 110 let mut conn = self.0.get().await.unwrap(); 115 111 ··· 121 117 }) 122 118 } 123 119 124 - #[instrument(name = "LikeRecordLoader::get_many", skip_all)] 125 120 pub async fn get_many( 126 121 &self, 127 122 did: &str, ··· 143 138 144 139 pub struct HandleLoader(Pool<AsyncPgConnection>); 145 140 impl BatchFn<String, String> for HandleLoader { 146 - #[instrument(name = "HandleLoader", skip_all)] 147 141 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 148 142 let mut conn = self.0.get().await.unwrap(); 149 143 ··· 176 170 Option<ProfileAllowSubscriptions>, 177 171 ); 178 172 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 179 - #[instrument(name = "ProfileLoader", skip_all)] 180 173 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 181 174 let mut conn = self.0.get().await.unwrap(); 182 175 ··· 238 231 239 232 pub struct ProfileStatsLoader(parakeet_index::Client); 240 233 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 241 - #[instrument(name = "ProfileStatsLoader", skip_all)] 242 234 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 243 235 let stats_req = parakeet_index::GetStatsManyReq { 244 236 uris: keys.to_vec(), ··· 255 247 256 248 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 257 249 impl ProfileStateLoader { 258 - #[instrument(name = "ProfileStateLoader::get", skip_all)] 259 250 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 260 251 let mut conn = self.0.get().await.unwrap(); 261 252 ··· 267 258 }) 268 259 } 269 260 270 - #[instrument(name = "ProfileStateLoader::get_many", skip_all)] 271 261 pub async fn get_many( 272 262 &self, 273 263 did: &str, ··· 288 278 pub struct ListLoader(Pool<AsyncPgConnection>); 289 279 type ListLoaderRet = (models::List, i64); 290 280 impl BatchFn<String, ListLoaderRet> for ListLoader { 291 - #[instrument(name = "ListLoaderRet", skip_all)] 292 281 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 293 282 let mut conn = self.0.get().await.unwrap(); 294 283 ··· 320 309 321 310 pub struct ListStateLoader(Pool<AsyncPgConnection>); 322 311 impl ListStateLoader { 323 - #[instrument(name = "ListStateLoader::get", skip_all)] 324 312 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 325 313 let mut conn = self.0.get().await.unwrap(); 326 314 ··· 332 320 }) 333 321 } 334 322 335 - #[instrument(name = "ListStateLoader::get_many", skip_all)] 336 323 pub async fn get_many( 337 324 &self, 338 325 did: &str, ··· 350 337 } 351 338 } 352 339 353 - pub struct FeedGenLoader(Pool<AsyncPgConnection>); 340 + pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 354 341 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 355 - #[instrument(name = "FeedGenLoader", skip_all)] 356 342 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 357 343 let mut conn = self.0.get().await.unwrap(); 358 344 ··· 378 364 pub struct PostLoader(Pool<AsyncPgConnection>); 379 365 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 380 366 impl BatchFn<String, PostLoaderRet> for PostLoader { 381 - #[instrument(name = "PostLoader", skip_all)] 382 367 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 383 368 let mut conn = self.0.get().await.unwrap(); 384 369 385 370 let res = schema::posts::table 386 - .left_join(schema::threadgates::table.on( 387 - schema::threadgates::post_uri.eq(sql("coalesce(posts.root_uri, posts.at_uri)")), 388 - )) 371 + .left_join(schema::threadgates::table) 389 372 .select(( 390 373 models::Post::as_select(), 391 374 Option::<models::Threadgate>::as_select(), ··· 409 392 410 393 pub struct PostStatsLoader(parakeet_index::Client); 411 394 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 412 - #[instrument(name = "PostStatsLoader", skip_all)] 413 395 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 414 396 let stats_req = parakeet_index::GetStatsManyReq { 415 397 uris: keys.to_vec(), ··· 426 408 427 409 pub struct PostStateLoader(Pool<AsyncPgConnection>); 428 410 impl PostStateLoader { 429 - #[instrument(name = "PostStateLoader::get", skip_all)] 430 411 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 431 412 let mut conn = self.0.get().await.unwrap(); 432 413 ··· 438 419 }) 439 420 } 440 421 441 - #[instrument(name = "PostStateLoader::get_many", skip_all)] 442 422 pub async fn get_many( 443 423 &self, 444 424 did: &str, ··· 466 446 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 467 447 } 468 448 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 469 - #[instrument(name = "EmbedLoader", skip_all)] 470 449 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 471 450 let mut conn = self.0.get().await.unwrap(); 472 451 ··· 549 528 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 550 529 type StarterPackLoaderRet = models::StaterPack; 551 530 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 552 - #[instrument(name = "StarterPackLoader", skip_all)] 553 531 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 554 532 let mut conn = self.0.get().await.unwrap(); 555 533 ··· 572 550 } 573 551 } 574 552 575 - pub struct LabelServiceLoader(Pool<AsyncPgConnection>); 553 + pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 576 554 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 577 555 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 578 - #[instrument(name = "LabelServiceLoader", skip_all)] 579 556 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 580 557 let mut conn = self.0.get().await.unwrap(); 581 558 ··· 614 591 // but it should live here anyway 615 592 pub struct LabelLoader(Pool<AsyncPgConnection>); 616 593 impl LabelLoader { 617 - #[instrument(name = "LabelLoader::load", skip_all)] 618 594 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 619 595 let mut conn = self.0.get().await.unwrap(); 620 596 ··· 634 610 }) 635 611 } 636 612 637 - #[instrument(name = "LabelLoader::load_many", skip_all)] 638 613 pub async fn load_many( 639 614 &self, 640 615 uris: &[String], ··· 669 644 670 645 pub struct VerificationLoader(Pool<AsyncPgConnection>); 671 646 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 672 - #[instrument(name = "VerificationLoader", skip_all)] 673 647 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 674 648 let mut conn = self.0.get().await.unwrap(); 675 649
+5 -14
parakeet/src/main.rs
··· 1 - use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; 2 1 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 3 2 use diesel_async::pooled_connection::deadpool::Pool; 4 3 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 15 14 mod config; 16 15 mod db; 17 16 mod hydration; 18 - mod instrumentation; 19 17 mod loaders; 20 18 mod xrpc; 21 19 ··· 33 31 34 32 #[tokio::main] 35 33 async fn main() -> eyre::Result<()> { 36 - let conf = config::load_config()?; 34 + tracing_subscriber::fmt::init(); 37 35 38 - instrumentation::init_instruments(&conf.instruments); 36 + let conf = config::load_config()?; 39 37 40 38 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 41 39 let pool = Pool::builder(db_mgr).build()?; ··· 54 52 let redis_client = redis::Client::open(conf.redis_uri)?; 55 53 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 56 54 57 - let index_client = parakeet_index::connect_with_otel(conf.index_uri) 58 - .await 59 - .map_err(|e| eyre::eyre!(e))?; 55 + let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 60 56 61 57 let dataloaders = Arc::new(loaders::Dataloaders::new( 62 58 pool.clone(), ··· 83 79 84 80 let did_doc = did_web_doc(&conf.service); 85 81 86 - let mw = tower::ServiceBuilder::new() 87 - .option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default)) 88 - .option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default)) 89 - .layer(TraceLayer::new_for_http()) 90 - .layer(cors); 91 - 92 82 let app = axum::Router::new() 93 83 .nest("/xrpc", xrpc::xrpc_routes()) 94 84 .route( 95 85 "/.well-known/did.json", 96 86 axum::routing::get(async || axum::Json(did_doc)), 97 87 ) 98 - .layer(mw) 88 + .layer(TraceLayer::new_for_http()) 89 + .layer(cors) 99 90 .with_state(GlobalState { 100 91 pool, 101 92 redis_mp,
+1 -1
parakeet/src/sql/thread.sql
··· 1 - with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth 1 + with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth 2 2 from posts 3 3 where parent_uri = $1 and violates_threadgate=FALSE 4 4 union all
-13
parakeet/src/sql/thread_branching.sql
··· 1 - with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth 2 - from posts 3 - where parent_uri = $1 4 - and violates_threadgate = FALSE 5 - union all 6 - (select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 7 - from posts p 8 - join thread on p.parent_uri = thread.at_uri 9 - where thread.depth <= $2 10 - and violates_threadgate = FALSE 11 - LIMIT $3)) 12 - select * 13 - from thread;
-6
parakeet/src/sql/thread_v2_hidden_children.sql
··· 1 - select at_uri 2 - from posts 3 - where parent_uri = $1 4 - and at_uri = any (select unnest(hidden_replies) 5 - from threadgates 6 - where post_uri = $2)
+1 -1
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 36 36 rkey: None, 37 37 subject: &form.uri, 38 38 subject_cid: Some(form.cid), 39 - subject_type: parts[1], 39 + subject_type: &parts[1], 40 40 tags: vec![], 41 41 }; 42 42
+26 -8
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 24 24 use reqwest::Url; 25 25 use serde::{Deserialize, Serialize}; 26 26 use std::collections::HashMap; 27 - use tracing::instrument; 28 27 29 28 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 30 29 ··· 161 160 162 161 #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 163 162 #[serde(rename_all = "snake_case")] 164 - #[allow(clippy::enum_variant_names)] 165 163 pub enum GetAuthorFeedFilter { 166 164 #[default] 167 165 PostsWithReplies, ··· 200 198 if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 201 199 if psr.blocked.unwrap_or_default() { 202 200 // they block us 203 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 201 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 204 202 } else if psr.blocking.is_some() { 205 203 // we block them 206 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 204 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 207 205 } 208 206 } 209 207 } ··· 363 361 pub threadgate: Option<ThreadgateView>, 364 362 } 365 363 364 + #[derive(Debug, QueryableByName)] 365 + #[diesel(check_for_backend(diesel::pg::Pg))] 366 + struct ThreadItem { 367 + #[diesel(sql_type = diesel::sql_types::Text)] 368 + at_uri: String, 369 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 370 + parent_uri: Option<String>, 371 + // #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 372 + // root_uri: Option<String>, 373 + #[diesel(sql_type = diesel::sql_types::Integer)] 374 + depth: i32, 375 + } 376 + 366 377 pub async fn get_post_thread( 367 378 State(state): State<GlobalState>, 368 379 AtpAcceptLabelers(labelers): AtpAcceptLabelers, ··· 398 409 } 399 410 } 400 411 401 - let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?; 402 - let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?; 412 + let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 413 + .bind::<diesel::sql_types::Text, _>(&uri) 414 + .bind::<diesel::sql_types::Integer, _>(depth as i32) 415 + .load::<ThreadItem>(&mut conn) 416 + .await?; 417 + 418 + let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql")) 419 + .bind::<diesel::sql_types::Text, _>(&uri) 420 + .bind::<diesel::sql_types::Integer, _>(parent_height as i32) 421 + .load::<ThreadItem>(&mut conn) 422 + .await?; 403 423 404 424 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 405 425 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect(); ··· 614 634 .or(schema::posts::embed_subtype.eq_any(filter)) 615 635 } 616 636 617 - #[instrument(skip_all)] 618 637 async fn get_feed_skeleton( 619 638 feed: &str, 620 639 service: &str, ··· 656 675 } 657 676 } 658 677 659 - #[instrument(skip_all)] 660 678 async fn get_skeleton_repost_data( 661 679 conn: &mut AsyncPgConnection, 662 680 reposts: Vec<String>,
-3
parakeet/src/xrpc/app_bsky/mod.rs
··· 6 6 mod feed; 7 7 mod graph; 8 8 mod labeler; 9 - mod unspecced; 10 9 11 10 #[rustfmt::skip] 12 11 pub fn routes() -> Router<crate::GlobalState> { ··· 65 64 // TODO: app.bsky.notification.putActivitySubscriptions 66 65 // TODO: app.bsky.notification.putPreferences 67 66 // TODO: app.bsky.notification.putPreferencesV2 68 - .route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2)) 69 - .route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2)) 70 67 } 71 68 72 69 async fn not_implemented() -> axum::http::StatusCode {
-1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 1 - pub mod thread_v2;
-379
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 1 - use crate::db::ThreadItem; 2 - use crate::hydration::StatefulHydrator; 3 - use crate::xrpc::error::{Error, XrpcResult}; 4 - use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; 5 - use crate::xrpc::normalise_at_uri; 6 - use crate::GlobalState; 7 - use axum::extract::{Query, State}; 8 - use axum::Json; 9 - use itertools::Itertools; 10 - use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView}; 11 - use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 12 - use serde::{Deserialize, Serialize}; 13 - use std::cmp::Ordering; 14 - use std::collections::{HashMap, HashSet}; 15 - 16 - const THREAD_PARENTS: usize = 50; 17 - const DEFAULT_BRANCHING: u32 = 10; 18 - const DEFAULT_DEPTH: u32 = 6; 19 - 20 - #[derive(Copy, Clone, Debug, Default, Deserialize)] 21 - #[serde(rename_all = "lowercase")] 22 - pub enum PostThreadSort { 23 - Newest, 24 - #[default] 25 - Oldest, 26 - Top, 27 - } 28 - 29 - #[derive(Debug, Deserialize)] 30 - #[serde(rename_all = "camelCase")] 31 - pub struct GetPostThreadV2Req { 32 - pub anchor: String, 33 - pub above: Option<bool>, 34 - pub below: Option<u32>, 35 - pub branching_factor: Option<u32>, 36 - #[serde(default)] 37 - pub sort: PostThreadSort, 38 - } 39 - 40 - #[derive(Debug, Serialize)] 41 - #[serde(rename_all = "camelCase")] 42 - pub struct GetPostThreadV2Res { 43 - pub thread: Vec<ThreadV2Item>, 44 - #[serde(skip_serializing_if = "Option::is_none")] 45 - pub threadgate: Option<ThreadgateView>, 46 - pub has_other_replies: bool, 47 - } 48 - 49 - pub async fn get_post_thread_v2( 50 - State(state): State<GlobalState>, 51 - AtpAcceptLabelers(labelers): AtpAcceptLabelers, 52 - maybe_auth: Option<AtpAuth>, 53 - Query(query): Query<GetPostThreadV2Req>, 54 - ) -> XrpcResult<Json<GetPostThreadV2Res>> { 55 - let mut conn = state.pool.get().await?; 56 - let maybe_did = maybe_auth.clone().map(|v| v.0); 57 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 58 - 59 - let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 60 - let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32; 61 - let branching_factor = query 62 - .branching_factor 63 - .unwrap_or(DEFAULT_BRANCHING) 64 - .clamp(0, 100) as i32; 65 - 66 - let anchor = hyd 67 - .hydrate_post(uri.clone()) 68 - .await 69 - .ok_or(Error::not_found())?; 70 - 71 - if let Some(v) = &anchor.author.viewer { 72 - if v.blocked_by || v.blocking.is_some() { 73 - let block = ThreadV2ItemType::Blocked { 74 - author: BlockedAuthor { 75 - did: anchor.author.did, 76 - viewer: anchor.author.viewer, 77 - }, 78 - }; 79 - 80 - return Ok(Json(GetPostThreadV2Res { 81 - thread: vec![ThreadV2Item { 82 - uri, 83 - depth: 0, 84 - value: block, 85 - }], 86 - threadgate: anchor.threadgate, 87 - has_other_replies: false, 88 - })); 89 - } 90 - } 91 - 92 - // get the root post URI (if there is one) and return its author's DID. 93 - let root_uri = crate::db::get_root_post(&mut conn, &uri) 94 - .await? 95 - .unwrap_or(uri.clone()); 96 - let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0]; 97 - 98 - let replies = 99 - crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1) 100 - .await?; 101 - let reply_uris = replies 102 - .iter() 103 - .map(|item| item.at_uri.clone()) 104 - .collect::<Vec<_>>(); 105 - 106 - // bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents. 107 - let parents = match query.above.unwrap_or(true) { 108 - true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?, 109 - false => vec![], 110 - }; 111 - let parent_uris = parents 112 - .iter() 113 - .map(|item| item.at_uri.clone()) 114 - .collect::<Vec<_>>(); 115 - 116 - let (mut replies_hyd, mut parents_hyd) = tokio::join!( 117 - hyd.hydrate_posts(reply_uris), 118 - hyd.hydrate_posts(parent_uris), 119 - ); 120 - 121 - let threadgate = anchor.threadgate.clone(); 122 - let hidden: HashSet<_, std::hash::RandomState> = match &threadgate { 123 - Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?, 124 - None => None, 125 - } 126 - .map(|hiddens| HashSet::from_iter(Vec::from(hiddens))) 127 - .unwrap_or_default(); 128 - 129 - let root_has_more = parents.len() > THREAD_PARENTS; 130 - let mut is_op_thread = true; 131 - 132 - let mut thread = Vec::with_capacity(1 + replies.len() + parents.len()); 133 - 134 - thread.extend( 135 - parents 136 - .into_iter() 137 - .tail(THREAD_PARENTS) 138 - .enumerate() 139 - .map(|(idx, item)| { 140 - let value = parents_hyd 141 - .remove(&item.at_uri) 142 - .map(|post| { 143 - if let Some(v) = &post.author.viewer { 144 - if v.blocked_by || v.blocking.is_some() { 145 - return ThreadV2ItemType::Blocked { 146 - author: BlockedAuthor { 147 - did: post.author.did, 148 - viewer: post.author.viewer, 149 - }, 150 - }; 151 - } 152 - } 153 - 154 - let op_thread = (is_op_thread 155 - || item.root_uri.is_none() && item.parent_uri.is_none()) 156 - && post.author.did == root_did; 157 - 158 - ThreadV2ItemType::Post(ThreadItemPost { 159 - post, 160 - more_parents: idx == 0 && root_has_more, 161 - more_replies: 0, 162 - op_thread, 163 - hidden_by_threadgate: false, 164 - muted_by_viewer: false, 165 - }) 166 - }) 167 - .unwrap_or(ThreadV2ItemType::NotFound {}); 168 - 169 - ThreadV2Item { 170 - uri: item.at_uri, 171 - depth: -item.depth - 1, 172 - value, 173 - } 174 - }), 175 - ); 176 - 177 - is_op_thread = is_op_thread && anchor.author.did == root_did; 178 - thread.push(ThreadV2Item { 179 - uri: uri.clone(), 180 - depth: 0, 181 - value: ThreadV2ItemType::Post(ThreadItemPost { 182 - post: anchor, 183 - more_parents: false, 184 - more_replies: 0, 185 - op_thread: is_op_thread, 186 - hidden_by_threadgate: false, 187 - muted_by_viewer: false, 188 - }), 189 - }); 190 - 191 - let mut replies_grouped = replies 192 - .into_iter() 193 - .into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default()); 194 - 195 - // start with the anchor 196 - let (children, has_other_replies) = build_thread_children( 197 - &mut replies_grouped, 198 - &mut replies_hyd, 199 - &hidden, 200 - &uri, 201 - is_op_thread, 202 - 1, 203 - &BuildThreadChildrenOpts { 204 - root_did, 205 - sort: query.sort, 206 - maybe_did: &maybe_did, 207 - max_depth: depth, 208 - }, 209 - ); 210 - thread.extend(children); 211 - 212 - Ok(Json(GetPostThreadV2Res { 213 - thread, 214 - threadgate, 215 - has_other_replies, 216 - })) 217 - } 218 - 219 - #[derive(Debug, Deserialize)] 220 - #[serde(rename_all = "camelCase")] 221 - pub struct GetPostThreadOtherV2Req { 222 - pub anchor: String, 223 - } 224 - 225 - #[derive(Debug, Serialize)] 226 - #[serde(rename_all = "camelCase")] 227 - pub struct GetPostThreadOtherV2Res { 228 - pub thread: Vec<ThreadV2Item>, 229 - } 230 - 231 - pub async fn get_post_thread_other_v2( 232 - State(state): State<GlobalState>, 233 - AtpAcceptLabelers(labelers): AtpAcceptLabelers, 234 - maybe_auth: Option<AtpAuth>, 235 - Query(query): Query<GetPostThreadOtherV2Req>, 236 - ) -> XrpcResult<Json<GetPostThreadOtherV2Res>> { 237 - let mut conn = state.pool.get().await?; 238 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 239 - 240 - let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?; 241 - 242 - let root = crate::db::get_root_post(&mut conn, &uri) 243 - .await? 244 - .unwrap_or(uri.clone()); 245 - 246 - // this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE 247 - let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?; 248 - let reply_uris = replies 249 - .into_iter() 250 - .map(|item| item.at_uri) 251 - .collect::<Vec<_>>(); 252 - let thread = hyd 253 - .hydrate_posts(reply_uris) 254 - .await 255 - .into_iter() 256 - .filter(|(_, post)| matches!(&post.author.viewer, Some(viewer) if viewer.blocked_by || viewer.blocking.is_some())) 257 - .map(|(uri, post)| { 258 - let post = ThreadItemPost { 259 - post, 260 - more_parents: false, 261 - more_replies: 0, 262 - op_thread: false, 263 - hidden_by_threadgate: true, 264 - muted_by_viewer: false, 265 - }; 266 - 267 - ThreadV2Item { 268 - uri, 269 - depth: 1, 270 - value: ThreadV2ItemType::Post(post), 271 - } 272 - }) 273 - .collect(); 274 - 275 - Ok(Json(GetPostThreadOtherV2Res { thread })) 276 - } 277 - 278 - #[derive(Debug)] 279 - struct BuildThreadChildrenOpts<'a> { 280 - root_did: &'a str, 281 - sort: PostThreadSort, 282 - maybe_did: &'a Option<String>, 283 - max_depth: i32, 284 - } 285 - 286 - fn build_thread_children( 287 - grouped_replies: &mut HashMap<String, Vec<ThreadItem>>, 288 - replies_hyd: &mut HashMap<String, PostView>, 289 - hidden: &HashSet<String>, 290 - parent: &str, 291 - is_op_thread: bool, 292 - depth: i32, 293 - opts: &BuildThreadChildrenOpts, 294 - ) -> (Vec<ThreadV2Item>, bool) { 295 - let mut has_other_replies = false; 296 - 297 - let Some(replies) = grouped_replies.remove(parent) else { 298 - return (Vec::default(), has_other_replies); 299 - }; 300 - 301 - let replies = replies 302 - .into_iter() 303 - .filter_map(|item| replies_hyd.remove(&item.at_uri)) 304 - .sorted_by(sort_replies(&opts.sort)); 305 - 306 - let mut out = Vec::new(); 307 - 308 - for post in replies { 309 - let reply_count = grouped_replies 310 - .get(&post.uri) 311 - .map(|v| v.len()) 312 - .unwrap_or_default(); 313 - let at_max = depth == opts.max_depth; 314 - let more_replies = if at_max { reply_count } else { 0 }; 315 - let op_thread = is_op_thread && post.author.did == opts.root_did; 316 - 317 - // shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies... 318 - if let Some(v) = &post.author.viewer { 319 - if v.blocked_by || v.blocking.is_some() { 320 - continue; 321 - } 322 - } 323 - 324 - // check if the post is hidden AND we're NOT the author (hidden posts still show for their author) 325 - if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) { 326 - // post is hidden - do not ~pass go~ push to the thread. 327 - if depth == 1 { 328 - has_other_replies = true; 329 - } 330 - continue; 331 - } 332 - 333 - let uri = post.uri.clone(); 334 - out.push(ThreadV2Item { 335 - uri: post.uri.clone(), 336 - depth, 337 - value: ThreadV2ItemType::Post(ThreadItemPost { 338 - post, 339 - more_parents: false, 340 - more_replies: more_replies as i32, 341 - op_thread, 342 - hidden_by_threadgate: false, 343 - muted_by_viewer: false, 344 - }), 345 - }); 346 - 347 - if !at_max { 348 - // we don't care about has_other_replies when recursing 349 - let (children, _) = build_thread_children( 350 - grouped_replies, 351 - replies_hyd, 352 - hidden, 353 - &uri, 354 - op_thread, 355 - depth + 1, 356 - opts, 357 - ); 358 - 359 - out.extend(children); 360 - } 361 - } 362 - 363 - (out, has_other_replies) 364 - } 365 - 366 - fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> { 367 - move |a: &PostView, b: &PostView| match sort { 368 - PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at), 369 - PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at), 370 - PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count), 371 - } 372 - } 373 - 374 - fn did_is_cur(cur: &Option<String>, did: &String) -> bool { 375 - match cur { 376 - Some(cur) => did == cur, 377 - None => false, 378 - } 379 - }
+1 -1
parakeet/src/xrpc/community_lexicon/bookmarks.rs
··· 60 60 .into_iter() 61 61 .map(|bookmark| Bookmark { 62 62 subject: bookmark.subject, 63 - tags: bookmark.tags.into(), 63 + tags: bookmark.tags.into_iter().flatten().collect(), 64 64 created_at: bookmark.created_at, 65 65 }) 66 66 .collect();
-3
parakeet/src/xrpc/jwt.rs
··· 4 4 use std::collections::HashMap; 5 5 use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 - use tracing::instrument; 8 7 9 8 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 10 9 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 39 38 } 40 39 } 41 40 42 - #[instrument(skip_all)] 43 41 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 44 42 // first we need to decode without verifying, to get iss. 45 43 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 58 56 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 59 57 } 60 58 61 - #[instrument(skip_all)] 62 59 async fn resolve_key(&self, did: &str) -> Option<String> { 63 60 tracing::trace!("resolving multikey for {did}"); 64 61 let did_doc = self.resolver.resolve_did(did).await.ok()??;
+13 -55
parakeet-db/src/models.rs
··· 137 137 138 138 pub content: String, 139 139 pub facets: Option<serde_json::Value>, 140 - pub languages: not_null_vec::TextArray, 141 - pub tags: not_null_vec::TextArray, 140 + pub languages: Vec<Option<String>>, 141 + pub tags: Vec<Option<String>>, 142 142 143 143 pub parent_uri: Option<String>, 144 144 pub parent_cid: Option<String>, ··· 148 148 pub embed: Option<String>, 149 149 pub embed_subtype: Option<String>, 150 150 151 - pub mentions: Option<not_null_vec::TextArray>, 151 + pub mentions: Option<Vec<Option<String>>>, 152 152 pub violates_threadgate: bool, 153 153 154 154 pub created_at: DateTime<Utc>, ··· 236 236 pub cid: String, 237 237 pub post_uri: String, 238 238 239 - pub detached: not_null_vec::TextArray, 240 - pub rules: not_null_vec::TextArray, 239 + pub detached: Vec<Option<String>>, 240 + pub rules: Vec<Option<String>>, 241 241 242 242 pub created_at: DateTime<Utc>, 243 243 pub indexed_at: NaiveDateTime, ··· 252 252 pub cid: String, 253 253 pub post_uri: String, 254 254 255 - pub hidden_replies: not_null_vec::TextArray, 256 - pub allow: Option<not_null_vec::TextArray>, 257 - pub allowed_lists: Option<not_null_vec::TextArray>, 255 + pub hidden_replies: Vec<Option<String>>, 256 + pub allow: Option<Vec<Option<String>>>, 257 + pub allowed_lists: Option<Vec<Option<String>>>, 258 258 259 259 pub record: serde_json::Value, 260 260 ··· 276 276 pub description: Option<String>, 277 277 pub description_facets: Option<serde_json::Value>, 278 278 pub list: String, 279 - pub feeds: Option<not_null_vec::TextArray>, 279 + pub feeds: Option<Vec<Option<String>>>, 280 280 281 281 pub created_at: DateTime<Utc>, 282 282 pub indexed_at: NaiveDateTime, ··· 290 290 pub did: String, 291 291 pub cid: String, 292 292 293 - pub reasons: Option<not_null_vec::TextArray>, 294 - pub subject_types: Option<not_null_vec::TextArray>, 295 - pub subject_collections: Option<not_null_vec::TextArray>, 293 + pub reasons: Option<Vec<Option<String>>>, 294 + pub subject_types: Option<Vec<Option<String>>>, 295 + pub subject_collections: Option<Vec<Option<String>>>, 296 296 297 297 pub created_at: NaiveDateTime, 298 298 pub indexed_at: NaiveDateTime, ··· 402 402 pub subject: String, 403 403 pub subject_cid: Option<String>, 404 404 pub subject_type: String, 405 - pub tags: not_null_vec::TextArray, 405 + pub tags: Vec<Option<String>>, 406 406 pub created_at: DateTime<Utc>, 407 407 } 408 408 ··· 430 430 pub typ: String, 431 431 pub sort_at: DateTime<Utc>, 432 432 } 433 - 434 - pub use not_null_vec::TextArray; 435 - mod not_null_vec { 436 - use diesel::deserialize::FromSql; 437 - use diesel::pg::Pg; 438 - use diesel::sql_types::{Array, Nullable, Text}; 439 - use diesel::{deserialize, FromSqlRow}; 440 - use serde::{Deserialize, Serialize}; 441 - use std::ops::{Deref, DerefMut}; 442 - 443 - #[derive(Clone, Debug, Default, Serialize, Deserialize, FromSqlRow)] 444 - #[diesel(sql_type = Array<Nullable<Text>>)] 445 - pub struct TextArray(pub Vec<String>); 446 - 447 - impl FromSql<Array<Nullable<Text>>, Pg> for TextArray { 448 - fn from_sql(bytes: diesel::pg::PgValue<'_>) -> deserialize::Result<Self> { 449 - let vec_with_nulls = 450 - <Vec<Option<String>> as FromSql<Array<Nullable<Text>>, Pg>>::from_sql(bytes)?; 451 - Ok(TextArray(vec_with_nulls.into_iter().flatten().collect())) 452 - } 453 - } 454 - 455 - impl Deref for TextArray { 456 - type Target = Vec<String>; 457 - 458 - fn deref(&self) -> &Self::Target { 459 - &self.0 460 - } 461 - } 462 - 463 - impl DerefMut for TextArray { 464 - fn deref_mut(&mut self) -> &mut Self::Target { 465 - &mut self.0 466 - } 467 - } 468 - 469 - impl From<TextArray> for Vec<String> { 470 - fn from(v: TextArray) -> Vec<String> { 471 - v.0 472 - } 473 - } 474 - }
+2 -24
parakeet-index/Cargo.toml
··· 10 10 [dependencies] 11 11 tonic = "0.13.0" 12 12 prost = "0.13.5" 13 - tonic-tracing-opentelemetry = { version = "0.32", optional = true } 14 - tower = { version = "0.5", optional = true } 15 13 16 14 eyre = { version = "0.6.12", optional = true } 17 15 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 18 16 itertools = { version = "0.14.0", optional = true } 19 - opentelemetry = { version = "0.31.0", optional = true } 20 - opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true } 21 - opentelemetry_sdk = { version = "0.31.0", optional = true } 22 17 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 23 18 serde = { version = "1.0.217", features = ["derive"], optional = true } 24 19 tokio = { version = "1.42.0", features = ["full"], optional = true } 25 20 tonic-health = { version = "0.13.0", optional = true } 26 21 tracing = { version = "0.1.40", optional = true } 27 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } 28 - tracing-opentelemetry = { version = "0.32", optional = true } 22 + tracing-subscriber = { version = "0.3.18", optional = true } 29 23 30 24 [build-dependencies] 31 25 tonic-build = "0.13.0" 32 26 33 27 [features] 34 - otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"] 35 - server = [ 36 - "dep:eyre", 37 - "dep:figment", 38 - "dep:itertools", 39 - "dep:opentelemetry", 40 - "dep:opentelemetry-otlp", 41 - "dep:opentelemetry_sdk", 42 - "dep:rocksdb", 43 - "dep:serde", 44 - "dep:tokio", 45 - "dep:tonic-health", 46 - "otel", 47 - "dep:tracing", 48 - "dep:tracing-subscriber", 49 - "dep:tracing-opentelemetry" 50 - ] 28 + server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
+1 -20
parakeet-index/src/lib.rs
··· 1 - use tonic::transport::Channel; 2 - 3 1 #[allow(clippy::all)] 4 2 pub mod index { 5 3 tonic::include_proto!("parakeet"); 6 4 } 7 5 8 6 pub use index::*; 9 - #[cfg(not(feature = "otel"))] 10 - pub type Client = index_client::IndexClient<Channel>; 11 - #[cfg(feature = "otel")] 12 - pub type Client = index_client::IndexClient< 13 - tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>, 14 - >; 7 + pub type Client = index_client::IndexClient<tonic::transport::Channel>; 15 8 16 9 #[cfg(feature = "server")] 17 10 pub mod server; 18 - 19 - #[cfg(feature = "otel")] 20 - pub async fn connect_with_otel( 21 - uri: String, 22 - ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { 23 - let channel = Channel::from_shared(uri)?.connect().await?; 24 - let channel = tower::ServiceBuilder::new() 25 - .layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer) 26 - .service(channel); 27 - 28 - Ok(index_client::IndexClient::new(channel)) 29 - }
+3 -9
parakeet-index/src/main.rs
··· 1 1 use parakeet_index::index_server::IndexServer; 2 2 use parakeet_index::server::service::Service; 3 - use parakeet_index::server::{GlobalState, config, instrumentation}; 3 + use parakeet_index::server::{GlobalState, config}; 4 4 use std::sync::Arc; 5 5 use tonic::transport::Server; 6 - use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer; 7 6 8 7 #[tokio::main] 9 8 async fn main() -> eyre::Result<()> { 10 - let conf = config::load_config()?; 9 + tracing_subscriber::fmt::init(); 11 10 12 - instrumentation::init_instruments(&conf.instruments); 11 + let conf = config::load_config()?; 13 12 14 13 let db_root = conf.index_db_path.parse()?; 15 14 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); ··· 19 18 reporter.set_serving::<IndexServer<Service>>().await; 20 19 21 20 let service = Service::new(state.clone()); 22 - 23 - let mw = tower::ServiceBuilder::new() 24 - .option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default)); 25 - 26 21 Server::builder() 27 - .layer(mw) 28 22 .add_service(health_service) 29 23 .add_service(IndexServer::new(service)) 30 24 .serve(addr)
-10
parakeet-index/src/server/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 16 pub database_url: String, 19 17 pub index_db_path: String, 20 18 #[serde(default)] 21 19 pub server: ConfigServer, 22 - } 23 - 24 - #[derive(Debug, Deserialize)] 25 - pub struct ConfigInstruments { 26 - #[serde(default)] 27 - pub otel_enable: bool, 28 - #[serde(default)] 29 - pub log_json: bool, 30 20 } 31 21 32 22 #[derive(Debug, Deserialize)]
-57
parakeet-index/src/server/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &super::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
-1
parakeet-index/src/server/mod.rs
··· 2 2 3 3 pub mod config; 4 4 pub mod db; 5 - pub mod instrumentation; 6 5 pub mod service; 7 6 mod utils; 8 7