# async

python's `async`/`await` syntax is straightforward. the interesting part is how you structure code around it.

## async with

the core insight from async python codebases: `async with` is how you manage resources. not try/finally, not callbacks - the context manager protocol.

when you open a connection, start a session, or acquire any resource that needs cleanup, you wrap it in an async context manager:

```python
@asynccontextmanager
async def get_atproto_client(
    require_auth: bool = False,
    operation: str = "this operation",
    target_repo: str | None = None,
) -> AsyncIterator[AsyncClient]:
    """get an atproto client using credentials from context or environment."""
    client = AsyncClient(pds_url)
    if require_auth and handle and password:
        await client.login(handle, password)
    try:
        yield client
    finally:
        pass  # AsyncClient doesn't need explicit cleanup
```

the caller writes `async with get_atproto_client() as client:` and cleanup happens automatically. this pattern appears constantly - database connections, HTTP sessions, file handles, locks.

from [pdsx/mcp/client.py](https://github.com/zzstoatzz/pdsx/blob/main/src/pdsx/mcp/client.py)

the alternative - manual try/finally blocks scattered through the code, or worse, forgetting cleanup entirely - is why this pattern dominates. you encode the lifecycle once in the context manager, and every use site gets it right by default.

## ContextVar

python added `contextvars` to solve a specific problem: how do you have request-scoped state in async code without passing it through every function?

in sync code, you might use thread-locals. but async tasks can interleave on the same thread, so thread-locals don't work.
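
a minimal sketch of that interleaving, not taken from any of the linked repos: two tasks on one thread each store an id in a `threading.local` and in a `ContextVar`, then yield to the event loop once before reading both back. `_local`, `_request_id`, `handle`, and `main` are hypothetical names used only for illustration.

```python
import asyncio
import threading
from contextvars import ContextVar

# hypothetical state holders, for illustration only
_local = threading.local()
_request_id: ContextVar[str] = ContextVar("request_id", default="")


async def handle(request_id: str, seen: dict[str, tuple[str, str]]) -> None:
    # set both kinds of state at the task boundary
    _local.request_id = request_id
    _request_id.set(request_id)
    # yield once so the other task gets to run on the same thread
    await asyncio.sleep(0)
    # record what each mechanism reports after the interleaving
    seen[request_id] = (_local.request_id, _request_id.get())


async def main() -> dict[str, tuple[str, str]]:
    seen: dict[str, tuple[str, str]] = {}
    # gather wraps each coroutine in its own task, each with its own context copy
    await asyncio.gather(handle("a", seen), handle("b", seen))
    return seen


if __name__ == "__main__":
    # task "a" reads "b" from the thread-local but "a" from the ContextVar
    print(asyncio.run(main()))  # {'a': ('b', 'a'), 'b': ('b', 'b')}
```
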
`ContextVar` gives each task its own copy:

```python
from contextvars import ContextVar

_current_docket: ContextVar[Docket | None] = ContextVar("docket", default=None)
_current_worker: ContextVar[Worker | None] = ContextVar("worker", default=None)
_current_server: ContextVar[weakref.ref[FastMCP] | None] = ContextVar("server", default=None)
```

set it at the start of handling a request, and any code called from that task can access it. this is how frameworks like fastapi and fastmcp pass request context without threading it through every function signature.

the pattern: set at the boundary (request handler, task entry), read anywhere inside. reset when you're done.

from [fastmcp/server/dependencies.py](https://github.com/jlowin/fastmcp/blob/main/src/fastmcp/server/dependencies.py)

## concurrency control

`asyncio.gather()` runs tasks concurrently, but sometimes you need to limit how many run at once - rate limits, connection pools, memory constraints.

`asyncio.Semaphore` is the primitive for this. acquire before work, release after. the `async with` syntax makes it clean:

```python
semaphore = asyncio.Semaphore(concurrency)

async def delete_one(uri: str) -> None:
    """delete a single record with concurrency control."""
    async with semaphore:
        try:
            await delete_record(client, uri)
            successful.append(uri)
        except Exception as e:
            failed.append((uri, e))
            if fail_fast:
                raise

await asyncio.gather(*[delete_one(uri) for uri in uris])
```

at most `concurrency` delete operations run at once. the rest wait.
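
to see the limit hold, here is a standalone sketch that counts how many jobs are inside the semaphore at once. `run_limited` and `job` are hypothetical names, and the `asyncio.sleep` call is an assumed stand-in for real I/O.

```python
import asyncio


async def run_limited(n_jobs: int, limit: int) -> int:
    """run n_jobs fake jobs under a semaphore and return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for real I/O
            in_flight -= 1

    # all jobs are scheduled at once; the semaphore decides how many proceed
    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(n_jobs=10, limit=3)))  # prints 3
```

none of the names in this sketch come from pdsx; the `delete_one` excerpt before it is the repository code.
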
from [pdsx/_internal/batch.py](https://github.com/zzstoatzz/pdsx/blob/main/src/pdsx/_internal/batch.py)

## connection pools

module-level singleton pool, lazily initialized:

```python
_pool: asyncpg.Pool | None = None

async def get_pool() -> asyncpg.Pool:
    global _pool
    if _pool is None:
        _pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)
    return _pool

@asynccontextmanager
async def get_conn() -> AsyncGenerator[asyncpg.Connection, None]:
    pool = await get_pool()
    async with pool.acquire() as conn:
        yield conn
```

callers use `async with get_conn() as conn:` - pool handles connection lifecycle.

## batch writes with unnest

postgres `unnest()` turns arrays into rows. one round trip for thousands of inserts:

```python
async def batch_upsert_follows(follows: list[tuple[str, str, str]]) -> None:
    follower_ids = [f[0] for f in follows]
    rkeys = [f[1] for f in follows]
    subject_ids = [f[2] for f in follows]

    async with get_conn() as conn:
        await conn.execute(
            """
            INSERT INTO follows (follower_id, rkey, subject_id)
            SELECT * FROM unnest($1::bigint[], $2::text[], $3::bigint[])
            ON CONFLICT (follower_id, rkey) DO UPDATE
            SET subject_id = EXCLUDED.subject_id
            """,
            follower_ids, rkeys, subject_ids,
        )
```

from [follower-weight/db.py](https://github.com/zzstoatzz/follower-weight)
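
`unnest()` wants one array per column, so the python side has to transpose rows into columns first. a minimal sketch of that step, separate from the follower-weight code: `to_columns` and `chunked` are hypothetical helpers, and the integer ids in the usage comment are an assumption made to match the `bigint[]` casts.

```python
from collections.abc import Iterator, Sequence


def to_columns(rows: Sequence[tuple], width: int) -> tuple[list, ...]:
    """transpose row tuples into `width` column lists, one per unnest() array."""
    columns: tuple[list, ...] = tuple([] for _ in range(width))
    for row in rows:
        if len(row) != width:
            raise ValueError(f"expected {width} fields, got {len(row)}: {row!r}")
        for column, value in zip(columns, row):
            column.append(value)
    return columns


def chunked(rows: Sequence[tuple], size: int) -> Iterator[Sequence[tuple]]:
    """yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start : start + size]


# usage sketch (assumed data): one execute() call per chunk
# for chunk in chunked(follows, 5000):
#     follower_ids, rkeys, subject_ids = to_columns(chunk, 3)
```

passing `width` explicitly keeps the empty case well-defined: an empty batch still yields three empty lists. chunking is optional; it only caps how many rows go into a single statement.
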