this repo has no description
1import base64
2
3from datetime import datetime
4import logging
5from typing import Any, Dict, Optional
6
7from pydantic import BaseModel, Field, field_validator
8
9logger = logging.getLogger(__name__)
10
11
12class RecordEvent(BaseModel):
13 """
14 A model for record events that come from Tap, in Kafka mode
15 """
16
17 live: Optional[bool] = False
18 did: str
19 rev: str
20 collection: str
21 rkey: str
22 action: str
23 record: Optional[Dict[str, Any]] = None
24 cid: Optional[str] = None
25
26
27class IdentityEvent(BaseModel):
28 """
29 A model for identity events taht come from Tap, in Kafka mode
30 """
31
32 live: Optional[bool] = False
33 handle: Optional[str]
34 is_active: bool
35 status: str
36
37
38class TapEvent(BaseModel):
39 """
40 The base model for events that come from Tap, in Kafka mode
41 """
42
43 id: int
44 type: str
45 record: Optional[RecordEvent] = None
46 identity: Optional[IdentityEvent] = None
47
48 @field_validator("record", "identity", mode="before")
49 @classmethod
50 def decode_base64(cls, v: Any):
51 if v is not None and isinstance(v, str):
52 try:
53 return base64.b64decode(v).decode("utf-8")
54 except Exception as e:
55 logger.error(f"Error decoding event base64: {e}")
56 return v
57 return v
58
59
60# class AtKafkaOp(BaseModel):
61# action: str
62# collection: str
63# rkey: str
64# uri: str
65# cid: str
66# path: str
67# record: Optional[Dict[str, Any]]
68#
69#
70# class AtKafkaIdentity(BaseModel):
71# seq: int
72# handle: str
73#
74#
75# class AtKafkaInfo(BaseModel):
76# name: str
77# message: Optional[str] = None
78#
79#
80# class AtKafkaAccount(BaseModel):
81# active: bool
82# seq: int
83# status: Optional[str] = None
84#
85#
86# class DIDDocument(BaseModel):
87# context: Optional[List[Any]] = Field(None, alias="@context")
88# id: Optional[str] = None
89# also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
90# verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
91# service: Optional[List[Any]] = None
92#
93# class Config:
94# populate_by_name = True
95#
96#
97# class ProfileViewDetailed(BaseModel):
98# did: str
99# handle: str
100# display_name: Optional[str] = Field(None, alias="displayName")
101# description: Optional[str] = None
102# avatar: Optional[str] = None
103# banner: Optional[str] = None
104# followers_count: Optional[int] = Field(None, alias="followersCount")
105# follows_count: Optional[int] = Field(None, alias="followsCount")
106# posts_count: Optional[int] = Field(None, alias="postsCount")
107# indexed_at: Optional[str] = Field(None, alias="indexedAt")
108# viewer: Optional[Dict[str, Any]] = None
109# labels: Optional[List[Any]] = None
110#
111# class Config:
112# populate_by_name = True
113#
114#
115# class EventMetadata(BaseModel):
116# did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
117# pds_host: Optional[str] = Field(None, alias="pdsHost")
118# handle: Optional[str] = None
119# did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
120# account_age: Optional[int] = Field(None, alias="accountAge")
121# profile: Optional[ProfileViewDetailed] = None
122#
123# class Config:
124# populate_by_name = True
125#
126#
127# class AtKafkaEvent(BaseModel):
128# did: str
129# timestamp: str
130# metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
131# operation: Optional[AtKafkaOp] = None
132# account: Optional[AtKafkaAccount] = None
133# identity: Optional[AtKafkaIdentity] = None
134# info: Optional[AtKafkaInfo] = None
135#
136# class Config:
137# populate_by_name = True
138
139
140class FollowRecord(BaseModel):
141 created_at: str = Field(..., alias="createdAt")
142 subject: str
143
144
145class Follow(BaseModel):
146 uri: str = Field(..., description="AT-uri for this follow relationship")
147 did: str = Field(..., description="DID for the user creating the follow")
148 subject: str = Field(..., description="DID for the subject being followed")
149 created_at: datetime = Field(..., description="When the follow was made")
150
151
152class Unfollow(BaseModel):
153 uri: str = Field(..., description="AT-uri for the deleted follow relationship")
154 created_at: datetime = Field(..., description="When the follow was made")