-53
feed_manager.py
-53
feed_manager.py
···
1
-
from fnmatch import fnmatchcase
2
-
3
-
from feeds.battle import BattleFeed
4
-
from feeds.rapidfire import RapidFireFeed
5
-
from feeds.homeruns import HomeRunsTeamFeed
6
-
from feeds.norazone_interesting import NoraZoneInteresting
7
-
from feeds.sevendirtywords import SevenDirtyWordsFeed
8
-
from feeds.ratio import RatioFeed
9
-
from feeds.outlinetags import OutlineTagsFeed
10
-
from feeds.popqp import PopularQuotePostsFeed
11
-
12
-
class FeedManager:
    """Registry that fans commits out to feeds and routes feed requests.

    Feeds are stored by their ``FEED_URI`` class attribute. A ``FEED_URI``
    may contain ``fnmatch``-style wildcards; requests for such a feed are
    routed to the feed's ``serve_wildcard_feed*`` methods instead of
    ``serve_feed*``.
    """

    def __init__(self):
        # FEED_URI pattern -> feed instance, in registration order.
        self.feeds = {}

    def register(self, feed):
        """Instantiate the feed class ``feed`` and store it under its FEED_URI."""
        self.feeds[feed.FEED_URI] = feed()

    def process_commit(self, commit):
        """Pass ``commit`` to every registered feed."""
        for feed in self.feeds.values():
            feed.process_commit(commit)

    def serve_feed(self, feed_uri, limit, offset, langs, debug=False):
        """Serve ``feed_uri`` from the first registered pattern that matches it.

        Raises:
            Exception: if no registered pattern matches ``feed_uri``.
        """
        for pattern, feed in self.feeds.items():
            if fnmatchcase(feed_uri, pattern):
                break
        else:
            raise Exception('no matching feed pattern found')

        # Wildcard feeds need the concrete URI to know what was asked for.
        if '*' in pattern:
            if debug:
                return feed.serve_wildcard_feed_debug(feed_uri, limit, offset, langs)
            return feed.serve_wildcard_feed(feed_uri, limit, offset, langs)

        if debug:
            return feed.serve_feed_debug(limit, offset, langs)
        return feed.serve_feed(limit, offset, langs)

    def commit_changes(self):
        """Ask every registered feed to commit its pending changes."""
        for feed in self.feeds.values():
            feed.commit_changes()

    def stop_all(self):
        """Call ``stop_db_worker()`` on every feed that defines it.

        The attribute is looked up first so that an ``AttributeError`` raised
        *inside* a feed's ``stop_db_worker`` is not silently discarded.
        """
        for feed in self.feeds.values():
            stop_db_worker = getattr(feed, 'stop_db_worker', None)
            if stop_db_worker is not None:
                stop_db_worker()


# NOTE(review): no feeds are registered in the code visible here; presumably
# registration happens elsewhere -- confirm.
feed_manager = FeedManager()
-76
feedgen.py
-76
feedgen.py
···
1
-
#!/usr/bin/env python3
2
-
3
-
import asyncio
4
-
from io import BytesIO
5
-
import json
6
-
import logging
7
-
import signal
8
-
9
-
from atproto import CAR
10
-
import dag_cbor
11
-
import websockets
12
-
13
-
from feed_manager import feed_manager
14
-
15
-
# Install the root handler with the shared line format.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
)

# The root logger is limited to warnings; only the loggers named below emit
# debug output.
logging.getLogger('').setLevel(logging.WARNING)
logging.getLogger('feedgen').setLevel(logging.DEBUG)
logging.getLogger('firehose').setLevel(logging.DEBUG)
logging.getLogger('feeds').setLevel(logging.DEBUG)

logger = logging.getLogger('feedgen')
25
-
26
-
async def firehose_events():
    """Yield JSON-decoded messages from the local relay websocket, forever.

    Each websocket message is decoded with ``json.loads``, which accepts both
    text (``str``) and binary (``bytes``) frames; wrapping the frame in
    ``BytesIO`` would raise ``TypeError`` on a text frame.
    """
    relay_url = 'ws://localhost:6008/subscribe'

    events_logger = logging.getLogger('feeds.events')
    events_logger.info(f'opening websocket connection to {relay_url}')

    async with websockets.connect(relay_url, ping_timeout=60) as firehose:
        while True:
            message = await firehose.recv()
            yield json.loads(message)
36
-
37
-
async def main():
    """Feed every firehose commit to the feed manager.

    Pending changes are committed once per 2500 processed commits.
    """
    commits_per_flush = 2500
    processed = 0

    async for commit in firehose_events():
        feed_manager.process_commit(commit)
        processed += 1
        if not processed % commits_per_flush:
            feed_manager.commit_changes()
