languages/python/ecosystem/pydantic.md (+24)

···

from [coping with python's type system](https://blog.zzstoatzz.io/coping-with-python-type-system/)

## model_validator for side effects

run setup code when settings load:

```python
from typing import Self
from pydantic import model_validator
from pydantic_settings import BaseSettings

# setup_logging is an app-level helper defined elsewhere in the repo

class Settings(BaseSettings):
    debug: bool = False

    @model_validator(mode="after")
    def configure_logging(self) -> Self:
        setup_logging(debug=self.debug)  # side effect driven by a settings value
        return self

settings = Settings()  # logging configured on import
```

the validator runs after all fields are set. use for side effects that depend on configuration values.

from [bot/config.py](https://github.com/zzstoatzz/bot)

## when to use what

pydantic models are heavier than they look - they do a lot of work on instantiation. for internal data you control, python's `dataclasses` are simpler:
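
a minimal sketch of the contrast, with a hypothetical `Point` type (not from the original note):

```python
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float

p = Point(1.0, 2.0)  # plain attribute assignment - no validation, coercion, or schema machinery
```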

languages/python/language/async.md (+46)

···

at most `concurrency` delete operations run at once. the rest wait.

from [pdsx/_internal/batch.py](https://github.com/zzstoatzz/pdsx/blob/main/src/pdsx/_internal/batch.py)
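
the shape of that pattern, as a minimal sketch (the pdsx code itself is collapsed above; `delete_record` and `uris` are placeholder names):

```python
import asyncio

async def delete_all(uris: list[str], concurrency: int = 10) -> None:
    semaphore = asyncio.Semaphore(concurrency)

    async def delete_one(uri: str) -> None:
        async with semaphore:  # waits here once `concurrency` deletes are in flight
            await delete_record(uri)  # placeholder for the actual delete call

    await asyncio.gather(*(delete_one(u) for u in uris))
```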

## connection pools

module-level singleton pool, lazily initialized:

```python
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import asyncpg

_pool: asyncpg.Pool | None = None

async def get_pool() -> asyncpg.Pool:
    global _pool
    if _pool is None:
        # db_url comes from app config
        _pool = await asyncpg.create_pool(db_url, min_size=2, max_size=10)
    return _pool

@asynccontextmanager
async def get_conn() -> AsyncGenerator[asyncpg.Connection, None]:
    pool = await get_pool()
    async with pool.acquire() as conn:
        yield conn
```

callers use `async with get_conn() as conn:` - pool handles connection lifecycle.
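
a typical call site, as a sketch (the healthcheck query is just an illustration):

```python
async def healthy() -> bool:
    async with get_conn() as conn:
        # connection is returned to the pool when the block exits
        return await conn.fetchval("SELECT 1") == 1
```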

## batch writes with unnest

postgres `unnest()` turns arrays into rows. one round trip for thousands of inserts:

```python
async def batch_upsert_follows(follows: list[tuple[int, str, int]]) -> None:
    # unzip (follower_id, rkey, subject_id) tuples into parallel arrays for unnest;
    # ids are ints to match the bigint[] casts below
    follower_ids = [f[0] for f in follows]
    rkeys = [f[1] for f in follows]
    subject_ids = [f[2] for f in follows]

    async with get_conn() as conn:
        await conn.execute(
            """
            INSERT INTO follows (follower_id, rkey, subject_id)
            SELECT * FROM unnest($1::bigint[], $2::text[], $3::bigint[])
            ON CONFLICT (follower_id, rkey) DO UPDATE
            SET subject_id = EXCLUDED.subject_id
            """,
            follower_ids, rkeys, subject_ids,
        )
```

from [follower-weight/db.py](https://github.com/zzstoatzz/follower-weight)
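
call site sketch (ids and rkeys are made up):

```python
# inside an async function
await batch_upsert_follows([
    (1, "3lb2abc", 2),
    (1, "3lb2def", 3),
])
```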

protocols/atproto/firehose.md (+38 -10)

···

## batch processing

for high-volume consumption, batch by operation type and flush on size OR time:

```python
import asyncio
import json
import time

import db  # the batch helpers (follower-weight/db.py)

BATCH_SIZE = 1000
FLUSH_INTERVAL = 2.0

follow_batch: list[tuple[str, str, str]] = []  # (follower_did, rkey, subject_did)
unfollow_batch: list[tuple[str, str]] = []  # (follower_did, rkey)
pending_acks: set[int] = set()

async def flush(ws) -> None:
    if follow_batch:
        await db.batch_upsert_follows(follow_batch)
        follow_batch.clear()
    if unfollow_batch:
        await db.batch_delete_follows(unfollow_batch)
        unfollow_batch.clear()
    # ack AFTER persist
    for ack_id in sorted(pending_acks):
        await ws.send(json.dumps({"type": "ack", "id": ack_id}))
    pending_acks.clear()

async def consume(ws) -> None:  # ws: the open tap websocket
    last_flush = time.monotonic()
    while True:
        try:
            msg = await asyncio.wait_for(ws.recv(), timeout=0.1)
        except asyncio.TimeoutError:
            # time-based flush for low-volume periods
            if time.monotonic() - last_flush > FLUSH_INTERVAL:
                await flush(ws)
                last_flush = time.monotonic()
            continue

        event = json.loads(msg)
        # ... parse and append to appropriate batch ...
        pending_acks.add(event["id"])

        if len(follow_batch) >= BATCH_SIZE:
            await flush(ws)
            last_flush = time.monotonic()
```

critical: ack cursor only after successful persistence. if you crash, you replay from last ack.

from [follower-weight/tap_consumer.py](https://github.com/zzstoatzz/follower-weight)

## cursor management