Retro Bulletin Board Systems on atproto. Web app and TUI. atbbs.xyz
python tui atproto bbs
at master 672 lines 20 kB view raw
"""Shared record operations — create, delete, hydrate.

Framework-agnostic. Used by both web and TUI.
"""

import asyncio
import json
import os
from dataclasses import dataclass

import httpx

from core import lexicon
from core.constellation import get_replies, get_threads
from core.filters import filter_moderated
from core.models import AtUri, AuthError, BBS, Board, MiniDoc, Record, Reply, Thread
from core.slingshot import get_records_batch, resolve_identities_batch
from core.util import now_iso

# Upper bound on reply refs fetched per thread before paginating locally.
# NOTE(review): threads with more replies than this are silently truncated.
_MAX_REPLY_REFS = 1000

# Inbox scan tuning.
_INBOX_SCAN_LIMIT = 20  # how many of the user's threads/replies to scan
_INBOX_BACKLINK_LIMIT = 25  # backlinks fetched per scanned record
_INBOX_MAX_CONCURRENT = 10  # concurrent backlink lookups


async def _noop_updater(*_args) -> None:
    """Stand-in session updater used when the caller does not persist sessions."""


def thread_from_record(record: Record, author: MiniDoc) -> Thread:
    """Construct a Thread from a raw Record and resolved author.

    ``board``, ``title``, ``body`` and ``createdAt`` are required keys of
    ``record.value`` (a missing one raises ``KeyError``); ``updatedAt`` and
    ``attachments`` are optional and default to ``None``.
    """
    value = record.value
    return Thread(
        uri=record.uri,
        board_uri=value["board"],
        title=value["title"],
        body=value["body"],
        created_at=value["createdAt"],
        author=author,
        updated_at=value.get("updatedAt"),
        attachments=value.get("attachments"),
    )


def reply_from_record(record: Record, author: MiniDoc) -> Reply:
    """Construct a Reply from a raw Record and resolved author.

    ``subject``, ``body`` and ``createdAt`` are required keys of
    ``record.value`` (a missing one raises ``KeyError``); ``updatedAt``,
    ``attachments`` and ``quote`` are optional and default to ``None``.
    """
    value = record.value
    return Reply(
        uri=record.uri,
        subject_uri=value["subject"],
        body=value["body"],
        created_at=value["createdAt"],
        author=author,
        updated_at=value.get("updatedAt"),
        attachments=value.get("attachments"),
        quote=value.get("quote"),
    )


async def _pair_with_authors(
    client: httpx.AsyncClient, records: list[Record]
) -> list[tuple[Record, MiniDoc]]:
    """Resolve each record's author and pair it with the record.

    Records whose author DID is absent from the resolver's result are dropped.
    Input order is preserved.
    """
    parsed = {r.uri: AtUri.parse(r.uri) for r in records}
    authors = await resolve_identities_batch(
        client, [p.did for p in parsed.values()]
    )
    return [
        (r, authors[parsed[r.uri].did])
        for r in records
        if parsed[r.uri].did in authors
    ]


async def hydrate_threads(
    client: httpx.AsyncClient,
    bbs: BBS,
    board: Board,
    cursor: str | None = None,
) -> tuple[list[Thread], str | None]:
    """Fetch and hydrate threads for a board.

    Returns the moderated, author-resolved threads (newest first) together
    with the backlink cursor for the next page.
    """
    board_uri = str(AtUri(bbs.identity.did, lexicon.BOARD, board.slug))
    backlinks = await get_threads(client, board_uri, cursor=cursor)
    records = await get_records_batch(client, backlinks.records)
    records = filter_moderated(records, bbs.site.banned_dids, bbs.site.hidden_posts)

    threads = [
        thread_from_record(record, author)
        for record, author in await _pair_with_authors(client, records)
    ]
    threads.sort(key=lambda t: t.created_at, reverse=True)
    return threads, backlinks.cursor


@dataclass
class RepliesPage:
    """A page of hydrated replies with pagination info."""

    replies: list[Reply]  # hydrated replies on this page, oldest first
    page: int  # 1-based page number actually served (after clamping)
    total_pages: int  # always >= 1, even when there are no replies
    total_replies: int  # number of reply refs found, before moderation filtering


