The code for my Bluesky feed generator

Initial commit

authored by Ezra Boeth and committed by GitHub af818b1d

+44
.env.example
··· 1 + # Set this to the hostname that you intend to run the service at 2 + HOSTNAME='feed.example.com' 3 + 4 + # You can obtain it by publishing of feed (run publish_feed.py) 5 + FEED_URI='at://did:plc:abcde.../app.bsky.feed.generator/example-fid-name...' 6 + 7 + # Your handle name 8 + HANDLE='' 9 + 10 + # Your app password 11 + PASSWORD='' 12 + 13 + # A short name for the record that will show in urls 14 + # Lowercase with no spaces. 15 + # Ex: whats-hot 16 + RECORD_NAME='' 17 + 18 + # A display name for your feed 19 + # Ex: What's Hot 20 + DISPLAY_NAME='' 21 + 22 + # (Optional) A description of your feed 23 + # Ex: Top trending content from the whole network 24 + DESCRIPTION='powered by The AT Protocol SDK for Python' 25 + 26 + # (Optional) The path to an image to be used as your feed's avatar 27 + # Ex: ./path/to/avatar.jpeg 28 + AVATAR_PATH='' 29 + 30 + # (Optional). Only use this if you want a service did different from did:web 31 + SERVICE_DID='' 32 + 33 + # (Optional). If your feed accepts interactions from clients 34 + ACCEPTS_INTERACTIONS='false' 35 + 36 + # (Optional). If your feed is a video feed 37 + IS_VIDEO_FEED='false' 38 + 39 + # (Optional). Ignore reply posts 40 + #IGNORE_REPLY_POSTS='true' 41 + 42 + # (Optional). Ignore posts with a created_at timestamp older than 1 day 43 + # to avoid including archived posts from X/Twitter 44 + #IGNORE_OLD_POSTS='true'
+3
.flaskenv
··· 1 + FLASK_APP=server.app 2 + FLASK_RUN_PORT=8000 3 + FLASK_RUN_HOST=127.0.0.1
+1
.github/FUNDING.yml
··· 1 + github: MarshalX
+168
.gitignore
··· 1 + .DS_Store 2 + .idea 3 + *.iml 4 + .env 5 + *.db 6 + 7 + # Byte-compiled / optimized / DLL files 8 + __pycache__/ 9 + *.py[cod] 10 + *$py.class 11 + 12 + # C extensions 13 + *.so 14 + 15 + # Distribution / packaging 16 + .Python 17 + bin/ 18 + build/ 19 + develop-eggs/ 20 + dist/ 21 + downloads/ 22 + eggs/ 23 + .eggs/ 24 + lib/ 25 + lib64/ 26 + parts/ 27 + sdist/ 28 + var/ 29 + wheels/ 30 + share/python-wheels/ 31 + *.egg-info/ 32 + .installed.cfg 33 + *.egg 34 + MANIFEST 35 + 36 + # PyInstaller 37 + # Usually these files are written by a python script from a template 38 + # before PyInstaller builds the exe, so as to inject date/other infos into it. 39 + *.manifest 40 + *.spec 41 + 42 + # Installer logs 43 + pip-log.txt 44 + pip-delete-this-directory.txt 45 + 46 + # Unit test / coverage reports 47 + htmlcov/ 48 + .tox/ 49 + .nox/ 50 + .coverage 51 + .coverage.* 52 + .cache 53 + nosetests.xml 54 + coverage.xml 55 + *.cover 56 + *.py,cover 57 + .hypothesis/ 58 + .pytest_cache/ 59 + cover/ 60 + 61 + # Translations 62 + *.mo 63 + *.pot 64 + 65 + # Django stuff: 66 + *.log 67 + local_settings.py 68 + db.sqlite3 69 + db.sqlite3-journal 70 + 71 + # Flask stuff: 72 + instance/ 73 + .webassets-cache 74 + 75 + # Scrapy stuff: 76 + .scrapy 77 + 78 + # Sphinx documentation 79 + docs/_build/ 80 + 81 + # PyBuilder 82 + .pybuilder/ 83 + target/ 84 + 85 + # Jupyter Notebook 86 + .ipynb_checkpoints 87 + 88 + # IPython 89 + profile_default/ 90 + ipython_config.py 91 + 92 + # pyenv 93 + # For a library or package, you might want to ignore these files since the code is 94 + # intended to run in multiple environments; otherwise, check them in: 95 + # .python-version 96 + 97 + # pipenv 98 + # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 99 + # However, in case of collaboration, if having platform-specific dependencies or dependencies 100 + # having no cross-platform support, pipenv may install dependencies that don't work, or not 101 + # install all needed dependencies. 102 + #Pipfile.lock 103 + 104 + # poetry 105 + # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. 106 + # This is especially recommended for binary packages to ensure reproducibility, and is more 107 + # commonly ignored for libraries. 108 + # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control 109 + #poetry.lock 110 + 111 + # pdm 112 + # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. 113 + #pdm.lock 114 + # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it 115 + # in version control. 116 + # https://pdm.fming.dev/#use-with-ide 117 + .pdm.toml 118 + 119 + # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm 120 + __pypackages__/ 121 + 122 + # Celery stuff 123 + celerybeat-schedule 124 + celerybeat.pid 125 + 126 + # SageMath parsed files 127 + *.sage.py 128 + 129 + # Environments 130 + .env 131 + .venv 132 + env/ 133 + venv/ 134 + ENV/ 135 + env.bak/ 136 + venv.bak/ 137 + pyvenv.cfg 138 + 139 + # Spyder project settings 140 + .spyderproject 141 + .spyproject 142 + 143 + # Rope project settings 144 + .ropeproject 145 + 146 + # mkdocs documentation 147 + /site 148 + 149 + # mypy 150 + .mypy_cache/ 151 + .dmypy.json 152 + dmypy.json 153 + 154 + # Pyre type checker 155 + .pyre/ 156 + 157 + # pytype static type analyzer 158 + .pytype/ 159 + 160 + # Cython debug symbols 161 + cython_debug/ 162 + 163 + # PyCharm 164 + # JetBrains specific template is maintained in a separate JetBrains.gitignore that can 165 + # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore 166 + # and can be added to the global gitignore or merged into this file. For a more nuclear 167 + # option (not recommended) you can uncomment the following to ignore the entire idea folder. 168 + #.idea/
+8
.vscode/settings.json
··· 1 + { 2 + "cSpell.words": [ 3 + "ATPROTO", 4 + "bluesky", 5 + "bsky", 6 + "dotenv" 7 + ] 8 + }
+21
LICENSE
··· 1 + MIT License 2 + 3 + Copyright (c) 2023 Ilya Siamionau 4 + 5 + Permission is hereby granted, free of charge, to any person obtaining a copy 6 + of this software and associated documentation files (the "Software"), to deal 7 + in the Software without restriction, including without limitation the rights 8 + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 + copies of the Software, and to permit persons to whom the Software is 10 + furnished to do so, subject to the following conditions: 11 + 12 + The above copyright notice and this permission notice shall be included in all 13 + copies or substantial portions of the Software. 14 + 15 + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 21 + SOFTWARE.
+70
README.md
··· 1 + # ATProto Feed Generator powered by [The AT Protocol SDK for Python](https://github.com/MarshalX/atproto) 2 + 3 + > Feed Generators are services that provide custom algorithms to users through the AT Protocol. 4 + 5 + Official overview (read it first): https://github.com/bluesky-social/feed-generator#overview 6 + 7 + ## Getting Started 8 + 9 + We've set up this simple server with SQLite to store and query data. Feel free to switch this out for whichever database you prefer. 10 + 11 + Next, you will need to do two things: 12 + 13 + 1. Implement filtering logic in `server/data_filter.py`. 14 + 2. Copy `.env.example` to `.env` 15 + 3. Optionally implement custom feed generation logic in `server/algos`. 16 + 17 + We've taken care of setting this server up with a did:web. However, you're free to switch this out for did:plc if you like - you may want to if you expect this Feed Generator to be long-standing and possibly migrating domains. 18 + 19 + ## Publishing your feed 20 + 21 + To publish your feed, simply run `python publish_feed.py`. 22 + 23 + To update your feed's display data (name, avatar, description, etc.), just update the relevant variables in `.env` and re-run the script. 24 + 25 + After successfully running the script, you should be able to see your feed from within the app, as well as share it by embedding a link in a post (similar to a quote post). 26 + 27 + ## Running the Server 28 + 29 + Install Python 3.7+. 30 + 31 + Run `setupvenv.sh` to setup a virtual environment and install the dependencies: 32 + 33 + ```shell 34 + ./setupvenv.sh 35 + ``` 36 + 37 + **Note**: To get value for `FEED_URI` you need to publish the feed first 38 + 39 + To run a development Flask server: 40 + 41 + ```shell 42 + flask run 43 + ``` 44 + 45 + **Warning** The Flask development server is not designed for production use. In production, you should use production WSGI server such as [`waitress`](https://flask.palletsprojects.com/en/stable/deploying/waitress/) behind a reverse proxy such as NGINX instead. 46 + 47 + ```shell 48 + pip install waitress 49 + waitress-serve --listen=127.0.0.1:8080 server.app:app 50 + ``` 51 + 52 + To run a development server with debugging: 53 + 54 + ```shell 55 + flask --debug run 56 + ``` 57 + 58 + **Note**: Duplication of data stream instances in debug mode is fine. 59 + 60 + **Warning**: If you want to run server in many workers, you should run Data Stream (Firehose) separately. 61 + 62 + ### Endpoints 63 + 64 + - `/.well-known/did.json` 65 + - `/xrpc/app.bsky.feed.describeFeedGenerator` 66 + - `/xrpc/app.bsky.feed.getFeedSkeleton` 67 + 68 + ## License 69 + 70 + MIT
+102
publish_feed.py
··· 1 + #!/usr/bin/env python3 2 + # YOU MUST INSTALL ATPROTO SDK 3 + # pip3 install atproto 4 + 5 + import os 6 + 7 + from dotenv import load_dotenv 8 + from atproto import Client, models 9 + 10 + load_dotenv() 11 + 12 + def _get_bool_env_var(value: str) -> bool: 13 + # Helper function to convert string to bool 14 + 15 + if value is None: 16 + return False 17 + 18 + normalized_value = value.strip().lower() 19 + if normalized_value in {'1', 'true', 't', 'yes', 'y'}: 20 + return True 21 + 22 + return False 23 + 24 + 25 + # YOUR bluesky handle 26 + # Ex: user.bsky.social 27 + HANDLE: str = os.environ.get('HANDLE') 28 + 29 + # YOUR bluesky password, or preferably an App Password (found in your client settings) 30 + # Ex: abcd-1234-efgh-5678 31 + PASSWORD: str = os.environ.get('PASSWORD') 32 + 33 + # The hostname of the server where feed server will be hosted 34 + # Ex: feed.bsky.dev 35 + HOSTNAME: str = os.environ.get('HOSTNAME') 36 + 37 + # A short name for the record that will show in urls 38 + # Lowercase with no spaces. 39 + # Ex: whats-hot 40 + RECORD_NAME: str = os.environ.get('RECORD_NAME') 41 + 42 + # A display name for your feed 43 + # Ex: What's Hot 44 + DISPLAY_NAME: str = os.environ.get('DISPLAY_NAME') 45 + 46 + # (Optional) A description of your feed 47 + # Ex: Top trending content from the whole network 48 + DESCRIPTION: str = os.environ.get('DESCRIPTION') 49 + 50 + # (Optional) The path to an image to be used as your feed's avatar 51 + # Ex: ./path/to/avatar.jpeg 52 + AVATAR_PATH: str = os.environ.get('AVATAR_PATH') 53 + 54 + # (Optional). Only use this if you want a service did different from did:web 55 + SERVICE_DID: str = os.environ.get('SERVICE_DID') 56 + 57 + # (Optional). If your feed accepts interactions from clients 58 + ACCEPTS_INTERACTIONS: bool = _get_bool_env_var(os.environ.get('ACCEPTS_INTERACTIONS')) 59 + 60 + # (Optional). If your feed is a video feed 61 + IS_VIDEO_FEED: bool = _get_bool_env_var(os.environ.get('IS_VIDEO_FEED')) 62 + 63 + # ------------------------------------- 64 + # NO NEED TO TOUCH ANYTHING BELOW HERE 65 + # ------------------------------------- 66 + 67 + 68 + def main(): 69 + client = Client() 70 + client.login(HANDLE, PASSWORD) 71 + 72 + feed_did = SERVICE_DID 73 + if not feed_did: 74 + feed_did = f'did:web:{HOSTNAME}' 75 + 76 + avatar_blob = None 77 + if AVATAR_PATH: 78 + with open(AVATAR_PATH, 'rb') as f: 79 + avatar_data = f.read() 80 + avatar_blob = client.upload_blob(avatar_data).blob 81 + 82 + response = client.com.atproto.repo.put_record(models.ComAtprotoRepoPutRecord.Data( 83 + repo=client.me.did, 84 + collection=models.ids.AppBskyFeedGenerator, 85 + rkey=RECORD_NAME, 86 + record=models.AppBskyFeedGenerator.Record( 87 + did=feed_did, 88 + display_name=DISPLAY_NAME, 89 + accepts_interactions=ACCEPTS_INTERACTIONS, 90 + description=DESCRIPTION, 91 + avatar=avatar_blob, 92 + content_mode='app.bsky.feed.defs#contentModeVideo' if IS_VIDEO_FEED else None, 93 + created_at=client.get_current_time_iso(), 94 + ) 95 + )) 96 + 97 + print('Successfully published!') 98 + print('Feed URI (put in "FEED_URI" env var):', response.uri) 99 + 100 + 101 + if __name__ == '__main__': 102 + main()
+4
requirements.txt
··· 1 + atproto==0.0.59 2 + peewee~=3.16.2 3 + Flask~=2.3.2 4 + python-dotenv~=1.0.0
server/__init__.py

