interactive intro to open social

feat: add jetstream firehose backend with SSE streaming

adds rust backend for real-time firehose visualization:
- firehose module connects to jetstream and broadcasts all atproto events
- uses rocketman crate with lexicon ingester pattern
- SSE endpoint at /api/firehose/watch streams filtered events by DID
- auto-reconnects on connection drop

remaining work:
- add UI toggle button and toast notifications in templates
- implement particle animation system in static/app.js
- test with live events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+446 -4
Cargo.lock
··· 276 276 ] 277 277 278 278 [[package]] 279 + name = "ahash" 280 + version = "0.8.12" 281 + source = "registry+https://github.com/rust-lang/crates.io-index" 282 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 283 + dependencies = [ 284 + "cfg-if", 285 + "once_cell", 286 + "version_check", 287 + "zerocopy", 288 + ] 289 + 290 + [[package]] 279 291 name = "aho-corasick" 280 292 version = "1.1.3" 281 293 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 395 407 ] 396 408 397 409 [[package]] 410 + name = "async-stream" 411 + version = "0.3.6" 412 + source = "registry+https://github.com/rust-lang/crates.io-index" 413 + checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" 414 + dependencies = [ 415 + "async-stream-impl", 416 + "futures-core", 417 + "pin-project-lite", 418 + ] 419 + 420 + [[package]] 421 + name = "async-stream-impl" 422 + version = "0.3.6" 423 + source = "registry+https://github.com/rust-lang/crates.io-index" 424 + checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" 425 + dependencies = [ 426 + "proc-macro2", 427 + "quote", 428 + "syn 2.0.106", 429 + ] 430 + 431 + [[package]] 398 432 name = "async-trait" 399 433 version = "0.1.89" 400 434 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 412 446 "actix-files", 413 447 "actix-session", 414 448 "actix-web", 449 + "anyhow", 450 + "async-stream", 451 + "async-trait", 415 452 "atrium-api", 416 453 "atrium-common", 417 454 "atrium-identity", 418 455 "atrium-oauth", 419 456 "env_logger", 457 + "futures-util", 420 458 "hickory-resolver", 421 459 "log", 422 460 "reqwest", 461 + "rocketman", 423 462 "serde", 424 463 "serde_json", 425 464 "tokio", ··· 577 616 578 617 [[package]] 579 618 name = "base64" 619 + version = "0.21.7" 620 + source = "registry+https://github.com/rust-lang/crates.io-index" 621 + checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" 622 + 623 + [[package]] 624 + name = "base64" 580 625 version = "0.22.1" 581 626 source = "registry+https://github.com/rust-lang/crates.io-index" 582 627 checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" ··· 603 648 ] 604 649 605 650 [[package]] 651 + name = "bon" 652 + version = "3.8.1" 653 + source = "registry+https://github.com/rust-lang/crates.io-index" 654 + checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" 655 + dependencies = [ 656 + "bon-macros", 657 + "rustversion", 658 + ] 659 + 660 + [[package]] 661 + name = "bon-macros" 662 + version = "3.8.1" 663 + source = "registry+https://github.com/rust-lang/crates.io-index" 664 + checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" 665 + dependencies = [ 666 + "darling 0.21.3", 667 + "ident_case", 668 + "prettyplease", 669 + "proc-macro2", 670 + "quote", 671 + "rustversion", 672 + "syn 2.0.106", 673 + ] 674 + 675 + [[package]] 606 676 name = "brotli" 607 677 version = "8.0.2" 608 678 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 628 698 version = "3.19.0" 629 699 source = "registry+https://github.com/rust-lang/crates.io-index" 630 700 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 701 + 702 + [[package]] 703 + name = "byteorder" 704 + version = "1.5.0" 705 + source = "registry+https://github.com/rust-lang/crates.io-index" 706 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 631 707 632 708 [[package]] 633 709 name = "bytes" ··· 862 938 ] 863 939 864 940 [[package]] 941 + name = "darling" 942 + version = "0.20.11" 943 + source = "registry+https://github.com/rust-lang/crates.io-index" 944 + checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" 945 + dependencies = [ 946 + "darling_core 0.20.11", 947 + "darling_macro 0.20.11", 948 + ] 949 + 950 + [[package]] 951 + name = "darling" 952 + version = "0.21.3" 953 + source = "registry+https://github.com/rust-lang/crates.io-index" 954 + checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" 955 + dependencies = [ 956 + "darling_core 0.21.3", 957 + "darling_macro 0.21.3", 958 + ] 959 + 960 + [[package]] 961 + name = "darling_core" 962 + version = "0.20.11" 963 + source = "registry+https://github.com/rust-lang/crates.io-index" 964 + checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" 965 + dependencies = [ 966 + "fnv", 967 + "ident_case", 968 + "proc-macro2", 969 + "quote", 970 + "strsim", 971 + "syn 2.0.106", 972 + ] 973 + 974 + [[package]] 975 + name = "darling_core" 976 + version = "0.21.3" 977 + source = "registry+https://github.com/rust-lang/crates.io-index" 978 + checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" 979 + dependencies = [ 980 + "fnv", 981 + "ident_case", 982 + "proc-macro2", 983 + "quote", 984 + "strsim", 985 + "syn 2.0.106", 986 + ] 987 + 988 + [[package]] 989 + name = "darling_macro" 990 + version = "0.20.11" 991 + source = "registry+https://github.com/rust-lang/crates.io-index" 992 + checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" 993 + dependencies = [ 994 + "darling_core 0.20.11", 995 + "quote", 996 + "syn 2.0.106", 997 + ] 998 + 999 + [[package]] 1000 + name = "darling_macro" 1001 + version = "0.21.3" 1002 + source = "registry+https://github.com/rust-lang/crates.io-index" 1003 + checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" 1004 + dependencies = [ 1005 + "darling_core 0.21.3", 1006 + "quote", 1007 + "syn 2.0.106", 1008 + ] 1009 + 1010 + [[package]] 865 1011 name = "dashmap" 866 1012 version = "6.1.0" 867 1013 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 921 1067 ] 922 1068 923 1069 [[package]] 1070 + name = "derive_builder" 1071 + version = "0.20.2" 1072 + source = "registry+https://github.com/rust-lang/crates.io-index" 1073 + checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" 1074 + dependencies = [ 1075 + "derive_builder_macro", 1076 + ] 1077 + 1078 + [[package]] 1079 + name = "derive_builder_core" 1080 + version = "0.20.2" 1081 + source = "registry+https://github.com/rust-lang/crates.io-index" 1082 + checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" 1083 + dependencies = [ 1084 + "darling 0.20.11", 1085 + "proc-macro2", 1086 + "quote", 1087 + "syn 2.0.106", 1088 + ] 1089 + 1090 + [[package]] 1091 + name = "derive_builder_macro" 1092 + version = "0.20.2" 1093 + source = "registry+https://github.com/rust-lang/crates.io-index" 1094 + checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" 1095 + dependencies = [ 1096 + "derive_builder_core", 1097 + "syn 2.0.106", 1098 + ] 1099 + 1100 + [[package]] 924 1101 name = "derive_more" 925 1102 version = "1.0.0" 926 1103 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1130 1307 ] 1131 1308 1132 1309 [[package]] 1310 + name = "flume" 1311 + version = "0.11.1" 1312 + source = "registry+https://github.com/rust-lang/crates.io-index" 1313 + checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" 1314 + dependencies = [ 1315 + "futures-core", 1316 + "futures-sink", 1317 + "nanorand", 1318 + "spin", 1319 + ] 1320 + 1321 + [[package]] 1133 1322 name = "fnv" 1134 1323 version = "1.0.7" 1135 1324 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1217 1406 dependencies = [ 1218 1407 "futures-core", 1219 1408 "futures-macro", 1409 + "futures-sink", 1220 1410 "futures-task", 1221 1411 "pin-project-lite", 1222 1412 "pin-utils", ··· 1241 1431 checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" 1242 1432 dependencies = [ 1243 1433 "cfg-if", 1434 + "js-sys", 1244 1435 "libc", 1245 1436 "wasi 0.11.1+wasi-snapshot-preview1", 1437 + "wasm-bindgen", 1246 1438 ] 1247 1439 1248 1440 [[package]] ··· 1508 1700 "http 1.3.1", 1509 1701 "hyper", 1510 1702 "hyper-util", 1511 - "rustls", 1703 + "rustls 0.23.31", 1512 1704 "rustls-pki-types", 1513 1705 "tokio", 1514 - "tokio-rustls", 1706 + "tokio-rustls 0.26.2", 1515 1707 "tower-service", 1516 1708 ] 1517 1709 ··· 1668 1860 ] 1669 1861 1670 1862 [[package]] 1863 + name = "ident_case" 1864 + version = "1.0.1" 1865 + source = "registry+https://github.com/rust-lang/crates.io-index" 1866 + checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" 1867 + 1868 + [[package]] 1671 1869 name = "idna" 1672 1870 version = "1.1.0" 1673 1871 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1869 2067 checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" 1870 2068 1871 2069 [[package]] 2070 + name = "lazy_static" 2071 + version = "1.5.0" 2072 + source = "registry+https://github.com/rust-lang/crates.io-index" 2073 + checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" 2074 + 2075 + [[package]] 1872 2076 name = "libc" 1873 2077 version = "0.2.176" 1874 2078 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1960 2164 checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" 1961 2165 1962 2166 [[package]] 2167 + name = "metrics" 2168 + version = "0.24.2" 2169 + source = "registry+https://github.com/rust-lang/crates.io-index" 2170 + checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" 2171 + dependencies = [ 2172 + "ahash", 2173 + "portable-atomic", 2174 + ] 2175 + 2176 + [[package]] 1963 2177 name = "mime" 1964 2178 version = "0.3.17" 1965 2179 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2042 2256 ] 2043 2257 2044 2258 [[package]] 2259 + name = "nanorand" 2260 + version = "0.7.0" 2261 + source = "registry+https://github.com/rust-lang/crates.io-index" 2262 + checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" 2263 + dependencies = [ 2264 + "getrandom 0.2.16", 2265 + ] 2266 + 2267 + [[package]] 2045 2268 name = "native-tls" 2046 2269 version = "0.2.14" 2047 2270 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2056 2279 "security-framework", 2057 2280 "security-framework-sys", 2058 2281 "tempfile", 2282 + ] 2283 + 2284 + [[package]] 2285 + name = "nu-ansi-term" 2286 + version = "0.50.3" 2287 + source = "registry+https://github.com/rust-lang/crates.io-index" 2288 + checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" 2289 + dependencies = [ 2290 + "windows-sys 0.61.1", 2059 2291 ] 2060 2292 2061 2293 [[package]] ··· 2261 2493 ] 2262 2494 2263 2495 [[package]] 2496 + name = "prettyplease" 2497 + version = "0.2.37" 2498 + source = "registry+https://github.com/rust-lang/crates.io-index" 2499 + checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" 2500 + dependencies = [ 2501 + "proc-macro2", 2502 + "syn 2.0.106", 2503 + ] 2504 + 2505 + [[package]] 2264 2506 name = "primeorder" 2265 2507 version = "0.13.6" 2266 2508 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2470 2712 ] 2471 2713 2472 2714 [[package]] 2715 + name = "rocketman" 2716 + version = "0.2.5" 2717 + source = "registry+https://github.com/rust-lang/crates.io-index" 2718 + checksum = "90cfc4ee9daf6e9d0ee217b9709aa3bd6c921e6926aa15c6ff5ba9162c2c649a" 2719 + dependencies = [ 2720 + "anyhow", 2721 + "async-trait", 2722 + "bon", 2723 + "derive_builder", 2724 + "flume", 2725 + "futures-util", 2726 + "metrics", 2727 + "rand 0.8.5", 2728 + "serde", 2729 + "serde_json", 2730 + "tokio", 2731 + "tokio-tungstenite", 2732 + "tracing", 2733 + "tracing-subscriber", 2734 + "url", 2735 + "zstd", 2736 + ] 2737 + 2738 + [[package]] 2473 2739 name = "rustc-demangle" 2474 2740 version = "0.1.26" 2475 2741 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2495 2761 "libc", 2496 2762 "linux-raw-sys", 2497 2763 "windows-sys 0.61.1", 2764 + ] 2765 + 2766 + [[package]] 2767 + name = "rustls" 2768 + version = "0.21.12" 2769 + source = "registry+https://github.com/rust-lang/crates.io-index" 2770 + checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" 2771 + dependencies = [ 2772 + "log", 2773 + "ring", 2774 + "rustls-webpki 0.101.7", 2775 + "sct", 2498 2776 ] 2499 2777 2500 2778 [[package]] ··· 2505 2783 dependencies = [ 2506 2784 "once_cell", 2507 2785 "rustls-pki-types", 2508 - "rustls-webpki", 2786 + "rustls-webpki 0.103.4", 2509 2787 "subtle", 2510 2788 "zeroize", 2511 2789 ] 2512 2790 2513 2791 [[package]] 2792 + name = "rustls-native-certs" 2793 + version = "0.6.3" 2794 + source = "registry+https://github.com/rust-lang/crates.io-index" 2795 + checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" 2796 + dependencies = [ 2797 + "openssl-probe", 2798 + "rustls-pemfile", 2799 + "schannel", 2800 + "security-framework", 2801 + ] 2802 + 2803 + [[package]] 2804 + name = "rustls-pemfile" 2805 + version = "1.0.4" 2806 + source = "registry+https://github.com/rust-lang/crates.io-index" 2807 + checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" 2808 + dependencies = [ 2809 + "base64 0.21.7", 2810 + ] 2811 + 2812 + [[package]] 2514 2813 name = "rustls-pki-types" 2515 2814 version = "1.12.0" 2516 2815 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2521 2820 2522 2821 [[package]] 2523 2822 name = "rustls-webpki" 2823 + version = "0.101.7" 2824 + source = "registry+https://github.com/rust-lang/crates.io-index" 2825 + checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" 2826 + dependencies = [ 2827 + "ring", 2828 + "untrusted", 2829 + ] 2830 + 2831 + [[package]] 2832 + name = "rustls-webpki" 2524 2833 version = "0.103.4" 2525 2834 source = "registry+https://github.com/rust-lang/crates.io-index" 2526 2835 checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" ··· 2556 2865 version = "1.2.0" 2557 2866 source = "registry+https://github.com/rust-lang/crates.io-index" 2558 2867 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 2868 + 2869 + [[package]] 2870 + name = "sct" 2871 + version = "0.7.1" 2872 + source = "registry+https://github.com/rust-lang/crates.io-index" 2873 + checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" 2874 + dependencies = [ 2875 + "ring", 2876 + "untrusted", 2877 + ] 2559 2878 2560 2879 [[package]] 2561 2880 name = "sec1" ··· 2700 3019 ] 2701 3020 2702 3021 [[package]] 3022 + name = "sharded-slab" 3023 + version = "0.1.7" 3024 + source = "registry+https://github.com/rust-lang/crates.io-index" 3025 + checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" 3026 + dependencies = [ 3027 + "lazy_static", 3028 + ] 3029 + 3030 + [[package]] 2703 3031 name = "shlex" 2704 3032 version = "1.3.0" 2705 3033 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2763 3091 ] 2764 3092 2765 3093 [[package]] 3094 + name = "spin" 3095 + version = "0.9.8" 3096 + source = "registry+https://github.com/rust-lang/crates.io-index" 3097 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 3098 + dependencies = [ 3099 + "lock_api", 3100 + ] 3101 + 3102 + [[package]] 2766 3103 name = "stable_deref_trait" 2767 3104 version = "1.2.0" 2768 3105 source = "registry+https://github.com/rust-lang/crates.io-index" 2769 3106 checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" 3107 + 3108 + [[package]] 3109 + name = "strsim" 3110 + version = "0.11.1" 3111 + source = "registry+https://github.com/rust-lang/crates.io-index" 3112 + checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" 2770 3113 2771 3114 [[package]] 2772 3115 name = "subtle" ··· 2877 3220 ] 2878 3221 2879 3222 [[package]] 3223 + name = "thread_local" 3224 + version = "1.1.9" 3225 + source = "registry+https://github.com/rust-lang/crates.io-index" 3226 + checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" 3227 + dependencies = [ 3228 + "cfg-if", 3229 + ] 3230 + 3231 + [[package]] 2880 3232 name = "time" 2881 3233 version = "0.3.44" 2882 3234 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2975 3327 2976 3328 [[package]] 2977 3329 name = "tokio-rustls" 3330 + version = "0.24.1" 3331 + source = "registry+https://github.com/rust-lang/crates.io-index" 3332 + checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" 3333 + dependencies = [ 3334 + "rustls 0.21.12", 3335 + "tokio", 3336 + ] 3337 + 3338 + [[package]] 3339 + name = "tokio-rustls" 2978 3340 version = "0.26.2" 2979 3341 source = "registry+https://github.com/rust-lang/crates.io-index" 2980 3342 checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" 2981 3343 dependencies = [ 2982 - "rustls", 3344 + "rustls 0.23.31", 2983 3345 "tokio", 3346 + ] 3347 + 3348 + [[package]] 3349 + name = "tokio-tungstenite" 3350 + version = "0.20.1" 3351 + source = "registry+https://github.com/rust-lang/crates.io-index" 3352 + checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" 3353 + dependencies = [ 3354 + "futures-util", 3355 + "log", 3356 + "rustls 0.21.12", 3357 + "rustls-native-certs", 3358 + "tokio", 3359 + "tokio-rustls 0.24.1", 3360 + "tungstenite", 3361 + "webpki-roots", 2984 3362 ] 2985 3363 2986 3364 [[package]] ··· 3071 3449 checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" 3072 3450 dependencies = [ 3073 3451 "once_cell", 3452 + "valuable", 3453 + ] 3454 + 3455 + [[package]] 3456 + name = "tracing-log" 3457 + version = "0.2.0" 3458 + source = "registry+https://github.com/rust-lang/crates.io-index" 3459 + checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" 3460 + dependencies = [ 3461 + "log", 3462 + "once_cell", 3463 + "tracing-core", 3464 + ] 3465 + 3466 + [[package]] 3467 + name = "tracing-subscriber" 3468 + version = "0.3.20" 3469 + source = "registry+https://github.com/rust-lang/crates.io-index" 3470 + checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" 3471 + dependencies = [ 3472 + "nu-ansi-term", 3473 + "sharded-slab", 3474 + "smallvec", 3475 + "thread_local", 3476 + "tracing-core", 3477 + "tracing-log", 3074 3478 ] 3075 3479 3076 3480 [[package]] ··· 3091 3495 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 3092 3496 3093 3497 [[package]] 3498 + name = "tungstenite" 3499 + version = "0.20.1" 3500 + source = "registry+https://github.com/rust-lang/crates.io-index" 3501 + checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" 3502 + dependencies = [ 3503 + "byteorder", 3504 + "bytes", 3505 + "data-encoding", 3506 + "http 0.2.12", 3507 + "httparse", 3508 + "log", 3509 + "rand 0.8.5", 3510 + "rustls 0.21.12", 3511 + "sha1", 3512 + "thiserror", 3513 + "url", 3514 + "utf-8", 3515 + ] 3516 + 3517 + [[package]] 3094 3518 name = "typenum" 3095 3519 version = "1.19.0" 3096 3520 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3149 3573 ] 3150 3574 3151 3575 [[package]] 3576 + name = "utf-8" 3577 + version = "0.7.6" 3578 + source = "registry+https://github.com/rust-lang/crates.io-index" 3579 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 3580 + 3581 + [[package]] 3152 3582 name = "utf8_iter" 3153 3583 version = "1.0.4" 3154 3584 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3176 3606 version = "0.15.8" 3177 3607 source = "registry+https://github.com/rust-lang/crates.io-index" 3178 3608 checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" 3609 + 3610 + [[package]] 3611 + name = "valuable" 3612 + version = "0.1.1" 3613 + source = "registry+https://github.com/rust-lang/crates.io-index" 3614 + checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" 3179 3615 3180 3616 [[package]] 3181 3617 name = "vcpkg" ··· 3313 3749 "js-sys", 3314 3750 "wasm-bindgen", 3315 3751 ] 3752 + 3753 + [[package]] 3754 + name = "webpki-roots" 3755 + version = "0.25.4" 3756 + source = "registry+https://github.com/rust-lang/crates.io-index" 3757 + checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" 3316 3758 3317 3759 [[package]] 3318 3760 name = "widestring"
+5
Cargo.toml
··· 18 18 env_logger = "0.11" 19 19 log = "0.4" 20 20 reqwest = { version = "0.12", features = ["json"] } 21 + rocketman = "0.2.0" 22 + futures-util = "0.3" 23 + anyhow = "1.0" 24 + async-stream = "0.3" 25 + async-trait = "0.1"
+146
src/firehose.rs
··· 1 + use anyhow::Result; 2 + use async_trait::async_trait; 3 + use log::{error, info}; 4 + use rocketman::{ 5 + connection::JetstreamConnection, 6 + ingestion::LexiconIngestor, 7 + options::JetstreamOptions, 8 + types::event::{Event, Operation}, 9 + }; 10 + use serde::{Deserialize, Serialize}; 11 + use serde_json::Value; 12 + use std::collections::HashMap; 13 + use std::sync::{Arc, Mutex}; 14 + use tokio::sync::broadcast; 15 + 16 + /// Represents a firehose event that will be sent to the browser 17 + #[derive(Debug, Clone, Serialize, Deserialize)] 18 + #[serde(rename_all = "camelCase")] 19 + pub struct FirehoseEvent { 20 + pub did: String, 21 + pub action: String, // "create", "update", or "delete" 22 + pub collection: String, 23 + pub rkey: String, 24 + pub namespace: String, // e.g., "app.bsky" extracted from collection 25 + } 26 + 27 + /// Broadcaster for firehose events 28 + pub type FirehoseBroadcaster = Arc<broadcast::Sender<FirehoseEvent>>; 29 + 30 + /// A generic ingester that broadcasts all events 31 + struct BroadcastIngester { 32 + broadcaster: FirehoseBroadcaster, 33 + } 34 + 35 + #[async_trait] 36 + impl LexiconIngestor for BroadcastIngester { 37 + async fn ingest(&self, message: Event<Value>) -> Result<()> { 38 + // Only process commit events 39 + let Some(commit) = &message.commit else { 40 + return Ok(()); 41 + }; 42 + 43 + // Extract namespace from collection (e.g., "app.bsky.feed.post" -> "app.bsky") 44 + let collection_parts: Vec<&str> = commit.collection.split('.').collect(); 45 + let namespace = if collection_parts.len() >= 2 { 46 + format!("{}.{}", collection_parts[0], collection_parts[1]) 47 + } else { 48 + commit.collection.clone() 49 + }; 50 + 51 + let action = match commit.operation { 52 + Operation::Create => "create", 53 + Operation::Update => "update", 54 + Operation::Delete => "delete", 55 + }; 56 + 57 + let firehose_event = FirehoseEvent { 58 + did: message.did.clone(), 59 + action: action.to_string(), 60 + collection: commit.collection.clone(), 61 + rkey: commit.rkey.clone(), 62 + namespace, 63 + }; 64 + 65 + // Broadcast the event (ignore if no receivers) 66 + let _ = self.broadcaster.send(firehose_event); 67 + 68 + Ok(()) 69 + } 70 + } 71 + 72 + /// Start the Jetstream ingester that broadcasts events to all listeners 73 + pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { 74 + // Create a broadcast channel with a buffer of 100 events 75 + let (tx, _rx) = broadcast::channel::<FirehoseEvent>(100); 76 + let broadcaster = Arc::new(tx); 77 + 78 + let broadcaster_clone = broadcaster.clone(); 79 + 80 + tokio::spawn(async move { 81 + loop { 82 + info!("Starting Jetstream connection..."); 83 + 84 + // Configure Jetstream to receive all events (no collection filter) 85 + let opts = JetstreamOptions::builder().build(); 86 + let jetstream = JetstreamConnection::new(opts); 87 + 88 + // Create ingesters - we use a wildcard to capture all collections 89 + let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = 90 + HashMap::new(); 91 + 92 + // Use "*" as a catch-all for all collections 93 + ingesters.insert( 94 + "*".to_string(), 95 + Box::new(BroadcastIngester { 96 + broadcaster: broadcaster_clone.clone(), 97 + }), 98 + ); 99 + 100 + // Get channels 101 + let msg_rx = jetstream.get_msg_rx(); 102 + let reconnect_tx = jetstream.get_reconnect_tx(); 103 + 104 + // Cursor for tracking last processed message 105 + let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); 106 + let c_cursor = cursor.clone(); 107 + 108 + // Spawn task to process messages 109 + tokio::spawn(async move { 110 + while let Ok(message) = msg_rx.recv_async().await { 111 + if let Err(e) = rocketman::handler::handle_message( 112 + message, 113 + &ingesters, 114 + reconnect_tx.clone(), 115 + c_cursor.clone(), 116 + ) 117 + .await 118 + { 119 + error!("Error processing message: {}", e); 120 + } 121 + } 122 + }); 123 + 124 + // Connect to Jetstream 125 + let failed = { 126 + let connect_result = jetstream.connect(cursor).await; 127 + if let Err(e) = connect_result { 128 + error!("Jetstream connection failed: {}", e); 129 + true 130 + } else { 131 + false 132 + } 133 + }; 134 + 135 + if failed { 136 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 137 + continue; 138 + } 139 + 140 + info!("Jetstream connection dropped, reconnecting in 5 seconds..."); 141 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 142 + } 143 + }); 144 + 145 + broadcaster 146 + }
+6
src/main.rs
··· 2 2 use actix_web::{App, HttpServer, cookie::{Key, time::Duration}, middleware, web}; 3 3 use actix_files::Files; 4 4 5 + mod firehose; 5 6 mod mst; 6 7 mod oauth; 7 8 mod routes; ··· 13 14 14 15 let client = oauth::create_oauth_client(); 15 16 17 + // Start the firehose broadcaster 18 + let firehose_broadcaster = firehose::start_firehose_broadcaster().await; 19 + 16 20 println!("starting server at http://localhost:8080"); 17 21 18 22 HttpServer::new(move || { ··· 31 35 .build(), 32 36 ) 33 37 .app_data(web::Data::new(client.clone())) 38 + .app_data(web::Data::new(firehose_broadcaster.clone())) 34 39 .service(routes::index) 35 40 .service(routes::login) 36 41 .service(routes::callback) ··· 41 46 .service(routes::init) 42 47 .service(routes::get_avatar) 43 48 .service(routes::validate_url) 49 + .service(routes::firehose_watch) 44 50 .service(routes::favicon) 45 51 .service(Files::new("/static", "./static")) 46 52 })
+37
src/routes.rs
··· 3 3 use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, Scope}; 4 4 use serde::Deserialize; 5 5 6 + use crate::firehose::FirehoseBroadcaster; 6 7 use crate::mst; 7 8 use crate::oauth::OAuthClientType; 8 9 use crate::templates; ··· 390 391 "valid": is_valid 391 392 })) 392 393 } 394 + 395 + #[derive(Deserialize)] 396 + pub struct FirehoseQuery { 397 + did: String, 398 + } 399 + 400 + #[get("/api/firehose/watch")] 401 + pub async fn firehose_watch( 402 + query: web::Query<FirehoseQuery>, 403 + broadcaster: web::Data<FirehoseBroadcaster>, 404 + ) -> HttpResponse { 405 + let did = query.did.clone(); 406 + let mut rx = broadcaster.subscribe(); 407 + 408 + let stream = async_stream::stream! { 409 + // Send initial connection message 410 + yield Ok::<_, actix_web::Error>( 411 + web::Bytes::from(format!("data: {{\"type\":\"connected\"}}\n\n")) 412 + ); 413 + 414 + // Stream firehose events filtered by DID 415 + while let Ok(event) = rx.recv().await { 416 + // Only send events for this DID 417 + if event.did == did { 418 + let json = serde_json::to_string(&event).unwrap_or_default(); 419 + yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); 420 + } 421 + } 422 + }; 423 + 424 + HttpResponse::Ok() 425 + .content_type("text/event-stream") 426 + .insert_header(("Cache-Control", "no-cache")) 427 + .insert_header(("X-Accel-Buffering", "no")) 428 + .streaming(Box::pin(stream)) 429 + }