this repo has no description

feat(mostliked): only feedweb stuff now

Changed files
-141
feeds
-5
feed_manager.py
···
53 pass
54
55 feed_manager = FeedManager()
56 - feed_manager.register(RapidFireFeed)
57 feed_manager.register(PopularFeed)
58 - feed_manager.register(HomeRunsTeamFeed)
59 - feed_manager.register(NoraZoneInteresting)
60 - feed_manager.register(SevenDirtyWordsFeed)
61 feed_manager.register(MostLikedFeed)
62 - # feed_manager.register(PopularQuotePostsFeed)
···
53 pass
54
55 feed_manager = FeedManager()
56 feed_manager.register(PopularFeed)
57 feed_manager.register(MostLikedFeed)
-136
feeds/mostliked.py
···
2
3 import apsw
4 import apsw.ext
5 - from expiringdict import ExpiringDict
6 - import threading
7 - import queue
8
9 from . import BaseFeed
10
11 - # store post in database once it has this many likes
12 - MIN_LIKES = 5
13 -
14 - class DatabaseWorker(threading.Thread):
15 - def __init__(self, name, db_path, task_queue):
16 - super().__init__()
17 - self.db_cnx = apsw.Connection(db_path)
18 - self.db_cnx.pragma('foreign_keys', True)
19 - self.db_cnx.pragma('journal_mode', 'WAL')
20 - self.db_cnx.pragma('wal_autocheckpoint', '0')
21 - self.stop_signal = False
22 - self.task_queue = task_queue
23 - self.logger = logging.getLogger(f'feeds.db.{name}')
24 - self.changes = 0
25 -
26 - def run(self):
27 - while True:
28 - task = self.task_queue.get(block=True)
29 - if task == 'STOP':
30 - self.logger.debug('received STOP, breaking now')
31 - break
32 - elif task == 'COMMIT':
33 - self.logger.debug(f'committing {self.changes} changes')
34 - if self.db_cnx.in_transaction:
35 - self.db_cnx.execute('COMMIT')
36 - checkpoint = self.db_cnx.execute('PRAGMA wal_checkpoint(PASSIVE)')
37 - self.logger.debug(f'checkpoint: {checkpoint.fetchall()!r}')
38 - self.changes = 0
39 - self.logger.debug(f'qsize: {self.task_queue.qsize()}')
40 - else:
41 - sql, bindings = task
42 - if not self.db_cnx.in_transaction:
43 - self.db_cnx.execute('BEGIN')
44 - self.db_cnx.execute(sql, bindings)
45 - self.changes += self.db_cnx.changes()
46 - self.task_queue.task_done()
47 -
48 - self.logger.debug('closing database connection')
49 - self.db_cnx.close()
50 -
51 - def stop(self):
52 - self.task_queue.put('STOP')
53 -
54 class MostLikedFeed(BaseFeed):
55 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
56 - DELETE_OLD_POSTS_QUERY = """
57 - delete from posts where create_ts < unixepoch('now', '-24 hours');
58 - """
59
60 def __init__(self):
61 self.db_cnx = apsw.Connection('db/mostliked.db')
62 self.db_cnx.pragma('foreign_keys', True)
63 self.db_cnx.pragma('journal_mode', 'WAL')
64 - self.db_cnx.pragma('wal_autocheckpoint', '0')
65 -
66 - with self.db_cnx:
67 - self.db_cnx.execute("""
68 - create table if not exists posts (
69 - uri text primary key,
70 - create_ts timestamp,
71 - likes int
72 - );
73 - create table if not exists langs (
74 - uri text,
75 - lang text,
76 - foreign key(uri) references posts(uri) on delete cascade
77 - );
78 - create index if not exists ts_idx on posts(create_ts);
79 - """)
80 -
81 - self.logger = logging.getLogger('feeds.mostliked')
82 - self.drafts = ExpiringDict(max_len=50_000, max_age_seconds=5*60)
83 -
84 - self.db_writes = queue.Queue()
85 - self.db_worker = DatabaseWorker('mostliked', 'db/mostliked.db', self.db_writes)
86 - self.db_worker.start()
87 -
88 - def stop_db_worker(self):
89 - self.logger.debug('sending STOP')
90 - self.db_writes.put('STOP')
91 -
92 - def process_commit(self, commit):
93 - if commit['opType'] != 'c':
94 - return
95 -
96 - if commit['collection'] == 'app.bsky.feed.post':
97 - record = commit.get('record')
98 - post_uri = f"at://{commit['did']}/app.bsky.feed.post/{commit['rkey']}"
99 -
100 - # to keep the DB in check, instead of adding every post right away
101 - # we make note of it but only add to DB once it gets some likes
102 - self.drafts[post_uri] = {
103 - 'ts': self.safe_timestamp(record.get('createdAt')).timestamp(),
104 - 'langs': record.get('langs', []),
105 - 'likes': 0,
106 - }
107 -
108 - elif commit['collection'] == 'app.bsky.feed.like':
109 - record = commit.get('record')
110 - try:
111 - subject_uri = record['subject']['uri']
112 - except KeyError:
113 - return
114 -
115 - if subject_uri in self.drafts:
116 - record_info = self.drafts.pop(subject_uri).copy()
117 - record_info['likes'] += 1
118 - if record_info['likes'] < MIN_LIKES:
119 - self.drafts[subject_uri] = record_info
120 - return
121 -
122 - self.logger.debug(f'graduating {subject_uri}')
123 -
124 - task = (
125 - 'insert or ignore into posts (uri, create_ts, likes) values (:uri, :ts, :likes)',
126 - {'uri': subject_uri, 'ts': record_info['ts'], 'likes': record_info['likes']}
127 - )
128 - self.db_writes.put(task)
129 -
130 - for lang in record_info['langs']:
131 - task = (
132 - 'insert or ignore into langs (uri, lang) values (:uri, :lang)',
133 - {'uri': subject_uri, 'lang': lang}
134 - )
135 - self.db_writes.put(task)
136 -
137 - subject_exists = self.db_cnx.execute('select 1 from posts where uri = ?', [subject_uri])
138 - if subject_exists.fetchone() is None:
139 - return
140 -
141 - task = (
142 - 'update posts set likes = likes + 1 where uri = :uri',
143 - {'uri': subject_uri}
144 - )
145 - self.db_writes.put(task)
146 -
147 - def commit_changes(self):
148 - self.db_writes.put((self.DELETE_OLD_POSTS_QUERY, {}))
149 - self.db_writes.put('COMMIT')
150 - self.logger.debug(f'there are {len(self.drafts)} drafts')
151
152 def generate_sql(self, limit, offset, langs):
153 bindings = []
···
2
3 import apsw
4 import apsw.ext
5
6 from . import BaseFeed
7
8 class MostLikedFeed(BaseFeed):
9 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/most-liked'
10
11 def __init__(self):
12 self.db_cnx = apsw.Connection('db/mostliked.db')
13 self.db_cnx.pragma('foreign_keys', True)
14 self.db_cnx.pragma('journal_mode', 'WAL')
15
16 def generate_sql(self, limit, offset, langs):
17 bindings = []