+12
-3
README.md
+12
-3
README.md
···
1
-
a webapp and server that monitors the jetstream and tracks the different lexicons as they are created or deleted.
2
-
it shows you which collections are most active on the network.
1
+
a webapp and server that monitors the jetstream and tracks the different
2
+
lexicons as they are created or deleted. it shows you which collections are most
3
+
active on the network.
3
4
4
5
for backend it uses rust with fjall as db, the frontend is built with sveltekit.
5
6
6
7
see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it.
7
8
9
+
## performance / storage
10
+
11
+
it uses about 50MB of space for 620M recorded events (events being just
12
+
timestamp in seconds and deleted boolean for now). and around 50-60ms for
13
+
querying 300-400k events.
14
+
15
+
this is on a machine with AMD EPYC 7281 (32) @ 2.100GHz.
16
+
8
17
## running
9
18
10
19
### with nix
11
20
12
-
- run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server`
21
+
- build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server`
13
22
- build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client`
14
23
15
24
### manually
+9
client/bun.lock
+9
client/bun.lock
···
5
5
"name": "nsid-tracker",
6
6
"dependencies": {
7
7
"@number-flow/svelte": "^0.3.9",
8
+
"svelte-adapter-bun": "^0.5.2",
8
9
},
9
10
"devDependencies": {
10
11
"@eslint/compat": "^1.2.5",
···
353
354
354
355
"globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="],
355
356
357
+
"globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="],
358
+
359
+
"globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="],
360
+
356
361
"graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="],
357
362
358
363
"graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="],
···
511
516
512
517
"svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="],
513
518
519
+
"svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="],
520
+
514
521
"svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="],
515
522
516
523
"svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="],
···
520
527
"tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="],
521
528
522
529
"tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="],
530
+
531
+
"tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="],
523
532
524
533
"tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="],
525
534
+2
-1
client/package.json
+2
-1
client/package.json
+4
client/src/app.css
+4
client/src/app.css
+9
-9
client/src/app.html
+9
-9
client/src/app.html
···
1
1
<!doctype html>
2
2
<html lang="en">
3
-
<head>
4
-
<meta charset="utf-8" />
5
-
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
6
-
<meta name="viewport" content="width=device-width, initial-scale=1" />
7
-
%sveltekit.head%
8
-
</head>
9
-
<body data-sveltekit-preload-data="hover">
10
-
<div style="display: contents">%sveltekit.body%</div>
11
-
</body>
3
+
<head>
4
+
<meta charset="utf-8" />
5
+
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
6
+
<meta name="viewport" content="width=device-width, initial-scale=1" />
7
+
%sveltekit.head%
8
+
</head>
9
+
<body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover">
10
+
<div style="display: contents">%sveltekit.body%</div>
11
+
</body>
12
12
</html>
+2
-9
client/src/lib/components/BskyToggle.svelte
+2
-9
client/src/lib/components/BskyToggle.svelte
···
11
11
<!-- svelte-ignore a11y_no_static_element_interactions -->
12
12
<button
13
13
onclick={onBskyToggle}
14
-
class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300"
14
+
class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
15
15
>
16
16
<input checked={dontShowBsky} type="checkbox" />
17
-
<span class="ml-0.5"> hide app.bsky.* </span>
17
+
<span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span>
18
18
</button>
19
-
20
-
<style lang="postcss">
21
-
@reference "../../app.css";
22
-
.wsbadge {
23
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
24
-
}
25
-
</style>
+8
-5
client/src/lib/components/EventCard.svelte
+8
-5
client/src/lib/components/EventCard.svelte
···
104
104
</script>
105
105
106
106
<div
107
-
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
107
+
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
108
108
class:has-activity={isAnimating}
109
109
style="--border-thickness: {borderThickness}px"
110
110
>
111
111
<div class="flex items-start gap-2">
112
112
<div
113
-
class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full"
113
+
class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full"
114
114
>
115
115
#{index + 1}
116
116
</div>
117
117
<div
118
118
title={event.nsid}
119
-
class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1"
119
+
class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1"
120
120
>
121
121
{event.nsid}
122
122
</div>
···
136
136
</div>
137
137
</div>
138
138
139
-
<style>
139
+
<style lang="postcss">
140
140
.has-activity {
141
141
position: relative;
142
142
transition: all 0.2s ease-out;
143
143
}
144
144
145
145
.has-activity::before {
146
+
@reference "../../app.css";
147
+
@apply border-blue-500 dark:border-blue-800;
146
148
content: "";
147
149
position: absolute;
148
150
top: calc(-1 * var(--border-thickness));
149
151
left: calc(-1 * var(--border-thickness));
150
152
right: calc(-1 * var(--border-thickness));
151
153
bottom: calc(-1 * var(--border-thickness));
152
-
border: var(--border-thickness) solid rgba(59, 130, 246, 0.8);
154
+
border-width: var(--border-thickness);
155
+
border-style: solid;
153
156
border-radius: calc(0.5rem + var(--border-thickness));
154
157
pointer-events: none;
155
158
transition: all 0.3s ease-out;
+5
-10
client/src/lib/components/FilterControls.svelte
+5
-10
client/src/lib/components/FilterControls.svelte
···
8
8
</script>
9
9
10
10
<div
11
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300"
11
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
12
12
>
13
-
<label for="filter-regex" class="text-blue-800 mr-1"> filter: </label>
13
+
<label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1">
14
+
filter:
15
+
</label>
14
16
<input
15
17
id="filter-regex"
16
18
value={filterRegex}
17
19
oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)}
18
20
type="text"
19
21
placeholder="regex..."
20
-
class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24"
22
+
class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24"
21
23
/>
22
24
</div>
23
-
24
-
<style lang="postcss">
25
-
@reference "../../app.css";
26
-
.wsbadge {
27
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
28
-
}
29
-
</style>
+6
-11
client/src/lib/components/RefreshControl.svelte
+6
-11
client/src/lib/components/RefreshControl.svelte
···
8
8
</script>
9
9
10
10
<div
11
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300"
11
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700"
12
12
>
13
-
<label for="refresh-rate" class="text-green-800 mr-1">refresh:</label>
13
+
<label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1"
14
+
>refresh:</label
15
+
>
14
16
<input
15
17
id="refresh-rate"
16
18
value={refreshRate}
···
24
26
pattern="[0-9]*"
25
27
min="0"
26
28
placeholder="real-time"
27
-
class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20"
29
+
class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20"
28
30
/>
29
-
<span class="text-green-700">s</span>
31
+
<span class="text-lime-800 dark:text-lime-200">s</span>
30
32
</div>
31
-
32
-
<style lang="postcss">
33
-
@reference "../../app.css";
34
-
.wsbadge {
35
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
36
-
}
37
-
</style>
+31
client/src/lib/components/ShowControls.svelte
+31
client/src/lib/components/ShowControls.svelte
···
1
+
<script lang="ts">
2
+
import type { ShowOption } from "$lib/types";
3
+
4
+
interface Props {
5
+
show: ShowOption;
6
+
onShowChange: (value: ShowOption) => void;
7
+
}
8
+
9
+
let { show, onShowChange }: Props = $props();
10
+
11
+
const showOptions: ShowOption[] = ["server init", "stream start"];
12
+
</script>
13
+
14
+
<div
15
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700"
16
+
>
17
+
<label for="show" class="text-pink-800 dark:text-pink-100 mr-1">
18
+
show since:
19
+
</label>
20
+
<select
21
+
id="show"
22
+
value={show}
23
+
onchange={(e) =>
24
+
onShowChange((e.target as HTMLSelectElement).value as ShowOption)}
25
+
class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0"
26
+
>
27
+
{#each showOptions as option}
28
+
<option value={option}>{option}</option>
29
+
{/each}
30
+
</select>
31
+
</div>
+5
-10
client/src/lib/components/SortControls.svelte
+5
-10
client/src/lib/components/SortControls.svelte
···
17
17
</script>
18
18
19
19
<div
20
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300"
20
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700"
21
21
>
22
-
<label for="sort-by" class="text-purple-800 mr-1"> sort by: </label>
22
+
<label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1">
23
+
sort by:
24
+
</label>
23
25
<select
24
26
id="sort-by"
25
27
value={sortBy}
26
28
onchange={(e) =>
27
29
onSortChange((e.target as HTMLSelectElement).value as SortOption)}
28
-
class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0"
30
+
class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0"
29
31
>
30
32
{#each sortOptions as option}
31
33
<option value={option.value}>{option.label}</option>
32
34
{/each}
33
35
</select>
34
36
</div>
35
-
36
-
<style lang="postcss">
37
-
@reference "../../app.css";
38
-
.wsbadge {
39
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
40
-
}
41
-
</style>
+17
-18
client/src/lib/components/StatsCard.svelte
+17
-18
client/src/lib/components/StatsCard.svelte
···
1
1
<script lang="ts">
2
2
import { formatNumber } from "$lib/format";
3
-
import NumberFlow from "@number-flow/svelte";
4
3
5
4
const colorClasses = {
6
5
green: {
7
-
bg: "from-green-50 to-green-100",
8
-
border: "border-green-200",
9
-
titleText: "text-green-700",
10
-
valueText: "text-green-900",
6
+
bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800",
7
+
border: "border-green-200 dark:border-green-800",
8
+
titleText: "text-green-700 dark:text-green-400",
9
+
valueText: "text-green-900 dark:text-green-200",
11
10
},
12
11
red: {
13
-
bg: "from-red-50 to-red-100",
14
-
border: "border-red-200",
15
-
titleText: "text-red-700",
16
-
valueText: "text-red-900",
12
+
bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800",
13
+
border: "border-red-200 dark:border-red-800",
14
+
titleText: "text-red-700 dark:text-red-400",
15
+
valueText: "text-red-900 dark:text-red-200",
17
16
},
18
17
turqoise: {
19
-
bg: "from-teal-50 to-teal-100",
20
-
border: "border-teal-200",
21
-
titleText: "text-teal-700",
22
-
valueText: "text-teal-900",
18
+
bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800",
19
+
border: "border-teal-200 dark:border-teal-800",
20
+
titleText: "text-teal-700 dark:text-teal-400",
21
+
valueText: "text-teal-900 dark:text-teal-200",
23
22
},
24
23
orange: {
25
-
bg: "from-orange-50 to-orange-100",
26
-
border: "border-orange-200",
27
-
titleText: "text-orange-700",
28
-
valueText: "text-orange-900",
24
+
bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800",
25
+
border: "border-orange-200 dark:border-orange-800",
26
+
titleText: "text-orange-700 dark:text-orange-400",
27
+
valueText: "text-orange-900 dark:text-orange-200",
29
28
},
30
29
};
31
30
···
45
44
{title}
46
45
</h3>
47
46
<p class="text-xl md:text-2xl font-bold {colors.valueText}">
48
-
<NumberFlow {value} />
47
+
{formatNumber(value)}
49
48
</p>
50
49
</div>
+18
-14
client/src/lib/components/StatusBadge.svelte
+18
-14
client/src/lib/components/StatusBadge.svelte
···
8
8
const statusConfig = {
9
9
connected: {
10
10
text: "stream live",
11
-
classes: "bg-green-100 text-green-800 border-green-200",
11
+
classes:
12
+
"bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800",
12
13
},
13
14
connecting: {
14
15
text: "stream connecting",
15
-
classes: "bg-yellow-100 text-yellow-800 border-yellow-200",
16
+
classes:
17
+
"bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800",
16
18
},
17
19
error: {
18
20
text: "stream errored",
19
-
classes: "bg-red-100 text-red-800 border-red-200",
21
+
classes:
22
+
"bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800",
20
23
},
21
24
disconnected: {
22
25
text: "stream offline",
23
-
classes: "bg-gray-100 text-gray-800 border-gray-200",
26
+
classes:
27
+
"bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800",
24
28
},
25
29
};
26
30
27
31
const config = $derived(statusConfig[status]);
28
32
</script>
29
33
30
-
<span class="wsbadge {config.classes}">
31
-
{config.text}
32
-
</span>
33
-
34
-
<style lang="postcss">
35
-
@reference "../../app.css";
36
-
.wsbadge {
37
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
38
-
}
39
-
</style>
34
+
<div class="flex flex-row items-center gap-2 wsbadge {config.classes}">
35
+
<!-- connecting spinner -->
36
+
{#if status === "connecting"}
37
+
<div
38
+
class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200"
39
+
></div>
40
+
{/if}
41
+
<!-- status text -->
42
+
<span>{config.text}</span>
43
+
</div>
+5
-1
client/src/lib/format.ts
+5
-1
client/src/lib/format.ts
···
2
2
return num.toLocaleString();
3
3
};
4
4
5
+
const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime());
5
6
export const formatTimestamp = (timestamp: number): string => {
6
-
return new Date(timestamp / 1000).toLocaleString();
7
+
const date = new Date(timestamp * 1000);
8
+
return isValidDate(date)
9
+
? date.toLocaleString()
10
+
: new Date(timestamp / 1000).toLocaleString();
7
11
};
+1
client/src/lib/types.ts
+1
client/src/lib/types.ts
+3
-2
client/src/routes/+layout.ts
+3
-2
client/src/routes/+layout.ts
+7
client/src/routes/+page.server.ts
+7
client/src/routes/+page.server.ts
+109
-51
client/src/routes/+page.svelte
+109
-51
client/src/routes/+page.svelte
···
1
1
<script lang="ts">
2
2
import { dev } from "$app/environment";
3
-
import type { EventRecord, NsidCount, SortOption } from "$lib/types";
3
+
import type {
4
+
EventRecord,
5
+
Events,
6
+
NsidCount,
7
+
ShowOption,
8
+
Since,
9
+
SortOption,
10
+
} from "$lib/types";
4
11
import { onMount, onDestroy } from "svelte";
5
-
import { writable } from "svelte/store";
12
+
import { get, writable } from "svelte/store";
6
13
import { PUBLIC_API_URL } from "$env/static/public";
7
14
import { fetchEvents, fetchTrackingSince } from "$lib/api";
8
15
import { createRegexFilter } from "$lib/filter";
···
14
21
import BskyToggle from "$lib/components/BskyToggle.svelte";
15
22
import RefreshControl from "$lib/components/RefreshControl.svelte";
16
23
import { formatTimestamp } from "$lib/format";
24
+
import ShowControls from "$lib/components/ShowControls.svelte";
25
+
26
+
type Props = {
27
+
data: { events: Events; trackingSince: Since };
28
+
};
17
29
18
-
const events = writable(new Map<string, EventRecord>());
30
+
const { data }: Props = $props();
31
+
32
+
const events = writable(
33
+
new Map<string, EventRecord>(Object.entries(data.events.events)),
34
+
);
35
+
const eventsStart = new Map<string, EventRecord>(
36
+
Object.entries(data.events.events),
37
+
);
19
38
const pendingUpdates = new Map<string, EventRecord>();
20
-
let eventsList: NsidCount[] = $state([]);
39
+
21
40
let updateTimer: NodeJS.Timeout | null = null;
22
-
events.subscribe((value) => {
23
-
eventsList = value
24
-
.entries()
25
-
.map(([nsid, event]) => ({
26
-
nsid,
27
-
...event,
28
-
}))
29
-
.toArray();
30
-
});
31
-
let per_second = $state(0);
32
-
let tracking_since = $state(0);
41
+
let per_second = $state(data.events.per_second);
42
+
let tracking_since = $state(data.trackingSince.since);
33
43
44
+
const diffEvents = (
45
+
oldEvents: Map<string, EventRecord>,
46
+
newEvents: Map<string, EventRecord>,
47
+
): NsidCount[] => {
48
+
const nsidCounts: NsidCount[] = [];
49
+
for (const [nsid, event] of newEvents.entries()) {
50
+
const oldEvent = oldEvents.get(nsid);
51
+
if (oldEvent) {
52
+
const counts = {
53
+
nsid,
54
+
count: event.count - oldEvent.count,
55
+
deleted_count: event.deleted_count - oldEvent.deleted_count,
56
+
last_seen: event.last_seen,
57
+
};
58
+
if (counts.count > 0 || counts.deleted_count > 0)
59
+
nsidCounts.push(counts);
60
+
} else {
61
+
nsidCounts.push({
62
+
nsid,
63
+
...event,
64
+
});
65
+
}
66
+
}
67
+
return nsidCounts;
68
+
};
69
+
const applyEvents = (newEvents: Record<string, EventRecord>) => {
70
+
events.update((map) => {
71
+
for (const [nsid, event] of Object.entries(newEvents)) {
72
+
map.set(nsid, event);
73
+
}
74
+
return map;
75
+
});
76
+
};
77
+
78
+
let error: string | null = $state(null);
79
+
let filterRegex = $state("");
80
+
let dontShowBsky = $state(false);
81
+
let sortBy: SortOption = $state("total");
82
+
let refreshRate = $state("");
83
+
let changedByUser = $state(false);
84
+
let show: ShowOption = $state("server init");
85
+
let eventsList: NsidCount[] = $state([]);
86
+
let updateEventsList = $derived((value: Map<string, EventRecord>) => {
87
+
switch (show) {
88
+
case "server init":
89
+
eventsList = value
90
+
.entries()
91
+
.map(([nsid, event]) => ({
92
+
nsid,
93
+
...event,
94
+
}))
95
+
.toArray();
96
+
break;
97
+
case "stream start":
98
+
eventsList = diffEvents(eventsStart, value);
99
+
break;
100
+
}
101
+
});
102
+
events.subscribe((value) => updateEventsList(value));
34
103
let all: EventRecord = $derived(
35
104
eventsList.reduce(
36
105
(acc, event) => {
···
50
119
},
51
120
),
52
121
);
53
-
let error: string | null = $state(null);
54
-
let filterRegex = $state("");
55
-
let dontShowBsky = $state(false);
56
-
let sortBy: SortOption = $state("total");
57
-
let refreshRate = $state("");
58
-
let changedByUser = $state(false);
59
122
60
123
let websocket: WebSocket | null = null;
61
124
let isStreamOpen = $state(false);
···
76
139
};
77
140
websocket.onmessage = async (event) => {
78
141
const jsonData = JSON.parse(event.data);
79
-
80
-
if (jsonData.per_second > 0) {
81
-
per_second = jsonData.per_second;
82
-
}
83
-
84
-
// Store updates in pending map if refresh rate is set
142
+
per_second = jsonData.per_second;
85
143
if (refreshRate) {
86
144
for (const [nsid, event] of Object.entries(jsonData.events)) {
87
145
pendingUpdates.set(nsid, event as EventRecord);
88
146
}
89
147
} else {
90
-
// Apply updates immediately if no refresh rate
91
-
events.update((map) => {
92
-
for (const [nsid, event] of Object.entries(
93
-
jsonData.events,
94
-
)) {
95
-
map.set(nsid, event as EventRecord);
96
-
}
97
-
return map;
98
-
});
148
+
applyEvents(jsonData.events);
99
149
}
100
150
};
101
151
websocket.onerror = (error) => {
···
114
164
error = null;
115
165
const data = await fetchEvents();
116
166
per_second = data.per_second;
117
-
events.update((map) => {
118
-
for (const [nsid, event] of Object.entries(data.events)) {
119
-
map.set(nsid, event);
120
-
}
121
-
return map;
122
-
});
167
+
applyEvents(data.events);
123
168
tracking_since = (await fetchTrackingSince()).since;
124
169
} catch (err) {
125
170
error =
···
222
267
/>
223
268
</svelte:head>
224
269
225
-
<header class="border-gray-300 border-b mb-4 pb-2">
270
+
<header
271
+
class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2"
272
+
>
226
273
<div
227
274
class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center"
228
275
>
229
-
<h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1>
230
-
<p class="text-lg mt-1 text-gray-600">
276
+
<h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200">
277
+
lexicon tracker
278
+
</h1>
279
+
<p class="text-lg mt-1 text-gray-600 dark:text-gray-300">
231
280
tracks lexicons seen on the jetstream {tracking_since === 0
232
281
? ""
233
282
: `(since: ${formatTimestamp(tracking_since)})`}
234
283
</p>
235
284
</div>
236
285
</header>
237
-
<div class="md:max-w-[61vw] mx-auto p-2">
286
+
<div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2">
238
287
<div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8">
239
288
<StatsCard
240
289
title="total creation"
···
260
309
261
310
{#if error}
262
311
<div
263
-
class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6"
312
+
class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6"
264
313
>
265
314
<p>Error: {error}</p>
266
315
</div>
···
269
318
{#if eventsList.length > 0}
270
319
<div class="mb-8">
271
320
<div class="flex flex-wrap items-center gap-3 mb-3">
272
-
<h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2>
321
+
<h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200">
322
+
seen lexicons
323
+
</h2>
273
324
<StatusBadge status={websocketStatus} />
274
325
</div>
275
326
<div class="flex flex-wrap items-center gap-1.5 mb-6">
···
291
342
refreshRate = "";
292
343
}}
293
344
/>
345
+
<ShowControls
346
+
{show}
347
+
onShowChange={(value: ShowOption) => {
348
+
show = value;
349
+
updateEventsList(get(events));
350
+
}}
351
+
/>
294
352
<RefreshControl
295
353
{refreshRate}
296
354
onRefreshChange={(value) => {
···
315
373
{/if}
316
374
</div>
317
375
318
-
<footer class="py-2 border-t border-gray-200 text-center">
319
-
<p class="text-gray-600 text-sm">
376
+
<footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center">
377
+
<p class="text-gray-600 dark:text-gray-200 text-sm">
320
378
source code <a
321
379
href="https://tangled.sh/@poor.dog/nsid-tracker"
322
380
target="_blank"
323
381
rel="noopener noreferrer"
324
-
class="text-blue-600 hover:text-blue-800 underline"
382
+
class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline"
325
383
>@poor.dog/nsid-tracker</a
326
384
>
327
385
</p>
+1
-1
client/svelte.config.js
+1
-1
client/svelte.config.js
+1
-1
nix/client-modules.nix
+1
-1
nix/client-modules.nix
+10
-2
nix/client.nix
+10
-2
nix/client.nix
···
1
1
{
2
+
lib,
2
3
stdenv,
3
4
makeBinaryWrapper,
4
5
bun,
···
28
29
'';
29
30
buildPhase = ''
30
31
runHook preBuild
31
-
bun --prefer-offline run --bun build
32
+
bun --prefer-offline run build
32
33
runHook postBuild
33
34
'';
34
35
installPhase = ''
35
36
runHook preInstall
36
-
mkdir -p $out
37
+
38
+
mkdir -p $out/bin
37
39
cp -R ./build/* $out
40
+
cp -R ./node_modules $out
41
+
42
+
makeBinaryWrapper ${bun}/bin/bun $out/bin/website \
43
+
--prefix PATH : ${lib.makeBinPath [ bun ]} \
44
+
--add-flags "run --bun --no-install --cwd $out start"
45
+
38
46
runHook postInstall
39
47
'';
40
48
}
+42
-28
server/Cargo.lock
+42
-28
server/Cargo.lock
···
18
18
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
19
19
20
20
[[package]]
21
+
name = "ahash"
22
+
version = "0.8.12"
23
+
source = "registry+https://github.com/rust-lang/crates.io-index"
24
+
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
25
+
dependencies = [
26
+
"cfg-if",
27
+
"getrandom 0.3.3",
28
+
"once_cell",
29
+
"serde",
30
+
"version_check",
31
+
"zerocopy",
32
+
]
33
+
34
+
[[package]]
21
35
name = "aho-corasick"
22
36
version = "1.1.3"
23
37
source = "registry+https://github.com/rust-lang/crates.io-index"
···
46
60
version = "1.0.98"
47
61
source = "registry+https://github.com/rust-lang/crates.io-index"
48
62
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
63
+
64
+
[[package]]
65
+
name = "arc-swap"
66
+
version = "1.7.1"
67
+
source = "registry+https://github.com/rust-lang/crates.io-index"
68
+
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
49
69
50
70
[[package]]
51
71
name = "async-compression"
···
307
327
version = "0.2.1"
308
328
source = "registry+https://github.com/rust-lang/crates.io-index"
309
329
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
310
-
311
-
[[package]]
312
-
name = "cmake"
313
-
version = "0.1.54"
314
-
source = "registry+https://github.com/rust-lang/crates.io-index"
315
-
checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
316
-
dependencies = [
317
-
"cc",
318
-
]
319
330
320
331
[[package]]
321
332
name = "combine"
···
1552
1563
name = "server"
1553
1564
version = "0.1.0"
1554
1565
dependencies = [
1566
+
"ahash",
1555
1567
"anyhow",
1568
+
"arc-swap",
1556
1569
"async-trait",
1557
1570
"axum",
1558
1571
"axum-tws",
···
1571
1584
"serde",
1572
1585
"serde_json",
1573
1586
"smol_str",
1574
-
"snmalloc-rs",
1575
1587
"threadpool",
1576
1588
"tikv-jemallocator",
1577
1589
"tokio",
···
1643
1655
dependencies = [
1644
1656
"borsh",
1645
1657
"serde",
1646
-
]
1647
-
1648
-
[[package]]
1649
-
name = "snmalloc-rs"
1650
-
version = "0.3.8"
1651
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1652
-
checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a"
1653
-
dependencies = [
1654
-
"snmalloc-sys",
1655
-
]
1656
-
1657
-
[[package]]
1658
-
name = "snmalloc-sys"
1659
-
version = "0.3.8"
1660
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1661
-
checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea"
1662
-
dependencies = [
1663
-
"cmake",
1664
1658
]
1665
1659
1666
1660
[[package]]
···
2406
2400
version = "0.8.15"
2407
2401
source = "registry+https://github.com/rust-lang/crates.io-index"
2408
2402
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
2403
+
2404
+
[[package]]
2405
+
name = "zerocopy"
2406
+
version = "0.8.26"
2407
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2408
+
checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f"
2409
+
dependencies = [
2410
+
"zerocopy-derive",
2411
+
]
2412
+
2413
+
[[package]]
2414
+
name = "zerocopy-derive"
2415
+
version = "0.8.26"
2416
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2417
+
checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181"
2418
+
dependencies = [
2419
+
"proc-macro2",
2420
+
"quote",
2421
+
"syn",
2422
+
]
2409
2423
2410
2424
[[package]]
2411
2425
name = "zeroize"
+2
-2
server/Cargo.toml
+2
-2
server/Cargo.toml
···
30
30
rayon = "1.10.0"
31
31
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
32
32
rclite = "0.2.7"
33
+
arc-swap = "1.7.1"
34
+
ahash = { version = "0.8.12", features = ["serde"] }
33
35
34
-
[target.'cfg(target_env = "msvc")'.dependencies]
35
-
snmalloc-rs = "0.3.8"
36
36
37
37
[target.'cfg(not(target_env = "msvc"))'.dependencies]
38
38
tikv-jemallocator = "0.6"
+16
-18
server/src/api.rs
+16
-18
server/src/api.rs
···
1
1
use std::{
2
-
collections::HashMap,
3
2
fmt::Display,
4
3
net::SocketAddr,
5
4
ops::{Bound, Deref, RangeBounds},
6
5
time::Duration,
7
6
};
8
7
8
+
use ahash::AHashMap;
9
9
use anyhow::anyhow;
10
10
use axum::{
11
11
Json, Router,
···
117
117
#[derive(Serialize)]
118
118
struct Events {
119
119
per_second: usize,
120
-
events: HashMap<SmolStr, NsidCount>,
120
+
events: AHashMap<SmolStr, NsidCount>,
121
121
}
122
122
123
123
async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> {
124
-
let mut events = HashMap::new();
124
+
let mut events = AHashMap::new();
125
125
for result in db.get_counts() {
126
126
let (nsid, counts) = result?;
127
127
events.insert(
···
176
176
) -> AppResult<Json<Vec<Hit>>> {
177
177
let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
178
178
let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
179
-
let maybe_hits = db
180
-
.get_hits(¶ms.nsid, HitsRange { from, to })
181
-
.take(MAX_HITS);
182
-
let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
183
179
184
-
for maybe_hit in maybe_hits {
185
-
let hit = maybe_hit?;
186
-
let hit_data = hit.deser()?;
187
-
188
-
hits.push(Hit {
189
-
timestamp: hit.timestamp,
190
-
deleted: hit_data.deleted,
191
-
});
192
-
}
180
+
db.get_hits(¶ms.nsid, HitsRange { from, to }, MAX_HITS)
181
+
.take(MAX_HITS)
182
+
.try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| {
183
+
let hit = hit?;
184
+
let hit_data = hit.deser()?;
193
185
194
-
Ok(Json(hits))
186
+
acc.push(Hit {
187
+
timestamp: hit.timestamp,
188
+
deleted: hit_data.deleted,
189
+
});
190
+
Ok(acc)
191
+
})
192
+
.map(Json)
195
193
}
196
194
197
195
async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response {
···
200
198
(async move {
201
199
let mut listener = db.new_listener();
202
200
let mut data = Events {
203
-
events: HashMap::<SmolStr, NsidCount>::with_capacity(10),
201
+
events: AHashMap::<SmolStr, NsidCount>::with_capacity(10),
204
202
per_second: 0,
205
203
};
206
204
let mut updates = 0;
+54
-26
server/src/db/handle.rs
+54
-26
server/src/db/handle.rs
···
1
1
use std::{
2
2
fmt::Debug,
3
3
io::Cursor,
4
-
ops::{Bound, Deref, RangeBounds},
4
+
ops::{Bound, RangeBounds},
5
5
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
6
6
time::Duration,
7
7
};
8
8
9
9
use byteview::ByteView;
10
-
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
10
+
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
11
11
use itertools::Itertools;
12
12
use parking_lot::Mutex;
13
13
use rayon::iter::{IntoParallelIterator, ParallelIterator};
···
16
16
17
17
use crate::{
18
18
db::{EventRecord, NsidHit, block},
19
-
error::AppResult,
20
-
utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
19
+
error::{AppError, AppResult},
20
+
utils::{
21
+
ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
22
+
varints_unsigned_encoded,
23
+
},
21
24
};
22
25
23
26
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
···
31
34
}
32
35
33
36
pub struct LexiconHandle {
34
-
tree: Partition,
37
+
write_tree: Partition,
38
+
read_tree: ArcliteSwap<Snapshot>,
35
39
nsid: SmolStr,
36
40
buf: Arc<Mutex<Vec<EventRecord>>>,
37
41
last_insert: AtomicU64, // relaxed
···
46
50
}
47
51
}
48
52
49
-
impl Deref for LexiconHandle {
50
-
type Target = Partition;
51
-
52
-
fn deref(&self) -> &Self::Target {
53
-
&self.tree
54
-
}
55
-
}
56
-
57
53
impl LexiconHandle {
58
54
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
59
55
let opts = PartitionCreateOptions::default()
60
-
.block_size(1024 * 128)
56
+
.block_size(1024 * 48)
61
57
.compression(fjall::CompressionType::Miniz(9));
58
+
let write_tree = keyspace.open_partition(nsid, opts).unwrap();
59
+
let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
62
60
Self {
63
-
tree: keyspace.open_partition(nsid, opts).unwrap(),
61
+
write_tree,
62
+
read_tree,
64
63
nsid: nsid.into(),
65
64
buf: Default::default(),
66
65
last_insert: AtomicU64::new(0),
···
68
67
}
69
68
}
70
69
70
+
#[inline(always)]
71
+
pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
72
+
self.read_tree.load()
73
+
}
74
+
75
+
#[inline(always)]
76
+
pub fn update_tree(&self) {
77
+
self.read_tree
78
+
.store(ArcRefCnt::new(self.write_tree.snapshot()));
79
+
}
80
+
81
+
#[inline(always)]
82
+
pub fn span(&self) -> tracing::Span {
83
+
tracing::info_span!("handle", nsid = %self.nsid)
84
+
}
85
+
86
+
#[inline(always)]
71
87
pub fn nsid(&self) -> &SmolStr {
72
88
&self.nsid
73
89
}
74
90
91
+
#[inline(always)]
75
92
pub fn item_count(&self) -> usize {
76
93
self.buf.lock().len()
77
94
}
78
95
79
-
pub fn since_last_activity(&self) -> u64 {
80
-
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
96
+
pub fn since_last_activity(&self) -> Duration {
97
+
Duration::from_nanos(
98
+
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
99
+
)
81
100
}
82
101
83
102
pub fn suggested_block_size(&self) -> usize {
···
99
118
range: impl RangeBounds<u64>,
100
119
sort: bool,
101
120
) -> AppResult<()> {
121
+
let _span = self.span().entered();
122
+
102
123
let start_limit = match range.start_bound().cloned() {
103
124
Bound::Included(start) => start,
104
125
Bound::Excluded(start) => start.saturating_add(1),
···
114
135
let end_key = varints_unsigned_encoded([end_limit]);
115
136
116
137
let blocks_to_compact = self
117
-
.tree
138
+
.read()
118
139
.range(start_key..end_key)
119
140
.collect::<Result<Vec<_>, _>>()?;
120
141
if blocks_to_compact.len() < 2 {
121
-
tracing::info!("{}: nothing to compact", self.nsid);
122
142
return Ok(());
123
143
}
124
144
···
155
175
let end_blocks_size = new_blocks.len();
156
176
157
177
for key in keys_to_delete {
158
-
self.tree.remove(key.clone())?;
178
+
self.write_tree.remove(key.clone())?;
159
179
}
160
180
for block in new_blocks {
161
-
self.tree.insert(block.key, block.data)?;
181
+
self.write_tree.insert(block.key, block.data)?;
162
182
}
163
183
184
+
let reduction =
185
+
((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0;
164
186
tracing::info!(
165
-
"{}: compacted {} blocks to {} blocks ({}% reduction)",
166
-
self.nsid,
167
-
start_blocks_size,
168
-
end_blocks_size,
169
-
((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0,
187
+
{
188
+
start = start_blocks_size,
189
+
end = end_blocks_size,
190
+
},
191
+
"blocks compacted {reduction:.2}%",
170
192
);
171
193
172
194
Ok(())
195
+
}
196
+
197
+
pub fn insert_block(&self, block: Block) -> AppResult<()> {
198
+
self.write_tree
199
+
.insert(block.key, block.data)
200
+
.map_err(AppError::from)
173
201
}
174
202
175
203
pub fn encode_block_from_items(
+139
-82
server/src/db/mod.rs
+139
-82
server/src/db/mod.rs
···
1
1
use std::{
2
-
collections::HashMap,
3
2
fmt::Debug,
4
3
io::Cursor,
5
4
ops::{Bound, Deref, RangeBounds},
6
-
path::{Path, PathBuf},
5
+
path::Path,
7
6
time::Duration,
7
+
u64,
8
8
};
9
9
10
+
use ahash::{AHashMap, AHashSet};
10
11
use byteview::StrView;
11
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
12
+
use fjall::{Keyspace, Partition, PartitionCreateOptions};
12
13
use itertools::{Either, Itertools};
13
-
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
14
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
14
15
use rclite::Arc;
15
16
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
16
17
use smol_str::{SmolStr, ToSmolStr};
···
21
22
db::handle::{ItemDecoder, LexiconHandle},
22
23
error::{AppError, AppResult},
23
24
jetstream::JetstreamEvent,
24
-
utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
25
+
utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
25
26
};
26
27
27
28
mod block;
···
71
72
}
72
73
73
74
pub struct DbInfo {
74
-
pub nsids: HashMap<SmolStr, Vec<usize>>,
75
+
pub nsids: AHashMap<SmolStr, Vec<usize>>,
75
76
pub disk_size: u64,
76
77
}
77
78
···
79
80
pub ks_config: fjall::Config,
80
81
pub min_block_size: usize,
81
82
pub max_block_size: usize,
82
-
pub max_last_activity: u64,
83
+
pub max_last_activity: Duration,
83
84
}
84
85
85
86
impl DbConfig {
···
97
98
impl Default for DbConfig {
98
99
fn default() -> Self {
99
100
Self {
100
-
ks_config: fjall::Config::default(),
101
-
min_block_size: 512,
102
-
max_block_size: 500_000,
103
-
max_last_activity: Duration::from_secs(10).as_nanos() as u64,
101
+
ks_config: fjall::Config::default()
102
+
.cache_size(1024 * 1024 * 512)
103
+
.max_write_buffer_size(u64::MAX),
104
+
min_block_size: 1000,
105
+
max_block_size: 250_000,
106
+
max_last_activity: Duration::from_secs(10),
104
107
}
105
108
}
106
109
}
···
111
114
pub cfg: DbConfig,
112
115
pub ks: Keyspace,
113
116
counts: Partition,
114
-
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
117
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
115
118
sync_pool: threadpool::ThreadPool,
116
119
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
117
-
eps: RateTracker<100>,
120
+
eps: RateTracker<100>, // 100 millis buckets
118
121
cancel_token: CancellationToken,
119
122
}
120
123
···
160
163
}
161
164
162
165
pub fn sync(&self, all: bool) -> AppResult<()> {
166
+
let start = CLOCK.now();
163
167
// prepare all the data
164
-
let mut data = Vec::with_capacity(self.hits.len());
168
+
let nsids_len = self.hits.len();
169
+
let mut data = Vec::with_capacity(nsids_len);
170
+
let mut nsids = AHashSet::with_capacity(nsids_len);
165
171
let _guard = scc::ebr::Guard::new();
166
-
for (_, handle) in self.hits.iter(&_guard) {
172
+
for (nsid, handle) in self.hits.iter(&_guard) {
167
173
let mut nsid_data = Vec::with_capacity(2);
168
-
let mut total_count = 0;
174
+
// let mut total_count = 0;
169
175
let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
170
176
// if we disconnect for a long time, we want to sync all of what we
171
177
// have to avoid having many small blocks (even if we run compaction
···
180
186
let count = handle.item_count();
181
187
let data_count = count / block_size;
182
188
if count > 0 && (all || data_count > 0 || is_too_old) {
183
-
for i in 0..data_count {
184
-
nsid_data.push((i, handle.clone(), block_size));
185
-
total_count += block_size;
189
+
for _ in 0..data_count {
190
+
nsid_data.push((handle.clone(), block_size));
191
+
// total_count += block_size;
186
192
}
187
193
// only sync remainder if we haven't met block size
188
194
let remainder = count % block_size;
189
195
if (all || data_count == 0) && remainder > 0 {
190
-
nsid_data.push((data_count, handle.clone(), remainder));
191
-
total_count += remainder;
196
+
nsid_data.push((handle.clone(), remainder));
197
+
// total_count += remainder;
192
198
}
193
199
}
194
-
tracing::info!(
195
-
"{}: will sync {} blocks ({} count)",
196
-
handle.nsid(),
197
-
nsid_data.len(),
198
-
total_count,
199
-
);
200
-
data.push(nsid_data);
200
+
let _span = handle.span().entered();
201
+
if nsid_data.len() > 0 {
202
+
// tracing::info!(
203
+
// {blocks = %nsid_data.len(), count = %total_count},
204
+
// "will encode & sync",
205
+
// );
206
+
nsids.insert(nsid.clone());
207
+
data.push(nsid_data);
208
+
}
201
209
}
202
210
drop(_guard);
203
211
···
206
214
.map(|chunk| {
207
215
chunk
208
216
.into_iter()
209
-
.map(|(i, handle, max_block_size)| {
210
-
(i, handle.take_block_items(max_block_size), handle)
217
+
.map(|(handle, max_block_size)| {
218
+
(handle.take_block_items(max_block_size), handle)
211
219
})
212
220
.collect::<Vec<_>>()
213
221
.into_par_iter()
214
-
.map(|(i, items, handle)| {
222
+
.map(|(items, handle)| {
215
223
let count = items.len();
216
224
let block = LexiconHandle::encode_block_from_items(items, count)?;
217
-
tracing::info!(
218
-
"{}: encoded block with {} items",
219
-
handle.nsid(),
220
-
block.written,
221
-
);
222
-
AppResult::Ok((i, block, handle))
225
+
AppResult::Ok((block, handle))
223
226
})
224
227
.collect::<Result<Vec<_>, _>>()
225
228
})
226
229
.try_for_each(|chunk| {
227
230
let chunk = chunk?;
228
-
for (i, block, handle) in chunk {
229
-
self.sync_pool
230
-
.execute(move || match handle.insert(block.key, block.data) {
231
+
for (block, handle) in chunk {
232
+
self.sync_pool.execute(move || {
233
+
let _span = handle.span().entered();
234
+
let written = block.written;
235
+
match handle.insert_block(block) {
231
236
Ok(_) => {
232
-
tracing::info!("{}: [{i}] synced {}", block.written, handle.nsid())
237
+
tracing::info!({count = %written}, "synced")
233
238
}
234
-
Err(err) => tracing::error!("failed to sync block: {}", err),
235
-
});
239
+
Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
240
+
}
241
+
});
236
242
}
237
243
AppResult::Ok(())
238
244
})?;
239
245
self.sync_pool.join();
240
246
247
+
// update snapshots for all (changed) handles
248
+
for nsid in nsids {
249
+
self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
250
+
}
251
+
252
+
tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
253
+
241
254
Ok(())
242
255
}
243
256
···
251
264
let Some(handle) = self.get_handle(nsid) else {
252
265
return Ok(());
253
266
};
254
-
handle.compact(max_count, range, sort)
267
+
handle.compact(max_count, range, sort)?;
268
+
handle.update_tree();
269
+
Ok(())
255
270
}
256
271
257
272
pub fn compact_all(
···
268
283
269
284
pub fn major_compact(&self) -> AppResult<()> {
270
285
self.compact_all(self.cfg.max_block_size, .., true)?;
271
-
let _guard = scc::ebr::Guard::new();
272
-
for (_, handle) in self.hits.iter(&_guard) {
273
-
handle.deref().major_compact()?;
274
-
}
275
286
Ok(())
276
287
}
277
288
···
301
312
}
302
313
303
314
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
315
+
let mut seen_events = 0;
304
316
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
305
317
let mut counts = self.get_count(&key)?;
306
-
let mut count = 0;
307
318
self.ensure_handle(&key).queue(chunk.inspect(|e| {
308
319
// increment count
309
320
counts.last_seen = e.timestamp;
···
312
323
} else {
313
324
counts.count += 1;
314
325
}
315
-
count += 1;
326
+
seen_events += 1;
316
327
}));
317
-
self.eps.observe(count);
318
328
self.insert_count(&key, &counts)?;
319
329
if self.event_broadcaster.receiver_count() > 0 {
320
330
let _ = self.event_broadcaster.send((key, counts));
321
331
}
322
332
}
333
+
self.eps.observe(seen_events);
323
334
Ok(())
324
335
}
325
336
···
359
370
}
360
371
361
372
pub fn info(&self) -> AppResult<DbInfo> {
362
-
let mut nsids = HashMap::new();
373
+
let mut nsids = AHashMap::new();
363
374
for nsid in self.get_nsids() {
364
375
let Some(handle) = self.get_handle(&nsid) else {
365
376
continue;
366
377
};
367
-
let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
368
-
let (key, value) = item?;
369
-
let mut timestamps = Cursor::new(key);
370
-
let start_timestamp = timestamps.read_varint()?;
371
-
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
372
-
acc.push(decoder.item_count());
373
-
AppResult::Ok(acc)
374
-
})?;
378
+
let block_lens = handle
379
+
.read()
380
+
.iter()
381
+
.rev()
382
+
.try_fold(Vec::new(), |mut acc, item| {
383
+
let (key, value) = item?;
384
+
let mut timestamps = Cursor::new(key);
385
+
let start_timestamp = timestamps.read_varint()?;
386
+
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
387
+
acc.push(decoder.item_count());
388
+
AppResult::Ok(acc)
389
+
})?;
375
390
nsids.insert(nsid.to_smolstr(), block_lens);
376
391
}
377
392
Ok(DbInfo {
···
384
399
&self,
385
400
nsid: &str,
386
401
range: impl RangeBounds<u64> + std::fmt::Debug,
402
+
max_items: usize,
387
403
) -> impl Iterator<Item = AppResult<handle::Item>> {
388
404
let start_limit = match range.start_bound().cloned() {
389
405
Bound::Included(start) => start,
···
401
417
return Either::Right(std::iter::empty());
402
418
};
403
419
404
-
let map_block = move |(key, val)| {
420
+
// let mut ts = CLOCK.now();
421
+
let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
422
+
if current_item_count >= max_items {
423
+
return Ok((None, current_item_count));
424
+
}
425
+
let (key, val) = res?;
405
426
let mut key_reader = Cursor::new(key);
406
427
let start_timestamp = key_reader.read_varint::<u64>()?;
428
+
// let end_timestamp = key_reader.read_varint::<u64>()?;
407
429
if start_timestamp < start_limit {
408
-
return Ok(None);
430
+
// tracing::info!(
431
+
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
432
+
// );
433
+
return Ok((None, current_item_count));
409
434
}
410
-
let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
411
-
.take_while(move |item| {
412
-
item.as_ref().map_or(true, |item| {
413
-
item.timestamp <= end_limit && item.timestamp >= start_limit
414
-
})
415
-
})
416
-
.map(|res| res.map_err(AppError::from));
417
-
Ok(Some(items))
435
+
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
436
+
let current_item_count = current_item_count + decoder.item_count();
437
+
// tracing::info!(
438
+
// "took {}ns to get block with size {}",
439
+
// ts.elapsed().as_nanos(),
440
+
// decoder.item_count()
441
+
// );
442
+
// ts = CLOCK.now();
443
+
Ok((
444
+
Some(
445
+
decoder
446
+
.take_while(move |item| {
447
+
item.as_ref().map_or(true, |item| {
448
+
item.timestamp <= end_limit && item.timestamp >= start_limit
449
+
})
450
+
})
451
+
.map(|res| res.map_err(AppError::from)),
452
+
),
453
+
current_item_count,
454
+
))
418
455
};
419
456
420
-
Either::Left(
421
-
handle
422
-
.range(..end_key)
423
-
.rev()
424
-
.map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
425
-
.collect::<Vec<_>>()
426
-
.into_iter()
427
-
.rev()
428
-
.flatten()
429
-
.flatten(),
430
-
)
457
+
let (blocks, _counted) = handle
458
+
.read()
459
+
.range(..end_key)
460
+
.map(|res| res.map_err(AppError::from))
461
+
.rev()
462
+
.fold_while(
463
+
(Vec::with_capacity(20), 0),
464
+
|(mut blocks, current_item_count), res| {
465
+
use itertools::FoldWhile::*;
466
+
467
+
match map_block((res, current_item_count)) {
468
+
Ok((Some(block), current_item_count)) => {
469
+
blocks.push(Ok(block));
470
+
Continue((blocks, current_item_count))
471
+
}
472
+
Ok((None, current_item_count)) => Done((blocks, current_item_count)),
473
+
Err(err) => {
474
+
blocks.push(Err(err));
475
+
Done((blocks, current_item_count))
476
+
}
477
+
}
478
+
},
479
+
)
480
+
.into_inner();
481
+
482
+
// tracing::info!(
483
+
// "got blocks with size {}, item count {counted}",
484
+
// blocks.len()
485
+
// );
486
+
487
+
Either::Left(blocks.into_iter().rev().flatten().flatten())
431
488
}
432
489
433
490
pub fn tracking_since(&self) -> AppResult<u64> {
···
436
493
let Some(handle) = self.get_handle("app.bsky.feed.like") else {
437
494
return Ok(0);
438
495
};
439
-
let Some((timestamps_raw, _)) = handle.first_key_value()? else {
496
+
let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
440
497
return Ok(0);
441
498
};
442
499
let mut timestamp_reader = Cursor::new(timestamps_raw);
-501
server/src/db_old/block.rs
-501
server/src/db_old/block.rs
···
1
-
use ordered_varint::Variable;
2
-
use rkyv::{
3
-
Archive, Deserialize, Serialize,
4
-
api::high::{HighSerializer, HighValidator},
5
-
bytecheck::CheckBytes,
6
-
de::Pool,
7
-
rancor::{self, Strategy},
8
-
ser::allocator::ArenaHandle,
9
-
util::AlignedVec,
10
-
};
11
-
use std::{
12
-
io::{self, Read, Write},
13
-
marker::PhantomData,
14
-
};
15
-
16
-
use crate::error::{AppError, AppResult};
17
-
18
-
pub struct Item<T> {
19
-
pub timestamp: u64,
20
-
data: AlignedVec,
21
-
phantom: PhantomData<T>,
22
-
}
23
-
24
-
impl<T: Archive> Item<T> {
25
-
pub fn access(&self) -> &T::Archived {
26
-
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
27
-
}
28
-
}
29
-
30
-
impl<T> Item<T>
31
-
where
32
-
T: Archive,
33
-
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
34
-
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
35
-
{
36
-
pub fn deser(&self) -> AppResult<T> {
37
-
rkyv::from_bytes(&self.data).map_err(AppError::from)
38
-
}
39
-
}
40
-
41
-
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
42
-
pub fn new(timestamp: u64, data: &T) -> Self {
43
-
Item {
44
-
timestamp,
45
-
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
46
-
phantom: PhantomData,
47
-
}
48
-
}
49
-
}
50
-
51
-
pub struct ItemEncoder<W: Write, T> {
52
-
writer: W,
53
-
prev_timestamp: u64,
54
-
prev_delta: i64,
55
-
_item: PhantomData<T>,
56
-
}
57
-
58
-
impl<W: Write, T> ItemEncoder<W, T> {
59
-
pub fn new(writer: W) -> Self {
60
-
ItemEncoder {
61
-
writer,
62
-
prev_timestamp: 0,
63
-
prev_delta: 0,
64
-
_item: PhantomData,
65
-
}
66
-
}
67
-
68
-
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
69
-
if self.prev_timestamp == 0 {
70
-
// self.writer.write_varint(item.timestamp)?;
71
-
self.prev_timestamp = item.timestamp;
72
-
self.write_data(&item.data)?;
73
-
return Ok(());
74
-
}
75
-
76
-
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
77
-
78
-
self.writer.write_varint(delta - self.prev_delta)?;
79
-
self.prev_timestamp = item.timestamp;
80
-
self.prev_delta = delta;
81
-
82
-
self.write_data(&item.data)?;
83
-
84
-
Ok(())
85
-
}
86
-
87
-
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
88
-
self.writer.write_varint(data.len())?;
89
-
self.writer.write_all(data)?;
90
-
Ok(())
91
-
}
92
-
93
-
pub fn finish(mut self) -> AppResult<W> {
94
-
self.writer.flush()?;
95
-
Ok(self.writer)
96
-
}
97
-
}
98
-
99
-
pub struct ItemDecoder<R, T> {
100
-
reader: R,
101
-
current_timestamp: u64,
102
-
current_delta: i64,
103
-
first_item: bool,
104
-
_item: PhantomData<T>,
105
-
}
106
-
107
-
impl<R: Read, T: Archive> ItemDecoder<R, T> {
108
-
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
109
-
Ok(ItemDecoder {
110
-
reader,
111
-
current_timestamp: start_timestamp,
112
-
current_delta: 0,
113
-
first_item: true,
114
-
_item: PhantomData,
115
-
})
116
-
}
117
-
118
-
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
119
-
if self.first_item {
120
-
// read the first timestamp
121
-
// let timestamp = match self.reader.read_varint::<u64>() {
122
-
// Ok(timestamp) => timestamp,
123
-
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
124
-
// Err(e) => return Err(e.into()),
125
-
// };
126
-
// self.current_timestamp = timestamp;
127
-
128
-
let Some(data_raw) = self.read_item()? else {
129
-
return Ok(None);
130
-
};
131
-
self.first_item = false;
132
-
return Ok(Some(Item {
133
-
timestamp: self.current_timestamp,
134
-
data: data_raw,
135
-
phantom: PhantomData,
136
-
}));
137
-
}
138
-
139
-
let Some(_delta) = self.read_timestamp()? else {
140
-
return Ok(None);
141
-
};
142
-
143
-
// read data
144
-
let data_raw = match self.read_item()? {
145
-
Some(data_raw) => data_raw,
146
-
None => {
147
-
return Err(io::Error::new(
148
-
io::ErrorKind::UnexpectedEof,
149
-
"expected data after delta",
150
-
)
151
-
.into());
152
-
}
153
-
};
154
-
155
-
Ok(Some(Item {
156
-
timestamp: self.current_timestamp,
157
-
data: data_raw,
158
-
phantom: PhantomData,
159
-
}))
160
-
}
161
-
162
-
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
163
-
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
164
-
let delta = match self.reader.read_varint::<i64>() {
165
-
Ok(delta) => delta,
166
-
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
167
-
Err(e) => return Err(e.into()),
168
-
};
169
-
self.current_delta += delta;
170
-
self.current_timestamp =
171
-
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
172
-
Ok(Some(self.current_timestamp))
173
-
}
174
-
175
-
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
176
-
let data_len = match self.reader.read_varint::<usize>() {
177
-
Ok(data_len) => data_len,
178
-
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
179
-
Err(e) => return Err(e.into()),
180
-
};
181
-
let mut data_raw = AlignedVec::with_capacity(data_len);
182
-
for _ in 0..data_len {
183
-
data_raw.push(0);
184
-
}
185
-
self.reader.read_exact(data_raw.as_mut_slice())?;
186
-
Ok(Some(data_raw))
187
-
}
188
-
}
189
-
190
-
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
191
-
type Item = AppResult<Item<T>>;
192
-
193
-
fn next(&mut self) -> Option<Self::Item> {
194
-
self.decode().transpose()
195
-
}
196
-
}
197
-
198
-
pub trait WriteVariableExt: Write {
199
-
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
200
-
value.encode_variable(self)
201
-
}
202
-
}
203
-
impl<W: Write> WriteVariableExt for W {}
204
-
205
-
pub trait ReadVariableExt: Read {
206
-
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
207
-
T::decode_variable(self)
208
-
}
209
-
}
210
-
impl<R: Read> ReadVariableExt for R {}
211
-
212
-
#[cfg(test)]
213
-
mod test {
214
-
use super::*;
215
-
use rkyv::{Archive, Deserialize, Serialize};
216
-
use std::io::Cursor;
217
-
218
-
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
219
-
#[rkyv(compare(PartialEq))]
220
-
struct TestData {
221
-
id: u32,
222
-
value: String,
223
-
}
224
-
225
-
#[test]
226
-
fn test_encoder_decoder_single_item() {
227
-
let data = TestData {
228
-
id: 123,
229
-
value: "test".to_string(),
230
-
};
231
-
232
-
let item = Item::new(1000, &data);
233
-
234
-
// encode
235
-
let mut buffer = Vec::new();
236
-
let mut encoder = ItemEncoder::new(&mut buffer);
237
-
encoder.encode(&item).unwrap();
238
-
encoder.finish().unwrap();
239
-
240
-
// decode
241
-
let cursor = Cursor::new(buffer);
242
-
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
243
-
244
-
let decoded_item = decoder.decode().unwrap().unwrap();
245
-
assert_eq!(decoded_item.timestamp, 1000);
246
-
247
-
let decoded_data = decoded_item.access();
248
-
assert_eq!(decoded_data.id, 123);
249
-
assert_eq!(decoded_data.value.as_str(), "test");
250
-
}
251
-
252
-
#[test]
253
-
fn test_encoder_decoder_multiple_items() {
254
-
let items = vec![
255
-
Item::new(
256
-
1000,
257
-
&TestData {
258
-
id: 1,
259
-
value: "first".to_string(),
260
-
},
261
-
),
262
-
Item::new(
263
-
1010,
264
-
&TestData {
265
-
id: 2,
266
-
value: "second".to_string(),
267
-
},
268
-
),
269
-
Item::new(
270
-
1015,
271
-
&TestData {
272
-
id: 3,
273
-
value: "third".to_string(),
274
-
},
275
-
),
276
-
Item::new(
277
-
1025,
278
-
&TestData {
279
-
id: 4,
280
-
value: "fourth".to_string(),
281
-
},
282
-
),
283
-
];
284
-
285
-
// encode
286
-
let mut buffer = Vec::new();
287
-
let mut encoder = ItemEncoder::new(&mut buffer);
288
-
289
-
for item in &items {
290
-
encoder.encode(item).unwrap();
291
-
}
292
-
encoder.finish().unwrap();
293
-
294
-
// decode
295
-
let cursor = Cursor::new(buffer);
296
-
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
297
-
298
-
let mut decoded_items = Vec::new();
299
-
while let Some(item) = decoder.decode().unwrap() {
300
-
decoded_items.push(item);
301
-
}
302
-
303
-
assert_eq!(decoded_items.len(), 4);
304
-
305
-
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
306
-
assert_eq!(original.timestamp, decoded.timestamp);
307
-
assert_eq!(original.access().id, decoded.access().id);
308
-
assert_eq!(
309
-
original.access().value.as_str(),
310
-
decoded.access().value.as_str()
311
-
);
312
-
}
313
-
}
314
-
315
-
#[test]
316
-
fn test_encoder_decoder_with_iterator() {
317
-
let items = vec![
318
-
Item::new(
319
-
2000,
320
-
&TestData {
321
-
id: 10,
322
-
value: "a".to_string(),
323
-
},
324
-
),
325
-
Item::new(
326
-
2005,
327
-
&TestData {
328
-
id: 20,
329
-
value: "b".to_string(),
330
-
},
331
-
),
332
-
Item::new(
333
-
2012,
334
-
&TestData {
335
-
id: 30,
336
-
value: "c".to_string(),
337
-
},
338
-
),
339
-
];
340
-
341
-
// encode
342
-
let mut buffer = Vec::new();
343
-
let mut encoder = ItemEncoder::new(&mut buffer);
344
-
345
-
for item in &items {
346
-
encoder.encode(item).unwrap();
347
-
}
348
-
encoder.finish().unwrap();
349
-
350
-
// decode
351
-
let cursor = Cursor::new(buffer);
352
-
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
353
-
354
-
let decoded_items: Result<Vec<_>, _> = decoder.collect();
355
-
let decoded_items = decoded_items.unwrap();
356
-
357
-
assert_eq!(decoded_items.len(), 3);
358
-
assert_eq!(decoded_items[0].timestamp, 2000);
359
-
assert_eq!(decoded_items[1].timestamp, 2005);
360
-
assert_eq!(decoded_items[2].timestamp, 2012);
361
-
362
-
assert_eq!(decoded_items[0].access().id, 10);
363
-
assert_eq!(decoded_items[1].access().id, 20);
364
-
assert_eq!(decoded_items[2].access().id, 30);
365
-
}
366
-
367
-
#[test]
368
-
fn test_delta_compression() {
369
-
let items = vec![
370
-
Item::new(
371
-
1000,
372
-
&TestData {
373
-
id: 1,
374
-
value: "a".to_string(),
375
-
},
376
-
),
377
-
Item::new(
378
-
1010,
379
-
&TestData {
380
-
id: 2,
381
-
value: "b".to_string(),
382
-
},
383
-
), // delta = 10
384
-
Item::new(
385
-
1020,
386
-
&TestData {
387
-
id: 3,
388
-
value: "c".to_string(),
389
-
},
390
-
), // delta = 10, delta-of-delta = 0
391
-
Item::new(
392
-
1025,
393
-
&TestData {
394
-
id: 4,
395
-
value: "d".to_string(),
396
-
},
397
-
), // delta = 5, delta-of-delta = -5
398
-
];
399
-
400
-
let mut buffer = Vec::new();
401
-
let mut encoder = ItemEncoder::new(&mut buffer);
402
-
403
-
for item in &items {
404
-
encoder.encode(item).unwrap();
405
-
}
406
-
encoder.finish().unwrap();
407
-
408
-
// decode and verify
409
-
let cursor = Cursor::new(buffer);
410
-
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
411
-
412
-
let decoded_items: Result<Vec<_>, _> = decoder.collect();
413
-
let decoded_items = decoded_items.unwrap();
414
-
415
-
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
416
-
assert_eq!(original.timestamp, decoded.timestamp);
417
-
assert_eq!(original.access().id, decoded.access().id);
418
-
}
419
-
}
420
-
421
-
#[test]
422
-
fn test_empty_decode() {
423
-
let buffer = Vec::new();
424
-
let cursor = Cursor::new(buffer);
425
-
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
426
-
427
-
let result = decoder.decode().unwrap();
428
-
assert!(result.is_none());
429
-
}
430
-
431
-
#[test]
432
-
fn test_backwards_timestamp() {
433
-
let items = vec![
434
-
Item::new(
435
-
1000,
436
-
&TestData {
437
-
id: 1,
438
-
value: "first".to_string(),
439
-
},
440
-
),
441
-
Item::new(
442
-
900,
443
-
&TestData {
444
-
id: 2,
445
-
value: "second".to_string(),
446
-
},
447
-
),
448
-
];
449
-
450
-
let mut buffer = Vec::new();
451
-
let mut encoder = ItemEncoder::new(&mut buffer);
452
-
453
-
for item in &items {
454
-
encoder.encode(item).unwrap();
455
-
}
456
-
encoder.finish().unwrap();
457
-
458
-
let cursor = Cursor::new(buffer);
459
-
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
460
-
461
-
let decoded_items: Result<Vec<_>, _> = decoder.collect();
462
-
let decoded_items = decoded_items.unwrap();
463
-
464
-
assert_eq!(decoded_items.len(), 2);
465
-
assert_eq!(decoded_items[0].timestamp, 1000);
466
-
assert_eq!(decoded_items[1].timestamp, 900);
467
-
}
468
-
469
-
#[test]
470
-
fn test_different_data_sizes() {
471
-
let small_data = TestData {
472
-
id: 1,
473
-
value: "x".to_string(),
474
-
};
475
-
let large_data = TestData {
476
-
id: 2,
477
-
value: "a".repeat(1000),
478
-
};
479
-
480
-
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
481
-
482
-
let mut buffer = Vec::new();
483
-
let mut encoder = ItemEncoder::new(&mut buffer);
484
-
485
-
for item in &items {
486
-
encoder.encode(item).unwrap();
487
-
}
488
-
encoder.finish().unwrap();
489
-
490
-
let cursor = Cursor::new(buffer);
491
-
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
492
-
493
-
let decoded_items: Result<Vec<_>, _> = decoder.collect();
494
-
let decoded_items = decoded_items.unwrap();
495
-
496
-
assert_eq!(decoded_items.len(), 2);
497
-
assert_eq!(decoded_items[0].access().value.as_str(), "x");
498
-
assert_eq!(decoded_items[1].access().value.len(), 1000);
499
-
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
500
-
}
501
-
}
-424
server/src/db_old/mod.rs
-424
server/src/db_old/mod.rs
···
1
-
use std::{
2
-
io::Cursor,
3
-
ops::{Bound, Deref, RangeBounds},
4
-
path::Path,
5
-
sync::{
6
-
Arc,
7
-
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
8
-
},
9
-
time::{Duration, Instant},
10
-
};
11
-
12
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
13
-
use ordered_varint::Variable;
14
-
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
15
-
use smol_str::SmolStr;
16
-
use tokio::sync::broadcast;
17
-
18
-
use crate::{
19
-
db_old::block::{ReadVariableExt, WriteVariableExt},
20
-
error::{AppError, AppResult},
21
-
jetstream::JetstreamEvent,
22
-
utils::{DefaultRateTracker, get_time},
23
-
};
24
-
25
-
mod block;
26
-
27
-
/// Aggregated per-NSID counters, stored rkyv-encoded in the `_counts`
/// partition and broadcast to listeners on every recorded event.
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    // number of non-delete (commit) events seen for this NSID
    pub count: u128,
    // number of delete events seen for this NSID
    pub deleted_count: u128,
    // timestamp (seconds) of the most recent event for this NSID
    pub last_seen: u64,
}
34
-
35
-
/// Per-event payload stored inside an encoded block; the timestamp lives in
/// the enclosing `Item`, so only the delete flag is stored here.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    // true if this event was a record deletion
    pub deleted: bool,
}
40
-
41
-
/// A single normalized jetstream event for one collection (NSID).
#[derive(Clone)]
pub struct EventRecord {
    // collection NSID the event belongs to
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds (derived from jetstream's microsecond time)
    // whether the event was a delete (vs. a create/commit)
    pub deleted: bool,
}
47
-
48
-
impl EventRecord {
49
-
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
50
-
match event {
51
-
JetstreamEvent::Commit {
52
-
time_us, commit, ..
53
-
} => Some(Self {
54
-
nsid: commit.collection.into(),
55
-
timestamp: time_us / 1_000_000,
56
-
deleted: false,
57
-
}),
58
-
JetstreamEvent::Delete {
59
-
time_us, commit, ..
60
-
} => Some(Self {
61
-
nsid: commit.collection.into(),
62
-
timestamp: time_us / 1_000_000,
63
-
deleted: true,
64
-
}),
65
-
_ => None,
66
-
}
67
-
}
68
-
}
69
-
70
-
// concrete codec/item types from the block module, specialized to the
// NsidHit payload used by this db
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
type Item = block::Item<NsidHit>;
73
-
74
-
/// Per-NSID state: the on-disk partition plus an in-memory write buffer that
/// is periodically flushed into encoded blocks.
pub struct LexiconHandle {
    // fjall partition holding encoded blocks of hits for this NSID
    tree: Partition,
    // lock-free queue of events not yet flushed to `tree`
    buf: Arc<scc::Queue<EventRecord>>,
    // approximate number of events currently in `buf`
    buf_len: AtomicUsize,
    // wall-clock time (ms) of the most recent insert
    last_insert: AtomicU64,
    // events-per-second tracker over a 5s window
    eps: DefaultRateTracker,
    // target block size, adapted from the observed event rate (rate * 60)
    block_size: AtomicUsize,
}
82
-
83
-
impl LexiconHandle {
84
-
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
85
-
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
86
-
Self {
87
-
tree: keyspace.open_partition(nsid, opts).unwrap(),
88
-
buf: Default::default(),
89
-
buf_len: AtomicUsize::new(0),
90
-
last_insert: AtomicU64::new(0),
91
-
eps: DefaultRateTracker::new(Duration::from_secs(5)),
92
-
block_size: AtomicUsize::new(1000),
93
-
}
94
-
}
95
-
96
-
fn item_count(&self) -> usize {
97
-
self.buf_len.load(AtomicOrdering::Acquire)
98
-
}
99
-
100
-
fn last_insert(&self) -> u64 {
101
-
self.last_insert.load(AtomicOrdering::Acquire)
102
-
}
103
-
104
-
fn suggested_block_size(&self) -> usize {
105
-
self.block_size.load(AtomicOrdering::Relaxed)
106
-
}
107
-
108
-
fn insert(&self, event: EventRecord) {
109
-
self.buf.push(event);
110
-
self.buf_len.fetch_add(1, AtomicOrdering::Release);
111
-
self.last_insert
112
-
.store(get_time().as_millis() as u64, AtomicOrdering::Release);
113
-
self.eps.observe(1);
114
-
let rate = self.eps.rate() as usize;
115
-
if rate != 0 {
116
-
self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
117
-
}
118
-
}
119
-
120
-
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
121
-
let mut writer = ItemEncoder::new(Vec::with_capacity(
122
-
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
123
-
));
124
-
let mut start_timestamp = None;
125
-
let mut end_timestamp = None;
126
-
let mut written = 0_usize;
127
-
while let Some(event) = self.buf.pop() {
128
-
let item = Item::new(
129
-
event.timestamp,
130
-
&NsidHit {
131
-
deleted: event.deleted,
132
-
},
133
-
);
134
-
writer.encode(&item)?;
135
-
if start_timestamp.is_none() {
136
-
start_timestamp = Some(event.timestamp);
137
-
}
138
-
end_timestamp = Some(event.timestamp);
139
-
if written >= max_block_size {
140
-
break;
141
-
}
142
-
written += 1;
143
-
}
144
-
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
145
-
self.buf_len.store(0, AtomicOrdering::Release);
146
-
let value = writer.finish()?;
147
-
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
148
-
key.write_varint(start_timestamp)?;
149
-
key.write_varint(end_timestamp)?;
150
-
self.tree.insert(key, value)?;
151
-
}
152
-
Ok(written)
153
-
}
154
-
}
155
-
156
-
// convenience alias for boxed, type-erased iterators returned by Db getters
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;
157
-
158
-
// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    // underlying fjall keyspace owning all partitions
    inner: Keyspace,
    // cache of per-NSID handles (partition + write buffer)
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    // `_counts` partition: nsid -> rkyv-encoded NsidCounts
    counts: Partition,
    // fan-out channel for live count updates to subscribers
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    // overall events-per-second tracker (1s window)
    eps: DefaultRateTracker,
    // lower bound before a buffer is considered flush-worthy
    min_block_size: usize,
    // hard cap of events per encoded block
    max_block_size: usize,
    // buffers idle longer than this are flushed even if small
    max_last_activity: Duration,
}
170
-
171
-
impl Db {
    /// Opens (or creates) the keyspace at `path` plus the `_counts` partition.
    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = Config::new(path)
            .cache_size(8 * 1024 * 1024) // from talna
            .open()?;
        Ok(Self {
            hits: Default::default(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: DefaultRateTracker::new(Duration::from_secs(1)),
            min_block_size: 512,
            max_block_size: 100_000,
            max_last_activity: Duration::from_secs(10),
        })
    }

    /// Flushes buffered events to disk. When `all` is false, an NSID is
    /// flushed only if its buffer exceeds the adaptive block size or has been
    /// idle longer than `max_last_activity`.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let _guard = scc::ebr::Guard::new();
        for (nsid, tree) in self.hits.iter(&_guard) {
            let count = tree.item_count();
            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
            let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
                > self.max_last_activity.as_millis() as u64;
            if count > 0 && (all || is_max_block_size || is_too_old) {
                // each sync() call writes at most one block of up to
                // max_block_size events, so loop until the buffer drains
                loop {
                    let synced = tree.sync(self.max_block_size)?;
                    if synced == 0 {
                        break;
                    }
                    tracing::info!("synced {synced} of {nsid} to db");
                }
            }
        }
        Ok(())
    }

    /// Overall ingest rate (events/second).
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to live (nsid, counts) updates emitted by `record_event`.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Runs `f` against the handle for `nsid` if the partition already exists
    /// (cached or on disk); returns `None` otherwise — never creates one.
    #[inline(always)]
    fn maybe_run_in_nsid_tree<T>(
        &self,
        nsid: &str,
        f: impl FnOnce(&LexiconHandle) -> T,
    ) -> Option<T> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid, &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.inner.partition_exists(nsid) {
                    // partition exists on disk but isn't cached yet:
                    // open it and cache the handle
                    let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(f(&handle))
    }

    /// Runs `f` against the handle for `nsid`, creating the partition if it
    /// doesn't exist yet.
    #[inline(always)]
    fn run_in_nsid_tree<T>(
        &self,
        nsid: SmolStr,
        f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
    ) -> AppResult<T> {
        f(self
            .hits
            .entry(nsid.clone())
            .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
            .get())
    }

    /// Buffers one event, updates the per-NSID counters, and broadcasts the
    /// new counts to any listeners.
    ///
    /// NOTE(review): the counter update is a non-atomic read-modify-write of
    /// the `_counts` partition; concurrent calls for the same NSID could lose
    /// increments — confirm callers serialize ingestion.
    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
        let EventRecord {
            nsid,
            timestamp,
            deleted,
        } = e.clone();

        // insert event
        self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
        // increment count
        let mut counts = self.get_count(&nsid)?;
        counts.last_seen = timestamp;
        if deleted {
            counts.deleted_count += 1;
        } else {
            counts.count += 1;
        }
        self.insert_count(&nsid, counts.clone())?;
        if self.event_broadcaster.receiver_count() > 0 {
            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
        }
        self.eps.observe(1);
        Ok(())
    }

    /// Writes the rkyv-encoded counters for `nsid` into `_counts`.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                // SAFETY: serializing a plain integer struct with rkyv is
                // assumed infallible here — TODO confirm this can't fail
                unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Reads the counters for `nsid`, defaulting to zeros if absent.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY: this partition is only written by insert_count, so the
        // bytes are always a valid archived NsidCounts
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates all (nsid, counts) pairs in the `_counts` partition.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY: keys are written from &str in insert_count,
                    // so they are valid UTF-8
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// Lists every tracked NSID (all partitions except `_counts`).
    pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
        self.inner
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Raw (key, value) block pairs for `nsid`, iterated in reverse key
    /// order; empty iterator if the partition doesn't exist.
    pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
        self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
            Box::new(
                handle
                    .tree
                    .iter()
                    .rev()
                    .map(|res| res.map_err(AppError::from)),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Streams decoded hits for `nsid` whose timestamps fall in `range`.
    /// Only flushed blocks are visited; events still sitting in the write
    /// buffer are not returned.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> BoxedIter<AppResult<Item>> {
        // convert the numeric bounds to the varint key encoding used on disk
        let start = range
            .start_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        let end = range
            .end_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        // inclusive upper bound used to cut off items inside the final block
        let limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
            let map_block = move |(key, val)| {
                // block keys begin with the varint-encoded first timestamp
                let mut key_reader = Cursor::new(key);
                let start_timestamp = key_reader.read_varint::<u64>()?;
                let items =
                    ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
                        item.as_ref().map_or(true, |item| item.timestamp <= limit)
                    });
                Ok(items)
            };

            Box::new(
                handle
                    .tree
                    .range(TimestampRange { start, end })
                    .map(move |res| res.map_err(AppError::from).and_then(map_block))
                    // NOTE(review): the first flatten() iterates each
                    // AppResult, yielding nothing for Err — block-level decode
                    // errors are silently dropped here; confirm that's intended
                    .flatten()
                    .flatten(),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Approximate start of tracking, taken from the first stored block of
    /// `app.bsky.feed.like` (0 if none exists).
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // should be accurate enough
        self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
            let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
                return Ok(0);
            };
            let mut timestamp_reader = Cursor::new(timestamps_raw);
            timestamp_reader
                .read_varint::<u64>()
                .map_err(AppError::from)
        })
        .unwrap_or(Ok(0))
    }
}
387
-
388
-
// varint-encoded timestamp key, as produced by LexiconHandle::sync
type TimestampRepr = Vec<u8>;

/// `RangeBounds` adapter over owned varint-encoded timestamp keys, used to
/// scan a partition for blocks overlapping a time window.
struct TimestampRange {
    start: Bound<TimestampRepr>,
    end: Bound<TimestampRepr>,
}

impl RangeBounds<TimestampRepr> for TimestampRange {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampRepr> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampRepr> {
        self.end.as_ref()
    }
}
406
-
407
-
// old fixed-width timestamp key representation (8 raw bytes)
type TimestampReprOld = [u8; 8];

/// `RangeBounds` adapter for the old fixed-width key format.
/// NOTE(review): not referenced anywhere in this module's visible code —
/// presumably kept for reading data written with the old key encoding;
/// confirm before removing.
struct TimestampRangeOld {
    start: Bound<TimestampReprOld>,
    end: Bound<TimestampReprOld>,
}

impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampReprOld> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampReprOld> {
        self.end.as_ref()
    }
}
+21
-11
server/src/jetstream.rs
+21
-11
server/src/jetstream.rs
···
13
13
pub struct JetstreamClient {
14
14
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
15
15
tls_connector: tokio_websockets::Connector,
16
-
url: SmolStr,
16
+
urls: Vec<SmolStr>,
17
17
}
18
18
19
19
impl JetstreamClient {
20
-
pub fn new(url: &str) -> AppResult<Self> {
20
+
pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> {
21
21
Ok(Self {
22
22
stream: None,
23
23
tls_connector: tokio_websockets::Connector::new()?,
24
-
url: SmolStr::new(url),
24
+
urls: urls.into_iter().map(Into::into).collect(),
25
25
})
26
26
}
27
27
28
28
pub async fn connect(&mut self) -> AppResult<()> {
29
-
let (stream, _) = ClientBuilder::new()
30
-
.connector(&self.tls_connector)
31
-
.uri(&self.url)?
32
-
.connect()
33
-
.await?;
34
-
self.stream = Some(stream);
35
-
tracing::info!("connected to jetstream ({})", self.url);
36
-
Ok(())
29
+
for uri in &self.urls {
30
+
let conn_result = ClientBuilder::new()
31
+
.connector(&self.tls_connector)
32
+
.uri(uri)?
33
+
.connect()
34
+
.await;
35
+
match conn_result {
36
+
Ok((stream, _)) => {
37
+
self.stream = Some(stream);
38
+
tracing::info!("connected to jetstream {}", uri);
39
+
return Ok(());
40
+
}
41
+
Err(err) => {
42
+
tracing::error!("failed to connect to jetstream {uri}: {err}");
43
+
}
44
+
};
45
+
}
46
+
Err(anyhow!("failed to connect to any jetstream server").into())
37
47
}
38
48
39
49
// automatically retries connection, only returning error if it fails many times
+48
-21
server/src/main.rs
+48
-21
server/src/main.rs
···
1
-
use std::{ops::Deref, time::Duration, u64};
1
+
use std::{ops::Deref, time::Duration, u64, usize};
2
2
3
3
use itertools::Itertools;
4
4
use rclite::Arc;
···
17
17
18
18
mod api;
19
19
mod db;
20
-
mod db_old;
21
20
mod error;
22
21
mod jetstream;
23
22
mod utils;
···
26
25
#[global_allocator]
27
26
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
28
27
29
-
#[cfg(target_env = "msvc")]
30
-
#[global_allocator]
31
-
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
32
-
33
28
#[tokio::main]
34
29
async fn main() {
35
30
tracing_subscriber::fmt::fmt()
···
54
49
debug();
55
50
return;
56
51
}
52
+
Some("print") => {
53
+
print_all();
54
+
return;
55
+
}
57
56
Some(x) => {
58
57
tracing::error!("unknown command: {}", x);
59
58
return;
···
71
70
.install_default()
72
71
.expect("cant install rustls crypto provider");
73
72
74
-
let mut jetstream =
75
-
match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
76
-
Ok(client) => client,
77
-
Err(err) => {
78
-
tracing::error!("can't create jetstream client: {err}");
79
-
return;
80
-
}
81
-
};
73
+
let urls = [
74
+
"wss://jetstream1.us-west.bsky.network/subscribe",
75
+
"wss://jetstream2.us-west.bsky.network/subscribe",
76
+
"wss://jetstream2.fr.hose.cam/subscribe",
77
+
"wss://jetstream.fire.hose.cam/subscribe",
78
+
];
79
+
let mut jetstream = match JetstreamClient::new(urls) {
80
+
Ok(client) => client,
81
+
Err(err) => {
82
+
tracing::error!("can't create jetstream client: {err}");
83
+
return;
84
+
}
85
+
};
82
86
83
87
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
84
88
let consume_events = tokio::spawn({
···
107
111
move || {
108
112
let mut buffer = Vec::new();
109
113
loop {
110
-
let read = event_rx.blocking_recv_many(&mut buffer, 100);
114
+
let read = event_rx.blocking_recv_many(&mut buffer, 500);
111
115
if let Err(err) = db.ingest_events(buffer.drain(..)) {
112
116
tracing::error!("failed to ingest events: {}", err);
113
117
}
···
153
157
if db.is_shutting_down() {
154
158
return;
155
159
}
156
-
let end = get_time() - compact_period / 2;
160
+
let end = get_time();
157
161
let start = end - compact_period;
158
162
let range = start.as_secs()..end.as_secs();
159
163
tracing::info!(
···
208
212
db.sync(true).expect("cant sync db");
209
213
}
210
214
215
+
fn print_all() {
216
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
217
+
let nsids = db.get_nsids().collect::<Vec<_>>();
218
+
let mut count = 0_usize;
219
+
for nsid in nsids {
220
+
println!("{}:", nsid.deref());
221
+
for hit in db.get_hits(&nsid, .., usize::MAX) {
222
+
let hit = hit.expect("aaa");
223
+
println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
224
+
count += 1;
225
+
}
226
+
}
227
+
println!("total hits: {}", count);
228
+
}
229
+
211
230
fn debug() {
212
231
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
213
232
let info = db.info().expect("cant get db info");
···
260
279
261
280
fn migrate() {
262
281
let cancel_token = CancellationToken::new();
263
-
264
-
let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
265
-
282
+
let from = Arc::new(
283
+
Db::new(
284
+
DbConfig::default().path(".fjall_data_from"),
285
+
cancel_token.child_token(),
286
+
)
287
+
.expect("couldnt create db"),
288
+
);
266
289
let to = Arc::new(
267
290
Db::new(
268
291
DbConfig::default().path(".fjall_data_to").ks(|c| {
···
277
300
);
278
301
279
302
let nsids = from.get_nsids().collect::<Vec<_>>();
280
-
let eps_thread = std::thread::spawn({
303
+
let _eps_thread = std::thread::spawn({
281
304
let to = to.clone();
282
305
move || {
283
306
loop {
···
297
320
threads.push(std::thread::spawn(move || {
298
321
tracing::info!("{}: migrating...", nsid.deref());
299
322
let mut count = 0_u64;
300
-
for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() {
323
+
for hits in from
324
+
.get_hits(&nsid, .., usize::MAX)
325
+
.chunks(100000)
326
+
.into_iter()
327
+
{
301
328
to.ingest_events(hits.map(|hit| {
302
329
count += 1;
303
330
let hit = hit.expect("cant decode hit");
+66
server/src/utils.rs
+66
server/src/utils.rs
···
1
1
use std::io::{self, Read, Write};
2
+
use std::ops::Deref;
2
3
use std::sync::atomic::{AtomicU64, Ordering};
3
4
use std::time::Duration;
4
5
6
+
use arc_swap::RefCnt;
5
7
use byteview::ByteView;
6
8
use ordered_varint::Variable;
9
+
use rclite::Arc;
7
10
8
11
pub fn get_time() -> Duration {
9
12
std::time::SystemTime::now()
···
320
323
}
321
324
}
322
325
}
326
+
327
+
pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;
328
+
329
+
pub struct ArcRefCnt<T>(Arc<T>);
330
+
331
+
impl<T> ArcRefCnt<T> {
332
+
pub fn new(value: T) -> Self {
333
+
Self(Arc::new(value))
334
+
}
335
+
}
336
+
337
+
impl<T> Deref for ArcRefCnt<T> {
338
+
type Target = T;
339
+
340
+
fn deref(&self) -> &Self::Target {
341
+
&self.0
342
+
}
343
+
}
344
+
345
+
impl<T> Clone for ArcRefCnt<T> {
346
+
fn clone(&self) -> Self {
347
+
Self(self.0.clone())
348
+
}
349
+
}
350
+
351
+
// SAFETY: mirrors arc_swap's own RefCnt impl for std::sync::Arc — into_ptr
// transfers ownership of one reference count to the raw pointer, from_ptr
// reclaims it, and as_ptr observes the pointer without touching the count.
// NOTE(review): this assumes rclite's Arc::{into_raw, from_raw} follow the
// same contract as std's — confirm against rclite's documentation.
unsafe impl<T> RefCnt for ArcRefCnt<T> {
    type Base = T;

    // Consumes the wrapper, leaking its reference count into the returned
    // raw pointer.
    fn into_ptr(me: Self) -> *mut Self::Base {
        Arc::into_raw(me.0) as *mut T
    }

    // Returns the raw pointer without consuming `me`'s reference count.
    fn as_ptr(me: &Self) -> *mut Self::Base {
        // Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
        // intention as
        //
        // me as &T as *const T as *mut T
        //
        // We first create a "shallow copy" of me - one that doesn't really own its ref count
        // (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
        // Then we can use into_raw (which preserves not having the ref count).
        //
        // We need to "revert" the changes we did. In current std implementation, the combination
        // of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw
        // and that read shall be paired with forget to properly "close the brackets". In future
        // versions of STD, these may become something else that's not really no-op (unlikely, but
        // possible), so we future-proof it a bit.

        // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
        let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
        let ptr = ptr as *mut T;

        // SAFETY: We got the pointer from into_raw just above
        std::mem::forget(unsafe { Arc::from_raw(ptr) });

        ptr
    }

    // SAFETY (caller): `ptr` must originate from into_ptr/as_ptr on the
    // same T, and must carry an owned reference count.
    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
        Self(unsafe { Arc::from_raw(ptr) })
    }
}