async def hydrate_replies(
    client: httpx.AsyncClient,
    bbs: BBS,
    thread_uri: str,
    page: int = 1,
    page_size: int = 10,
    focus_reply: str | None = None,
) -> RepliesPage:
    """Fetch all reply refs, then hydrate only the requested page (oldest first).

    If focus_reply is provided (an AT URI), automatically jump to the page
    containing that reply. ``page`` is clamped into ``[1, total_pages]`` and
    ``page_size`` is clamped to at least 1.
    """
    # A non-positive page size would divide by zero / produce empty slices.
    page_size = max(1, page_size)

    # Fetch all refs (cheap — just did/collection/rkey)
    backlinks = await get_replies(client, thread_uri, limit=_MAX_REPLY_REFS)
    all_refs = list(reversed(backlinks.records))  # oldest first

    total = len(all_refs)
    total_pages = max(1, (total + page_size - 1) // page_size)

    # If a specific reply is requested, find its page
    if focus_reply:
        for index, ref in enumerate(all_refs):
            if f"at://{ref.did}/{ref.collection}/{ref.rkey}" == focus_reply:
                page = (index // page_size) + 1
                break

    page = max(1, min(page, total_pages))

    # Slice the page we need
    start = (page - 1) * page_size
    page_refs = all_refs[start : start + page_size]

    if not page_refs:
        return RepliesPage(
            replies=[], page=page, total_pages=total_pages, total_replies=total
        )

    # Hydrate only this page
    records = await get_records_batch(client, page_refs)
    records = filter_moderated(records, bbs.site.banned_dids, bbs.site.hidden_posts)

    replies = [
        reply_from_record(record, author)
        for record, author in await _pair_with_authors(client, records)
    ]
    replies.sort(key=lambda r: r.created_at)
    return RepliesPage(
        replies=replies, page=page, total_pages=total_pages, total_replies=total
    )


async def _try_refresh_token(client, session, session_updater):
    """Attempt to refresh an expired OAuth token. Updates session in place.

    Returns True when new tokens were obtained and stored in ``session`` (and
    passed to ``session_updater`` if one was given), False otherwise. This is
    deliberately best-effort: any failure is reported as False so the caller
    can ask the user to log in again.
    """
    if not session.get("dpop_private_jwk") or not session.get("refresh_token"):
        return False

    # Use stored client_id — required for token refresh. Checked before any
    # disk access so a session that cannot be refreshed fails fast.
    client_id = session.get("client_id")
    if not client_id:
        return False

    try:
        # Imported lazily, as in the original code, so that merely importing
        # this module does not pull in the auth stack.
        from core.auth.config import load_secrets
        from core.auth.oauth import refresh_tokens

        data_dir = os.environ.get("ATBBS_DATA_DIR")
        if not data_dir:
            from platformdirs import user_data_dir

            data_dir = user_data_dir("atbbs")
        secrets = load_secrets(data_dir)
        client_secret_jwk = json.loads(secrets["client_secret_jwk"])

        token_resp, dpop_nonce = await refresh_tokens(
            client=client,
            session=session,
            client_id=client_id,
            client_secret_jwk=client_secret_jwk,
        )

        session["access_token"] = token_resp["access_token"]
        # The refresh token is only replaced when the server returns a new one.
        if "refresh_token" in token_resp:
            session["refresh_token"] = token_resp["refresh_token"]
        session["dpop_authserver_nonce"] = dpop_nonce

        updater = session_updater or _noop_updater
        for key in ("access_token", "refresh_token", "dpop_authserver_nonce"):
            await updater(session["did"], key, session[key])
        return True
    except Exception:
        # Best-effort: the caller turns False into an AuthError.
        return False


async def _dpop_post(
    client: httpx.AsyncClient,
    url: str,
    session: dict,
    session_updater,
    **request_kwargs,
) -> httpx.Response:
    """POST via DPoP, refreshing the token and retrying once on a 401.

    ``request_kwargs`` are forwarded unchanged to ``pds_request``.

    Raises:
        AuthError: if the PDS answers 401 and the token cannot be refreshed.
    """
    from core.auth.oauth import pds_request

    updater = session_updater or _noop_updater
    resp = await pds_request(client, "POST", url, session, updater, **request_kwargs)

    if resp.status_code == 401:
        if not await _try_refresh_token(client, session, session_updater):
            raise AuthError("Session expired. Please log in again.")
        resp = await pds_request(
            client, "POST", url, session, updater, **request_kwargs
        )

    return resp


async def pds_post(
    client: httpx.AsyncClient,
    session: dict,
    endpoint: str,
    body: dict,
    session_updater=None,
) -> httpx.Response:
    """POST to a user's PDS, using DPoP if available, Bearer otherwise.

    Refreshes tokens on 401 (DPoP sessions only). The response is returned
    without checking its status; callers decide how to handle errors.

    Raises:
        AuthError: if a DPoP session gets a 401 and cannot be refreshed.
    """
    url = f"{session['pds_url']}/xrpc/{endpoint}"

    if session.get("dpop_private_jwk"):
        return await _dpop_post(client, url, session, session_updater, body=body)

    return await client.post(
        url,
        headers={"Authorization": f"Bearer {session['access_token']}"},
        json=body,
    )


async def upload_blob(
    client: httpx.AsyncClient,
    session: dict,
    data: bytes,
    mime_type: str,
    session_updater=None,
) -> dict:
    """Upload a blob to the user's PDS. Returns the blob ref.

    Raises:
        AuthError: if a DPoP session gets a 401 and cannot be refreshed.
        httpx.HTTPStatusError: if the PDS responds with an error status.
    """
    url = f"{session['pds_url']}/xrpc/com.atproto.repo.uploadBlob"

    if session.get("dpop_private_jwk"):
        resp = await _dpop_post(
            client,
            url,
            session,
            session_updater,
            content=data,
            content_type=mime_type,
        )
    else:
        resp = await client.post(
            url,
            headers={
                "Authorization": f"Bearer {session['access_token']}",
                "Content-Type": mime_type,
            },
            content=data,
        )

    resp.raise_for_status()
    return resp.json()["blob"]


async def _create_record(
    client: httpx.AsyncClient,
    session: dict,
    collection: str,
    fields: dict,
    session_updater=None,
) -> httpx.Response:
    """Create a record of ``collection`` in the session user's repo.

    ``$type`` is set to ``collection`` and placed ahead of ``fields``.
    """
    return await pds_post(
        client,
        session,
        "com.atproto.repo.createRecord",
        {
            "repo": session["did"],
            "collection": collection,
            "record": {"$type": collection, **fields},
        },
        session_updater,
    )


async def create_thread_record(
    client: httpx.AsyncClient,
    session: dict,
    board_uri: str,
    title: str,
    body: str,
    attachments: list[dict] | None = None,
    session_updater=None,
) -> httpx.Response:
    """Create a thread record in the user's repo.

    ``attachments`` is only written to the record when non-empty.
    """
    fields = {
        "board": board_uri,
        "title": title,
        "body": body,
        "createdAt": now_iso(),
    }
    if attachments:
        fields["attachments"] = attachments
    return await _create_record(
        client, session, lexicon.THREAD, fields, session_updater
    )


async def create_reply_record(
    client: httpx.AsyncClient,
    session: dict,
    thread_uri: str,
    body: str,
    attachments: list[dict] | None = None,
    quote: str | None = None,
    session_updater=None,
) -> httpx.Response:
    """Create a reply record in the user's repo.

    ``attachments`` and ``quote`` are only written to the record when truthy.
    """
    fields = {
        "subject": thread_uri,
        "body": body,
        "createdAt": now_iso(),
    }
    if attachments:
        fields["attachments"] = attachments
    if quote:
        fields["quote"] = quote
    return await _create_record(
        client, session, lexicon.REPLY, fields, session_updater
    )


async def delete_record(
    client: httpx.AsyncClient,
    session: dict,
    collection: str,
    rkey: str,
    session_updater=None,
) -> httpx.Response:
    """Delete a record from the user's repo.

    Unlike the create/put helpers, this raises on an error status.

    Raises:
        httpx.HTTPStatusError: if the PDS responds with an error status.
    """
    resp = await pds_post(
        client,
        session,
        "com.atproto.repo.deleteRecord",
        {
            "repo": session["did"],
            "collection": collection,
            "rkey": rkey,
        },
        session_updater,
    )
    resp.raise_for_status()
    return resp


async def list_pds_records(
    client: httpx.AsyncClient,
    pds_url: str,
    did: str,
    collection: str,
    limit: int = 100,
) -> list[dict]:
    """Fetch all records of a collection from a PDS via listRecords.

    Follows the response cursor until the server stops returning one.
    ``limit`` is the per-request page size, not a cap on the total.

    Raises:
        httpx.HTTPStatusError: if any page request returns an error status.
    """
    records: list[dict] = []
    cursor = None
    while True:
        params = {"repo": did, "collection": collection, "limit": limit}
        if cursor:
            params["cursor"] = cursor
        resp = await client.get(
            f"{pds_url}/xrpc/com.atproto.repo.listRecords", params=params
        )
        resp.raise_for_status()
        data = resp.json()
        records.extend(data.get("records", []))

        next_cursor = data.get("cursor")
        # Stop at the end of the collection, and also when the server hands
        # back the cursor we just sent — otherwise we would loop forever.
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor
    return records


async def create_ban_record(
    client: httpx.AsyncClient,
    session: dict,
    banned_did: str,
    session_updater=None,
) -> httpx.Response:
    """Create a ban record in the sysop's repo."""
    return await _create_record(
        client,
        session,
        lexicon.BAN,
        {"did": banned_did, "createdAt": now_iso()},
        session_updater,
    )


async def create_hidden_record(
    client: httpx.AsyncClient,
    session: dict,
    post_uri: str,
    session_updater=None,
) -> httpx.Response:
    """Create a hidden post record in the sysop's repo."""
    return await _create_record(
        client,
        session,
        lexicon.HIDE,
        {"uri": post_uri, "createdAt": now_iso()},
        session_updater,
    )


async def put_board_record(
    client: httpx.AsyncClient,
    session: dict,
    slug: str,
    name: str,
    description: str,
    created_at: str,
    session_updater=None,
) -> httpx.Response:
    """Create or update a board record in the user's repo.

    The board's ``slug`` is used as the record key.
    """
    return await pds_post(
        client,
        session,
        "com.atproto.repo.putRecord",
        {
            "repo": session["did"],
            "collection": lexicon.BOARD,
            "rkey": slug,
            "record": {
                "$type": lexicon.BOARD,
                "name": name,
                "description": description,
                "createdAt": created_at,
            },
        },
        session_updater,
    )


async def put_site_record(
    client: httpx.AsyncClient,
    session: dict,
    site_value: dict,
    session_updater=None,
) -> httpx.Response:
    """Create or update the site record in the user's repo.

    ``site_value`` is sent as-is as the record, under the fixed key ``self``.
    """
    return await pds_post(
        client,
        session,
        "com.atproto.repo.putRecord",
        {
            "repo": session["did"],
            "collection": lexicon.SITE,
            "rkey": "self",
            "record": site_value,
        },
        session_updater,
    )


async def create_news_record(
    client: httpx.AsyncClient,
    session: dict,
    site_uri: str,
    title: str,
    body: str,
    session_updater=None,
) -> httpx.Response:
    """Create a news record in the user's repo."""
    return await _create_record(
        client,
        session,
        lexicon.NEWS,
        {
            "site": site_uri,
            "title": title,
            "body": body,
            "createdAt": now_iso(),
        },
        session_updater,
    )


async def _inbox_items_from_refs(
    client: httpx.AsyncClient,
    refs,
    own_did: str,
    *,
    item_type: str,
    thread_title: str,
    thread_uri: str,
    bbs_handle: str,
) -> list[dict]:
    """Hydrate backlink refs into inbox item dicts.

    Records authored by ``own_did`` and records whose author cannot be
    resolved are skipped. Bodies are truncated to 200 characters.
    """
    if not refs:
        return []

    records = await get_records_batch(client, refs)
    parsed = {r.uri: AtUri.parse(r.uri) for r in records}
    # The user's own posts are not inbox-worthy.
    records = [r for r in records if parsed[r.uri].did != own_did]
    if not records:
        return []

    authors = await resolve_identities_batch(
        client, [parsed[r.uri].did for r in records]
    )

    items = []
    for record in records:
        author_did = parsed[record.uri].did
        if author_did not in authors:
            continue
        items.append(
            {
                "type": item_type,
                "reply_uri": record.uri,
                "thread_title": thread_title,
                "thread_uri": thread_uri,
                "handle": authors[author_did].handle,
                "body": record.value.get("body", "")[:200],
                "created_at": record.value.get("createdAt", ""),
                "bbs_handle": bbs_handle,
            }
        )
    return items


async def fetch_inbox(
    client: httpx.AsyncClient,
    did: str,
    pds_url: str,
    max_items: int = 50,
) -> list[dict]:
    """Fetch inbox: replies to user's threads + quotes of user's replies.

    Scans the first page of the user's own thread and reply records, looks up
    backlinks to each, and returns at most ``max_items`` item dicts sorted
    newest first. Each item has the keys ``type`` (``"reply"`` or
    ``"quote"``), ``reply_uri``, ``thread_title``, ``thread_uri``, ``handle``,
    ``body``, ``created_at`` and ``bbs_handle``.

    Lookups are best-effort: a failure while scanning one thread or reply
    yields no items for it rather than failing the whole inbox.
    """
    from core.constellation import get_backlinks

    sem = asyncio.Semaphore(_INBOX_MAX_CONCURRENT)

    async def list_records(collection):
        """Return the first page of the user's records, or [] on any error."""
        try:
            resp = await client.get(
                f"{pds_url}/xrpc/com.atproto.repo.listRecords",
                params={
                    "repo": did,
                    "collection": collection,
                    "limit": _INBOX_SCAN_LIMIT,
                },
            )
            resp.raise_for_status()
            return resp.json().get("records", [])
        except Exception:
            return []

    # Fetch thread and reply lists concurrently
    thread_records, reply_records = await asyncio.gather(
        list_records(lexicon.THREAD),
        list_records(lexicon.REPLY),
    )

    # Batch-resolve BBS handles for all threads at once
    bbs_dids = set()
    for tr in thread_records:
        board_uri = tr["value"].get("board", "")
        if board_uri:
            bbs_dids.add(AtUri.parse(board_uri).did)
    try:
        bbs_authors = (
            await resolve_identities_batch(client, list(bbs_dids)) if bbs_dids else {}
        )
    except Exception:
        # Handles are cosmetic here; fall back to empty bbs_handle values.
        bbs_authors = {}

    # 1. Fetch replies to user's threads (concurrent)
    async def fetch_thread_replies(tr):
        async with sem:
            thread_uri = tr["uri"]
            thread_title = tr["value"].get("title", "")
            board_uri = tr["value"].get("board", "")
            bbs_did = AtUri.parse(board_uri).did if board_uri else did
            bbs_handle = bbs_authors[bbs_did].handle if bbs_did in bbs_authors else ""

            try:
                backlinks = await get_replies(
                    client, thread_uri, limit=_INBOX_BACKLINK_LIMIT
                )
                return await _inbox_items_from_refs(
                    client,
                    backlinks.records,
                    did,
                    item_type="reply",
                    thread_title=thread_title,
                    thread_uri=thread_uri,
                    bbs_handle=bbs_handle,
                )
            except Exception:
                return []

    # 2. Fetch quotes of user's replies (concurrent)
    async def fetch_reply_quotes(rr):
        async with sem:
            reply_uri = rr["uri"]
            thread_uri = rr["value"].get("subject", "")
            try:
                backlinks = await get_backlinks(
                    client,
                    subject=reply_uri,
                    source=f"{lexicon.REPLY}:quote",
                    limit=_INBOX_BACKLINK_LIMIT,
                )
                return await _inbox_items_from_refs(
                    client,
                    backlinks.records,
                    did,
                    item_type="quote",
                    thread_title="",
                    thread_uri=thread_uri,
                    bbs_handle="",
                )
            except Exception:
                return []

    # Run all lookups concurrently
    results = await asyncio.gather(
        *[fetch_thread_replies(tr) for tr in thread_records],
        *[fetch_reply_quotes(rr) for rr in reply_records],
    )

    # Deduplicate by record URI and prefer quotes if the same record appears
    # in both lists. The URI is the record's unique identity, so distinct
    # records are never merged by accident.
    seen: dict[str, dict] = {}
    for items in results:
        for item in items:
            key = item["reply_uri"]
            if key not in seen or item["type"] == "quote":
                seen[key] = item

    deduped = sorted(seen.values(), key=lambda a: a["created_at"], reverse=True)
    # Honour the caller's cap; a negative value is treated as zero.
    return deduped[: max(0, max_items)]