45
-
46
-
def handle_exception(loop, context):
    """Event-loop exception handler: log the failure and schedule shutdown."""
    # The message is read unconditionally and used when no exception object
    # is present in the context.
    fallback = context["message"]
    detail = context.get("exception", fallback)

    logger.error(f"Caught exception: {detail}")
    logger.info("Shutting down...")
    asyncio.create_task(shutdown(loop))
51
-
52
-
async def shutdown(loop, signal=None):
    """Stop the feeds, cancel every other task, then stop ``loop``.

    ``signal`` is the signal that triggered the shutdown, if any; it is only
    used for logging.
    """
    if signal:
        logger.info(f'received exit signal {signal.name}')

    feed_manager.stop_all()

    me = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not me]
    for task in pending:
        task.cancel()
    logger.info(f'cancelling {len(pending)} outstanding tasks')

    # return_exceptions=True collects cancellation results instead of raising.
    await asyncio.gather(*pending, return_exceptions=True)
    loop.stop()
61
-
62
-
if __name__ == '__main__':
    # asyncio.get_event_loop() is deprecated when no loop is running and
    # fails on newer Python versions, so create and install the loop
    # explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    catch_signals = (signal.SIGTERM, signal.SIGINT)
    for sig in catch_signals:
        # s=sig binds the current signal; a plain closure would see only the
        # last value of the loop variable.
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown(loop, signal=s))
        )
    loop.set_exception_handler(handle_exception)

    try:
        loop.create_task(main())
        loop.run_forever()
    finally:
        loop.close()
-65
feeds/__init__.py
-65
feeds/__init__.py
···
1
-
from datetime import datetime, timezone, timedelta
2
-
3
-
class BaseFeed:
    """Base class for feeds: abstract hooks plus timestamp and SQLite helpers."""

    # strptime patterns tried in order.
    # See https://atproto.com/specs/lexicon#datetime
    #   with fractional seconds (%f takes 1-6 digits), e.g.
    #     1985-04-12T23:20:50.123Z, 1985-04-12T23:20:50.123-07:00
    #   without fractional seconds, e.g. 1985-04-12T23:20:50Z
    # Fractions longer than 6 digits match neither pattern.
    _TIMESTAMP_FORMATS = (
        '%Y-%m-%dT%H:%M:%S.%f%z',
        '%Y-%m-%dT%H:%M:%S%z',
    )

    def process_commit(self, commit):
        """Handle one firehose commit. Must be overridden."""
        raise NotImplementedError

    def serve_feed(self, limit, offset, langs):
        """Return one page of the feed. Must be overridden."""
        raise NotImplementedError

    def serve_wildcard_feed(self, feed_uri, limit, offset, langs):
        """Return one page of a wildcard feed. Must be overridden."""
        raise NotImplementedError

    def commit_changes(self):
        """Persist pending changes. Must be overridden."""
        raise NotImplementedError

    def parse_timestamp(self, timestamp):
        """Parse ``timestamp`` into an aware datetime.

        Returns the current UTC time when ``timestamp`` matches none of the
        supported patterns or is not a string at all (strptime raises
        ``TypeError`` for non-strings).
        """
        for pattern in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(timestamp, pattern)
            except (ValueError, TypeError):
                continue

        return datetime.now(timezone.utc)

    def safe_timestamp(self, timestamp):
        """Return a parsed timestamp clamped to a plausible value.

        The current UTC time is returned when ``timestamp`` is None, when it
        parses to the Unix epoch or earlier, or when it lies two minutes or
        more in the future. Otherwise the parsed value is returned.
        """
        utc_now = datetime.now(timezone.utc)
        if timestamp is None:
            return utc_now

        parsed = self.parse_timestamp(timestamp)
        if parsed.timestamp() <= 0:
            return utc_now
        if parsed - timedelta(minutes=2) < utc_now:
            return parsed
        # Two minutes or more ahead of now: do not trust it.
        return utc_now

    def transaction_begin(self, db):
        """Issue BEGIN on ``db`` unless a transaction is already open."""
        if not db.in_transaction:
            db.execute('BEGIN')

    def transaction_commit(self, db):
        """Issue COMMIT on ``db`` if a transaction is open."""
        if db.in_transaction:
            db.execute('COMMIT')

    def wal_checkpoint(self, db, mode='PASSIVE'):
        """Run ``PRAGMA wal_checkpoint(mode)`` on ``db`` and return the result.

        ``mode`` is interpolated into the SQL text, so it must never come
        from untrusted input.
        """
        return db.execute(f'PRAGMA wal_checkpoint({mode})')
