+13
-57
apps/ingester/src/index.ts
+13
-57
apps/ingester/src/index.ts
···
1
1
import { JetstreamSubscription } from "@atcute/jetstream";
2
2
import env from "./config.js";
3
-
import { db, and, eq } from "@cookware/database";
4
-
import { BlueRecipesFeedRecipe } from "@cookware/lexicons";
5
-
import { recipeTable } from "@cookware/database/schema";
6
-
import { is } from '@atcute/lexicons';
3
+
import { BlueRecipesActorProfile, BlueRecipesFeedRecipe } from "@cookware/lexicons";
7
4
import { isAtprotoDid } from '@atcute/identity';
8
5
import pino from "pino";
6
+
import { ingestRecipe } from "./ingesters/recipe.js";
7
+
import { ingestProfile } from "./ingesters/profile.js";
9
8
10
9
export const newIngester = () => {
11
10
const logger = pino({
···
37
36
38
37
if (event.kind === 'commit') {
39
38
const commit = event.commit;
40
-
41
-
if (commit.collection !== 'blue.recipes.feed.recipe') {
42
-
logger.trace(`Skipping unknown collection: ${commit.collection}`);
43
-
continue;
44
-
}
45
-
46
-
if (commit.operation == 'create' || commit.operation == 'update') {
47
-
const rkey = commit.rkey;
48
-
const record = commit.record;
49
-
50
-
if (!is(BlueRecipesFeedRecipe.mainSchema, record)) {
51
-
logger.warn(`Invalid recipe schema for ${commit['operation']} ${authorDid}/${rkey}`);
52
-
continue;
53
-
}
54
-
55
-
db
56
-
.insert(recipeTable)
57
-
.values({
58
-
rkey,
59
-
authorDid,
60
-
createdAt: new Date(),
61
-
title: record.title,
62
-
time: record.time ?? 0,
63
-
serves: record.serves ?? null,
64
-
description: record.description ?? null,
65
-
ingredients: record.ingredients,
66
-
steps: record.steps,
67
-
})
68
-
.onConflictDoUpdate({
69
-
target: [recipeTable.authorDid, recipeTable.rkey],
70
-
set: {
71
-
title: record.title,
72
-
time: record.time ?? 0,
73
-
serves: record.serves ?? null,
74
-
description: record.description ?? null,
75
-
ingredients: record.ingredients,
76
-
steps: record.steps,
77
-
},
78
-
});
79
-
80
-
logger.info(`Upserted recipe ${authorDid}/${rkey}`);
81
-
} else if (commit.operation == 'delete') {
82
-
const rkey = commit.rkey;
83
-
db
84
-
.delete(recipeTable)
85
-
.where(and(
86
-
eq(recipeTable.authorDid, authorDid),
87
-
eq(recipeTable.rkey, rkey),
88
-
));
89
-
logger.info(`Deleted recipe ${authorDid}/${rkey}`);
90
-
} else {
91
-
logger.warn(`Unknown operation type: ${commit['operation']}`);
92
-
continue;
39
+
switch (commit.collection) {
40
+
case BlueRecipesFeedRecipe.mainSchema.object.shape.$type.expected:
41
+
ingestRecipe(authorDid, commit, logger);
42
+
break;
43
+
case BlueRecipesActorProfile.mainSchema.object.shape.$type.expected:
44
+
ingestProfile(authorDid, commit, logger);
45
+
break;
46
+
default:
47
+
logger.trace({ collection: commit.collection }, "skipping unknown collection");
48
+
break;
93
49
}
94
50
} else {
95
51
logger.trace({ kind: event.kind, authorDid }, `Skipping non-commit event for did: ${event.did}`);
+60
apps/ingester/src/ingesters/profile.ts
+60
apps/ingester/src/ingesters/profile.ts
···
1
+
import { db, and, eq } from "@cookware/database";
2
+
import { profilesTable } from "@cookware/database/schema";
3
+
import { is } from '@atcute/lexicons';
4
+
import { BlueRecipesActorProfile } from "@cookware/lexicons";
5
+
import { CommitOperation } from "@atcute/jetstream";
6
+
import { Logger } from "pino";
7
+
import { AtprotoDid } from "@atcute/lexicons/syntax";
8
+
9
+
export const ingestProfile = (did: AtprotoDid, commit: CommitOperation, logger: Logger) => {
10
+
if (commit.operation == 'create' || commit.operation == 'update') {
11
+
const { rkey, record, cid } = commit;
12
+
13
+
if (rkey != "self") {
14
+
logger.warn(`Invalid profile rkey for ${commit['operation']} ${did}: ${rkey}`);
15
+
return;
16
+
}
17
+
if (!is(BlueRecipesActorProfile.mainSchema, record)) {
18
+
logger.warn(`Invalid profile schema for ${commit['operation']} ${did}`);
19
+
return;
20
+
}
21
+
22
+
db
23
+
.insert(profilesTable)
24
+
.values({
25
+
cid,
26
+
did,
27
+
displayName: record.displayName,
28
+
avatarRef: record.avatar,
29
+
bannerRef: record.banner,
30
+
description: record.description,
31
+
pronouns: record.pronouns,
32
+
website: record.website,
33
+
createdAt: record.createdAt ? new Date(record.createdAt) : new Date(),
34
+
ingestedAt: new Date(),
35
+
})
36
+
.onConflictDoUpdate({
37
+
target: [profilesTable.did],
38
+
set: {
39
+
cid,
40
+
displayName: record.displayName,
41
+
avatarRef: record.avatar,
42
+
bannerRef: record.banner,
43
+
description: record.description,
44
+
pronouns: record.pronouns,
45
+
website: record.website,
46
+
createdAt: record.createdAt ? new Date(record.createdAt) : new Date(),
47
+
},
48
+
});
49
+
50
+
logger.info(`Upserted profile ${did}/${rkey}`);
51
+
} else if (commit.operation == 'delete') {
52
+
const rkey = commit.rkey;
53
+
db
54
+
.delete(profilesTable)
55
+
.where(eq(profilesTable.did, did));
56
+
logger.info(`Deleted profile ${did}/${rkey}`);
57
+
} else {
58
+
logger.warn(`Unknown operation type: ${commit['operation']}`);
59
+
}
60
+
};
+57
apps/ingester/src/ingesters/recipe.ts
+57
apps/ingester/src/ingesters/recipe.ts
···
1
+
import { db, and, eq } from "@cookware/database";
2
+
import { recipeTable } from "@cookware/database/schema";
3
+
import { is } from '@atcute/lexicons';
4
+
import { BlueRecipesFeedRecipe } from "@cookware/lexicons";
5
+
import { CommitOperation } from "@atcute/jetstream";
6
+
import { Logger } from "pino";
7
+
import { AtprotoDid } from "@atcute/lexicons/syntax";
8
+
9
+
export const ingestRecipe = (did: AtprotoDid, commit: CommitOperation, logger: Logger) => {
10
+
if (commit.operation == 'create' || commit.operation == 'update') {
11
+
const { rkey, record, cid } = commit;
12
+
13
+
if (!is(BlueRecipesFeedRecipe.mainSchema, record)) {
14
+
logger.warn(`Invalid recipe schema for ${commit['operation']} ${did}/${rkey}`);
15
+
return;
16
+
}
17
+
18
+
db
19
+
.insert(recipeTable)
20
+
.values({
21
+
cid, rkey, did,
22
+
title: record.title,
23
+
time: record.time ?? 0,
24
+
serves: record.serves ?? null,
25
+
description: record.description ?? null,
26
+
ingredients: record.ingredients,
27
+
steps: record.steps,
28
+
createdAt: record.createdAt ? new Date(record.createdAt) : new Date(),
29
+
})
30
+
.onConflictDoUpdate({
31
+
target: [recipeTable.did, recipeTable.rkey],
32
+
set: {
33
+
cid,
34
+
title: record.title,
35
+
time: record.time ?? 0,
36
+
serves: record.serves ?? null,
37
+
description: record.description ?? null,
38
+
ingredients: record.ingredients,
39
+
steps: record.steps,
40
+
createdAt: record.createdAt ? new Date(record.createdAt) : new Date(),
41
+
},
42
+
});
43
+
44
+
logger.info(`Upserted recipe ${did}/${rkey}`);
45
+
} else if (commit.operation == 'delete') {
46
+
const rkey = commit.rkey;
47
+
db
48
+
.delete(recipeTable)
49
+
.where(and(
50
+
eq(recipeTable.did, did),
51
+
eq(recipeTable.rkey, rkey),
52
+
));
53
+
logger.info(`Deleted recipe ${did}/${rkey}`);
54
+
} else {
55
+
logger.warn(`Unknown operation type: ${commit['operation']}`);
56
+
}
57
+
};
+5
-1
libs/database/lib/schema.ts
+5
-1
libs/database/lib/schema.ts
···
1
1
import { customType, index, int, primaryKey, sqliteTable, text } from "drizzle-orm/sqlite-core";
2
2
import { BlueRecipesFeedRecipe, BlueRecipesActorProfile } from "@cookware/lexicons";
3
-
import { isCid, ResourceUri, type AtprotoDid } from "@atcute/lexicons/syntax";
3
+
import { Cid, isCid, ResourceUri, type AtprotoDid } from "@atcute/lexicons/syntax";
4
4
import { Blob, LegacyBlob } from "@atcute/lexicons";
5
5
import { relations, sql, type SQL } from "drizzle-orm";
6
6
import { isBlob, isCidLink, isLegacyBlob } from "@atcute/lexicons/interfaces";
···
57
57
uri: text('uri')
58
58
.generatedAlwaysAs((): SQL => sql`'at://' || ${profilesTable.did} || '/blue.recipes.actor.profile/self'`)
59
59
.$type<ResourceUri>(),
60
+
cid: text("cid").$type<Cid>().notNull(),
60
61
did: text("did").$type<AtprotoDid>().notNull().primaryKey(),
61
62
ingestedAt: dateIsoText("ingested_at").notNull().default(sql`CURRENT_TIMESTAMP`),
62
63
···
68
69
bannerRef: atBlob('banner'),
69
70
createdAt: dateIsoText("created_at").notNull(),
70
71
}, t => ([
72
+
index('profiles_cid_idx').on(t.cid),
71
73
index('profiles_cat_idx').on(t.createdAt),
72
74
index('profiles_iat_idx').on(t.ingestedAt),
73
75
]));
···
76
78
uri: text('uri')
77
79
.generatedAlwaysAs((): SQL => sql`'at://' || ${recipeTable.did} || '/blue.recipes.feed.recipe/' || ${recipeTable.rkey}`),
78
80
81
+
cid: text("cid").$type<Cid>().notNull(),
79
82
did: text("author_did")
80
83
.$type<AtprotoDid>()
81
84
.notNull()
···
99
102
ingestedAt: dateIsoText("ingested_at").notNull().default(sql`CURRENT_TIMESTAMP`),
100
103
}, t => ([
101
104
index('recipes_title_idx').on(t.title),
105
+
index('recipes_cid_idx').on(t.cid),
102
106
index('recipes_cat_idx').on(t.createdAt),
103
107
index('recipes_iat_idx').on(t.ingestedAt),
104
108
primaryKey({ columns: [ t.did, t.rkey ] }),
+4
libs/database/migrations/0003_long_blue_marvel.sql
+4
libs/database/migrations/0003_long_blue_marvel.sql
+312
libs/database/migrations/meta/0003_snapshot.json
+312
libs/database/migrations/meta/0003_snapshot.json
···
1
+
{
2
+
"version": "6",
3
+
"dialect": "sqlite",
4
+
"id": "ca3337d9-69a0-468d-8364-0f05e91a0233",
5
+
"prevId": "25f6fc02-0357-4a4a-a43c-6fc138a21401",
6
+
"tables": {
7
+
"profiles": {
8
+
"name": "profiles",
9
+
"columns": {
10
+
"uri": {
11
+
"name": "uri",
12
+
"type": "text",
13
+
"primaryKey": false,
14
+
"notNull": false,
15
+
"autoincrement": false,
16
+
"generated": {
17
+
"as": "('at://' || \"did\" || '/blue.recipes.actor.profile/self')",
18
+
"type": "virtual"
19
+
}
20
+
},
21
+
"cid": {
22
+
"name": "cid",
23
+
"type": "text",
24
+
"primaryKey": false,
25
+
"notNull": true,
26
+
"autoincrement": false
27
+
},
28
+
"did": {
29
+
"name": "did",
30
+
"type": "text",
31
+
"primaryKey": true,
32
+
"notNull": true,
33
+
"autoincrement": false
34
+
},
35
+
"ingested_at": {
36
+
"name": "ingested_at",
37
+
"type": "text",
38
+
"primaryKey": false,
39
+
"notNull": true,
40
+
"autoincrement": false,
41
+
"default": "CURRENT_TIMESTAMP"
42
+
},
43
+
"display_name": {
44
+
"name": "display_name",
45
+
"type": "text(640)",
46
+
"primaryKey": false,
47
+
"notNull": true,
48
+
"autoincrement": false
49
+
},
50
+
"description": {
51
+
"name": "description",
52
+
"type": "text(2500)",
53
+
"primaryKey": false,
54
+
"notNull": false,
55
+
"autoincrement": false
56
+
},
57
+
"pronouns": {
58
+
"name": "pronouns",
59
+
"type": "text(200)",
60
+
"primaryKey": false,
61
+
"notNull": false,
62
+
"autoincrement": false
63
+
},
64
+
"website": {
65
+
"name": "website",
66
+
"type": "text",
67
+
"primaryKey": false,
68
+
"notNull": false,
69
+
"autoincrement": false
70
+
},
71
+
"avatar": {
72
+
"name": "avatar",
73
+
"type": "text",
74
+
"primaryKey": false,
75
+
"notNull": false,
76
+
"autoincrement": false
77
+
},
78
+
"banner": {
79
+
"name": "banner",
80
+
"type": "text",
81
+
"primaryKey": false,
82
+
"notNull": false,
83
+
"autoincrement": false
84
+
},
85
+
"created_at": {
86
+
"name": "created_at",
87
+
"type": "text",
88
+
"primaryKey": false,
89
+
"notNull": true,
90
+
"autoincrement": false
91
+
}
92
+
},
93
+
"indexes": {
94
+
"profiles_cid_idx": {
95
+
"name": "profiles_cid_idx",
96
+
"columns": [
97
+
"cid"
98
+
],
99
+
"isUnique": false
100
+
},
101
+
"profiles_cat_idx": {
102
+
"name": "profiles_cat_idx",
103
+
"columns": [
104
+
"created_at"
105
+
],
106
+
"isUnique": false
107
+
},
108
+
"profiles_iat_idx": {
109
+
"name": "profiles_iat_idx",
110
+
"columns": [
111
+
"ingested_at"
112
+
],
113
+
"isUnique": false
114
+
}
115
+
},
116
+
"foreignKeys": {},
117
+
"compositePrimaryKeys": {},
118
+
"uniqueConstraints": {},
119
+
"checkConstraints": {}
120
+
},
121
+
"recipes": {
122
+
"name": "recipes",
123
+
"columns": {
124
+
"uri": {
125
+
"name": "uri",
126
+
"type": "text",
127
+
"primaryKey": false,
128
+
"notNull": false,
129
+
"autoincrement": false,
130
+
"generated": {
131
+
"as": "('at://' || \"author_did\" || '/blue.recipes.feed.recipe/' || \"rkey\")",
132
+
"type": "virtual"
133
+
}
134
+
},
135
+
"cid": {
136
+
"name": "cid",
137
+
"type": "text",
138
+
"primaryKey": false,
139
+
"notNull": true,
140
+
"autoincrement": false
141
+
},
142
+
"author_did": {
143
+
"name": "author_did",
144
+
"type": "text",
145
+
"primaryKey": false,
146
+
"notNull": true,
147
+
"autoincrement": false
148
+
},
149
+
"rkey": {
150
+
"name": "rkey",
151
+
"type": "text",
152
+
"primaryKey": false,
153
+
"notNull": true,
154
+
"autoincrement": false
155
+
},
156
+
"image": {
157
+
"name": "image",
158
+
"type": "text",
159
+
"primaryKey": false,
160
+
"notNull": false,
161
+
"autoincrement": false
162
+
},
163
+
"title": {
164
+
"name": "title",
165
+
"type": "text",
166
+
"primaryKey": false,
167
+
"notNull": true,
168
+
"autoincrement": false
169
+
},
170
+
"time": {
171
+
"name": "time",
172
+
"type": "integer",
173
+
"primaryKey": false,
174
+
"notNull": true,
175
+
"autoincrement": false,
176
+
"default": 0
177
+
},
178
+
"serves": {
179
+
"name": "serves",
180
+
"type": "integer",
181
+
"primaryKey": false,
182
+
"notNull": false,
183
+
"autoincrement": false
184
+
},
185
+
"description": {
186
+
"name": "description",
187
+
"type": "text",
188
+
"primaryKey": false,
189
+
"notNull": false,
190
+
"autoincrement": false
191
+
},
192
+
"ingredients": {
193
+
"name": "ingredients",
194
+
"type": "text",
195
+
"primaryKey": false,
196
+
"notNull": true,
197
+
"autoincrement": false
198
+
},
199
+
"ingredients_count": {
200
+
"name": "ingredients_count",
201
+
"type": "integer",
202
+
"primaryKey": false,
203
+
"notNull": false,
204
+
"autoincrement": false,
205
+
"generated": {
206
+
"as": "(json_array_length(\"ingredients\"))",
207
+
"type": "virtual"
208
+
}
209
+
},
210
+
"steps": {
211
+
"name": "steps",
212
+
"type": "text",
213
+
"primaryKey": false,
214
+
"notNull": true,
215
+
"autoincrement": false
216
+
},
217
+
"steps_count": {
218
+
"name": "steps_count",
219
+
"type": "integer",
220
+
"primaryKey": false,
221
+
"notNull": false,
222
+
"autoincrement": false,
223
+
"generated": {
224
+
"as": "(json_array_length(\"steps\"))",
225
+
"type": "virtual"
226
+
}
227
+
},
228
+
"created_at": {
229
+
"name": "created_at",
230
+
"type": "text",
231
+
"primaryKey": false,
232
+
"notNull": true,
233
+
"autoincrement": false
234
+
},
235
+
"ingested_at": {
236
+
"name": "ingested_at",
237
+
"type": "text",
238
+
"primaryKey": false,
239
+
"notNull": true,
240
+
"autoincrement": false,
241
+
"default": "CURRENT_TIMESTAMP"
242
+
}
243
+
},
244
+
"indexes": {
245
+
"recipes_title_idx": {
246
+
"name": "recipes_title_idx",
247
+
"columns": [
248
+
"title"
249
+
],
250
+
"isUnique": false
251
+
},
252
+
"recipes_cid_idx": {
253
+
"name": "recipes_cid_idx",
254
+
"columns": [
255
+
"cid"
256
+
],
257
+
"isUnique": false
258
+
},
259
+
"recipes_cat_idx": {
260
+
"name": "recipes_cat_idx",
261
+
"columns": [
262
+
"created_at"
263
+
],
264
+
"isUnique": false
265
+
},
266
+
"recipes_iat_idx": {
267
+
"name": "recipes_iat_idx",
268
+
"columns": [
269
+
"ingested_at"
270
+
],
271
+
"isUnique": false
272
+
}
273
+
},
274
+
"foreignKeys": {
275
+
"recipes_author_did_profiles_did_fk": {
276
+
"name": "recipes_author_did_profiles_did_fk",
277
+
"tableFrom": "recipes",
278
+
"tableTo": "profiles",
279
+
"columnsFrom": [
280
+
"author_did"
281
+
],
282
+
"columnsTo": [
283
+
"did"
284
+
],
285
+
"onDelete": "cascade",
286
+
"onUpdate": "no action"
287
+
}
288
+
},
289
+
"compositePrimaryKeys": {
290
+
"recipes_author_did_rkey_pk": {
291
+
"columns": [
292
+
"author_did",
293
+
"rkey"
294
+
],
295
+
"name": "recipes_author_did_rkey_pk"
296
+
}
297
+
},
298
+
"uniqueConstraints": {},
299
+
"checkConstraints": {}
300
+
}
301
+
},
302
+
"views": {},
303
+
"enums": {},
304
+
"_meta": {
305
+
"schemas": {},
306
+
"tables": {},
307
+
"columns": {}
308
+
},
309
+
"internal": {
310
+
"indexes": {}
311
+
}
312
+
}
+7
libs/database/migrations/meta/_journal.json
+7
libs/database/migrations/meta/_journal.json