+12
-15
bsky-activity.py
+12
-15
bsky-activity.py
···
2
3
import asyncio
4
from datetime import datetime, timezone
5
-
from io import BytesIO
6
import json
7
import os
8
import sqlite3
9
import sys
10
11
-
from atproto import CAR
12
import redis
13
-
import dag_cbor
14
import websockets
15
16
app_bsky_allowlist = set([
···
39
40
async with websockets.connect(relay_url, ping_timeout=60) as firehose:
41
while True:
42
-
payload = BytesIO(await firehose.recv())
43
-
44
-
yield json.load(payload)
45
46
async def main():
47
redis_cnx = redis.Redis()
···
66
sys.stdout.flush()
67
68
op_count = 0
69
-
async for payload in bsky_activity():
70
-
if payload['opType'] != 'c':
71
continue
72
73
collection = payload['collection']
74
if collection not in app_bsky_allowlist:
75
continue
76
77
-
repo_did = payload['did']
78
repo_update_time = datetime.now(timezone.utc)
79
db_cnx.execute(
80
'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
···
93
94
op_count += 1
95
if op_count % 500 == 0:
96
-
now = datetime.now(timezone.utc)
97
-
payload_seq = payload['seq']
98
-
payload_lag = now - repo_update_time
99
-
100
-
sys.stdout.write(f'seq: {payload_seq}, lag: {payload_lag.total_seconds()}\n')
101
-
redis_pipe.set('dev.edavis.muninsky.seq', payload_seq)
102
redis_pipe.execute()
103
db_cnx.commit()
104
sys.stdout.flush()
···
2
3
import asyncio
4
from datetime import datetime, timezone
5
import json
6
import os
7
import sqlite3
8
import sys
9
10
import redis
11
import websockets
12
13
app_bsky_allowlist = set([
···
36
37
async with websockets.connect(relay_url, ping_timeout=60) as firehose:
38
while True:
39
+
yield json.loads(await firehose.recv())
40
41
async def main():
42
redis_cnx = redis.Redis()
···
61
sys.stdout.flush()
62
63
op_count = 0
64
+
async for event in bsky_activity():
65
+
if event['type'] != 'com':
66
+
continue
67
+
68
+
payload = event['commit']
69
+
if payload['type'] != 'c':
70
continue
71
72
collection = payload['collection']
73
if collection not in app_bsky_allowlist:
74
continue
75
76
+
repo_did = event['did']
77
repo_update_time = datetime.now(timezone.utc)
78
db_cnx.execute(
79
'insert into users values (:did, :ts) on conflict (did) do update set ts = :ts',
···
92
93
op_count += 1
94
if op_count % 500 == 0:
95
+
current_time_ms = datetime.now(timezone.utc).timestamp()
96
+
event_time_ms = event['time_us'] / 1_000_000
97
+
current_lag = current_time_ms - event_time_ms
98
+
sys.stdout.write(f'lag: {current_lag:.2f}\n')
99
redis_pipe.execute()
100
db_cnx.commit()
101
sys.stdout.flush()