+9
-6
indexer.py
+9
-6
indexer.py
···
13
13
14
14
from config import CONFIG
15
15
from metrics import prom_metrics
16
-
from models import AtKafkaEvent, Follow, FollowRecord, Unfollow
16
+
from models import Follow, FollowRecord, TapEvent, Unfollow
17
17
18
18
logging.basicConfig(
19
19
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
···
259
259
kind = "unk"
260
260
261
261
try:
262
-
evt = AtKafkaEvent.model_validate(message.value)
262
+
evt = TapEvent.model_validate(message.value)
263
263
264
-
if not evt.operation or evt.operation.collection != "app.bsky.graph.follow":
264
+
if not evt.record or evt.record.collection != "app.bsky.graph.follow":
265
265
kind = "skipped"
266
266
status = "ok"
267
267
return
268
268
269
-
op = evt.operation
269
+
op = evt.record
270
+
uri = f"at://{op.did}/{op.collection}/{op.rkey}"
270
271
271
272
if op.action == "update":
272
273
kind = "update"
···
275
276
rec = FollowRecord.model_validate(op.record)
276
277
created_at = isoparse(rec.created_at)
277
278
follow = Follow(
278
-
uri=op.uri, did=evt.did, subject=rec.subject, created_at=created_at
279
+
uri=uri, did=op.did, subject=rec.subject, created_at=created_at
279
280
)
280
281
self.indexer.insert_follow(follow)
281
282
else:
282
283
kind = "delete"
283
-
unfollow = Unfollow(uri=op.uri, created_at=datetime.now())
284
+
285
+
unfollow = Unfollow(uri=uri, created_at=datetime.now())
286
+
284
287
self.indexer.insert_unfollow(unfollow)
285
288
286
289
status = "ok"
+122
-69
models.py
+122
-69
models.py
···
1
+
import base64
2
+
1
3
from datetime import datetime
2
-
from typing import Any, Dict, Optional, List
4
+
import logging
5
+
from typing import Any, Dict, Optional
3
6
4
-
from pydantic import BaseModel, Field
7
+
from pydantic import BaseModel, Field, field_validator
5
8
9
+
logger = logging.getLogger(__name__)
6
10
7
-
class AtKafkaOp(BaseModel):
8
-
action: str
11
+
12
+
class RecordEvent(BaseModel):
13
+
"""
14
+
A model for record events that come from Tap, in Kafka mode
15
+
"""
16
+
17
+
live: Optional[bool] = False
18
+
did: str
19
+
rev: str
9
20
collection: str
10
21
rkey: str
11
-
uri: str
12
-
cid: str
13
-
path: str
14
-
record: Optional[Dict[str, Any]]
22
+
action: str
23
+
record: Optional[Dict[str, Any]] = None
24
+
cid: Optional[str] = None
15
25
16
26
17
-
class AtKafkaIdentity(BaseModel):
18
-
seq: int
19
-
handle: str
27
+
class IdentityEvent(BaseModel):
28
+
"""
29
+
A model for identity events taht come from Tap, in Kafka mode
30
+
"""
20
31
32
+
live: Optional[bool] = False
33
+
handle: Optional[str]
34
+
is_active: bool
35
+
status: str
21
36
22
-
class AtKafkaInfo(BaseModel):
23
-
name: str
24
-
message: Optional[str] = None
25
37
38
+
class TapEvent(BaseModel):
39
+
"""
40
+
The base model for events that come from Tap, in Kafka mode
41
+
"""
26
42
27
-
class AtKafkaAccount(BaseModel):
28
-
active: bool
29
-
seq: int
30
-
status: Optional[str] = None
43
+
id: int
44
+
type: str
45
+
record: Optional[RecordEvent] = None
46
+
identity: Optional[IdentityEvent] = None
31
47
48
+
@field_validator("record", "identity", mode="before")
49
+
@classmethod
50
+
def decode_base64(cls, v: Any):
51
+
if v is not None and isinstance(v, str):
52
+
try:
53
+
return base64.b64decode(v).decode("utf-8")
54
+
except Exception as e:
55
+
logger.error(f"Error decoding event base64: {e}")
56
+
return v
57
+
return v
32
58
33
-
class DIDDocument(BaseModel):
34
-
context: Optional[List[Any]] = Field(None, alias="@context")
35
-
id: Optional[str] = None
36
-
also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
37
-
verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
38
-
service: Optional[List[Any]] = None
39
59
40
-
class Config:
41
-
populate_by_name = True
42
-
43
-
44
-
class ProfileViewDetailed(BaseModel):
45
-
did: str
46
-
handle: str
47
-
display_name: Optional[str] = Field(None, alias="displayName")
48
-
description: Optional[str] = None
49
-
avatar: Optional[str] = None
50
-
banner: Optional[str] = None
51
-
followers_count: Optional[int] = Field(None, alias="followersCount")
52
-
follows_count: Optional[int] = Field(None, alias="followsCount")
53
-
posts_count: Optional[int] = Field(None, alias="postsCount")
54
-
indexed_at: Optional[str] = Field(None, alias="indexedAt")
55
-
viewer: Optional[Dict[str, Any]] = None
56
-
labels: Optional[List[Any]] = None
57
-
58
-
class Config:
59
-
populate_by_name = True
60
-
61
-
62
-
class EventMetadata(BaseModel):
63
-
did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
64
-
pds_host: Optional[str] = Field(None, alias="pdsHost")
65
-
handle: Optional[str] = None
66
-
did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
67
-
account_age: Optional[int] = Field(None, alias="accountAge")
68
-
profile: Optional[ProfileViewDetailed] = None
69
-
70
-
class Config:
71
-
populate_by_name = True
72
-
73
-
74
-
class AtKafkaEvent(BaseModel):
75
-
did: str
76
-
timestamp: str
77
-
metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
78
-
operation: Optional[AtKafkaOp] = None
79
-
account: Optional[AtKafkaAccount] = None
80
-
identity: Optional[AtKafkaIdentity] = None
81
-
info: Optional[AtKafkaInfo] = None
82
-
83
-
class Config:
84
-
populate_by_name = True
60
+
# class AtKafkaOp(BaseModel):
61
+
# action: str
62
+
# collection: str
63
+
# rkey: str
64
+
# uri: str
65
+
# cid: str
66
+
# path: str
67
+
# record: Optional[Dict[str, Any]]
68
+
#
69
+
#
70
+
# class AtKafkaIdentity(BaseModel):
71
+
# seq: int
72
+
# handle: str
73
+
#
74
+
#
75
+
# class AtKafkaInfo(BaseModel):
76
+
# name: str
77
+
# message: Optional[str] = None
78
+
#
79
+
#
80
+
# class AtKafkaAccount(BaseModel):
81
+
# active: bool
82
+
# seq: int
83
+
# status: Optional[str] = None
84
+
#
85
+
#
86
+
# class DIDDocument(BaseModel):
87
+
# context: Optional[List[Any]] = Field(None, alias="@context")
88
+
# id: Optional[str] = None
89
+
# also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
90
+
# verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
91
+
# service: Optional[List[Any]] = None
92
+
#
93
+
# class Config:
94
+
# populate_by_name = True
95
+
#
96
+
#
97
+
# class ProfileViewDetailed(BaseModel):
98
+
# did: str
99
+
# handle: str
100
+
# display_name: Optional[str] = Field(None, alias="displayName")
101
+
# description: Optional[str] = None
102
+
# avatar: Optional[str] = None
103
+
# banner: Optional[str] = None
104
+
# followers_count: Optional[int] = Field(None, alias="followersCount")
105
+
# follows_count: Optional[int] = Field(None, alias="followsCount")
106
+
# posts_count: Optional[int] = Field(None, alias="postsCount")
107
+
# indexed_at: Optional[str] = Field(None, alias="indexedAt")
108
+
# viewer: Optional[Dict[str, Any]] = None
109
+
# labels: Optional[List[Any]] = None
110
+
#
111
+
# class Config:
112
+
# populate_by_name = True
113
+
#
114
+
#
115
+
# class EventMetadata(BaseModel):
116
+
# did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
117
+
# pds_host: Optional[str] = Field(None, alias="pdsHost")
118
+
# handle: Optional[str] = None
119
+
# did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
120
+
# account_age: Optional[int] = Field(None, alias="accountAge")
121
+
# profile: Optional[ProfileViewDetailed] = None
122
+
#
123
+
# class Config:
124
+
# populate_by_name = True
125
+
#
126
+
#
127
+
# class AtKafkaEvent(BaseModel):
128
+
# did: str
129
+
# timestamp: str
130
+
# metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
131
+
# operation: Optional[AtKafkaOp] = None
132
+
# account: Optional[AtKafkaAccount] = None
133
+
# identity: Optional[AtKafkaIdentity] = None
134
+
# info: Optional[AtKafkaInfo] = None
135
+
#
136
+
# class Config:
137
+
# populate_by_name = True
85
138
86
139
87
140
class FollowRecord(BaseModel):