+9
-6
indexer.py
+9
-6
indexer.py
···
13
13
14
14
from config import CONFIG
15
15
from metrics import prom_metrics
16
-
from models import AtKafkaEvent, Follow, FollowRecord, Unfollow
16
+
from models import Follow, FollowRecord, TapEvent, Unfollow
17
17
18
18
logging.basicConfig(
19
19
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
···
259
259
kind = "unk"
260
260
261
261
try:
262
-
evt = AtKafkaEvent.model_validate(message.value)
262
+
evt = TapEvent.model_validate(message.value)
263
263
264
-
if not evt.operation or evt.operation.collection != "app.bsky.graph.follow":
264
+
if not evt.record or evt.record.collection != "app.bsky.graph.follow":
265
265
kind = "skipped"
266
266
status = "ok"
267
267
return
268
268
269
-
op = evt.operation
269
+
op = evt.record
270
+
uri = f"at://{op.did}/{op.collection}/{op.rkey}"
270
271
271
272
if op.action == "update":
272
273
kind = "update"
···
275
276
rec = FollowRecord.model_validate(op.record)
276
277
created_at = isoparse(rec.created_at)
277
278
follow = Follow(
278
-
uri=op.uri, did=evt.did, subject=rec.subject, created_at=created_at
279
+
uri=uri, did=op.did, subject=rec.subject, created_at=created_at
279
280
)
280
281
self.indexer.insert_follow(follow)
281
282
else:
282
283
kind = "delete"
283
-
unfollow = Unfollow(uri=op.uri, created_at=datetime.now())
284
+
285
+
unfollow = Unfollow(uri=uri, created_at=datetime.now())
286
+
284
287
self.indexer.insert_unfollow(unfollow)
285
288
286
289
status = "ok"
+105
-68
models.py
+105
-68
models.py
···
1
1
from datetime import datetime
2
-
from typing import Any, Dict, Optional, List
2
+
from typing import Any, Dict, Optional
3
3
4
4
from pydantic import BaseModel, Field
5
5
6
6
7
-
class AtKafkaOp(BaseModel):
8
-
action: str
7
+
class RecordEvent(BaseModel):
8
+
"""
9
+
A model for record events that come from Tap, in Kafka mode
10
+
"""
11
+
12
+
live: bool
13
+
did: str
14
+
rev: str
9
15
collection: str
10
16
rkey: str
11
-
uri: str
12
-
cid: str
13
-
path: str
17
+
action: str
14
18
record: Optional[Dict[str, Any]]
19
+
cid: Optional[str]
15
20
16
21
17
-
class AtKafkaIdentity(BaseModel):
18
-
seq: int
19
-
handle: str
22
+
class IdentityEvent(BaseModel):
23
+
"""
24
+
A model for identity events taht come from Tap, in Kafka mode
25
+
"""
20
26
27
+
live: bool
28
+
handle: Optional[str]
29
+
is_active: bool
30
+
status: str
21
31
22
-
class AtKafkaInfo(BaseModel):
23
-
name: str
24
-
message: Optional[str] = None
25
32
33
+
class TapEvent(BaseModel):
34
+
"""
35
+
The base model for events that come from Tap, in Kafka mode
36
+
"""
26
37
27
-
class AtKafkaAccount(BaseModel):
28
-
active: bool
29
-
seq: int
30
-
status: Optional[str] = None
38
+
id: int
39
+
type: str
40
+
record: RecordEvent
41
+
identity: IdentityEvent
31
42
32
43
33
-
class DIDDocument(BaseModel):
34
-
context: Optional[List[Any]] = Field(None, alias="@context")
35
-
id: Optional[str] = None
36
-
also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
37
-
verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
38
-
service: Optional[List[Any]] = None
39
-
40
-
class Config:
41
-
populate_by_name = True
42
-
43
-
44
-
class ProfileViewDetailed(BaseModel):
45
-
did: str
46
-
handle: str
47
-
display_name: Optional[str] = Field(None, alias="displayName")
48
-
description: Optional[str] = None
49
-
avatar: Optional[str] = None
50
-
banner: Optional[str] = None
51
-
followers_count: Optional[int] = Field(None, alias="followersCount")
52
-
follows_count: Optional[int] = Field(None, alias="followsCount")
53
-
posts_count: Optional[int] = Field(None, alias="postsCount")
54
-
indexed_at: Optional[str] = Field(None, alias="indexedAt")
55
-
viewer: Optional[Dict[str, Any]] = None
56
-
labels: Optional[List[Any]] = None
57
-
58
-
class Config:
59
-
populate_by_name = True
60
-
61
-
62
-
class EventMetadata(BaseModel):
63
-
did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
64
-
pds_host: Optional[str] = Field(None, alias="pdsHost")
65
-
handle: Optional[str] = None
66
-
did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
67
-
account_age: Optional[int] = Field(None, alias="accountAge")
68
-
profile: Optional[ProfileViewDetailed] = None
69
-
70
-
class Config:
71
-
populate_by_name = True
72
-
73
-
74
-
class AtKafkaEvent(BaseModel):
75
-
did: str
76
-
timestamp: str
77
-
metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
78
-
operation: Optional[AtKafkaOp] = None
79
-
account: Optional[AtKafkaAccount] = None
80
-
identity: Optional[AtKafkaIdentity] = None
81
-
info: Optional[AtKafkaInfo] = None
82
-
83
-
class Config:
84
-
populate_by_name = True
44
+
# class AtKafkaOp(BaseModel):
45
+
# action: str
46
+
# collection: str
47
+
# rkey: str
48
+
# uri: str
49
+
# cid: str
50
+
# path: str
51
+
# record: Optional[Dict[str, Any]]
52
+
#
53
+
#
54
+
# class AtKafkaIdentity(BaseModel):
55
+
# seq: int
56
+
# handle: str
57
+
#
58
+
#
59
+
# class AtKafkaInfo(BaseModel):
60
+
# name: str
61
+
# message: Optional[str] = None
62
+
#
63
+
#
64
+
# class AtKafkaAccount(BaseModel):
65
+
# active: bool
66
+
# seq: int
67
+
# status: Optional[str] = None
68
+
#
69
+
#
70
+
# class DIDDocument(BaseModel):
71
+
# context: Optional[List[Any]] = Field(None, alias="@context")
72
+
# id: Optional[str] = None
73
+
# also_known_as: Optional[List[Any]] = Field(None, alias="alsoKnownAs")
74
+
# verification_method: Optional[List[Any]] = Field(None, alias="verificationMethod")
75
+
# service: Optional[List[Any]] = None
76
+
#
77
+
# class Config:
78
+
# populate_by_name = True
79
+
#
80
+
#
81
+
# class ProfileViewDetailed(BaseModel):
82
+
# did: str
83
+
# handle: str
84
+
# display_name: Optional[str] = Field(None, alias="displayName")
85
+
# description: Optional[str] = None
86
+
# avatar: Optional[str] = None
87
+
# banner: Optional[str] = None
88
+
# followers_count: Optional[int] = Field(None, alias="followersCount")
89
+
# follows_count: Optional[int] = Field(None, alias="followsCount")
90
+
# posts_count: Optional[int] = Field(None, alias="postsCount")
91
+
# indexed_at: Optional[str] = Field(None, alias="indexedAt")
92
+
# viewer: Optional[Dict[str, Any]] = None
93
+
# labels: Optional[List[Any]] = None
94
+
#
95
+
# class Config:
96
+
# populate_by_name = True
97
+
#
98
+
#
99
+
# class EventMetadata(BaseModel):
100
+
# did_document: Optional[DIDDocument] = Field(None, alias="didDocument")
101
+
# pds_host: Optional[str] = Field(None, alias="pdsHost")
102
+
# handle: Optional[str] = None
103
+
# did_created_at: Optional[str] = Field(None, alias="didCreatedAt")
104
+
# account_age: Optional[int] = Field(None, alias="accountAge")
105
+
# profile: Optional[ProfileViewDetailed] = None
106
+
#
107
+
# class Config:
108
+
# populate_by_name = True
109
+
#
110
+
#
111
+
# class AtKafkaEvent(BaseModel):
112
+
# did: str
113
+
# timestamp: str
114
+
# metadata: Optional[EventMetadata] = Field(None, alias="eventMetadata")
115
+
# operation: Optional[AtKafkaOp] = None
116
+
# account: Optional[AtKafkaAccount] = None
117
+
# identity: Optional[AtKafkaIdentity] = None
118
+
# info: Optional[AtKafkaInfo] = None
119
+
#
120
+
# class Config:
121
+
# populate_by_name = True
85
122
86
123
87
124
class FollowRecord(BaseModel):