Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

Compare changes

Choose any two refs to compare.

+12 -364
Cargo.lock
··· 286 ] 287 288 [[package]] 289 - name = "axum-tracing-opentelemetry" 290 - version = "0.32.1" 291 - source = "registry+https://github.com/rust-lang/crates.io-index" 292 - checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1" 293 - dependencies = [ 294 - "axum", 295 - "futures-core", 296 - "futures-util", 297 - "http", 298 - "opentelemetry", 299 - "opentelemetry-semantic-conventions", 300 - "pin-project-lite", 301 - "tower", 302 - "tracing", 303 - "tracing-opentelemetry", 304 - "tracing-opentelemetry-instrumentation-sdk", 305 - ] 306 - 307 - [[package]] 308 name = "backtrace" 309 version = "0.3.74" 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 366 "proc-macro2", 367 "quote", 368 "regex", 369 - "rustc-hash 1.1.0", 370 "shlex", 371 "syn", 372 "which", ··· 484 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 485 486 [[package]] 487 - name = "cfg_aliases" 488 - version = "0.2.1" 489 - source = "registry+https://github.com/rust-lang/crates.io-index" 490 - checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 491 - 492 - [[package]] 493 name = "chrono" 494 version = "0.4.41" 495 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1395 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1396 dependencies = [ 1397 "cfg-if", 1398 - "js-sys", 1399 "libc", 1400 "r-efi", 1401 "wasi 0.14.2+wasi-0.2.4", 1402 - "wasm-bindgen", 1403 ] 1404 1405 [[package]] ··· 2196 ] 2197 2198 [[package]] 2199 - name = "lru-slab" 2200 - version = "0.1.2" 2201 - source = "registry+https://github.com/rust-lang/crates.io-index" 2202 - checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 2203 - 2204 - [[package]] 2205 name = "lz4-sys" 2206 version = "1.11.1+lz4-1.10.0" 2207 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2218 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2219 2220 [[package]] 2221 - name = "matchers" 2222 - version = "0.1.0" 2223 - source = "registry+https://github.com/rust-lang/crates.io-index" 2224 - checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 2225 - dependencies = [ 2226 - "regex-automata 0.1.10", 2227 - ] 2228 - 2229 - [[package]] 2230 name = "matchit" 2231 version = "0.8.4" 2232 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2555 ] 2556 2557 [[package]] 2558 - name = "opentelemetry" 2559 - version = "0.31.0" 2560 - source = "registry+https://github.com/rust-lang/crates.io-index" 2561 - checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" 2562 - dependencies = [ 2563 - "futures-core", 2564 - "futures-sink", 2565 - "js-sys", 2566 - "pin-project-lite", 2567 - "thiserror 2.0.12", 2568 - "tracing", 2569 - ] 2570 - 2571 - [[package]] 2572 - name = "opentelemetry-http" 2573 - version = "0.31.0" 2574 - source = "registry+https://github.com/rust-lang/crates.io-index" 2575 - checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" 2576 - dependencies = [ 2577 - "async-trait", 2578 - "bytes", 2579 - "http", 2580 - "opentelemetry", 2581 - "reqwest", 2582 - ] 2583 - 2584 - [[package]] 2585 - name = "opentelemetry-otlp" 2586 - version = "0.31.0" 2587 - source = "registry+https://github.com/rust-lang/crates.io-index" 2588 - checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" 2589 - dependencies = [ 2590 - "http", 2591 - "opentelemetry", 2592 - "opentelemetry-http", 2593 - "opentelemetry-proto", 2594 - "opentelemetry_sdk", 2595 - "prost 0.14.1", 2596 - "reqwest", 2597 - "thiserror 2.0.12", 2598 - "tracing", 2599 - ] 2600 - 2601 - [[package]] 2602 - name = "opentelemetry-proto" 2603 - version = "0.31.0" 2604 - source = "registry+https://github.com/rust-lang/crates.io-index" 2605 - checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" 2606 - dependencies = [ 2607 - "opentelemetry", 2608 - "opentelemetry_sdk", 2609 - "prost 0.14.1", 2610 - "tonic 0.14.2", 2611 - "tonic-prost", 2612 - ] 2613 - 2614 - [[package]] 2615 - name = "opentelemetry-semantic-conventions" 2616 - version = "0.31.0" 2617 - source = "registry+https://github.com/rust-lang/crates.io-index" 2618 - checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" 2619 - 2620 - [[package]] 2621 - name = "opentelemetry_sdk" 2622 - version = "0.31.0" 2623 - source = "registry+https://github.com/rust-lang/crates.io-index" 2624 - checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" 2625 - dependencies = [ 2626 - "futures-channel", 2627 - "futures-executor", 2628 - "futures-util", 2629 - "opentelemetry", 2630 - "percent-encoding", 2631 - "rand 0.9.1", 2632 - "thiserror 2.0.12", 2633 - ] 2634 - 2635 - [[package]] 2636 name = "overload" 2637 version = "0.1.1" 2638 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2669 "async-recursion", 2670 "axum", 2671 "axum-extra", 2672 - "axum-tracing-opentelemetry", 2673 "base64 0.22.1", 2674 "chrono", 2675 "dataloader", ··· 2684 "jsonwebtoken", 2685 "lexica", 2686 "multibase", 2687 - "opentelemetry", 2688 - "opentelemetry-otlp", 2689 - "opentelemetry_sdk", 2690 "parakeet-db", 2691 "parakeet-index", 2692 "redis", ··· 2695 "serde_ipld_dagcbor", 2696 "serde_json", 2697 "tokio", 2698 - "tower", 2699 "tower-http", 2700 "tracing", 2701 - "tracing-opentelemetry", 2702 "tracing-subscriber", 2703 ] 2704 ··· 2720 "eyre", 2721 "figment", 2722 "itertools 0.14.0", 2723 - "opentelemetry", 2724 - "opentelemetry-otlp", 2725 - "opentelemetry_sdk", 2726 - "prost 0.13.5", 2727 "rocksdb", 2728 "serde", 2729 "tokio", 2730 - "tonic 0.13.1", 2731 "tonic-build", 2732 "tonic-health", 2733 - "tonic-tracing-opentelemetry", 2734 - "tower", 2735 "tracing", 2736 - "tracing-opentelemetry", 2737 "tracing-subscriber", 2738 ] 2739 ··· 3039 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 3040 dependencies = [ 3041 "bytes", 3042 - "prost-derive 0.13.5", 3043 - ] 3044 - 3045 - [[package]] 3046 - name = "prost" 3047 - version = "0.14.1" 3048 - source = "registry+https://github.com/rust-lang/crates.io-index" 3049 - checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" 3050 - dependencies = [ 3051 - "bytes", 3052 - "prost-derive 0.14.1", 3053 ] 3054 3055 [[package]] ··· 3065 "once_cell", 3066 "petgraph", 3067 "prettyplease", 3068 - "prost 0.13.5", 3069 "prost-types", 3070 "regex", 3071 "syn", ··· 3086 ] 3087 3088 [[package]] 3089 - name = "prost-derive" 3090 - version = "0.14.1" 3091 - source = "registry+https://github.com/rust-lang/crates.io-index" 3092 - checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" 3093 - dependencies = [ 3094 - "anyhow", 3095 - "itertools 0.14.0", 3096 - "proc-macro2", 3097 - "quote", 3098 - "syn", 3099 - ] 3100 - 3101 - [[package]] 3102 name = "prost-types" 3103 version = "0.13.5" 3104 source = "registry+https://github.com/rust-lang/crates.io-index" 3105 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 3106 dependencies = [ 3107 - "prost 0.13.5", 3108 ] 3109 3110 [[package]] ··· 3129 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 3130 3131 [[package]] 3132 - name = "quinn" 3133 - version = "0.11.9" 3134 - source = "registry+https://github.com/rust-lang/crates.io-index" 3135 - checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 3136 - dependencies = [ 3137 - "bytes", 3138 - "cfg_aliases", 3139 - "pin-project-lite", 3140 - "quinn-proto", 3141 - "quinn-udp", 3142 - "rustc-hash 2.1.1", 3143 - "rustls", 3144 - "socket2 0.6.0", 3145 - "thiserror 2.0.12", 3146 - "tokio", 3147 - "tracing", 3148 - "web-time", 3149 - ] 3150 - 3151 - [[package]] 3152 - name = "quinn-proto" 3153 - version = "0.11.13" 3154 - source = "registry+https://github.com/rust-lang/crates.io-index" 3155 - checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 3156 - dependencies = [ 3157 - "bytes", 3158 - "getrandom 0.3.3", 3159 - "lru-slab", 3160 - "rand 0.9.1", 3161 - "ring", 3162 - "rustc-hash 2.1.1", 3163 - "rustls", 3164 - "rustls-pki-types", 3165 - "slab", 3166 - "thiserror 2.0.12", 3167 - "tinyvec", 3168 - "tracing", 3169 - "web-time", 3170 - ] 3171 - 3172 - [[package]] 3173 - name = "quinn-udp" 3174 - version = "0.5.14" 3175 - source = "registry+https://github.com/rust-lang/crates.io-index" 3176 - checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 3177 - dependencies = [ 3178 - "cfg_aliases", 3179 - "libc", 3180 - "once_cell", 3181 - "socket2 0.6.0", 3182 - "tracing", 3183 - "windows-sys 0.59.0", 3184 - ] 3185 - 3186 - [[package]] 3187 name = "quote" 3188 version = "1.0.38" 3189 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3345 dependencies = [ 3346 "aho-corasick", 3347 "memchr", 3348 - "regex-automata 0.4.9", 3349 - "regex-syntax 0.8.5", 3350 - ] 3351 - 3352 - [[package]] 3353 - name = "regex-automata" 3354 - version = "0.1.10" 3355 - source = "registry+https://github.com/rust-lang/crates.io-index" 3356 - checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 3357 - dependencies = [ 3358 - "regex-syntax 0.6.29", 3359 ] 3360 3361 [[package]] ··· 3366 dependencies = [ 3367 "aho-corasick", 3368 "memchr", 3369 - "regex-syntax 0.8.5", 3370 ] 3371 3372 [[package]] 3373 name = "regex-syntax" 3374 - version = "0.6.29" 3375 - source = "registry+https://github.com/rust-lang/crates.io-index" 3376 - checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 3377 - 3378 - [[package]] 3379 - name = "regex-syntax" 3380 version = "0.8.5" 3381 source = "registry+https://github.com/rust-lang/crates.io-index" 3382 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3391 "base64 0.22.1", 3392 "bytes", 3393 "encoding_rs", 3394 - "futures-channel", 3395 "futures-core", 3396 "futures-util", 3397 "h2", ··· 3410 "once_cell", 3411 "percent-encoding", 3412 "pin-project-lite", 3413 - "quinn", 3414 - "rustls", 3415 - "rustls-native-certs", 3416 "rustls-pemfile", 3417 - "rustls-pki-types", 3418 "serde", 3419 "serde_json", 3420 "serde_urlencoded", ··· 3422 "system-configuration", 3423 "tokio", 3424 "tokio-native-tls", 3425 - "tokio-rustls", 3426 "tokio-util", 3427 "tower", 3428 "tower-service", ··· 3512 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3513 3514 [[package]] 3515 - name = "rustc-hash" 3516 - version = "2.1.1" 3517 - source = "registry+https://github.com/rust-lang/crates.io-index" 3518 - checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 3519 - 3520 - [[package]] 3521 name = "rustc_version" 3522 version = "0.4.1" 3523 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3547 dependencies = [ 3548 "aws-lc-rs", 3549 "once_cell", 3550 - "ring", 3551 "rustls-pki-types", 3552 "rustls-webpki", 3553 "subtle", ··· 3580 version = "1.11.0" 3581 source = "registry+https://github.com/rust-lang/crates.io-index" 3582 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3583 - dependencies = [ 3584 - "web-time", 3585 - ] 3586 3587 [[package]] 3588 name = "rustls-webpki" ··· 4426 "hyper-util", 4427 "percent-encoding", 4428 "pin-project", 4429 - "prost 0.13.5", 4430 "socket2 0.5.8", 4431 "tokio", 4432 "tokio-stream", 4433 "tower", 4434 - "tower-layer", 4435 - "tower-service", 4436 - "tracing", 4437 - ] 4438 - 4439 - [[package]] 4440 - name = "tonic" 4441 - version = "0.14.2" 4442 - source = "registry+https://github.com/rust-lang/crates.io-index" 4443 - checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 4444 - dependencies = [ 4445 - "async-trait", 4446 - "base64 0.22.1", 4447 - "bytes", 4448 - "http", 4449 - "http-body", 4450 - "http-body-util", 4451 - "percent-encoding", 4452 - "pin-project", 4453 - "sync_wrapper", 4454 - "tokio-stream", 4455 "tower-layer", 4456 "tower-service", 4457 "tracing", ··· 4477 source = "registry+https://github.com/rust-lang/crates.io-index" 4478 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4479 dependencies = [ 4480 - "prost 0.13.5", 4481 "tokio", 4482 "tokio-stream", 4483 - "tonic 0.13.1", 4484 - ] 4485 - 4486 - [[package]] 4487 - name = "tonic-prost" 4488 - version = "0.14.2" 4489 - source = "registry+https://github.com/rust-lang/crates.io-index" 4490 - checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" 4491 - dependencies = [ 4492 - "bytes", 4493 - "prost 0.14.1", 4494 - "tonic 0.14.2", 4495 - ] 4496 - 4497 - [[package]] 4498 - name = "tonic-tracing-opentelemetry" 4499 - version = "0.32.0" 4500 - source = "registry+https://github.com/rust-lang/crates.io-index" 4501 - checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3" 4502 - dependencies = [ 4503 - "futures-core", 4504 - "futures-util", 4505 - "http", 4506 - "http-body", 4507 - "hyper", 4508 - "opentelemetry", 4509 - "pin-project-lite", 4510 - "tonic 0.14.2", 4511 - "tower", 4512 - "tracing", 4513 - "tracing-opentelemetry", 4514 - "tracing-opentelemetry-instrumentation-sdk", 4515 ] 4516 4517 [[package]] ··· 4606 ] 4607 4608 [[package]] 4609 - name = "tracing-opentelemetry" 4610 - version = "0.32.0" 4611 - source = "registry+https://github.com/rust-lang/crates.io-index" 4612 - checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" 4613 - dependencies = [ 4614 - "js-sys", 4615 - "opentelemetry", 4616 - "opentelemetry_sdk", 4617 - "rustversion", 4618 - "smallvec", 4619 - "thiserror 2.0.12", 4620 - "tracing", 4621 - "tracing-core", 4622 - "tracing-log", 4623 - "tracing-subscriber", 4624 - "web-time", 4625 - ] 4626 - 4627 - [[package]] 4628 - name = "tracing-opentelemetry-instrumentation-sdk" 4629 - version = "0.32.1" 4630 - source = "registry+https://github.com/rust-lang/crates.io-index" 4631 - checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416" 4632 - dependencies = [ 4633 - "http", 4634 - "opentelemetry", 4635 - "opentelemetry-semantic-conventions", 4636 - "tracing", 4637 - "tracing-opentelemetry", 4638 - ] 4639 - 4640 - [[package]] 4641 - name = "tracing-serde" 4642 - version = "0.2.0" 4643 - source = "registry+https://github.com/rust-lang/crates.io-index" 4644 - checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" 4645 - dependencies = [ 4646 - "serde", 4647 - "tracing-core", 4648 - ] 4649 - 4650 - [[package]] 4651 name = "tracing-subscriber" 4652 version = "0.3.19" 4653 source = "registry+https://github.com/rust-lang/crates.io-index" 4654 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4655 dependencies = [ 4656 - "matchers", 4657 "nu-ansi-term", 4658 - "once_cell", 4659 - "regex", 4660 - "serde", 4661 - "serde_json", 4662 "sharded-slab", 4663 "smallvec", 4664 "thread_local", 4665 - "tracing", 4666 "tracing-core", 4667 "tracing-log", 4668 - "tracing-serde", 4669 ] 4670 4671 [[package]] ··· 4934 version = "0.3.77" 4935 source = "registry+https://github.com/rust-lang/crates.io-index" 4936 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4937 - dependencies = [ 4938 - "js-sys", 4939 - "wasm-bindgen", 4940 - ] 4941 - 4942 - [[package]] 4943 - name = "web-time" 4944 - version = "1.1.0" 4945 - source = "registry+https://github.com/rust-lang/crates.io-index" 4946 - checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 4947 dependencies = [ 4948 "js-sys", 4949 "wasm-bindgen",
··· 286 ] 287 288 [[package]] 289 name = "backtrace" 290 version = "0.3.74" 291 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 347 "proc-macro2", 348 "quote", 349 "regex", 350 + "rustc-hash", 351 "shlex", 352 "syn", 353 "which", ··· 465 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 466 467 [[package]] 468 name = "chrono" 469 version = "0.4.41" 470 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1371 dependencies = [ 1372 "cfg-if", 1373 "libc", 1374 "r-efi", 1375 "wasi 0.14.2+wasi-0.2.4", 1376 ] 1377 1378 [[package]] ··· 2169 ] 2170 2171 [[package]] 2172 name = "lz4-sys" 2173 version = "1.11.1+lz4-1.10.0" 2174 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2185 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2186 2187 [[package]] 2188 name = "matchit" 2189 version = "0.8.4" 2190 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2513 ] 2514 2515 [[package]] 2516 name = "overload" 2517 version = "0.1.1" 2518 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2549 "async-recursion", 2550 "axum", 2551 "axum-extra", 2552 "base64 0.22.1", 2553 "chrono", 2554 "dataloader", ··· 2563 "jsonwebtoken", 2564 "lexica", 2565 "multibase", 2566 "parakeet-db", 2567 "parakeet-index", 2568 "redis", ··· 2571 "serde_ipld_dagcbor", 2572 "serde_json", 2573 "tokio", 2574 "tower-http", 2575 "tracing", 2576 "tracing-subscriber", 2577 ] 2578 ··· 2594 "eyre", 2595 "figment", 2596 "itertools 0.14.0", 2597 + "prost", 2598 "rocksdb", 2599 "serde", 2600 "tokio", 2601 + "tonic", 2602 "tonic-build", 2603 "tonic-health", 2604 "tracing", 2605 "tracing-subscriber", 2606 ] 2607 ··· 2907 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 2908 dependencies = [ 2909 "bytes", 2910 + "prost-derive", 2911 ] 2912 2913 [[package]] ··· 2923 "once_cell", 2924 "petgraph", 2925 "prettyplease", 2926 + "prost", 2927 "prost-types", 2928 "regex", 2929 "syn", ··· 2944 ] 2945 2946 [[package]] 2947 name = "prost-types" 2948 version = "0.13.5" 2949 source = "registry+https://github.com/rust-lang/crates.io-index" 2950 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 2951 dependencies = [ 2952 + "prost", 2953 ] 2954 2955 [[package]] ··· 2974 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 2975 2976 [[package]] 2977 name = "quote" 2978 version = "1.0.38" 2979 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3135 dependencies = [ 3136 "aho-corasick", 3137 "memchr", 3138 + "regex-automata", 3139 + "regex-syntax", 3140 ] 3141 3142 [[package]] ··· 3147 dependencies = [ 3148 "aho-corasick", 3149 "memchr", 3150 + "regex-syntax", 3151 ] 3152 3153 [[package]] 3154 name = "regex-syntax" 3155 version = "0.8.5" 3156 source = "registry+https://github.com/rust-lang/crates.io-index" 3157 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3166 "base64 0.22.1", 3167 "bytes", 3168 "encoding_rs", 3169 "futures-core", 3170 "futures-util", 3171 "h2", ··· 3184 "once_cell", 3185 "percent-encoding", 3186 "pin-project-lite", 3187 "rustls-pemfile", 3188 "serde", 3189 "serde_json", 3190 "serde_urlencoded", ··· 3192 "system-configuration", 3193 "tokio", 3194 "tokio-native-tls", 3195 "tokio-util", 3196 "tower", 3197 "tower-service", ··· 3281 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3282 3283 [[package]] 3284 name = "rustc_version" 3285 version = "0.4.1" 3286 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3310 dependencies = [ 3311 "aws-lc-rs", 3312 "once_cell", 3313 "rustls-pki-types", 3314 "rustls-webpki", 3315 "subtle", ··· 3342 version = "1.11.0" 3343 source = "registry+https://github.com/rust-lang/crates.io-index" 3344 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3345 3346 [[package]] 3347 name = "rustls-webpki" ··· 4185 "hyper-util", 4186 "percent-encoding", 4187 "pin-project", 4188 + "prost", 4189 "socket2 0.5.8", 4190 "tokio", 4191 "tokio-stream", 4192 "tower", 4193 "tower-layer", 4194 "tower-service", 4195 "tracing", ··· 4215 source = "registry+https://github.com/rust-lang/crates.io-index" 4216 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4217 dependencies = [ 4218 + "prost", 4219 "tokio", 4220 "tokio-stream", 4221 + "tonic", 4222 ] 4223 4224 [[package]] ··· 4313 ] 4314 4315 [[package]] 4316 name = "tracing-subscriber" 4317 version = "0.3.19" 4318 source = "registry+https://github.com/rust-lang/crates.io-index" 4319 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4320 dependencies = [ 4321 "nu-ansi-term", 4322 "sharded-slab", 4323 "smallvec", 4324 "thread_local", 4325 "tracing-core", 4326 "tracing-log", 4327 ] 4328 4329 [[package]] ··· 4592 version = "0.3.77" 4593 source = "registry+https://github.com/rust-lang/crates.io-index" 4594 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4595 dependencies = [ 4596 "js-sys", 4597 "wasm-bindgen",
+1 -1
consumer/Cargo.toml
··· 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
··· 36 tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] } 37 tokio-util = { version = "0.7.14", features = ["io", "rt"] } 38 tracing = "0.1.40" 39 + tracing-subscriber = "0.3.18"
+3 -3
consumer/src/backfill/downloader.rs
··· 84 85 // has the repo already been downloaded? 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 - tracing::info!("skipping duplicate repo {did}"); 88 continue; 89 } 90 ··· 92 match db::actor_get_statuses(&mut conn, &did).await { 93 Ok(Some((_, state))) => { 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 - tracing::info!("skipping duplicate repo {did}"); 96 continue; 97 } 98 } ··· 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 } 209 - Ok(_) => tracing::debug!(pds, "got response with no ratelimit headers."), 210 Err(e) => { 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 continue;
··· 84 85 // has the repo already been downloaded? 86 if rc.sismember(DL_DUP_KEY, &did).await.unwrap_or_default() { 87 + tracing::warn!("skipping duplicate repo {did}"); 88 continue; 89 } 90 ··· 92 match db::actor_get_statuses(&mut conn, &did).await { 93 Ok(Some((_, state))) => { 94 if state == ActorSyncState::Synced || state == ActorSyncState::Processing { 95 + tracing::warn!("skipping duplicate repo {did}"); 96 continue; 97 } 98 } ··· 206 let _ = rc.zadd(BF_REM_KEY, &pds, rem).await; 207 let _ = rc.zadd(BF_RESET_KEY, &pds, reset).await; 208 } 209 + Ok(_) => tracing::warn!(pds, "got response with no ratelimit headers."), 210 Err(e) => { 211 tracing::error!(pds, did, "failed to download repo: {e}"); 212 continue;
+5 -4
consumer/src/backfill/repo.rs
··· 53 54 match block { 55 CarEntry::Commit(_) => { 56 - tracing::debug!("got commit entry that was not in root") 57 } 58 CarEntry::Record(CarRecordEntry::Known(record)) => { 59 if let Some(path) = mst_nodes.remove(&cid) { ··· 96 } 97 } 98 99 - let Some(commit) = commit else { 100 - eyre::bail!("repo contained no commit?"); 101 - }; 102 103 Ok((commit, deltas, copies)) 104 } ··· 171 } 172 RecordTypes::AppBskyFeedThreadgate(record) => { 173 if !at_uri_is_by(&record.post, did) { 174 return Ok(()); 175 } 176 ··· 195 RecordTypes::AppBskyGraphListItem(rec) => { 196 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 197 if did != split_aturi[2] { 198 return Ok(()); 199 } 200
··· 53 54 match block { 55 CarEntry::Commit(_) => { 56 + tracing::warn!("got commit entry that was not in root") 57 } 58 CarEntry::Record(CarRecordEntry::Known(record)) => { 59 if let Some(path) = mst_nodes.remove(&cid) { ··· 96 } 97 } 98 99 + let commit = commit.unwrap(); 100 101 Ok((commit, deltas, copies)) 102 } ··· 169 } 170 RecordTypes::AppBskyFeedThreadgate(record) => { 171 if !at_uri_is_by(&record.post, did) { 172 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 173 return Ok(()); 174 } 175 ··· 194 RecordTypes::AppBskyGraphListItem(rec) => { 195 let split_aturi = rec.list.rsplitn(4, '/').collect::<Vec<_>>(); 196 if did != split_aturi[2] { 197 + // it's also probably a bad idea to log *all* the attempts to do this... 198 + tracing::warn!("tried to create a listitem on a list we don't control!"); 199 return Ok(()); 200 } 201
-8
consumer/src/config.rs
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 pub index_uri: String, 19 pub database: deadpool_postgres::Config, 20 pub redis_uri: String, ··· 29 pub indexer: Option<IndexerConfig>, 30 /// Configuration items specific to backfill 31 pub backfill: Option<BackfillConfig>, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub log_json: bool, 38 } 39 40 #[derive(Debug, Deserialize)]
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 pub index_uri: String, 17 pub database: deadpool_postgres::Config, 18 pub redis_uri: String, ··· 27 pub indexer: Option<IndexerConfig>, 28 /// Configuration items specific to backfill 29 pub backfill: Option<BackfillConfig>, 30 } 31 32 #[derive(Debug, Deserialize)]
+3 -3
consumer/src/db/actor.rs
··· 69 ) 70 .await?; 71 72 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 73 } 74 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 ) 84 .await?; 85 86 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 87 } 88 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 ) 98 .await?; 99 100 - res.map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose() 101 }
··· 69 ) 70 .await?; 71 72 + Ok(res.map(|v| (v.get(0), v.get(1)))) 73 } 74 75 pub async fn actor_get_repo_status<C: GenericClient>( ··· 83 ) 84 .await?; 85 86 + Ok(res.map(|v| (v.get(0), v.get(1)))) 87 } 88 89 pub async fn actor_get_statuses<C: GenericClient>( ··· 97 ) 98 .await?; 99 100 + Ok(res.map(|v| (v.get(0), v.get(1)))) 101 }
+9 -10
consumer/src/db/backfill.rs
··· 51 ) 52 .await?; 53 54 - res.into_iter() 55 - .map(|row| { 56 - Ok(BackfillRow { 57 - repo: row.try_get(0)?, 58 - repo_ver: row.try_get(1)?, 59 - cid: row.try_get(2)?, 60 - data: row.try_get(3)?, 61 - indexed_at: row.try_get(4)?, 62 - }) 63 }) 64 - .collect() 65 } 66 67 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
··· 51 ) 52 .await?; 53 54 + Ok(res 55 + .into_iter() 56 + .map(|row| BackfillRow { 57 + repo: row.get(0), 58 + repo_ver: row.get(1), 59 + cid: row.get(2), 60 + data: row.get(3), 61 + indexed_at: row.get(4), 62 }) 63 + .collect()) 64 } 65 66 pub async fn backfill_delete_rows<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult {
+1 -1
consumer/src/db/copy.rs
··· 205 ) 206 .await? 207 .into_iter() 208 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).collect::<Result<_, _>>()?; 209 210 for (root, post, created_at) in threadgated { 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
··· 205 ) 206 .await? 207 .into_iter() 208 + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); 209 210 for (root, post, created_at) in threadgated { 211 match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
+13 -18
consumer/src/db/gates.rs
··· 47 &[&root_author, &post_author], 48 ) 49 .await? 50 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?))).transpose()?; 51 52 if let Some((following, followed)) = profile_state { 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 let mentions: Vec<String> = conn 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 .await? 68 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 69 - .transpose()? 70 .unwrap_or_default(); 71 72 if mentions.contains(&post_author.to_owned()) { ··· 85 &[&allow_lists, &post_author], 86 ) 87 .await? 88 - .try_get(0)?; 89 if count != 0 { 90 return Ok(false); 91 } ··· 137 ) 138 .await? 139 .into_iter() 140 - .map(|row| row.try_get(0)) 141 - .collect::<Result<_, _>>()?; 142 143 // this will be empty if there are no replies. 144 if dids.is_empty() { ··· 153 }) 154 .collect::<Vec<_>>(); 155 156 - let allow: HashSet<_> = HashSet::from_iter(allow.iter().map(|v| v.as_str())); 157 158 if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 159 let current_dids: Vec<_> = dids.iter().collect(); ··· 161 let res = conn.query( 162 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 163 &[&root_author, &current_dids] 164 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<HashSet<_>, _>>()?; 165 166 - dids = &dids - &res; 167 } 168 169 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 172 let res = conn.query( 173 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 174 &[&root_author, &current_dids] 175 - ).await?.into_iter().map(|row| row.try_get(0)).collect::<Result<_, _>>()?; 176 177 - dids = &dids - &res; 178 } 179 180 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 181 let mentions: Vec<String> = conn 182 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 183 .await? 184 - .and_then(|r| r.try_get::<_, Option<_>>(0).transpose()) 185 - .transpose()? 186 .unwrap_or_default(); 187 188 dids = &dids - &HashSet::from_iter(mentions); ··· 196 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 197 &[&allowed_lists, &current_dids], 198 ) 199 - .await? 200 - .into_iter() 201 - .map(|row| row.try_get(0)) 202 - .collect::<Result<_, _>>()?; 203 204 - dids = &dids - &res; 205 } 206 207 let dids = dids.into_iter().collect::<Vec<_>>();
··· 47 &[&root_author, &post_author], 48 ) 49 .await? 50 + .map(|v| (v.get(0), v.get(1))); 51 52 if let Some((following, followed)) = profile_state { 53 if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { ··· 65 let mentions: Vec<String> = conn 66 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 67 .await? 68 + .map(|r| r.get(0)) 69 .unwrap_or_default(); 70 71 if mentions.contains(&post_author.to_owned()) { ··· 84 &[&allow_lists, &post_author], 85 ) 86 .await? 87 + .get(0); 88 if count != 0 { 89 return Ok(false); 90 } ··· 136 ) 137 .await? 138 .into_iter() 139 + .map(|row| row.get(0)) 140 + .collect(); 141 142 // this will be empty if there are no replies. 143 if dids.is_empty() { ··· 152 }) 153 .collect::<Vec<_>>(); 154 155 + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); 156 157 if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { 158 let current_dids: Vec<_> = dids.iter().collect(); ··· 160 let res = conn.query( 161 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", 162 &[&root_author, &current_dids] 163 + ).await?; 164 165 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 166 } 167 168 if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { ··· 171 let res = conn.query( 172 "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", 173 &[&root_author, &current_dids] 174 + ).await?; 175 176 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 177 } 178 179 if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { 180 let mentions: Vec<String> = conn 181 .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 182 .await? 183 + .map(|r| r.get(0)) 184 .unwrap_or_default(); 185 186 dids = &dids - &HashSet::from_iter(mentions); ··· 194 "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", 195 &[&allowed_lists, &current_dids], 196 ) 197 + .await?; 198 199 + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); 200 } 201 202 let dids = dids.into_iter().collect::<Vec<_>>();
+20 -17
consumer/src/db/record.rs
··· 127 ], 128 ) 129 .await 130 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 131 } 132 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 ) 160 .await?; 161 162 - res.map(|v| v.try_get(0)).transpose() 163 } 164 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 ) 225 .await?; 226 227 - res.map(|v| v.try_get(0)).transpose() 228 } 229 230 pub async fn list_upsert<C: GenericClient>( ··· 255 ], 256 ) 257 .await 258 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 259 } 260 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 ) 392 .await?; 393 394 - res.map(|row| Ok((row.try_get(0)?, row.try_get(1)?))) 395 - .transpose() 396 } 397 398 pub async fn post_embed_insert<C: GenericClient>( ··· 537 conn: &mut C, 538 post: &str, 539 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 540 - conn.query_opt( 541 - "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 - &[&post], 543 - ) 544 - .await? 545 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))) 546 - .transpose() 547 } 548 549 pub async fn postgate_upsert<C: GenericClient>( ··· 650 ) 651 .await?; 652 653 - res.map(|v| v.try_get(0)).transpose() 654 } 655 656 pub async fn starter_pack_upsert<C: GenericClient>( ··· 685 ], 686 ) 687 .await 688 - .and_then(|r| Ok(r.try_get::<_, i32>(0)? == 0)) 689 } 690 691 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 730 conn: &mut C, 731 post: &str, 732 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 733 - conn 734 .query_opt( 735 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 736 &[&post], 737 ) 738 .await? 739 - .map(|v| Ok((v.try_get(0)?, v.try_get(1)?, v.try_get(2)?))).transpose() 740 } 741 742 pub async fn threadgate_upsert<C: GenericClient>(
··· 127 ], 128 ) 129 .await 130 + .map(|r| r.get::<_, i32>(0) == 0) 131 } 132 133 pub async fn feedgen_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 159 ) 160 .await?; 161 162 + Ok(res.map(|v| v.get(0))) 163 } 164 165 pub async fn labeler_upsert<C: GenericClient>( ··· 224 ) 225 .await?; 226 227 + Ok(res.map(|v| v.get(0))) 228 } 229 230 pub async fn list_upsert<C: GenericClient>( ··· 255 ], 256 ) 257 .await 258 + .map(|r| r.get::<_, i32>(0) == 0) 259 } 260 261 pub async fn list_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 391 ) 392 .await?; 393 394 + Ok(res.map(|row| (row.get(0), row.get(1)))) 395 } 396 397 pub async fn post_embed_insert<C: GenericClient>( ··· 536 conn: &mut C, 537 post: &str, 538 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 539 + let res = conn 540 + .query_opt( 541 + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", 542 + &[&post], 543 + ) 544 + .await? 545 + .map(|v| (v.get(0), v.get(1), v.get(2))); 546 + 547 + Ok(res) 548 } 549 550 pub async fn postgate_upsert<C: GenericClient>( ··· 651 ) 652 .await?; 653 654 + Ok(res.map(|v| v.get(0))) 655 } 656 657 pub async fn starter_pack_upsert<C: GenericClient>( ··· 686 ], 687 ) 688 .await 689 + .map(|r| r.get::<_, i32>(0) == 0) 690 } 691 692 pub async fn starter_pack_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult { ··· 731 conn: &mut C, 732 post: &str, 733 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> { 734 + let res = conn 735 .query_opt( 736 "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", 737 &[&post], 738 ) 739 .await? 740 + .map(|v| (v.get(0), v.get(1), v.get(2))); 741 + 742 + Ok(res) 743 } 744 745 pub async fn threadgate_upsert<C: GenericClient>(
+4
consumer/src/indexer/mod.rs
··· 640 } 641 RecordTypes::AppBskyFeedPostgate(record) => { 642 if !at_uri_is_by(&record.post, repo) { 643 return Ok(()); 644 } 645 ··· 669 } 670 RecordTypes::AppBskyFeedThreadgate(record) => { 671 if !at_uri_is_by(&record.post, repo) { 672 return Ok(()); 673 } 674 ··· 708 } 709 RecordTypes::AppBskyGraphListItem(record) => { 710 if !at_uri_is_by(&record.list, repo) { 711 return Ok(()); 712 } 713
··· 640 } 641 RecordTypes::AppBskyFeedPostgate(record) => { 642 if !at_uri_is_by(&record.post, repo) { 643 + tracing::warn!("tried to create a postgate on a post we don't control!"); 644 return Ok(()); 645 } 646 ··· 670 } 671 RecordTypes::AppBskyFeedThreadgate(record) => { 672 if !at_uri_is_by(&record.post, repo) { 673 + tracing::warn!("tried to create a threadgate on a post we don't control!"); 674 return Ok(()); 675 } 676 ··· 710 } 711 RecordTypes::AppBskyGraphListItem(record) => { 712 if !at_uri_is_by(&record.list, repo) { 713 + // it's also probably a bad idea to log *all* the attempts to do this... 714 + tracing::warn!("tried to create a listitem on a list we don't control!"); 715 return Ok(()); 716 } 717
-25
consumer/src/instrumentation.rs
··· 1 - use tracing::Subscriber; 2 - use tracing_subscriber::filter::Filtered; 3 - use tracing_subscriber::layer::SubscriberExt; 4 - use tracing_subscriber::registry::LookupSpan; 5 - use tracing_subscriber::util::SubscriberInitExt; 6 - use tracing_subscriber::{EnvFilter, Layer}; 7 - 8 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 9 - let log_layer = init_log(cfg.log_json); 10 - 11 - tracing_subscriber::registry().with(log_layer).init(); 12 - } 13 - 14 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 15 - where 16 - S: Subscriber + for<'span> LookupSpan<'span>, 17 - { 18 - let stdout_filter = EnvFilter::from_default_env(); 19 - 20 - match json { 21 - true => tracing_subscriber::fmt::layer().json().boxed(), 22 - false => tracing_subscriber::fmt::layer().boxed(), 23 - } 24 - .with_filter(stdout_filter) 25 - }
···
+1 -2
consumer/src/main.rs
··· 12 mod db; 13 mod firehose; 14 mod indexer; 15 - mod instrumentation; 16 mod label_indexer; 17 mod utils; 18 19 #[tokio::main] 20 async fn main() -> eyre::Result<()> { 21 PrometheusBuilder::new().install()?; 22 23 let cli = cmd::parse(); 24 let conf = config::load_config()?; 25 26 - instrumentation::init_instruments(&conf.instruments); 27 let user_agent = build_ua(&conf.ua_contact); 28 29 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
··· 12 mod db; 13 mod firehose; 14 mod indexer; 15 mod label_indexer; 16 mod utils; 17 18 #[tokio::main] 19 async fn main() -> eyre::Result<()> { 20 + tracing_subscriber::fmt::init(); 21 PrometheusBuilder::new().install()?; 22 23 let cli = cmd::parse(); 24 let conf = config::load_config()?; 25 26 let user_agent = build_ua(&conf.ua_contact); 27 28 let pool = conf.database.create_pool(Some(Runtime::Tokio1), NoTls)?;
+2 -2
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 34 language plpgsql as 35 $$ 36 begin 37 - delete from author_feeds where did = OLD.did and uri = OLD.at_uri and typ = 'post'; 38 return OLD; 39 end; 40 $$; ··· 67 language plpgsql as 68 $$ 69 begin 70 - delete from author_feeds where did = OLD.did and post = OLD.post and typ = 'repost'; 71 return OLD; 72 end; 73 $$;
··· 34 language plpgsql as 35 $$ 36 begin 37 + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; 38 return OLD; 39 end; 40 $$; ··· 67 language plpgsql as 68 $$ 69 begin 70 + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; 71 return OLD; 72 end; 73 $$;
+2 -8
parakeet/Cargo.toml
··· 6 [dependencies] 7 async-recursion = "1.1.1" 8 axum = { version = "0.8", features = ["json"] } 9 - axum-tracing-opentelemetry = "0.32" 10 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 11 base64 = "0.22" 12 chrono = { version = "0.4.39", features = ["serde"] } ··· 22 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 23 lexica = { path = "../lexica" } 24 multibase = "0.9.1" 25 - opentelemetry = "0.31.0" 26 - opentelemetry-otlp = "0.31.0" 27 - opentelemetry_sdk = "0.31.0" 28 parakeet-db = { path = "../parakeet-db" } 29 - parakeet-index = { path = "../parakeet-index", features = ["otel"] } 30 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 31 reqwest = { version = "0.12", features = ["json"] } 32 serde = { version = "1.0.217", features = ["derive"] } 33 serde_ipld_dagcbor = "0.6.1" 34 serde_json = "1.0.134" 35 tokio = { version = "1.42.0", features = ["full"] } 36 - tower = "0.5" 37 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 38 tracing = "0.1.40" 39 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } 40 - tracing-opentelemetry = "0.32"
··· 6 [dependencies] 7 async-recursion = "1.1.1" 8 axum = { version = "0.8", features = ["json"] } 9 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 10 base64 = "0.22" 11 chrono = { version = "0.4.39", features = ["serde"] } ··· 21 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 22 lexica = { path = "../lexica" } 23 multibase = "0.9.1" 24 parakeet-db = { path = "../parakeet-db" } 25 + parakeet-index = { path = "../parakeet-index" } 26 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 27 reqwest = { version = "0.12", features = ["json"] } 28 serde = { version = "1.0.217", features = ["derive"] } 29 serde_ipld_dagcbor = "0.6.1" 30 serde_json = "1.0.134" 31 tokio = { version = "1.42.0", features = ["full"] } 32 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 33 tracing = "0.1.40" 34 + tracing-subscriber = "0.3.18"
+2 -2
parakeet/src/cache.rs
··· 29 type Val = V; 30 31 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 - let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, key).await.ok()?; 33 34 match serde_ipld_dagcbor::from_slice(&res?) { 35 Ok(v) => Some(v), ··· 57 } 58 59 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 - let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, key) 61 .await 62 .ok()?; 63
··· 29 type Val = V; 30 31 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val> { 32 + let res: Option<Vec<u8>> = redis::AsyncCommands::get(&mut self.conn, &key).await.ok()?; 33 34 match serde_ipld_dagcbor::from_slice(&res?) { 35 Ok(v) => Some(v), ··· 57 } 58 59 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val> { 60 + let res: Option<Vec<u8>> = redis::AsyncCommands::get_del(&mut self.conn, &key) 61 .await 62 .ok()?; 63
-10
parakeet/src/config.rs
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 pub index_uri: String, 19 pub database_url: String, 20 pub redis_uri: String, ··· 29 pub did_allowlist: Option<Vec<String>>, 30 #[serde(default)] 31 pub migrate: bool, 32 - } 33 - 34 - #[derive(Debug, Deserialize)] 35 - pub struct ConfigInstruments { 36 - #[serde(default)] 37 - pub otel_enable: bool, 38 - #[serde(default)] 39 - pub log_json: bool, 40 } 41 42 #[derive(Debug, Deserialize)]
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 pub index_uri: String, 17 pub database_url: String, 18 pub redis_uri: String, ··· 27 pub did_allowlist: Option<Vec<String>>, 28 #[serde(default)] 29 pub migrate: bool, 30 } 31 32 #[derive(Debug, Deserialize)]
+1 -21
parakeet/src/db.rs
··· 1 use diesel::prelude::*; 2 use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 - use parakeet_db::models::TextArray; 5 use parakeet_db::{schema, types}; 6 - use tracing::instrument; 7 8 - #[instrument(skip_all)] 9 pub async fn get_actor_status( 10 conn: &mut AsyncPgConnection, 11 did: &str, ··· 40 #[diesel(sql_type = Nullable<Text>)] 41 pub list_mute: Option<String>, 42 } 43 - 44 - #[instrument(skip_all)] 45 pub async fn get_profile_state( 46 conn: &mut AsyncPgConnection, 47 did: &str, ··· 54 .await 55 .optional() 56 } 57 - 58 - #[instrument(skip_all)] 59 pub async fn get_profile_states( 60 conn: &mut AsyncPgConnection, 61 did: &str, ··· 90 #[diesel(sql_type = diesel::sql_types::Bool)] 91 pub pinned: bool, 92 } 93 - 94 - #[instrument(skip_all)] 95 pub async fn get_post_state( 96 conn: &mut AsyncPgConnection, 97 did: &str, ··· 105 .optional() 106 } 107 108 - #[instrument(skip_all)] 109 pub async fn get_post_states( 110 conn: &mut AsyncPgConnection, 111 did: &str, ··· 129 pub block: Option<String>, 130 } 131 132 - #[instrument(skip_all)] 133 pub async fn get_list_state( 134 conn: &mut AsyncPgConnection, 135 did: &str, ··· 143 .optional() 144 } 145 146 - #[instrument(skip_all)] 147 pub async fn get_list_states( 148 conn: &mut AsyncPgConnection, 149 did: &str, ··· 156 .await 157 } 158 159 - #[instrument(skip_all)] 160 pub async fn get_like_state( 161 conn: &mut AsyncPgConnection, 162 did: &str, ··· 174 .optional() 175 } 176 177 - #[instrument(skip_all)] 178 pub async fn get_like_states( 179 conn: &mut AsyncPgConnection, 180 did: &str, ··· 195 .await 196 } 197 198 - #[instrument(skip_all)] 199 pub async fn get_pinned_post_uri( 200 conn: &mut AsyncPgConnection, 201 did: &str, ··· 226 pub depth: i32, 227 } 228 229 - #[instrument(skip_all)] 230 pub async fn get_thread_children( 231 conn: &mut AsyncPgConnection, 232 uri: &str, ··· 239 .await 240 } 241 242 - #[instrument(skip_all)] 243 pub async fn get_thread_children_branching( 244 conn: &mut AsyncPgConnection, 245 uri: &str, ··· 261 pub at_uri: String, 262 } 263 264 - #[instrument(skip_all)] 265 pub async fn get_thread_children_hidden( 266 conn: &mut AsyncPgConnection, 267 uri: &str, ··· 274 .await 275 } 276 277 - #[instrument(skip_all)] 278 pub async fn get_thread_parents( 279 conn: &mut AsyncPgConnection, 280 uri: &str, ··· 287 .await 288 } 289 290 - #[instrument(skip_all)] 291 pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 292 schema::posts::table 293 .select(schema::posts::root_uri) ··· 298 .map(|v| v.flatten()) 299 } 300 301 - #[instrument(skip_all)] 302 pub async fn get_threadgate_hiddens( 303 conn: &mut AsyncPgConnection, 304 uri: &str,
··· 1 use diesel::prelude::*; 2 use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 use parakeet_db::{schema, types}; 5 + use parakeet_db::models::TextArray; 6 7 pub async fn get_actor_status( 8 conn: &mut AsyncPgConnection, 9 did: &str, ··· 38 #[diesel(sql_type = Nullable<Text>)] 39 pub list_mute: Option<String>, 40 } 41 pub async fn get_profile_state( 42 conn: &mut AsyncPgConnection, 43 did: &str, ··· 50 .await 51 .optional() 52 } 53 pub async fn get_profile_states( 54 conn: &mut AsyncPgConnection, 55 did: &str, ··· 84 #[diesel(sql_type = diesel::sql_types::Bool)] 85 pub pinned: bool, 86 } 87 pub async fn get_post_state( 88 conn: &mut AsyncPgConnection, 89 did: &str, ··· 97 .optional() 98 } 99 100 pub async fn get_post_states( 101 conn: &mut AsyncPgConnection, 102 did: &str, ··· 120 pub block: Option<String>, 121 } 122 123 pub async fn get_list_state( 124 conn: &mut AsyncPgConnection, 125 did: &str, ··· 133 .optional() 134 } 135 136 pub async fn get_list_states( 137 conn: &mut AsyncPgConnection, 138 did: &str, ··· 145 .await 146 } 147 148 pub async fn get_like_state( 149 conn: &mut AsyncPgConnection, 150 did: &str, ··· 162 .optional() 163 } 164 165 pub async fn get_like_states( 166 conn: &mut AsyncPgConnection, 167 did: &str, ··· 182 .await 183 } 184 185 pub async fn get_pinned_post_uri( 186 conn: &mut AsyncPgConnection, 187 did: &str, ··· 212 pub depth: i32, 213 } 214 215 pub async fn get_thread_children( 216 conn: &mut AsyncPgConnection, 217 uri: &str, ··· 224 .await 225 } 226 227 pub async fn get_thread_children_branching( 228 conn: &mut AsyncPgConnection, 229 uri: &str, ··· 245 pub at_uri: String, 246 } 247 248 pub async fn get_thread_children_hidden( 249 conn: &mut AsyncPgConnection, 250 uri: &str, ··· 257 .await 258 } 259 260 pub async fn get_thread_parents( 261 conn: &mut AsyncPgConnection, 262 uri: &str, ··· 269 .await 270 } 271 272 pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 273 schema::posts::table 274 .select(schema::posts::root_uri) ··· 279 .map(|v| v.flatten()) 280 } 281 282 pub async fn get_threadgate_hiddens( 283 conn: &mut AsyncPgConnection, 284 uri: &str,
-3
parakeet/src/hydration/embed.rs
··· 8 use lexica::app_bsky::feed::PostView; 9 use parakeet_db::models; 10 use std::collections::HashMap; 11 - use tracing::instrument; 12 13 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 14 height ··· 177 out 178 } 179 180 - #[instrument(skip_all)] 181 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 182 let (embed, author) = self.loaders.embed.load(post).await?; 183 ··· 197 } 198 } 199 200 - #[instrument(skip_all)] 201 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 202 let embeds = self.loaders.embed.load_many(posts).await; 203
··· 8 use lexica::app_bsky::feed::PostView; 9 use parakeet_db::models; 10 use std::collections::HashMap; 11 12 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 13 height ··· 176 out 177 } 178 179 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 180 let (embed, author) = self.loaders.embed.load(post).await?; 181 ··· 195 } 196 } 197 198 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 199 let embeds = self.loaders.embed.load_many(posts).await; 200
-5
parakeet/src/hydration/feedgen.rs
··· 5 use parakeet_db::models; 6 use std::collections::HashMap; 7 use std::str::FromStr; 8 - use tracing::instrument; 9 10 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 11 GeneratorViewerState { ··· 50 } 51 52 impl super::StatefulHydrator<'_> { 53 - #[instrument(skip_all)] 54 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 55 let labels = self.get_label(&feedgen).await; 56 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 63 )) 64 } 65 66 - #[instrument(skip_all)] 67 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 68 let labels = self.get_label_many(&feedgens).await; 69 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 93 .collect() 94 } 95 96 - #[instrument(skip_all)] 97 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 98 if let Some(viewer) = &self.current_actor { 99 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 104 } 105 } 106 107 - #[instrument(skip_all)] 108 async fn get_feedgen_viewer_states( 109 &self, 110 subjects: &[String],
··· 5 use parakeet_db::models; 6 use std::collections::HashMap; 7 use std::str::FromStr; 8 9 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 10 GeneratorViewerState { ··· 49 } 50 51 impl super::StatefulHydrator<'_> { 52 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 53 let labels = self.get_label(&feedgen).await; 54 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 61 )) 62 } 63 64 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 65 let labels = self.get_label_many(&feedgens).await; 66 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 90 .collect() 91 } 92 93 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 94 if let Some(viewer) = &self.current_actor { 95 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 100 } 101 } 102 103 async fn get_feedgen_viewer_states( 104 &self, 105 subjects: &[String],
-7
parakeet/src/hydration/labeler.rs
··· 8 use parakeet_db::models; 9 use std::collections::HashMap; 10 use std::str::FromStr; 11 - use tracing::instrument; 12 13 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 14 LabelerViewerState { ··· 99 } 100 101 impl StatefulHydrator<'_> { 102 - #[instrument(skip_all)] 103 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 104 let labels = self.get_label(&labeler).await; 105 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 110 Some(build_view(labeler, creator, labels, viewer, likes)) 111 } 112 113 - #[instrument(skip_all)] 114 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 115 let labels = self.get_label_many(&labelers).await; 116 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 136 .collect() 137 } 138 139 - #[instrument(skip_all)] 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 141 let labels = self.get_label(&labeler).await; 142 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 149 )) 150 } 151 152 - #[instrument(skip_all)] 153 pub async fn hydrate_labelers_detailed( 154 &self, 155 labelers: Vec<String>, ··· 180 .collect() 181 } 182 183 - #[instrument(skip_all)] 184 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 185 if let Some(viewer) = &self.current_actor { 186 let data = self ··· 195 } 196 } 197 198 - #[instrument(skip_all)] 199 async fn get_labeler_viewer_states( 200 &self, 201 subjects: &[String],
··· 8 use parakeet_db::models; 9 use std::collections::HashMap; 10 use std::str::FromStr; 11 12 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 13 LabelerViewerState { ··· 98 } 99 100 impl StatefulHydrator<'_> { 101 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 102 let labels = self.get_label(&labeler).await; 103 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 108 Some(build_view(labeler, creator, labels, viewer, likes)) 109 } 110 111 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 112 let labels = self.get_label_many(&labelers).await; 113 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 133 .collect() 134 } 135 136 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 137 let labels = self.get_label(&labeler).await; 138 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 145 )) 146 } 147 148 pub async fn hydrate_labelers_detailed( 149 &self, 150 labelers: Vec<String>, ··· 175 .collect() 176 } 177 178 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 179 if let Some(viewer) = &self.current_actor { 180 let data = self ··· 189 } 190 } 191 192 async fn get_labeler_viewer_states( 193 &self, 194 subjects: &[String],
-7
parakeet/src/hydration/list.rs
··· 6 use parakeet_db::models; 7 use std::collections::HashMap; 8 use std::str::FromStr; 9 - use tracing::instrument; 10 11 fn build_viewer(data: ListStateRet) -> ListViewerState { 12 ListViewerState { ··· 70 } 71 72 impl StatefulHydrator<'_> { 73 - #[instrument(skip_all)] 74 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 75 let labels = self.get_label(&list).await; 76 let viewer = self.get_list_viewer_state(&list).await; ··· 79 build_basic(list, count, labels, viewer, &self.cdn) 80 } 81 82 - #[instrument(skip_all)] 83 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 84 if lists.is_empty() { 85 return HashMap::new(); ··· 100 .collect() 101 } 102 103 - #[instrument(skip_all)] 104 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 105 let labels = self.get_label(&list).await; 106 let viewer = self.get_list_viewer_state(&list).await; ··· 110 build_listview(list, count, profile, labels, viewer, &self.cdn) 111 } 112 113 - #[instrument(skip_all)] 114 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 115 if lists.is_empty() { 116 return HashMap::new(); ··· 136 .collect() 137 } 138 139 - #[instrument(skip_all)] 140 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 141 if let Some(viewer) = &self.current_actor { 142 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 147 } 148 } 149 150 - #[instrument(skip_all)] 151 async fn get_list_viewer_states( 152 &self, 153 subjects: &[String],
··· 6 use parakeet_db::models; 7 use std::collections::HashMap; 8 use std::str::FromStr; 9 10 fn build_viewer(data: ListStateRet) -> ListViewerState { 11 ListViewerState { ··· 69 } 70 71 impl StatefulHydrator<'_> { 72 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 73 let labels = self.get_label(&list).await; 74 let viewer = self.get_list_viewer_state(&list).await; ··· 77 build_basic(list, count, labels, viewer, &self.cdn) 78 } 79 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 81 if lists.is_empty() { 82 return HashMap::new(); ··· 97 .collect() 98 } 99 100 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 101 let labels = self.get_label(&list).await; 102 let viewer = self.get_list_viewer_state(&list).await; ··· 106 build_listview(list, count, profile, labels, viewer, &self.cdn) 107 } 108 109 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 110 if lists.is_empty() { 111 return HashMap::new(); ··· 131 .collect() 132 } 133 134 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 135 if let Some(viewer) = &self.current_actor { 136 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 141 } 142 } 143 144 async fn get_list_viewer_states( 145 &self, 146 subjects: &[String],
-4
parakeet/src/hydration/mod.rs
··· 63 } 64 } 65 66 - #[tracing::instrument(skip_all)] 67 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 68 self.loaders.label.load(uri, self.accept_labelers).await 69 } 70 71 - #[tracing::instrument(skip_all)] 72 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 73 let uris = &[ 74 did.to_string(), ··· 82 .collect() 83 } 84 85 - #[tracing::instrument(skip_all)] 86 async fn get_label_many( 87 &self, 88 uris: &[String], ··· 93 .await 94 } 95 96 - #[tracing::instrument(skip_all)] 97 async fn get_profile_label_many( 98 &self, 99 uris: &[String],
··· 63 } 64 } 65 66 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 67 self.loaders.label.load(uri, self.accept_labelers).await 68 } 69 70 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 71 let uris = &[ 72 did.to_string(), ··· 80 .collect() 81 } 82 83 async fn get_label_many( 84 &self, 85 uris: &[String], ··· 90 .await 91 } 92 93 async fn get_profile_label_many( 94 &self, 95 uris: &[String],
-9
parakeet/src/hydration/posts.rs
··· 11 use parakeet_db::models; 12 use parakeet_index::PostStats; 13 use std::collections::HashMap; 14 - use tracing::instrument; 15 16 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 17 let is_me = did == data.did; ··· 83 } 84 85 impl StatefulHydrator<'_> { 86 - #[instrument(skip_all)] 87 async fn hydrate_threadgate( 88 &self, 89 threadgate: Option<models::Threadgate>, ··· 102 )) 103 } 104 105 - #[instrument(skip_all)] 106 async fn hydrate_threadgates( 107 &self, 108 threadgates: Vec<models::Threadgate>, ··· 134 .collect() 135 } 136 137 - #[instrument(skip_all)] 138 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 139 let stats = self.loaders.post_stats.load(post.clone()).await; 140 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 149 ))) 150 } 151 152 - #[instrument(skip_all)] 153 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 154 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 155 let posts = self.loaders.posts.load_many(posts).await; ··· 189 .collect() 190 } 191 192 - #[instrument(skip_all)] 193 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 194 self.hydrate_posts_inner(posts) 195 .await ··· 198 .collect() 199 } 200 201 - #[instrument(skip_all)] 202 pub async fn hydrate_feed_posts( 203 &self, 204 posts: Vec<RawFeedItem>, ··· 302 .collect() 303 } 304 305 - #[instrument(skip_all)] 306 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 307 if let Some(viewer) = &self.current_actor { 308 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 313 } 314 } 315 316 - #[instrument(skip_all)] 317 async fn get_post_viewer_states( 318 &self, 319 subjects: &[String],
··· 11 use parakeet_db::models; 12 use parakeet_index::PostStats; 13 use std::collections::HashMap; 14 15 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 16 let is_me = did == data.did; ··· 82 } 83 84 impl StatefulHydrator<'_> { 85 async fn hydrate_threadgate( 86 &self, 87 threadgate: Option<models::Threadgate>, ··· 100 )) 101 } 102 103 async fn hydrate_threadgates( 104 &self, 105 threadgates: Vec<models::Threadgate>, ··· 131 .collect() 132 } 133 134 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 135 let stats = self.loaders.post_stats.load(post.clone()).await; 136 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 145 ))) 146 } 147 148 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 149 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 150 let posts = self.loaders.posts.load_many(posts).await; ··· 184 .collect() 185 } 186 187 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 188 self.hydrate_posts_inner(posts) 189 .await ··· 192 .collect() 193 } 194 195 pub async fn hydrate_feed_posts( 196 &self, 197 posts: Vec<RawFeedItem>, ··· 295 .collect() 296 } 297 298 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 299 if let Some(viewer) = &self.current_actor { 300 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 305 } 306 } 307 308 async fn get_post_viewer_states( 309 &self, 310 subjects: &[String],
+1 -12
parakeet/src/hydration/profile.rs
··· 12 use std::collections::HashMap; 13 use std::str::FromStr; 14 use std::sync::OnceLock; 15 - use tracing::instrument; 16 17 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 18 ··· 52 .followed 53 .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 54 55 - let blocking = data.list_block.or(data 56 - .blocking 57 - .map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did))); 58 59 ProfileViewerState { 60 muted: data.muting.unwrap_or_default(), ··· 275 } 276 277 impl super::StatefulHydrator<'_> { 278 - #[instrument(skip_all)] 279 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 280 let labels = self.get_profile_label(&did).await; 281 let viewer = self.get_profile_viewer_state(&did).await; ··· 293 )) 294 } 295 296 - #[instrument(skip_all)] 297 pub async fn hydrate_profiles_basic( 298 &self, 299 dids: Vec<String>, ··· 318 .collect() 319 } 320 321 - #[instrument(skip_all)] 322 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 323 let labels = self.get_profile_label(&did).await; 324 let viewer = self.get_profile_viewer_state(&did).await; ··· 336 )) 337 } 338 339 - #[instrument(skip_all)] 340 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 341 let labels = self.get_profile_label_many(&dids).await; 342 let viewers = self.get_profile_viewer_states(&dids).await; ··· 358 .collect() 359 } 360 361 - #[instrument(skip_all)] 362 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 363 let labels = self.get_profile_label(&did).await; 364 let viewer = self.get_profile_viewer_state(&did).await; ··· 376 )) 377 } 378 379 - #[instrument(skip_all)] 380 pub async fn hydrate_profiles_detailed( 381 &self, 382 dids: Vec<String>, ··· 401 .collect() 402 } 403 404 - #[instrument(skip_all)] 405 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 406 if let Some(viewer) = &self.current_actor { 407 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 421 } 422 } 423 424 - #[instrument(skip_all)] 425 async fn get_profile_viewer_states( 426 &self, 427 dids: &[String],
··· 12 use std::collections::HashMap; 13 use std::str::FromStr; 14 use std::sync::OnceLock; 15 16 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 17 ··· 51 .followed 52 .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 53 54 + let blocking = data.list_block.or(data.blocking); 55 56 ProfileViewerState { 57 muted: data.muting.unwrap_or_default(), ··· 272 } 273 274 impl super::StatefulHydrator<'_> { 275 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 276 let labels = self.get_profile_label(&did).await; 277 let viewer = self.get_profile_viewer_state(&did).await; ··· 289 )) 290 } 291 292 pub async fn hydrate_profiles_basic( 293 &self, 294 dids: Vec<String>, ··· 313 .collect() 314 } 315 316 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 317 let labels = self.get_profile_label(&did).await; 318 let viewer = self.get_profile_viewer_state(&did).await; ··· 330 )) 331 } 332 333 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 334 let labels = self.get_profile_label_many(&dids).await; 335 let viewers = self.get_profile_viewer_states(&dids).await; ··· 351 .collect() 352 } 353 354 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 355 let labels = self.get_profile_label(&did).await; 356 let viewer = self.get_profile_viewer_state(&did).await; ··· 368 )) 369 } 370 371 pub async fn hydrate_profiles_detailed( 372 &self, 373 dids: Vec<String>, ··· 392 .collect() 393 } 394 395 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 396 if let Some(viewer) = &self.current_actor { 397 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 411 } 412 } 413 414 async fn get_profile_viewer_states( 415 &self, 416 dids: &[String],
+5 -11
parakeet/src/hydration/starter_packs.rs
··· 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 use parakeet_db::models; 6 use std::collections::HashMap; 7 - use tracing::instrument; 8 9 fn build_basic( 10 starter_pack: models::StaterPack, ··· 51 } 52 53 impl StatefulHydrator<'_> { 54 - #[instrument(skip_all)] 55 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 56 let labels = self.get_label(&pack).await; 57 let sp = self.loaders.starterpacks.load(pack).await?; ··· 61 Some(build_basic(sp, creator, labels, list_item_count)) 62 } 63 64 - #[instrument(skip_all)] 65 pub async fn hydrate_starterpacks_basic( 66 &self, 67 packs: Vec<String>, ··· 89 .collect() 90 } 91 92 - #[instrument(skip_all)] 93 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 94 let labels = self.get_label(&pack).await; 95 let sp = self.loaders.starterpacks.load(pack).await?; ··· 97 let creator = self.hydrate_profile_basic(sp.owner.clone()).await?; 98 let list = self.hydrate_list_basic(sp.list.clone()).await; 99 100 - let feeds = sp.feeds.clone().unwrap_or_default(); 101 - let feeds = self 102 - .hydrate_feedgens(feeds.into()) 103 - .await 104 - .into_values() 105 - .collect(); 106 107 Some(build_spview(sp, creator, labels, list, feeds)) 108 } 109 110 - #[instrument(skip_all)] 111 pub async fn hydrate_starterpacks( 112 &self, 113 packs: Vec<String>,
··· 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 use parakeet_db::models; 6 use std::collections::HashMap; 7 8 fn build_basic( 9 starter_pack: models::StaterPack, ··· 50 } 51 52 impl StatefulHydrator<'_> { 53 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 54 let labels = self.get_label(&pack).await; 55 let sp = self.loaders.starterpacks.load(pack).await?; ··· 59 Some(build_basic(sp, creator, labels, list_item_count)) 60 } 61 62 pub async fn hydrate_starterpacks_basic( 63 &self, 64 packs: Vec<String>, ··· 86 .collect() 87 } 88 89 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 90 let labels = self.get_label(&pack).await; 91 let sp = self.loaders.starterpacks.load(pack).await?; ··· 93 let creator = self.hydrate_profile_basic(sp.owner.clone()).await?; 94 let list = self.hydrate_list_basic(sp.list.clone()).await; 95 96 + let feeds = sp 97 + .feeds 98 + .clone() 99 + .unwrap_or_default(); 100 + let feeds = self.hydrate_feedgens(feeds.into()).await.into_values().collect(); 101 102 Some(build_spview(sp, creator, labels, list, feeds)) 103 } 104 105 pub async fn hydrate_starterpacks( 106 &self, 107 packs: Vec<String>,
-57
parakeet/src/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &crate::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
···
+4 -27
parakeet/src/loaders.rs
··· 15 use serde::{Deserialize, Serialize}; 16 use std::collections::HashMap; 17 use std::str::FromStr; 18 - use tracing::instrument; 19 20 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 21 ··· 64 ) -> Dataloaders { 65 Dataloaders { 66 embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 67 - feedgen: new_plc_loader(FeedGenLoader(pool.clone()), &rc, "feedgen", 600), 68 handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 69 label: LabelLoader(pool.clone()), // CARE: never cache this. 70 - labeler: new_plc_loader(LabelServiceLoader(pool.clone()), &rc, "labeler", 600), 71 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 72 like_state: LikeRecordLoader(pool.clone()), 73 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), ··· 86 87 pub struct LikeLoader(parakeet_index::Client); 88 impl BatchFn<String, i32> for LikeLoader { 89 - #[instrument(name = "LikeLoader", skip_all)] 90 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 91 let res = self 92 .0 ··· 109 110 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 111 impl LikeRecordLoader { 112 - #[instrument(name = "LikeRecordLoader::get", skip_all)] 113 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 114 let mut conn = self.0.get().await.unwrap(); 115 ··· 121 }) 122 } 123 124 - #[instrument(name = "LikeRecordLoader::get_many", skip_all)] 125 pub async fn get_many( 126 &self, 127 did: &str, ··· 143 144 pub struct HandleLoader(Pool<AsyncPgConnection>); 145 impl BatchFn<String, String> for HandleLoader { 146 - #[instrument(name = "HandleLoader", skip_all)] 147 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 148 let mut conn = self.0.get().await.unwrap(); 149 ··· 176 Option<ProfileAllowSubscriptions>, 177 ); 178 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 179 - #[instrument(name = "ProfileLoader", skip_all)] 180 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 181 let mut conn = self.0.get().await.unwrap(); 182 ··· 238 239 pub struct ProfileStatsLoader(parakeet_index::Client); 240 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 241 - #[instrument(name = "ProfileStatsLoader", skip_all)] 242 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 243 let stats_req = parakeet_index::GetStatsManyReq { 244 uris: keys.to_vec(), ··· 255 256 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 257 impl ProfileStateLoader { 258 - #[instrument(name = "ProfileStateLoader::get", skip_all)] 259 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 260 let mut conn = self.0.get().await.unwrap(); 261 ··· 267 }) 268 } 269 270 - #[instrument(name = "ProfileStateLoader::get_many", skip_all)] 271 pub async fn get_many( 272 &self, 273 did: &str, ··· 288 pub struct ListLoader(Pool<AsyncPgConnection>); 289 type ListLoaderRet = (models::List, i64); 290 impl BatchFn<String, ListLoaderRet> for ListLoader { 291 - #[instrument(name = "ListLoaderRet", skip_all)] 292 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 293 let mut conn = self.0.get().await.unwrap(); 294 ··· 320 321 pub struct ListStateLoader(Pool<AsyncPgConnection>); 322 impl ListStateLoader { 323 - #[instrument(name = "ListStateLoader::get", skip_all)] 324 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 325 let mut conn = self.0.get().await.unwrap(); 326 ··· 332 }) 333 } 334 335 - #[instrument(name = "ListStateLoader::get_many", skip_all)] 336 pub async fn get_many( 337 &self, 338 did: &str, ··· 350 } 351 } 352 353 - pub struct FeedGenLoader(Pool<AsyncPgConnection>); 354 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 355 - #[instrument(name = "FeedGenLoader", skip_all)] 356 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 357 let mut conn = self.0.get().await.unwrap(); 358 ··· 378 pub struct PostLoader(Pool<AsyncPgConnection>); 379 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 380 impl BatchFn<String, PostLoaderRet> for PostLoader { 381 - #[instrument(name = "PostLoader", skip_all)] 382 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 383 let mut conn = self.0.get().await.unwrap(); 384 ··· 409 410 pub struct PostStatsLoader(parakeet_index::Client); 411 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 412 - #[instrument(name = "PostStatsLoader", skip_all)] 413 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 414 let stats_req = parakeet_index::GetStatsManyReq { 415 uris: keys.to_vec(), ··· 426 427 pub struct PostStateLoader(Pool<AsyncPgConnection>); 428 impl PostStateLoader { 429 - #[instrument(name = "PostStateLoader::get", skip_all)] 430 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 431 let mut conn = self.0.get().await.unwrap(); 432 ··· 438 }) 439 } 440 441 - #[instrument(name = "PostStateLoader::get_many", skip_all)] 442 pub async fn get_many( 443 &self, 444 did: &str, ··· 466 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 467 } 468 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 469 - #[instrument(name = "EmbedLoader", skip_all)] 470 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 471 let mut conn = self.0.get().await.unwrap(); 472 ··· 549 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 550 type StarterPackLoaderRet = models::StaterPack; 551 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 552 - #[instrument(name = "StarterPackLoader", skip_all)] 553 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 554 let mut conn = self.0.get().await.unwrap(); 555 ··· 572 } 573 } 574 575 - pub struct LabelServiceLoader(Pool<AsyncPgConnection>); 576 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 577 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 578 - #[instrument(name = "LabelServiceLoader", skip_all)] 579 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 580 let mut conn = self.0.get().await.unwrap(); 581 ··· 614 // but it should live here anyway 615 pub struct LabelLoader(Pool<AsyncPgConnection>); 616 impl LabelLoader { 617 - #[instrument(name = "LabelLoader::load", skip_all)] 618 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 619 let mut conn = self.0.get().await.unwrap(); 620 ··· 634 }) 635 } 636 637 - #[instrument(name = "LabelLoader::load_many", skip_all)] 638 pub async fn load_many( 639 &self, 640 uris: &[String], ··· 669 670 pub struct VerificationLoader(Pool<AsyncPgConnection>); 671 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 672 - #[instrument(name = "VerificationLoader", skip_all)] 673 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 674 let mut conn = self.0.get().await.unwrap(); 675
··· 15 use serde::{Deserialize, Serialize}; 16 use std::collections::HashMap; 17 use std::str::FromStr; 18 19 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 20 ··· 63 ) -> Dataloaders { 64 Dataloaders { 65 embed: new_plc_loader(EmbedLoader(pool.clone()), &rc, "embed", 3600), 66 + feedgen: new_plc_loader(FeedGenLoader(pool.clone(), idxc.clone()), &rc, "feedgen", 600), 67 handle: new_plc_loader(HandleLoader(pool.clone()), &rc, "handle", 60), 68 label: LabelLoader(pool.clone()), // CARE: never cache this. 69 + labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 70 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 71 like_state: LikeRecordLoader(pool.clone()), 72 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), ··· 85 86 pub struct LikeLoader(parakeet_index::Client); 87 impl BatchFn<String, i32> for LikeLoader { 88 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 89 let res = self 90 .0 ··· 107 108 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 109 impl LikeRecordLoader { 110 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 111 let mut conn = self.0.get().await.unwrap(); 112 ··· 118 }) 119 } 120 121 pub async fn get_many( 122 &self, 123 did: &str, ··· 139 140 pub struct HandleLoader(Pool<AsyncPgConnection>); 141 impl BatchFn<String, String> for HandleLoader { 142 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 143 let mut conn = self.0.get().await.unwrap(); 144 ··· 171 Option<ProfileAllowSubscriptions>, 172 ); 173 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 174 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 175 let mut conn = self.0.get().await.unwrap(); 176 ··· 232 233 pub struct ProfileStatsLoader(parakeet_index::Client); 234 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 235 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 236 let stats_req = parakeet_index::GetStatsManyReq { 237 uris: keys.to_vec(), ··· 248 249 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 250 impl ProfileStateLoader { 251 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 252 let mut conn = self.0.get().await.unwrap(); 253 ··· 259 }) 260 } 261 262 pub async fn get_many( 263 &self, 264 did: &str, ··· 279 pub struct ListLoader(Pool<AsyncPgConnection>); 280 type ListLoaderRet = (models::List, i64); 281 impl BatchFn<String, ListLoaderRet> for ListLoader { 282 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 283 let mut conn = self.0.get().await.unwrap(); 284 ··· 310 311 pub struct ListStateLoader(Pool<AsyncPgConnection>); 312 impl ListStateLoader { 313 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 314 let mut conn = self.0.get().await.unwrap(); 315 ··· 321 }) 322 } 323 324 pub async fn get_many( 325 &self, 326 did: &str, ··· 338 } 339 } 340 341 + pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 342 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 343 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 344 let mut conn = self.0.get().await.unwrap(); 345 ··· 365 pub struct PostLoader(Pool<AsyncPgConnection>); 366 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 367 impl BatchFn<String, PostLoaderRet> for PostLoader { 368 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 369 let mut conn = self.0.get().await.unwrap(); 370 ··· 395 396 pub struct PostStatsLoader(parakeet_index::Client); 397 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 398 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 399 let stats_req = parakeet_index::GetStatsManyReq { 400 uris: keys.to_vec(), ··· 411 412 pub struct PostStateLoader(Pool<AsyncPgConnection>); 413 impl PostStateLoader { 414 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 415 let mut conn = self.0.get().await.unwrap(); 416 ··· 422 }) 423 } 424 425 pub async fn get_many( 426 &self, 427 did: &str, ··· 449 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 450 } 451 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 452 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 453 let mut conn = self.0.get().await.unwrap(); 454 ··· 531 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 532 type StarterPackLoaderRet = models::StaterPack; 533 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 534 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 535 let mut conn = self.0.get().await.unwrap(); 536 ··· 553 } 554 } 555 556 + pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 557 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 558 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 559 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 560 let mut conn = self.0.get().await.unwrap(); 561 ··· 594 // but it should live here anyway 595 pub struct LabelLoader(Pool<AsyncPgConnection>); 596 impl LabelLoader { 597 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 598 let mut conn = self.0.get().await.unwrap(); 599 ··· 613 }) 614 } 615 616 pub async fn load_many( 617 &self, 618 uris: &[String], ··· 647 648 pub struct VerificationLoader(Pool<AsyncPgConnection>); 649 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 650 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 651 let mut conn = self.0.get().await.unwrap(); 652
+5 -14
parakeet/src/main.rs
··· 1 - use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; 2 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 3 use diesel_async::pooled_connection::deadpool::Pool; 4 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 15 mod config; 16 mod db; 17 mod hydration; 18 - mod instrumentation; 19 mod loaders; 20 mod xrpc; 21 ··· 33 34 #[tokio::main] 35 async fn main() -> eyre::Result<()> { 36 - let conf = config::load_config()?; 37 38 - instrumentation::init_instruments(&conf.instruments); 39 40 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 41 let pool = Pool::builder(db_mgr).build()?; ··· 54 let redis_client = redis::Client::open(conf.redis_uri)?; 55 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 56 57 - let index_client = parakeet_index::connect_with_otel(conf.index_uri) 58 - .await 59 - .map_err(|e| eyre::eyre!(e))?; 60 61 let dataloaders = Arc::new(loaders::Dataloaders::new( 62 pool.clone(), ··· 83 84 let did_doc = did_web_doc(&conf.service); 85 86 - let mw = tower::ServiceBuilder::new() 87 - .option_layer(conf.instruments.otel_enable.then(OtelInResponseLayer::default)) 88 - .option_layer(conf.instruments.otel_enable.then(OtelAxumLayer::default)) 89 - .layer(TraceLayer::new_for_http()) 90 - .layer(cors); 91 - 92 let app = axum::Router::new() 93 .nest("/xrpc", xrpc::xrpc_routes()) 94 .route( 95 "/.well-known/did.json", 96 axum::routing::get(async || axum::Json(did_doc)), 97 ) 98 - .layer(mw) 99 .with_state(GlobalState { 100 pool, 101 redis_mp,
··· 1 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 2 use diesel_async::pooled_connection::deadpool::Pool; 3 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 14 mod config; 15 mod db; 16 mod hydration; 17 mod loaders; 18 mod xrpc; 19 ··· 31 32 #[tokio::main] 33 async fn main() -> eyre::Result<()> { 34 + tracing_subscriber::fmt::init(); 35 36 + let conf = config::load_config()?; 37 38 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 39 let pool = Pool::builder(db_mgr).build()?; ··· 52 let redis_client = redis::Client::open(conf.redis_uri)?; 53 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 54 55 + let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 56 57 let dataloaders = Arc::new(loaders::Dataloaders::new( 58 pool.clone(), ··· 79 80 let did_doc = did_web_doc(&conf.service); 81 82 let app = axum::Router::new() 83 .nest("/xrpc", xrpc::xrpc_routes()) 84 .route( 85 "/.well-known/did.json", 86 axum::routing::get(async || axum::Json(did_doc)), 87 ) 88 + .layer(TraceLayer::new_for_http()) 89 + .layer(cors) 90 .with_state(GlobalState { 91 pool, 92 redis_mp,
+1 -1
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 36 rkey: None, 37 subject: &form.uri, 38 subject_cid: Some(form.cid), 39 - subject_type: parts[1], 40 tags: vec![], 41 }; 42
··· 36 rkey: None, 37 subject: &form.uri, 38 subject_cid: Some(form.cid), 39 + subject_type: &parts[1], 40 tags: vec![], 41 }; 42
+2 -6
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 24 use reqwest::Url; 25 use serde::{Deserialize, Serialize}; 26 use std::collections::HashMap; 27 - use tracing::instrument; 28 29 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 30 ··· 161 162 #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 163 #[serde(rename_all = "snake_case")] 164 - #[allow(clippy::enum_variant_names)] 165 pub enum GetAuthorFeedFilter { 166 #[default] 167 PostsWithReplies, ··· 200 if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 201 if psr.blocked.unwrap_or_default() { 202 // they block us 203 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)); 204 } else if psr.blocking.is_some() { 205 // we block them 206 - return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)); 207 } 208 } 209 } ··· 614 .or(schema::posts::embed_subtype.eq_any(filter)) 615 } 616 617 - #[instrument(skip_all)] 618 async fn get_feed_skeleton( 619 feed: &str, 620 service: &str, ··· 656 } 657 } 658 659 - #[instrument(skip_all)] 660 async fn get_skeleton_repost_data( 661 conn: &mut AsyncPgConnection, 662 reposts: Vec<String>,
··· 24 use reqwest::Url; 25 use serde::{Deserialize, Serialize}; 26 use std::collections::HashMap; 27 28 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 29 ··· 160 161 #[derive(Debug, Default, Eq, PartialEq, Deserialize)] 162 #[serde(rename_all = "snake_case")] 163 pub enum GetAuthorFeedFilter { 164 #[default] 165 PostsWithReplies, ··· 198 if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 199 if psr.blocked.unwrap_or_default() { 200 // they block us 201 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 202 } else if psr.blocking.is_some() { 203 // we block them 204 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 205 } 206 } 207 } ··· 612 .or(schema::posts::embed_subtype.eq_any(filter)) 613 } 614 615 async fn get_feed_skeleton( 616 feed: &str, 617 service: &str, ··· 653 } 654 } 655 656 async fn get_skeleton_repost_data( 657 conn: &mut AsyncPgConnection, 658 reposts: Vec<String>,
+4 -1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
··· 253 .hydrate_posts(reply_uris) 254 .await 255 .into_iter() 256 - .filter(|(_, post)| matches!(&post.author.viewer, Some(viewer) if viewer.blocked_by || viewer.blocking.is_some())) 257 .map(|(uri, post)| { 258 let post = ThreadItemPost { 259 post,
··· 253 .hydrate_posts(reply_uris) 254 .await 255 .into_iter() 256 + .filter(|(_, post)| match &post.author.viewer { 257 + Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false, 258 + _ => true, 259 + }) 260 .map(|(uri, post)| { 261 let post = ThreadItemPost { 262 post,
-3
parakeet/src/xrpc/jwt.rs
··· 4 use std::collections::HashMap; 5 use std::sync::{Arc, LazyLock}; 6 use tokio::sync::RwLock; 7 - use tracing::instrument; 8 9 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 10 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 39 } 40 } 41 42 - #[instrument(skip_all)] 43 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 44 // first we need to decode without verifying, to get iss. 45 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 58 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 59 } 60 61 - #[instrument(skip_all)] 62 async fn resolve_key(&self, did: &str) -> Option<String> { 63 tracing::trace!("resolving multikey for {did}"); 64 let did_doc = self.resolver.resolve_did(did).await.ok()??;
··· 4 use std::collections::HashMap; 5 use std::sync::{Arc, LazyLock}; 6 use tokio::sync::RwLock; 7 8 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 9 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 38 } 39 } 40 41 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 42 // first we need to decode without verifying, to get iss. 43 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 56 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 57 } 58 59 async fn resolve_key(&self, did: &str) -> Option<String> { 60 tracing::trace!("resolving multikey for {did}"); 61 let did_doc = self.resolver.resolve_did(did).await.ok()??;
+2 -24
parakeet-index/Cargo.toml
··· 10 [dependencies] 11 tonic = "0.13.0" 12 prost = "0.13.5" 13 - tonic-tracing-opentelemetry = { version = "0.32", optional = true } 14 - tower = { version = "0.5", optional = true } 15 16 eyre = { version = "0.6.12", optional = true } 17 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 18 itertools = { version = "0.14.0", optional = true } 19 - opentelemetry = { version = "0.31.0", optional = true } 20 - opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true } 21 - opentelemetry_sdk = { version = "0.31.0", optional = true } 22 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 23 serde = { version = "1.0.217", features = ["derive"], optional = true } 24 tokio = { version = "1.42.0", features = ["full"], optional = true } 25 tonic-health = { version = "0.13.0", optional = true } 26 tracing = { version = "0.1.40", optional = true } 27 - tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"], optional = true } 28 - tracing-opentelemetry = { version = "0.32", optional = true } 29 30 [build-dependencies] 31 tonic-build = "0.13.0" 32 33 [features] 34 - otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"] 35 - server = [ 36 - "dep:eyre", 37 - "dep:figment", 38 - "dep:itertools", 39 - "dep:opentelemetry", 40 - "dep:opentelemetry-otlp", 41 - "dep:opentelemetry_sdk", 42 - "dep:rocksdb", 43 - "dep:serde", 44 - "dep:tokio", 45 - "dep:tonic-health", 46 - "otel", 47 - "dep:tracing", 48 - "dep:tracing-subscriber", 49 - "dep:tracing-opentelemetry" 50 - ]
··· 10 [dependencies] 11 tonic = "0.13.0" 12 prost = "0.13.5" 13 14 eyre = { version = "0.6.12", optional = true } 15 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 16 itertools = { version = "0.14.0", optional = true } 17 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 18 serde = { version = "1.0.217", features = ["derive"], optional = true } 19 tokio = { version = "1.42.0", features = ["full"], optional = true } 20 tonic-health = { version = "0.13.0", optional = true } 21 tracing = { version = "0.1.40", optional = true } 22 + tracing-subscriber = { version = "0.3.18", optional = true } 23 24 [build-dependencies] 25 tonic-build = "0.13.0" 26 27 [features] 28 + server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"]
+1 -20
parakeet-index/src/lib.rs
··· 1 - use tonic::transport::Channel; 2 - 3 #[allow(clippy::all)] 4 pub mod index { 5 tonic::include_proto!("parakeet"); 6 } 7 8 pub use index::*; 9 - #[cfg(not(feature = "otel"))] 10 - pub type Client = index_client::IndexClient<Channel>; 11 - #[cfg(feature = "otel")] 12 - pub type Client = index_client::IndexClient< 13 - tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>, 14 - >; 15 16 #[cfg(feature = "server")] 17 pub mod server; 18 - 19 - #[cfg(feature = "otel")] 20 - pub async fn connect_with_otel( 21 - uri: String, 22 - ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { 23 - let channel = Channel::from_shared(uri)?.connect().await?; 24 - let channel = tower::ServiceBuilder::new() 25 - .layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer) 26 - .service(channel); 27 - 28 - Ok(index_client::IndexClient::new(channel)) 29 - }
··· 1 #[allow(clippy::all)] 2 pub mod index { 3 tonic::include_proto!("parakeet"); 4 } 5 6 pub use index::*; 7 + pub type Client = index_client::IndexClient<tonic::transport::Channel>; 8 9 #[cfg(feature = "server")] 10 pub mod server;
+3 -9
parakeet-index/src/main.rs
··· 1 use parakeet_index::index_server::IndexServer; 2 use parakeet_index::server::service::Service; 3 - use parakeet_index::server::{GlobalState, config, instrumentation}; 4 use std::sync::Arc; 5 use tonic::transport::Server; 6 - use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer; 7 8 #[tokio::main] 9 async fn main() -> eyre::Result<()> { 10 - let conf = config::load_config()?; 11 12 - instrumentation::init_instruments(&conf.instruments); 13 14 let db_root = conf.index_db_path.parse()?; 15 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); ··· 19 reporter.set_serving::<IndexServer<Service>>().await; 20 21 let service = Service::new(state.clone()); 22 - 23 - let mw = tower::ServiceBuilder::new() 24 - .option_layer(conf.instruments.otel_enable.then(OtelGrpcLayer::default)); 25 - 26 Server::builder() 27 - .layer(mw) 28 .add_service(health_service) 29 .add_service(IndexServer::new(service)) 30 .serve(addr)
··· 1 use parakeet_index::index_server::IndexServer; 2 use parakeet_index::server::service::Service; 3 + use parakeet_index::server::{GlobalState, config}; 4 use std::sync::Arc; 5 use tonic::transport::Server; 6 7 #[tokio::main] 8 async fn main() -> eyre::Result<()> { 9 + tracing_subscriber::fmt::init(); 10 11 + let conf = config::load_config()?; 12 13 let db_root = conf.index_db_path.parse()?; 14 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); ··· 18 reporter.set_serving::<IndexServer<Service>>().await; 19 20 let service = Service::new(state.clone()); 21 Server::builder() 22 .add_service(health_service) 23 .add_service(IndexServer::new(service)) 24 .serve(addr)
-10
parakeet-index/src/server/config.rs
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 - #[serde(flatten)] 17 - pub instruments: ConfigInstruments, 18 pub database_url: String, 19 pub index_db_path: String, 20 #[serde(default)] 21 pub server: ConfigServer, 22 - } 23 - 24 - #[derive(Debug, Deserialize)] 25 - pub struct ConfigInstruments { 26 - #[serde(default)] 27 - pub otel_enable: bool, 28 - #[serde(default)] 29 - pub log_json: bool, 30 } 31 32 #[derive(Debug, Deserialize)]
··· 13 14 #[derive(Debug, Deserialize)] 15 pub struct Config { 16 pub database_url: String, 17 pub index_db_path: String, 18 #[serde(default)] 19 pub server: ConfigServer, 20 } 21 22 #[derive(Debug, Deserialize)]
-57
parakeet-index/src/server/instrumentation.rs
··· 1 - use opentelemetry::trace::TracerProvider; 2 - use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 - use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 - use tracing::Subscriber; 5 - use tracing_opentelemetry::OpenTelemetryLayer; 6 - use tracing_subscriber::filter::Filtered; 7 - use tracing_subscriber::layer::SubscriberExt; 8 - use tracing_subscriber::registry::LookupSpan; 9 - use tracing_subscriber::util::SubscriberInitExt; 10 - use tracing_subscriber::{EnvFilter, Layer}; 11 - 12 - pub fn init_instruments(cfg: &super::config::ConfigInstruments) { 13 - let otel_layer = cfg.otel_enable.then(init_otel); 14 - let log_layer = init_log(cfg.log_json); 15 - 16 - tracing_subscriber::registry() 17 - .with(log_layer) 18 - .with(otel_layer) 19 - .init(); 20 - } 21 - 22 - fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 23 - where 24 - S: Subscriber + for<'span> LookupSpan<'span>, 25 - { 26 - let span_exporter = SpanExporter::builder() 27 - .with_http() 28 - .with_protocol(Protocol::HttpBinary) 29 - .build() 30 - .unwrap(); 31 - 32 - let tracer_provider = SdkTracerProvider::builder() 33 - .with_batch_exporter(span_exporter) 34 - .with_sampler(Sampler::AlwaysOn) 35 - .build(); 36 - 37 - opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 38 - 39 - let tracer = tracer_provider.tracer("parakeet"); 40 - let otel_filter = EnvFilter::new("info,otel::tracing=trace"); 41 - 42 - OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 43 - } 44 - 45 - fn init_log<S>(json: bool) -> Filtered<Box<dyn Layer<S> + Send + Sync>, EnvFilter, S> 46 - where 47 - S: Subscriber + for<'span> LookupSpan<'span>, 48 - { 49 - let stdout_filter = 50 - EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 51 - 52 - match json { 53 - true => tracing_subscriber::fmt::layer().json().boxed(), 54 - false => tracing_subscriber::fmt::layer().boxed(), 55 - } 56 - .with_filter(stdout_filter) 57 - }
···
-1
parakeet-index/src/server/mod.rs
··· 2 3 pub mod config; 4 pub mod db; 5 - pub mod instrumentation; 6 pub mod service; 7 mod utils; 8
··· 2 3 pub mod config; 4 pub mod db; 5 pub mod service; 6 mod utils; 7