+2
-1
apps/ingester/package.json
+2
-1
apps/ingester/package.json
···
18
18
"@atcute/identity": "^1.1.3",
19
19
"@atcute/jetstream": "^1.1.2",
20
20
"@atcute/lexicons": "catalog:",
21
+
"@badrap/valita": "^0.4.6",
22
+
"@cookware/database": "workspace:^",
21
23
"@cookware/lexicons": "workspace:*",
22
-
"@cookware/database": "workspace:^",
23
24
"@sentry/node": "^8.42.0",
24
25
"pino": "^9.5.0"
25
26
},
+12
-17
apps/ingester/src/config.ts
+12
-17
apps/ingester/src/config.ts
···
1
-
import { z } from "zod";
1
+
import * as v from "@badrap/valita";
2
2
3
-
const envSchema = z.object({
4
-
TURSO_CONNECTION_URL: z.string().default('https://turso.dev.hayden.moe'),
5
-
TURSO_AUTH_TOKEN: z.string().or(z.undefined()),
3
+
const envSchema = v.object({
4
+
TURSO_CONNECTION_URL: v.string().optional(() => 'https://turso.dev.hayden.moe'),
5
+
TURSO_AUTH_TOKEN: v.string().optional(),
6
6
7
-
JETSTREAM_ENDPOINT: z
8
-
.string()
9
-
.url()
10
-
.default('wss://jetstream1.us-east.bsky.network/subscribe'),
11
-
PLC_DIRECTORY_URL: z.string().url().default('https://plc.directory'),
7
+
JETSTREAM_ENDPOINT: v.string()
8
+
.optional(() => 'wss://jetstream2.us-east.bsky.network'),
9
+
PLC_DIRECTORY_URL: v.string().optional(() => 'https://plc.directory'),
12
10
13
-
ENV: z
14
-
.union([
15
-
z.literal('development'),
16
-
z.literal('production'),
17
-
])
18
-
.default('development'),
11
+
ENV: v
12
+
.union(v.literal('development'), v.literal('production'))
13
+
.optional(() => 'development'),
19
14
});
20
15
21
-
const env = envSchema.parse(process.env);
16
+
const env = envSchema.parse(process.env, { mode: 'strip' });
22
17
23
18
export default env;
24
-
export type Env = z.infer<typeof envSchema>;
19
+
export type Env = v.Infer<typeof envSchema>;
+21
-6
apps/ingester/src/index.ts
+21
-6
apps/ingester/src/index.ts
···
1
1
import { JetstreamSubscription } from "@atcute/jetstream";
2
-
import { ingestLogger } from "./logger.js";
3
2
import env from "./config.js";
4
3
import { db, and, eq } from "@cookware/database";
5
4
import { BlueRecipesFeedRecipe } from "@cookware/lexicons";
6
5
import { recipeTable } from "@cookware/database/schema";
7
6
import { is } from '@atcute/lexicons';
8
7
import { isAtprotoDid } from '@atcute/identity';
8
+
import pino from "pino";
9
9
10
10
export const newIngester = () => {
11
+
const logger = pino({
12
+
name: 'recipes.ingester',
13
+
level: env.ENV === 'development' ? 'debug' : 'info',
14
+
});
15
+
11
16
const subscription = new JetstreamSubscription({
12
17
url: env.JETSTREAM_ENDPOINT,
13
18
wantedCollections: ['blue.recipes.*'],
14
19
cursor: 0,
20
+
onConnectionOpen: () => logger.info('Connected to Jetstream'),
21
+
onConnectionError: err => {
22
+
logger.error(err, 'Failed to connect to Jetstream');
23
+
process.exit(1);
24
+
},
25
+
onConnectionClose: () => logger.info('Disconnected from Jetstream'),
15
26
});
16
27
17
28
return {
···
20
31
for await (const event of subscription) {
21
32
const authorDid = event.did;
22
33
if (!isAtprotoDid(authorDid)) {
23
-
ingestLogger.warn(`Invalid did: ${authorDid}`);
34
+
logger.warn(`Invalid did: ${authorDid}`);
24
35
continue;
25
36
}
26
37
···
28
39
const commit = event.commit;
29
40
30
41
if (commit.collection !== 'blue.recipes.feed.recipe') {
42
+
logger.trace(`Skipping unknown collection: ${commit.collection}`);
31
43
continue;
32
44
}
33
45
···
36
48
const record = commit.record;
37
49
38
50
if (!is(BlueRecipesFeedRecipe.mainSchema, record)) {
39
-
ingestLogger.warn(`Invalid recipe schema for ${commit['operation']} ${authorDid}/${rkey}`);
51
+
logger.warn(`Invalid recipe schema for ${commit['operation']} ${authorDid}/${rkey}`);
40
52
continue;
41
53
}
42
54
···
65
77
},
66
78
});
67
79
68
-
ingestLogger.info(`Upserted recipe ${authorDid}/${rkey}`);
80
+
logger.info(`Upserted recipe ${authorDid}/${rkey}`);
69
81
} else if (commit.operation == 'delete') {
70
82
const rkey = commit.rkey;
71
83
db
···
74
86
eq(recipeTable.authorDid, authorDid),
75
87
eq(recipeTable.rkey, rkey),
76
88
));
77
-
ingestLogger.info(`Deleted recipe ${authorDid}/${rkey}`);
89
+
logger.info(`Deleted recipe ${authorDid}/${rkey}`);
78
90
} else {
79
-
ingestLogger.warn(`Unknown operation type: ${commit['operation']}`);
91
+
logger.warn(`Unknown operation type: ${commit['operation']}`);
80
92
continue;
81
93
}
94
+
} else {
95
+
logger.trace({ kind: event.kind, authorDid }, `Skipping non-commit event for did: ${event.did}`);
96
+
continue;
82
97
}
83
98
}
84
99
},
-4
apps/ingester/src/logger.ts
-4
apps/ingester/src/logger.ts
+1
bun.lock
+1
bun.lock
+1
-1
libs/database/package.json
+1
-1
libs/database/package.json
+3
-3
libs/database/tsconfig.build.json
+3
-3
libs/database/tsconfig.build.json
+4
-1
libs/database/tsconfig.json
+4
-1
libs/database/tsconfig.json
+2
-1
libs/lexicons/package.json
+2
-1
libs/lexicons/package.json
···
20
20
"./did": "./dist/did.js"
21
21
},
22
22
"scripts": {
23
-
"build": "tsc",
23
+
"dev": "tsc --watch --project tsconfig.build.json",
24
+
"build": "tsc --project tsconfig.build.json",
24
25
"lexgen": "lex-cli generate --config ./lex.config.ts",
25
26
"prepublish": "rm -rf dist; bun run build"
26
27
},
+7
libs/lexicons/tsconfig.build.json
+7
libs/lexicons/tsconfig.build.json