tracks lexicons and how many times they appeared on the jetstream

Compare changes

Choose any two refs to compare.

+12 -3
README.md
··· 1 - a webapp and server that monitors the jetstream and tracks the different lexicons as they are created or deleted. 2 - it shows you which collections are most active on the network. 1 + a webapp and server that monitors the jetstream and tracks the different 2 + lexicons as they are created or deleted. it shows you which collections are most 3 + active on the network. 3 4 4 5 for backend it uses rust with fjall as db, the frontend is built with sveltekit. 5 6 6 7 see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it. 7 8 9 + ## performance / storage 10 + 11 + it uses about 50MB of space for 620M recorded events (events being just 12 + timestamp in seconds and deleted boolean for now). and around 50-60ms for 13 + querying 300-400k events. 14 + 15 + this is on a machine with AMD EPYC 7281 (32) @ 2.100GHz. 16 + 8 17 ## running 9 18 10 19 ### with nix 11 20 12 - - run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server` 21 + - build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server` 13 22 - build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client` 14 23 15 24 ### manually
+9
client/bun.lock
··· 5 5 "name": "nsid-tracker", 6 6 "dependencies": { 7 7 "@number-flow/svelte": "^0.3.9", 8 + "svelte-adapter-bun": "^0.5.2", 8 9 }, 9 10 "devDependencies": { 10 11 "@eslint/compat": "^1.2.5", ··· 353 354 354 355 "globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="], 355 356 357 + "globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="], 358 + 359 + "globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="], 360 + 356 361 "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], 357 362 358 363 "graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="], ··· 511 516 512 517 "svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="], 513 518 519 + "svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="], 520 + 514 521 "svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="], 515 522 516 523 "svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="], ··· 520 527 "tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="], 521 528 522 529 "tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="], 530 + 531 + "tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="], 523 532 524 533 "tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="], 525 534
+2 -1
client/package.json
··· 31 31 }, 32 32 "type": "module", 33 33 "dependencies": { 34 - "@number-flow/svelte": "^0.3.9" 34 + "@number-flow/svelte": "^0.3.9", 35 + "svelte-adapter-bun": "^0.5.2" 35 36 } 36 37 }
+4
client/src/app.css
··· 28 28 overflow-y: overlay; 29 29 overflow-y: auto; /* Fallback for browsers that don't support overlay */ 30 30 } 31 + 32 + .wsbadge { 33 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 34 + }
+9 -9
client/src/app.html
··· 1 1 <!doctype html> 2 2 <html lang="en"> 3 - <head> 4 - <meta charset="utf-8" /> 5 - <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 - <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 - %sveltekit.head% 8 - </head> 9 - <body data-sveltekit-preload-data="hover"> 10 - <div style="display: contents">%sveltekit.body%</div> 11 - </body> 3 + <head> 4 + <meta charset="utf-8" /> 5 + <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 + <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 + %sveltekit.head% 8 + </head> 9 + <body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover"> 10 + <div style="display: contents">%sveltekit.body%</div> 11 + </body> 12 12 </html>
+2 -9
client/src/lib/components/BskyToggle.svelte
··· 11 11 <!-- svelte-ignore a11y_no_static_element_interactions --> 12 12 <button 13 13 onclick={onBskyToggle} 14 - class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300" 14 + class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 15 15 > 16 16 <input checked={dontShowBsky} type="checkbox" /> 17 - <span class="ml-0.5"> hide app.bsky.* </span> 17 + <span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span> 18 18 </button> 19 - 20 - <style lang="postcss"> 21 - @reference "../../app.css"; 22 - .wsbadge { 23 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 24 - } 25 - </style>
+8 -5
client/src/lib/components/EventCard.svelte
··· 104 104 </script> 105 105 106 106 <div 107 - class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 107 + class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 108 108 class:has-activity={isAnimating} 109 109 style="--border-thickness: {borderThickness}px" 110 110 > 111 111 <div class="flex items-start gap-2"> 112 112 <div 113 - class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full" 113 + class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full" 114 114 > 115 115 #{index + 1} 116 116 </div> 117 117 <div 118 118 title={event.nsid} 119 - class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1" 119 + class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1" 120 120 > 121 121 {event.nsid} 122 122 </div> ··· 136 136 </div> 137 137 </div> 138 138 139 - <style> 139 + <style lang="postcss"> 140 140 .has-activity { 141 141 position: relative; 142 142 transition: all 0.2s ease-out; 143 143 } 144 144 145 145 .has-activity::before { 146 + @reference "../../app.css"; 147 + @apply border-blue-500 dark:border-blue-800; 146 148 content: ""; 147 149 position: absolute; 148 150 top: calc(-1 * var(--border-thickness)); 149 151 left: calc(-1 * var(--border-thickness)); 150 152 right: calc(-1 * var(--border-thickness)); 151 153 bottom: calc(-1 * var(--border-thickness)); 152 - border: var(--border-thickness) solid rgba(59, 130, 246, 0.8); 154 + border-width: var(--border-thickness); 155 + border-style: solid; 153 156 border-radius: calc(0.5rem + var(--border-thickness)); 154 157 pointer-events: none; 155 158 transition: all 0.3s ease-out;
+5 -10
client/src/lib/components/FilterControls.svelte
··· 8 8 </script> 9 9 10 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300" 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 12 12 > 13 - <label for="filter-regex" class="text-blue-800 mr-1"> filter: </label> 13 + <label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1"> 14 + filter: 15 + </label> 14 16 <input 15 17 id="filter-regex" 16 18 value={filterRegex} 17 19 oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)} 18 20 type="text" 19 21 placeholder="regex..." 20 - class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24" 22 + class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24" 21 23 /> 22 24 </div> 23 - 24 - <style lang="postcss"> 25 - @reference "../../app.css"; 26 - .wsbadge { 27 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 28 - } 29 - </style>
+6 -11
client/src/lib/components/RefreshControl.svelte
··· 8 8 </script> 9 9 10 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300" 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700" 12 12 > 13 - <label for="refresh-rate" class="text-green-800 mr-1">refresh:</label> 13 + <label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1" 14 + >refresh:</label 15 + > 14 16 <input 15 17 id="refresh-rate" 16 18 value={refreshRate} ··· 24 26 pattern="[0-9]*" 25 27 min="0" 26 28 placeholder="real-time" 27 - class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20" 29 + class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20" 28 30 /> 29 - <span class="text-green-700">s</span> 31 + <span class="text-lime-800 dark:text-lime-200">s</span> 30 32 </div> 31 - 32 - <style lang="postcss"> 33 - @reference "../../app.css"; 34 - .wsbadge { 35 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 36 - } 37 - </style>
+31
client/src/lib/components/ShowControls.svelte
··· 1 + <script lang="ts"> 2 + import type { ShowOption } from "$lib/types"; 3 + 4 + interface Props { 5 + show: ShowOption; 6 + onShowChange: (value: ShowOption) => void; 7 + } 8 + 9 + let { show, onShowChange }: Props = $props(); 10 + 11 + const showOptions: ShowOption[] = ["server init", "stream start"]; 12 + </script> 13 + 14 + <div 15 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700" 16 + > 17 + <label for="show" class="text-pink-800 dark:text-pink-100 mr-1"> 18 + show since: 19 + </label> 20 + <select 21 + id="show" 22 + value={show} 23 + onchange={(e) => 24 + onShowChange((e.target as HTMLSelectElement).value as ShowOption)} 25 + class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0" 26 + > 27 + {#each showOptions as option} 28 + <option value={option}>{option}</option> 29 + {/each} 30 + </select> 31 + </div>
+5 -10
client/src/lib/components/SortControls.svelte
··· 17 17 </script> 18 18 19 19 <div 20 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300" 20 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700" 21 21 > 22 - <label for="sort-by" class="text-purple-800 mr-1"> sort by: </label> 22 + <label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1"> 23 + sort by: 24 + </label> 23 25 <select 24 26 id="sort-by" 25 27 value={sortBy} 26 28 onchange={(e) => 27 29 onSortChange((e.target as HTMLSelectElement).value as SortOption)} 28 - class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0" 30 + class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0" 29 31 > 30 32 {#each sortOptions as option} 31 33 <option value={option.value}>{option.label}</option> 32 34 {/each} 33 35 </select> 34 36 </div> 35 - 36 - <style lang="postcss"> 37 - @reference "../../app.css"; 38 - .wsbadge { 39 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 40 - } 41 - </style>
+16 -16
client/src/lib/components/StatsCard.svelte
··· 3 3 4 4 const colorClasses = { 5 5 green: { 6 - bg: "from-green-50 to-green-100", 7 - border: "border-green-200", 8 - titleText: "text-green-700", 9 - valueText: "text-green-900", 6 + bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800", 7 + border: "border-green-200 dark:border-green-800", 8 + titleText: "text-green-700 dark:text-green-400", 9 + valueText: "text-green-900 dark:text-green-200", 10 10 }, 11 11 red: { 12 - bg: "from-red-50 to-red-100", 13 - border: "border-red-200", 14 - titleText: "text-red-700", 15 - valueText: "text-red-900", 12 + bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800", 13 + border: "border-red-200 dark:border-red-800", 14 + titleText: "text-red-700 dark:text-red-400", 15 + valueText: "text-red-900 dark:text-red-200", 16 16 }, 17 17 turqoise: { 18 - bg: "from-teal-50 to-teal-100", 19 - border: "border-teal-200", 20 - titleText: "text-teal-700", 21 - valueText: "text-teal-900", 18 + bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800", 19 + border: "border-teal-200 dark:border-teal-800", 20 + titleText: "text-teal-700 dark:text-teal-400", 21 + valueText: "text-teal-900 dark:text-teal-200", 22 22 }, 23 23 orange: { 24 - bg: "from-orange-50 to-orange-100", 25 - border: "border-orange-200", 26 - titleText: "text-orange-700", 27 - valueText: "text-orange-900", 24 + bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800", 25 + border: "border-orange-200 dark:border-orange-800", 26 + titleText: "text-orange-700 dark:text-orange-400", 27 + valueText: "text-orange-900 dark:text-orange-200", 28 28 }, 29 29 }; 30 30
+18 -14
client/src/lib/components/StatusBadge.svelte
··· 8 8 const statusConfig = { 9 9 connected: { 10 10 text: "stream live", 11 - classes: "bg-green-100 text-green-800 border-green-200", 11 + classes: 12 + "bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800", 12 13 }, 13 14 connecting: { 14 15 text: "stream connecting", 15 - classes: "bg-yellow-100 text-yellow-800 border-yellow-200", 16 + classes: 17 + "bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800", 16 18 }, 17 19 error: { 18 20 text: "stream errored", 19 - classes: "bg-red-100 text-red-800 border-red-200", 21 + classes: 22 + "bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800", 20 23 }, 21 24 disconnected: { 22 25 text: "stream offline", 23 - classes: "bg-gray-100 text-gray-800 border-gray-200", 26 + classes: 27 + "bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800", 24 28 }, 25 29 }; 26 30 27 31 const config = $derived(statusConfig[status]); 28 32 </script> 29 33 30 - <span class="wsbadge {config.classes}"> 31 - {config.text} 32 - </span> 33 - 34 - <style lang="postcss"> 35 - @reference "../../app.css"; 36 - .wsbadge { 37 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 38 - } 39 - </style> 34 + <div class="flex flex-row items-center gap-2 wsbadge {config.classes}"> 35 + <!-- connecting spinner --> 36 + {#if status === "connecting"} 37 + <div 38 + class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200" 39 + ></div> 40 + {/if} 41 + <!-- status text --> 42 + <span>{config.text}</span> 43 + </div>
+1
client/src/lib/types.ts
··· 18 18 }; 19 19 20 20 export type SortOption = "total" | "created" | "deleted" | "date"; 21 + export type ShowOption = "server init" | "stream start";
+3 -2
client/src/routes/+layout.ts
··· 1 - export const prerender = true; 2 - export const ssr = false; 1 + export const prerender = false; 2 + export const ssr = true; 3 + export const csr = true;
+7
client/src/routes/+page.server.ts
··· 1 + import { fetchEvents, fetchTrackingSince } from "$lib/api"; 2 + 3 + export const load = async () => { 4 + const events = await fetchEvents(); 5 + const trackingSince = await fetchTrackingSince(); 6 + return { events, trackingSince }; 7 + };
+109 -51
client/src/routes/+page.svelte
··· 1 1 <script lang="ts"> 2 2 import { dev } from "$app/environment"; 3 - import type { EventRecord, NsidCount, SortOption } from "$lib/types"; 3 + import type { 4 + EventRecord, 5 + Events, 6 + NsidCount, 7 + ShowOption, 8 + Since, 9 + SortOption, 10 + } from "$lib/types"; 4 11 import { onMount, onDestroy } from "svelte"; 5 - import { writable } from "svelte/store"; 12 + import { get, writable } from "svelte/store"; 6 13 import { PUBLIC_API_URL } from "$env/static/public"; 7 14 import { fetchEvents, fetchTrackingSince } from "$lib/api"; 8 15 import { createRegexFilter } from "$lib/filter"; ··· 14 21 import BskyToggle from "$lib/components/BskyToggle.svelte"; 15 22 import RefreshControl from "$lib/components/RefreshControl.svelte"; 16 23 import { formatTimestamp } from "$lib/format"; 24 + import ShowControls from "$lib/components/ShowControls.svelte"; 25 + 26 + type Props = { 27 + data: { events: Events; trackingSince: Since }; 28 + }; 17 29 18 - const events = writable(new Map<string, EventRecord>()); 30 + const { data }: Props = $props(); 31 + 32 + const events = writable( 33 + new Map<string, EventRecord>(Object.entries(data.events.events)), 34 + ); 35 + const eventsStart = new Map<string, EventRecord>( 36 + Object.entries(data.events.events), 37 + ); 19 38 const pendingUpdates = new Map<string, EventRecord>(); 20 - let eventsList: NsidCount[] = $state([]); 39 + 21 40 let updateTimer: NodeJS.Timeout | null = null; 22 - events.subscribe((value) => { 23 - eventsList = value 24 - .entries() 25 - .map(([nsid, event]) => ({ 26 - nsid, 27 - ...event, 28 - })) 29 - .toArray(); 30 - }); 31 - let per_second = $state(0); 32 - let tracking_since = $state(0); 41 + let per_second = $state(data.events.per_second); 42 + let tracking_since = $state(data.trackingSince.since); 33 43 44 + const diffEvents = ( 45 + oldEvents: Map<string, EventRecord>, 46 + newEvents: Map<string, EventRecord>, 47 + ): NsidCount[] => { 48 + const nsidCounts: NsidCount[] = []; 49 + for (const [nsid, event] of newEvents.entries()) { 50 + const oldEvent = oldEvents.get(nsid); 51 + if (oldEvent) { 52 + const counts = { 53 + nsid, 54 + count: event.count - oldEvent.count, 55 + deleted_count: event.deleted_count - oldEvent.deleted_count, 56 + last_seen: event.last_seen, 57 + }; 58 + if (counts.count > 0 || counts.deleted_count > 0) 59 + nsidCounts.push(counts); 60 + } else { 61 + nsidCounts.push({ 62 + nsid, 63 + ...event, 64 + }); 65 + } 66 + } 67 + return nsidCounts; 68 + }; 69 + const applyEvents = (newEvents: Record<string, EventRecord>) => { 70 + events.update((map) => { 71 + for (const [nsid, event] of Object.entries(newEvents)) { 72 + map.set(nsid, event); 73 + } 74 + return map; 75 + }); 76 + }; 77 + 78 + let error: string | null = $state(null); 79 + let filterRegex = $state(""); 80 + let dontShowBsky = $state(false); 81 + let sortBy: SortOption = $state("total"); 82 + let refreshRate = $state(""); 83 + let changedByUser = $state(false); 84 + let show: ShowOption = $state("server init"); 85 + let eventsList: NsidCount[] = $state([]); 86 + let updateEventsList = $derived((value: Map<string, EventRecord>) => { 87 + switch (show) { 88 + case "server init": 89 + eventsList = value 90 + .entries() 91 + .map(([nsid, event]) => ({ 92 + nsid, 93 + ...event, 94 + })) 95 + .toArray(); 96 + break; 97 + case "stream start": 98 + eventsList = diffEvents(eventsStart, value); 99 + break; 100 + } 101 + }); 102 + events.subscribe((value) => updateEventsList(value)); 34 103 let all: EventRecord = $derived( 35 104 eventsList.reduce( 36 105 (acc, event) => { ··· 50 119 }, 51 120 ), 52 121 ); 53 - let error: string | null = $state(null); 54 - let filterRegex = $state(""); 55 - let dontShowBsky = $state(false); 56 - let sortBy: SortOption = $state("total"); 57 - let refreshRate = $state(""); 58 - let changedByUser = $state(false); 59 122 60 123 let websocket: WebSocket | null = null; 61 124 let isStreamOpen = $state(false); ··· 76 139 }; 77 140 websocket.onmessage = async (event) => { 78 141 const jsonData = JSON.parse(event.data); 79 - 80 - if (jsonData.per_second > 0) { 81 - per_second = jsonData.per_second; 82 - } 83 - 84 - // Store updates in pending map if refresh rate is set 142 + per_second = jsonData.per_second; 85 143 if (refreshRate) { 86 144 for (const [nsid, event] of Object.entries(jsonData.events)) { 87 145 pendingUpdates.set(nsid, event as EventRecord); 88 146 } 89 147 } else { 90 - // Apply updates immediately if no refresh rate 91 - events.update((map) => { 92 - for (const [nsid, event] of Object.entries( 93 - jsonData.events, 94 - )) { 95 - map.set(nsid, event as EventRecord); 96 - } 97 - return map; 98 - }); 148 + applyEvents(jsonData.events); 99 149 } 100 150 }; 101 151 websocket.onerror = (error) => { ··· 114 164 error = null; 115 165 const data = await fetchEvents(); 116 166 per_second = data.per_second; 117 - events.update((map) => { 118 - for (const [nsid, event] of Object.entries(data.events)) { 119 - map.set(nsid, event); 120 - } 121 - return map; 122 - }); 167 + applyEvents(data.events); 123 168 tracking_since = (await fetchTrackingSince()).since; 124 169 } catch (err) { 125 170 error = ··· 222 267 /> 223 268 </svelte:head> 224 269 225 - <header class="border-gray-300 border-b mb-4 pb-2"> 270 + <header 271 + class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2" 272 + > 226 273 <div 227 274 class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center" 228 275 > 229 - <h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1> 230 - <p class="text-lg mt-1 text-gray-600"> 276 + <h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200"> 277 + lexicon tracker 278 + </h1> 279 + <p class="text-lg mt-1 text-gray-600 dark:text-gray-300"> 231 280 tracks lexicons seen on the jetstream {tracking_since === 0 232 281 ? "" 233 282 : `(since: ${formatTimestamp(tracking_since)})`} 234 283 </p> 235 284 </div> 236 285 </header> 237 - <div class="md:max-w-[61vw] mx-auto p-2"> 286 + <div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2"> 238 287 <div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8"> 239 288 <StatsCard 240 289 title="total creation" ··· 260 309 261 310 {#if error} 262 311 <div 263 - class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6" 312 + class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6" 264 313 > 265 314 <p>Error: {error}</p> 266 315 </div> ··· 269 318 {#if eventsList.length > 0} 270 319 <div class="mb-8"> 271 320 <div class="flex flex-wrap items-center gap-3 mb-3"> 272 - <h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2> 321 + <h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200"> 322 + seen lexicons 323 + </h2> 273 324 <StatusBadge status={websocketStatus} /> 274 325 </div> 275 326 <div class="flex flex-wrap items-center gap-1.5 mb-6"> ··· 291 342 refreshRate = ""; 292 343 }} 293 344 /> 345 + <ShowControls 346 + {show} 347 + onShowChange={(value: ShowOption) => { 348 + show = value; 349 + updateEventsList(get(events)); 350 + }} 351 + /> 294 352 <RefreshControl 295 353 {refreshRate} 296 354 onRefreshChange={(value) => { ··· 315 373 {/if} 316 374 </div> 317 375 318 - <footer class="py-2 border-t border-gray-200 text-center"> 319 - <p class="text-gray-600 text-sm"> 376 + <footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center"> 377 + <p class="text-gray-600 dark:text-gray-200 text-sm"> 320 378 source code <a 321 379 href="https://tangled.sh/@poor.dog/nsid-tracker" 322 380 target="_blank" 323 381 rel="noopener noreferrer" 324 - class="text-blue-600 hover:text-blue-800 underline" 382 + class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline" 325 383 >@poor.dog/nsid-tracker</a 326 384 > 327 385 </p>
+1 -1
client/svelte.config.js
··· 1 - import adapter from "@sveltejs/adapter-static"; 1 + import adapter from "svelte-adapter-bun"; 2 2 import { vitePreprocess } from "@sveltejs/vite-plugin-svelte"; 3 3 4 4 /** @type {import('@sveltejs/kit').Config} */
+1 -1
nix/client-modules.nix
··· 8 8 9 9 src = ../client; 10 10 11 - outputHash = "sha256-t8PJFo+3XGkzmMNbw9Rf9cS5Ob5YtI8ucX3ay+u9a3M="; 11 + outputHash = "sha256-njwXk3u0NUsYWLv9EOdCltgQOjTVkcfu+D+0COSw/6I="; 12 12 outputHashAlgo = "sha256"; 13 13 outputHashMode = "recursive"; 14 14
+10 -2
nix/client.nix
··· 1 1 { 2 + lib, 2 3 stdenv, 3 4 makeBinaryWrapper, 4 5 bun, ··· 28 29 ''; 29 30 buildPhase = '' 30 31 runHook preBuild 31 - bun --prefer-offline run --bun build 32 + bun --prefer-offline run build 32 33 runHook postBuild 33 34 ''; 34 35 installPhase = '' 35 36 runHook preInstall 36 - mkdir -p $out 37 + 38 + mkdir -p $out/bin 37 39 cp -R ./build/* $out 40 + cp -R ./node_modules $out 41 + 42 + makeBinaryWrapper ${bun}/bin/bun $out/bin/website \ 43 + --prefix PATH : ${lib.makeBinPath [ bun ]} \ 44 + --add-flags "run --bun --no-install --cwd $out start" 45 + 38 46 runHook postInstall 39 47 ''; 40 48 }
+63 -28
server/Cargo.lock
··· 18 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 19 20 20 [[package]] 21 + name = "ahash" 22 + version = "0.8.12" 23 + source = "registry+https://github.com/rust-lang/crates.io-index" 24 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 + dependencies = [ 26 + "cfg-if", 27 + "getrandom 0.3.3", 28 + "once_cell", 29 + "serde", 30 + "version_check", 31 + "zerocopy", 32 + ] 33 + 34 + [[package]] 21 35 name = "aho-corasick" 22 36 version = "1.1.3" 23 37 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 46 60 version = "1.0.98" 47 61 source = "registry+https://github.com/rust-lang/crates.io-index" 48 62 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 63 + 64 + [[package]] 65 + name = "arc-swap" 66 + version = "1.7.1" 67 + source = "registry+https://github.com/rust-lang/crates.io-index" 68 + checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" 49 69 50 70 [[package]] 51 71 name = "async-compression" ··· 307 327 version = "0.2.1" 308 328 source = "registry+https://github.com/rust-lang/crates.io-index" 309 329 checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 310 - 311 - [[package]] 312 - name = "cmake" 313 - version = "0.1.54" 314 - source = "registry+https://github.com/rust-lang/crates.io-index" 315 - checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" 316 - dependencies = [ 317 - "cc", 318 - ] 319 330 320 331 [[package]] 321 332 name = "combine" ··· 1552 1563 name = "server" 1553 1564 version = "0.1.0" 1554 1565 dependencies = [ 1566 + "ahash", 1555 1567 "anyhow", 1568 + "arc-swap", 1556 1569 "async-trait", 1557 1570 "axum", 1558 1571 "axum-tws", ··· 1571 1584 "serde", 1572 1585 "serde_json", 1573 1586 "smol_str", 1574 - "snmalloc-rs", 1575 1587 "threadpool", 1588 + "tikv-jemallocator", 1576 1589 "tokio", 1577 1590 "tokio-util", 1578 1591 "tokio-websockets", ··· 1645 1658 ] 1646 1659 1647 1660 [[package]] 1648 - name = "snmalloc-rs" 1649 - version = "0.3.8" 1650 - source = "registry+https://github.com/rust-lang/crates.io-index" 1651 - checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a" 1652 - dependencies = [ 1653 - "snmalloc-sys", 1654 - ] 1655 - 1656 - [[package]] 1657 - name = "snmalloc-sys" 1658 - version = "0.3.8" 1659 - source = "registry+https://github.com/rust-lang/crates.io-index" 1660 - checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea" 1661 - dependencies = [ 1662 - "cmake", 1663 - ] 1664 - 1665 - [[package]] 1666 1661 name = "socket2" 1667 1662 version = "0.5.10" 1668 1663 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1750 1745 checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" 1751 1746 dependencies = [ 1752 1747 "num_cpus", 1748 + ] 1749 + 1750 + [[package]] 1751 + name = "tikv-jemalloc-sys" 1752 + version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" 1753 + source = "registry+https://github.com/rust-lang/crates.io-index" 1754 + checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" 1755 + dependencies = [ 1756 + "cc", 1757 + "libc", 1758 + ] 1759 + 1760 + [[package]] 1761 + name = "tikv-jemallocator" 1762 + version = "0.6.0" 1763 + source = "registry+https://github.com/rust-lang/crates.io-index" 1764 + checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" 1765 + dependencies = [ 1766 + "libc", 1767 + "tikv-jemalloc-sys", 1753 1768 ] 1754 1769 1755 1770 [[package]] ··· 2385 2400 version = "0.8.15" 2386 2401 source = "registry+https://github.com/rust-lang/crates.io-index" 2387 2402 checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 2403 + 2404 + [[package]] 2405 + name = "zerocopy" 2406 + version = "0.8.26" 2407 + source = "registry+https://github.com/rust-lang/crates.io-index" 2408 + checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" 2409 + dependencies = [ 2410 + "zerocopy-derive", 2411 + ] 2412 + 2413 + [[package]] 2414 + name = "zerocopy-derive" 2415 + version = "0.8.26" 2416 + source = "registry+https://github.com/rust-lang/crates.io-index" 2417 + checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" 2418 + dependencies = [ 2419 + "proc-macro2", 2420 + "quote", 2421 + "syn", 2422 + ] 2388 2423 2389 2424 [[package]] 2390 2425 name = "zeroize"
+6 -1
server/Cargo.toml
··· 30 30 rayon = "1.10.0" 31 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 32 32 rclite = "0.2.7" 33 - snmalloc-rs = "0.3.8" 33 + arc-swap = "1.7.1" 34 + ahash = { version = "0.8.12", features = ["serde"] } 35 + 36 + 37 + [target.'cfg(not(target_env = "msvc"))'.dependencies] 38 + tikv-jemallocator = "0.6"
+16 -18
server/src/api.rs
··· 1 1 use std::{ 2 - collections::HashMap, 3 2 fmt::Display, 4 3 net::SocketAddr, 5 4 ops::{Bound, Deref, RangeBounds}, 6 5 time::Duration, 7 6 }; 8 7 8 + use ahash::AHashMap; 9 9 use anyhow::anyhow; 10 10 use axum::{ 11 11 Json, Router, ··· 117 117 #[derive(Serialize)] 118 118 struct Events { 119 119 per_second: usize, 120 - events: HashMap<SmolStr, NsidCount>, 120 + events: AHashMap<SmolStr, NsidCount>, 121 121 } 122 122 123 123 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 124 - let mut events = HashMap::new(); 124 + let mut events = AHashMap::new(); 125 125 for result in db.get_counts() { 126 126 let (nsid, counts) = result?; 127 127 events.insert( ··· 176 176 ) -> AppResult<Json<Vec<Hit>>> { 177 177 let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded); 178 178 let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded); 179 - let maybe_hits = db 180 - .get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS) 181 - .take(MAX_HITS); 182 - let mut hits = Vec::with_capacity(maybe_hits.size_hint().0); 183 179 184 - for maybe_hit in maybe_hits { 185 - let hit = maybe_hit?; 186 - let hit_data = hit.deser()?; 187 - 188 - hits.push(Hit { 189 - timestamp: hit.timestamp, 190 - deleted: hit_data.deleted, 191 - }); 192 - } 180 + db.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS) 181 + .take(MAX_HITS) 182 + .try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| { 183 + let hit = hit?; 184 + let hit_data = hit.deser()?; 193 185 194 - Ok(Json(hits)) 186 + acc.push(Hit { 187 + timestamp: hit.timestamp, 188 + deleted: hit_data.deleted, 189 + }); 190 + Ok(acc) 191 + }) 192 + .map(Json) 195 193 } 196 194 197 195 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { ··· 200 198 (async move { 201 199 let mut listener = db.new_listener(); 202 200 let mut data = Events { 203 - events: HashMap::<SmolStr, NsidCount>::with_capacity(10), 201 + events: AHashMap::<SmolStr, NsidCount>::with_capacity(10), 204 202 per_second: 0, 205 203 }; 206 204 let mut updates = 0;
+36 -17
server/src/db/handle.rs
··· 1 1 use std::{ 2 2 fmt::Debug, 3 3 io::Cursor, 4 - ops::{Bound, Deref, RangeBounds}, 4 + ops::{Bound, RangeBounds}, 5 5 sync::atomic::{AtomicU64, Ordering as AtomicOrdering}, 6 6 time::Duration, 7 7 }; 8 8 9 9 use byteview::ByteView; 10 - use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice}; 10 + use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot}; 11 11 use itertools::Itertools; 12 12 use parking_lot::Mutex; 13 13 use rayon::iter::{IntoParallelIterator, ParallelIterator}; ··· 16 16 17 17 use crate::{ 18 18 db::{EventRecord, NsidHit, block}, 19 - error::AppResult, 20 - utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 19 + error::{AppError, AppResult}, 20 + utils::{ 21 + ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, 22 + varints_unsigned_encoded, 23 + }, 21 24 }; 22 25 23 26 pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; ··· 31 34 } 32 35 33 36 pub struct LexiconHandle { 34 - tree: Partition, 37 + write_tree: Partition, 38 + read_tree: ArcliteSwap<Snapshot>, 35 39 nsid: SmolStr, 36 40 buf: Arc<Mutex<Vec<EventRecord>>>, 37 41 last_insert: AtomicU64, // relaxed ··· 46 50 } 47 51 } 48 52 49 - impl Deref for LexiconHandle { 50 - type Target = Partition; 51 - 52 - fn deref(&self) -> &Self::Target { 53 - &self.tree 54 - } 55 - } 56 - 57 53 impl LexiconHandle { 58 54 pub fn new(keyspace: &Keyspace, nsid: &str) -> Self { 59 55 let opts = PartitionCreateOptions::default() 60 56 .block_size(1024 * 48) 61 57 .compression(fjall::CompressionType::Miniz(9)); 58 + let write_tree = keyspace.open_partition(nsid, opts).unwrap(); 59 + let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot())); 62 60 Self { 63 - tree: keyspace.open_partition(nsid, opts).unwrap(), 61 + write_tree, 62 + read_tree, 64 63 nsid: nsid.into(), 65 64 buf: Default::default(), 66 65 last_insert: AtomicU64::new(0), ··· 68 67 } 69 68 } 70 69 70 + #[inline(always)] 71 + pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> { 72 + self.read_tree.load() 73 + } 74 + 75 + #[inline(always)] 76 + pub fn update_tree(&self) { 77 + self.read_tree 78 + .store(ArcRefCnt::new(self.write_tree.snapshot())); 79 + } 80 + 81 + #[inline(always)] 71 82 pub fn span(&self) -> tracing::Span { 72 83 tracing::info_span!("handle", nsid = %self.nsid) 73 84 } 74 85 86 + #[inline(always)] 75 87 pub fn nsid(&self) -> &SmolStr { 76 88 &self.nsid 77 89 } 78 90 91 + #[inline(always)] 79 92 pub fn item_count(&self) -> usize { 80 93 self.buf.lock().len() 81 94 } ··· 122 135 let end_key = varints_unsigned_encoded([end_limit]); 123 136 124 137 let blocks_to_compact = self 125 - .tree 138 + .read() 126 139 .range(start_key..end_key) 127 140 .collect::<Result<Vec<_>, _>>()?; 128 141 if blocks_to_compact.len() < 2 { ··· 162 175 let end_blocks_size = new_blocks.len(); 163 176 164 177 for key in keys_to_delete { 165 - self.tree.remove(key.clone())?; 178 + self.write_tree.remove(key.clone())?; 166 179 } 167 180 for block in new_blocks { 168 - self.tree.insert(block.key, block.data)?; 181 + self.write_tree.insert(block.key, block.data)?; 169 182 } 170 183 171 184 let reduction = ··· 179 192 ); 180 193 181 194 Ok(()) 195 + } 196 + 197 + pub fn insert_block(&self, block: Block) -> AppResult<()> { 198 + self.write_tree 199 + .insert(block.key, block.data) 200 + .map_err(AppError::from) 182 201 } 183 202 184 203 pub fn encode_block_from_items(
+86 -45
server/src/db/mod.rs
··· 1 1 use std::{ 2 - collections::HashMap, 3 2 fmt::Debug, 4 3 io::Cursor, 5 4 ops::{Bound, Deref, RangeBounds}, ··· 8 7 u64, 9 8 }; 10 9 10 + use ahash::{AHashMap, AHashSet}; 11 11 use byteview::StrView; 12 12 use fjall::{Keyspace, Partition, PartitionCreateOptions}; 13 13 use itertools::{Either, Itertools}; ··· 72 72 } 73 73 74 74 pub struct DbInfo { 75 - pub nsids: HashMap<SmolStr, Vec<usize>>, 75 + pub nsids: AHashMap<SmolStr, Vec<usize>>, 76 76 pub disk_size: u64, 77 77 } 78 78 ··· 114 114 pub cfg: DbConfig, 115 115 pub ks: Keyspace, 116 116 counts: Partition, 117 - hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 117 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>, 118 118 sync_pool: threadpool::ThreadPool, 119 119 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 120 120 eps: RateTracker<100>, // 100 millis buckets ··· 165 165 pub fn sync(&self, all: bool) -> AppResult<()> { 166 166 let start = CLOCK.now(); 167 167 // prepare all the data 168 - let mut data = Vec::with_capacity(self.hits.len()); 168 + let nsids_len = self.hits.len(); 169 + let mut data = Vec::with_capacity(nsids_len); 170 + let mut nsids = AHashSet::with_capacity(nsids_len); 169 171 let _guard = scc::ebr::Guard::new(); 170 - for (_, handle) in self.hits.iter(&_guard) { 172 + for (nsid, handle) in self.hits.iter(&_guard) { 171 173 let mut nsid_data = Vec::with_capacity(2); 172 - let mut total_count = 0; 174 + // let mut total_count = 0; 173 175 let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity; 174 176 // if we disconnect for a long time, we want to sync all of what we 175 177 // have to avoid having many small blocks (even if we run compaction ··· 186 188 if count > 0 && (all || data_count > 0 || is_too_old) { 187 189 for _ in 0..data_count { 188 190 nsid_data.push((handle.clone(), block_size)); 189 - total_count += block_size; 191 + // total_count += block_size; 190 192 } 191 193 // only sync remainder if we haven't met block size 192 194 let remainder = count % block_size; 193 195 if (all || data_count == 0) && remainder > 0 { 194 196 nsid_data.push((handle.clone(), remainder)); 195 - total_count += remainder; 197 + // total_count += remainder; 196 198 } 197 199 } 198 200 let _span = handle.span().entered(); 199 201 if nsid_data.len() > 0 { 200 - tracing::info!( 201 - {blocks = %nsid_data.len(), count = %total_count}, 202 - "will encode & sync", 203 - ); 202 + // tracing::info!( 203 + // {blocks = %nsid_data.len(), count = %total_count}, 204 + // "will encode & sync", 205 + // ); 206 + nsids.insert(nsid.clone()); 204 207 data.push(nsid_data); 205 208 } 206 209 } ··· 228 231 for (block, handle) in chunk { 229 232 self.sync_pool.execute(move || { 230 233 let _span = handle.span().entered(); 231 - match handle.insert(block.key, block.data) { 234 + let written = block.written; 235 + match handle.insert_block(block) { 232 236 Ok(_) => { 233 - tracing::info!({count = %block.written}, "synced") 237 + tracing::info!({count = %written}, "synced") 234 238 } 235 239 Err(err) => tracing::error!({ err = %err }, "failed to sync block"), 236 240 } ··· 239 243 AppResult::Ok(()) 240 244 })?; 241 245 self.sync_pool.join(); 246 + 247 + // update snapshots for all (changed) handles 248 + for nsid in nsids { 249 + self.hits.peek_with(&nsid, |_, handle| handle.update_tree()); 250 + } 251 + 242 252 tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks"); 243 253 244 254 Ok(()) ··· 254 264 let Some(handle) = self.get_handle(nsid) else { 255 265 return Ok(()); 256 266 }; 257 - handle.compact(max_count, range, sort) 267 + handle.compact(max_count, range, sort)?; 268 + handle.update_tree(); 269 + Ok(()) 258 270 } 259 271 260 272 pub fn compact_all( ··· 300 312 } 301 313 302 314 pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> { 315 + let mut seen_events = 0; 303 316 for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() { 304 317 let mut counts = self.get_count(&key)?; 305 - let mut count = 0; 306 318 self.ensure_handle(&key).queue(chunk.inspect(|e| { 307 319 // increment count 308 320 counts.last_seen = e.timestamp; ··· 311 323 } else { 312 324 counts.count += 1; 313 325 } 314 - count += 1; 326 + seen_events += 1; 315 327 })); 316 - self.eps.observe(count); 317 328 self.insert_count(&key, &counts)?; 318 329 if self.event_broadcaster.receiver_count() > 0 { 319 330 let _ = self.event_broadcaster.send((key, counts)); 320 331 } 321 332 } 333 + self.eps.observe(seen_events); 322 334 Ok(()) 323 335 } 324 336 ··· 358 370 } 359 371 360 372 pub fn info(&self) -> AppResult<DbInfo> { 361 - let mut nsids = HashMap::new(); 373 + let mut nsids = AHashMap::new(); 362 374 for nsid in self.get_nsids() { 363 375 let Some(handle) = self.get_handle(&nsid) else { 364 376 continue; 365 377 }; 366 - let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| { 367 - let (key, value) = item?; 368 - let mut timestamps = Cursor::new(key); 369 - let start_timestamp = timestamps.read_varint()?; 370 - let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 371 - acc.push(decoder.item_count()); 372 - AppResult::Ok(acc) 373 - })?; 378 + let block_lens = handle 379 + .read() 380 + .iter() 381 + .rev() 382 + .try_fold(Vec::new(), |mut acc, item| { 383 + let (key, value) = item?; 384 + let mut timestamps = Cursor::new(key); 385 + let start_timestamp = timestamps.read_varint()?; 386 + let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 387 + acc.push(decoder.item_count()); 388 + AppResult::Ok(acc) 389 + })?; 374 390 nsids.insert(nsid.to_smolstr(), block_lens); 375 391 } 376 392 Ok(DbInfo { ··· 402 418 }; 403 419 404 420 // let mut ts = CLOCK.now(); 405 - let mut current_item_count = 0; 406 - let map_block = move |(key, val)| { 407 - if current_item_count > max_items { 408 - return Ok(None); 421 + let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> { 422 + if current_item_count >= max_items { 423 + return Ok((None, current_item_count)); 409 424 } 425 + let (key, val) = res?; 410 426 let mut key_reader = Cursor::new(key); 411 427 let start_timestamp = key_reader.read_varint::<u64>()?; 412 428 // let end_timestamp = key_reader.read_varint::<u64>()?; ··· 414 430 // tracing::info!( 415 431 // "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater" 416 432 // ); 417 - return Ok(None); 433 + return Ok((None, current_item_count)); 418 434 } 419 435 let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?; 420 - current_item_count += decoder.item_count(); 436 + let current_item_count = current_item_count + decoder.item_count(); 421 437 // tracing::info!( 422 438 // "took {}ns to get block with size {}", 423 439 // ts.elapsed().as_nanos(), 424 440 // decoder.item_count() 425 441 // ); 426 442 // ts = CLOCK.now(); 427 - Ok(Some( 428 - decoder 429 - .take_while(move |item| { 430 - item.as_ref().map_or(true, |item| { 431 - item.timestamp <= end_limit && item.timestamp >= start_limit 443 + Ok(( 444 + Some( 445 + decoder 446 + .take_while(move |item| { 447 + item.as_ref().map_or(true, |item| { 448 + item.timestamp <= end_limit && item.timestamp >= start_limit 449 + }) 432 450 }) 433 - }) 434 - .map(|res| res.map_err(AppError::from)), 451 + .map(|res| res.map_err(AppError::from)), 452 + ), 453 + current_item_count, 435 454 )) 436 455 }; 437 456 438 - let blocks = handle 457 + let (blocks, _counted) = handle 458 + .read() 439 459 .range(..end_key) 460 + .map(|res| res.map_err(AppError::from)) 440 461 .rev() 441 - .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose()) 442 - .collect_vec(); 462 + .fold_while( 463 + (Vec::with_capacity(20), 0), 464 + |(mut blocks, current_item_count), res| { 465 + use itertools::FoldWhile::*; 443 466 444 - tracing::info!("got blocks with size {}", blocks.len()); 467 + match map_block((res, current_item_count)) { 468 + Ok((Some(block), current_item_count)) => { 469 + blocks.push(Ok(block)); 470 + Continue((blocks, current_item_count)) 471 + } 472 + Ok((None, current_item_count)) => Done((blocks, current_item_count)), 473 + Err(err) => { 474 + blocks.push(Err(err)); 475 + Done((blocks, current_item_count)) 476 + } 477 + } 478 + }, 479 + ) 480 + .into_inner(); 481 + 482 + // tracing::info!( 483 + // "got blocks with size {}, item count {counted}", 484 + // blocks.len() 485 + // ); 445 486 446 487 Either::Left(blocks.into_iter().rev().flatten().flatten()) 447 488 } ··· 452 493 let Some(handle) = self.get_handle("app.bsky.feed.like") else { 453 494 return Ok(0); 454 495 }; 455 - let Some((timestamps_raw, _)) = handle.first_key_value()? else { 496 + let Some((timestamps_raw, _)) = handle.read().first_key_value()? else { 456 497 return Ok(0); 457 498 }; 458 499 let mut timestamp_reader = Cursor::new(timestamps_raw);
+21 -11
server/src/jetstream.rs
··· 13 13 pub struct JetstreamClient { 14 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 15 tls_connector: tokio_websockets::Connector, 16 - url: SmolStr, 16 + urls: Vec<SmolStr>, 17 17 } 18 18 19 19 impl JetstreamClient { 20 - pub fn new(url: &str) -> AppResult<Self> { 20 + pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> { 21 21 Ok(Self { 22 22 stream: None, 23 23 tls_connector: tokio_websockets::Connector::new()?, 24 - url: SmolStr::new(url), 24 + urls: urls.into_iter().map(Into::into).collect(), 25 25 }) 26 26 } 27 27 28 28 pub async fn connect(&mut self) -> AppResult<()> { 29 - let (stream, _) = ClientBuilder::new() 30 - .connector(&self.tls_connector) 31 - .uri(&self.url)? 32 - .connect() 33 - .await?; 34 - self.stream = Some(stream); 35 - tracing::info!("connected to jetstream ({})", self.url); 36 - Ok(()) 29 + for uri in &self.urls { 30 + let conn_result = ClientBuilder::new() 31 + .connector(&self.tls_connector) 32 + .uri(uri)? 33 + .connect() 34 + .await; 35 + match conn_result { 36 + Ok((stream, _)) => { 37 + self.stream = Some(stream); 38 + tracing::info!("connected to jetstream {}", uri); 39 + return Ok(()); 40 + } 41 + Err(err) => { 42 + tracing::error!("failed to connect to jetstream {uri}: {err}"); 43 + } 44 + }; 45 + } 46 + Err(anyhow!("failed to connect to any jetstream server").into()) 37 47 } 38 48 39 49 // automatically retries connection, only returning error if it fails many times
+16 -10
server/src/main.rs
··· 21 21 mod jetstream; 22 22 mod utils; 23 23 24 + #[cfg(not(target_env = "msvc"))] 24 25 #[global_allocator] 25 - static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; 26 + static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; 26 27 27 28 #[tokio::main] 28 29 async fn main() { ··· 69 70 .install_default() 70 71 .expect("cant install rustls crypto provider"); 71 72 72 - let mut jetstream = 73 - match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") { 74 - Ok(client) => client, 75 - Err(err) => { 76 - tracing::error!("can't create jetstream client: {err}"); 77 - return; 78 - } 79 - }; 73 + let urls = [ 74 + "wss://jetstream1.us-west.bsky.network/subscribe", 75 + "wss://jetstream2.us-west.bsky.network/subscribe", 76 + "wss://jetstream2.fr.hose.cam/subscribe", 77 + "wss://jetstream.fire.hose.cam/subscribe", 78 + ]; 79 + let mut jetstream = match JetstreamClient::new(urls) { 80 + Ok(client) => client, 81 + Err(err) => { 82 + tracing::error!("can't create jetstream client: {err}"); 83 + return; 84 + } 85 + }; 80 86 81 87 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 82 88 let consume_events = tokio::spawn({ ··· 105 111 move || { 106 112 let mut buffer = Vec::new(); 107 113 loop { 108 - let read = event_rx.blocking_recv_many(&mut buffer, 100); 114 + let read = event_rx.blocking_recv_many(&mut buffer, 500); 109 115 if let Err(err) = db.ingest_events(buffer.drain(..)) { 110 116 tracing::error!("failed to ingest events: {}", err); 111 117 }
+66
server/src/utils.rs
··· 1 1 use std::io::{self, Read, Write}; 2 + use std::ops::Deref; 2 3 use std::sync::atomic::{AtomicU64, Ordering}; 3 4 use std::time::Duration; 4 5 6 + use arc_swap::RefCnt; 5 7 use byteview::ByteView; 6 8 use ordered_varint::Variable; 9 + use rclite::Arc; 7 10 8 11 pub fn get_time() -> Duration { 9 12 std::time::SystemTime::now() ··· 320 323 } 321 324 } 322 325 } 326 + 327 + pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>; 328 + 329 + pub struct ArcRefCnt<T>(Arc<T>); 330 + 331 + impl<T> ArcRefCnt<T> { 332 + pub fn new(value: T) -> Self { 333 + Self(Arc::new(value)) 334 + } 335 + } 336 + 337 + impl<T> Deref for ArcRefCnt<T> { 338 + type Target = T; 339 + 340 + fn deref(&self) -> &Self::Target { 341 + &self.0 342 + } 343 + } 344 + 345 + impl<T> Clone for ArcRefCnt<T> { 346 + fn clone(&self) -> Self { 347 + Self(self.0.clone()) 348 + } 349 + } 350 + 351 + // SAFETY: uhhhhhhhh copied the Arc impl from arc_swap xd 352 + unsafe impl<T> RefCnt for ArcRefCnt<T> { 353 + type Base = T; 354 + 355 + fn into_ptr(me: Self) -> *mut Self::Base { 356 + Arc::into_raw(me.0) as *mut T 357 + } 358 + 359 + fn as_ptr(me: &Self) -> *mut Self::Base { 360 + // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same 361 + // intention as 362 + // 363 + // me as &T as *const T as *mut T 364 + // 365 + // We first create a "shallow copy" of me - one that doesn't really own its ref count 366 + // (that's OK, me _does_ own it, so it can't be destroyed in the meantime). 367 + // Then we can use into_raw (which preserves not having the ref count). 368 + // 369 + // We need to "revert" the changes we did. In current std implementation, the combination 370 + // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw 371 + // and that read shall be paired with forget to properly "close the brackets". In future 372 + // versions of STD, these may become something else that's not really no-op (unlikely, but 373 + // possible), so we future-proof it a bit. 374 + 375 + // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads 376 + let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) }); 377 + let ptr = ptr as *mut T; 378 + 379 + // SAFETY: We got the pointer from into_raw just above 380 + std::mem::forget(unsafe { Arc::from_raw(ptr) }); 381 + 382 + ptr 383 + } 384 + 385 + unsafe fn from_ptr(ptr: *const Self::Base) -> Self { 386 + Self(unsafe { Arc::from_raw(ptr) }) 387 + } 388 + }