this repo has no description

field validator for base64

Changed files
+22 -6
+22 -6
models.py
··· 1 + import base64 2 + 1 3 from datetime import datetime 4 + import logging 2 5 from typing import Any, Dict, Optional 3 6 4 - from pydantic import BaseModel, Field 7 + from pydantic import BaseModel, Field, field_validator 8 + 9 + logger = logging.getLogger(__name__) 5 10 6 11 7 12 class RecordEvent(BaseModel): ··· 9 14 A model for record events that come from Tap, in Kafka mode 10 15 """ 11 16 12 - live: bool 17 + live: Optional[bool] = False 13 18 did: str 14 19 rev: str 15 20 collection: str 16 21 rkey: str 17 22 action: str 18 - record: Optional[Dict[str, Any]] 19 - cid: Optional[str] 23 + record: Optional[Dict[str, Any]] = None 24 + cid: Optional[str] = None 20 25 21 26 22 27 class IdentityEvent(BaseModel): ··· 37 42 38 43 id: int 39 44 type: str 40 - record: RecordEvent 41 - identity: IdentityEvent 45 + record: Optional[RecordEvent] = None 46 + identity: Optional[IdentityEvent] = None 47 + 48 + @field_validator("record", "identity", mode="before") 49 + @classmethod 50 + def decode_base64(cls, v: Any): 51 + if v is not None and isinstance(v, str): 52 + try: 53 + return base64.b64decode(v).decode("utf-8") 54 + except Exception as e: 55 + logger.error(f"Error decoding event base64: {e}") 56 + return v 57 + return v 42 58 43 59 44 60 # class AtKafkaOp(BaseModel):