Retro Bulletin Board Systems on atproto. Web app and TUI.
atbbs.xyz
python
tui
atproto
bbs
1"""Shared record operations — create, delete, hydrate.
2
3Framework-agnostic. Used by both web and TUI.
4"""
5
6from dataclasses import dataclass
7
8import httpx
9
10from core import lexicon
11from core.constellation import get_replies, get_threads
12from core.filters import filter_moderated
13from core.models import AtUri, AuthError, BBS, Board, MiniDoc, Record, Reply, Thread
14from core.slingshot import get_records_batch, resolve_identities_batch
15from core.util import now_iso
16
17
def thread_from_record(record: Record, author: MiniDoc) -> Thread:
    """Build a Thread domain object from a raw Record and its resolved author."""
    value = record.value
    return Thread(
        uri=record.uri,
        board_uri=value["board"],
        title=value["title"],
        body=value["body"],
        created_at=value["createdAt"],
        author=author,
        # Optional fields: absent keys become None.
        updated_at=value.get("updatedAt"),
        attachments=value.get("attachments"),
    )
30
31
def reply_from_record(record: Record, author: MiniDoc) -> Reply:
    """Build a Reply domain object from a raw Record and its resolved author."""
    value = record.value
    return Reply(
        uri=record.uri,
        subject_uri=value["subject"],
        body=value["body"],
        created_at=value["createdAt"],
        author=author,
        # Optional fields: absent keys become None.
        updated_at=value.get("updatedAt"),
        attachments=value.get("attachments"),
        quote=value.get("quote"),
    )
44
45
async def hydrate_threads(
    client: httpx.AsyncClient,
    bbs: BBS,
    board: Board,
    cursor: str | None = None,
) -> tuple[list[Thread], str | None]:
    """Fetch and hydrate threads for a board, newest first.

    Returns the hydrated threads plus the backlink cursor for pagination.
    """
    board_uri = str(AtUri(bbs.identity.did, lexicon.BOARD, board.slug))
    backlinks = await get_threads(client, board_uri, cursor=cursor)

    raw = await get_records_batch(client, backlinks.records)
    raw = filter_moderated(raw, bbs.site.banned_dids, bbs.site.hidden_posts)

    # Parse each record URI once; resolve all author DIDs in a single batch.
    uri_map = {rec.uri: AtUri.parse(rec.uri) for rec in raw}
    authors = await resolve_identities_batch(
        client, [at.did for at in uri_map.values()]
    )

    hydrated = []
    for rec in raw:
        author_did = uri_map[rec.uri].did
        # Skip records whose author identity could not be resolved.
        if author_did in authors:
            hydrated.append(thread_from_record(rec, authors[author_did]))
    hydrated.sort(key=lambda t: t.created_at, reverse=True)
    return hydrated, backlinks.cursor
69
70
@dataclass
class RepliesPage:
    """A page of hydrated replies with pagination info."""

    replies: list[Reply]  # hydrated replies for this page, oldest first
    page: int  # 1-based page number actually served (may be clamped)
    total_pages: int  # always >= 1, even when there are no replies
    total_replies: int  # total reply refs found (counted before per-page moderation filtering)
