+7
feed_manager.py
+7
feed_manager.py
···
45
45
for feed in self.feeds.values():
46
46
feed.commit_changes()
47
47
48
+
def stop_all(self):
49
+
for feed in self.feeds.values():
50
+
try:
51
+
feed.stop_db_worker()
52
+
except AttributeError:
53
+
pass
54
+
48
55
feed_manager = FeedManager()
49
56
feed_manager.register(RapidFireFeed)
50
57
feed_manager.register(PopularFeed)
+26
-1
feedgen.py
+26
-1
feedgen.py
···
4
4
from io import BytesIO
5
5
import json
6
6
import logging
7
+
import signal
7
8
8
9
from atproto import CAR
9
10
import dag_cbor
···
18
19
logging.getLogger('').setLevel(logging.WARNING)
19
20
logging.getLogger('feeds').setLevel(logging.DEBUG)
20
21
logging.getLogger('firehose').setLevel(logging.DEBUG)
22
+
logging.getLogger('feedgen').setLevel(logging.DEBUG)
23
+
24
+
logger = logging.getLogger('feedgen')
21
25
22
26
async def firehose_events():
23
27
relay_url = 'ws://localhost:6008/subscribe'
···
39
43
if event_count % 2500 == 0:
40
44
feed_manager.commit_changes()
41
45
46
+
async def shutdown(signal, loop):
47
+
logger.info(f'received exit signal {signal.name}')
48
+
feed_manager.stop_all()
49
+
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
50
+
[task.cancel() for task in tasks]
51
+
logger.info(f'cancelling {len(tasks)} outstanding tasks')
52
+
await asyncio.gather(*tasks, return_exceptions=True)
53
+
loop.stop()
54
+
42
55
if __name__ == '__main__':
43
-
asyncio.run(main())
56
+
loop = asyncio.get_event_loop()
57
+
catch_signals = (signal.SIGTERM, signal.SIGINT)
58
+
for sig in catch_signals:
59
+
loop.add_signal_handler(
60
+
sig,
61
+
lambda s=sig: asyncio.create_task(shutdown(s, loop))
62
+
)
63
+
64
+
try:
65
+
loop.create_task(main())
66
+
loop.run_forever()
67
+
finally:
68
+
loop.close()
+11
-4
feeds/mostliked.py
+11
-4
feeds/mostliked.py
···
24
24
self.changes = 0
25
25
26
26
def run(self):
27
-
while not self.stop_signal:
27
+
while True:
28
28
task = self.task_queue.get(block=True)
29
29
if task == 'STOP':
30
-
self.stop_signal = True
30
+
self.logger.debug('received STOP, breaking now')
31
+
break
31
32
elif task == 'COMMIT':
32
33
self.logger.debug(f'committing {self.changes} changes')
33
34
if self.db_cnx.in_transaction:
···
43
44
self.db_cnx.execute(sql, bindings)
44
45
self.changes += self.db_cnx.changes()
45
46
self.task_queue.task_done()
47
+
48
+
self.logger.debug('closing database connection')
46
49
self.db_cnx.close()
47
50
48
51
def stop(self):
···
79
82
self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5*60)
80
83
81
84
self.db_writes = queue.Queue()
82
-
db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
83
-
db_worker.start()
85
+
self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
86
+
self.db_worker.start()
87
+
88
+
def stop_db_worker(self):
89
+
self.logger.debug('sending STOP')
90
+
self.db_writes.put('STOP')
84
91
85
92
def process_commit(self, commit):
86
93
if commit['opType'] != 'c':