This is a binary file and will not be displayed.

+9
server/__main__.py
··· 1 + import logging 2 + 3 + from app import app 4 + from server.logger import logger 5 + 6 + if __name__ == '__main__': 7 + # FOR DEBUG PURPOSE ONLY 8 + logger.setLevel(logging.DEBUG) 9 + app.run(host='127.0.0.1', port=8000, debug=True)
+5
server/algos/__init__.py
··· 1 + from . import feed 2 + 3 + algos = { 4 + feed.uri: feed.handler 5 + }
+38
server/algos/feed.py
··· 1 + from datetime import datetime 2 + from typing import Optional 3 + 4 + from server import config 5 + from server.database import Post 6 + 7 + uri = config.FEED_URI 8 + CURSOR_EOF = 'eof' 9 + 10 + 11 + def handler(cursor: Optional[str], limit: int) -> dict: 12 + posts = Post.select().order_by(Post.cid.desc()).order_by(Post.indexed_at.desc()).limit(limit) 13 + 14 + if cursor: 15 + if cursor == CURSOR_EOF: 16 + return { 17 + 'cursor': CURSOR_EOF, 18 + 'feed': [] 19 + } 20 + cursor_parts = cursor.split('::') 21 + if len(cursor_parts) != 2: 22 + raise ValueError('Malformed cursor') 23 + 24 + indexed_at, cid = cursor_parts 25 + indexed_at = datetime.fromtimestamp(int(indexed_at) / 1000) 26 + posts = posts.where(((Post.indexed_at == indexed_at) & (Post.cid < cid)) | (Post.indexed_at < indexed_at)) 27 + 28 + feed = [{'post': post.uri} for post in posts] 29 + 30 + cursor = CURSOR_EOF 31 + last_post = posts[-1] if posts else None 32 + if last_post: 33 + cursor = f'{int(last_post.indexed_at.timestamp() * 1000)}::{last_post.cid}' 34 + 35 + return { 36 + 'cursor': cursor, 37 + 'feed': feed 38 + }
+90
server/app.py
··· 1 + import sys 2 + import signal 3 + import threading 4 + 5 + from server import config 6 + from server import data_stream 7 + 8 + from flask import Flask, jsonify, request 9 + 10 + from server.algos import algos 11 + from server.data_filter import operations_callback 12 + 13 + app = Flask(__name__) 14 + 15 + stream_stop_event = threading.Event() 16 + stream_thread = threading.Thread( 17 + target=data_stream.run, args=(config.SERVICE_DID, operations_callback, stream_stop_event,) 18 + ) 19 + stream_thread.start() 20 + 21 + 22 + def sigint_handler(*_): 23 + print('Stopping data stream...') 24 + stream_stop_event.set() 25 + sys.exit(0) 26 + 27 + 28 + signal.signal(signal.SIGINT, sigint_handler) 29 + 30 + 31 + @app.route('/') 32 + def index(): 33 + return 'ATProto Feed Generator powered by The AT Protocol SDK for Python (https://github.com/MarshalX/atproto).' 34 + 35 + 36 + @app.route('/.well-known/did.json', methods=['GET']) 37 + def did_json(): 38 + if not config.SERVICE_DID.endswith(config.HOSTNAME): 39 + return '', 404 40 + 41 + return jsonify({ 42 + '@context': ['https://www.w3.org/ns/did/v1'], 43 + 'id': config.SERVICE_DID, 44 + 'service': [ 45 + { 46 + 'id': '#bsky_fg', 47 + 'type': 'BskyFeedGenerator', 48 + 'serviceEndpoint': f'https://{config.HOSTNAME}' 49 + } 50 + ] 51 + }) 52 + 53 + 54 + @app.route('/xrpc/app.bsky.feed.describeFeedGenerator', methods=['GET']) 55 + def describe_feed_generator(): 56 + feeds = [{'uri': uri} for uri in algos.keys()] 57 + response = { 58 + 'encoding': 'application/json', 59 + 'body': { 60 + 'did': config.SERVICE_DID, 61 + 'feeds': feeds 62 + } 63 + } 64 + return jsonify(response) 65 + 66 + 67 + @app.route('/xrpc/app.bsky.feed.getFeedSkeleton', methods=['GET']) 68 + def get_feed_skeleton(): 69 + feed = request.args.get('feed', default=None, type=str) 70 + algo = algos.get(feed) 71 + if not algo: 72 + return 'Unsupported algorithm', 400 73 + 74 + # Example of how to check auth if giving user-specific results: 75 + """ 76 + from server.auth import AuthorizationError, validate_auth 77 + try: 78 + requester_did = validate_auth(request) 79 + except AuthorizationError: 80 + return 'Unauthorized', 401 81 + """ 82 + 83 + try: 84 + cursor = request.args.get('cursor', default=None, type=str) 85 + limit = request.args.get('limit', default=20, type=int) 86 + body = algo(cursor, limit) 87 + except ValueError: 88 + return 'Malformed cursor', 400 89 + 90 + return jsonify(body)
+41
server/auth.py
··· 1 + from atproto import DidInMemoryCache, IdResolver, verify_jwt 2 + from atproto.exceptions import TokenInvalidSignatureError 3 + from flask import Request 4 + 5 + 6 + _CACHE = DidInMemoryCache() 7 + _ID_RESOLVER = IdResolver(cache=_CACHE) 8 + 9 + _AUTHORIZATION_HEADER_NAME = 'Authorization' 10 + _AUTHORIZATION_HEADER_VALUE_PREFIX = 'Bearer ' 11 + 12 + 13 + class AuthorizationError(Exception): 14 + ... 15 + 16 + 17 + def validate_auth(request: 'Request') -> str: 18 + """Validate authorization header. 19 + 20 + Args: 21 + request: The request to validate. 22 + 23 + Returns: 24 + :obj:`str`: Requester DID. 25 + 26 + Raises: 27 + :obj:`AuthorizationError`: If the authorization header is invalid. 28 + """ 29 + auth_header = request.headers.get(_AUTHORIZATION_HEADER_NAME) 30 + if not auth_header: 31 + raise AuthorizationError('Authorization header is missing') 32 + 33 + if not auth_header.startswith(_AUTHORIZATION_HEADER_VALUE_PREFIX): 34 + raise AuthorizationError('Invalid authorization header') 35 + 36 + jwt = auth_header[len(_AUTHORIZATION_HEADER_VALUE_PREFIX) :].strip() 37 + 38 + try: 39 + return verify_jwt(jwt, _ID_RESOLVER.did.resolve_atproto_key).iss 40 + except TokenInvalidSignatureError as e: 41 + raise AuthorizationError('Invalid signature') from e
+42
server/config.py
··· 1 + import os 2 + import logging 3 + 4 + from dotenv import load_dotenv 5 + 6 + from server.logger import logger 7 + 8 + load_dotenv() 9 + 10 + SERVICE_DID = os.environ.get('SERVICE_DID') 11 + HOSTNAME = os.environ.get('HOSTNAME') 12 + FLASK_RUN_FROM_CLI = os.environ.get('FLASK_RUN_FROM_CLI') 13 + 14 + if FLASK_RUN_FROM_CLI: 15 + logger.setLevel(logging.DEBUG) 16 + 17 + if not HOSTNAME: 18 + raise RuntimeError('You should set "HOSTNAME" environment variable first.') 19 + 20 + if not SERVICE_DID: 21 + SERVICE_DID = f'did:web:{HOSTNAME}' 22 + 23 + 24 + FEED_URI = os.environ.get('FEED_URI') 25 + if not FEED_URI: 26 + raise RuntimeError('Publish your feed first (run publish_feed.py) to obtain Feed URI. ' 27 + 'Set this URI to "FEED_URI" environment variable.') 28 + 29 + 30 + def _get_bool_env_var(value: str) -> bool: 31 + if value is None: 32 + return False 33 + 34 + normalized_value = value.strip().lower() 35 + if normalized_value in {'1', 'true', 't', 'yes', 'y'}: 36 + return True 37 + 38 + return False 39 + 40 + 41 + IGNORE_ARCHIVED_POSTS = _get_bool_env_var(os.environ.get('IGNORE_ARCHIVED_POSTS')) 42 + IGNORE_REPLY_POSTS = _get_bool_env_var(os.environ.get('IGNORE_REPLY_POSTS'))
+98
server/data_filter.py
··· 1 + import datetime 2 + 3 + from collections import defaultdict 4 + 5 + from atproto import models 6 + 7 + from server import config 8 + from server.logger import logger 9 + from server.database import db, Post 10 + 11 + 12 + def is_archive_post(record: 'models.AppBskyFeedPost.Record') -> bool: 13 + # Sometimes users will import old posts from Twitter/X which con flood a feed with 14 + # old posts. Unfortunately, the only way to test for this is to look an old 15 + # created_at date. However, there are other reasons why a post might have an old 16 + # date, such as firehose or firehose consumer outages. It is up to you, the feed 17 + # creator to weigh the pros and cons, amd and optionally include this function in 18 + # your filter conditions, and adjust the threshold to your liking. 19 + # 20 + # See https://github.com/MarshalX/bluesky-feed-generator/pull/21 21 + 22 + archived_threshold = datetime.timedelta(days=1) 23 + created_at = datetime.datetime.fromisoformat(record.created_at) 24 + now = datetime.datetime.now(datetime.UTC) 25 + 26 + return now - created_at > archived_threshold 27 + 28 + 29 + def should_ignore_post(created_post: dict) -> bool: 30 + record = created_post['record'] 31 + uri = created_post['uri'] 32 + 33 + if config.IGNORE_ARCHIVED_POSTS and is_archive_post(record): 34 + logger.debug(f'Ignoring archived post: {uri}') 35 + return True 36 + 37 + if config.IGNORE_REPLY_POSTS and record.reply: 38 + logger.debug(f'Ignoring reply post: {uri}') 39 + return True 40 + 41 + return False 42 + 43 + 44 + def operations_callback(ops: defaultdict) -> None: 45 + # Here we can filter, process, run ML classification, etc. 46 + # After our feed alg we can save posts into our DB 47 + # Also, we should process deleted posts to remove them from our DB and keep it in sync 48 + 49 + # for example, let's create our custom feed that will contain all posts that contains 'python' related text 50 + 51 + posts_to_create = [] 52 + for created_post in ops[models.ids.AppBskyFeedPost]['created']: 53 + author = created_post['author'] 54 + record = created_post['record'] 55 + 56 + post_with_images = isinstance(record.embed, models.AppBskyEmbedImages.Main) 57 + post_with_video = isinstance(record.embed, models.AppBskyEmbedVideo.Main) 58 + inlined_text = record.text.replace('\n', ' ') 59 + 60 + # print all texts just as demo that data stream works 61 + logger.debug( 62 + f'NEW POST ' 63 + f'[CREATED_AT={record.created_at}]' 64 + f'[AUTHOR={author}]' 65 + f'[WITH_IMAGE={post_with_images}]' 66 + f'[WITH_VIDEO={post_with_video}]' 67 + f': {inlined_text}' 68 + ) 69 + 70 + if should_ignore_post(created_post): 71 + continue 72 + 73 + # only python-related posts 74 + if 'python' in record.text.lower(): 75 + reply_root = reply_parent = None 76 + if record.reply: 77 + reply_root = record.reply.root.uri 78 + reply_parent = record.reply.parent.uri 79 + 80 + post_dict = { 81 + 'uri': created_post['uri'], 82 + 'cid': created_post['cid'], 83 + 'reply_parent': reply_parent, 84 + 'reply_root': reply_root, 85 + } 86 + posts_to_create.append(post_dict) 87 + 88 + posts_to_delete = ops[models.ids.AppBskyFeedPost]['deleted'] 89 + if posts_to_delete: 90 + post_uris_to_delete = [post['uri'] for post in posts_to_delete] 91 + Post.delete().where(Post.uri.in_(post_uris_to_delete)) 92 + logger.debug(f'Deleted from feed: {len(post_uris_to_delete)}') 93 + 94 + if posts_to_create: 95 + with db.atomic(): 96 + for post_dict in posts_to_create: 97 + Post.create(**post_dict) 98 + logger.debug(f'Added to feed: {len(posts_to_create)}')
+96
server/data_stream.py
··· 1 + import logging 2 + from collections import defaultdict 3 + 4 + from atproto import AtUri, CAR, firehose_models, FirehoseSubscribeReposClient, models, parse_subscribe_repos_message 5 + from atproto.exceptions import FirehoseError 6 + 7 + from server.database import SubscriptionState 8 + from server.logger import logger 9 + 10 + _INTERESTED_RECORDS = { 11 + models.AppBskyFeedLike: models.ids.AppBskyFeedLike, 12 + models.AppBskyFeedPost: models.ids.AppBskyFeedPost, 13 + models.AppBskyGraphFollow: models.ids.AppBskyGraphFollow, 14 + } 15 + 16 + 17 + def _get_ops_by_type(commit: models.ComAtprotoSyncSubscribeRepos.Commit) -> defaultdict: 18 + operation_by_type = defaultdict(lambda: {'created': [], 'deleted': []}) 19 + 20 + car = CAR.from_bytes(commit.blocks) 21 + for op in commit.ops: 22 + if op.action == 'update': 23 + # we are not interested in updates 24 + continue 25 + 26 + uri = AtUri.from_str(f'at://{commit.repo}/{op.path}') 27 + 28 + if op.action == 'create': 29 + if not op.cid: 30 + continue 31 + 32 + create_info = {'uri': str(uri), 'cid': str(op.cid), 'author': commit.repo} 33 + 34 + record_raw_data = car.blocks.get(op.cid) 35 + if not record_raw_data: 36 + continue 37 + 38 + record = models.get_or_create(record_raw_data, strict=False) 39 + if record is None: # unknown record (out of bsky lexicon) 40 + continue 41 + 42 + for record_type, record_nsid in _INTERESTED_RECORDS.items(): 43 + if uri.collection == record_nsid and models.is_record_type(record, record_type): 44 + operation_by_type[record_nsid]['created'].append({'record': record, **create_info}) 45 + break 46 + 47 + if op.action == 'delete': 48 + operation_by_type[uri.collection]['deleted'].append({'uri': str(uri)}) 49 + 50 + return operation_by_type 51 + 52 + 53 + def run(name, operations_callback, stream_stop_event=None): 54 + while stream_stop_event is None or not stream_stop_event.is_set(): 55 + try: 56 + _run(name, operations_callback, stream_stop_event) 57 + except FirehoseError as e: 58 + if logger.level == logging.DEBUG: 59 + raise e 60 + logger.error(f'Firehose error: {e}. Reconnecting to the firehose.') 61 + 62 + 63 + def _run(name, operations_callback, stream_stop_event=None): 64 + state = SubscriptionState.get_or_none(SubscriptionState.service == name) 65 + 66 + params = None 67 + if state: 68 + params = models.ComAtprotoSyncSubscribeRepos.Params(cursor=state.cursor) 69 + 70 + client = FirehoseSubscribeReposClient(params) 71 + 72 + if not state: 73 + SubscriptionState.create(service=name, cursor=0) 74 + 75 + def on_message_handler(message: firehose_models.MessageFrame) -> None: 76 + # stop on next message if requested 77 + if stream_stop_event and stream_stop_event.is_set(): 78 + client.stop() 79 + return 80 + 81 + commit = parse_subscribe_repos_message(message) 82 + if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit): 83 + return 84 + 85 + # update stored state every ~1k events 86 + if commit.seq % 1000 == 0: # lower value could lead to performance issues 87 + logger.debug(f'Updated cursor for {name} to {commit.seq}') 88 + client.update_params(models.ComAtprotoSyncSubscribeRepos.Params(cursor=commit.seq)) 89 + SubscriptionState.update(cursor=commit.seq).where(SubscriptionState.service == name).execute() 90 + 91 + if not commit.blocks: 92 + return 93 + 94 + operations_callback(_get_ops_by_type(commit)) 95 + 96 + client.start(on_message_handler)
+28
server/database.py
··· 1 + from datetime import datetime 2 + 3 + import peewee 4 + 5 + db = peewee.SqliteDatabase('feed_database.db') 6 + 7 + 8 + class BaseModel(peewee.Model): 9 + class Meta: 10 + database = db 11 + 12 + 13 + class Post(BaseModel): 14 + uri = peewee.CharField(index=True) 15 + cid = peewee.CharField() 16 + reply_parent = peewee.CharField(null=True, default=None) 17 + reply_root = peewee.CharField(null=True, default=None) 18 + indexed_at = peewee.DateTimeField(default=datetime.utcnow) 19 + 20 + 21 + class SubscriptionState(BaseModel): 22 + service = peewee.CharField(unique=True) 23 + cursor = peewee.BigIntegerField() 24 + 25 + 26 + if db.is_closed(): 27 + db.connect() 28 + db.create_tables([Post, SubscriptionState])
+4
server/logger.py
··· 1 + import logging 2 + 3 + logger = logging.getLogger(__name__) 4 + logging.basicConfig(level=logging.INFO)
+11
setupvenv.sh
··· 1 + #!/usr/bin/env bash 2 + 3 + set -e 4 + 5 + if [ ! -d "venv" ]; then 6 + virtualenv venv || exit 7 + fi 8 + 9 + . venv/bin/activate 10 + 11 + pip install -r requirements.txt