import base64
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, field_validator

logger = logging.getLogger(__name__)


class RecordEvent(BaseModel):
    """
    A model for record events that come from Tap, in Kafka mode
    """

    live: Optional[bool] = False
    did: str
    rev: str
    collection: str
    rkey: str
    action: str
    # Record body; left as a free-form mapping, no schema is enforced here.
    record: Optional[Dict[str, Any]] = None
    cid: Optional[str] = None


class IdentityEvent(BaseModel):
    """
    A model for identity events that come from Tap, in Kafka mode
    """

    live: Optional[bool] = False
    # No default: the key must be present in the payload, but may be null.
    handle: Optional[str]
    is_active: bool
    status: str


class TapEvent(BaseModel):
    """
    The base model for events that come from Tap, in Kafka mode

    ``record`` and ``identity`` are both optional. When either arrives as a
    string it is treated as a base64-encoded JSON document and decoded before
    normal model validation runs.
    """

    id: int
    type: str
    record: Optional[RecordEvent] = None
    identity: Optional[IdentityEvent] = None

    @field_validator("record", "identity", mode="before")
    @classmethod
    def decode_base64(cls, v: Any) -> Any:
        """
        Decode a base64-encoded JSON payload into plain Python data.

        Args:
            v: Raw field value as received. Non-string values (``None``, a
                dict, an already-built model) are passed through untouched.

        Returns:
            The parsed JSON value when ``v`` is a string that decodes
            cleanly; otherwise ``v`` unchanged, so that pydantic reports the
            problem through its normal validation error.
        """
        if not isinstance(v, str):
            return v

        try:
            decoded = base64.b64decode(v).decode("utf-8")
            # The nested model fields cannot be validated from a bare string,
            # so the decoded text has to be parsed into a mapping first.
            # NOTE(review): assumes the decoded payload is JSON - confirm
            # against the producer.
            return json.loads(decoded)
        except (ValueError, RecursionError) as exc:
            # ValueError covers binascii.Error, UnicodeDecodeError and
            # json.JSONDecodeError. Decoding stays best-effort: log and hand
            # the original value back instead of raising from the validator.
            logger.error("Error decoding event base64: %s", exc)
            return v


# class AtKafkaOp(BaseModel):
#     action: str
#     collection: str
#     rkey: str
#     uri: str
#     cid: str
#     path: str
#     record: Optional[Dict[str, Any]]
#
#
# class AtKafkaIdentity(BaseModel):
#     seq: int
#     handle: str
#
#
# class AtKafkaInfo(BaseModel):
#     name: str
#     message: Optional[str] = None
#
#
# class AtKafkaAccount(BaseModel):
#     active: bool
#     seq: int
#     status: Optional[str] = None
#
#
# class DIDDocument(BaseModel):
#     context: Optional[List[Any]] = Field(None, alias="@context")
#     id: Optional[str] = None
#     also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
#     verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
#     service: Optional[List[Any]] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class ProfileViewDetailed(BaseModel):
#     did: str
#     handle: str
#     display_name: Optional[str] = Field(None, alias="displayName")
#     description: Optional[str] = None
#     avatar: Optional[str] = None
#     banner: Optional[str] = None
#     followers_count: Optional[int] = Field(None, alias="followersCount")
#     follows_count: Optional[int] = Field(None, alias="followsCount")
#     posts_count: Optional[int] = Field(None, alias="postsCount")
#     indexed_at: Optional[str] = Field(None, alias="indexedAt")
#     viewer: Optional[Dict[str, Any]] = None
#     labels: Optional[List[Any]] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class EventMetadata(BaseModel):
#     did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
#     pds_host: Optional[str] = Field(None, alias="pdsHost")
#     handle: Optional[str] = None
#     did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
#     account_age: Optional[int] = Field(None, alias="accountAge")
#     profile: Optional[ProfileViewDetailed] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class AtKafkaEvent(BaseModel):
#     did: str
#     timestamp: str
#     metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
#     operation: Optional[AtKafkaOp] = None
#     account: Optional[AtKafkaAccount] = None
#     identity: Optional[AtKafkaIdentity] = None
#     info: Optional[AtKafkaInfo] = None
#
#     class Config:
#         populate_by_name = True


class FollowRecord(BaseModel):
    """
    A follow record as it appears on the wire.

    ``created_at`` is populated from the ``createdAt`` key and kept as a
    plain string; no timestamp parsing happens in this model.
    """

    created_at: str = Field(..., alias="createdAt")
    subject: str


class Follow(BaseModel):
    """
    A follow relationship with its AT-uri, both DIDs and a parsed timestamp.
    """

    uri: str = Field(..., description="AT-uri for this follow relationship")
    did: str = Field(..., description="DID for the user creating the follow")
    subject: str = Field(..., description="DID for the subject being followed")
    created_at: datetime = Field(..., description="When the follow was made")


class Unfollow(BaseModel):
    """
    A deleted follow relationship, identified by the AT-uri of the follow.
    """

    uri: str = Field(..., description="AT-uri for the deleted follow relationship")
    # NOTE(review): the description matches Follow.created_at verbatim;
    # confirm whether this holds the follow time or the deletion time.
    created_at: datetime = Field(..., description="When the follow was made")