+1
server/deno.json
+1
server/deno.json
···
5
5
"dk": "deno run -A --node-modules-dir npm:drizzle-kit"
6
6
},
7
7
"imports": {
8
+
"@atcute/cbor": "npm:@atcute/cbor@^2.2.6",
8
9
"@atcute/identity-resolver": "npm:@atcute/identity-resolver@^1.1.3",
9
10
"@atcute/atproto": "npm:@atcute/atproto@^3.1.4",
10
11
"@atcute/client": "npm:@atcute/client@^4.0.3",
+26
server/deno.lock
+26
server/deno.lock
···
2
2
"version": "5",
3
3
"specifiers": {
4
4
"npm:@atcute/atproto@^3.1.4": "3.1.4",
5
+
"npm:@atcute/cbor@^2.2.6": "2.2.6",
5
6
"npm:@atcute/client@^4.0.3": "4.0.3",
6
7
"npm:@atcute/identity-resolver@^1.1.3": "1.1.3_@atcute+identity@1.1.0",
7
8
"npm:@atcute/lex-cli@^2.2.0": "2.2.0_@badrap+valita@0.4.6",
···
16
17
"integrity": "sha512-v0/ue7mZYtjYw4vWbtda51bLwW88mqsUQB8F/UZNO18ANAQWmKq1HDceVqjvruaLe2QPqE43XM3WkEyZ2FhOrA==",
17
18
"dependencies": [
18
19
"@atcute/lexicons"
20
+
]
21
+
},
22
+
"@atcute/cbor@2.2.6": {
23
+
"integrity": "sha512-pDfsn/vPTmgeXZiZdyc5vCGCPSxWlfTUIGFMCd5SroAgoLk1v9xxF7R/8+gt1lj1OKAwCwhS0doVmtLjqqzdbA==",
24
+
"dependencies": [
25
+
"@atcute/cid",
26
+
"@atcute/multibase",
27
+
"@atcute/uint8array"
28
+
]
29
+
},
30
+
"@atcute/cid@2.2.4": {
31
+
"integrity": "sha512-6RUMyt7rp6KOSb4TWwifOZURnFrGgKqYyjVkYjiAcscZWgJpJxwoCUCdonxCfxhQtB0yJ+WlfqNXicGB+Pe94A==",
32
+
"dependencies": [
33
+
"@atcute/multibase",
34
+
"@atcute/uint8array"
19
35
]
20
36
},
21
37
"@atcute/client@4.0.3": {
···
63
79
"dependencies": [
64
80
"esm-env"
65
81
]
82
+
},
83
+
"@atcute/multibase@1.1.6": {
84
+
"integrity": "sha512-HBxuCgYLKPPxETV0Rot4VP9e24vKl8JdzGCZOVsDaOXJgbRZoRIF67Lp0H/OgnJeH/Xpva8Z5ReoTNJE5dn3kg==",
85
+
"dependencies": [
86
+
"@atcute/uint8array"
87
+
]
88
+
},
89
+
"@atcute/uint8array@1.0.5": {
90
+
"integrity": "sha512-XLWWxoR2HNl2qU+FCr0rp1APwJXci7HnzbOQLxK55OaMNBXZ19+xNC5ii4QCsThsDxa4JS/JTzuiQLziITWf2Q=="
66
91
},
67
92
"@atcute/util-fetch@1.0.1": {
68
93
"integrity": "sha512-Clc0E/5ufyGBVfYBUwWNlHONlZCoblSr4Ho50l1LhmRPGB1Wu/AQ9Sz+rsBg7fdaW/auve8ulmwhRhnX2cGRow==",
···
632
657
"workspace": {
633
658
"dependencies": [
634
659
"npm:@atcute/atproto@^3.1.4",
660
+
"npm:@atcute/cbor@^2.2.6",
635
661
"npm:@atcute/client@^4.0.3",
636
662
"npm:@atcute/identity-resolver@^1.1.3",
637
663
"npm:@atcute/lex-cli@^2.2.0",
+11
-5
server/src/backfill/index.ts
+11
-5
server/src/backfill/index.ts
···
4
4
import { db as db_type } from "../utils.ts";
5
5
import { Client, simpleFetchHandler } from "@atcute/client";
6
6
import oldRecords from "./old-records.ts";
7
+
import newRecords from "./new-records.ts";
7
8
8
-
const db: db_type = drizzle<typeof schema>(Deno.env.get("DB_FILE_NAME")!);
9
+
const db: db_type = drizzle<typeof schema>(
10
+
Deno.env.get("DB_FILE_NAME")! ||
11
+
(() => {
12
+
throw "DB_FILE_NAME not set";
13
+
})(),
14
+
);
9
15
10
16
// we block access to the database till we've set up listeners etc
11
17
// set an initial value for ts sake; this will set asap
···
19
25
// so we should just nuke them
20
26
await db.delete(routes);
21
27
22
-
const backfillClient = new Client({
28
+
const relay = new Client({
23
29
handler: simpleFetchHandler({
24
-
service:
25
-
Deno.env.get("ATPROTO_RELAY") ||
30
+
service: Deno.env.get("ATPROTO_RELAY") ||
26
31
(() => {
27
32
throw "ATPROTO_RELAY not set";
28
33
})(),
29
34
}),
30
35
});
31
36
32
-
oldRecords(backfillClient, db);
37
+
oldRecords(db, relay);
38
+
newRecords(db);
33
39
34
40
res(db);
+192
server/src/backfill/new-records.ts
+192
server/src/backfill/new-records.ts
···
1
+
import { db, getPds, isDid, rkeyToUrl } from "../utils.ts";
2
+
import { decodeFirst } from "@atcute/cbor";
3
+
import {
4
+
ComAtprotoSyncSubscribeRepos,
5
+
DevAtcitiesRoute,
6
+
} from "../lexicons/index.ts";
7
+
import { is } from "@atcute/lexicons";
8
+
import { Client, simpleFetchHandler } from "@atcute/client";
9
+
import { routes } from "../db/schema.ts";
10
+
import { and, eq } from "drizzle-orm";
11
+
12
+
const ErrorEvent = Symbol("error event");
13
+
export default async function newRecords(db: db) {
14
+
// https://pdsls.dev/at://did:plc:6msi3pj7krzih5qxqtryxlzw/com.atproto.lexicon.schema/com.atproto.sync.subscribeRepos
15
+
const ws = new WebSocket(
16
+
`${
17
+
Deno.env.get("ATPROTO_RELAY") ||
18
+
(() => {
19
+
throw "ATPROTO_RELAY not set";
20
+
})()
21
+
}/xrpc/com.atproto.sync.subscribeRepos`
22
+
);
23
+
24
+
ws.addEventListener("close", (ev) => {
25
+
throw ev;
26
+
});
27
+
ws.addEventListener("error", (ev) => {
28
+
throw ev;
29
+
});
30
+
31
+
try {
32
+
ws.addEventListener("message", async (ev) => {
33
+
if (ev.data instanceof Blob) {
34
+
try {
35
+
const [header, remainder] = decodeFirst(await ev.data.bytes());
36
+
const [payload] = decodeFirst(remainder);
37
+
// https://atproto.com/specs/event-stream#framing
38
+
if (typeof header !== "object" || !header) return;
39
+
if (!("op" in header) || typeof header.op !== "number") return;
40
+
if (header.op === -1) {
41
+
console.error(header);
42
+
ws.close();
43
+
throw ErrorEvent;
44
+
}
45
+
46
+
if (!is(ComAtprotoSyncSubscribeRepos.commitSchema, payload)) {
47
+
return;
48
+
}
49
+
if (!isDid(payload.repo)) {
50
+
console.warn("Invalid did:", payload.repo);
51
+
return;
52
+
}
53
+
54
+
const pdsClient = (() => {
55
+
let client: Client | undefined;
56
+
return async () => {
57
+
if (client) {
58
+
return client;
59
+
}
60
+
if (!isDid(payload.repo)) {
61
+
console.warn("Invalid did:", payload.repo);
62
+
return;
63
+
}
64
+
65
+
const pds = await getPds(payload.repo);
66
+
if (!pds) {
67
+
return;
68
+
}
69
+
client = new Client({
70
+
handler: simpleFetchHandler({
71
+
service: pds,
72
+
}),
73
+
});
74
+
75
+
return client;
76
+
};
77
+
})();
78
+
79
+
for (const op of payload.ops) {
80
+
const [collection, rkey] = op.path.split("/");
81
+
const path = rkeyToUrl(rkey);
82
+
if (!path) {
83
+
console.warn("rkey not valid!");
84
+
continue;
85
+
}
86
+
if (collection !== "dev.atcities.route") continue;
87
+
switch (op.action) {
88
+
case "create":
89
+
case "update": {
90
+
const client = await pdsClient();
91
+
if (!client) {
92
+
console.warn("could not resolve pds for", payload.repo);
93
+
return;
94
+
}
95
+
const { data, ok } = await client.get(
96
+
"com.atproto.repo.getRecord",
97
+
{
98
+
params: {
99
+
repo: payload.repo,
100
+
collection: "dev.atcities.route",
101
+
rkey,
102
+
},
103
+
}
104
+
);
105
+
106
+
if (!ok) {
107
+
console.warn(data);
108
+
continue;
109
+
}
110
+
111
+
if (!is(DevAtcitiesRoute.mainSchema, data.value)) {
112
+
console.warn("Invalid record");
113
+
continue;
114
+
}
115
+
116
+
if (data.value.page.$type !== "dev.atcities.route#blob") {
117
+
console.warn("Unknown page type");
118
+
continue;
119
+
}
120
+
121
+
const id = (
122
+
await db
123
+
.select({
124
+
id: routes.id,
125
+
})
126
+
.from(routes)
127
+
.where(
128
+
and(
129
+
eq(routes.did, payload.repo),
130
+
eq(routes.url_route, path)
131
+
)
132
+
)
133
+
).at(0);
134
+
135
+
await db.insert(routes).values({
136
+
did: payload.repo,
137
+
url_route: path,
138
+
id: id ? id.id : undefined,
139
+
blob_cid:
140
+
"ref" in data.value.page.blob
141
+
? data.value.page.blob.ref.$link
142
+
: data.value.page.blob.cid,
143
+
mime: data.value.page.blob.mimeType,
144
+
});
145
+
146
+
console.log("wrote route");
147
+
148
+
continue;
149
+
}
150
+
case "delete": {
151
+
const id = (
152
+
await db
153
+
.select({
154
+
id: routes.id,
155
+
})
156
+
.from(routes)
157
+
.where(
158
+
and(
159
+
eq(routes.did, payload.repo),
160
+
eq(routes.url_route, path)
161
+
)
162
+
)
163
+
).at(0);
164
+
if (!id) continue;
165
+
166
+
await db.delete(routes).where(eq(routes.id, id.id));
167
+
console.log("deleted route");
168
+
169
+
continue;
170
+
}
171
+
}
172
+
}
173
+
} catch (e) {
174
+
if (Error.isError(e)) {
175
+
console.warn(
176
+
"Invalid DAG-CBOR data:",
177
+
e.name,
178
+
e.message,
179
+
e.cause,
180
+
e.stack
181
+
);
182
+
} else throw e;
183
+
}
184
+
}
185
+
});
186
+
} catch (e) {
187
+
// if and only if an error message was recived, reopen the connection
188
+
if (e === ErrorEvent) newRecords(db);
189
+
else throw e;
190
+
}
191
+
ws.addEventListener("open", () => console.log("subscribed!!"));
192
+
}
+5
-4
server/src/backfill/old-records.ts
+5
-4
server/src/backfill/old-records.ts
···
25
25
break;
26
26
}
27
27
28
-
for (const record of data.records)
28
+
for (const record of data.records) {
29
29
if (is(DevAtcitiesRoute.mainSchema, record.value)) {
30
30
if (record.value.page.$type === "dev.atcities.route#blob") {
31
31
const url = rkeyToUrl(record.uri.split("/")[4]);
···
39
39
: record.value.page.blob.cid,
40
40
mime: record.value.page.blob.mimeType,
41
41
};
42
-
db.insert(routes).values(data);
42
+
await db.insert(routes).values(data);
43
43
}
44
44
} else console.warn("Invalid record:", record.uri);
45
+
}
45
46
46
47
cursor = data.cursor;
47
48
if (!cursor) break;
48
49
}
49
50
}
50
51
51
-
export default async function (backfillClient: Client, db: db) {
52
+
export default async function (db: db, relay: Client) {
52
53
let repos: `did:${string}:${string}`[] = [];
53
54
let cursor: string | undefined;
54
55
while (true) {
55
-
const { data, ok } = await backfillClient.get(
56
+
const { data, ok } = await relay.get(
56
57
"com.atproto.sync.listReposByCollection",
57
58
{
58
59
params: {