The code for my Bluesky feed generator
1import logging
2from collections import defaultdict
3
4from atproto import (
5 CAR,
6 AtUri,
7 FirehoseSubscribeReposClient,
8 firehose_models,
9 models,
10 parse_subscribe_repos_message,
11)
12from atproto.exceptions import FirehoseError
13
14from server import RELEVANT_RECORDS
15from server.database import FirehoseSubscriptionState
16from server.logger import logger
17
18
def _get_ops_by_type(commit: models.ComAtprotoSyncSubscribeRepos.Commit) -> defaultdict:
    """
    Group a commit's create and delete operations by collection NSID.

    Update operations are skipped. A create is kept only when:
      - it has a CID,
      - that CID resolves to a block in the commit's CAR data,
      - the block decodes to a known model, and
      - the record matches the ``RELEVANT_RECORDS`` entry for its collection.

    Deletes are kept for every collection.

    Args:
        commit: A firehose commit message.

    Returns:
        A ``defaultdict`` keyed by collection NSID. Each value is a dict with:
          - ``"created"``: list of dicts with keys ``"record"`` (the decoded
            model), ``"uri"``, ``"cid"`` and ``"author"`` (the commit's repo).
          - ``"deleted"``: list of dicts with a single ``"uri"`` key.
        Looking up a collection with no operations yields
        ``{"created": [], "deleted": []}``.
    """
    grouped = defaultdict(lambda: {"created": [], "deleted": []})
    car = CAR.from_bytes(commit.blocks)

    for op in commit.ops:
        if op.action == "update":
            # Updates are not needed by the feed.
            continue

        uri = AtUri.from_str(f"at://{commit.repo}/{op.path}")

        if op.action == "delete":
            grouped[uri.collection]["deleted"].append({"uri": str(uri)})
            continue

        # Whatever remains must be a create that carries a CID.
        if op.action != "create" or not op.cid:
            continue

        raw_block = car.blocks.get(op.cid)
        if not raw_block:
            continue

        record = models.get_or_create(raw_block, strict=False)
        if record is None:
            # Unknown record: outside the lexicon the SDK can decode.
            continue

        for record_type, nsid in RELEVANT_RECORDS.items():
            if uri.collection != nsid:
                continue
            if models.is_record_type(record, record_type):
                grouped[nsid]["created"].append(
                    {
                        "record": record,
                        "uri": str(uri),
                        "cid": str(op.cid),
                        "author": commit.repo,
                    }
                )
                break

    return grouped
80
81
def run(name, operations_callback, stream_stop_event=None, *, reconnect_delay=1.0):
    """
    Consume the firehose for ``name``, reconnecting after firehose errors.

    Repeatedly calls ``_run`` until ``stream_stop_event`` is set, or forever
    when no event is given. A ``FirehoseError`` is logged and followed by a
    reconnect. When the logger is enabled for DEBUG, the error is re-raised
    instead of being retried. Any other exception from ``_run`` propagates.

    Args:
        name: Service name identifying the stored subscription state.
        operations_callback: Passed through to ``_run``, which calls it with
            the grouped operations of each commit.
        stream_stop_event: Optional object with an ``is_set()`` method. Once
            it reports set, no further reconnect is attempted.
        reconnect_delay: Seconds to wait before reconnecting after a
            ``FirehoseError``, so that a persistently failing connection is
            not retried in a tight loop. Use ``0`` to reconnect immediately.

    Raises:
        FirehoseError: When the logger is enabled for DEBUG.
    """
    import time

    def _stop_requested():
        return stream_stop_event is not None and stream_stop_event.is_set()

    while not _stop_requested():
        try:
            _run(name, operations_callback, stream_stop_event)
        except FirehoseError as e:
            # isEnabledFor honours levels inherited from parent loggers,
            # unlike comparing ``logger.level`` directly.
            if logger.isEnabledFor(logging.DEBUG):
                raise
            logger.error("Firehose error: %s. Reconnecting to the firehose.", e)
            if reconnect_delay > 0 and not _stop_requested():
                # Back off before reconnecting to avoid hammering the relay.
                time.sleep(reconnect_delay)
90
91
def _run(name, operations_callback, stream_stop_event=None):
    """
    Run a single firehose subscription for the service ``name``.

    If a ``FirehoseSubscriptionState`` row exists for ``name``, the
    subscription resumes from its stored cursor. Otherwise it subscribes
    without a cursor and inserts a row with ``cursor=0``. Every commit that
    carries blocks is converted with ``_get_ops_by_type`` and handed to
    ``operations_callback``.

    Args:
        name: Service name identifying the stored subscription state.
        operations_callback: Called with the result of ``_get_ops_by_type``
            for every commit that has blocks.
        stream_stop_event: Optional object with an ``is_set()`` method. Once
            set, the client is stopped when the next message arrives.
    """
    saved_state = FirehoseSubscriptionState.get_or_none(
        FirehoseSubscriptionState.service == name
    )

    # NOTE(review): a row created below starts at cursor=0, and a later
    # restart passes that 0 as the subscription cursor. Confirm this is the
    # intended "no cursor yet" behaviour.
    subscribe_params = (
        models.ComAtprotoSyncSubscribeRepos.Params(cursor=saved_state.cursor)
        if saved_state
        else None
    )
    client = FirehoseSubscribeReposClient(subscribe_params)

    if not saved_state:
        FirehoseSubscriptionState.create(service=name, cursor=0)

    def save_cursor(seq):
        # Record the position in the live client (presumably used if it
        # reconnects internally) and in the database (read on the next _run).
        logger.debug(f"Updated cursor for {name} to {seq}")
        client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=seq))
        FirehoseSubscriptionState.update(cursor=seq).where(
            FirehoseSubscriptionState.service == name
        ).execute()

    def on_message_handler(message: firehose_models.MessageFrame) -> None:
        # A pending stop request is honoured when the next message arrives.
        if stream_stop_event and stream_stop_event.is_set():
            client.stop()
            return

        parsed = parse_subscribe_repos_message(message)
        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
            return

        # Persist roughly every 1000 events; saving more often could hurt
        # performance. Non-commit messages return above, so a multiple of
        # 1000 that falls on one of them is not saved.
        if parsed.seq % 1000 == 0:
            save_cursor(parsed.seq)

        if parsed.blocks:
            operations_callback(_get_ops_by_type(parsed))

    client.start(on_message_handler)
131 client.start(on_message_handler)