+144
src/pds.js
+144
src/pds.js
···
909
909
method: 'POST',
910
910
handler: (pds, req, url) => pds.handleCreateRecord(req)
911
911
},
912
+
'/xrpc/com.atproto.repo.deleteRecord': {
913
+
method: 'POST',
914
+
handler: (pds, req, url) => pds.handleDeleteRecord(req)
915
+
},
912
916
'/xrpc/com.atproto.repo.getRecord': {
913
917
handler: (pds, req, url) => pds.handleGetRecord(url)
914
918
},
···
1154
1158
return { uri, cid: recordCidStr, commit: commitCidStr }
1155
1159
}
1156
1160
1161
+
async deleteRecord(collection, rkey) {
1162
+
const did = await this.getDid()
1163
+
if (!did) throw new Error('PDS not initialized')
1164
+
1165
+
const uri = `at://${did}/${collection}/${rkey}`
1166
+
1167
+
// Check if record exists
1168
+
const existing = this.sql.exec(
1169
+
`SELECT cid FROM records WHERE uri = ?`, uri
1170
+
).toArray()
1171
+
if (existing.length === 0) {
1172
+
return { error: 'RecordNotFound', message: 'record not found' }
1173
+
}
1174
+
1175
+
// Delete from records table
1176
+
this.sql.exec(`DELETE FROM records WHERE uri = ?`, uri)
1177
+
1178
+
// Rebuild MST
1179
+
const mst = new MST(this.sql)
1180
+
const dataRoot = await mst.computeRoot()
1181
+
1182
+
// Get previous commit
1183
+
const prevCommits = this.sql.exec(
1184
+
`SELECT cid, rev FROM commits ORDER BY seq DESC LIMIT 1`
1185
+
).toArray()
1186
+
const prevCommit = prevCommits.length > 0 ? prevCommits[0] : null
1187
+
1188
+
// Create commit
1189
+
const rev = createTid()
1190
+
const commit = {
1191
+
did,
1192
+
version: 3,
1193
+
data: dataRoot ? new CID(cidToBytes(dataRoot)) : null,
1194
+
rev,
1195
+
prev: prevCommit?.cid ? new CID(cidToBytes(prevCommit.cid)) : null
1196
+
}
1197
+
1198
+
// Sign commit
1199
+
const commitBytes = cborEncodeDagCbor(commit)
1200
+
const signingKey = await this.getSigningKey()
1201
+
const sig = await sign(signingKey, commitBytes)
1202
+
1203
+
const signedCommit = { ...commit, sig }
1204
+
const signedBytes = cborEncodeDagCbor(signedCommit)
1205
+
const commitCid = await createCid(signedBytes)
1206
+
const commitCidStr = cidToString(commitCid)
1207
+
1208
+
// Store commit block
1209
+
this.sql.exec(
1210
+
`INSERT OR REPLACE INTO blocks (cid, data) VALUES (?, ?)`,
1211
+
commitCidStr, signedBytes
1212
+
)
1213
+
1214
+
// Store commit reference
1215
+
this.sql.exec(
1216
+
`INSERT INTO commits (cid, rev, prev) VALUES (?, ?, ?)`,
1217
+
commitCidStr, rev, prevCommit?.cid || null
1218
+
)
1219
+
1220
+
// Update head and rev
1221
+
await this.state.storage.put('head', commitCidStr)
1222
+
await this.state.storage.put('rev', rev)
1223
+
1224
+
// Collect blocks for the event (commit + MST nodes, no record block)
1225
+
const newBlocks = []
1226
+
newBlocks.push({ cid: commitCidStr, data: signedBytes })
1227
+
if (dataRoot) {
1228
+
const mstBlocks = this.collectMstBlocks(dataRoot)
1229
+
newBlocks.push(...mstBlocks)
1230
+
}
1231
+
1232
+
// Sequence event with delete action
1233
+
const eventTime = new Date().toISOString()
1234
+
const evt = cborEncode({
1235
+
ops: [{ action: 'delete', path: `${collection}/${rkey}`, cid: null }],
1236
+
blocks: buildCarFile(commitCidStr, newBlocks),
1237
+
rev,
1238
+
time: eventTime
1239
+
})
1240
+
this.sql.exec(
1241
+
`INSERT INTO seq_events (did, commit_cid, evt) VALUES (?, ?, ?)`,
1242
+
did, commitCidStr, evt
1243
+
)
1244
+
1245
+
// Broadcast to subscribers
1246
+
const evtRows = this.sql.exec(
1247
+
`SELECT * FROM seq_events ORDER BY seq DESC LIMIT 1`
1248
+
).toArray()
1249
+
if (evtRows.length > 0) {
1250
+
this.broadcastEvent(evtRows[0])
1251
+
// Forward to default DO for relay subscribers
1252
+
if (this.env?.PDS) {
1253
+
const defaultId = this.env.PDS.idFromName('default')
1254
+
const defaultPds = this.env.PDS.get(defaultId)
1255
+
const row = evtRows[0]
1256
+
const evtArray = Array.from(new Uint8Array(row.evt))
1257
+
defaultPds.fetch(new Request('http://internal/forward-event', {
1258
+
method: 'POST',
1259
+
body: JSON.stringify({ ...row, evt: evtArray })
1260
+
})).catch(e => console.log('forward error:', e))
1261
+
}
1262
+
}
1263
+
1264
+
return { ok: true }
1265
+
}
1266
+
1157
1267
formatEvent(evt) {
1158
1268
// AT Protocol frame format: header + body
1159
1269
// Use DAG-CBOR encoding for body (CIDs need tag 42 + 0x00 prefix)
···
1332
1442
try {
1333
1443
const result = await this.createRecord(body.collection, body.record, body.rkey)
1334
1444
return Response.json(result)
1445
+
} catch (err) {
1446
+
return Response.json({ error: err.message }, { status: 500 })
1447
+
}
1448
+
}
1449
+
1450
+
async handleDeleteRecord(request) {
1451
+
const body = await request.json()
1452
+
if (!body.collection || !body.rkey) {
1453
+
return Response.json({ error: 'InvalidRequest', message: 'missing collection or rkey' }, { status: 400 })
1454
+
}
1455
+
try {
1456
+
const result = await this.deleteRecord(body.collection, body.rkey)
1457
+
if (result.error) {
1458
+
return Response.json(result, { status: 404 })
1459
+
}
1460
+
return Response.json({})
1335
1461
} catch (err) {
1336
1462
return Response.json({ error: err.message }, { status: 500 })
1337
1463
}
···
1722
1848
const id = env.PDS.idFromName(repo)
1723
1849
const pds = env.PDS.get(id)
1724
1850
return pds.fetch(request)
1851
+
}
1852
+
1853
+
// POST repo endpoints have repo in body
1854
+
if (url.pathname === '/xrpc/com.atproto.repo.deleteRecord' ||
1855
+
url.pathname === '/xrpc/com.atproto.repo.createRecord') {
1856
+
const body = await request.json()
1857
+
const repo = body.repo
1858
+
if (!repo) {
1859
+
return Response.json({ error: 'InvalidRequest', message: 'missing repo param' }, { status: 400 })
1860
+
}
1861
+
const id = env.PDS.idFromName(repo)
1862
+
const pds = env.PDS.get(id)
1863
+
// Re-create request with body since we consumed it
1864
+
return pds.fetch(new Request(request.url, {
1865
+
method: 'POST',
1866
+
headers: request.headers,
1867
+
body: JSON.stringify(body)
1868
+
}))
1725
1869
}
1726
1870
1727
1871
const did = url.searchParams.get('did')