79
80
async def hydrate_replies(
    client: httpx.AsyncClient,
    bbs: BBS,
    thread_uri: str,
    page: int = 1,
    page_size: int = 10,
    focus_reply: str | None = None,
) -> RepliesPage:
    """Fetch all reply refs, then hydrate only the requested page (oldest first).

    If focus_reply is provided (an AT URI), automatically jump to the page
    containing that reply.
    """
    # Refs are cheap (did/collection/rkey only); full hydration is deferred
    # to the single page we actually serve.
    backlinks = await get_replies(client, thread_uri, limit=1000)
    refs = list(reversed(backlinks.records))  # oldest first

    total = len(refs)
    total_pages = max(1, (total + page_size - 1) // page_size)

    if focus_reply:
        # Jump to the page holding the focused reply, when it is present.
        idx = next(
            (
                i
                for i, ref in enumerate(refs)
                if f"at://{ref.did}/{ref.collection}/{ref.rkey}" == focus_reply
            ),
            None,
        )
        if idx is not None:
            page = idx // page_size + 1

    page = max(1, min(page, total_pages))

    start = (page - 1) * page_size
    window = refs[start : start + page_size]

    if not window:
        return RepliesPage(
            replies=[], page=page, total_pages=total_pages, total_replies=total
        )

    # Hydrate only this page's refs, then drop moderated content.
    records = await get_records_batch(client, window)
    records = filter_moderated(records, bbs.site.banned_dids, bbs.site.hidden_posts)

    at_uris = {rec.uri: AtUri.parse(rec.uri) for rec in records}
    authors = await resolve_identities_batch(
        client, [at.did for at in at_uris.values()]
    )

    hydrated = sorted(
        (
            reply_from_record(rec, authors[at_uris[rec.uri].did])
            for rec in records
            if at_uris[rec.uri].did in authors
        ),
        key=lambda r: r.created_at,
    )
    return RepliesPage(
        replies=hydrated, page=page, total_pages=total_pages, total_replies=total
    )
136
137
async def _try_refresh_token(client, session, session_updater):
    """Attempt to refresh an expired OAuth token. Updates session in place.

    Returns True on success, False when refresh is impossible or fails for
    any reason — best-effort by design: callers fall back to asking the
    user to log in again.
    """
    # Only OAuth/DPoP sessions carry these keys; app-password sessions can't refresh.
    if not session.get("dpop_private_jwk") or not session.get("refresh_token"):
        return False
    try:
        # Lazy imports keep auth dependencies off the module import path.
        import json
        import os

        from core.auth.config import load_secrets
        from core.auth.oauth import refresh_tokens

        # Resolve the data dir: env override first, else the platform default.
        data_dir = os.environ.get("ATBBS_DATA_DIR")
        if not data_dir:
            from platformdirs import user_data_dir

            data_dir = user_data_dir("atbbs")
        secrets = load_secrets(data_dir)
        client_secret_jwk = json.loads(secrets["client_secret_jwk"])

        # Use stored client_id — required for token refresh
        client_id = session.get("client_id")
        if not client_id:
            return False

        token_resp, dpop_nonce = await refresh_tokens(
            client=client,
            session=session,
            client_id=client_id,
            client_secret_jwk=client_secret_jwk,
        )

        # Update the in-memory session; the server may rotate the refresh token.
        session["access_token"] = token_resp["access_token"]
        if "refresh_token" in token_resp:
            session["refresh_token"] = token_resp["refresh_token"]
        session["dpop_authserver_nonce"] = dpop_nonce

        async def _noop(*a):
            pass

        # Persist the new tokens through the caller-supplied updater, if any.
        updater = session_updater or _noop
        await updater(session["did"], "access_token", session["access_token"])
        await updater(session["did"], "refresh_token", session["refresh_token"])
        await updater(session["did"], "dpop_authserver_nonce", dpop_nonce)
        return True
    except Exception:
        # Deliberate broad catch: any failure simply means "could not refresh".
        return False
184
185
async def pds_post(
    client: httpx.AsyncClient,
    session: dict,
    endpoint: str,
    body: dict,
    session_updater=None,
) -> httpx.Response:
    """POST to a user's PDS, using DPoP if available, Bearer otherwise. Refreshes tokens on 401."""
    url = f"{session['pds_url']}/xrpc/{endpoint}"

    # Sessions without a DPoP key use plain Bearer auth with no refresh handling.
    if not session.get("dpop_private_jwk"):
        return await client.post(
            url,
            headers={"Authorization": f"Bearer {session['access_token']}"},
            json=body,
        )

    from core.auth.oauth import pds_request

    async def _noop(*a):
        pass

    updater = session_updater or _noop
    response = await pds_request(client, "POST", url, session, updater, body=body)

    if response.status_code == 401:
        # Access token likely expired — refresh once, then retry the same call.
        if not await _try_refresh_token(client, session, session_updater):
            raise AuthError("Session expired. Please log in again.")
        response = await pds_request(client, "POST", url, session, updater, body=body)

    return response
221
222
async def upload_blob(
    client: httpx.AsyncClient,
    session: dict,
    data: bytes,
    mime_type: str,
    session_updater=None,
) -> dict:
    """Upload a blob to the user's PDS. Returns the blob ref."""
    url = f"{session['pds_url']}/xrpc/com.atproto.repo.uploadBlob"

    if not session.get("dpop_private_jwk"):
        # Plain Bearer upload for sessions without a DPoP key.
        resp = await client.post(
            url,
            headers={
                "Authorization": f"Bearer {session['access_token']}",
                "Content-Type": mime_type,
            },
            content=data,
        )
    else:
        from core.auth.oauth import pds_request

        async def _noop(*a):
            pass

        updater = session_updater or _noop
        resp = await pds_request(
            client,
            "POST",
            url,
            session,
            updater,
            content=data,
            content_type=mime_type,
        )

        if resp.status_code == 401:
            # Token expired — refresh once and retry, else force re-login.
            if not await _try_refresh_token(client, session, session_updater):
                raise AuthError("Session expired. Please log in again.")
            resp = await pds_request(
                client,
                "POST",
                url,
                session,
                updater,
                content=data,
                content_type=mime_type,
            )

    resp.raise_for_status()
    return resp.json()["blob"]
275
276
async def create_thread_record(
    client: httpx.AsyncClient,
    session: dict,
    board_uri: str,
    title: str,
    body: str,
    attachments: list[dict] | None = None,
    session_updater=None,
) -> httpx.Response:
    """Create a thread record in the user's repo."""
    value = {
        "$type": lexicon.THREAD,
        "board": board_uri,
        "title": title,
        "body": body,
        "createdAt": now_iso(),
    }
    # Attachments are optional; omit the key entirely when empty.
    if attachments:
        value["attachments"] = attachments
    payload = {
        "repo": session["did"],
        "collection": lexicon.THREAD,
        "record": value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.createRecord", payload, session_updater
    )
307
308
async def create_reply_record(
    client: httpx.AsyncClient,
    session: dict,
    thread_uri: str,
    body: str,
    attachments: list[dict] | None = None,
    quote: str | None = None,
    session_updater=None,
) -> httpx.Response:
    """Create a reply record in the user's repo."""
    value = {
        "$type": lexicon.REPLY,
        "subject": thread_uri,
        "body": body,
        "createdAt": now_iso(),
    }
    # Optional fields are omitted entirely when falsy.
    if attachments:
        value["attachments"] = attachments
    if quote:
        value["quote"] = quote
    payload = {
        "repo": session["did"],
        "collection": lexicon.REPLY,
        "record": value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.createRecord", payload, session_updater
    )
340
341
async def delete_record(
    client: httpx.AsyncClient,
    session: dict,
    collection: str,
    rkey: str,
    session_updater=None,
) -> httpx.Response:
    """Delete a record from the user's repo. Raises on HTTP error."""
    payload = {
        "repo": session["did"],
        "collection": collection,
        "rkey": rkey,
    }
    response = await pds_post(
        client, session, "com.atproto.repo.deleteRecord", payload, session_updater
    )
    response.raise_for_status()
    return response
363
364
async def list_pds_records(
    client: httpx.AsyncClient,
    pds_url: str,
    did: str,
    collection: str,
    limit: int = 100,
) -> list[dict]:
    """Fetch all records of a collection from a PDS via listRecords.

    Follows the cursor until the PDS reports no more pages.
    """
    endpoint = f"{pds_url}/xrpc/com.atproto.repo.listRecords"
    collected: list[dict] = []
    cursor = None
    while True:
        params = {"repo": did, "collection": collection, "limit": limit}
        if cursor:
            params["cursor"] = cursor
        resp = await client.get(endpoint, params=params)
        resp.raise_for_status()
        payload = resp.json()
        collected.extend(payload.get("records", []))
        cursor = payload.get("cursor")
        if not cursor:
            # No cursor means the final page has been consumed.
            return collected
389
390
async def create_ban_record(
    client: httpx.AsyncClient,
    session: dict,
    banned_did: str,
    session_updater=None,
) -> httpx.Response:
    """Create a ban record in the sysop's repo."""
    ban_value = {
        "$type": lexicon.BAN,
        "did": banned_did,
        "createdAt": now_iso(),
    }
    payload = {
        "repo": session["did"],
        "collection": lexicon.BAN,
        "record": ban_value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.createRecord", payload, session_updater
    )
413
414
async def create_hidden_record(
    client: httpx.AsyncClient,
    session: dict,
    post_uri: str,
    session_updater=None,
) -> httpx.Response:
    """Create a hidden post record in the sysop's repo."""
    hide_value = {
        "$type": lexicon.HIDE,
        "uri": post_uri,
        "createdAt": now_iso(),
    }
    payload = {
        "repo": session["did"],
        "collection": lexicon.HIDE,
        "record": hide_value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.createRecord", payload, session_updater
    )
437
438
async def put_board_record(
    client: httpx.AsyncClient,
    session: dict,
    slug: str,
    name: str,
    description: str,
    created_at: str,
    session_updater=None,
) -> httpx.Response:
    """Create or update a board record in the user's repo (rkey is the slug)."""
    board_value = {
        "$type": lexicon.BOARD,
        "name": name,
        "description": description,
        "createdAt": created_at,
    }
    payload = {
        "repo": session["did"],
        "collection": lexicon.BOARD,
        "rkey": slug,
        "record": board_value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.putRecord", payload, session_updater
    )
466
467
async def put_site_record(
    client: httpx.AsyncClient,
    session: dict,
    site_value: dict,
    session_updater=None,
) -> httpx.Response:
    """Create or update the site record in the user's repo.

    Uses the fixed rkey "self", so the site record is a singleton per repo.
    """
    payload = {
        "repo": session["did"],
        "collection": lexicon.SITE,
        "rkey": "self",
        "record": site_value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.putRecord", payload, session_updater
    )
487
488
async def create_news_record(
    client: httpx.AsyncClient,
    session: dict,
    site_uri: str,
    title: str,
    body: str,
    session_updater=None,
) -> httpx.Response:
    """Create a news record in the user's repo."""
    news_value = {
        "$type": lexicon.NEWS,
        "site": site_uri,
        "title": title,
        "body": body,
        "createdAt": now_iso(),
    }
    payload = {
        "repo": session["did"],
        "collection": lexicon.NEWS,
        "record": news_value,
    }
    return await pds_post(
        client, session, "com.atproto.repo.createRecord", payload, session_updater
    )
515
516
async def fetch_inbox(
    client: httpx.AsyncClient,
    did: str,
    pds_url: str,
    max_items: int = 50,
) -> list[dict]:
    """Fetch inbox: replies to user's threads + quotes of user's replies.

    Scans the user's most recent threads and replies, gathers backlinked
    replies/quotes from other users concurrently, deduplicates (preferring
    quote items), and returns at most ``max_items`` items, newest first.
    """
    import asyncio

    from core.constellation import get_backlinks

    SCAN_LIMIT = 20  # how many threads/replies to scan
    BACKLINK_LIMIT = 25  # backlinks per record
    MAX_CONCURRENT = 10  # concurrent API calls

    sem = asyncio.Semaphore(MAX_CONCURRENT)

    # Fetch thread and reply lists concurrently (best-effort: errors → empty list)
    async def list_records(collection):
        try:
            resp = await client.get(
                f"{pds_url}/xrpc/com.atproto.repo.listRecords",
                params={"repo": did, "collection": collection, "limit": SCAN_LIMIT},
            )
            resp.raise_for_status()
            return resp.json().get("records", [])
        except Exception:
            return []

    thread_records, reply_records = await asyncio.gather(
        list_records(lexicon.THREAD),
        list_records(lexicon.REPLY),
    )

    # Batch-resolve BBS handles for all threads at once
    bbs_dids = set()
    for tr in thread_records:
        board_uri = tr["value"].get("board", "")
        if board_uri:
            bbs_dids.add(AtUri.parse(board_uri).did)
    try:
        bbs_authors = (
            await resolve_identities_batch(client, list(bbs_dids)) if bbs_dids else {}
        )
    except Exception:
        bbs_authors = {}

    # 1. Fetch replies to user's threads (concurrent)
    async def fetch_thread_replies(tr):
        async with sem:
            thread_uri = tr["uri"]
            thread_title = tr["value"].get("title", "")
            board_uri = tr["value"].get("board", "")
            bbs_did = AtUri.parse(board_uri).did if board_uri else did
            bbs_handle = bbs_authors[bbs_did].handle if bbs_did in bbs_authors else ""

            try:
                backlinks = await get_replies(client, thread_uri, limit=BACKLINK_LIMIT)
                records = await get_records_batch(client, backlinks.records)
                parsed = {r.uri: AtUri.parse(r.uri) for r in records}
                # The user's own replies are not inbox items.
                records = [r for r in records if parsed[r.uri].did != did]
                if not records:
                    return []

                dids = [parsed[r.uri].did for r in records]
                authors = await resolve_identities_batch(client, dids)

                items = []
                for r in records:
                    author_did = parsed[r.uri].did
                    if author_did not in authors:
                        continue
                    items.append(
                        {
                            "type": "reply",
                            "reply_uri": r.uri,
                            "thread_title": thread_title,
                            "thread_uri": thread_uri,
                            "handle": authors[author_did].handle,
                            "body": r.value.get("body", "")[:200],
                            "created_at": r.value.get("createdAt", ""),
                            "bbs_handle": bbs_handle,
                        }
                    )
                return items
            except Exception:
                # Best-effort: a failing thread lookup contributes nothing.
                return []

    # 2. Fetch quotes of user's replies (concurrent)
    async def fetch_reply_quotes(rr):
        async with sem:
            reply_uri = rr["uri"]
            thread_uri = rr["value"].get("subject", "")
            try:
                backlinks = await get_backlinks(
                    client,
                    subject=reply_uri,
                    source=f"{lexicon.REPLY}:quote",
                    limit=BACKLINK_LIMIT,
                )
                if not backlinks.records:
                    return []

                records = await get_records_batch(client, backlinks.records)
                parsed = {r.uri: AtUri.parse(r.uri) for r in records}
                # The user's own quotes are not inbox items.
                records = [r for r in records if parsed[r.uri].did != did]
                if not records:
                    return []

                dids = [parsed[r.uri].did for r in records]
                authors = await resolve_identities_batch(client, dids)

                items = []
                for r in records:
                    author_did = parsed[r.uri].did
                    if author_did not in authors:
                        continue
                    items.append(
                        {
                            "type": "quote",
                            "reply_uri": r.uri,
                            "thread_title": "",
                            "thread_uri": thread_uri,
                            "handle": authors[author_did].handle,
                            "body": r.value.get("body", "")[:200],
                            "created_at": r.value.get("createdAt", ""),
                            "bbs_handle": "",
                        }
                    )
                return items
            except Exception:
                return []

    # Run all lookups concurrently
    results = await asyncio.gather(
        *[fetch_thread_replies(tr) for tr in thread_records],
        *[fetch_reply_quotes(rr) for rr in reply_records],
    )

    all_items = []
    for items in results:
        all_items.extend(items)

    # Deduplicate and prefer quotes if same record appears in both
    seen = {}
    for item in all_items:
        key = item["handle"] + item["body"] + item["created_at"]
        if key not in seen or item["type"] == "quote":
            seen[key] = item

    deduped = list(seen.values())
    deduped.sort(key=lambda a: a["created_at"], reverse=True)
    # Fix: max_items was previously accepted but never applied — cap the result.
    return deduped[:max_items]