-7
Dockerfile
-7
Dockerfile
+4
-4
config.py
+4
-4
config.py
···
4
4
5
5
6
6
class Config(BaseSettings):
7
-
ch_host: str = "localhost"
8
-
ch_port: int = 8123
9
-
ch_user: str = "default"
10
-
ch_pass: str = "clickhouse"
7
+
clickhouse_host: str = "localhost"
8
+
clickhouse_port: int = 8123
9
+
clickhouse_user: str = "default"
10
+
clickhouse_pass: str = "clickhouse"
11
11
12
12
batch_size: int = 1000
13
13
+9
-8
docker-compose.yaml
+9
-8
docker-compose.yaml
···
30
30
hostname: zookeeper
31
31
container_name: zookeeper
32
32
ports:
33
-
- "2181:2181"
33
+
- "127.0.0.1:2181:2181"
34
34
environment:
35
35
ZOOKEEPER_CLIENT_PORT: 2181
36
36
ZOOKEEPER_TICK_TIME: 2000
···
45
45
depends_on:
46
46
- zookeeper
47
47
ports:
48
-
- "9092:9092"
49
-
- "9101:9101"
48
+
- "127.0.0.1:9092:9092"
49
+
- "127.0.0.1:9101:9101"
50
50
environment:
51
51
KAFKA_BROKER_ID: 1
52
52
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
···
78
78
kafka:
79
79
condition: service_healthy
80
80
ports:
81
-
- "2480:2480"
82
-
- "6010:6010"
81
+
- "127.0.0.1:2480:2480"
82
+
- "127.0.0.1:6010:6010"
83
83
environment:
84
84
TAP_BIND: ":2480"
85
85
TAP_FULL_NETWORK: true
86
86
TAP_DISABLE_ACKS: false
87
87
TAP_COLLECTION_FILTERS: "app.bsky.graph.follow"
88
88
TAP_METRICS_LISTEN: ":6010"
89
+
TAP_RESYNC_PARALLELISM: 50
89
90
volumes:
90
91
- tap-data:/data
91
92
restart: unless-stopped
···
99
100
condition: service_healthy
100
101
ports:
101
102
# metrics port
102
-
- "6011:6009"
103
+
- "127.0.0.1:6011:6009"
103
104
command: ["tap-mode"]
104
105
environment:
105
106
ATKAFKA_TAP_HOST: "ws://tap:2480"
···
111
112
indexer:
112
113
build: .
113
114
ports:
114
-
- "6012:6009"
115
+
- "127.0.0.1:8050:8050"
115
116
depends_on:
116
117
clickhouse:
117
118
condition: service_healthy
···
119
120
condition: service_healthy
120
121
command: ["uv", "run", "indexer.py"]
121
122
environment:
122
-
KAFKA_BOOTSTRAP_SERVERS: "localhost:9092"
123
+
KAFKA_BOOTSTRAP_SERVERS: '["kafka:29092"]'
123
124
KAFKA_INPUT_TOPIC: "tap-events"
124
125
CLICKHOUSE_HOST: "clickhouse"
125
126
restart: unless-stopped
+10
-13
indexer.py
+10
-13
indexer.py
···
118
118
column_names=["did", "subject", "uri", "created_at"],
119
119
)
120
120
121
+
self.client.insert(
122
+
"follows_reverse",
123
+
follows_data,
124
+
column_names=["did", "subject", "uri", "created_at"],
125
+
)
126
+
121
127
status = "ok"
122
128
except Exception as e:
123
129
# TODO: handle errors gracefully
···
231
237
232
238
if op.action == "update":
233
239
kind = "update"
234
-
status = "ok"
235
-
return
236
240
elif op.action == "create":
237
241
kind = "create"
238
-
239
242
rec = FollowRecord.model_validate(op.record)
240
243
created_at = isoparse(rec.created_at)
241
-
242
244
follow = Follow(
243
245
uri=op.uri, did=evt.did, subject=rec.subject, created_at=created_at
244
246
)
245
-
246
247
self.indexer.insert_follow(follow)
247
248
else:
248
249
kind = "delete"
249
-
250
-
kind = "delete"
251
-
252
250
unfollow = Unfollow(uri=op.uri, created_at=datetime.now())
253
-
254
251
self.indexer.insert_unfollow(unfollow)
255
252
256
253
status = "ok"
···
343
340
)
344
341
345
342
indexer = FollowIndexer(
346
-
clickhouse_host=ch_host or CONFIG.ch_host,
347
-
clickhouse_port=ch_port or CONFIG.ch_port,
348
-
clickhouse_user=ch_user or CONFIG.ch_user,
349
-
clickhouse_pass=ch_pass or CONFIG.ch_pass,
343
+
clickhouse_host=ch_host or CONFIG.clickhouse_host,
344
+
clickhouse_port=ch_port or CONFIG.clickhouse_port,
345
+
clickhouse_user=ch_user or CONFIG.clickhouse_user,
346
+
clickhouse_pass=ch_pass or CONFIG.clickhouse_pass,
350
347
batch_size=batch_size or CONFIG.batch_size,
351
348
)
352
349
indexer.init_schema()