this repo has no description
at main 4.3 kB view raw
"""Pydantic models for Tap events (Kafka mode) and follow/unfollow rows."""

import base64
import json

from datetime import datetime
import logging
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, field_validator

logger = logging.getLogger(__name__)


class RecordEvent(BaseModel):
    """
    A model for record events that come from Tap, in Kafka mode
    """

    live: Optional[bool] = False
    did: str
    rev: str
    collection: str
    rkey: str
    action: str
    record: Optional[Dict[str, Any]] = None
    cid: Optional[str] = None


class IdentityEvent(BaseModel):
    """
    A model for identity events that come from Tap, in Kafka mode
    """

    live: Optional[bool] = False
    # No default: under Pydantic v2 the key must be present, but may be null.
    handle: Optional[str]
    is_active: bool
    status: str


class TapEvent(BaseModel):
    """
    The base model for events that come from Tap, in Kafka mode

    ``record`` and ``identity`` may be supplied either as already-parsed
    objects or as base64-encoded JSON strings; strings are decoded by
    :meth:`decode_base64` before field validation runs.
    """

    id: int
    type: str
    record: Optional[RecordEvent] = None
    identity: Optional[IdentityEvent] = None

    @field_validator("record", "identity", mode="before")
    @classmethod
    def decode_base64(cls, v: Any) -> Any:
        """Decode a base64-encoded JSON payload into a Python object.

        Args:
            v: The raw field value. Non-string values (``None``, dicts,
                model instances) are passed through untouched.

        Returns:
            The parsed JSON value when ``v`` is a string holding
            base64-encoded UTF-8 JSON; otherwise ``v`` unchanged. On a
            decoding failure the error is logged and ``v`` is returned
            as-is, leaving regular field validation to reject it.
        """
        if not isinstance(v, str):
            return v
        try:
            decoded = base64.b64decode(v).decode("utf-8")
            # The nested model fields need a mapping, not JSON text, so the
            # decoded payload has to be parsed before it is handed back.
            return json.loads(decoded)
        except ValueError as e:
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError
            # are all ValueError subclasses.
            logger.error("Error decoding event base64: %s", e)
            return v


# class AtKafkaOp(BaseModel):
#     action: str
#     collection: str
#     rkey: str
#     uri: str
#     cid: str
#     path: str
#     record: Optional[Dict[str, Any]]
#
#
# class AtKafkaIdentity(BaseModel):
#     seq: int
#     handle: str
#
#
# class AtKafkaInfo(BaseModel):
#     name: str
#     message: Optional[str] = None
#
#
# class AtKafkaAccount(BaseModel):
#     active: bool
#     seq: int
#     status: Optional[str] = None
#
#
# class DIDDocument(BaseModel):
#     context: Optional[List[Any]] = Field(None, alias="@context")
#     id: Optional[str] = None
#     also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
#     verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
#     service: Optional[List[Any]] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class ProfileViewDetailed(BaseModel):
#     did: str
#     handle: str
#     display_name: Optional[str] = Field(None, alias="displayName")
#     description: Optional[str] = None
#     avatar: Optional[str] = None
#     banner: Optional[str] = None
#     followers_count: Optional[int] = Field(None, alias="followersCount")
#     follows_count: Optional[int] = Field(None, alias="followsCount")
#     posts_count: Optional[int] = Field(None, alias="postsCount")
#     indexed_at: Optional[str] = Field(None, alias="indexedAt")
#     viewer: Optional[Dict[str, Any]] = None
#     labels: Optional[List[Any]] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class EventMetadata(BaseModel):
#     did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
#     pds_host: Optional[str] = Field(None, alias="pdsHost")
#     handle: Optional[str] = None
#     did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
#     account_age: Optional[int] = Field(None, alias="accountAge")
#     profile: Optional[ProfileViewDetailed] = None
#
#     class Config:
#         populate_by_name = True
#
#
# class AtKafkaEvent(BaseModel):
#     did: str
#     timestamp: str
#     metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
#     operation: Optional[AtKafkaOp] = None
#     account: Optional[AtKafkaAccount] = None
#     identity: Optional[AtKafkaIdentity] = None
#     info: Optional[AtKafkaInfo] = None
#
#     class Config:
#         populate_by_name = True


class FollowRecord(BaseModel):
    """The body of a follow record; ``created_at`` is read from ``createdAt``."""

    created_at: str = Field(..., alias="createdAt")
    subject: str


class Follow(BaseModel):
    """A follow relationship between two DIDs."""

    uri: str = Field(..., description="AT-uri for this follow relationship")
    did: str = Field(..., description="DID for the user creating the follow")
    subject: str = Field(..., description="DID for the subject being followed")
    created_at: datetime = Field(..., description="When the follow was made")


class Unfollow(BaseModel):
    """A deleted follow relationship, identified by its AT-uri."""

    uri: str = Field(..., description="AT-uri for the deleted follow relationship")
    # NOTE(review): the description mirrors Follow.created_at; confirm whether
    # this holds the original follow time or the time of the unfollow.
    created_at: datetime = Field(..., description="When the follow was made")