-100
feeds/battle.py
-100
feeds/battle.py
···
1
-
import logging
2
-
3
-
import apsw
4
-
import apsw.ext
5
-
import grapheme
6
-
7
-
from . import BaseFeed
8
-
9
-
class BattleFeed(BaseFeed):
    """Feed of posts ordered by ascending grapheme length.

    The unique index on (grapheme_length, lang) means at most one post is
    kept per length and language; a newer post overwrites the stored one.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'

    def __init__(self):
        self.db_cnx = apsw.Connection('db/battle.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text,
                grapheme_length integer,
                create_ts timestamp,
                lang text
            );
            create unique index if not exists ll_idx on posts(grapheme_length, lang);
            """)

        self.logger = logging.getLogger('feeds.battle')

    @staticmethod
    def _requested_langs(langs):
        """Return the languages to filter on, or [] to serve every language.

        An empty list is returned both for an explicit '*' and when the
        request carries no languages at all; the latter would otherwise build
        a query with no select clause.
        """
        if '*' in langs:
            return []
        return list(langs.values())

    def process_commit(self, commit):
        """Store a newly created post once per declared language."""
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        length = grapheme.length(record.get('text', ''))
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        # The transaction stays open until commit_changes() is called.
        self.transaction_begin(self.db_cnx)

        # Posts without languages are stored under the empty language.
        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute("""
            insert into posts(uri, grapheme_length, create_ts, lang)
            values(:uri, :length, :ts, :lang)
            on conflict do update set uri = :uri, create_ts = :ts
            """, dict(uri=post_uri, length=length, ts=ts, lang=lang))

    def commit_changes(self):
        """Commit the open transaction and checkpoint the WAL."""
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs ordered by grapheme length, filtered by ``langs``."""
        lang_values = self._requested_langs(langs)
        if not lang_values:
            cur = self.db_cnx.execute("""
            select uri
            from posts
            order by grapheme_length asc
            limit :limit offset :offset
            """, dict(limit=limit, offset=offset))
            return [uri for (uri,) in cur]

        # One select per language; UNION removes duplicate rows.
        lang_selects = ['select uri, grapheme_length from posts where lang = ?'] * len(lang_values)
        lang_clause = ' union '.join(lang_selects)
        cur = self.db_cnx.execute(
            lang_clause + ' order by grapheme_length asc limit ? offset ?',
            [*lang_values, limit, offset]
        )
        return [uri for (uri, _length) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as serve_feed() as a formatted text table."""
        lang_values = self._requested_langs(langs)
        if not lang_values:
            query = """
            select *, unixepoch('now') - create_ts as age_seconds
            from posts
            order by grapheme_length asc
            limit :limit offset :offset
            """
            # Named placeholders are bound by name.
            bindings = dict(limit=limit, offset=offset)
        else:
            lang_selects = ["select *, unixepoch('now') - create_ts as age_seconds from posts where lang = ?"] * len(lang_values)
            lang_clause = ' union '.join(lang_selects)
            query = lang_clause + ' order by grapheme_length asc limit ? offset ?'
            bindings = [*lang_values, limit, offset]

        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-107
feeds/homeruns.py
-107
feeds/homeruns.py
···
1
-
import logging
2
-
3
-
import apsw
4
-
import apsw.ext
5
-
6
-
from . import BaseFeed
7
-
8
-
MLBHRS_DID = 'did:plc:pnksqegntq5t3o7pusp2idx3'

# Team abbreviation (the part of the feed URI after "team:") -> post tag.
TEAM_ABBR_LOOKUP = {
    "OAK": "OaklandAthletics",
    "PIT": "PittsburghPirates",
    "SDN": "SanDiegoPadres",
    "SEA": "SeattleMariners",
    "SFN": "SanFranciscoGiants",
    "SLN": "StLouisCardinals",
    "TBA": "TampaBayRays",
    "TEX": "TexasRangers",
    "TOR": "TorontoBlueJays",
    "MIN": "MinnesotaTwins",
    "PHI": "PhiladelphiaPhillies",
    "ATL": "AtlantaBraves",
    "CHA": "ChicagoWhiteSox",
    "MIA": "MiamiMarlins",
    "NYA": "NewYorkYankees",
    "MIL": "MilwaukeeBrewers",
    "LAA": "LosAngelesAngels",
    "ARI": "ArizonaDiamondbacks",
    "BAL": "BaltimoreOrioles",
    "BOS": "BostonRedSox",
    "CHN": "ChicagoCubs",
    "CIN": "CincinnatiReds",
    "CLE": "ClevelandGuardians",
    "COL": "ColoradoRockies",
    "DET": "DetroitTigers",
    "HOU": "HoustonAstros",
    "KCA": "KansasCityRoyals",
    "LAN": "LosAngelesDodgers",
    "WAS": "WashingtonNationals",
    "NYN": "NewYorkMets",
}


