Tracks lexicons and how many times they appear on the jetstream.

Compare changes

Choose any two refs to compare.

+12 -3
README.md
··· 1 - a webapp and server that monitors the jetstream and tracks the different lexicons as they are created or deleted. 2 - it shows you which collections are most active on the network. 3 4 for backend it uses rust with fjall as db, the frontend is built with sveltekit. 5 6 see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it. 7 8 ## running 9 10 ### with nix 11 12 - - run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server` 13 - build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client` 14 15 ### manually
··· 1 + a webapp and server that monitors the jetstream and tracks the different 2 + lexicons as they are created or deleted. it shows you which collections are most 3 + active on the network. 4 5 for backend it uses rust with fjall as db, the frontend is built with sveltekit. 6 7 see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it. 8 9 + ## performance / storage 10 + 11 + it uses about 50MB of space for 620M recorded events (events being just 12 + timestamp in seconds and deleted boolean for now). and around 50-60ms for 13 + querying 300-400k events. 14 + 15 + this is on a machine with AMD EPYC 7281 (32) @ 2.100GHz. 16 + 17 ## running 18 19 ### with nix 20 21 + - build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server` 22 - build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client` 23 24 ### manually
+9
client/bun.lock
··· 5 "name": "nsid-tracker", 6 "dependencies": { 7 "@number-flow/svelte": "^0.3.9", 8 }, 9 "devDependencies": { 10 "@eslint/compat": "^1.2.5", ··· 353 354 "globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="], 355 356 "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], 357 358 "graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="], ··· 511 512 "svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="], 513 514 "svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="], 515 516 "svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, 
"sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="], ··· 520 "tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="], 521 522 "tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="], 523 524 "tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="], 525
··· 5 "name": "nsid-tracker", 6 "dependencies": { 7 "@number-flow/svelte": "^0.3.9", 8 + "svelte-adapter-bun": "^0.5.2", 9 }, 10 "devDependencies": { 11 "@eslint/compat": "^1.2.5", ··· 354 355 "globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="], 356 357 + "globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="], 358 + 359 + "globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="], 360 + 361 "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], 362 363 "graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="], ··· 516 517 "svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="], 518 519 + "svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="], 520 + 521 "svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { 
"svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="], 522 523 "svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="], ··· 527 "tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="], 528 529 "tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="], 530 + 531 + "tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="], 532 533 "tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="], 534
+2 -1
client/package.json
··· 31 }, 32 "type": "module", 33 "dependencies": { 34 - "@number-flow/svelte": "^0.3.9" 35 } 36 }
··· 31 }, 32 "type": "module", 33 "dependencies": { 34 + "@number-flow/svelte": "^0.3.9", 35 + "svelte-adapter-bun": "^0.5.2" 36 } 37 }
+4
client/src/app.css
··· 28 overflow-y: overlay; 29 overflow-y: auto; /* Fallback for browsers that don't support overlay */ 30 }
··· 28 overflow-y: overlay; 29 overflow-y: auto; /* Fallback for browsers that don't support overlay */ 30 } 31 + 32 + .wsbadge { 33 + @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 34 + }
+9 -9
client/src/app.html
··· 1 <!doctype html> 2 <html lang="en"> 3 - <head> 4 - <meta charset="utf-8" /> 5 - <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 - <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 - %sveltekit.head% 8 - </head> 9 - <body data-sveltekit-preload-data="hover"> 10 - <div style="display: contents">%sveltekit.body%</div> 11 - </body> 12 </html>
··· 1 <!doctype html> 2 <html lang="en"> 3 + <head> 4 + <meta charset="utf-8" /> 5 + <link rel="icon" href="%sveltekit.assets%/favicon.svg" /> 6 + <meta name="viewport" content="width=device-width, initial-scale=1" /> 7 + %sveltekit.head% 8 + </head> 9 + <body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover"> 10 + <div style="display: contents">%sveltekit.body%</div> 11 + </body> 12 </html>
+2 -9
client/src/lib/components/BskyToggle.svelte
··· 11 <!-- svelte-ignore a11y_no_static_element_interactions --> 12 <button 13 onclick={onBskyToggle} 14 - class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300" 15 > 16 <input checked={dontShowBsky} type="checkbox" /> 17 - <span class="ml-0.5"> hide app.bsky.* </span> 18 </button> 19 - 20 - <style lang="postcss"> 21 - @reference "../../app.css"; 22 - .wsbadge { 23 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 24 - } 25 - </style>
··· 11 <!-- svelte-ignore a11y_no_static_element_interactions --> 12 <button 13 onclick={onBskyToggle} 14 + class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 15 > 16 <input checked={dontShowBsky} type="checkbox" /> 17 + <span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span> 18 </button>
+8 -5
client/src/lib/components/EventCard.svelte
··· 104 </script> 105 106 <div 107 - class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 108 class:has-activity={isAnimating} 109 style="--border-thickness: {borderThickness}px" 110 > 111 <div class="flex items-start gap-2"> 112 <div 113 - class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full" 114 > 115 #{index + 1} 116 </div> 117 <div 118 title={event.nsid} 119 - class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1" 120 > 121 {event.nsid} 122 </div> ··· 136 </div> 137 </div> 138 139 - <style> 140 .has-activity { 141 position: relative; 142 transition: all 0.2s ease-out; 143 } 144 145 .has-activity::before { 146 content: ""; 147 position: absolute; 148 top: calc(-1 * var(--border-thickness)); 149 left: calc(-1 * var(--border-thickness)); 150 right: calc(-1 * var(--border-thickness)); 151 bottom: calc(-1 * var(--border-thickness)); 152 - border: var(--border-thickness) solid rgba(59, 130, 246, 0.8); 153 border-radius: calc(0.5rem + var(--border-thickness)); 154 pointer-events: none; 155 transition: all 0.3s ease-out;
··· 104 </script> 105 106 <div 107 + class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform" 108 class:has-activity={isAnimating} 109 style="--border-thickness: {borderThickness}px" 110 > 111 <div class="flex items-start gap-2"> 112 <div 113 + class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full" 114 > 115 #{index + 1} 116 </div> 117 <div 118 title={event.nsid} 119 + class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1" 120 > 121 {event.nsid} 122 </div> ··· 136 </div> 137 </div> 138 139 + <style lang="postcss"> 140 .has-activity { 141 position: relative; 142 transition: all 0.2s ease-out; 143 } 144 145 .has-activity::before { 146 + @reference "../../app.css"; 147 + @apply border-blue-500 dark:border-blue-800; 148 content: ""; 149 position: absolute; 150 top: calc(-1 * var(--border-thickness)); 151 left: calc(-1 * var(--border-thickness)); 152 right: calc(-1 * var(--border-thickness)); 153 bottom: calc(-1 * var(--border-thickness)); 154 + border-width: var(--border-thickness); 155 + border-style: solid; 156 border-radius: calc(0.5rem + var(--border-thickness)); 157 pointer-events: none; 158 transition: all 0.3s ease-out;
+5 -10
client/src/lib/components/FilterControls.svelte
··· 8 </script> 9 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300" 12 > 13 - <label for="filter-regex" class="text-blue-800 mr-1"> filter: </label> 14 <input 15 id="filter-regex" 16 value={filterRegex} 17 oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)} 18 type="text" 19 placeholder="regex..." 20 - class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24" 21 /> 22 </div> 23 - 24 - <style lang="postcss"> 25 - @reference "../../app.css"; 26 - .wsbadge { 27 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 28 - } 29 - </style>
··· 8 </script> 9 10 <div 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700" 12 > 13 + <label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1"> 14 + filter: 15 + </label> 16 <input 17 id="filter-regex" 18 value={filterRegex} 19 oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)} 20 type="text" 21 placeholder="regex..." 22 + class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24" 23 /> 24 </div>
+6 -11
client/src/lib/components/RefreshControl.svelte
··· 8 </script> 9 10 <div 11 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300" 12 > 13 - <label for="refresh-rate" class="text-green-800 mr-1">refresh:</label> 14 <input 15 id="refresh-rate" 16 value={refreshRate} ··· 24 pattern="[0-9]*" 25 min="0" 26 placeholder="real-time" 27 - class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20" 28 /> 29 - <span class="text-green-700">s</span> 30 </div> 31 - 32 - <style lang="postcss"> 33 - @reference "../../app.css"; 34 - .wsbadge { 35 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 36 - } 37 - </style>
··· 8 </script> 9 10 <div 11 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700" 12 > 13 + <label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1" 14 + >refresh:</label 15 + > 16 <input 17 id="refresh-rate" 18 value={refreshRate} ··· 26 pattern="[0-9]*" 27 min="0" 28 placeholder="real-time" 29 + class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20" 30 /> 31 + <span class="text-lime-800 dark:text-lime-200">s</span> 32 </div>
+31
client/src/lib/components/ShowControls.svelte
···
··· 1 + <script lang="ts"> 2 + import type { ShowOption } from "$lib/types"; 3 + 4 + interface Props { 5 + show: ShowOption; 6 + onShowChange: (value: ShowOption) => void; 7 + } 8 + 9 + let { show, onShowChange }: Props = $props(); 10 + 11 + const showOptions: ShowOption[] = ["server init", "stream start"]; 12 + </script> 13 + 14 + <div 15 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700" 16 + > 17 + <label for="show" class="text-pink-800 dark:text-pink-100 mr-1"> 18 + show since: 19 + </label> 20 + <select 21 + id="show" 22 + value={show} 23 + onchange={(e) => 24 + onShowChange((e.target as HTMLSelectElement).value as ShowOption)} 25 + class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0" 26 + > 27 + {#each showOptions as option} 28 + <option value={option}>{option}</option> 29 + {/each} 30 + </select> 31 + </div>
+5 -10
client/src/lib/components/SortControls.svelte
··· 17 </script> 18 19 <div 20 - class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300" 21 > 22 - <label for="sort-by" class="text-purple-800 mr-1"> sort by: </label> 23 <select 24 id="sort-by" 25 value={sortBy} 26 onchange={(e) => 27 onSortChange((e.target as HTMLSelectElement).value as SortOption)} 28 - class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0" 29 > 30 {#each sortOptions as option} 31 <option value={option.value}>{option.label}</option> 32 {/each} 33 </select> 34 </div> 35 - 36 - <style lang="postcss"> 37 - @reference "../../app.css"; 38 - .wsbadge { 39 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 40 - } 41 - </style>
··· 17 </script> 18 19 <div 20 + class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700" 21 > 22 + <label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1"> 23 + sort by: 24 + </label> 25 <select 26 id="sort-by" 27 value={sortBy} 28 onchange={(e) => 29 onSortChange((e.target as HTMLSelectElement).value as SortOption)} 30 + class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0" 31 > 32 {#each sortOptions as option} 33 <option value={option.value}>{option.label}</option> 34 {/each} 35 </select> 36 </div>
+17 -18
client/src/lib/components/StatsCard.svelte
··· 1 <script lang="ts"> 2 import { formatNumber } from "$lib/format"; 3 - import NumberFlow from "@number-flow/svelte"; 4 5 const colorClasses = { 6 green: { 7 - bg: "from-green-50 to-green-100", 8 - border: "border-green-200", 9 - titleText: "text-green-700", 10 - valueText: "text-green-900", 11 }, 12 red: { 13 - bg: "from-red-50 to-red-100", 14 - border: "border-red-200", 15 - titleText: "text-red-700", 16 - valueText: "text-red-900", 17 }, 18 turqoise: { 19 - bg: "from-teal-50 to-teal-100", 20 - border: "border-teal-200", 21 - titleText: "text-teal-700", 22 - valueText: "text-teal-900", 23 }, 24 orange: { 25 - bg: "from-orange-50 to-orange-100", 26 - border: "border-orange-200", 27 - titleText: "text-orange-700", 28 - valueText: "text-orange-900", 29 }, 30 }; 31 ··· 45 {title} 46 </h3> 47 <p class="text-xl md:text-2xl font-bold {colors.valueText}"> 48 - <NumberFlow {value} /> 49 </p> 50 </div>
··· 1 <script lang="ts"> 2 import { formatNumber } from "$lib/format"; 3 4 const colorClasses = { 5 green: { 6 + bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800", 7 + border: "border-green-200 dark:border-green-800", 8 + titleText: "text-green-700 dark:text-green-400", 9 + valueText: "text-green-900 dark:text-green-200", 10 }, 11 red: { 12 + bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800", 13 + border: "border-red-200 dark:border-red-800", 14 + titleText: "text-red-700 dark:text-red-400", 15 + valueText: "text-red-900 dark:text-red-200", 16 }, 17 turqoise: { 18 + bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800", 19 + border: "border-teal-200 dark:border-teal-800", 20 + titleText: "text-teal-700 dark:text-teal-400", 21 + valueText: "text-teal-900 dark:text-teal-200", 22 }, 23 orange: { 24 + bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800", 25 + border: "border-orange-200 dark:border-orange-800", 26 + titleText: "text-orange-700 dark:text-orange-400", 27 + valueText: "text-orange-900 dark:text-orange-200", 28 }, 29 }; 30 ··· 44 {title} 45 </h3> 46 <p class="text-xl md:text-2xl font-bold {colors.valueText}"> 47 + {formatNumber(value)} 48 </p> 49 </div>
+18 -14
client/src/lib/components/StatusBadge.svelte
··· 8 const statusConfig = { 9 connected: { 10 text: "stream live", 11 - classes: "bg-green-100 text-green-800 border-green-200", 12 }, 13 connecting: { 14 text: "stream connecting", 15 - classes: "bg-yellow-100 text-yellow-800 border-yellow-200", 16 }, 17 error: { 18 text: "stream errored", 19 - classes: "bg-red-100 text-red-800 border-red-200", 20 }, 21 disconnected: { 22 text: "stream offline", 23 - classes: "bg-gray-100 text-gray-800 border-gray-200", 24 }, 25 }; 26 27 const config = $derived(statusConfig[status]); 28 </script> 29 30 - <span class="wsbadge {config.classes}"> 31 - {config.text} 32 - </span> 33 - 34 - <style lang="postcss"> 35 - @reference "../../app.css"; 36 - .wsbadge { 37 - @apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border; 38 - } 39 - </style>
··· 8 const statusConfig = { 9 connected: { 10 text: "stream live", 11 + classes: 12 + "bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800", 13 }, 14 connecting: { 15 text: "stream connecting", 16 + classes: 17 + "bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800", 18 }, 19 error: { 20 text: "stream errored", 21 + classes: 22 + "bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800", 23 }, 24 disconnected: { 25 text: "stream offline", 26 + classes: 27 + "bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800", 28 }, 29 }; 30 31 const config = $derived(statusConfig[status]); 32 </script> 33 34 + <div class="flex flex-row items-center gap-2 wsbadge {config.classes}"> 35 + <!-- connecting spinner --> 36 + {#if status === "connecting"} 37 + <div 38 + class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200" 39 + ></div> 40 + {/if} 41 + <!-- status text --> 42 + <span>{config.text}</span> 43 + </div>
+5 -1
client/src/lib/format.ts
··· 2 return num.toLocaleString(); 3 }; 4 5 export const formatTimestamp = (timestamp: number): string => { 6 - return new Date(timestamp / 1000).toLocaleString(); 7 };
··· 2 return num.toLocaleString(); 3 }; 4 5 + const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime()); 6 export const formatTimestamp = (timestamp: number): string => { 7 + const date = new Date(timestamp * 1000); 8 + return isValidDate(date) 9 + ? date.toLocaleString() 10 + : new Date(timestamp / 1000).toLocaleString(); 11 };
+1
client/src/lib/types.ts
··· 18 }; 19 20 export type SortOption = "total" | "created" | "deleted" | "date";
··· 18 }; 19 20 export type SortOption = "total" | "created" | "deleted" | "date"; 21 + export type ShowOption = "server init" | "stream start";
+3 -2
client/src/routes/+layout.ts
··· 1 - export const prerender = true; 2 - export const ssr = false;
··· 1 + export const prerender = false; 2 + export const ssr = true; 3 + export const csr = true;
+7
client/src/routes/+page.server.ts
···
··· 1 + import { fetchEvents, fetchTrackingSince } from "$lib/api"; 2 + 3 + export const load = async () => { 4 + const events = await fetchEvents(); 5 + const trackingSince = await fetchTrackingSince(); 6 + return { events, trackingSince }; 7 + };
+109 -51
client/src/routes/+page.svelte
··· 1 <script lang="ts"> 2 import { dev } from "$app/environment"; 3 - import type { EventRecord, NsidCount, SortOption } from "$lib/types"; 4 import { onMount, onDestroy } from "svelte"; 5 - import { writable } from "svelte/store"; 6 import { PUBLIC_API_URL } from "$env/static/public"; 7 import { fetchEvents, fetchTrackingSince } from "$lib/api"; 8 import { createRegexFilter } from "$lib/filter"; ··· 14 import BskyToggle from "$lib/components/BskyToggle.svelte"; 15 import RefreshControl from "$lib/components/RefreshControl.svelte"; 16 import { formatTimestamp } from "$lib/format"; 17 18 - const events = writable(new Map<string, EventRecord>()); 19 const pendingUpdates = new Map<string, EventRecord>(); 20 - let eventsList: NsidCount[] = $state([]); 21 let updateTimer: NodeJS.Timeout | null = null; 22 - events.subscribe((value) => { 23 - eventsList = value 24 - .entries() 25 - .map(([nsid, event]) => ({ 26 - nsid, 27 - ...event, 28 - })) 29 - .toArray(); 30 - }); 31 - let per_second = $state(0); 32 - let tracking_since = $state(0); 33 34 let all: EventRecord = $derived( 35 eventsList.reduce( 36 (acc, event) => { ··· 50 }, 51 ), 52 ); 53 - let error: string | null = $state(null); 54 - let filterRegex = $state(""); 55 - let dontShowBsky = $state(false); 56 - let sortBy: SortOption = $state("total"); 57 - let refreshRate = $state(""); 58 - let changedByUser = $state(false); 59 60 let websocket: WebSocket | null = null; 61 let isStreamOpen = $state(false); ··· 76 }; 77 websocket.onmessage = async (event) => { 78 const jsonData = JSON.parse(event.data); 79 - 80 - if (jsonData.per_second > 0) { 81 - per_second = jsonData.per_second; 82 - } 83 - 84 - // Store updates in pending map if refresh rate is set 85 if (refreshRate) { 86 for (const [nsid, event] of Object.entries(jsonData.events)) { 87 pendingUpdates.set(nsid, event as EventRecord); 88 } 89 } else { 90 - // Apply updates immediately if no refresh rate 91 - events.update((map) => { 92 - for (const [nsid, event] of 
Object.entries( 93 - jsonData.events, 94 - )) { 95 - map.set(nsid, event as EventRecord); 96 - } 97 - return map; 98 - }); 99 } 100 }; 101 websocket.onerror = (error) => { ··· 114 error = null; 115 const data = await fetchEvents(); 116 per_second = data.per_second; 117 - events.update((map) => { 118 - for (const [nsid, event] of Object.entries(data.events)) { 119 - map.set(nsid, event); 120 - } 121 - return map; 122 - }); 123 tracking_since = (await fetchTrackingSince()).since; 124 } catch (err) { 125 error = ··· 222 /> 223 </svelte:head> 224 225 - <header class="border-gray-300 border-b mb-4 pb-2"> 226 <div 227 class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center" 228 > 229 - <h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1> 230 - <p class="text-lg mt-1 text-gray-600"> 231 tracks lexicons seen on the jetstream {tracking_since === 0 232 ? "" 233 : `(since: ${formatTimestamp(tracking_since)})`} 234 </p> 235 </div> 236 </header> 237 - <div class="md:max-w-[61vw] mx-auto p-2"> 238 <div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8"> 239 <StatsCard 240 title="total creation" ··· 260 261 {#if error} 262 <div 263 - class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6" 264 > 265 <p>Error: {error}</p> 266 </div> ··· 269 {#if eventsList.length > 0} 270 <div class="mb-8"> 271 <div class="flex flex-wrap items-center gap-3 mb-3"> 272 - <h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2> 273 <StatusBadge status={websocketStatus} /> 274 </div> 275 <div class="flex flex-wrap items-center gap-1.5 mb-6"> ··· 291 refreshRate = ""; 292 }} 293 /> 294 <RefreshControl 295 {refreshRate} 296 onRefreshChange={(value) => { ··· 315 {/if} 316 </div> 317 318 - <footer class="py-2 border-t border-gray-200 text-center"> 319 - <p class="text-gray-600 text-sm"> 320 source code <a 321 href="https://tangled.sh/@poor.dog/nsid-tracker" 322 target="_blank" 323 rel="noopener noreferrer" 324 - 
class="text-blue-600 hover:text-blue-800 underline" 325 >@poor.dog/nsid-tracker</a 326 > 327 </p>
··· 1 <script lang="ts"> 2 import { dev } from "$app/environment"; 3 + import type { 4 + EventRecord, 5 + Events, 6 + NsidCount, 7 + ShowOption, 8 + Since, 9 + SortOption, 10 + } from "$lib/types"; 11 import { onMount, onDestroy } from "svelte"; 12 + import { get, writable } from "svelte/store"; 13 import { PUBLIC_API_URL } from "$env/static/public"; 14 import { fetchEvents, fetchTrackingSince } from "$lib/api"; 15 import { createRegexFilter } from "$lib/filter"; ··· 21 import BskyToggle from "$lib/components/BskyToggle.svelte"; 22 import RefreshControl from "$lib/components/RefreshControl.svelte"; 23 import { formatTimestamp } from "$lib/format"; 24 + import ShowControls from "$lib/components/ShowControls.svelte"; 25 + 26 + type Props = { 27 + data: { events: Events; trackingSince: Since }; 28 + }; 29 30 + const { data }: Props = $props(); 31 + 32 + const events = writable( 33 + new Map<string, EventRecord>(Object.entries(data.events.events)), 34 + ); 35 + const eventsStart = new Map<string, EventRecord>( 36 + Object.entries(data.events.events), 37 + ); 38 const pendingUpdates = new Map<string, EventRecord>(); 39 + 40 let updateTimer: NodeJS.Timeout | null = null; 41 + let per_second = $state(data.events.per_second); 42 + let tracking_since = $state(data.trackingSince.since); 43 44 + const diffEvents = ( 45 + oldEvents: Map<string, EventRecord>, 46 + newEvents: Map<string, EventRecord>, 47 + ): NsidCount[] => { 48 + const nsidCounts: NsidCount[] = []; 49 + for (const [nsid, event] of newEvents.entries()) { 50 + const oldEvent = oldEvents.get(nsid); 51 + if (oldEvent) { 52 + const counts = { 53 + nsid, 54 + count: event.count - oldEvent.count, 55 + deleted_count: event.deleted_count - oldEvent.deleted_count, 56 + last_seen: event.last_seen, 57 + }; 58 + if (counts.count > 0 || counts.deleted_count > 0) 59 + nsidCounts.push(counts); 60 + } else { 61 + nsidCounts.push({ 62 + nsid, 63 + ...event, 64 + }); 65 + } 66 + } 67 + return nsidCounts; 68 + }; 69 + const 
applyEvents = (newEvents: Record<string, EventRecord>) => { 70 + events.update((map) => { 71 + for (const [nsid, event] of Object.entries(newEvents)) { 72 + map.set(nsid, event); 73 + } 74 + return map; 75 + }); 76 + }; 77 + 78 + let error: string | null = $state(null); 79 + let filterRegex = $state(""); 80 + let dontShowBsky = $state(false); 81 + let sortBy: SortOption = $state("total"); 82 + let refreshRate = $state(""); 83 + let changedByUser = $state(false); 84 + let show: ShowOption = $state("server init"); 85 + let eventsList: NsidCount[] = $state([]); 86 + let updateEventsList = $derived((value: Map<string, EventRecord>) => { 87 + switch (show) { 88 + case "server init": 89 + eventsList = value 90 + .entries() 91 + .map(([nsid, event]) => ({ 92 + nsid, 93 + ...event, 94 + })) 95 + .toArray(); 96 + break; 97 + case "stream start": 98 + eventsList = diffEvents(eventsStart, value); 99 + break; 100 + } 101 + }); 102 + events.subscribe((value) => updateEventsList(value)); 103 let all: EventRecord = $derived( 104 eventsList.reduce( 105 (acc, event) => { ··· 119 }, 120 ), 121 ); 122 123 let websocket: WebSocket | null = null; 124 let isStreamOpen = $state(false); ··· 139 }; 140 websocket.onmessage = async (event) => { 141 const jsonData = JSON.parse(event.data); 142 + per_second = jsonData.per_second; 143 if (refreshRate) { 144 for (const [nsid, event] of Object.entries(jsonData.events)) { 145 pendingUpdates.set(nsid, event as EventRecord); 146 } 147 } else { 148 + applyEvents(jsonData.events); 149 } 150 }; 151 websocket.onerror = (error) => { ··· 164 error = null; 165 const data = await fetchEvents(); 166 per_second = data.per_second; 167 + applyEvents(data.events); 168 tracking_since = (await fetchTrackingSince()).since; 169 } catch (err) { 170 error = ··· 267 /> 268 </svelte:head> 269 270 + <header 271 + class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2" 272 + > 273 <div 274 class="px-2 md:ml-[19vw] mx-auto flex flex-wrap 
items-center text-center" 275 > 276 + <h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200"> 277 + lexicon tracker 278 + </h1> 279 + <p class="text-lg mt-1 text-gray-600 dark:text-gray-300"> 280 tracks lexicons seen on the jetstream {tracking_since === 0 281 ? "" 282 : `(since: ${formatTimestamp(tracking_since)})`} 283 </p> 284 </div> 285 </header> 286 + <div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2"> 287 <div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8"> 288 <StatsCard 289 title="total creation" ··· 309 310 {#if error} 311 <div 312 + class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6" 313 > 314 <p>Error: {error}</p> 315 </div> ··· 318 {#if eventsList.length > 0} 319 <div class="mb-8"> 320 <div class="flex flex-wrap items-center gap-3 mb-3"> 321 + <h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200"> 322 + seen lexicons 323 + </h2> 324 <StatusBadge status={websocketStatus} /> 325 </div> 326 <div class="flex flex-wrap items-center gap-1.5 mb-6"> ··· 342 refreshRate = ""; 343 }} 344 /> 345 + <ShowControls 346 + {show} 347 + onShowChange={(value: ShowOption) => { 348 + show = value; 349 + updateEventsList(get(events)); 350 + }} 351 + /> 352 <RefreshControl 353 {refreshRate} 354 onRefreshChange={(value) => { ··· 373 {/if} 374 </div> 375 376 + <footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center"> 377 + <p class="text-gray-600 dark:text-gray-200 text-sm"> 378 source code <a 379 href="https://tangled.sh/@poor.dog/nsid-tracker" 380 target="_blank" 381 rel="noopener noreferrer" 382 + class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline" 383 >@poor.dog/nsid-tracker</a 384 > 385 </p>
+1 -1
client/svelte.config.js
··· 1 - import adapter from "@sveltejs/adapter-static"; 2 import { vitePreprocess } from "@sveltejs/vite-plugin-svelte"; 3 4 /** @type {import('@sveltejs/kit').Config} */
··· 1 + import adapter from "svelte-adapter-bun"; 2 import { vitePreprocess } from "@sveltejs/vite-plugin-svelte"; 3 4 /** @type {import('@sveltejs/kit').Config} */
+1 -1
nix/client-modules.nix
··· 8 9 src = ../client; 10 11 - outputHash = "sha256-t8PJFo+3XGkzmMNbw9Rf9cS5Ob5YtI8ucX3ay+u9a3M="; 12 outputHashAlgo = "sha256"; 13 outputHashMode = "recursive"; 14
··· 8 9 src = ../client; 10 11 + outputHash = "sha256-njwXk3u0NUsYWLv9EOdCltgQOjTVkcfu+D+0COSw/6I="; 12 outputHashAlgo = "sha256"; 13 outputHashMode = "recursive"; 14
+10 -2
nix/client.nix
··· 1 { 2 stdenv, 3 makeBinaryWrapper, 4 bun, ··· 28 ''; 29 buildPhase = '' 30 runHook preBuild 31 - bun --prefer-offline run --bun build 32 runHook postBuild 33 ''; 34 installPhase = '' 35 runHook preInstall 36 - mkdir -p $out 37 cp -R ./build/* $out 38 runHook postInstall 39 ''; 40 }
··· 1 { 2 + lib, 3 stdenv, 4 makeBinaryWrapper, 5 bun, ··· 29 ''; 30 buildPhase = '' 31 runHook preBuild 32 + bun --prefer-offline run build 33 runHook postBuild 34 ''; 35 installPhase = '' 36 runHook preInstall 37 + 38 + mkdir -p $out/bin 39 cp -R ./build/* $out 40 + cp -R ./node_modules $out 41 + 42 + makeBinaryWrapper ${bun}/bin/bun $out/bin/website \ 43 + --prefix PATH : ${lib.makeBinPath [ bun ]} \ 44 + --add-flags "run --bun --no-install --cwd $out start" 45 + 46 runHook postInstall 47 ''; 48 }
+42 -28
server/Cargo.lock
··· 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 20 [[package]] 21 name = "aho-corasick" 22 version = "1.1.3" 23 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 46 version = "1.0.98" 47 source = "registry+https://github.com/rust-lang/crates.io-index" 48 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 49 50 [[package]] 51 name = "async-compression" ··· 307 version = "0.2.1" 308 source = "registry+https://github.com/rust-lang/crates.io-index" 309 checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 310 - 311 - [[package]] 312 - name = "cmake" 313 - version = "0.1.54" 314 - source = "registry+https://github.com/rust-lang/crates.io-index" 315 - checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" 316 - dependencies = [ 317 - "cc", 318 - ] 319 320 [[package]] 321 name = "combine" ··· 1552 name = "server" 1553 version = "0.1.0" 1554 dependencies = [ 1555 "anyhow", 1556 "async-trait", 1557 "axum", 1558 "axum-tws", ··· 1571 "serde", 1572 "serde_json", 1573 "smol_str", 1574 - "snmalloc-rs", 1575 "threadpool", 1576 "tikv-jemallocator", 1577 "tokio", ··· 1643 dependencies = [ 1644 "borsh", 1645 "serde", 1646 - ] 1647 - 1648 - [[package]] 1649 - name = "snmalloc-rs" 1650 - version = "0.3.8" 1651 - source = "registry+https://github.com/rust-lang/crates.io-index" 1652 - checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a" 1653 - dependencies = [ 1654 - "snmalloc-sys", 1655 - ] 1656 - 1657 - [[package]] 1658 - name = "snmalloc-sys" 1659 - version = "0.3.8" 1660 - source = "registry+https://github.com/rust-lang/crates.io-index" 1661 - checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea" 1662 - dependencies = [ 1663 - "cmake", 1664 ] 1665 1666 [[package]] ··· 2406 version = "0.8.15" 2407 source = "registry+https://github.com/rust-lang/crates.io-index" 2408 checksum = 
"fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 2409 2410 [[package]] 2411 name = "zeroize"
··· 18 checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 20 [[package]] 21 + name = "ahash" 22 + version = "0.8.12" 23 + source = "registry+https://github.com/rust-lang/crates.io-index" 24 + checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" 25 + dependencies = [ 26 + "cfg-if", 27 + "getrandom 0.3.3", 28 + "once_cell", 29 + "serde", 30 + "version_check", 31 + "zerocopy", 32 + ] 33 + 34 + [[package]] 35 name = "aho-corasick" 36 version = "1.1.3" 37 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 60 version = "1.0.98" 61 source = "registry+https://github.com/rust-lang/crates.io-index" 62 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" 63 + 64 + [[package]] 65 + name = "arc-swap" 66 + version = "1.7.1" 67 + source = "registry+https://github.com/rust-lang/crates.io-index" 68 + checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" 69 70 [[package]] 71 name = "async-compression" ··· 327 version = "0.2.1" 328 source = "registry+https://github.com/rust-lang/crates.io-index" 329 checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 330 331 [[package]] 332 name = "combine" ··· 1563 name = "server" 1564 version = "0.1.0" 1565 dependencies = [ 1566 + "ahash", 1567 "anyhow", 1568 + "arc-swap", 1569 "async-trait", 1570 "axum", 1571 "axum-tws", ··· 1584 "serde", 1585 "serde_json", 1586 "smol_str", 1587 "threadpool", 1588 "tikv-jemallocator", 1589 "tokio", ··· 1655 dependencies = [ 1656 "borsh", 1657 "serde", 1658 ] 1659 1660 [[package]] ··· 2400 version = "0.8.15" 2401 source = "registry+https://github.com/rust-lang/crates.io-index" 2402 checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 2403 + 2404 + [[package]] 2405 + name = "zerocopy" 2406 + version = "0.8.26" 2407 + source = "registry+https://github.com/rust-lang/crates.io-index" 2408 + checksum = 
"1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" 2409 + dependencies = [ 2410 + "zerocopy-derive", 2411 + ] 2412 + 2413 + [[package]] 2414 + name = "zerocopy-derive" 2415 + version = "0.8.26" 2416 + source = "registry+https://github.com/rust-lang/crates.io-index" 2417 + checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" 2418 + dependencies = [ 2419 + "proc-macro2", 2420 + "quote", 2421 + "syn", 2422 + ] 2423 2424 [[package]] 2425 name = "zeroize"
+2 -2
server/Cargo.toml
··· 30 rayon = "1.10.0" 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 32 rclite = "0.2.7" 33 34 - [target.'cfg(target_env = "msvc")'.dependencies] 35 - snmalloc-rs = "0.3.8" 36 37 [target.'cfg(not(target_env = "msvc"))'.dependencies] 38 tikv-jemallocator = "0.6"
··· 30 rayon = "1.10.0" 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 32 rclite = "0.2.7" 33 + arc-swap = "1.7.1" 34 + ahash = { version = "0.8.12", features = ["serde"] } 35 36 37 [target.'cfg(not(target_env = "msvc"))'.dependencies] 38 tikv-jemallocator = "0.6"
+16 -18
server/src/api.rs
··· 1 use std::{ 2 - collections::HashMap, 3 fmt::Display, 4 net::SocketAddr, 5 ops::{Bound, Deref, RangeBounds}, 6 time::Duration, 7 }; 8 9 use anyhow::anyhow; 10 use axum::{ 11 Json, Router, ··· 117 #[derive(Serialize)] 118 struct Events { 119 per_second: usize, 120 - events: HashMap<SmolStr, NsidCount>, 121 } 122 123 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 124 - let mut events = HashMap::new(); 125 for result in db.get_counts() { 126 let (nsid, counts) = result?; 127 events.insert( ··· 176 ) -> AppResult<Json<Vec<Hit>>> { 177 let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded); 178 let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded); 179 - let maybe_hits = db 180 - .get_hits(&params.nsid, HitsRange { from, to }) 181 - .take(MAX_HITS); 182 - let mut hits = Vec::with_capacity(maybe_hits.size_hint().0); 183 184 - for maybe_hit in maybe_hits { 185 - let hit = maybe_hit?; 186 - let hit_data = hit.deser()?; 187 - 188 - hits.push(Hit { 189 - timestamp: hit.timestamp, 190 - deleted: hit_data.deleted, 191 - }); 192 - } 193 194 - Ok(Json(hits)) 195 } 196 197 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { ··· 200 (async move { 201 let mut listener = db.new_listener(); 202 let mut data = Events { 203 - events: HashMap::<SmolStr, NsidCount>::with_capacity(10), 204 per_second: 0, 205 }; 206 let mut updates = 0;
··· 1 use std::{ 2 fmt::Display, 3 net::SocketAddr, 4 ops::{Bound, Deref, RangeBounds}, 5 time::Duration, 6 }; 7 8 + use ahash::AHashMap; 9 use anyhow::anyhow; 10 use axum::{ 11 Json, Router, ··· 117 #[derive(Serialize)] 118 struct Events { 119 per_second: usize, 120 + events: AHashMap<SmolStr, NsidCount>, 121 } 122 123 async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> { 124 + let mut events = AHashMap::new(); 125 for result in db.get_counts() { 126 let (nsid, counts) = result?; 127 events.insert( ··· 176 ) -> AppResult<Json<Vec<Hit>>> { 177 let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded); 178 let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded); 179 180 + db.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS) 181 + .take(MAX_HITS) 182 + .try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| { 183 + let hit = hit?; 184 + let hit_data = hit.deser()?; 185 186 + acc.push(Hit { 187 + timestamp: hit.timestamp, 188 + deleted: hit_data.deleted, 189 + }); 190 + Ok(acc) 191 + }) 192 + .map(Json) 193 } 194 195 async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response { ··· 198 (async move { 199 let mut listener = db.new_listener(); 200 let mut data = Events { 201 + events: AHashMap::<SmolStr, NsidCount>::with_capacity(10), 202 per_second: 0, 203 }; 204 let mut updates = 0;
+54 -26
server/src/db/handle.rs
··· 1 use std::{ 2 fmt::Debug, 3 io::Cursor, 4 - ops::{Bound, Deref, RangeBounds}, 5 sync::atomic::{AtomicU64, Ordering as AtomicOrdering}, 6 time::Duration, 7 }; 8 9 use byteview::ByteView; 10 - use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice}; 11 use itertools::Itertools; 12 use parking_lot::Mutex; 13 use rayon::iter::{IntoParallelIterator, ParallelIterator}; ··· 16 17 use crate::{ 18 db::{EventRecord, NsidHit, block}, 19 - error::AppResult, 20 - utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 21 }; 22 23 pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; ··· 31 } 32 33 pub struct LexiconHandle { 34 - tree: Partition, 35 nsid: SmolStr, 36 buf: Arc<Mutex<Vec<EventRecord>>>, 37 last_insert: AtomicU64, // relaxed ··· 46 } 47 } 48 49 - impl Deref for LexiconHandle { 50 - type Target = Partition; 51 - 52 - fn deref(&self) -> &Self::Target { 53 - &self.tree 54 - } 55 - } 56 - 57 impl LexiconHandle { 58 pub fn new(keyspace: &Keyspace, nsid: &str) -> Self { 59 let opts = PartitionCreateOptions::default() 60 - .block_size(1024 * 128) 61 .compression(fjall::CompressionType::Miniz(9)); 62 Self { 63 - tree: keyspace.open_partition(nsid, opts).unwrap(), 64 nsid: nsid.into(), 65 buf: Default::default(), 66 last_insert: AtomicU64::new(0), ··· 68 } 69 } 70 71 pub fn nsid(&self) -> &SmolStr { 72 &self.nsid 73 } 74 75 pub fn item_count(&self) -> usize { 76 self.buf.lock().len() 77 } 78 79 - pub fn since_last_activity(&self) -> u64 { 80 - CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()) 81 } 82 83 pub fn suggested_block_size(&self) -> usize { ··· 99 range: impl RangeBounds<u64>, 100 sort: bool, 101 ) -> AppResult<()> { 102 let start_limit = match range.start_bound().cloned() { 103 Bound::Included(start) => start, 104 Bound::Excluded(start) => start.saturating_add(1), ··· 114 let end_key = varints_unsigned_encoded([end_limit]); 115 116 let blocks_to_compact = self 117 - .tree 
118 .range(start_key..end_key) 119 .collect::<Result<Vec<_>, _>>()?; 120 if blocks_to_compact.len() < 2 { 121 - tracing::info!("{}: nothing to compact", self.nsid); 122 return Ok(()); 123 } 124 ··· 155 let end_blocks_size = new_blocks.len(); 156 157 for key in keys_to_delete { 158 - self.tree.remove(key.clone())?; 159 } 160 for block in new_blocks { 161 - self.tree.insert(block.key, block.data)?; 162 } 163 164 tracing::info!( 165 - "{}: compacted {} blocks to {} blocks ({}% reduction)", 166 - self.nsid, 167 - start_blocks_size, 168 - end_blocks_size, 169 - ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0, 170 ); 171 172 Ok(()) 173 } 174 175 pub fn encode_block_from_items(
··· 1 use std::{ 2 fmt::Debug, 3 io::Cursor, 4 + ops::{Bound, RangeBounds}, 5 sync::atomic::{AtomicU64, Ordering as AtomicOrdering}, 6 time::Duration, 7 }; 8 9 use byteview::ByteView; 10 + use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot}; 11 use itertools::Itertools; 12 use parking_lot::Mutex; 13 use rayon::iter::{IntoParallelIterator, ParallelIterator}; ··· 16 17 use crate::{ 18 db::{EventRecord, NsidHit, block}, 19 + error::{AppError, AppResult}, 20 + utils::{ 21 + ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, 22 + varints_unsigned_encoded, 23 + }, 24 }; 25 26 pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; ··· 34 } 35 36 pub struct LexiconHandle { 37 + write_tree: Partition, 38 + read_tree: ArcliteSwap<Snapshot>, 39 nsid: SmolStr, 40 buf: Arc<Mutex<Vec<EventRecord>>>, 41 last_insert: AtomicU64, // relaxed ··· 50 } 51 } 52 53 impl LexiconHandle { 54 pub fn new(keyspace: &Keyspace, nsid: &str) -> Self { 55 let opts = PartitionCreateOptions::default() 56 + .block_size(1024 * 48) 57 .compression(fjall::CompressionType::Miniz(9)); 58 + let write_tree = keyspace.open_partition(nsid, opts).unwrap(); 59 + let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot())); 60 Self { 61 + write_tree, 62 + read_tree, 63 nsid: nsid.into(), 64 buf: Default::default(), 65 last_insert: AtomicU64::new(0), ··· 67 } 68 } 69 70 + #[inline(always)] 71 + pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> { 72 + self.read_tree.load() 73 + } 74 + 75 + #[inline(always)] 76 + pub fn update_tree(&self) { 77 + self.read_tree 78 + .store(ArcRefCnt::new(self.write_tree.snapshot())); 79 + } 80 + 81 + #[inline(always)] 82 + pub fn span(&self) -> tracing::Span { 83 + tracing::info_span!("handle", nsid = %self.nsid) 84 + } 85 + 86 + #[inline(always)] 87 pub fn nsid(&self) -> &SmolStr { 88 &self.nsid 89 } 90 91 + #[inline(always)] 92 pub fn item_count(&self) -> usize { 93 self.buf.lock().len() 94 } 95 96 
+ pub fn since_last_activity(&self) -> Duration { 97 + Duration::from_nanos( 98 + CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()), 99 + ) 100 } 101 102 pub fn suggested_block_size(&self) -> usize { ··· 118 range: impl RangeBounds<u64>, 119 sort: bool, 120 ) -> AppResult<()> { 121 + let _span = self.span().entered(); 122 + 123 let start_limit = match range.start_bound().cloned() { 124 Bound::Included(start) => start, 125 Bound::Excluded(start) => start.saturating_add(1), ··· 135 let end_key = varints_unsigned_encoded([end_limit]); 136 137 let blocks_to_compact = self 138 + .read() 139 .range(start_key..end_key) 140 .collect::<Result<Vec<_>, _>>()?; 141 if blocks_to_compact.len() < 2 { 142 return Ok(()); 143 } 144 ··· 175 let end_blocks_size = new_blocks.len(); 176 177 for key in keys_to_delete { 178 + self.write_tree.remove(key.clone())?; 179 } 180 for block in new_blocks { 181 + self.write_tree.insert(block.key, block.data)?; 182 } 183 184 + let reduction = 185 + ((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0; 186 tracing::info!( 187 + { 188 + start = start_blocks_size, 189 + end = end_blocks_size, 190 + }, 191 + "blocks compacted {reduction:.2}%", 192 ); 193 194 Ok(()) 195 + } 196 + 197 + pub fn insert_block(&self, block: Block) -> AppResult<()> { 198 + self.write_tree 199 + .insert(block.key, block.data) 200 + .map_err(AppError::from) 201 } 202 203 pub fn encode_block_from_items(
+139 -82
server/src/db/mod.rs
··· 1 use std::{ 2 - collections::HashMap, 3 fmt::Debug, 4 io::Cursor, 5 ops::{Bound, Deref, RangeBounds}, 6 - path::{Path, PathBuf}, 7 time::Duration, 8 }; 9 10 use byteview::StrView; 11 - use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 12 use itertools::{Either, Itertools}; 13 - use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; 14 use rclite::Arc; 15 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 16 use smol_str::{SmolStr, ToSmolStr}; ··· 21 db::handle::{ItemDecoder, LexiconHandle}, 22 error::{AppError, AppResult}, 23 jetstream::JetstreamEvent, 24 - utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded}, 25 }; 26 27 mod block; ··· 71 } 72 73 pub struct DbInfo { 74 - pub nsids: HashMap<SmolStr, Vec<usize>>, 75 pub disk_size: u64, 76 } 77 ··· 79 pub ks_config: fjall::Config, 80 pub min_block_size: usize, 81 pub max_block_size: usize, 82 - pub max_last_activity: u64, 83 } 84 85 impl DbConfig { ··· 97 impl Default for DbConfig { 98 fn default() -> Self { 99 Self { 100 - ks_config: fjall::Config::default(), 101 - min_block_size: 512, 102 - max_block_size: 500_000, 103 - max_last_activity: Duration::from_secs(10).as_nanos() as u64, 104 } 105 } 106 } ··· 111 pub cfg: DbConfig, 112 pub ks: Keyspace, 113 counts: Partition, 114 - hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 115 sync_pool: threadpool::ThreadPool, 116 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 117 - eps: RateTracker<100>, 118 cancel_token: CancellationToken, 119 } 120 ··· 160 } 161 162 pub fn sync(&self, all: bool) -> AppResult<()> { 163 // prepare all the data 164 - let mut data = Vec::with_capacity(self.hits.len()); 165 let _guard = scc::ebr::Guard::new(); 166 - for (_, handle) in self.hits.iter(&_guard) { 167 let mut nsid_data = Vec::with_capacity(2); 168 - let mut total_count = 0; 169 let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity; 170 // if we disconnect for a long time, we want 
to sync all of what we 171 // have to avoid having many small blocks (even if we run compaction ··· 180 let count = handle.item_count(); 181 let data_count = count / block_size; 182 if count > 0 && (all || data_count > 0 || is_too_old) { 183 - for i in 0..data_count { 184 - nsid_data.push((i, handle.clone(), block_size)); 185 - total_count += block_size; 186 } 187 // only sync remainder if we haven't met block size 188 let remainder = count % block_size; 189 if (all || data_count == 0) && remainder > 0 { 190 - nsid_data.push((data_count, handle.clone(), remainder)); 191 - total_count += remainder; 192 } 193 } 194 - tracing::info!( 195 - "{}: will sync {} blocks ({} count)", 196 - handle.nsid(), 197 - nsid_data.len(), 198 - total_count, 199 - ); 200 - data.push(nsid_data); 201 } 202 drop(_guard); 203 ··· 206 .map(|chunk| { 207 chunk 208 .into_iter() 209 - .map(|(i, handle, max_block_size)| { 210 - (i, handle.take_block_items(max_block_size), handle) 211 }) 212 .collect::<Vec<_>>() 213 .into_par_iter() 214 - .map(|(i, items, handle)| { 215 let count = items.len(); 216 let block = LexiconHandle::encode_block_from_items(items, count)?; 217 - tracing::info!( 218 - "{}: encoded block with {} items", 219 - handle.nsid(), 220 - block.written, 221 - ); 222 - AppResult::Ok((i, block, handle)) 223 }) 224 .collect::<Result<Vec<_>, _>>() 225 }) 226 .try_for_each(|chunk| { 227 let chunk = chunk?; 228 - for (i, block, handle) in chunk { 229 - self.sync_pool 230 - .execute(move || match handle.insert(block.key, block.data) { 231 Ok(_) => { 232 - tracing::info!("{}: [{i}] synced {}", block.written, handle.nsid()) 233 } 234 - Err(err) => tracing::error!("failed to sync block: {}", err), 235 - }); 236 } 237 AppResult::Ok(()) 238 })?; 239 self.sync_pool.join(); 240 241 Ok(()) 242 } 243 ··· 251 let Some(handle) = self.get_handle(nsid) else { 252 return Ok(()); 253 }; 254 - handle.compact(max_count, range, sort) 255 } 256 257 pub fn compact_all( ··· 268 269 pub fn major_compact(&self) 
-> AppResult<()> { 270 self.compact_all(self.cfg.max_block_size, .., true)?; 271 - let _guard = scc::ebr::Guard::new(); 272 - for (_, handle) in self.hits.iter(&_guard) { 273 - handle.deref().major_compact()?; 274 - } 275 Ok(()) 276 } 277 ··· 301 } 302 303 pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> { 304 for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() { 305 let mut counts = self.get_count(&key)?; 306 - let mut count = 0; 307 self.ensure_handle(&key).queue(chunk.inspect(|e| { 308 // increment count 309 counts.last_seen = e.timestamp; ··· 312 } else { 313 counts.count += 1; 314 } 315 - count += 1; 316 })); 317 - self.eps.observe(count); 318 self.insert_count(&key, &counts)?; 319 if self.event_broadcaster.receiver_count() > 0 { 320 let _ = self.event_broadcaster.send((key, counts)); 321 } 322 } 323 Ok(()) 324 } 325 ··· 359 } 360 361 pub fn info(&self) -> AppResult<DbInfo> { 362 - let mut nsids = HashMap::new(); 363 for nsid in self.get_nsids() { 364 let Some(handle) = self.get_handle(&nsid) else { 365 continue; 366 }; 367 - let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| { 368 - let (key, value) = item?; 369 - let mut timestamps = Cursor::new(key); 370 - let start_timestamp = timestamps.read_varint()?; 371 - let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 372 - acc.push(decoder.item_count()); 373 - AppResult::Ok(acc) 374 - })?; 375 nsids.insert(nsid.to_smolstr(), block_lens); 376 } 377 Ok(DbInfo { ··· 384 &self, 385 nsid: &str, 386 range: impl RangeBounds<u64> + std::fmt::Debug, 387 ) -> impl Iterator<Item = AppResult<handle::Item>> { 388 let start_limit = match range.start_bound().cloned() { 389 Bound::Included(start) => start, ··· 401 return Either::Right(std::iter::empty()); 402 }; 403 404 - let map_block = move |(key, val)| { 405 let mut key_reader = Cursor::new(key); 406 let start_timestamp = key_reader.read_varint::<u64>()?; 407 if 
start_timestamp < start_limit { 408 - return Ok(None); 409 } 410 - let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)? 411 - .take_while(move |item| { 412 - item.as_ref().map_or(true, |item| { 413 - item.timestamp <= end_limit && item.timestamp >= start_limit 414 - }) 415 - }) 416 - .map(|res| res.map_err(AppError::from)); 417 - Ok(Some(items)) 418 }; 419 420 - Either::Left( 421 - handle 422 - .range(..end_key) 423 - .rev() 424 - .map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose()) 425 - .collect::<Vec<_>>() 426 - .into_iter() 427 - .rev() 428 - .flatten() 429 - .flatten(), 430 - ) 431 } 432 433 pub fn tracking_since(&self) -> AppResult<u64> { ··· 436 let Some(handle) = self.get_handle("app.bsky.feed.like") else { 437 return Ok(0); 438 }; 439 - let Some((timestamps_raw, _)) = handle.first_key_value()? else { 440 return Ok(0); 441 }; 442 let mut timestamp_reader = Cursor::new(timestamps_raw);
··· 1 use std::{ 2 fmt::Debug, 3 io::Cursor, 4 ops::{Bound, Deref, RangeBounds}, 5 + path::Path, 6 time::Duration, 7 + u64, 8 }; 9 10 + use ahash::{AHashMap, AHashSet}; 11 use byteview::StrView; 12 + use fjall::{Keyspace, Partition, PartitionCreateOptions}; 13 use itertools::{Either, Itertools}; 14 + use rayon::iter::{IntoParallelIterator, ParallelIterator}; 15 use rclite::Arc; 16 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 17 use smol_str::{SmolStr, ToSmolStr}; ··· 22 db::handle::{ItemDecoder, LexiconHandle}, 23 error::{AppError, AppResult}, 24 jetstream::JetstreamEvent, 25 + utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded}, 26 }; 27 28 mod block; ··· 72 } 73 74 pub struct DbInfo { 75 + pub nsids: AHashMap<SmolStr, Vec<usize>>, 76 pub disk_size: u64, 77 } 78 ··· 80 pub ks_config: fjall::Config, 81 pub min_block_size: usize, 82 pub max_block_size: usize, 83 + pub max_last_activity: Duration, 84 } 85 86 impl DbConfig { ··· 98 impl Default for DbConfig { 99 fn default() -> Self { 100 Self { 101 + ks_config: fjall::Config::default() 102 + .cache_size(1024 * 1024 * 512) 103 + .max_write_buffer_size(u64::MAX), 104 + min_block_size: 1000, 105 + max_block_size: 250_000, 106 + max_last_activity: Duration::from_secs(10), 107 } 108 } 109 } ··· 114 pub cfg: DbConfig, 115 pub ks: Keyspace, 116 counts: Partition, 117 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>, 118 sync_pool: threadpool::ThreadPool, 119 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 120 + eps: RateTracker<100>, // 100 millis buckets 121 cancel_token: CancellationToken, 122 } 123 ··· 163 } 164 165 pub fn sync(&self, all: bool) -> AppResult<()> { 166 + let start = CLOCK.now(); 167 // prepare all the data 168 + let nsids_len = self.hits.len(); 169 + let mut data = Vec::with_capacity(nsids_len); 170 + let mut nsids = AHashSet::with_capacity(nsids_len); 171 let _guard = scc::ebr::Guard::new(); 172 + for (nsid, handle) in 
self.hits.iter(&_guard) { 173 let mut nsid_data = Vec::with_capacity(2); 174 + // let mut total_count = 0; 175 let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity; 176 // if we disconnect for a long time, we want to sync all of what we 177 // have to avoid having many small blocks (even if we run compaction ··· 186 let count = handle.item_count(); 187 let data_count = count / block_size; 188 if count > 0 && (all || data_count > 0 || is_too_old) { 189 + for _ in 0..data_count { 190 + nsid_data.push((handle.clone(), block_size)); 191 + // total_count += block_size; 192 } 193 // only sync remainder if we haven't met block size 194 let remainder = count % block_size; 195 if (all || data_count == 0) && remainder > 0 { 196 + nsid_data.push((handle.clone(), remainder)); 197 + // total_count += remainder; 198 } 199 } 200 + let _span = handle.span().entered(); 201 + if nsid_data.len() > 0 { 202 + // tracing::info!( 203 + // {blocks = %nsid_data.len(), count = %total_count}, 204 + // "will encode & sync", 205 + // ); 206 + nsids.insert(nsid.clone()); 207 + data.push(nsid_data); 208 + } 209 } 210 drop(_guard); 211 ··· 214 .map(|chunk| { 215 chunk 216 .into_iter() 217 + .map(|(handle, max_block_size)| { 218 + (handle.take_block_items(max_block_size), handle) 219 }) 220 .collect::<Vec<_>>() 221 .into_par_iter() 222 + .map(|(items, handle)| { 223 let count = items.len(); 224 let block = LexiconHandle::encode_block_from_items(items, count)?; 225 + AppResult::Ok((block, handle)) 226 }) 227 .collect::<Result<Vec<_>, _>>() 228 }) 229 .try_for_each(|chunk| { 230 let chunk = chunk?; 231 + for (block, handle) in chunk { 232 + self.sync_pool.execute(move || { 233 + let _span = handle.span().entered(); 234 + let written = block.written; 235 + match handle.insert_block(block) { 236 Ok(_) => { 237 + tracing::info!({count = %written}, "synced") 238 } 239 + Err(err) => tracing::error!({ err = %err }, "failed to sync block"), 240 + } 241 + }); 242 } 243 
AppResult::Ok(()) 244 })?; 245 self.sync_pool.join(); 246 247 + // update snapshots for all (changed) handles 248 + for nsid in nsids { 249 + self.hits.peek_with(&nsid, |_, handle| handle.update_tree()); 250 + } 251 + 252 + tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks"); 253 + 254 Ok(()) 255 } 256 ··· 264 let Some(handle) = self.get_handle(nsid) else { 265 return Ok(()); 266 }; 267 + handle.compact(max_count, range, sort)?; 268 + handle.update_tree(); 269 + Ok(()) 270 } 271 272 pub fn compact_all( ··· 283 284 pub fn major_compact(&self) -> AppResult<()> { 285 self.compact_all(self.cfg.max_block_size, .., true)?; 286 Ok(()) 287 } 288 ··· 312 } 313 314 pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> { 315 + let mut seen_events = 0; 316 for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() { 317 let mut counts = self.get_count(&key)?; 318 self.ensure_handle(&key).queue(chunk.inspect(|e| { 319 // increment count 320 counts.last_seen = e.timestamp; ··· 323 } else { 324 counts.count += 1; 325 } 326 + seen_events += 1; 327 })); 328 self.insert_count(&key, &counts)?; 329 if self.event_broadcaster.receiver_count() > 0 { 330 let _ = self.event_broadcaster.send((key, counts)); 331 } 332 } 333 + self.eps.observe(seen_events); 334 Ok(()) 335 } 336 ··· 370 } 371 372 pub fn info(&self) -> AppResult<DbInfo> { 373 + let mut nsids = AHashMap::new(); 374 for nsid in self.get_nsids() { 375 let Some(handle) = self.get_handle(&nsid) else { 376 continue; 377 }; 378 + let block_lens = handle 379 + .read() 380 + .iter() 381 + .rev() 382 + .try_fold(Vec::new(), |mut acc, item| { 383 + let (key, value) = item?; 384 + let mut timestamps = Cursor::new(key); 385 + let start_timestamp = timestamps.read_varint()?; 386 + let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?; 387 + acc.push(decoder.item_count()); 388 + AppResult::Ok(acc) 389 + })?; 390 nsids.insert(nsid.to_smolstr(), 
block_lens); 391 } 392 Ok(DbInfo { ··· 399 &self, 400 nsid: &str, 401 range: impl RangeBounds<u64> + std::fmt::Debug, 402 + max_items: usize, 403 ) -> impl Iterator<Item = AppResult<handle::Item>> { 404 let start_limit = match range.start_bound().cloned() { 405 Bound::Included(start) => start, ··· 417 return Either::Right(std::iter::empty()); 418 }; 419 420 + // let mut ts = CLOCK.now(); 421 + let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> { 422 + if current_item_count >= max_items { 423 + return Ok((None, current_item_count)); 424 + } 425 + let (key, val) = res?; 426 let mut key_reader = Cursor::new(key); 427 let start_timestamp = key_reader.read_varint::<u64>()?; 428 + // let end_timestamp = key_reader.read_varint::<u64>()?; 429 if start_timestamp < start_limit { 430 + // tracing::info!( 431 + // "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater" 432 + // ); 433 + return Ok((None, current_item_count)); 434 } 435 + let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?; 436 + let current_item_count = current_item_count + decoder.item_count(); 437 + // tracing::info!( 438 + // "took {}ns to get block with size {}", 439 + // ts.elapsed().as_nanos(), 440 + // decoder.item_count() 441 + // ); 442 + // ts = CLOCK.now(); 443 + Ok(( 444 + Some( 445 + decoder 446 + .take_while(move |item| { 447 + item.as_ref().map_or(true, |item| { 448 + item.timestamp <= end_limit && item.timestamp >= start_limit 449 + }) 450 + }) 451 + .map(|res| res.map_err(AppError::from)), 452 + ), 453 + current_item_count, 454 + )) 455 }; 456 457 + let (blocks, _counted) = handle 458 + .read() 459 + .range(..end_key) 460 + .map(|res| res.map_err(AppError::from)) 461 + .rev() 462 + .fold_while( 463 + (Vec::with_capacity(20), 0), 464 + |(mut blocks, current_item_count), res| { 465 + use itertools::FoldWhile::*; 466 + 467 + match map_block((res, current_item_count)) { 468 + Ok((Some(block), 
current_item_count)) => { 469 + blocks.push(Ok(block)); 470 + Continue((blocks, current_item_count)) 471 + } 472 + Ok((None, current_item_count)) => Done((blocks, current_item_count)), 473 + Err(err) => { 474 + blocks.push(Err(err)); 475 + Done((blocks, current_item_count)) 476 + } 477 + } 478 + }, 479 + ) 480 + .into_inner(); 481 + 482 + // tracing::info!( 483 + // "got blocks with size {}, item count {counted}", 484 + // blocks.len() 485 + // ); 486 + 487 + Either::Left(blocks.into_iter().rev().flatten().flatten()) 488 } 489 490 pub fn tracking_since(&self) -> AppResult<u64> { ··· 493 let Some(handle) = self.get_handle("app.bsky.feed.like") else { 494 return Ok(0); 495 }; 496 + let Some((timestamps_raw, _)) = handle.read().first_key_value()? else { 497 return Ok(0); 498 }; 499 let mut timestamp_reader = Cursor::new(timestamps_raw);
-501
server/src/db_old/block.rs
··· 1 - use ordered_varint::Variable; 2 - use rkyv::{ 3 - Archive, Deserialize, Serialize, 4 - api::high::{HighSerializer, HighValidator}, 5 - bytecheck::CheckBytes, 6 - de::Pool, 7 - rancor::{self, Strategy}, 8 - ser::allocator::ArenaHandle, 9 - util::AlignedVec, 10 - }; 11 - use std::{ 12 - io::{self, Read, Write}, 13 - marker::PhantomData, 14 - }; 15 - 16 - use crate::error::{AppError, AppResult}; 17 - 18 - pub struct Item<T> { 19 - pub timestamp: u64, 20 - data: AlignedVec, 21 - phantom: PhantomData<T>, 22 - } 23 - 24 - impl<T: Archive> Item<T> { 25 - pub fn access(&self) -> &T::Archived { 26 - unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) } 27 - } 28 - } 29 - 30 - impl<T> Item<T> 31 - where 32 - T: Archive, 33 - T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>> 34 - + Deserialize<T, Strategy<Pool, rancor::Error>>, 35 - { 36 - pub fn deser(&self) -> AppResult<T> { 37 - rkyv::from_bytes(&self.data).map_err(AppError::from) 38 - } 39 - } 40 - 41 - impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 42 - pub fn new(timestamp: u64, data: &T) -> Self { 43 - Item { 44 - timestamp, 45 - data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 46 - phantom: PhantomData, 47 - } 48 - } 49 - } 50 - 51 - pub struct ItemEncoder<W: Write, T> { 52 - writer: W, 53 - prev_timestamp: u64, 54 - prev_delta: i64, 55 - _item: PhantomData<T>, 56 - } 57 - 58 - impl<W: Write, T> ItemEncoder<W, T> { 59 - pub fn new(writer: W) -> Self { 60 - ItemEncoder { 61 - writer, 62 - prev_timestamp: 0, 63 - prev_delta: 0, 64 - _item: PhantomData, 65 - } 66 - } 67 - 68 - pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> { 69 - if self.prev_timestamp == 0 { 70 - // self.writer.write_varint(item.timestamp)?; 71 - self.prev_timestamp = item.timestamp; 72 - self.write_data(&item.data)?; 73 - return Ok(()); 74 - } 75 - 76 - let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 77 - 78 - 
self.writer.write_varint(delta - self.prev_delta)?; 79 - self.prev_timestamp = item.timestamp; 80 - self.prev_delta = delta; 81 - 82 - self.write_data(&item.data)?; 83 - 84 - Ok(()) 85 - } 86 - 87 - fn write_data(&mut self, data: &[u8]) -> AppResult<()> { 88 - self.writer.write_varint(data.len())?; 89 - self.writer.write_all(data)?; 90 - Ok(()) 91 - } 92 - 93 - pub fn finish(mut self) -> AppResult<W> { 94 - self.writer.flush()?; 95 - Ok(self.writer) 96 - } 97 - } 98 - 99 - pub struct ItemDecoder<R, T> { 100 - reader: R, 101 - current_timestamp: u64, 102 - current_delta: i64, 103 - first_item: bool, 104 - _item: PhantomData<T>, 105 - } 106 - 107 - impl<R: Read, T: Archive> ItemDecoder<R, T> { 108 - pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> { 109 - Ok(ItemDecoder { 110 - reader, 111 - current_timestamp: start_timestamp, 112 - current_delta: 0, 113 - first_item: true, 114 - _item: PhantomData, 115 - }) 116 - } 117 - 118 - pub fn decode(&mut self) -> AppResult<Option<Item<T>>> { 119 - if self.first_item { 120 - // read the first timestamp 121 - // let timestamp = match self.reader.read_varint::<u64>() { 122 - // Ok(timestamp) => timestamp, 123 - // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 124 - // Err(e) => return Err(e.into()), 125 - // }; 126 - // self.current_timestamp = timestamp; 127 - 128 - let Some(data_raw) = self.read_item()? else { 129 - return Ok(None); 130 - }; 131 - self.first_item = false; 132 - return Ok(Some(Item { 133 - timestamp: self.current_timestamp, 134 - data: data_raw, 135 - phantom: PhantomData, 136 - })); 137 - } 138 - 139 - let Some(_delta) = self.read_timestamp()? else { 140 - return Ok(None); 141 - }; 142 - 143 - // read data 144 - let data_raw = match self.read_item()? 
{ 145 - Some(data_raw) => data_raw, 146 - None => { 147 - return Err(io::Error::new( 148 - io::ErrorKind::UnexpectedEof, 149 - "expected data after delta", 150 - ) 151 - .into()); 152 - } 153 - }; 154 - 155 - Ok(Some(Item { 156 - timestamp: self.current_timestamp, 157 - data: data_raw, 158 - phantom: PhantomData, 159 - })) 160 - } 161 - 162 - // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 163 - fn read_timestamp(&mut self) -> AppResult<Option<u64>> { 164 - let delta = match self.reader.read_varint::<i64>() { 165 - Ok(delta) => delta, 166 - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 167 - Err(e) => return Err(e.into()), 168 - }; 169 - self.current_delta += delta; 170 - self.current_timestamp = 171 - (self.current_timestamp as i128 + self.current_delta as i128) as u64; 172 - Ok(Some(self.current_timestamp)) 173 - } 174 - 175 - fn read_item(&mut self) -> AppResult<Option<AlignedVec>> { 176 - let data_len = match self.reader.read_varint::<usize>() { 177 - Ok(data_len) => data_len, 178 - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 179 - Err(e) => return Err(e.into()), 180 - }; 181 - let mut data_raw = AlignedVec::with_capacity(data_len); 182 - for _ in 0..data_len { 183 - data_raw.push(0); 184 - } 185 - self.reader.read_exact(data_raw.as_mut_slice())?; 186 - Ok(Some(data_raw)) 187 - } 188 - } 189 - 190 - impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 191 - type Item = AppResult<Item<T>>; 192 - 193 - fn next(&mut self) -> Option<Self::Item> { 194 - self.decode().transpose() 195 - } 196 - } 197 - 198 - pub trait WriteVariableExt: Write { 199 - fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 200 - value.encode_variable(self) 201 - } 202 - } 203 - impl<W: Write> WriteVariableExt for W {} 204 - 205 - pub trait ReadVariableExt: Read { 206 - fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 207 - T::decode_variable(self) 208 - } 209 - } 210 - impl<R: Read> ReadVariableExt for R {} 
211 - 212 - #[cfg(test)] 213 - mod test { 214 - use super::*; 215 - use rkyv::{Archive, Deserialize, Serialize}; 216 - use std::io::Cursor; 217 - 218 - #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)] 219 - #[rkyv(compare(PartialEq))] 220 - struct TestData { 221 - id: u32, 222 - value: String, 223 - } 224 - 225 - #[test] 226 - fn test_encoder_decoder_single_item() { 227 - let data = TestData { 228 - id: 123, 229 - value: "test".to_string(), 230 - }; 231 - 232 - let item = Item::new(1000, &data); 233 - 234 - // encode 235 - let mut buffer = Vec::new(); 236 - let mut encoder = ItemEncoder::new(&mut buffer); 237 - encoder.encode(&item).unwrap(); 238 - encoder.finish().unwrap(); 239 - 240 - // decode 241 - let cursor = Cursor::new(buffer); 242 - let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 243 - 244 - let decoded_item = decoder.decode().unwrap().unwrap(); 245 - assert_eq!(decoded_item.timestamp, 1000); 246 - 247 - let decoded_data = decoded_item.access(); 248 - assert_eq!(decoded_data.id, 123); 249 - assert_eq!(decoded_data.value.as_str(), "test"); 250 - } 251 - 252 - #[test] 253 - fn test_encoder_decoder_multiple_items() { 254 - let items = vec![ 255 - Item::new( 256 - 1000, 257 - &TestData { 258 - id: 1, 259 - value: "first".to_string(), 260 - }, 261 - ), 262 - Item::new( 263 - 1010, 264 - &TestData { 265 - id: 2, 266 - value: "second".to_string(), 267 - }, 268 - ), 269 - Item::new( 270 - 1015, 271 - &TestData { 272 - id: 3, 273 - value: "third".to_string(), 274 - }, 275 - ), 276 - Item::new( 277 - 1025, 278 - &TestData { 279 - id: 4, 280 - value: "fourth".to_string(), 281 - }, 282 - ), 283 - ]; 284 - 285 - // encode 286 - let mut buffer = Vec::new(); 287 - let mut encoder = ItemEncoder::new(&mut buffer); 288 - 289 - for item in &items { 290 - encoder.encode(item).unwrap(); 291 - } 292 - encoder.finish().unwrap(); 293 - 294 - // decode 295 - let cursor = Cursor::new(buffer); 296 - let mut decoder = ItemDecoder::<_, 
TestData>::new(cursor, 1000).unwrap(); 297 - 298 - let mut decoded_items = Vec::new(); 299 - while let Some(item) = decoder.decode().unwrap() { 300 - decoded_items.push(item); 301 - } 302 - 303 - assert_eq!(decoded_items.len(), 4); 304 - 305 - for (original, decoded) in items.iter().zip(decoded_items.iter()) { 306 - assert_eq!(original.timestamp, decoded.timestamp); 307 - assert_eq!(original.access().id, decoded.access().id); 308 - assert_eq!( 309 - original.access().value.as_str(), 310 - decoded.access().value.as_str() 311 - ); 312 - } 313 - } 314 - 315 - #[test] 316 - fn test_encoder_decoder_with_iterator() { 317 - let items = vec![ 318 - Item::new( 319 - 2000, 320 - &TestData { 321 - id: 10, 322 - value: "a".to_string(), 323 - }, 324 - ), 325 - Item::new( 326 - 2005, 327 - &TestData { 328 - id: 20, 329 - value: "b".to_string(), 330 - }, 331 - ), 332 - Item::new( 333 - 2012, 334 - &TestData { 335 - id: 30, 336 - value: "c".to_string(), 337 - }, 338 - ), 339 - ]; 340 - 341 - // encode 342 - let mut buffer = Vec::new(); 343 - let mut encoder = ItemEncoder::new(&mut buffer); 344 - 345 - for item in &items { 346 - encoder.encode(item).unwrap(); 347 - } 348 - encoder.finish().unwrap(); 349 - 350 - // decode 351 - let cursor = Cursor::new(buffer); 352 - let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap(); 353 - 354 - let decoded_items: Result<Vec<_>, _> = decoder.collect(); 355 - let decoded_items = decoded_items.unwrap(); 356 - 357 - assert_eq!(decoded_items.len(), 3); 358 - assert_eq!(decoded_items[0].timestamp, 2000); 359 - assert_eq!(decoded_items[1].timestamp, 2005); 360 - assert_eq!(decoded_items[2].timestamp, 2012); 361 - 362 - assert_eq!(decoded_items[0].access().id, 10); 363 - assert_eq!(decoded_items[1].access().id, 20); 364 - assert_eq!(decoded_items[2].access().id, 30); 365 - } 366 - 367 - #[test] 368 - fn test_delta_compression() { 369 - let items = vec![ 370 - Item::new( 371 - 1000, 372 - &TestData { 373 - id: 1, 374 - value: 
"a".to_string(), 375 - }, 376 - ), 377 - Item::new( 378 - 1010, 379 - &TestData { 380 - id: 2, 381 - value: "b".to_string(), 382 - }, 383 - ), // delta = 10 384 - Item::new( 385 - 1020, 386 - &TestData { 387 - id: 3, 388 - value: "c".to_string(), 389 - }, 390 - ), // delta = 10, delta-of-delta = 0 391 - Item::new( 392 - 1025, 393 - &TestData { 394 - id: 4, 395 - value: "d".to_string(), 396 - }, 397 - ), // delta = 5, delta-of-delta = -5 398 - ]; 399 - 400 - let mut buffer = Vec::new(); 401 - let mut encoder = ItemEncoder::new(&mut buffer); 402 - 403 - for item in &items { 404 - encoder.encode(item).unwrap(); 405 - } 406 - encoder.finish().unwrap(); 407 - 408 - // decode and verify 409 - let cursor = Cursor::new(buffer); 410 - let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 411 - 412 - let decoded_items: Result<Vec<_>, _> = decoder.collect(); 413 - let decoded_items = decoded_items.unwrap(); 414 - 415 - for (original, decoded) in items.iter().zip(decoded_items.iter()) { 416 - assert_eq!(original.timestamp, decoded.timestamp); 417 - assert_eq!(original.access().id, decoded.access().id); 418 - } 419 - } 420 - 421 - #[test] 422 - fn test_empty_decode() { 423 - let buffer = Vec::new(); 424 - let cursor = Cursor::new(buffer); 425 - let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 426 - 427 - let result = decoder.decode().unwrap(); 428 - assert!(result.is_none()); 429 - } 430 - 431 - #[test] 432 - fn test_backwards_timestamp() { 433 - let items = vec![ 434 - Item::new( 435 - 1000, 436 - &TestData { 437 - id: 1, 438 - value: "first".to_string(), 439 - }, 440 - ), 441 - Item::new( 442 - 900, 443 - &TestData { 444 - id: 2, 445 - value: "second".to_string(), 446 - }, 447 - ), 448 - ]; 449 - 450 - let mut buffer = Vec::new(); 451 - let mut encoder = ItemEncoder::new(&mut buffer); 452 - 453 - for item in &items { 454 - encoder.encode(item).unwrap(); 455 - } 456 - encoder.finish().unwrap(); 457 - 458 - let cursor = 
Cursor::new(buffer); 459 - let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 460 - 461 - let decoded_items: Result<Vec<_>, _> = decoder.collect(); 462 - let decoded_items = decoded_items.unwrap(); 463 - 464 - assert_eq!(decoded_items.len(), 2); 465 - assert_eq!(decoded_items[0].timestamp, 1000); 466 - assert_eq!(decoded_items[1].timestamp, 900); 467 - } 468 - 469 - #[test] 470 - fn test_different_data_sizes() { 471 - let small_data = TestData { 472 - id: 1, 473 - value: "x".to_string(), 474 - }; 475 - let large_data = TestData { 476 - id: 2, 477 - value: "a".repeat(1000), 478 - }; 479 - 480 - let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)]; 481 - 482 - let mut buffer = Vec::new(); 483 - let mut encoder = ItemEncoder::new(&mut buffer); 484 - 485 - for item in &items { 486 - encoder.encode(item).unwrap(); 487 - } 488 - encoder.finish().unwrap(); 489 - 490 - let cursor = Cursor::new(buffer); 491 - let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 492 - 493 - let decoded_items: Result<Vec<_>, _> = decoder.collect(); 494 - let decoded_items = decoded_items.unwrap(); 495 - 496 - assert_eq!(decoded_items.len(), 2); 497 - assert_eq!(decoded_items[0].access().value.as_str(), "x"); 498 - assert_eq!(decoded_items[1].access().value.len(), 1000); 499 - assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000)); 500 - } 501 - }
···
-424
server/src/db_old/mod.rs
··· 1 - use std::{ 2 - io::Cursor, 3 - ops::{Bound, Deref, RangeBounds}, 4 - path::Path, 5 - sync::{ 6 - Arc, 7 - atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, 8 - }, 9 - time::{Duration, Instant}, 10 - }; 11 - 12 - use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 13 - use ordered_varint::Variable; 14 - use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 15 - use smol_str::SmolStr; 16 - use tokio::sync::broadcast; 17 - 18 - use crate::{ 19 - db_old::block::{ReadVariableExt, WriteVariableExt}, 20 - error::{AppError, AppResult}, 21 - jetstream::JetstreamEvent, 22 - utils::{DefaultRateTracker, get_time}, 23 - }; 24 - 25 - mod block; 26 - 27 - #[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 28 - #[rkyv(compare(PartialEq), derive(Debug))] 29 - pub struct NsidCounts { 30 - pub count: u128, 31 - pub deleted_count: u128, 32 - pub last_seen: u64, 33 - } 34 - 35 - #[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 36 - #[rkyv(compare(PartialEq), derive(Debug))] 37 - pub struct NsidHit { 38 - pub deleted: bool, 39 - } 40 - 41 - #[derive(Clone)] 42 - pub struct EventRecord { 43 - pub nsid: SmolStr, 44 - pub timestamp: u64, // seconds 45 - pub deleted: bool, 46 - } 47 - 48 - impl EventRecord { 49 - pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 50 - match event { 51 - JetstreamEvent::Commit { 52 - time_us, commit, .. 53 - } => Some(Self { 54 - nsid: commit.collection.into(), 55 - timestamp: time_us / 1_000_000, 56 - deleted: false, 57 - }), 58 - JetstreamEvent::Delete { 59 - time_us, commit, .. 
60 - } => Some(Self { 61 - nsid: commit.collection.into(), 62 - timestamp: time_us / 1_000_000, 63 - deleted: true, 64 - }), 65 - _ => None, 66 - } 67 - } 68 - } 69 - 70 - type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 71 - type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 72 - type Item = block::Item<NsidHit>; 73 - 74 - pub struct LexiconHandle { 75 - tree: Partition, 76 - buf: Arc<scc::Queue<EventRecord>>, 77 - buf_len: AtomicUsize, 78 - last_insert: AtomicU64, 79 - eps: DefaultRateTracker, 80 - block_size: AtomicUsize, 81 - } 82 - 83 - impl LexiconHandle { 84 - fn new(keyspace: &Keyspace, nsid: &str) -> Self { 85 - let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9)); 86 - Self { 87 - tree: keyspace.open_partition(nsid, opts).unwrap(), 88 - buf: Default::default(), 89 - buf_len: AtomicUsize::new(0), 90 - last_insert: AtomicU64::new(0), 91 - eps: DefaultRateTracker::new(Duration::from_secs(5)), 92 - block_size: AtomicUsize::new(1000), 93 - } 94 - } 95 - 96 - fn item_count(&self) -> usize { 97 - self.buf_len.load(AtomicOrdering::Acquire) 98 - } 99 - 100 - fn last_insert(&self) -> u64 { 101 - self.last_insert.load(AtomicOrdering::Acquire) 102 - } 103 - 104 - fn suggested_block_size(&self) -> usize { 105 - self.block_size.load(AtomicOrdering::Relaxed) 106 - } 107 - 108 - fn insert(&self, event: EventRecord) { 109 - self.buf.push(event); 110 - self.buf_len.fetch_add(1, AtomicOrdering::Release); 111 - self.last_insert 112 - .store(get_time().as_millis() as u64, AtomicOrdering::Release); 113 - self.eps.observe(1); 114 - let rate = self.eps.rate() as usize; 115 - if rate != 0 { 116 - self.block_size.store(rate * 60, AtomicOrdering::Relaxed); 117 - } 118 - } 119 - 120 - fn sync(&self, max_block_size: usize) -> AppResult<usize> { 121 - let mut writer = ItemEncoder::new(Vec::with_capacity( 122 - size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(), 123 - )); 124 - let mut 
start_timestamp = None; 125 - let mut end_timestamp = None; 126 - let mut written = 0_usize; 127 - while let Some(event) = self.buf.pop() { 128 - let item = Item::new( 129 - event.timestamp, 130 - &NsidHit { 131 - deleted: event.deleted, 132 - }, 133 - ); 134 - writer.encode(&item)?; 135 - if start_timestamp.is_none() { 136 - start_timestamp = Some(event.timestamp); 137 - } 138 - end_timestamp = Some(event.timestamp); 139 - if written >= max_block_size { 140 - break; 141 - } 142 - written += 1; 143 - } 144 - if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 145 - self.buf_len.store(0, AtomicOrdering::Release); 146 - let value = writer.finish()?; 147 - let mut key = Vec::with_capacity(size_of::<u64>() * 2); 148 - key.write_varint(start_timestamp)?; 149 - key.write_varint(end_timestamp)?; 150 - self.tree.insert(key, value)?; 151 - } 152 - Ok(written) 153 - } 154 - } 155 - 156 - type BoxedIter<T> = Box<dyn Iterator<Item = T>>; 157 - 158 - // counts is nsid -> NsidCounts 159 - // hits is tree per nsid: varint start time + varint end time -> block of hits 160 - pub struct Db { 161 - inner: Keyspace, 162 - hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 163 - counts: Partition, 164 - event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 165 - eps: DefaultRateTracker, 166 - min_block_size: usize, 167 - max_block_size: usize, 168 - max_last_activity: Duration, 169 - } 170 - 171 - impl Db { 172 - pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 173 - tracing::info!("opening db..."); 174 - let ks = Config::new(path) 175 - .cache_size(8 * 1024 * 1024) // from talna 176 - .open()?; 177 - Ok(Self { 178 - hits: Default::default(), 179 - counts: ks.open_partition( 180 - "_counts", 181 - PartitionCreateOptions::default().compression(fjall::CompressionType::None), 182 - )?, 183 - inner: ks, 184 - event_broadcaster: broadcast::channel(1000).0, 185 - eps: DefaultRateTracker::new(Duration::from_secs(1)), 186 - min_block_size: 
512, 187 - max_block_size: 100_000, 188 - max_last_activity: Duration::from_secs(10), 189 - }) 190 - } 191 - 192 - pub fn sync(&self, all: bool) -> AppResult<()> { 193 - let _guard = scc::ebr::Guard::new(); 194 - for (nsid, tree) in self.hits.iter(&_guard) { 195 - let count = tree.item_count(); 196 - let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 197 - let is_too_old = (get_time().as_millis() as u64 - tree.last_insert()) 198 - > self.max_last_activity.as_millis() as u64; 199 - if count > 0 && (all || is_max_block_size || is_too_old) { 200 - loop { 201 - let synced = tree.sync(self.max_block_size)?; 202 - if synced == 0 { 203 - break; 204 - } 205 - tracing::info!("synced {synced} of {nsid} to db"); 206 - } 207 - } 208 - } 209 - Ok(()) 210 - } 211 - 212 - #[inline(always)] 213 - pub fn eps(&self) -> usize { 214 - self.eps.rate() as usize 215 - } 216 - 217 - #[inline(always)] 218 - pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 219 - self.event_broadcaster.subscribe() 220 - } 221 - 222 - #[inline(always)] 223 - fn maybe_run_in_nsid_tree<T>( 224 - &self, 225 - nsid: &str, 226 - f: impl FnOnce(&LexiconHandle) -> T, 227 - ) -> Option<T> { 228 - let _guard = scc::ebr::Guard::new(); 229 - let handle = match self.hits.peek(nsid, &_guard) { 230 - Some(handle) => handle.clone(), 231 - None => { 232 - if self.inner.partition_exists(nsid) { 233 - let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 234 - let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 235 - handle 236 - } else { 237 - return None; 238 - } 239 - } 240 - }; 241 - Some(f(&handle)) 242 - } 243 - 244 - #[inline(always)] 245 - fn run_in_nsid_tree<T>( 246 - &self, 247 - nsid: SmolStr, 248 - f: impl FnOnce(&LexiconHandle) -> AppResult<T>, 249 - ) -> AppResult<T> { 250 - f(self 251 - .hits 252 - .entry(nsid.clone()) 253 - .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid))) 254 - .get()) 255 - } 256 - 257 - pub 
fn record_event(&self, e: EventRecord) -> AppResult<()> { 258 - let EventRecord { 259 - nsid, 260 - timestamp, 261 - deleted, 262 - } = e.clone(); 263 - 264 - // insert event 265 - self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?; 266 - // increment count 267 - let mut counts = self.get_count(&nsid)?; 268 - counts.last_seen = timestamp; 269 - if deleted { 270 - counts.deleted_count += 1; 271 - } else { 272 - counts.count += 1; 273 - } 274 - self.insert_count(&nsid, counts.clone())?; 275 - if self.event_broadcaster.receiver_count() > 0 { 276 - let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 277 - } 278 - self.eps.observe(1); 279 - Ok(()) 280 - } 281 - 282 - #[inline(always)] 283 - fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 284 - self.counts 285 - .insert( 286 - nsid, 287 - unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 288 - ) 289 - .map_err(AppError::from) 290 - } 291 - 292 - pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 293 - let Some(raw) = self.counts.get(nsid)? 
else { 294 - return Ok(NsidCounts::default()); 295 - }; 296 - Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 297 - } 298 - 299 - pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 300 - self.counts.iter().map(|res| { 301 - res.map_err(AppError::from).map(|(key, val)| { 302 - ( 303 - SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 304 - unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 305 - ) 306 - }) 307 - }) 308 - } 309 - 310 - pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> { 311 - self.inner 312 - .list_partitions() 313 - .into_iter() 314 - .filter(|k| k.deref() != "_counts") 315 - } 316 - 317 - pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> { 318 - self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> { 319 - Box::new( 320 - handle 321 - .tree 322 - .iter() 323 - .rev() 324 - .map(|res| res.map_err(AppError::from)), 325 - ) 326 - }) 327 - .unwrap_or_else(|| Box::new(std::iter::empty())) 328 - } 329 - 330 - pub fn get_hits( 331 - &self, 332 - nsid: &str, 333 - range: impl RangeBounds<u64> + std::fmt::Debug, 334 - ) -> BoxedIter<AppResult<Item>> { 335 - let start = range 336 - .start_bound() 337 - .cloned() 338 - .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 339 - let end = range 340 - .end_bound() 341 - .cloned() 342 - .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 343 - let limit = match range.end_bound().cloned() { 344 - Bound::Included(end) => end, 345 - Bound::Excluded(end) => end.saturating_sub(1), 346 - Bound::Unbounded => u64::MAX, 347 - }; 348 - 349 - self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> { 350 - let map_block = move |(key, val)| { 351 - let mut key_reader = Cursor::new(key); 352 - let start_timestamp = key_reader.read_varint::<u64>()?; 353 - let items = 354 - 
ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| { 355 - item.as_ref().map_or(true, |item| item.timestamp <= limit) 356 - }); 357 - Ok(items) 358 - }; 359 - 360 - Box::new( 361 - handle 362 - .tree 363 - .range(TimestampRange { start, end }) 364 - .map(move |res| res.map_err(AppError::from).and_then(map_block)) 365 - .flatten() 366 - .flatten(), 367 - ) 368 - }) 369 - .unwrap_or_else(|| Box::new(std::iter::empty())) 370 - } 371 - 372 - pub fn tracking_since(&self) -> AppResult<u64> { 373 - // HACK: we should actually store when we started tracking but im lazy 374 - // should be accurate enough 375 - self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| { 376 - let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else { 377 - return Ok(0); 378 - }; 379 - let mut timestamp_reader = Cursor::new(timestamps_raw); 380 - timestamp_reader 381 - .read_varint::<u64>() 382 - .map_err(AppError::from) 383 - }) 384 - .unwrap_or(Ok(0)) 385 - } 386 - } 387 - 388 - type TimestampRepr = Vec<u8>; 389 - 390 - struct TimestampRange { 391 - start: Bound<TimestampRepr>, 392 - end: Bound<TimestampRepr>, 393 - } 394 - 395 - impl RangeBounds<TimestampRepr> for TimestampRange { 396 - #[inline(always)] 397 - fn start_bound(&self) -> Bound<&TimestampRepr> { 398 - self.start.as_ref() 399 - } 400 - 401 - #[inline(always)] 402 - fn end_bound(&self) -> Bound<&TimestampRepr> { 403 - self.end.as_ref() 404 - } 405 - } 406 - 407 - type TimestampReprOld = [u8; 8]; 408 - 409 - struct TimestampRangeOld { 410 - start: Bound<TimestampReprOld>, 411 - end: Bound<TimestampReprOld>, 412 - } 413 - 414 - impl RangeBounds<TimestampReprOld> for TimestampRangeOld { 415 - #[inline(always)] 416 - fn start_bound(&self) -> Bound<&TimestampReprOld> { 417 - self.start.as_ref() 418 - } 419 - 420 - #[inline(always)] 421 - fn end_bound(&self) -> Bound<&TimestampReprOld> { 422 - self.end.as_ref() 423 - } 424 - }
···
+21 -11
server/src/jetstream.rs
··· 13 pub struct JetstreamClient { 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 tls_connector: tokio_websockets::Connector, 16 - url: SmolStr, 17 } 18 19 impl JetstreamClient { 20 - pub fn new(url: &str) -> AppResult<Self> { 21 Ok(Self { 22 stream: None, 23 tls_connector: tokio_websockets::Connector::new()?, 24 - url: SmolStr::new(url), 25 }) 26 } 27 28 pub async fn connect(&mut self) -> AppResult<()> { 29 - let (stream, _) = ClientBuilder::new() 30 - .connector(&self.tls_connector) 31 - .uri(&self.url)? 32 - .connect() 33 - .await?; 34 - self.stream = Some(stream); 35 - tracing::info!("connected to jetstream ({})", self.url); 36 - Ok(()) 37 } 38 39 // automatically retries connection, only returning error if it fails many times
··· 13 pub struct JetstreamClient { 14 stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>, 15 tls_connector: tokio_websockets::Connector, 16 + urls: Vec<SmolStr>, 17 } 18 19 impl JetstreamClient { 20 + pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> { 21 Ok(Self { 22 stream: None, 23 tls_connector: tokio_websockets::Connector::new()?, 24 + urls: urls.into_iter().map(Into::into).collect(), 25 }) 26 } 27 28 pub async fn connect(&mut self) -> AppResult<()> { 29 + for uri in &self.urls { 30 + let conn_result = ClientBuilder::new() 31 + .connector(&self.tls_connector) 32 + .uri(uri)? 33 + .connect() 34 + .await; 35 + match conn_result { 36 + Ok((stream, _)) => { 37 + self.stream = Some(stream); 38 + tracing::info!("connected to jetstream {}", uri); 39 + return Ok(()); 40 + } 41 + Err(err) => { 42 + tracing::error!("failed to connect to jetstream {uri}: {err}"); 43 + } 44 + }; 45 + } 46 + Err(anyhow!("failed to connect to any jetstream server").into()) 47 } 48 49 // automatically retries connection, only returning error if it fails many times
+48 -21
server/src/main.rs
··· 1 - use std::{ops::Deref, time::Duration, u64}; 2 3 use itertools::Itertools; 4 use rclite::Arc; ··· 17 18 mod api; 19 mod db; 20 - mod db_old; 21 mod error; 22 mod jetstream; 23 mod utils; ··· 26 #[global_allocator] 27 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; 28 29 - #[cfg(target_env = "msvc")] 30 - #[global_allocator] 31 - static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; 32 - 33 #[tokio::main] 34 async fn main() { 35 tracing_subscriber::fmt::fmt() ··· 54 debug(); 55 return; 56 } 57 Some(x) => { 58 tracing::error!("unknown command: {}", x); 59 return; ··· 71 .install_default() 72 .expect("cant install rustls crypto provider"); 73 74 - let mut jetstream = 75 - match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") { 76 - Ok(client) => client, 77 - Err(err) => { 78 - tracing::error!("can't create jetstream client: {err}"); 79 - return; 80 - } 81 - }; 82 83 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 84 let consume_events = tokio::spawn({ ··· 107 move || { 108 let mut buffer = Vec::new(); 109 loop { 110 - let read = event_rx.blocking_recv_many(&mut buffer, 100); 111 if let Err(err) = db.ingest_events(buffer.drain(..)) { 112 tracing::error!("failed to ingest events: {}", err); 113 } ··· 153 if db.is_shutting_down() { 154 return; 155 } 156 - let end = get_time() - compact_period / 2; 157 let start = end - compact_period; 158 let range = start.as_secs()..end.as_secs(); 159 tracing::info!( ··· 208 db.sync(true).expect("cant sync db"); 209 } 210 211 fn debug() { 212 let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 213 let info = db.info().expect("cant get db info"); ··· 260 261 fn migrate() { 262 let cancel_token = CancellationToken::new(); 263 - 264 - let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db")); 265 - 266 let to = Arc::new( 267 Db::new( 268 DbConfig::default().path(".fjall_data_to").ks(|c| { ··· 
277 ); 278 279 let nsids = from.get_nsids().collect::<Vec<_>>(); 280 - let eps_thread = std::thread::spawn({ 281 let to = to.clone(); 282 move || { 283 loop { ··· 297 threads.push(std::thread::spawn(move || { 298 tracing::info!("{}: migrating...", nsid.deref()); 299 let mut count = 0_u64; 300 - for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() { 301 to.ingest_events(hits.map(|hit| { 302 count += 1; 303 let hit = hit.expect("cant decode hit");
··· 1 + use std::{ops::Deref, time::Duration, u64, usize}; 2 3 use itertools::Itertools; 4 use rclite::Arc; ··· 17 18 mod api; 19 mod db; 20 mod error; 21 mod jetstream; 22 mod utils; ··· 25 #[global_allocator] 26 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; 27 28 #[tokio::main] 29 async fn main() { 30 tracing_subscriber::fmt::fmt() ··· 49 debug(); 50 return; 51 } 52 + Some("print") => { 53 + print_all(); 54 + return; 55 + } 56 Some(x) => { 57 tracing::error!("unknown command: {}", x); 58 return; ··· 70 .install_default() 71 .expect("cant install rustls crypto provider"); 72 73 + let urls = [ 74 + "wss://jetstream1.us-west.bsky.network/subscribe", 75 + "wss://jetstream2.us-west.bsky.network/subscribe", 76 + "wss://jetstream2.fr.hose.cam/subscribe", 77 + "wss://jetstream.fire.hose.cam/subscribe", 78 + ]; 79 + let mut jetstream = match JetstreamClient::new(urls) { 80 + Ok(client) => client, 81 + Err(err) => { 82 + tracing::error!("can't create jetstream client: {err}"); 83 + return; 84 + } 85 + }; 86 87 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 88 let consume_events = tokio::spawn({ ··· 111 move || { 112 let mut buffer = Vec::new(); 113 loop { 114 + let read = event_rx.blocking_recv_many(&mut buffer, 500); 115 if let Err(err) = db.ingest_events(buffer.drain(..)) { 116 tracing::error!("failed to ingest events: {}", err); 117 } ··· 157 if db.is_shutting_down() { 158 return; 159 } 160 + let end = get_time(); 161 let start = end - compact_period; 162 let range = start.as_secs()..end.as_secs(); 163 tracing::info!( ··· 212 db.sync(true).expect("cant sync db"); 213 } 214 215 + fn print_all() { 216 + let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 217 + let nsids = db.get_nsids().collect::<Vec<_>>(); 218 + let mut count = 0_usize; 219 + for nsid in nsids { 220 + println!("{}:", nsid.deref()); 221 + for hit in db.get_hits(&nsid, .., usize::MAX) { 222 + let hit = hit.expect("aaa"); 
223 + println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted); 224 + count += 1; 225 + } 226 + } 227 + println!("total hits: {}", count); 228 + } 229 + 230 fn debug() { 231 let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 232 let info = db.info().expect("cant get db info"); ··· 279 280 fn migrate() { 281 let cancel_token = CancellationToken::new(); 282 + let from = Arc::new( 283 + Db::new( 284 + DbConfig::default().path(".fjall_data_from"), 285 + cancel_token.child_token(), 286 + ) 287 + .expect("couldnt create db"), 288 + ); 289 let to = Arc::new( 290 Db::new( 291 DbConfig::default().path(".fjall_data_to").ks(|c| { ··· 300 ); 301 302 let nsids = from.get_nsids().collect::<Vec<_>>(); 303 + let _eps_thread = std::thread::spawn({ 304 let to = to.clone(); 305 move || { 306 loop { ··· 320 threads.push(std::thread::spawn(move || { 321 tracing::info!("{}: migrating...", nsid.deref()); 322 let mut count = 0_u64; 323 + for hits in from 324 + .get_hits(&nsid, .., usize::MAX) 325 + .chunks(100000) 326 + .into_iter() 327 + { 328 to.ingest_events(hits.map(|hit| { 329 count += 1; 330 let hit = hit.expect("cant decode hit");
+66
server/src/utils.rs
··· 1 use std::io::{self, Read, Write}; 2 use std::sync::atomic::{AtomicU64, Ordering}; 3 use std::time::Duration; 4 5 use byteview::ByteView; 6 use ordered_varint::Variable; 7 8 pub fn get_time() -> Duration { 9 std::time::SystemTime::now() ··· 320 } 321 } 322 }
··· 1 use std::io::{self, Read, Write};
2 + use std::ops::Deref;
3 use std::sync::atomic::{AtomicU64, Ordering};
4 use std::time::Duration;
5
6 + use arc_swap::RefCnt;
7 use byteview::ByteView;
8 use ordered_varint::Variable;
9 + use rclite::Arc;
10
11 pub fn get_time() -> Duration {
12 std::time::SystemTime::now()
··· 323 }
324 }
325 }
326 +
327 + pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;
328 +
329 + pub struct ArcRefCnt<T>(Arc<T>);
330 +
331 + impl<T> ArcRefCnt<T> {
332 + pub fn new(value: T) -> Self {
333 + Self(Arc::new(value))
334 + }
335 + }
336 +
337 + impl<T> Deref for ArcRefCnt<T> {
338 + type Target = T;
339 +
340 + fn deref(&self) -> &Self::Target {
341 + &self.0
342 + }
343 + }
344 +
345 + impl<T> Clone for ArcRefCnt<T> {
346 + fn clone(&self) -> Self {
347 + Self(self.0.clone())
348 + }
349 + }
350 +
351 + // SAFETY: rclite::Arc upholds the contract RefCnt needs, same as std::sync::Arc: into_raw/from_raw round-trip without changing the refcount, and the pointee stays valid and at a stable address while any reference is alive. Impl below mirrors arc_swap's impl for std::sync::Arc.
352 + unsafe impl<T> RefCnt for ArcRefCnt<T> {
353 + type Base = T;
354 +
355 + fn into_ptr(me: Self) -> *mut Self::Base {
356 + Arc::into_raw(me.0) as *mut T
357 + }
358 +
359 + fn as_ptr(me: &Self) -> *mut Self::Base {
360 + // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
361 + // intention as
362 + //
363 + // me as &T as *const T as *mut T
364 + //
365 + // We first create a "shallow copy" of me - one that doesn't really own its ref count
366 + // (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
367 + // Then we can use into_raw (which preserves not having the ref count).
368 + //
369 + // We need to "revert" the changes we did. In current std implementation, the combination
370 + // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw
371 + // and that read shall be paired with forget to properly "close the brackets". 
In future 372 + // versions of STD, these may become something else that's not really no-op (unlikely, but 373 + // possible), so we future-proof it a bit. 374 + 375 + // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads 376 + let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) }); 377 + let ptr = ptr as *mut T; 378 + 379 + // SAFETY: We got the pointer from into_raw just above 380 + std::mem::forget(unsafe { Arc::from_raw(ptr) }); 381 + 382 + ptr 383 + } 384 + 385 + unsafe fn from_ptr(ptr: *const Self::Base) -> Self { 386 + Self(unsafe { Arc::from_raw(ptr) }) 387 + } 388 + }