Async Python Jetstream Client
Python 99.9%
Other 0.1%
9 1 3

Clone this repository

https://tangled.org/nauta.one/atproto_jetstream
git@tangled.org:nauta.one/atproto_jetstream

For self-hosted knots, clone URLs may differ based on your setup.

README.md

atproto_jetstream#

Small, typed, and async package to receive Jetstream events from the AT Protocol.

install#

Using your package manager, install the atproto_jetstream dependency.

  • pip install atproto_jetstream
  • uv add atproto_jetstream

usage#

from asyncio import run
from atproto_jetstream import Jetstream


async def main():
    async with Jetstream("jetstream1.us-east.bsky.network") as stream:
        async for event in stream:
            match event.kind:
                case "account":
                    print(event.account)
                case "identity":
                    print(event.identity)
                case "commit":
                    print(event.commit)


if __name__ == "__main__":
    run(main())