class HomeRunsTeamFeed(BaseFeed):
    """Per-team wildcard feed built from the tagged posts of ``MLBHRS_DID``."""

    FEED_URI = 'at://did:plc:pnksqegntq5t3o7pusp2idx3/app.bsky.feed.generator/team:*'

    _TEAM_QUERY = """
    select uri
    from posts
    where tag = :tag
    order by uri desc
    limit :limit offset :offset
    """

    def __init__(self):
        self.db_cnx = apsw.Connection('db/homeruns.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (uri text, tag text);
            create index if not exists tag_idx on posts(tag);
            """)

        self.logger = logging.getLogger('feeds.homeruns')

    @staticmethod
    def _team_tag(feed_uri):
        """Return the tag for the team named after the last ':' or None."""
        _prefix, _sep, team_abbr = feed_uri.rpartition(':')
        return TEAM_ABBR_LOOKUP.get(team_abbr)

    def process_commit(self, commit):
        """Store one (uri, tag) row per tag of a new post by ``MLBHRS_DID``."""
        if commit['did'] != MLBHRS_DID:
            return

        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        uri = 'at://{repo}/app.bsky.feed.post/{rkey}'.format(
            repo=commit['did'],
            rkey=commit['rkey']
        )
        # A missing or null "tags" field is treated as no tags.
        tags = record.get('tags') or []

        self.logger.debug(f'adding {uri!r} under {tags!r}')

        # The connection context manager commits the inserts on exit.
        with self.db_cnx:
            for tag in tags:
                self.db_cnx.execute(
                    "insert into posts (uri, tag) values (:uri, :tag)",
                    dict(uri=uri, tag=tag)
                )

    def commit_changes(self):
        """Checkpoint the WAL; inserts are already committed per commit."""
        self.logger.debug('committing changes')
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_wildcard_feed(self, feed_uri, limit, offset, langs):
        """Return post URIs for the team in ``feed_uri``.

        An unknown team abbreviation yields an empty page instead of raising
        ``KeyError``. ``langs`` is not used.
        """
        team_tag = self._team_tag(feed_uri)
        if team_tag is None:
            return []

        cur = self.db_cnx.execute(
            self._TEAM_QUERY, dict(tag=team_tag, limit=limit, offset=offset)
        )
        return [uri for (uri,) in cur]

    def serve_wildcard_feed_debug(self, feed_uri, limit, offset, langs):
        """Return the same page as serve_wildcard_feed() as a text table."""
        team_tag = self._team_tag(feed_uri)
        if team_tag is None:
            return f'unknown team in feed URI {feed_uri!r}\n'

        bindings = dict(tag=team_tag, limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, self._TEAM_QUERY, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-35
feeds/norazone_interesting.py
-35
feeds/norazone_interesting.py
···
1
-
import logging
2
-
3
-
from atproto import Client, models
4
-
import apsw
5
-
import apsw.ext
6
-
7
-
from . import BaseFeed
8
-
9
-
# https://bsky.app/profile/nora.zone/post/3kv35hqi4a22b
10
-
TARGET_QUOTE_URI = 'at://did:plc:4qqizocrnriintskkh6trnzv/app.bsky.feed.post/3kv35hqi4a22b'


class NoraZoneInteresting(BaseFeed):
    """Feed made of the posts that quote ``TARGET_QUOTE_URI``.

    Nothing is stored locally: each request is answered by asking the API
    client for the quotes of the target post.
    """

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/nz-interesting'

    def __init__(self):
        self.client = Client('https://public.api.bsky.app')

    def process_commit(self, commit):
        """Ignore firehose commits; this feed keeps no state."""
        return None

    def commit_changes(self):
        """Nothing to commit."""
        return None

    def serve_feed(self, limit, cursor, langs):
        """Return a dict with the upstream ``cursor`` and the quoting posts.

        ``cursor`` is passed through to the upstream request unchanged;
        ``langs`` is not used.
        """
        params = models.AppBskyFeedGetQuotes.Params(
            uri=TARGET_QUOTE_URI,
            limit=limit,
            cursor=cursor,
        )
        response = self.client.app.bsky.feed.get_quotes(params)

        skeleton = [{'post': quoting_post.uri} for quoting_post in response.posts]
        return {
            'cursor': response.cursor,
            'feed': skeleton,
        }
-90
feeds/popqp.py
-90
feeds/popqp.py
···
1
-
import logging
2
-
3
-
import apsw
4
-
import apsw.ext
5
-
6
-
from . import BaseFeed
7
-
8
-
class PopularQuotePostsFeed(BaseFeed):
    """Feed of quoted posts ranked by quote count with exponential time decay."""

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popqp'

    # score = quote_count * exp(-age_seconds / 10800)
    SERVE_FEED_QUERY = """
    select uri, create_ts, update_ts, quote_count, exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) as decay,
           quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) as score
    from posts
    order by quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) desc
    limit :limit offset :offset
    """
    # Drops rows older than 24 hours whose score has fallen below 1.0.
    DELETE_OLD_POSTS_QUERY = """
    delete from posts where
        quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) < 1.0
        and create_ts < unixepoch('now', '-24 hours')
    """

    def __init__(self):
        self.db_cnx = apsw.Connection('db/popqp.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text, create_ts timestamp, update_ts timestamp, quote_count int
            );
            create unique index if not exists uri_idx on posts(uri);
            """)

        self.logger = logging.getLogger('feeds.popqp')

    def process_commit(self, commit):
        """Count a newly created quote post against the post it quotes."""
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        embed = record.get('embed')
        if embed is None:
            return

        embed_type = embed.get('$type')
        try:
            if embed_type == 'app.bsky.embed.record':
                quote_post_uri = embed['record']['uri']
            elif embed_type == 'app.bsky.embed.recordWithMedia':
                quote_post_uri = embed['record']['record']['uri']
            else:
                return
        except (KeyError, TypeError):
            # Malformed embed: skip the record rather than abort ingestion.
            return

        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        # The transaction stays open until commit_changes() is called.
        self.transaction_begin(self.db_cnx)

        # create_ts keeps the time of the first quote seen for this URI.
        self.db_cnx.execute("""
        insert into posts (uri, create_ts, update_ts, quote_count)
        values (:uri, :ts, :ts, 1)
        on conflict (uri) do
        update set quote_count = quote_count + 1, update_ts = :ts
        """, dict(uri=quote_post_uri, ts=ts))

    def delete_old_posts(self):
        """Run DELETE_OLD_POSTS_QUERY and log how many rows it removed."""
        self.db_cnx.execute(self.DELETE_OLD_POSTS_QUERY)
        self.logger.debug('deleted {} old posts'.format(self.db_cnx.changes()))

    def commit_changes(self):
        """Prune old rows, commit the open transaction, checkpoint the WAL."""
        self.delete_old_posts()
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs ordered by decayed score; ``langs`` is not used."""
        cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset))
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as serve_feed() as a formatted text table."""
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, self.SERVE_FEED_QUERY, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-98
feeds/rapidfire.py
-98
feeds/rapidfire.py
···
1
-
import logging
2
-
3
-
import apsw
4
-
import apsw.ext
5
-
import grapheme
6
-
7
-
from . import BaseFeed
8
-
9
-
MAX_TEXT_LENGTH = 140


