async#
python's async/await syntax is straightforward. the interesting part is how you structure code around it.
async with#
the core insight from async python codebases: async with is how you manage resources. not try/finally, not callbacks - the context manager protocol.
when you open a connection, start a session, or acquire any resource that needs cleanup, you wrap it in an async context manager:
```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from atproto import AsyncClient


@asynccontextmanager
async def get_atproto_client(
    require_auth: bool = False,
    operation: str = "this operation",
    target_repo: str | None = None,
) -> AsyncIterator[AsyncClient]:
    """get an atproto client using credentials from context or environment."""
    # pds_url, handle, and password are resolved from context/env (elided here)
    client = AsyncClient(pds_url)
    if require_auth and handle and password:
        await client.login(handle, password)
    try:
        yield client
    finally:
        pass  # AsyncClient doesn't need explicit cleanup
```
the caller writes `async with get_atproto_client() as client:` and cleanup happens automatically. this pattern appears constantly - database connections, HTTP sessions, file handles, locks.
from pdsx/mcp/client.py
the alternative - manual try/finally blocks scattered through the code, or worse, forgetting cleanup entirely - is why this pattern dominates. you encode the lifecycle once in the context manager, and every use site gets it right by default.
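the guarantee is easy to verify in isolation. a minimal runnable sketch of the same shape - every name here is invented, not from pdsx - showing that cleanup runs even when the body raises:

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def fake_session():
    # stand-in for a connection/session that needs cleanup
    events.append("open")
    try:
        yield "session"
    finally:
        # runs even if the body raises
        events.append("close")


async def main() -> None:
    try:
        async with fake_session() as s:
            events.append(f"use {s}")
            raise RuntimeError("boom")
    except RuntimeError:
        pass


asyncio.run(main())
print(events)  # → ['open', 'use session', 'close']
```

the exception propagates to the caller, but "close" still lands - that's the lifecycle encoded once.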
ContextVar#
python added contextvars to solve a specific problem: how do you have request-scoped state in async code without passing it through every function?
in sync code, you might use thread-locals. but async tasks can interleave on the same thread, so thread-locals don't work. ContextVar gives each task its own copy:
```python
import weakref
from contextvars import ContextVar

_current_docket: ContextVar[Docket | None] = ContextVar("docket", default=None)
_current_worker: ContextVar[Worker | None] = ContextVar("worker", default=None)
_current_server: ContextVar[weakref.ref[FastMCP] | None] = ContextVar("server", default=None)
```
set it at the start of handling a request, and any code called from that task can access it. this is how frameworks like fastapi and fastmcp pass request context without threading it through every function signature.
the pattern: set at the boundary (request handler, task entry), read anywhere inside. reset when you're done.
from fastmcp/server/dependencies.py
concurrency control#
asyncio.gather() runs tasks concurrently, but sometimes you need to limit how many run at once - rate limits, connection pools, memory constraints.
asyncio.Semaphore is the primitive for this. acquire before work, release after. the async with syntax makes it clean:
```python
import asyncio

# client, delete_record, concurrency, fail_fast, and uris come from the
# surrounding function (elided here)
semaphore = asyncio.Semaphore(concurrency)
successful: list[str] = []
failed: list[tuple[str, Exception]] = []


async def delete_one(uri: str) -> None:
    """delete a single record with concurrency control."""
    async with semaphore:
        try:
            await delete_record(client, uri)
            successful.append(uri)
        except Exception as e:
            failed.append((uri, e))
            if fail_fast:
                raise


await asyncio.gather(*[delete_one(uri) for uri in uris])
```
at most `concurrency` delete operations run at once. the rest wait.
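the cap is easy to observe directly. this sketch (all names invented) tracks the high-water mark of concurrently running tasks:

```python
import asyncio


async def main(limit: int, n: int) -> int:
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def work(i: int) -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate I/O
            running -= 1

    await asyncio.gather(*[work(i) for i in range(n)])
    return peak


peak = asyncio.run(main(limit=3, n=10))
print(peak)  # → 3
```

ten tasks are launched, but the semaphore never lets more than three past the `async with` at once.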
connection pools#
module-level singleton pool, lazily initialized:
```python
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import asyncpg

_pool: asyncpg.Pool | None = None


async def get_pool() -> asyncpg.Pool:
    global _pool
    if _pool is None:
        # db_url comes from config (elided here)
        _pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)
    return _pool


@asynccontextmanager
async def get_conn() -> AsyncGenerator[asyncpg.Connection, None]:
    pool = await get_pool()
    async with pool.acquire() as conn:
        yield conn
```
callers use `async with get_conn() as conn:` - the pool handles the connection lifecycle.
batch writes with unnest#
postgres `unnest()` turns arrays into rows. one round trip for thousands of inserts:
```python
async def batch_upsert_follows(follows: list[tuple[str, str, str]]) -> None:
    follower_ids = [f[0] for f in follows]
    rkeys = [f[1] for f in follows]
    subject_ids = [f[2] for f in follows]
    async with get_conn() as conn:
        await conn.execute(
            """
            INSERT INTO follows (follower_id, rkey, subject_id)
            SELECT * FROM unnest($1::bigint[], $2::text[], $3::bigint[])
            ON CONFLICT (follower_id, rkey) DO UPDATE
            SET subject_id = EXCLUDED.subject_id
            """,
            follower_ids, rkeys, subject_ids,
        )
```
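the mental model: the python side transposes rows into parallel columns, and `unnest` on the postgres side zips them back into rows. that round trip is plain-python checkable (sample data invented):

```python
follows = [("10", "aaa", "20"), ("10", "bbb", "30")]

# rows -> parallel columns, as passed for $1, $2, $3
follower_ids = [f[0] for f in follows]
rkeys = [f[1] for f in follows]
subject_ids = [f[2] for f in follows]

# unnest($1, $2, $3) re-zips the columns into rows server-side
rows = list(zip(follower_ids, rkeys, subject_ids))
print(rows == follows)  # → True
```

so one `execute` with three array parameters carries the whole batch, instead of one statement per row.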