ANProto over ATProto -- using Bluesky PDSes to store ANProto messages and blobs

add a friendly message when a message is already on the pds

Changed files
+239 -46
src
+239 -46
src/index.ts
··· 170 170 return res.status(401).json({ error: 'Not logged in' }) 171 171 } 172 172 173 - const { anmsg, blob, rkey, anhash: providedAnhash, blobhash: providedBlobhash } = req.body ?? {} 173 + const { 174 + anmsg, 175 + blob, 176 + anblob: bodyAnblob, 177 + rkey, 178 + anhash: providedAnhash, 179 + blobhash: providedBlobhash, 180 + } = req.body ?? {} 181 + let computedRkey: string | undefined 174 182 if (typeof anmsg !== 'string' || !anmsg.trim()) { 175 183 return res.status(400).json({ error: 'anmsg is required' }) 176 184 } 185 + if (blob && blob === anmsg) { 186 + return res.status(400).json({ error: 'anmsg must be the signature; blob belongs in the blob field' }) 187 + } 177 188 const maxAnmsgLen = 20000 178 189 if (anmsg.length > maxAnmsgLen) { 179 190 return res.status(400).json({ error: 'anmsg too large' }) 180 191 } 192 + 193 + let agent: Agent | null = null 181 194 182 195 try { 183 196 const oauthSession = await client.restore(session.did) ··· 186 199 return res.status(401).json({ error: 'Session expired' }) 187 200 } 188 201 189 - const agent = new Agent(oauthSession) 202 + agent = new Agent(oauthSession) 190 203 let finalBlobhash: string | undefined 191 204 let anblob: string | undefined 192 205 193 - if (blob) { 194 - if (typeof blob !== 'string') { 195 - return res.status(400).json({ error: 'blob must be base64 string' }) 196 - } 197 - const buf = Buffer.from(blob, 'base64') 206 + const incomingBlob = 207 + typeof bodyAnblob === 'string' && bodyAnblob.trim() 208 + ? bodyAnblob 209 + : typeof blob === 'string' 210 + ? blob 211 + : '' 212 + if (!incomingBlob) { 213 + return res.status(400).json({ error: 'anblob is required' }) 214 + } 215 + if (incomingBlob) { 216 + const buf = Buffer.from(incomingBlob) 198 217 if (!buf.length) { 199 - return res.status(400).json({ error: 'blob decode failed' }) 218 + return res.status(400).json({ error: 'anblob decode failed' }) 200 219 } 201 220 const maxBlobBytes = 6 * 1024 * 1024 202 221 if (buf.length > maxBlobBytes) { 203 - return res.status(400).json({ error: 'blob too large (max 6MB)' }) 222 + return res.status(400).json({ error: 'anblob too large (max 6MB)' }) 204 223 } 205 - anblob = blob 206 - finalBlobhash = await hashAn(blob) 224 + anblob = incomingBlob 225 + finalBlobhash = await hashAn(incomingBlob) 207 226 } else if (typeof providedBlobhash === 'string' && providedBlobhash.trim()) { 208 227 finalBlobhash = providedBlobhash.trim() 209 228 } ··· 212 231 if (providedAnhash && providedAnhash !== anhash) { 213 232 return res.status(400).json({ error: 'anhash does not match anmsg hash' }) 214 233 } 215 - const computedRkey = toBase64Url(anhash) 234 + computedRkey = toBase64Url(anhash) 216 235 if (typeof rkey === 'string' && rkey.trim() && rkey.trim() !== computedRkey) { 217 236 return res.status(400).json({ error: 'rkey does not match anmsg hash' }) 218 237 } ··· 246 265 247 266 res.json({ ok: true, rkey: computedRkey }) 248 267 } catch (err) { 249 - console.error('Publish failed:', err) 250 - res.status(500).json({ error: 'Publish failed', detail: String(err) }) 268 + let responseData = (err as any)?.response?.data 269 + let responseStatus = (err as any)?.response?.status 270 + const responseError = typeof responseData?.error === 'string' ? responseData.error : '' 271 + const responseMessage = 272 + typeof responseData?.message === 'string' 273 + ? responseData.message 274 + : typeof (err as any)?.message === 'string' 275 + ? (err as any).message 276 + : '' 277 + const isDuplicate = 278 + (err as any)?.response?.status === 409 || 279 + responseError === 'RecordAlreadyExists' || 280 + responseError === 'already_exists' || 281 + /already exists/i.test(responseMessage) || 282 + /internal server error/i.test(responseMessage) 283 + 284 + if (!isDuplicate && computedRkey && session.did) { 285 + try { 286 + const ensureAgent = async () => { 287 + if (agent) return agent 288 + const oauthSession = await client.restore(session.did!) 289 + if (!oauthSession) return null 290 + return new Agent(oauthSession) 291 + } 292 + const fetchAgent = await ensureAgent() 293 + if (fetchAgent) { 294 + const existing = await fetchAgent.com.atproto.repo.getRecord({ 295 + repo: session.did!, 296 + collection: messageLexUri, 297 + rkey: computedRkey, 298 + }) 299 + if (existing?.data) { 300 + isDuplicate = true 301 + responseStatus = responseStatus || 409 302 + responseData = existing.data 303 + } 304 + } 305 + } catch { 306 + // ignore fallback duplicate check errors 307 + } 308 + } 309 + 310 + const status = isDuplicate 311 + ? 409 312 + : (err as any)?.status || (err as any)?.response?.status || 500 313 + const atUri = 314 + session.did && computedRkey ? `at://${session.did}/${messageLexUri}/${computedRkey}` : undefined 315 + const detail = isDuplicate 316 + ? { 317 + message: 'Message already on PDS.', 318 + ...(computedRkey ? { rkey: computedRkey } : {}), 319 + ...(atUri ? { atUri } : {}), 320 + } 321 + : responseData || responseMessage || String(err) 322 + console.error('Publish failed:', { 323 + status, 324 + detail, 325 + duplicate: isDuplicate, 326 + stack: (err as any)?.stack, 327 + responseData, 328 + }) 329 + res 330 + .status(status) 331 + .json({ 332 + code: isDuplicate ? 'already_exists' : 'publish_failed', 333 + error: isDuplicate ? 'Message already on PDS.' : 'Publish failed', 334 + ...(computedRkey ? { rkey: computedRkey } : {}), 335 + ...(atUri ? { atUri } : {}), 336 + detail, 337 + }) 251 338 } 252 339 }) 253 340 ··· 1026 1113 return pub 1027 1114 } 1028 1115 1116 + const resolveSignatureForEntry = async (msg) => { 1117 + if (msg?.sig) return msg.sig 1118 + if (msg?.hash) { 1119 + try { 1120 + const maybeSig = await apds.get(msg.hash) 1121 + if (maybeSig) { 1122 + const opened = await apds.open(maybeSig) 1123 + if (opened) return maybeSig 1124 + } 1125 + } catch { 1126 + // ignore 1127 + } 1128 + } 1129 + if (msg?.text) { 1130 + try { 1131 + const opened = await apds.open(msg.text) 1132 + if (opened) return msg.text 1133 + } catch { 1134 + // ignore 1135 + } 1136 + } 1137 + return null 1138 + } 1139 + 1140 + const buildPayloadFromSig = async (sig) => { 1141 + if (!sig) return null 1142 + const payload = { anmsg: sig } 1143 + try { 1144 + payload.anhash = await apds.hash(sig) 1145 + } catch { 1146 + // leave out anhash if it fails 1147 + } 1148 + try { 1149 + const opened = await apds.open(sig) 1150 + if (opened && opened.length > 13) { 1151 + const contentHash = opened.substring(13) 1152 + const maybeBlob = await apds.get(contentHash) 1153 + if (typeof maybeBlob === 'string' && maybeBlob.length) { 1154 + payload.anblob = maybeBlob 1155 + } else { 1156 + logEvent('Missing blob for ' + contentHash, true) 1157 + return null 1158 + } 1159 + } 1160 + } catch { 1161 + // ignore blob failures 1162 + } 1163 + return payload 1164 + } 1165 + 1166 + const publishWithRetry = async (payload, label = 'Publish') => { 1167 + const isAlreadyExists = (res, data) => { 1168 + const code = (data?.code || data?.error || '').toString().toLowerCase() 1169 + const msg = (data?.message || data?.error || data?.detail?.message || '').toString().toLowerCase() 1170 + return ( 1171 + res?.status === 409 || 1172 + code === 'already_exists' || 1173 + code === 'recordalreadyexists' || 1174 + msg.includes('already exists') || 1175 + msg.includes('already on pds') || 1176 + msg === 'message already on pds.' 1177 + ) 1178 + } 1179 + 1180 + const send = async (body) => { 1181 + console.log('Publish payload', label, body) 1182 + try { 1183 + const res = await fetch('/publish', { 1184 + method: 'POST', 1185 + headers: { 'Content-Type': 'application/json' }, 1186 + body: JSON.stringify(body), 1187 + }) 1188 + let data = {} 1189 + try { 1190 + data = await res.json() 1191 + } catch { 1192 + // ignore parse errors 1193 + } 1194 + if (!res.ok) { 1195 + console.warn('Publish response', label, res.status, res.statusText, data) 1196 + } 1197 + return { res, data, duplicate: isAlreadyExists(res, data) } 1198 + } catch (err) { 1199 + console.error('Publish fetch failed', label, err) 1200 + return { res: null, data: { error: String(err) } } 1201 + } 1202 + } 1203 + const first = await send(payload) 1204 + if (first.duplicate) { 1205 + logEvent('Message already on PDS.', false) 1206 + return first 1207 + } 1208 + if (first.res?.ok) return first 1209 + const reason = first.data?.error || first.res?.statusText || 'rejected' 1210 + logEvent(label + ' rejected: ' + reason, true) 1211 + let second = null 1212 + if (payload?.anmsg) { 1213 + const rebuilt = await buildPayloadFromSig(payload.anmsg) 1214 + if (rebuilt) { 1215 + second = await send(rebuilt) 1216 + if (second.duplicate) { 1217 + logEvent('Message already on PDS.', false) 1218 + return second 1219 + } 1220 + if (second.res?.ok) { 1221 + logEvent(label + ' retry succeeded', false) 1222 + return second 1223 + } 1224 + const reason2 = second.data?.error || second.res?.statusText || 'retry failed' 1225 + logEvent(label + ' retry failed: ' + reason2, true) 1226 + } 1227 + } 1228 + return first 1229 + } 1230 + 1029 1231 const syncLocalToPds = async () => { 1030 1232 if (!loggedIn || syncInFlight) return 1031 1233 const now = Date.now() ··· 1038 1240 const entries = sortEntries((await apds.query()) || []) 1039 1241 const ours = entries.filter((e) => e.author === pub) 1040 1242 for (const msg of ours) { 1041 - if (!msg.text) continue 1042 1243 if (await hasSynced(msg.hash)) continue 1043 - let anhash = '' 1044 - try { 1045 - anhash = await apds.hash(msg.text) 1046 - } catch { 1047 - continue 1048 - } 1244 + const sig = await resolveSignatureForEntry(msg) 1245 + if (!sig) continue 1246 + const payload = await buildPayloadFromSig(sig) 1247 + if (!payload) continue 1049 1248 try { 1050 - const res = await fetch('/publish', { 1051 - method: 'POST', 1052 - headers: { 'Content-Type': 'application/json' }, 1053 - body: JSON.stringify({ anmsg: msg.text, anhash }), 1054 - }) 1055 - const data = await res.json().catch(() => ({})) 1056 - if (!res.ok) { 1249 + const result = await publishWithRetry(payload, 'Sync') 1250 + const { res, data, duplicate } = result || {} 1251 + if (!res?.ok && !duplicate) { 1057 1252 // If already exists, mark synced to avoid replays. 1058 - if (res.status === 409 || String(data?.error || '').toLowerCase().includes('already')) { 1253 + if (res?.status === 409 || String(data?.error || '').toLowerCase().includes('already')) { 1059 1254 await markSynced(msg.hash) 1060 1255 continue 1061 1256 } ··· 1278 1473 return 1279 1474 } 1280 1475 1281 - let payload = {} 1476 + let payload = null 1282 1477 let protocolHash = '' 1283 1478 let publishOk = false 1284 1479 1285 1480 try { 1286 1481 protocolHash = await apds.compose(content) 1287 - const anmsg = await apds.get(protocolHash) 1288 - if (!anmsg) { 1482 + const sig = await apds.get(protocolHash) 1483 + if (!sig) { 1289 1484 logEvent('Could not load signed message', true) 1290 1485 return 1291 1486 } 1292 - payload.anmsg = anmsg 1293 - try { 1294 - const anhash = await apds.hash(anmsg) 1295 - payload.anhash = anhash 1296 - } catch (err) { 1297 - console.warn('Could not compute anhash', err) 1487 + payload = await buildPayloadFromSig(sig) 1488 + if (!payload) { 1489 + logEvent('Could not build publish payload', true) 1490 + return 1298 1491 } 1299 1492 } catch (err) { 1300 1493 logEvent('Signing failed', true) ··· 1303 1496 1304 1497 try { 1305 1498 if (loggedIn) { 1306 - const res = await fetch('/publish', { 1307 - method: 'POST', 1308 - headers: { 'Content-Type': 'application/json' }, 1309 - body: JSON.stringify(payload), 1310 - }) 1311 - const data = await res.json() 1312 - if (!res.ok) { 1499 + const result = await publishWithRetry(payload, 'Publish') 1500 + const { res, data, duplicate } = result || {} 1501 + if (!res?.ok && !duplicate) { 1313 1502 throw new Error(data?.error || 'Publish failed') 1314 1503 } 1315 1504 publishOk = true 1316 - logEvent('Saved with rkey ' + data.rkey, false) 1505 + if (duplicate) { 1506 + logEvent('Message already on PDS.', false) 1507 + } else { 1508 + logEvent('Saved with rkey ' + data.rkey, false) 1509 + } 1317 1510 } else { 1318 1511 logEvent('Signed locally. Login to publish to PDS.', false) 1319 1512 }