class RapidFireFeed(BaseFeed):
    """Feed of recent short posts with no reply, embed or facets."""

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'

    def __init__(self):
        self.db_cnx = apsw.Connection('db/rapidfire.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (uri text, create_ts timestamp, lang text);
            create index if not exists create_ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.rapidfire')

    @staticmethod
    def _requested_langs(langs):
        """Return the languages to filter on, or [] to serve every language.

        An empty list is returned both for an explicit '*' and when the
        request carries no languages at all; the latter would otherwise build
        a query with no select clause.
        """
        if '*' in langs:
            return []
        return list(langs.values())

    def process_commit(self, commit):
        """Store a qualifying new post once per declared language."""
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        # Only plain posts qualify: no reply, no embed, no facets.
        if record.get('reply') is not None:
            return
        if record.get('embed') is not None:
            return
        if record.get('facets') is not None:
            return
        if grapheme.length(record.get('text', '')) > MAX_TEXT_LENGTH:
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        # The transaction stays open until commit_changes() is called.
        self.transaction_begin(self.db_cnx)

        # Posts without languages are stored under the empty language.
        langs = record.get('langs') or ['']
        for lang in langs:
            self.db_cnx.execute(
                'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
                dict(uri=post_uri, ts=ts, lang=lang)
            )

    def delete_old_posts(self):
        """Delete posts older than 15 minutes and log how many were removed."""
        self.db_cnx.execute(
            "delete from posts where create_ts < unixepoch('now', '-15 minutes')"
        )
        self.logger.debug('deleted {} old posts'.format(self.db_cnx.changes()))

    def commit_changes(self):
        """Prune old rows, commit the open transaction, checkpoint the WAL."""
        self.delete_old_posts()
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs, newest first, filtered by ``langs``."""
        lang_values = self._requested_langs(langs)
        if not lang_values:
            cur = self.db_cnx.execute(
                "select uri from posts order by create_ts desc limit :limit offset :offset",
                dict(limit=limit, offset=offset)
            )
            return [uri for (uri,) in cur]

        # One select per language; UNION removes duplicate rows.
        lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
        lang_clause = ' union '.join(lang_selects)
        cur = self.db_cnx.execute(
            lang_clause + ' order by create_ts desc limit ? offset ?',
            [*lang_values, limit, offset]
        )
        return [uri for (uri, _create_ts) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the newest rows as a formatted text table.

        ``langs`` is not applied here.
        """
        query = """
        select *, unixepoch('now') - create_ts as age_seconds
        from posts
        order by create_ts desc
        limit :limit offset :offset
        """
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-144
feeds/ratio.py
-144
feeds/ratio.py
···
1
-
import logging
2
-
3
-
import apsw
4
-
import apsw.ext
5
-
6
-
from . import BaseFeed
7
-
8
-
class RatioFeed(BaseFeed):
    """Feed of posts with many replies and quotes relative to likes and reposts."""

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/ratio'

    # ratio = (replies + quoteposts) / (likes + reposts + 1); the score is
    # the ratio multiplied by an exponential decay on the row's age.
    SERVE_FEED_QUERY = """
    with served as (
        select
            uri,
            create_ts,
            ( unixepoch('now') - create_ts ) as age_seconds,
            replies,
            quoteposts,
            likes,
            reposts,
            ( replies + quoteposts ) / ( likes + reposts + 1 ) as ratio,
            exp( -1 * ( ( unixepoch('now') - create_ts ) / ( 3600.0 * 16 ) ) ) as decay
        from posts
    )
    select
        *,
        ( ratio * decay ) as score
    from served
    where replies > 15 and ratio > 2.5
    order by score desc
    limit :limit offset :offset
    """
    DELETE_OLD_POSTS_QUERY = """
    delete from posts
    where
        create_ts < unixepoch('now', '-5 days')
    """

    def __init__(self):
        self.db_cnx = apsw.Connection('db/ratio.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (
                uri text, create_ts timestamp,
                replies float, likes float, reposts float, quoteposts float
            );
            create unique index if not exists uri_idx on posts(uri);
            """)

        self.logger = logging.getLogger('feeds.ratio')

    def process_commit(self, commit):
        """Add one like, repost, reply or quote to the counters of its subject.

        Commits without a record, with a malformed record, or without a
        subject are skipped. Replies to the author's own post are not counted.
        """
        if commit['opType'] != 'c':
            return

        collection = commit['collection']
        record = commit.get('record')
        if record is None:
            return

        subject_uri = None
        is_reply = False
        is_quotepost = False

        try:
            if collection in {'app.bsky.feed.like', 'app.bsky.feed.repost'}:
                subject_uri = record['subject']['uri']
            elif collection == 'app.bsky.feed.post':
                if record.get('reply') is not None:
                    is_reply = True
                    subject_uri = record['reply']['parent']['uri']
                elif record.get('embed') is not None:
                    is_quotepost = True
                    embed = record['embed']
                    embed_type = embed.get('$type')
                    if embed_type == 'app.bsky.embed.record':
                        subject_uri = embed['record']['uri']
                    elif embed_type == 'app.bsky.embed.recordWithMedia':
                        subject_uri = embed['record']['record']['uri']
        except (KeyError, TypeError, AttributeError):
            # Malformed record: skip it rather than abort ingestion.
            return

        if not isinstance(subject_uri, str):
            return

        # Only count non-OP replies. The trailing slash stops one DID from
        # matching another DID that merely starts with it.
        if is_reply and subject_uri.startswith('at://' + commit['did'] + '/'):
            return

        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        params = {
            'uri': subject_uri,
            'ts': ts,
            'is_reply': int(is_reply),
            'is_like': int(collection == 'app.bsky.feed.like'),
            'is_repost': int(collection == 'app.bsky.feed.repost'),
            'is_quotepost': int(is_quotepost),
        }

        # The transaction stays open until commit_changes() is called.
        self.transaction_begin(self.db_cnx)

        # create_ts keeps the time of the first interaction seen for the URI.
        self.db_cnx.execute("""
        insert into posts(uri, create_ts, replies, likes, reposts, quoteposts)
        values (:uri, :ts,
                case when :is_reply then 1 else 0 end,
                case when :is_like then 1 else 0 end,
                case when :is_repost then 1 else 0 end,
                case when :is_quotepost then 1 else 0 end)
        on conflict(uri)
        do update set
            replies = replies + case when :is_reply then 1 else 0 end,
            likes = likes + case when :is_like then 1 else 0 end,
            reposts = reposts + case when :is_repost then 1 else 0 end,
            quoteposts = quoteposts + case when :is_quotepost then 1 else 0 end
        """, params)

    def delete_old_posts(self):
        """Delete rows whose create_ts is more than five days old."""
        self.db_cnx.execute(self.DELETE_OLD_POSTS_QUERY)

    def commit_changes(self):
        """Prune old rows, commit the open transaction, checkpoint the WAL."""
        self.logger.debug('committing changes')
        self.delete_old_posts()
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs ordered by score; ``langs`` is not used."""
        cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset))
        return [row[0] for row in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as serve_feed() as a formatted text table."""
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, self.SERVE_FEED_QUERY, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-80
feeds/sevendirtywords.py
-80
feeds/sevendirtywords.py
···
1
-
import logging
2
-
import re
3
-
4
-
import apsw
5
-
import apsw.ext
6
-
7
-
from . import BaseFeed
8
-
9
-
# https://en.wikipedia.org/wiki/Seven_dirty_words
10
-
# The whole text must be exactly one of the words, optionally followed by
# punctuation or spaces.
SDW_REGEX = re.compile(r'^(shit|piss|fuck|cunt|cocksucker|motherfucker|tits)[!,./;?~ ]*$', re.I|re.A)


class SevenDirtyWordsFeed(BaseFeed):
    """Feed of plain posts whose whole text is one of the seven dirty words."""

    FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/sdw'

    def __init__(self):
        self.db_cnx = apsw.Connection('db/sdw.db')
        self.db_cnx.pragma('journal_mode', 'WAL')
        # Auto-checkpointing is turned off; commit_changes() checkpoints.
        self.db_cnx.pragma('wal_autocheckpoint', '0')

        with self.db_cnx:
            self.db_cnx.execute("""
            create table if not exists posts (uri text, create_ts timestamp);
            create unique index if not exists create_ts_idx on posts(create_ts);
            """)

        self.logger = logging.getLogger('feeds.sdw')

    def process_commit(self, commit):
        """Store a new post if it matches SDW_REGEX and has no reply, embed or facets."""
        if commit['opType'] != 'c':
            return

        if commit['collection'] != 'app.bsky.feed.post':
            return

        record = commit.get('record')
        if record is None:
            return

        conds = [
            record.get('reply') is None,
            record.get('embed') is None,
            record.get('facets') is None,
            SDW_REGEX.search(record.get('text', '')) is not None,
        ]

        if not all(conds):
            return

        repo = commit['did']
        rkey = commit['rkey']
        post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
        ts = self.safe_timestamp(record.get('createdAt')).timestamp()

        # The transaction stays open until commit_changes() is called.
        self.transaction_begin(self.db_cnx)

        # create_ts carries a unique index, so a second post with the same
        # timestamp would raise a constraint error; "or ignore" drops the
        # colliding row instead.
        # NOTE(review): the unique index was probably meant for uri -- confirm
        # and migrate existing databases if so.
        self.db_cnx.execute(
            'insert or ignore into posts (uri, create_ts) values (:uri, :ts)',
            dict(uri=post_uri, ts=ts)
        )

    def commit_changes(self):
        """Commit the open transaction and checkpoint the WAL."""
        self.logger.debug('committing changes')
        self.transaction_commit(self.db_cnx)
        self.wal_checkpoint(self.db_cnx, 'RESTART')

    def serve_feed(self, limit, offset, langs):
        """Return post URIs, newest first; ``langs`` is not used."""
        cur = self.db_cnx.execute("""
        select uri
        from posts
        order by create_ts desc
        limit :limit
        offset :offset
        """, dict(limit=limit, offset=offset))
        return [uri for (uri,) in cur]

    def serve_feed_debug(self, limit, offset, langs):
        """Return the same page as serve_feed() as a formatted text table."""
        query = "select * from posts order by create_ts desc limit :limit offset :offset"
        bindings = dict(limit=limit, offset=offset)
        return apsw.ext.format_query_table(
            self.db_cnx, query, bindings,
            string_sanitize=2, text_width=9999, use_unicode=True
        )
-65
feedweb.py
-65
feedweb.py
···
1
-
#!/usr/bin/env python3
2
-
3
-
from flask import Flask, request, jsonify
4
-
from prometheus_client import Counter, make_wsgi_app
5
-
from werkzeug.middleware.dispatcher import DispatcherMiddleware
6
-
from werkzeug.datastructures import LanguageAccept
7
-
8
-
from feed_manager import feed_manager
9
-
10
-
feed_requests = Counter('feed_requests', 'requests by feed URI', ['feed'])

app = Flask(__name__)

# Bounds for the "limit" query parameter.
DEFAULT_LIMIT = 50
MAX_LIMIT = 100
DEV_SUFFIX = '-dev'


def _int_arg(name, default):
    """Return query parameter ``name`` as an int, or ``default`` if not numeric."""
    try:
        return int(request.args.get(name, default))
    except ValueError:
        return default


@app.route('/xrpc/app.bsky.feed.getFeedSkeleton')
def get_feed_skeleton():
    """Serve one page of the feed named by the ``feed`` query parameter.

    Query parameters:
        feed: feed URI (required). A trailing "-dev" is stripped and the
            request is then not counted in the metrics.
        limit: page size, clamped to 1..MAX_LIMIT (default DEFAULT_LIMIT).
        cursor: integer offset of the page, or an opaque string for the
            "nz-interesting" feed.
        langs: optional language list; the Accept-Language header is used
            when it is absent.
        debug: "1" returns the feed's plain-text debug output instead.
    """
    # Clamp untrusted input: SQLite reads a negative LIMIT as "no limit".
    limit = min(max(_int_arg('limit', DEFAULT_LIMIT), 1), MAX_LIMIT)

    feed_uri = request.args['feed']
    if 'nz-interesting' in feed_uri:
        # This feed pages with an opaque cursor that is passed through.
        offset = request.args.get('cursor')
    else:
        offset = max(_int_arg('cursor', 0), 0)

    if feed_uri.endswith(DEV_SUFFIX):
        # Remove the suffix only, not every "-dev" inside the URI.
        feed_uri = feed_uri[:-len(DEV_SUFFIX)]
    else:
        (prefix, sep, rkey) = feed_uri.rpartition('/')
        feed_requests.labels(rkey).inc()

    req_langs = request.args.getlist('langs')
    if req_langs:
        langs = LanguageAccept([(lang, 1) for lang in req_langs])
    else:
        langs = request.accept_languages

    if request.args.get('debug', '0') == '1':
        headers = {'Content-Type': 'text/plain; charset=utf-8'}
        debug = feed_manager.serve_feed(feed_uri, limit, offset, langs, debug=True)
        return debug, headers

    posts = feed_manager.serve_feed(feed_uri, limit, offset, langs, debug=False)
    if isinstance(posts, dict):
        # The feed already built the complete response.
        return posts

    feed = [dict(post=uri) for uri in posts]
    if len(posts) < limit:
        # Short page: no cursor, there is nothing more to fetch.
        return dict(feed=feed)

    offset += len(posts)
    return dict(cursor=str(offset), feed=feed)
56
-
57
-
# Mount the Prometheus WSGI app under /metrics alongside the Flask app.
_metrics_mounts = {'/metrics': make_wsgi_app()}
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, _metrics_mounts)

if __name__ == '__main__':
    # Direct runs also serve the DID document.
    from feedweb_utils import did_doc

    app.add_url_rule('/.well-known/did.json', view_func=did_doc)

    # NOTE(review): debug=True enables the Werkzeug debugger; confirm this
    # entry point is never reachable from untrusted networks.
    app.run(debug=True)
-14
feedweb_utils.py
-14
feedweb_utils.py
···
1
-
NGROK_HOSTNAME = 'routinely-right-barnacle.ngrok-free.app'


def did_doc():
    """Return the did:web document for the feed generator at NGROK_HOSTNAME."""
    feed_generator_service = {
        'id': '#bsky_fg',
        'type': 'BskyFeedGenerator',
        'serviceEndpoint': 'https://' + NGROK_HOSTNAME,
    }
    document = {
        '@context': ['https://www.w3.org/ns/did/v1'],
        'id': 'did:web:' + NGROK_HOSTNAME,
        'service': [feed_generator_service],
    }
    return document