tracks lexicons and how many times they appeared on the jetstream

Compare changes

Choose any two refs to compare.

+3 -12
README.md
··· 1 - a webapp and server that monitors the jetstream and tracks the different 2 - lexicons as they are created or deleted. it shows you which collections are most 3 - active on the network. 1 + a webapp and server that monitors the jetstream and tracks the different lexicons as they are created or deleted. 2 + it shows you which collections are most active on the network. 4 3 5 4 for backend it uses rust with fjall as db, the frontend is built with sveltekit. 6 5 7 6 see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it. 8 7 9 - ## performance / storage 10 - 11 - it uses about 50MB of space for 620M recorded events (events being just 12 - timestamp in seconds and deleted boolean for now). and around 50-60ms for 13 - querying 300-400k events. 14 - 15 - this is on a machine with AMD EPYC 7281 (32) @ 2.100GHz. 16 - 17 8 ## running 18 9 19 10 ### with nix 20 11 21 - - build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server` 12 + - run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server` 22 13 - build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client` 23 14 24 15 ### manually
-9
client/bun.lock
··· 5 5 "name": "nsid-tracker", 6 6 "dependencies": { 7 7 "@number-flow/svelte": "^0.3.9", 8 - "svelte-adapter-bun": "^0.5.2", 9 8 }, 10 9 "devDependencies": { 11 10 "@eslint/compat": "^1.2.5", ··· 354 353 355 354 "globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="], 356 355 357 - "globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="], 358 - 359 - "globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="], 360 - 361 356 "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], 362 357 363 358 "graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="], ··· 516 511 517 512 "svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="], 518 513 519 - "svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="], 520 - 521 514 "svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="], 522 515 523 516 "svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="], ··· 527 520 "tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="], 528 521 529 522 "tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="], 530 - 531 - "tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="], 532 523 533 524 "tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="], 534 525
+1 -2
client/package.json
··· 31 31 }, 32 32 "type": "module", 33 33 "dependencies": { 34 - "@number-flow/svelte": "^0.3.9", 35 - "svelte-adapter-bun": "^0.5.2" 34 + "@number-flow/svelte": "^0.3.9" 36 35 } 37 36 }
-4
client/src/app.css
··· 28 28 overflow-y: overlay; 29 29 overflow-y: auto; /* Fallback for browsers that don't support overlay */ 30 30 } 31 - 32 - .wsbadge { 33 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 34 - }
+9 -9
client/src/app.html
··· 1 1 <!doctype html> 2 2 <html lang="en"> 3 - <head> 4 - <meta charset="utf-8" /> 5 - <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 - <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 - %sveltekit.head% 8 - </head> 9 - <body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover"> 10 - <div style="display: contents">%sveltekit.body%</div> 11 - </body> 3 + <head> 4 + <meta charset="utf-8" /> 5 + <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 + <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 + %sveltekit.head% 8 + </head> 9 + <body data-sveltekit-preload-data="hover"> 10 + <div style="display: contents">%sveltekit.body%</div> 11 + </body> 12 12 </html>
+9 -2
client/src/lib/components/BskyToggle.svelte
··· 11 11 <!-- svelte-ignore a11y_no_static_element_interactions --> 12 12 <button 13 13 onclick={onBskyToggle} 14 - class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 14 + class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300" 15 15 > 16 16 <input checked={dontShowBsky} type="checkbox" /> 17 - <span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span> 17 + <span class="ml-0.5"> hide app.bsky.* </span> 18 18 </button> 19 + 20 + <style lang="postcss"> 21 + @reference "../../app.css"; 22 + .wsbadge { 23 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 24 + } 25 + </style>
+5 -8
client/src/lib/components/EventCard.svelte
··· 104 104 </script> 105 105 106 106 <div 107 - class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 107 + class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 108 108 class:has-activity={isAnimating} 109 109 style="--border-thickness: {borderThickness}px" 110 110 > 111 111 <div class="flex items-start gap-2"> 112 112 <div 113 - class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full" 113 + class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full" 114 114 > 115 115 #{index + 1} 116 116 </div> 117 117 <div 118 118 title={event.nsid} 119 - class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1" 119 + class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1" 120 120 > 121 121 {event.nsid} 122 122 </div> ··· 136 136 </div> 137 137 </div> 138 138 139 - <style lang="postcss"> 139 + <style> 140 140 .has-activity { 141 141 position: relative; 142 142 transition: all 0.2s ease-out; 143 143 } 144 144 145 145 .has-activity::before { 146 - @reference "../../app.css"; 147 - @apply border-blue-500 dark:border-blue-800; 148 146 content: ""; 149 147 position: absolute; 150 148 top: calc(-1 * var(--border-thickness)); 151 149 left: calc(-1 * var(--border-thickness)); 152 150 right: calc(-1 * var(--border-thickness)); 153 151 bottom: calc(-1 * var(--border-thickness)); 154 - border-width: var(--border-thickness); 155 - border-style: solid; 152 + border: var(--border-thickness) solid rgba(59, 130, 246, 0.8); 156 153 border-radius: calc(0.5rem + var(--border-thickness)); 157 154 pointer-events: none; 158 155 transition: all 0.3s ease-out;
+10 -5
client/src/lib/components/FilterControls.svelte
··· 8 8 </script> 9 9 10 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300" 12 12 > 13 - <label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1"> 14 - filter: 15 - </label> 13 + <label for="filter-regex" class="text-blue-800 mr-1"> filter: </label> 16 14 <input 17 15 id="filter-regex" 18 16 value={filterRegex} 19 17 oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)} 20 18 type="text" 21 19 placeholder="regex..." 22 - class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24" 20 + class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24" 23 21 /> 24 22 </div> 23 + 24 + <style lang="postcss"> 25 + @reference "../../app.css"; 26 + .wsbadge { 27 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 28 + } 29 + </style>
+11 -6
client/src/lib/components/RefreshControl.svelte
··· 8 8 </script> 9 9 10 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700" 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300" 12 12 > 13 - <label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1" 14 - >refresh:</label 15 - > 13 + <label for="refresh-rate" class="text-green-800 mr-1">refresh:</label> 16 14 <input 17 15 id="refresh-rate" 18 16 value={refreshRate} ··· 26 24 pattern="[0-9]*" 27 25 min="0" 28 26 placeholder="real-time" 29 - class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20" 27 + class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20" 30 28 /> 31 - <span class="text-lime-800 dark:text-lime-200">s</span> 29 + <span class="text-green-700">s</span> 32 30 </div> 31 + 32 + <style lang="postcss"> 33 + @reference "../../app.css"; 34 + .wsbadge { 35 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 36 + } 37 + </style>
-31
client/src/lib/components/ShowControls.svelte
··· 1 - <script lang="ts"> 2 - import type { ShowOption } from "$lib/types"; 3 - 4 - interface Props { 5 - show: ShowOption; 6 - onShowChange: (value: ShowOption) => void; 7 - } 8 - 9 - let { show, onShowChange }: Props = $props(); 10 - 11 - const showOptions: ShowOption[] = ["server init", "stream start"]; 12 - </script> 13 - 14 - <div 15 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700" 16 - > 17 - <label for="show" class="text-pink-800 dark:text-pink-100 mr-1"> 18 - show since: 19 - </label> 20 - <select 21 - id="show" 22 - value={show} 23 - onchange={(e) => 24 - onShowChange((e.target as HTMLSelectElement).value as ShowOption)} 25 - class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0" 26 - > 27 - {#each showOptions as option} 28 - <option value={option}>{option}</option> 29 - {/each} 30 - </select> 31 - </div>
+10 -5
client/src/lib/components/SortControls.svelte
··· 17 17 </script> 18 18 19 19 <div 20 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700" 20 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300" 21 21 > 22 - <label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1"> 23 - sort by: 24 - </label> 22 + <label for="sort-by" class="text-purple-800 mr-1"> sort by: </label> 25 23 <select 26 24 id="sort-by" 27 25 value={sortBy} 28 26 onchange={(e) => 29 27 onSortChange((e.target as HTMLSelectElement).value as SortOption)} 30 - class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0" 28 + class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0" 31 29 > 32 30 {#each sortOptions as option} 33 31 <option value={option.value}>{option.label}</option> 34 32 {/each} 35 33 </select> 36 34 </div> 35 + 36 + <style lang="postcss"> 37 + @reference "../../app.css"; 38 + .wsbadge { 39 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 40 + } 41 + </style>
+18 -17
client/src/lib/components/StatsCard.svelte
··· 1 1 <script lang="ts"> 2 2 import { formatNumber } from "$lib/format"; 3 + import NumberFlow from "@number-flow/svelte"; 3 4 4 5 const colorClasses = { 5 6 green: { 6 - bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800", 7 - border: "border-green-200 dark:border-green-800", 8 - titleText: "text-green-700 dark:text-green-400", 9 - valueText: "text-green-900 dark:text-green-200", 7 + bg: "from-green-50 to-green-100", 8 + border: "border-green-200", 9 + titleText: "text-green-700", 10 + valueText: "text-green-900", 10 11 }, 11 12 red: { 12 - bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800", 13 - border: "border-red-200 dark:border-red-800", 14 - titleText: "text-red-700 dark:text-red-400", 15 - valueText: "text-red-900 dark:text-red-200", 13 + bg: "from-red-50 to-red-100", 14 + border: "border-red-200", 15 + titleText: "text-red-700", 16 + valueText: "text-red-900", 16 17 }, 17 18 turqoise: { 18 - bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800", 19 - border: "border-teal-200 dark:border-teal-800", 20 - titleText: "text-teal-700 dark:text-teal-400", 21 - valueText: "text-teal-900 dark:text-teal-200", 19 + bg: "from-teal-50 to-teal-100", 20 + border: "border-teal-200", 21 + titleText: "text-teal-700", 22 + valueText: "text-teal-900", 22 23 }, 23 24 orange: { 24 - bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800", 25 - border: "border-orange-200 dark:border-orange-800", 26 - titleText: "text-orange-700 dark:text-orange-400", 27 - valueText: "text-orange-900 dark:text-orange-200", 25 + bg: "from-orange-50 to-orange-100", 26 + border: "border-orange-200", 27 + titleText: "text-orange-700", 28 + valueText: "text-orange-900", 28 29 }, 29 30 }; 30 31 ··· 44 45 {title} 45 46 </h3> 46 47 <p class="text-xl md:text-2xl font-bold {colors.valueText}"> 47 - {formatNumber(value)} 48 + <NumberFlow {value} /> 48 49 </p> 49 50 </div>
+14 -18
client/src/lib/components/StatusBadge.svelte
··· 8 8 const statusConfig = { 9 9 connected: { 10 10 text: "stream live", 11 - classes: 12 - "bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800", 11 + classes: "bg-green-100 text-green-800 border-green-200", 13 12 }, 14 13 connecting: { 15 14 text: "stream connecting", 16 - classes: 17 - "bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800", 15 + classes: "bg-yellow-100 text-yellow-800 border-yellow-200", 18 16 }, 19 17 error: { 20 18 text: "stream errored", 21 - classes: 22 - "bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800", 19 + classes: "bg-red-100 text-red-800 border-red-200", 23 20 }, 24 21 disconnected: { 25 22 text: "stream offline", 26 - classes: 27 - "bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800", 23 + classes: "bg-gray-100 text-gray-800 border-gray-200", 28 24 }, 29 25 }; 30 26 31 27 const config = $derived(statusConfig[status]); 32 28 </script> 33 29 34 - <div class="flex flex-row items-center gap-2 wsbadge {config.classes}"> 35 - <!-- connecting spinner --> 36 - {#if status === "connecting"} 37 - <div 38 - class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200" 39 - ></div> 40 - {/if} 41 - <!-- status text --> 42 - <span>{config.text}</span> 43 - </div> 30 + <span class="wsbadge {config.classes}"> 31 + {config.text} 32 + </span> 33 + 34 + <style lang="postcss"> 35 + @reference "../../app.css"; 36 + .wsbadge { 37 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 38 + } 39 + </style>
+1 -5
client/src/lib/format.ts
··· 2 2 return num.toLocaleString(); 3 3 }; 4 4 5 - const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime()); 6 5 export const formatTimestamp = (timestamp: number): string => { 7 - const date = new Date(timestamp * 1000); 8 - return isValidDate(date) 9 - ? date.toLocaleString() 10 - : new Date(timestamp / 1000).toLocaleString(); 6 + return new Date(timestamp / 1000).toLocaleString(); 11 7 };
-1
client/src/lib/types.ts
··· 18 18 }; 19 19 20 20 export type SortOption = "total" | "created" | "deleted" | "date"; 21 - export type ShowOption = "server init" | "stream start";
+2 -3
client/src/routes/+layout.ts
··· 1 - export const prerender = false; 2 - export const ssr = true; 3 - export const csr = true; 1 + export const prerender = true; 2 + export const ssr = false;
-7
client/src/routes/+page.server.ts
··· 1 - import { fetchEvents, fetchTrackingSince } from "$lib/api"; 2 - 3 - export const load = async () => { 4 - const events = await fetchEvents(); 5 - const trackingSince = await fetchTrackingSince(); 6 - return { events, trackingSince }; 7 - };
+51 -109
client/src/routes/+page.svelte
··· 1 1 <script lang="ts"> 2 2 import { dev } from "$app/environment"; 3 - import type { 4 - EventRecord, 5 - Events, 6 - NsidCount, 7 - ShowOption, 8 - Since, 9 - SortOption, 10 - } from "$lib/types"; 3 + import type { EventRecord, NsidCount, SortOption } from "$lib/types"; 11 4 import { onMount, onDestroy } from "svelte"; 12 - import { get, writable } from "svelte/store"; 5 + import { writable } from "svelte/store"; 13 6 import { PUBLIC_API_URL } from "$env/static/public"; 14 7 import { fetchEvents, fetchTrackingSince } from "$lib/api"; 15 8 import { createRegexFilter } from "$lib/filter"; ··· 21 14 import BskyToggle from "$lib/components/BskyToggle.svelte"; 22 15 import RefreshControl from "$lib/components/RefreshControl.svelte"; 23 16 import { formatTimestamp } from "$lib/format"; 24 - import ShowControls from "$lib/components/ShowControls.svelte"; 25 17 26 - type Props = { 27 - data: { events: Events; trackingSince: Since }; 28 - }; 29 - 30 - const { data }: Props = $props(); 31 - 32 - const events = writable( 33 - new Map<string, EventRecord>(Object.entries(data.events.events)), 34 - ); 35 - const eventsStart = new Map<string, EventRecord>( 36 - Object.entries(data.events.events), 37 - ); 18 + const events = writable(new Map<string, EventRecord>()); 38 19 const pendingUpdates = new Map<string, EventRecord>(); 39 - 40 - let updateTimer: NodeJS.Timeout | null = null; 41 - let per_second = $state(data.events.per_second); 42 - let tracking_since = $state(data.trackingSince.since); 43 - 44 - const diffEvents = ( 45 - oldEvents: Map<string, EventRecord>, 46 - newEvents: Map<string, EventRecord>, 47 - ): NsidCount[] => { 48 - const nsidCounts: NsidCount[] = []; 49 - for (const [nsid, event] of newEvents.entries()) { 50 - const oldEvent = oldEvents.get(nsid); 51 - if (oldEvent) { 52 - const counts = { 53 - nsid, 54 - count: event.count - oldEvent.count, 55 - deleted_count: event.deleted_count - oldEvent.deleted_count, 56 - last_seen: event.last_seen, 57 - }; 58 - if (counts.count > 0 || counts.deleted_count > 0) 59 - nsidCounts.push(counts); 60 - } else { 61 - nsidCounts.push({ 62 - nsid, 63 - ...event, 64 - }); 65 - } 66 - } 67 - return nsidCounts; 68 - }; 69 - const applyEvents = (newEvents: Record<string, EventRecord>) => { 70 - events.update((map) => { 71 - for (const [nsid, event] of Object.entries(newEvents)) { 72 - map.set(nsid, event); 73 - } 74 - return map; 75 - }); 76 - }; 77 - 78 - let error: string | null = $state(null); 79 - let filterRegex = $state(""); 80 - let dontShowBsky = $state(false); 81 - let sortBy: SortOption = $state("total"); 82 - let refreshRate = $state(""); 83 - let changedByUser = $state(false); 84 - let show: ShowOption = $state("server init"); 85 20 let eventsList: NsidCount[] = $state([]); 86 - let updateEventsList = $derived((value: Map<string, EventRecord>) => { 87 - switch (show) { 88 - case "server init": 89 - eventsList = value 90 - .entries() 91 - .map(([nsid, event]) => ({ 92 - nsid, 93 - ...event, 94 - })) 95 - .toArray(); 96 - break; 97 - case "stream start": 98 - eventsList = diffEvents(eventsStart, value); 99 - break; 100 - } 21 + let updateTimer: NodeJS.Timeout | null = null; 22 + events.subscribe((value) => { 23 + eventsList = value 24 + .entries() 25 + .map(([nsid, event]) => ({ 26 + nsid, 27 + ...event, 28 + })) 29 + .toArray(); 101 30 }); 102 - events.subscribe((value) => updateEventsList(value)); 31 + let per_second = $state(0); 32 + let tracking_since = $state(0); 33 + 103 34 let all: EventRecord = $derived( 104 35 eventsList.reduce( 105 36 (acc, event) => { ··· 119 50 }, 120 51 ), 121 52 ); 53 + let error: string | null = $state(null); 54 + let filterRegex = $state(""); 55 + let dontShowBsky = $state(false); 56 + let sortBy: SortOption = $state("total"); 57 + let refreshRate = $state(""); 58 + let changedByUser = $state(false); 122 59 123 60 let websocket: WebSocket | null = null; 124 61 let isStreamOpen = $state(false); ··· 139 76 }; 140 77 websocket.onmessage = async (event) => { 141 78 const jsonData = JSON.parse(event.data); 142 - per_second = jsonData.per_second; 79 + 80 + if (jsonData.per_second > 0) { 81 + per_second = jsonData.per_second; 82 + } 83 + 84 + // Store updates in pending map if refresh rate is set 143 85 if (refreshRate) { 144 86 for (const [nsid, event] of Object.entries(jsonData.events)) { 145 87 pendingUpdates.set(nsid, event as EventRecord); 146 88 } 147 89 } else { 148 - applyEvents(jsonData.events); 90 + // Apply updates immediately if no refresh rate 91 + events.update((map) => { 92 + for (const [nsid, event] of Object.entries( 93 + jsonData.events, 94 + )) { 95 + map.set(nsid, event as EventRecord); 96 + } 97 + return map; 98 + }); 149 99 } 150 100 }; 151 101 websocket.onerror = (error) => { ··· 164 114 error = null; 165 115 const data = await fetchEvents(); 166 116 per_second = data.per_second; 167 - applyEvents(data.events); 117 + events.update((map) => { 118 + for (const [nsid, event] of Object.entries(data.events)) { 119 + map.set(nsid, event); 120 + } 121 + return map; 122 + }); 168 123 tracking_since = (await fetchTrackingSince()).since; 169 124 } catch (err) { 170 125 error = ··· 267 222 /> 268 223 </svelte:head> 269 224 270 - <header 271 - class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2" 272 - > 225 + <header class="border-gray-300 border-b mb-4 pb-2"> 273 226 <div 274 227 class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center" 275 228 > 276 - <h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200"> 277 - lexicon tracker 278 - </h1> 279 - <p class="text-lg mt-1 text-gray-600 dark:text-gray-300"> 229 + <h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1> 230 + <p class="text-lg mt-1 text-gray-600"> 280 231 tracks lexicons seen on the jetstream {tracking_since === 0 281 232 ? "" 282 233 : `(since: ${formatTimestamp(tracking_since)})`} 283 234 </p> 284 235 </div> 285 236 </header> 286 - <div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2"> 237 + <div class="md:max-w-[61vw] mx-auto p-2"> 287 238 <div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8"> 288 239 <StatsCard 289 240 title="total creation" ··· 309 260 310 261 {#if error} 311 262 <div 312 - class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6" 263 + class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6" 313 264 > 314 265 <p>Error: {error}</p> 315 266 </div> ··· 318 269 {#if eventsList.length > 0} 319 270 <div class="mb-8"> 320 271 <div class="flex flex-wrap items-center gap-3 mb-3"> 321 - <h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200"> 322 - seen lexicons 323 - </h2> 272 + <h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2> 324 273 <StatusBadge status={websocketStatus} /> 325 274 </div> 326 275 <div class="flex flex-wrap items-center gap-1.5 mb-6"> ··· 342 291 refreshRate = ""; 343 292 }} 344 293 /> 345 - <ShowControls 346 - {show} 347 - onShowChange={(value: ShowOption) => { 348 - show = value; 349 - updateEventsList(get(events)); 350 - }} 351 - /> 352 294 <RefreshControl 353 295 {refreshRate} 354 296 onRefreshChange={(value) => { ··· 373 315 {/if} 374 316 </div> 375 317 376 - <footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center"> 377 - <p class="text-gray-600 dark:text-gray-200 text-sm"> 318 + <footer class="py-2 border-t border-gray-200 text-center"> 319 + <p class="text-gray-600 text-sm"> 378 320 source code <a 379 321 href="https://tangled.sh/@poor.dog/nsid-tracker" 380 322 target="_blank" 381 323 rel="noopener noreferrer" 382 - class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline" 324 + class="text-blue-600 hover:text-blue-800 underline" 383 325 >@poor.dog/nsid-tracker</a 384 326 > 385 327 </p>
+1 -1
client/svelte.config.js
··· 1 - import adapter from "svelte-adapter-bun"; 1 + import adapter from "@sveltejs/adapter-static"; 2 2 import { vitePreprocess } from "@sveltejs/vite-plugin-svelte"; 3 3 4 4 /** @type {import('@sveltejs/kit').Config} */
+1 -1
nix/client-modules.nix
··· 8 8 9 9 src = ../client; 10 10 11 - outputHash = "sha256-njwXk3u0NUsYWLv9EOdCltgQOjTVkcfu+D+0COSw/6I="; 11 + outputHash = "sha256-t8PJFo+3XGkzmMNbw9Rf9cS5Ob5YtI8ucX3ay+u9a3M="; 12 12 outputHashAlgo = "sha256"; 13 13 outputHashMode = "recursive"; 14 14
+2 -10
nix/client.nix
··· 1 1 { 2 - lib, 3 2 stdenv, 4 3 makeBinaryWrapper, 5 4 bun, ··· 29 28 ''; 30 29 buildPhase = '' 31 30 runHook preBuild 32 - bun --prefer-offline run build 31 + bun --prefer-offline run --bun build 33 32 runHook postBuild 34 33 ''; 35 34 installPhase = '' 36 35 runHook preInstall 37 - 38 - mkdir -p $out/bin 36 + mkdir -p $out 39 37 cp -R ./build/* $out 40 - cp -R ./node_modules $out 41 - 42 - makeBinaryWrapper ${bun}/bin/bun $out/bin/website \ 43 - --prefix PATH : ${lib.makeBinPath [ bun ]} \ 44 - --add-flags "run --bun --no-install --cwd $out start" 45 - 46 38 runHook postInstall 47 39 ''; 48 40 }
+28 -42
server/Cargo.lock
··· 18 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 19 20 20 [[package]] 21 - name = "ahash" 22 - version = "0.8.12" 23 - source = "registry+https://github.com/rust-lang/crates.io-index" 24 - checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 - dependencies = [ 26 - "cfg-if", 27 - "getrandom 0.3.3", 28 - "once_cell", 29 - "serde", 30 - "version_check", 31 - "zerocopy", 32 - ] 33 - 34 - [[package]] 35 21 name = "aho-corasick" 36 22 version = "1.1.3" 37 23 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 60 46 version = "1.0.98" 61 47 source = "registry+https://github.com/rust-lang/crates.io-index" 62 48 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 63 - 64 - [[package]] 65 - name = "arc-swap" 66 - version = "1.7.1" 67 - source = "registry+https://github.com/rust-lang/crates.io-index" 68 - checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" 69 49 70 50 [[package]] 71 51 name = "async-compression" ··· 327 307 version = "0.2.1" 328 308 source = "registry+https://github.com/rust-lang/crates.io-index" 329 309 checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 310 + 311 + [[package]] 312 + name = "cmake" 313 + version = "0.1.54" 314 + source = "registry+https://github.com/rust-lang/crates.io-index" 315 + checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" 316 + dependencies = [ 317 + "cc", 318 + ] 330 319 331 320 [[package]] 332 321 name = "combine" ··· 1563 1552 name = "server" 1564 1553 version = "0.1.0" 1565 1554 dependencies = [ 1566 - "ahash", 1567 1555 "anyhow", 1568 - "arc-swap", 1569 1556 "async-trait", 1570 1557 "axum", 1571 1558 "axum-tws", ··· 1584 1571 "serde", 1585 1572 "serde_json", 1586 1573 "smol_str", 1574 + "snmalloc-rs", 1587 1575 "threadpool", 1588 1576 "tikv-jemallocator", 1589 1577 "tokio", ··· 1655 1643 dependencies = [ 1656 1644 "borsh", 1657 1645 "serde", 1646 + ] 1647 + 1648 + [[package]] 1649 + name = "snmalloc-rs" 1650 + version = "0.3.8" 1651 + source = "registry+https://github.com/rust-lang/crates.io-index" 1652 + checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a" 1653 + dependencies = [ 1654 + "snmalloc-sys", 1655 + ] 1656 + 1657 + [[package]] 1658 + name = "snmalloc-sys" 1659 + version = "0.3.8" 1660 + source = "registry+https://github.com/rust-lang/crates.io-index" 1661 + checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea" 1662 + dependencies = [ 1663 + "cmake", 1658 1664 ] 1659 1665 1660 1666 [[package]] ··· 2400 2406 version = "0.8.15" 2401 2407 source = "registry+https://github.com/rust-lang/crates.io-index" 2402 2408 checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 2403 - 2404 - [[package]] 2405 - name = "zerocopy" 2406 - version = "0.8.26" 2407 - source = "registry+https://github.com/rust-lang/crates.io-index" 2408 - checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" 2409 - dependencies = [ 2410 - "zerocopy-derive", 2411 - ] 2412 - 2413 - [[package]] 2414 - name = "zerocopy-derive" 2415 - version = "0.8.26" 2416 - source = "registry+https://github.com/rust-lang/crates.io-index" 2417 - checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" 2418 - dependencies = [ 2419 - "proc-macro2", 2420 - "quote", 2421 - "syn", 2422 - ] 2423 2409 2424 2410 [[package]] 2425 2411 name = "zeroize"
+2 -2
server/Cargo.toml
··· 30 30 rayon = "1.10.0" 31 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 32 32 rclite = "0.2.7" 33 - arc-swap = "1.7.1" 34 - ahash = { version = "0.8.12", features = ["serde"] } 35 33 34 + [target.'cfg(target_env = "msvc")'.dependencies] 35 + snmalloc-rs = "0.3.8" 36 36 37 37 [target.'cfg(not(target_env = "msvc"))'.dependencies] 38 38 tikv-jemallocator = "0.6"
+18 -16
server/src/api.rs
··· 1 1 use std::{ 2 + collections::HashMap, 2 3 fmt::Display, 3 4 net::SocketAddr, 4 5 ops::{Bound, Deref, RangeBounds}, 5 6 time::Duration, 6 7 }; 7 8 8 - use ahash::AHashMap; 9 9 use anyhow::anyhow; 10 10 use axum::{ 11 11 Json, Router, ··· 117 117 #[derive(Serialize)] 118 118 struct Events { 119 119 per_second: usize, 120 - events: AHashMap<SmolStr, NsidCount>, 120 + events: HashMap<SmolStr, NsidCount>, 121 121 } 122 122 123 123 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 124 - let mut events = AHashMap::new(); 124 + let mut events = HashMap::new(); 125 125 for result in db.get_counts() { 126 126 let (nsid, counts) = result?; 127 127 events.insert( ··· 176 176 ) -> AppResult<Json<Vec<Hit>>> { 177 177 let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded); 178 178 let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded); 179 + let maybe_hits = db 180 + .get_hits(&params.nsid, HitsRange { from, to }) 181 + .take(MAX_HITS); 182 + let mut hits = Vec::with_capacity(maybe_hits.size_hint().0); 179 183 180 - db.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS) 181 - .take(MAX_HITS) 182 - .try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| { 183 - let hit = hit?; 184 - let hit_data = hit.deser()?; 184 + for maybe_hit in maybe_hits { 185 + let hit = maybe_hit?; 186 + let hit_data = hit.deser()?; 185 187 186 - acc.push(Hit { 187 - timestamp: hit.timestamp, 188 - deleted: hit_data.deleted, 189 - }); 190 - Ok(acc) 191 - }) 192 - .map(Json) 188 + hits.push(Hit { 189 + timestamp: hit.timestamp, 190 + deleted: hit_data.deleted, 191 + }); 192 + } 193 + 194 + Ok(Json(hits)) 193 195 } 194 196 195 197 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { ··· 198 200 (async move { 199 201 let mut listener = db.new_listener(); 200 202 let mut data = Events { 201 - events: AHashMap::<SmolStr, NsidCount>::with_capacity(10), 203 + events: HashMap::<SmolStr, NsidCount>::with_capacity(10), 202 204 per_second: 0, 203 205 }; 204 206 let mut updates = 0;
+26 -54
server/src/db/handle.rs
··· 1 1 use std::{ 2 2 fmt::Debug, 3 3 io::Cursor, 4 - ops::{Bound, RangeBounds}, 4 + ops::{Bound, Deref, RangeBounds}, 5 5 sync::atomic::{AtomicU64, Ordering as AtomicOrdering}, 6 6 time::Duration, 7 7 }; 8 8 9 9 use byteview::ByteView; 10 - use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot}; 10 + use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice}; 11 11 use itertools::Itertools; 12 12 use parking_lot::Mutex; 13 13 use rayon::iter::{IntoParallelIterator, ParallelIterator}; ··· 16 16 17 17 use crate::{ 18 18 db::{EventRecord, NsidHit, block}, 19 - error::{AppError, AppResult}, 20 - utils::{ 21 - ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, 22 - varints_unsigned_encoded, 23 - }, 19 + error::AppResult, 20 + utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 24 21 }; 25 22 26 23 pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; ··· 34 31 } 35 32 36 33 pub struct LexiconHandle { 37 - write_tree: Partition, 38 - read_tree: ArcliteSwap<Snapshot>, 34 + tree: Partition, 39 35 nsid: SmolStr, 40 36 buf: Arc<Mutex<Vec<EventRecord>>>, 41 37 last_insert: AtomicU64, // relaxed ··· 50 46 } 51 47 } 52 48 49 + impl Deref for LexiconHandle { 50 + type Target = Partition; 51 + 52 + fn deref(&self) -> &Self::Target { 53 + &self.tree 54 + } 55 + } 56 + 53 57 impl LexiconHandle { 54 58 pub fn new(keyspace: &Keyspace, nsid: &str) -> Self { 55 59 let opts = PartitionCreateOptions::default() 56 - .block_size(1024 * 48) 60 + .block_size(1024 * 128) 57 61 .compression(fjall::CompressionType::Miniz(9)); 58 - let write_tree = keyspace.open_partition(nsid, opts).unwrap(); 59 - let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot())); 60 62 Self { 61 - write_tree, 62 - read_tree, 63 + tree: keyspace.open_partition(nsid, opts).unwrap(), 63 64 nsid: nsid.into(), 64 65 buf: Default::default(), 65 66 last_insert: AtomicU64::new(0), ··· 67 68 } 68 69 } 69 70 70 - #[inline(always)] 71 - pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> { 72 - self.read_tree.load() 73 - } 74 - 75 - #[inline(always)] 76 - pub fn update_tree(&self) { 77 - self.read_tree 78 - .store(ArcRefCnt::new(self.write_tree.snapshot())); 79 - } 80 - 81 - #[inline(always)] 82 - pub fn span(&self) -> tracing::Span { 83 - tracing::info_span!("handle", nsid = %self.nsid) 84 - } 85 - 86 - #[inline(always)] 87 71 pub fn nsid(&self) -> &SmolStr { 88 72 &self.nsid 89 73 } 90 74 91 - #[inline(always)] 92 75 pub fn item_count(&self) -> usize { 93 76 self.buf.lock().len() 94 77 } 95 78 96 - pub fn since_last_activity(&self) -> Duration { 97 - Duration::from_nanos( 98 - CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()), 99 - ) 79 + pub fn since_last_activity(&self) -> u64 { 80 + CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()) 100 81 } 101 82 102 83 pub fn suggested_block_size(&self) -> usize { ··· 118 99 range: impl RangeBounds<u64>, 119 100 sort: bool, 120 101 ) -> AppResult<()> { 121 - let _span = self.span().entered(); 122 - 123 102 let start_limit = match range.start_bound().cloned() { 124 103 Bound::Included(start) => start, 125 104 Bound::Excluded(start) => start.saturating_add(1), ··· 135 114 let end_key = varints_unsigned_encoded([end_limit]); 136 115 137 116 let blocks_to_compact = self 138 - .read() 117 + .tree 139 118 .range(start_key..end_key) 140 119 .collect::<Result<Vec<_>, _>>()?; 141 120 if blocks_to_compact.len() < 2 { 121 + tracing::info!("{}: nothing to compact", self.nsid); 142 122 return Ok(()); 143 123 } 144 124 ··· 175 155 let end_blocks_size = new_blocks.len(); 176 156 177 157 for key in keys_to_delete { 178 - self.write_tree.remove(key.clone())?; 158 + self.tree.remove(key.clone())?; 179 159 } 180 160 for block in new_blocks { 181 - self.write_tree.insert(block.key, block.data)?; 161 + self.tree.insert(block.key, block.data)?; 182 162 } 183 163 184 - let reduction = 185 - ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0; 186 164 tracing::info!( 187 - { 188 - start = start_blocks_size, 189 - end = end_blocks_size, 190 - }, 191 - "blocks compacted {reduction:.2}%", 165 + "{}: compacted {} blocks to {} blocks ({}% reduction)", 166 + self.nsid, 167 + start_blocks_size, 168 + end_blocks_size, 169 + ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0, 192 170 ); 193 171 194 172 Ok(()) 195 - } 196 - 197 - pub fn insert_block(&self, block: Block) -> AppResult<()> { 198 - self.write_tree 199 - .insert(block.key, block.data) 200 - .map_err(AppError::from) 201 173 } 202 174 203 175 pub fn encode_block_from_items(
+82 -139
server/src/db/mod.rs
··· 1 1 use std::{ 2 + collections::HashMap, 2 3 fmt::Debug, 3 4 io::Cursor, 4 5 ops::{Bound, Deref, RangeBounds}, 5 - path::Path, 6 + path::{Path, PathBuf}, 6 7 time::Duration, 7 - u64, 8 8 }; 9 9 10 - use ahash::{AHashMap, AHashSet}; 11 10 use byteview::StrView; 12 - use fjall::{Keyspace, Partition, PartitionCreateOptions}; 11 + use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 13 12 use itertools::{Either, Itertools}; 14 - use rayon::iter::{IntoParallelIterator, ParallelIterator}; 13 + use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; 15 14 use rclite::Arc; 16 15 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 17 16 use smol_str::{SmolStr, ToSmolStr}; ··· 22 21 db::handle::{ItemDecoder, LexiconHandle}, 23 22 error::{AppError, AppResult}, 24 23 jetstream::JetstreamEvent, 25 - utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 24 + utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded}, 26 25 }; 27 26 28 27 mod block; ··· 72 71 } 73 72 74 73 pub struct DbInfo { 75 - pub nsids: AHashMap<SmolStr, Vec<usize>>, 74 + pub nsids: HashMap<SmolStr, Vec<usize>>, 76 75 pub disk_size: u64, 77 76 } 78 77 ··· 80 79 pub ks_config: fjall::Config, 81 80 pub min_block_size: usize, 82 81 pub max_block_size: usize, 83 - pub max_last_activity: Duration, 82 + pub max_last_activity: u64, 84 83 } 85 84 86 85 impl DbConfig { ··· 98 97 impl Default for DbConfig { 99 98 fn default() -> Self { 100 99 Self { 101 - ks_config: fjall::Config::default() 102 - .cache_size(1024 * 1024 * 512) 103 - .max_write_buffer_size(u64::MAX), 104 - min_block_size: 1000, 105 - max_block_size: 250_000, 106 - max_last_activity: Duration::from_secs(10), 100 + ks_config: fjall::Config::default(), 101 + min_block_size: 512, 102 + max_block_size: 500_000, 103 + max_last_activity: Duration::from_secs(10).as_nanos() as u64, 107 104 } 108 105 } 109 106 } ··· 114 111 pub cfg: DbConfig, 115 112 pub ks: Keyspace, 116 113 counts: Partition, 117 - hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>, 114 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 118 115 sync_pool: threadpool::ThreadPool, 119 116 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 120 - eps: RateTracker<100>, // 100 millis buckets 117 + eps: RateTracker<100>, 121 118 cancel_token: CancellationToken, 122 119 } 123 120 ··· 163 160 } 164 161 165 162 pub fn sync(&self, all: bool) -> AppResult<()> { 166 - let start = CLOCK.now(); 167 163 // prepare all the data 168 - let nsids_len = self.hits.len(); 169 - let mut data = Vec::with_capacity(nsids_len); 170 - let mut nsids = AHashSet::with_capacity(nsids_len); 164 + let mut data = Vec::with_capacity(self.hits.len()); 171 165 let _guard = scc::ebr::Guard::new(); 172 - for (nsid, handle) in self.hits.iter(&_guard) { 166 + for (_, handle) in self.hits.iter(&_guard) { 173 167 let mut nsid_data = Vec::with_capacity(2); 174 - // let mut total_count = 0; 168 + let mut total_count = 0; 175 169 let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity; 176 170 // if we disconnect for a long time, we want to sync all of what we 177 171 // have to avoid having many small blocks (even if we run compaction ··· 186 180 let count = handle.item_count(); 187 181 let data_count = count / block_size; 188 182 if count > 0 && (all || data_count > 0 || is_too_old) { 189 - for _ in 0..data_count { 190 - nsid_data.push((handle.clone(), block_size)); 191 - // total_count += block_size; 183 + for i in 0..data_count { 184 + nsid_data.push((i, handle.clone(), block_size)); 185 + total_count += block_size; 192 186 } 193 187 // only sync remainder if we haven't met block size 194 188 let remainder = count % block_size; 195 189 if (all || data_count == 0) && remainder > 0 { 196 - nsid_data.push((handle.clone(), remainder)); 197 - // total_count += remainder; 190 + nsid_data.push((data_count, handle.clone(), remainder)); 191 + total_count += remainder; 198 192 } 199 193 } 200 - let _span = handle.span().entered(); 201 - if nsid_data.len() > 0 { 202 - // tracing::info!( 203 - // {blocks = %nsid_data.len(), count = %total_count}, 204 - // "will encode & sync", 205 - // ); 206 - nsids.insert(nsid.clone()); 207 - data.push(nsid_data); 208 - } 194 + tracing::info!( 195 + "{}: will sync {} blocks ({} count)", 196 + handle.nsid(), 197 + nsid_data.len(), 198 + total_count, 199 + ); 200 + data.push(nsid_data); 209 201 } 210 202 drop(_guard); 211 203 ··· 214 206 .map(|chunk| { 215 207 chunk 216 208 .into_iter() 217 - .map(|(handle, max_block_size)| { 218 - (handle.take_block_items(max_block_size), handle) 209 + .map(|(i, handle, max_block_size)| { 210 + (i, handle.take_block_items(max_block_size), handle) 219 211 }) 220 212 .collect::<Vec<_>>() 221 213 .into_par_iter() 222 - .map(|(items, handle)| { 214 + .map(|(i, items, handle)| { 223 215 let count = items.len(); 224 216 let block = LexiconHandle::encode_block_from_items(items, count)?; 225 - AppResult::Ok((block, handle)) 217 + tracing::info!( 218 + "{}: encoded block with {} items", 219 + handle.nsid(), 220 + block.written, 221 + ); 222 + AppResult::Ok((i, block, handle)) 226 223 }) 227 224 .collect::<Result<Vec<_>, _>>() 228 225 }) 229 226 .try_for_each(|chunk| { 230 227 let chunk = chunk?; 231 - for (block, handle) in chunk { 232 - self.sync_pool.execute(move || { 233 - let _span = handle.span().entered(); 234 - let written = block.written; 235 - match handle.insert_block(block) { 228 + for (i, block, handle) in chunk { 229 + self.sync_pool 230 + .execute(move || match handle.insert(block.key, block.data) { 236 231 Ok(_) => { 237 - tracing::info!({count = %written}, "synced") 232 + tracing::info!("{}: [{i}] synced {}", block.written, handle.nsid()) 238 233 } 239 - Err(err) => tracing::error!({ err = %err }, "failed to sync block"), 240 - } 241 - }); 234 + Err(err) => tracing::error!("failed to sync block: {}", err), 235 + }); 242 236 } 243 237 AppResult::Ok(()) 244 238 })?; 245 239 self.sync_pool.join(); 246 240 247 - // update snapshots for all (changed) handles 248 - for nsid in nsids { 249 - self.hits.peek_with(&nsid, |_, handle| handle.update_tree()); 250 - } 251 - 252 - tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks"); 253 - 254 241 Ok(()) 255 242 } 256 243 ··· 264 251 let Some(handle) = self.get_handle(nsid) else { 265 252 return Ok(()); 266 253 }; 267 - handle.compact(max_count, range, sort)?; 268 - handle.update_tree(); 269 - Ok(()) 254 + handle.compact(max_count, range, sort) 270 255 } 271 256 272 257 pub fn compact_all( ··· 283 268 284 269 pub fn major_compact(&self) -> AppResult<()> { 285 270 self.compact_all(self.cfg.max_block_size, .., true)?; 271 + let _guard = scc::ebr::Guard::new(); 272 + for (_, handle) in self.hits.iter(&_guard) { 273 + handle.deref().major_compact()?; 274 + } 286 275 Ok(()) 287 276 } 288 277 ··· 312 301 } 313 302 314 303 pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> { 315 - let mut seen_events = 0; 316 304 for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() { 317 305 let mut counts = self.get_count(&key)?; 306 + let mut count = 0; 318 307 self.ensure_handle(&key).queue(chunk.inspect(|e| { 319 308 // increment count 320 309 counts.last_seen = e.timestamp; ··· 323 312 } else { 324 313 counts.count += 1; 325 314 } 326 - seen_events += 1; 315 + count += 1; 327 316 })); 317 + self.eps.observe(count); 328 318 self.insert_count(&key, &counts)?; 329 319 if self.event_broadcaster.receiver_count() > 0 { 330 320 let _ = self.event_broadcaster.send((key, counts)); 331 321 } 332 322 } 333 - self.eps.observe(seen_events); 334 323 Ok(()) 335 324 } 336 325 ··· 370 359 } 371 360 372 361 pub fn info(&self) -> AppResult<DbInfo> { 373 - let mut nsids = AHashMap::new(); 362 + let mut nsids = HashMap::new(); 374 363 for nsid in self.get_nsids() { 375 364 let Some(handle) = self.get_handle(&nsid) else { 376 365 continue; 377 366 }; 378 - let block_lens = handle 379 - .read() 380 - .iter() 381 - .rev() 382 - .try_fold(Vec::new(), |mut acc, item| { 383 - let (key, value) = item?; 384 - let mut timestamps = Cursor::new(key); 385 - let start_timestamp = timestamps.read_varint()?; 386 - let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 387 - acc.push(decoder.item_count()); 388 - AppResult::Ok(acc) 389 - })?; 367 + let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| { 368 + let (key, value) = item?; 369 + let mut timestamps = Cursor::new(key); 370 + let start_timestamp = timestamps.read_varint()?; 371 + let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 372 + acc.push(decoder.item_count()); 373 + AppResult::Ok(acc) 374 + })?; 390 375 nsids.insert(nsid.to_smolstr(), block_lens); 391 376 } 392 377 Ok(DbInfo { ··· 399 384 &self, 400 385 nsid: &str, 401 386 range: impl RangeBounds<u64> + std::fmt::Debug, 402 - max_items: usize, 403 387 ) -> impl Iterator<Item = AppResult<handle::Item>> { 404 388 let start_limit = match range.start_bound().cloned() { 405 389 Bound::Included(start) => start, ··· 417 401 return Either::Right(std::iter::empty()); 418 402 }; 419 403 420 - // let mut ts = CLOCK.now(); 421 - let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> { 422 - if current_item_count >= max_items { 423 - return Ok((None, current_item_count)); 424 - } 425 - let (key, val) = res?; 404 + let map_block = move |(key, val)| { 426 405 let mut key_reader = Cursor::new(key); 427 406 let start_timestamp = key_reader.read_varint::<u64>()?; 428 - // let end_timestamp = key_reader.read_varint::<u64>()?; 429 407 if start_timestamp < start_limit { 430 - // tracing::info!( 431 - // "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater" 432 - // ); 433 - return Ok((None, current_item_count)); 408 + return Ok(None); 434 409 } 435 - let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?; 436 - let current_item_count = current_item_count + decoder.item_count(); 437 - // tracing::info!( 438 - // "took {}ns to get block with size {}", 439 - // ts.elapsed().as_nanos(), 440 - // decoder.item_count() 441 - // ); 442 - // ts = CLOCK.now(); 443 - Ok(( 444 - Some( 445 - decoder 446 - .take_while(move |item| { 447 - item.as_ref().map_or(true, |item| { 448 - item.timestamp <= end_limit && item.timestamp >= start_limit 449 - }) 450 - }) 451 - .map(|res| res.map_err(AppError::from)), 452 - ), 453 - current_item_count, 454 - )) 410 + let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)? 411 + .take_while(move |item| { 412 + item.as_ref().map_or(true, |item| { 413 + item.timestamp <= end_limit && item.timestamp >= start_limit 414 + }) 415 + }) 416 + .map(|res| res.map_err(AppError::from)); 417 + Ok(Some(items)) 455 418 }; 456 419 457 - let (blocks, _counted) = handle 458 - .read() 459 - .range(..end_key) 460 - .map(|res| res.map_err(AppError::from)) 461 - .rev() 462 - .fold_while( 463 - (Vec::with_capacity(20), 0), 464 - |(mut blocks, current_item_count), res| { 465 - use itertools::FoldWhile::*; 466 - 467 - match map_block((res, current_item_count)) { 468 - Ok((Some(block), current_item_count)) => { 469 - blocks.push(Ok(block)); 470 - Continue((blocks, current_item_count)) 471 - } 472 - Ok((None, current_item_count)) => Done((blocks, current_item_count)), 473 - Err(err) => { 474 - blocks.push(Err(err)); 475 - Done((blocks, current_item_count)) 476 - } 477 - } 478 - }, 479 - ) 480 - .into_inner(); 481 - 482 - // tracing::info!( 483 - // "got blocks with size {}, item count {counted}", 484 - // blocks.len() 485 - // ); 486 - 487 - Either::Left(blocks.into_iter().rev().flatten().flatten()) 420 + Either::Left( 421 + handle 422 + .range(..end_key) 423 + .rev() 424 + .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose()) 425 + .collect::<Vec<_>>() 426 + .into_iter() 427 + .rev() 428 + .flatten() 429 + .flatten(), 430 + ) 488 431 } 489 432 490 433 pub fn tracking_since(&self) -> AppResult<u64> { ··· 493 436 let Some(handle) = self.get_handle("app.bsky.feed.like") else { 494 437 return Ok(0); 495 438 }; 496 - let Some((timestamps_raw, _)) = handle.read().first_key_value()? else { 439 + let Some((timestamps_raw, _)) = handle.first_key_value()? else { 497 440 return Ok(0); 498 441 }; 499 442 let mut timestamp_reader = Cursor::new(timestamps_raw);
+501
server/src/db_old/block.rs
··· 1 + use ordered_varint::Variable; 2 + use rkyv::{ 3 + Archive, Deserialize, Serialize, 4 + api::high::{HighSerializer, HighValidator}, 5 + bytecheck::CheckBytes, 6 + de::Pool, 7 + rancor::{self, Strategy}, 8 + ser::allocator::ArenaHandle, 9 + util::AlignedVec, 10 + }; 11 + use std::{ 12 + io::{self, Read, Write}, 13 + marker::PhantomData, 14 + }; 15 + 16 + use crate::error::{AppError, AppResult}; 17 + 18 + pub struct Item<T> { 19 + pub timestamp: u64, 20 + data: AlignedVec, 21 + phantom: PhantomData<T>, 22 + } 23 + 24 + impl<T: Archive> Item<T> { 25 + pub fn access(&self) -> &T::Archived { 26 + unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) } 27 + } 28 + } 29 + 30 + impl<T> Item<T> 31 + where 32 + T: Archive, 33 + T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>> 34 + + Deserialize<T, Strategy<Pool, rancor::Error>>, 35 + { 36 + pub fn deser(&self) -> AppResult<T> { 37 + rkyv::from_bytes(&self.data).map_err(AppError::from) 38 + } 39 + } 40 + 41 + impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 42 + pub fn new(timestamp: u64, data: &T) -> Self { 43 + Item { 44 + timestamp, 45 + data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 46 + phantom: PhantomData, 47 + } 48 + } 49 + } 50 + 51 + pub struct ItemEncoder<W: Write, T> { 52 + writer: W, 53 + prev_timestamp: u64, 54 + prev_delta: i64, 55 + _item: PhantomData<T>, 56 + } 57 + 58 + impl<W: Write, T> ItemEncoder<W, T> { 59 + pub fn new(writer: W) -> Self { 60 + ItemEncoder { 61 + writer, 62 + prev_timestamp: 0, 63 + prev_delta: 0, 64 + _item: PhantomData, 65 + } 66 + } 67 + 68 + pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> { 69 + if self.prev_timestamp == 0 { 70 + // self.writer.write_varint(item.timestamp)?; 71 + self.prev_timestamp = item.timestamp; 72 + self.write_data(&item.data)?; 73 + return Ok(()); 74 + } 75 + 76 + let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 77 + 78 + self.writer.write_varint(delta - self.prev_delta)?; 79 + self.prev_timestamp = item.timestamp; 80 + self.prev_delta = delta; 81 + 82 + self.write_data(&item.data)?; 83 + 84 + Ok(()) 85 + } 86 + 87 + fn write_data(&mut self, data: &[u8]) -> AppResult<()> { 88 + self.writer.write_varint(data.len())?; 89 + self.writer.write_all(data)?; 90 + Ok(()) 91 + } 92 + 93 + pub fn finish(mut self) -> AppResult<W> { 94 + self.writer.flush()?; 95 + Ok(self.writer) 96 + } 97 + } 98 + 99 + pub struct ItemDecoder<R, T> { 100 + reader: R, 101 + current_timestamp: u64, 102 + current_delta: i64, 103 + first_item: bool, 104 + _item: PhantomData<T>, 105 + } 106 + 107 + impl<R: Read, T: Archive> ItemDecoder<R, T> { 108 + pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> { 109 + Ok(ItemDecoder { 110 + reader, 111 + current_timestamp: start_timestamp, 112 + current_delta: 0, 113 + first_item: true, 114 + _item: PhantomData, 115 + }) 116 + } 117 + 118 + pub fn decode(&mut self) -> AppResult<Option<Item<T>>> { 119 + if self.first_item { 120 + // read the first timestamp 121 + // let timestamp = match self.reader.read_varint::<u64>() { 122 + // Ok(timestamp) => timestamp, 123 + // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 124 + // Err(e) => return Err(e.into()), 125 + // }; 126 + // self.current_timestamp = timestamp; 127 + 128 + let Some(data_raw) = self.read_item()? else { 129 + return Ok(None); 130 + }; 131 + self.first_item = false; 132 + return Ok(Some(Item { 133 + timestamp: self.current_timestamp, 134 + data: data_raw, 135 + phantom: PhantomData, 136 + })); 137 + } 138 + 139 + let Some(_delta) = self.read_timestamp()? else { 140 + return Ok(None); 141 + }; 142 + 143 + // read data 144 + let data_raw = match self.read_item()? { 145 + Some(data_raw) => data_raw, 146 + None => { 147 + return Err(io::Error::new( 148 + io::ErrorKind::UnexpectedEof, 149 + "expected data after delta", 150 + ) 151 + .into()); 152 + } 153 + }; 154 + 155 + Ok(Some(Item { 156 + timestamp: self.current_timestamp, 157 + data: data_raw, 158 + phantom: PhantomData, 159 + })) 160 + } 161 + 162 + // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 163 + fn read_timestamp(&mut self) -> AppResult<Option<u64>> { 164 + let delta = match self.reader.read_varint::<i64>() { 165 + Ok(delta) => delta, 166 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 167 + Err(e) => return Err(e.into()), 168 + }; 169 + self.current_delta += delta; 170 + self.current_timestamp = 171 + (self.current_timestamp as i128 + self.current_delta as i128) as u64; 172 + Ok(Some(self.current_timestamp)) 173 + } 174 + 175 + fn read_item(&mut self) -> AppResult<Option<AlignedVec>> { 176 + let data_len = match self.reader.read_varint::<usize>() { 177 + Ok(data_len) => data_len, 178 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 179 + Err(e) => return Err(e.into()), 180 + }; 181 + let mut data_raw = AlignedVec::with_capacity(data_len); 182 + for _ in 0..data_len { 183 + data_raw.push(0); 184 + } 185 + self.reader.read_exact(data_raw.as_mut_slice())?; 186 + Ok(Some(data_raw)) 187 + } 188 + } 189 + 190 + impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 191 + type Item = AppResult<Item<T>>; 192 + 193 + fn next(&mut self) -> Option<Self::Item> { 194 + self.decode().transpose() 195 + } 196 + } 197 + 198 + pub trait WriteVariableExt: Write { 199 + fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 200 + value.encode_variable(self) 201 + } 202 + } 203 + impl<W: Write> WriteVariableExt for W {} 204 + 205 + pub trait ReadVariableExt: Read { 206 + fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 207 + T::decode_variable(self) 208 + } 209 + } 210 + impl<R: Read> ReadVariableExt for R {} 211 + 212 + #[cfg(test)] 213 + mod test { 214 + use super::*; 215 + use rkyv::{Archive, Deserialize, Serialize}; 216 + use std::io::Cursor; 217 + 218 + #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)] 219 + #[rkyv(compare(PartialEq))] 220 + struct TestData { 221 + id: u32, 222 + value: String, 223 + } 224 + 225 + #[test] 226 + fn test_encoder_decoder_single_item() { 227 + let data = TestData { 228 + id: 123, 229 + value: "test".to_string(), 230 + }; 231 + 232 + let item = Item::new(1000, &data); 233 + 234 + // encode 235 + let mut buffer = Vec::new(); 236 + let mut encoder = ItemEncoder::new(&mut buffer); 237 + encoder.encode(&item).unwrap(); 238 + encoder.finish().unwrap(); 239 + 240 + // decode 241 + let cursor = Cursor::new(buffer); 242 + let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 243 + 244 + let decoded_item = decoder.decode().unwrap().unwrap(); 245 + assert_eq!(decoded_item.timestamp, 1000); 246 + 247 + let decoded_data = decoded_item.access(); 248 + assert_eq!(decoded_data.id, 123); 249 + assert_eq!(decoded_data.value.as_str(), "test"); 250 + } 251 + 252 + #[test] 253 + fn test_encoder_decoder_multiple_items() { 254 + let items = vec![ 255 + Item::new( 256 + 1000, 257 + &TestData { 258 + id: 1, 259 + value: "first".to_string(), 260 + }, 261 + ), 262 + Item::new( 263 + 1010, 264 + &TestData { 265 + id: 2, 266 + value: "second".to_string(), 267 + }, 268 + ), 269 + Item::new( 270 + 1015, 271 + &TestData { 272 + id: 3, 273 + value: "third".to_string(), 274 + }, 275 + ), 276 + Item::new( 277 + 1025, 278 + &TestData { 279 + id: 4, 280 + value: "fourth".to_string(), 281 + }, 282 + ), 283 + ]; 284 + 285 + // encode 286 + let mut buffer = Vec::new(); 287 + let mut encoder = ItemEncoder::new(&mut buffer); 288 + 289 + for item in &items { 290 + encoder.encode(item).unwrap(); 291 + } 292 + encoder.finish().unwrap(); 293 + 294 + // decode 295 + let cursor = Cursor::new(buffer); 296 + let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 297 + 298 + let mut decoded_items = Vec::new(); 299 + while let Some(item) = decoder.decode().unwrap() { 300 + decoded_items.push(item); 301 + } 302 + 303 + assert_eq!(decoded_items.len(), 4); 304 + 305 + for (original, decoded) in items.iter().zip(decoded_items.iter()) { 306 + assert_eq!(original.timestamp, decoded.timestamp); 307 + assert_eq!(original.access().id, decoded.access().id); 308 + assert_eq!( 309 + original.access().value.as_str(), 310 + decoded.access().value.as_str() 311 + ); 312 + } 313 + } 314 + 315 + #[test] 316 + fn test_encoder_decoder_with_iterator() { 317 + let items = vec![ 318 + Item::new( 319 + 2000, 320 + &TestData { 321 + id: 10, 322 + value: "a".to_string(), 323 + }, 324 + ), 325 + Item::new( 326 + 2005, 327 + &TestData { 328 + id: 20, 329 + value: "b".to_string(), 330 + }, 331 + ), 332 + Item::new( 333 + 2012, 334 + &TestData { 335 + id: 30, 336 + value: "c".to_string(), 337 + }, 338 + ), 339 + ]; 340 + 341 + // encode 342 + let mut buffer = Vec::new(); 343 + let mut encoder = ItemEncoder::new(&mut buffer); 344 + 345 + for item in &items { 346 + encoder.encode(item).unwrap(); 347 + } 348 + encoder.finish().unwrap(); 349 + 350 + // decode 351 + let cursor = Cursor::new(buffer); 352 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap(); 353 + 354 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 355 + let decoded_items = decoded_items.unwrap(); 356 + 357 + assert_eq!(decoded_items.len(), 3); 358 + assert_eq!(decoded_items[0].timestamp, 2000); 359 + assert_eq!(decoded_items[1].timestamp, 2005); 360 + assert_eq!(decoded_items[2].timestamp, 2012); 361 + 362 + assert_eq!(decoded_items[0].access().id, 10); 363 + assert_eq!(decoded_items[1].access().id, 20); 364 + assert_eq!(decoded_items[2].access().id, 30); 365 + } 366 + 367 + #[test] 368 + fn test_delta_compression() { 369 + let items = vec![ 370 + Item::new( 371 + 1000, 372 + &TestData { 373 + id: 1, 374 + value: "a".to_string(), 375 + }, 376 + ), 377 + Item::new( 378 + 1010, 379 + &TestData { 380 + id: 2, 381 + value: "b".to_string(), 382 + }, 383 + ), // delta = 10 384 + Item::new( 385 + 1020, 386 + &TestData { 387 + id: 3, 388 + value: "c".to_string(), 389 + }, 390 + ), // delta = 10, delta-of-delta = 0 391 + Item::new( 392 + 1025, 393 + &TestData { 394 + id: 4, 395 + value: "d".to_string(), 396 + }, 397 + ), // delta = 5, delta-of-delta = -5 398 + ]; 399 + 400 + let mut buffer = Vec::new(); 401 + let mut encoder = ItemEncoder::new(&mut buffer); 402 + 403 + for item in &items { 404 + encoder.encode(item).unwrap(); 405 + } 406 + encoder.finish().unwrap(); 407 + 408 + // decode and verify 409 + let cursor = Cursor::new(buffer); 410 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 411 + 412 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 413 + let decoded_items = decoded_items.unwrap(); 414 + 415 + for (original, decoded) in items.iter().zip(decoded_items.iter()) { 416 + assert_eq!(original.timestamp, decoded.timestamp); 417 + assert_eq!(original.access().id, decoded.access().id); 418 + } 419 + } 420 + 421 + #[test] 422 + fn test_empty_decode() { 423 + let buffer = Vec::new(); 424 + let cursor = Cursor::new(buffer); 425 + let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 426 + 427 + let result = decoder.decode().unwrap(); 428 + assert!(result.is_none()); 429 + } 430 + 431 + #[test] 432 + fn test_backwards_timestamp() { 433 + let items = vec![ 434 + Item::new( 435 + 1000, 436 + &TestData { 437 + id: 1, 438 + value: "first".to_string(), 439 + }, 440 + ), 441 + Item::new( 442 + 900, 443 + &TestData { 444 + id: 2, 445 + value: "second".to_string(), 446 + }, 447 + ), 448 + ]; 449 + 450 + let mut buffer = Vec::new(); 451 + let mut encoder = ItemEncoder::new(&mut buffer); 452 + 453 + for item in &items { 454 + encoder.encode(item).unwrap(); 455 + } 456 + encoder.finish().unwrap(); 457 + 458 + let cursor = Cursor::new(buffer); 459 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 460 + 461 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 462 + let decoded_items = decoded_items.unwrap(); 463 + 464 + assert_eq!(decoded_items.len(), 2); 465 + assert_eq!(decoded_items[0].timestamp, 1000); 466 + assert_eq!(decoded_items[1].timestamp, 900); 467 + } 468 + 469 + #[test] 470 + fn test_different_data_sizes() { 471 + let small_data = TestData { 472 + id: 1, 473 + value: "x".to_string(), 474 + }; 475 + let large_data = TestData { 476 + id: 2, 477 + value: "a".repeat(1000), 478 + }; 479 + 480 + let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)]; 481 + 482 + let mut buffer = Vec::new(); 483 + let mut encoder = ItemEncoder::new(&mut buffer); 484 + 485 + for item in &items { 486 + encoder.encode(item).unwrap(); 487 + } 488 + encoder.finish().unwrap(); 489 + 490 + let cursor = Cursor::new(buffer); 491 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 492 + 493 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 494 + let decoded_items = decoded_items.unwrap(); 495 + 496 + assert_eq!(decoded_items.len(), 2); 497 + assert_eq!(decoded_items[0].access().value.as_str(), "x"); 498 + assert_eq!(decoded_items[1].access().value.len(), 1000); 499 + assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000)); 500 + } 501 + }
+424
server/src/db_old/mod.rs
··· 1 + use std::{ 2 + io::Cursor, 3 + ops::{Bound, Deref, RangeBounds}, 4 + path::Path, 5 + sync::{ 6 + Arc, 7 + atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, 8 + }, 9 + time::{Duration, Instant}, 10 + }; 11 + 12 + use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 13 + use ordered_varint::Variable; 14 + use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 15 + use smol_str::SmolStr; 16 + use tokio::sync::broadcast; 17 + 18 + use crate::{ 19 + db_old::block::{ReadVariableExt, WriteVariableExt}, 20 + error::{AppError, AppResult}, 21 + jetstream::JetstreamEvent, 22 + utils::{DefaultRateTracker, get_time}, 23 + }; 24 + 25 + mod block; 26 + 27 + #[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 28 + #[rkyv(compare(PartialEq), derive(Debug))] 29 + pub struct NsidCounts { 30 + pub count: u128, 31 + pub deleted_count: u128, 32 + pub last_seen: u64, 33 + } 34 + 35 + #[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 36 + #[rkyv(compare(PartialEq), derive(Debug))] 37 + pub struct NsidHit { 38 + pub deleted: bool, 39 + } 40 + 41 + #[derive(Clone)] 42 + pub struct EventRecord { 43 + pub nsid: SmolStr, 44 + pub timestamp: u64, // seconds 45 + pub deleted: bool, 46 + } 47 + 48 + impl EventRecord { 49 + pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 50 + match event { 51 + JetstreamEvent::Commit { 52 + time_us, commit, .. 53 + } => Some(Self { 54 + nsid: commit.collection.into(), 55 + timestamp: time_us / 1_000_000, 56 + deleted: false, 57 + }), 58 + JetstreamEvent::Delete { 59 + time_us, commit, .. 60 + } => Some(Self { 61 + nsid: commit.collection.into(), 62 + timestamp: time_us / 1_000_000, 63 + deleted: true, 64 + }), 65 + _ => None, 66 + } 67 + } 68 + } 69 + 70 + type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 71 + type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 72 + type Item = block::Item<NsidHit>; 73 + 74 + pub struct LexiconHandle { 75 + tree: Partition, 76 + buf: Arc<scc::Queue<EventRecord>>, 77 + buf_len: AtomicUsize, 78 + last_insert: AtomicU64, 79 + eps: DefaultRateTracker, 80 + block_size: AtomicUsize, 81 + } 82 + 83 + impl LexiconHandle { 84 + fn new(keyspace: &Keyspace, nsid: &str) -> Self { 85 + let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9)); 86 + Self { 87 + tree: keyspace.open_partition(nsid, opts).unwrap(), 88 + buf: Default::default(), 89 + buf_len: AtomicUsize::new(0), 90 + last_insert: AtomicU64::new(0), 91 + eps: DefaultRateTracker::new(Duration::from_secs(5)), 92 + block_size: AtomicUsize::new(1000), 93 + } 94 + } 95 + 96 + fn item_count(&self) -> usize { 97 + self.buf_len.load(AtomicOrdering::Acquire) 98 + } 99 + 100 + fn last_insert(&self) -> u64 { 101 + self.last_insert.load(AtomicOrdering::Acquire) 102 + } 103 + 104 + fn suggested_block_size(&self) -> usize { 105 + self.block_size.load(AtomicOrdering::Relaxed) 106 + } 107 + 108 + fn insert(&self, event: EventRecord) { 109 + self.buf.push(event); 110 + self.buf_len.fetch_add(1, AtomicOrdering::Release); 111 + self.last_insert 112 + .store(get_time().as_millis() as u64, AtomicOrdering::Release); 113 + self.eps.observe(1); 114 + let rate = self.eps.rate() as usize; 115 + if rate != 0 { 116 + self.block_size.store(rate * 60, AtomicOrdering::Relaxed); 117 + } 118 + } 119 + 120 + fn sync(&self, max_block_size: usize) -> AppResult<usize> { 121 + let mut writer = ItemEncoder::new(Vec::with_capacity( 122 + size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(), 123 + )); 124 + let mut start_timestamp = None; 125 + let mut end_timestamp = None; 126 + let mut written = 0_usize; 127 + while let Some(event) = self.buf.pop() { 128 + let item = Item::new( 129 + event.timestamp, 130 + &NsidHit { 131 + deleted: event.deleted, 132 + }, 133 + ); 134 + writer.encode(&item)?; 135 + if start_timestamp.is_none() { 136 + start_timestamp = Some(event.timestamp); 137 + } 138 + end_timestamp = Some(event.timestamp); 139 + if written >= max_block_size { 140 + break; 141 + } 142 + written += 1; 143 + } 144 + if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 145 + self.buf_len.store(0, AtomicOrdering::Release); 146 + let value = writer.finish()?; 147 + let mut key = Vec::with_capacity(size_of::<u64>() * 2); 148 + key.write_varint(start_timestamp)?; 149 + key.write_varint(end_timestamp)?; 150 + self.tree.insert(key, value)?; 151 + } 152 + Ok(written) 153 + } 154 + } 155 + 156 + type BoxedIter<T> = Box<dyn Iterator<Item = T>>; 157 + 158 + // counts is nsid -> NsidCounts 159 + // hits is tree per nsid: varint start time + varint end time -> block of hits 160 + pub struct Db { 161 + inner: Keyspace, 162 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 163 + counts: Partition, 164 + event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 165 + eps: DefaultRateTracker, 166 + min_block_size: usize, 167 + max_block_size: usize, 168 + max_last_activity: Duration, 169 + } 170 + 171 + impl Db { 172 + pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 173 + tracing::info!("opening db..."); 174 + let ks = Config::new(path) 175 + .cache_size(8 * 1024 * 1024) // from talna 176 + .open()?; 177 + Ok(Self { 178 + hits: Default::default(), 179 + counts: ks.open_partition( 180 + "_counts", 181 + PartitionCreateOptions::default().compression(fjall::CompressionType::None), 182 + )?, 183 + inner: ks, 184 + event_broadcaster: broadcast::channel(1000).0, 185 + eps: DefaultRateTracker::new(Duration::from_secs(1)), 186 + min_block_size: 512, 187 + max_block_size: 100_000, 188 + max_last_activity: Duration::from_secs(10), 189 + }) 190 + } 191 + 192 + pub fn sync(&self, all: bool) -> AppResult<()> { 193 + let _guard = scc::ebr::Guard::new(); 194 + for (nsid, tree) in self.hits.iter(&_guard) { 195 + let count = tree.item_count(); 196 + let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 197 + let is_too_old = (get_time().as_millis() as u64 - tree.last_insert()) 198 + > self.max_last_activity.as_millis() as u64; 199 + if count > 0 && (all || is_max_block_size || is_too_old) { 200 + loop { 201 + let synced = tree.sync(self.max_block_size)?; 202 + if synced == 0 { 203 + break; 204 + } 205 + tracing::info!("synced {synced} of {nsid} to db"); 206 + } 207 + } 208 + } 209 + Ok(()) 210 + } 211 + 212 + #[inline(always)] 213 + pub fn eps(&self) -> usize { 214 + self.eps.rate() as usize 215 + } 216 + 217 + #[inline(always)] 218 + pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 219 + self.event_broadcaster.subscribe() 220 + } 221 + 222 + #[inline(always)] 223 + fn maybe_run_in_nsid_tree<T>( 224 + &self, 225 + nsid: &str, 226 + f: impl FnOnce(&LexiconHandle) -> T, 227 + ) -> Option<T> { 228 + let _guard = scc::ebr::Guard::new(); 229 + let handle = match self.hits.peek(nsid, &_guard) { 230 + Some(handle) => handle.clone(), 231 + None => { 232 + if self.inner.partition_exists(nsid) { 233 + let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 234 + let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 235 + handle 236 + } else { 237 + return None; 238 + } 239 + } 240 + }; 241 + Some(f(&handle)) 242 + } 243 + 244 + #[inline(always)] 245 + fn run_in_nsid_tree<T>( 246 + &self, 247 + nsid: SmolStr, 248 + f: impl FnOnce(&LexiconHandle) -> AppResult<T>, 249 + ) -> AppResult<T> { 250 + f(self 251 + .hits 252 + .entry(nsid.clone()) 253 + .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid))) 254 + .get()) 255 + } 256 + 257 + pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 258 + let EventRecord { 259 + nsid, 260 + timestamp, 261 + deleted, 262 + } = e.clone(); 263 + 264 + // insert event 265 + self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?; 266 + // increment count 267 + let mut counts = self.get_count(&nsid)?; 268 + counts.last_seen = timestamp; 269 + if deleted { 270 + counts.deleted_count += 1; 271 + } else { 272 + counts.count += 1; 273 + } 274 + self.insert_count(&nsid, counts.clone())?; 275 + if self.event_broadcaster.receiver_count() > 0 { 276 + let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 277 + } 278 + self.eps.observe(1); 279 + Ok(()) 280 + } 281 + 282 + #[inline(always)] 283 + fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 284 + self.counts 285 + .insert( 286 + nsid, 287 + unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 288 + ) 289 + .map_err(AppError::from) 290 + } 291 + 292 + pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 293 + let Some(raw) = self.counts.get(nsid)? else { 294 + return Ok(NsidCounts::default()); 295 + }; 296 + Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 297 + } 298 + 299 + pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 300 + self.counts.iter().map(|res| { 301 + res.map_err(AppError::from).map(|(key, val)| { 302 + ( 303 + SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 304 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 305 + ) 306 + }) 307 + }) 308 + } 309 + 310 + pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> { 311 + self.inner 312 + .list_partitions() 313 + .into_iter() 314 + .filter(|k| k.deref() != "_counts") 315 + } 316 + 317 + pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> { 318 + self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> { 319 + Box::new( 320 + handle 321 + .tree 322 + .iter() 323 + .rev() 324 + .map(|res| res.map_err(AppError::from)), 325 + ) 326 + }) 327 + .unwrap_or_else(|| Box::new(std::iter::empty())) 328 + } 329 + 330 + pub fn get_hits( 331 + &self, 332 + nsid: &str, 333 + range: impl RangeBounds<u64> + std::fmt::Debug, 334 + ) -> BoxedIter<AppResult<Item>> { 335 + let start = range 336 + .start_bound() 337 + .cloned() 338 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 339 + let end = range 340 + .end_bound() 341 + .cloned() 342 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 343 + let limit = match range.end_bound().cloned() { 344 + Bound::Included(end) => end, 345 + Bound::Excluded(end) => end.saturating_sub(1), 346 + Bound::Unbounded => u64::MAX, 347 + }; 348 + 349 + self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> { 350 + let map_block = move |(key, val)| { 351 + let mut key_reader = Cursor::new(key); 352 + let start_timestamp = key_reader.read_varint::<u64>()?; 353 + let items = 354 + ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| { 355 + item.as_ref().map_or(true, |item| item.timestamp <= limit) 356 + }); 357 + Ok(items) 358 + }; 359 + 360 + Box::new( 361 + handle 362 + .tree 363 + .range(TimestampRange { start, end }) 364 + .map(move |res| res.map_err(AppError::from).and_then(map_block)) 365 + .flatten() 366 + .flatten(), 367 + ) 368 + }) 369 + .unwrap_or_else(|| Box::new(std::iter::empty())) 370 + } 371 + 372 + pub fn tracking_since(&self) -> AppResult<u64> { 373 + // HACK: we should actually store when we started tracking but im lazy 374 + // should be accurate enough 375 + self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| { 376 + let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else { 377 + return Ok(0); 378 + }; 379 + let mut timestamp_reader = Cursor::new(timestamps_raw); 380 + timestamp_reader 381 + .read_varint::<u64>() 382 + .map_err(AppError::from) 383 + }) 384 + .unwrap_or(Ok(0)) 385 + } 386 + } 387 + 388 + type TimestampRepr = Vec<u8>; 389 + 390 + struct TimestampRange { 391 + start: Bound<TimestampRepr>, 392 + end: Bound<TimestampRepr>, 393 + } 394 + 395 + impl RangeBounds<TimestampRepr> for TimestampRange { 396 + #[inline(always)] 397 + fn start_bound(&self) -> Bound<&TimestampRepr> { 398 + self.start.as_ref() 399 + } 400 + 401 + #[inline(always)] 402 + fn end_bound(&self) -> Bound<&TimestampRepr> { 403 + self.end.as_ref() 404 + } 405 + } 406 + 407 + type TimestampReprOld = [u8; 8]; 408 + 409 + struct TimestampRangeOld { 410 + start: Bound<TimestampReprOld>, 411 + end: Bound<TimestampReprOld>, 412 + } 413 + 414 + impl RangeBounds<TimestampReprOld> for TimestampRangeOld { 415 + #[inline(always)] 416 + fn start_bound(&self) -> Bound<&TimestampReprOld> { 417 + self.start.as_ref() 418 + } 419 + 420 + #[inline(always)] 421 + fn end_bound(&self) -> Bound<&TimestampReprOld> { 422 + self.end.as_ref() 423 + } 424 + }
+11 -21
server/src/jetstream.rs
··· 13 13 pub struct JetstreamClient { 14 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 15 tls_connector: tokio_websockets::Connector, 16 - urls: Vec<SmolStr>, 16 + url: SmolStr, 17 17 } 18 18 19 19 impl JetstreamClient { 20 - pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> { 20 + pub fn new(url: &str) -> AppResult<Self> { 21 21 Ok(Self { 22 22 stream: None, 23 23 tls_connector: tokio_websockets::Connector::new()?, 24 - urls: urls.into_iter().map(Into::into).collect(), 24 + url: SmolStr::new(url), 25 25 }) 26 26 } 27 27 28 28 pub async fn connect(&mut self) -> AppResult<()> { 29 - for uri in &self.urls { 30 - let conn_result = ClientBuilder::new() 31 - .connector(&self.tls_connector) 32 - .uri(uri)? 33 - .connect() 34 - .await; 35 - match conn_result { 36 - Ok((stream, _)) => { 37 - self.stream = Some(stream); 38 - tracing::info!("connected to jetstream {}", uri); 39 - return Ok(()); 40 - } 41 - Err(err) => { 42 - tracing::error!("failed to connect to jetstream {uri}: {err}"); 43 - } 44 - }; 45 - } 46 - Err(anyhow!("failed to connect to any jetstream server").into()) 29 + let (stream, _) = ClientBuilder::new() 30 + .connector(&self.tls_connector) 31 + .uri(&self.url)? 32 + .connect() 33 + .await?; 34 + self.stream = Some(stream); 35 + tracing::info!("connected to jetstream ({})", self.url); 36 + Ok(()) 47 37 } 48 38 49 39 // automatically retries connection, only returning error if it fails many times
+21 -48
server/src/main.rs
··· 1 - use std::{ops::Deref, time::Duration, u64, usize}; 1 + use std::{ops::Deref, time::Duration, u64}; 2 2 3 3 use itertools::Itertools; 4 4 use rclite::Arc; ··· 17 17 18 18 mod api; 19 19 mod db; 20 + mod db_old; 20 21 mod error; 21 22 mod jetstream; 22 23 mod utils; ··· 25 26 #[global_allocator] 26 27 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; 27 28 29 + #[cfg(target_env = "msvc")] 30 + #[global_allocator] 31 + static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; 32 + 28 33 #[tokio::main] 29 34 async fn main() { 30 35 tracing_subscriber::fmt::fmt() ··· 49 54 debug(); 50 55 return; 51 56 } 52 - Some("print") => { 53 - print_all(); 54 - return; 55 - } 56 57 Some(x) => { 57 58 tracing::error!("unknown command: {}", x); 58 59 return; ··· 70 71 .install_default() 71 72 .expect("cant install rustls crypto provider"); 72 73 73 - let urls = [ 74 - "wss://jetstream1.us-west.bsky.network/subscribe", 75 - "wss://jetstream2.us-west.bsky.network/subscribe", 76 - "wss://jetstream2.fr.hose.cam/subscribe", 77 - "wss://jetstream.fire.hose.cam/subscribe", 78 - ]; 79 - let mut jetstream = match JetstreamClient::new(urls) { 80 - Ok(client) => client, 81 - Err(err) => { 82 - tracing::error!("can't create jetstream client: {err}"); 83 - return; 84 - } 85 - }; 74 + let mut jetstream = 75 + match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") { 76 + Ok(client) => client, 77 + Err(err) => { 78 + tracing::error!("can't create jetstream client: {err}"); 79 + return; 80 + } 81 + }; 86 82 87 83 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 88 84 let consume_events = tokio::spawn({ ··· 111 107 move || { 112 108 let mut buffer = Vec::new(); 113 109 loop { 114 - let read = event_rx.blocking_recv_many(&mut buffer, 500); 110 + let read = event_rx.blocking_recv_many(&mut buffer, 100); 115 111 if let Err(err) = db.ingest_events(buffer.drain(..)) { 116 112 tracing::error!("failed to ingest events: {}", err); 117 113 } ··· 157 153 if db.is_shutting_down() { 158 154 return; 159 155 } 160 - let end = get_time(); 156 + let end = get_time() - compact_period / 2; 161 157 let start = end - compact_period; 162 158 let range = start.as_secs()..end.as_secs(); 163 159 tracing::info!( ··· 212 208 db.sync(true).expect("cant sync db"); 213 209 } 214 210 215 - fn print_all() { 216 - let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 217 - let nsids = db.get_nsids().collect::<Vec<_>>(); 218 - let mut count = 0_usize; 219 - for nsid in nsids { 220 - println!("{}:", nsid.deref()); 221 - for hit in db.get_hits(&nsid, .., usize::MAX) { 222 - let hit = hit.expect("aaa"); 223 - println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted); 224 - count += 1; 225 - } 226 - } 227 - println!("total hits: {}", count); 228 - } 229 - 230 211 fn debug() { 231 212 let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 232 213 let info = db.info().expect("cant get db info"); ··· 279 260 280 261 fn migrate() { 281 262 let cancel_token = CancellationToken::new(); 282 - let from = Arc::new( 283 - Db::new( 284 - DbConfig::default().path(".fjall_data_from"), 285 - cancel_token.child_token(), 286 - ) 287 - .expect("couldnt create db"), 288 - ); 263 + 264 + let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db")); 265 + 289 266 let to = Arc::new( 290 267 Db::new( 291 268 DbConfig::default().path(".fjall_data_to").ks(|c| { ··· 300 277 ); 301 278 302 279 let nsids = from.get_nsids().collect::<Vec<_>>(); 303 - let _eps_thread = std::thread::spawn({ 280 + let eps_thread = std::thread::spawn({ 304 281 let to = to.clone(); 305 282 move || { 306 283 loop { ··· 320 297 threads.push(std::thread::spawn(move || { 321 298 tracing::info!("{}: migrating...", nsid.deref()); 322 299 let mut count = 0_u64; 323 - for hits in from 324 - .get_hits(&nsid, .., usize::MAX) 325 - .chunks(100000) 326 - .into_iter() 327 - { 300 + for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() { 328 301 to.ingest_events(hits.map(|hit| { 329 302 count += 1; 330 303 let hit = hit.expect("cant decode hit");
-66
server/src/utils.rs
··· 1 1 use std::io::{self, Read, Write}; 2 - use std::ops::Deref; 3 2 use std::sync::atomic::{AtomicU64, Ordering}; 4 3 use std::time::Duration; 5 4 6 - use arc_swap::RefCnt; 7 5 use byteview::ByteView; 8 6 use ordered_varint::Variable; 9 - use rclite::Arc; 10 7 11 8 pub fn get_time() -> Duration { 12 9 std::time::SystemTime::now() ··· 323 320 } 324 321 } 325 322 } 326 - 327 - pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>; 328 - 329 - pub struct ArcRefCnt<T>(Arc<T>); 330 - 331 - impl<T> ArcRefCnt<T> { 332 - pub fn new(value: T) -> Self { 333 - Self(Arc::new(value)) 334 - } 335 - } 336 - 337 - impl<T> Deref for ArcRefCnt<T> { 338 - type Target = T; 339 - 340 - fn deref(&self) -> &Self::Target { 341 - &self.0 342 - } 343 - } 344 - 345 - impl<T> Clone for ArcRefCnt<T> { 346 - fn clone(&self) -> Self { 347 - Self(self.0.clone()) 348 - } 349 - } 350 - 351 - // SAFETY: uhhhhhhhh copied the Arc impl from arc_swap xd 352 - unsafe impl<T> RefCnt for ArcRefCnt<T> { 353 - type Base = T; 354 - 355 - fn into_ptr(me: Self) -> *mut Self::Base { 356 - Arc::into_raw(me.0) as *mut T 357 - } 358 - 359 - fn as_ptr(me: &Self) -> *mut Self::Base { 360 - // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same 361 - // intention as 362 - // 363 - // me as &T as *const T as *mut T 364 - // 365 - // We first create a "shallow copy" of me - one that doesn't really own its ref count 366 - // (that's OK, me _does_ own it, so it can't be destroyed in the meantime). 367 - // Then we can use into_raw (which preserves not having the ref count). 368 - // 369 - // We need to "revert" the changes we did. In current std implementation, the combination 370 - // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw 371 - // and that read shall be paired with forget to properly "close the brackets". In future 372 - // versions of STD, these may become something else that's not really no-op (unlikely, but 373 - // possible), so we future-proof it a bit. 374 - 375 - // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads 376 - let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) }); 377 - let ptr = ptr as *mut T; 378 - 379 - // SAFETY: We got the pointer from into_raw just above 380 - std::mem::forget(unsafe { Arc::from_raw(ptr) }); 381 - 382 - ptr 383 - } 384 - 385 - unsafe fn from_ptr(ptr: *const Self::Base) -> Self { 386 - Self(unsafe { Arc::from_raw(ptr) }) 387 - } 388 - }