The code for my Bluesky feed generator (`main` branch, 4.2 kB).
import logging
from collections import defaultdict

from atproto import (
    CAR,
    AtUri,
    FirehoseSubscribeReposClient,
    firehose_models,
    models,
    parse_subscribe_repos_message,
)
from atproto.exceptions import FirehoseError

from server import RELEVANT_RECORDS
from server.database import FirehoseSubscriptionState
from server.logger import logger

# The stored cursor is updated whenever a commit's sequence number is a
# multiple of this value. Each update is one DB write, so a lower value could
# lead to performance issues.
_CURSOR_SAVE_INTERVAL = 1000


def _get_ops_by_type(commit: models.ComAtprotoSyncSubscribeRepos.Commit) -> defaultdict:
    """
    Group the create and delete operations of a firehose commit by collection.

    The keys are collection NSIDs, and the values are dictionaries with two keys:
    - 'created': a list of dictionaries with the following keys:
        - 'record': the decoded record
        - 'uri': the AT URI of the record
        - 'cid': the CID of the record
        - 'author': the repo that made the commit (``commit.repo``)
    - 'deleted': a list of dictionaries with the following key:
        - 'uri': the AT URI of the deleted record

    A created record is reported only when its collection and its record type
    both match an entry of RELEVANT_RECORDS. Deletes are reported for every
    collection, since no record is decoded for them. Update operations are
    ignored.

    Args:
        commit: The commit object.

    Returns:
        A defaultdict keyed by collection NSID. A missing key yields a fresh
        ``{"created": [], "deleted": []}``.
    """
    operation_by_type = defaultdict(lambda: {"created": [], "deleted": []})

    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action == "update":
            # we are not interested in updates
            continue

        at_uri = AtUri.from_str(f"at://{commit.repo}/{op.path}")

        if op.action == "create":
            if not op.cid:
                continue

            # The record body lives in the commit's CAR blocks, keyed by CID.
            record_raw_data = car.blocks.get(op.cid)
            if not record_raw_data:
                continue

            record = models.get_or_create(record_raw_data, strict=False)
            if record is None:  # unknown record (out of bsky lexicon)
                continue

            # RELEVANT_RECORDS maps record type -> collection NSID; the first
            # matching entry wins.
            for record_type, record_nsid in RELEVANT_RECORDS.items():
                if at_uri.collection == record_nsid and models.is_record_type(
                    record, record_type
                ):
                    operation_by_type[record_nsid]["created"].append(
                        {
                            "record": record,
                            "uri": str(at_uri),
                            "cid": str(op.cid),
                            "author": commit.repo,
                        }
                    )
                    break

        elif op.action == "delete":
            operation_by_type[at_uri.collection]["deleted"].append({"uri": str(at_uri)})

    return operation_by_type


def run(name, operations_callback, stream_stop_event=None):
    """
    Consume the firehose for ``name`` until ``stream_stop_event`` is set.

    Each pass calls ``_run``, which resumes from the cursor stored for
    ``name``. A ``FirehoseError`` triggers a reconnect, unless the logger is
    enabled for DEBUG; in that case the error is re-raised so it surfaces
    during development.

    Args:
        name: Service name under which the cursor is stored.
        operations_callback: Called with the ``_get_ops_by_type`` result for
            every commit that carries blocks.
        stream_stop_event: Optional object with an ``is_set()`` method
            (presumably a ``threading.Event``). Once it is set, streaming
            stops. With ``None``, the loop never exits on its own.
    """
    while stream_stop_event is None or not stream_stop_event.is_set():
        try:
            _run(name, operations_callback, stream_stop_event)
        except FirehoseError as e:
            # isEnabledFor() honours the effective (possibly inherited) level.
            # Comparing logger.level only matches a level set on this logger.
            if logger.isEnabledFor(logging.DEBUG):
                raise  # bare raise re-raises the original exception untouched
            logger.error("Firehose error: %s. Reconnecting to the firehose.", e)


def _run(name, operations_callback, stream_stop_event=None):
    """
    Open one firehose subscription and process messages until the client stops.

    If a ``FirehoseSubscriptionState`` row exists for ``name``, the
    subscription starts from its stored cursor. Otherwise it starts without a
    cursor, and a row with ``cursor=0`` is created.

    Args:
        name: Service name under which the cursor is stored.
        operations_callback: Called with ``_get_ops_by_type(commit)`` for each
            commit that carries blocks.
        stream_stop_event: Optional object with ``is_set()``. When it is set,
            the next received message stops the client.
    """
    state = FirehoseSubscriptionState.get_or_none(
        FirehoseSubscriptionState.service == name
    )

    params = None
    if state:
        params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=state.cursor)

    client = FirehoseSubscribeReposClient(params)

    if not state:
        FirehoseSubscriptionState.create(service=name, cursor=0)

    def on_message_handler(message: firehose_models.MessageFrame) -> None:
        # stop on next message if requested
        if stream_stop_event and stream_stop_event.is_set():
            client.stop()
            return

        commit = parse_subscribe_repos_message(message)
        if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
            return

        # Update the stored cursor every ~1k events. The new cursor is also
        # handed to the client through update_params.
        if commit.seq % _CURSOR_SAVE_INTERVAL == 0:
            client.update_params(
                models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq)
            )
            FirehoseSubscriptionState.update(cursor=commit.seq).where(
                FirehoseSubscriptionState.service == name
            ).execute()
            logger.debug("Updated cursor for %s to %s", name, commit.seq)

        if not commit.blocks:
            return

        operations_callback(_get_ops_by_type(commit))

    client.start(on_message_handler)