+206
-111
atkafka/atkafka.go
+206
-111
atkafka/atkafka.go
···
30
30
bootstrapServers []string
31
31
outputTopic string
32
32
ospreyCompat bool
33
-
logger *slog.Logger
34
33
35
-
producer *Producer
34
+
watchedServices []string
35
+
ignoredServices []string
36
36
37
+
watchedCollections []string
38
+
ignoredCollections []string
39
+
40
+
producer *Producer
37
41
plcClient *PlcClient
42
+
logger *slog.Logger
38
43
}
39
44
40
45
type ServerArgs struct {
41
-
RelayHost string
42
-
PlcHost string
46
+
// network params
47
+
RelayHost string
48
+
PlcHost string
49
+
50
+
// for watched and ignored services or collections, only one list may be supplied
51
+
// for both services and collections, wildcards are acceptable. for example:
52
+
// app.bsky.* will watch/ignore any collection that falls under the app.bsky namespace.
53
+
// *.bsky.network will watch/ignore any event that falls under the bsky.network list of PDSes
54
+
55
+
// list of services that events will be emitted for
56
+
WatchedServices []string
57
+
// list of services that events are ignored for
58
+
IgnoredServices []string
59
+
60
+
// list of collections that events are emitted for
61
+
WatchedCollections []string
62
+
// list of collections that events are ignored for
63
+
IgnoredCollections []string
64
+
65
+
// kafka params
43
66
BootstrapServers []string
44
67
OutputTopic string
45
-
OspreyCompat bool
46
-
Logger *slog.Logger
68
+
69
+
// osprey-specific params
70
+
OspreyCompat bool
71
+
72
+
// other
73
+
Logger *slog.Logger
47
74
}
48
75
49
-
func NewServer(args *ServerArgs) *Server {
76
+
func NewServer(args *ServerArgs) (*Server, error) {
50
77
if args.Logger == nil {
51
78
args.Logger = slog.Default()
52
79
}
53
80
81
+
if len(args.WatchedServices) > 0 && len(args.IgnoredServices) > 0 {
82
+
return nil, fmt.Errorf("you may only specify a list of watched services _or_ ignored services, not both")
83
+
}
84
+
85
+
if (len(args.WatchedServices) > 0 || len(args.IgnoredServices) > 0) && args.PlcHost == "" {
86
+
return nil, fmt.Errorf("unable to support watched/ignored services without specifying a PLC host")
87
+
}
88
+
89
+
if len(args.WatchedCollections) > 0 && len(args.IgnoredCollections) > 0 {
90
+
return nil, fmt.Errorf("you may only specify a list of watched collections _or_ ignored collections, not both")
91
+
}
92
+
54
93
var plcClient *PlcClient
55
94
if args.PlcHost != "" {
56
95
plcClient = NewPlcClient(&PlcClientArgs{
···
58
97
})
59
98
}
60
99
61
-
return &Server{
100
+
s := &Server{
62
101
relayHost: args.RelayHost,
63
102
plcClient: plcClient,
64
103
bootstrapServers: args.BootstrapServers,
···
66
105
ospreyCompat: args.OspreyCompat,
67
106
logger: args.Logger,
68
107
}
108
+
109
+
if len(args.WatchedServices) > 0 {
110
+
watchedServices := make([]string, 0, len(args.WatchedServices))
111
+
for _, service := range args.WatchedServices {
112
+
watchedServices = append(watchedServices, strings.TrimPrefix(strings.TrimPrefix(service, "*."), "."))
113
+
}
114
+
s.watchedServices = watchedServices
115
+
} else if len(args.IgnoredServices) > 0 {
116
+
ignoredServices := make([]string, 0, len(args.IgnoredServices))
117
+
for _, service := range args.IgnoredServices {
118
+
ignoredServices = append(ignoredServices, strings.TrimPrefix(strings.TrimPrefix(service, "*."), "."))
119
+
}
120
+
s.ignoredServices = ignoredServices
121
+
}
122
+
123
+
if len(args.WatchedCollections) > 0 {
124
+
watchedCollections := make([]string, 0, len(args.WatchedCollections))
125
+
for _, collection := range args.WatchedCollections {
126
+
watchedCollections = append(watchedCollections, strings.TrimSuffix(strings.TrimSuffix(collection, ".*"), "."))
127
+
}
128
+
s.watchedCollections = watchedCollections
129
+
} else if len(args.IgnoredCollections) > 0 {
130
+
ignoredCollections := make([]string, 0, len(args.IgnoredCollections))
131
+
for _, collection := range args.IgnoredCollections {
132
+
ignoredCollections = append(ignoredCollections, strings.TrimSuffix(strings.TrimSuffix(collection, ".*"), "."))
133
+
}
134
+
s.ignoredCollections = ignoredCollections
135
+
}
136
+
137
+
return s, nil
69
138
}
70
139
71
140
func (s *Server) Run(ctx context.Context) error {
···
147
216
return nil
148
217
}
149
218
150
-
type EventMetadata struct {
151
-
DidDocument *identity.DIDDocument `json:"didDocument,omitempty"`
152
-
PdsHost string `json:"pdsHost,omitempty"`
153
-
Handle string `json:"handle,omitempty"`
154
-
DidCreatedAt string `json:"didCreatedAt,omitempty"`
155
-
AccountAge int64 `json:"accountAge"`
156
-
}
157
-
158
-
func (s *Server) FetchEventMetadata(ctx context.Context, did string) (*EventMetadata, error) {
159
-
var didDocument *identity.DIDDocument
219
+
func (s *Server) FetchEventMetadata(ctx context.Context, did string) (*EventMetadata, *identity.Identity, error) {
220
+
var ident *identity.Identity
221
+
var didDocument identity.DIDDocument
160
222
var pdsHost string
161
223
var handle string
162
224
var didCreatedAt string
···
167
229
if s.plcClient != nil {
168
230
wg.Go(func() {
169
231
logger := s.logger.With("component", "didDoc")
170
-
doc, err := s.plcClient.GetDIDDoc(ctx, did)
232
+
var err error
233
+
ident, err = s.plcClient.GetIdentity(ctx, did)
171
234
if err != nil {
172
235
logger.Error("error fetching did doc", "did", did, "err", err)
173
236
return
174
237
}
175
-
didDocument = doc
176
-
177
-
for _, svc := range doc.Service {
178
-
if svc.ID == "#atproto_pds" {
179
-
pdsHost = svc.ServiceEndpoint
180
-
break
181
-
}
182
-
}
183
-
184
-
for _, aka := range doc.AlsoKnownAs {
185
-
if strings.HasPrefix(aka, "at://") {
186
-
handle = strings.TrimPrefix(aka, "at://")
187
-
break
188
-
}
189
-
}
238
+
didDocument = ident.DIDDocument()
239
+
pdsHost = ident.PDSEndpoint()
240
+
handle = ident.Handle.String()
190
241
})
191
242
192
243
wg.Go(func() {
···
217
268
Handle: handle,
218
269
DidCreatedAt: didCreatedAt,
219
270
AccountAge: accountAge,
220
-
}, nil
271
+
}, ident, nil
221
272
}
222
273
223
274
func (s *Server) handleEvent(ctx context.Context, evt *events.XRPCStreamEvent) error {
···
230
281
var collection string
231
282
var actionName string
232
283
284
+
var evtKey string
285
+
var evtsToProduce [][]byte
286
+
233
287
if evt.RepoCommit != nil {
288
+
// key events by DID
289
+
evtKey = evt.RepoCommit.Repo
290
+
234
291
// read the repo
235
292
rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.RepoCommit.Blocks))
236
293
if err != nil {
···
238
295
return nil
239
296
}
240
297
298
+
eventMetadata, ident, err := s.FetchEventMetadata(dispatchCtx, evt.RepoCommit.Repo)
299
+
if err != nil {
300
+
logger.Error("error fetching event metadata", "err", err)
301
+
} else if ident != nil {
302
+
skip := false
303
+
pdsEndpoint := ident.PDSEndpoint()
304
+
u, err := url.Parse(pdsEndpoint)
305
+
if err != nil {
306
+
return fmt.Errorf("failed to parse pds host: %w", err)
307
+
}
308
+
pdsHost := u.Hostname()
309
+
310
+
if pdsHost != "" {
311
+
if len(s.watchedServices) > 0 {
312
+
skip = true
313
+
for _, watchedService := range s.watchedServices {
314
+
if watchedService == pdsHost || strings.HasSuffix(pdsHost, "."+watchedService) {
315
+
skip = false
316
+
break
317
+
}
318
+
}
319
+
} else if len(s.ignoredServices) > 0 {
320
+
for _, ignoredService := range s.ignoredServices {
321
+
if ignoredService == pdsHost || strings.HasSuffix(pdsHost, "."+ignoredService) {
322
+
skip = true
323
+
break
324
+
}
325
+
}
326
+
}
327
+
}
328
+
329
+
if skip {
330
+
logger.Debug("skipping event based on pds host", "pdsHost", pdsHost)
331
+
return nil
332
+
}
333
+
}
334
+
241
335
for _, op := range evt.RepoCommit.Ops {
242
336
kind := repomgr.EventKind(op.Action)
243
337
collection = strings.Split(op.Path, "/")[0]
244
338
rkey := strings.Split(op.Path, "/")[1]
245
339
atUri := fmt.Sprintf("at://%s/%s/%s", evt.RepoCommit.Repo, collection, rkey)
246
340
341
+
skip := false
342
+
if len(s.watchedCollections) > 0 {
343
+
skip = true
344
+
for _, watchedCollection := range s.watchedCollections {
345
+
if watchedCollection == collection || strings.HasPrefix(collection, watchedCollection+".") {
346
+
skip = false
347
+
break
348
+
}
349
+
}
350
+
} else if len(s.ignoredCollections) > 0 {
351
+
for _, ignoredCollection := range s.ignoredCollections {
352
+
if ignoredCollection == collection || strings.HasPrefix(collection, ignoredCollection+".") {
353
+
skip = true
354
+
break
355
+
}
356
+
}
357
+
}
358
+
359
+
if skip {
360
+
logger.Debug("skipping event based on collection", "collection", collection)
361
+
continue
362
+
}
363
+
247
364
kindStr := "create"
248
365
switch kind {
249
366
case repomgr.EvtKindUpdateRecord:
···
298
415
Operation: &atkOp,
299
416
}
300
417
301
-
eventMetadata, err := s.FetchEventMetadata(dispatchCtx, evt.RepoCommit.Repo)
302
-
if err != nil {
303
-
logger.Error("error fetching event metadata", "err", err)
304
-
} else {
418
+
if eventMetadata != nil {
305
419
kafkaEvt.Metadata = eventMetadata
306
420
}
307
421
308
-
var kafkaEvtBytes []byte
422
+
var evtBytes []byte
309
423
if s.ospreyCompat {
310
424
// create the wrapper event for osprey
311
425
ospreyKafkaEvent := OspreyAtKafkaEvent{
···
320
434
SendTime: time.Now().Format(time.RFC3339),
321
435
}
322
436
323
-
kafkaEvtBytes, err = json.Marshal(&ospreyKafkaEvent)
437
+
evtBytes, err = json.Marshal(&ospreyKafkaEvent)
324
438
} else {
325
-
kafkaEvtBytes, err = json.Marshal(&kafkaEvt)
439
+
evtBytes, err = json.Marshal(&kafkaEvt)
326
440
}
327
441
if err != nil {
328
442
return fmt.Errorf("failed to marshal kafka event: %w", err)
329
443
}
330
444
331
-
if err := s.produceAsync(ctx, evt.RepoCommit.Repo, kafkaEvtBytes); err != nil {
332
-
return err
333
-
}
445
+
evtsToProduce = append(evtsToProduce, evtBytes)
334
446
}
335
447
} else {
336
448
defer func() {
···
389
501
}
390
502
391
503
if did != "" {
392
-
eventMetadata, err := s.FetchEventMetadata(dispatchCtx, did)
504
+
// key events by DID
505
+
evtKey = did
506
+
eventMetadata, ident, err := s.FetchEventMetadata(dispatchCtx, did)
393
507
if err != nil {
394
508
logger.Error("error fetching event metadata", "err", err)
395
-
} else {
509
+
} else if ident != nil {
510
+
skip := false
511
+
pdsEndpoint := ident.PDSEndpoint()
512
+
u, err := url.Parse(pdsEndpoint)
513
+
if err != nil {
514
+
return fmt.Errorf("failed to parse pds host: %w", err)
515
+
}
516
+
pdsHost := u.Hostname()
517
+
518
+
if pdsHost != "" {
519
+
if len(s.watchedServices) > 0 {
520
+
skip = true
521
+
for _, watchedService := range s.watchedServices {
522
+
if watchedService == pdsHost || strings.HasSuffix(pdsHost, "."+watchedService) {
523
+
skip = false
524
+
break
525
+
}
526
+
}
527
+
} else if len(s.ignoredServices) > 0 {
528
+
for _, ignoredService := range s.ignoredServices {
529
+
if ignoredService == pdsHost || strings.HasSuffix(pdsHost, "."+ignoredService) {
530
+
skip = true
531
+
break
532
+
}
533
+
}
534
+
}
535
+
}
536
+
537
+
if skip {
538
+
logger.Debug("skipping event based on pds host", "pdsHost", pdsHost)
539
+
return nil
540
+
}
541
+
396
542
kafkaEvt.Metadata = eventMetadata
397
543
}
544
+
} else {
545
+
// key events without a DID by "unknown"
546
+
evtKey = "<unknown>"
398
547
}
399
548
400
549
// create the kafka event bytes
401
-
var kafkaEvtBytes []byte
550
+
var evtBytes []byte
402
551
var err error
403
552
404
553
if s.ospreyCompat {
···
415
564
SendTime: time.Now().Format(time.RFC3339),
416
565
}
417
566
418
-
kafkaEvtBytes, err = json.Marshal(&ospreyKafkaEvent)
567
+
evtBytes, err = json.Marshal(&ospreyKafkaEvent)
419
568
} else {
420
-
kafkaEvtBytes, err = json.Marshal(&kafkaEvt)
569
+
evtBytes, err = json.Marshal(&kafkaEvt)
421
570
}
422
571
if err != nil {
423
572
return fmt.Errorf("failed to marshal kafka event: %w", err)
424
573
}
425
574
426
-
if err := s.produceAsync(ctx, did, kafkaEvtBytes); err != nil {
575
+
evtsToProduce = append(evtsToProduce, evtBytes)
576
+
}
577
+
578
+
for _, evtBytes := range evtsToProduce {
579
+
if err := s.produceAsync(ctx, evtKey, evtBytes); err != nil {
427
580
return err
428
581
}
429
582
}
···
441
594
producedEvents.WithLabelValues(status).Inc()
442
595
}
443
596
444
-
if !s.ospreyCompat {
445
-
if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
446
-
return fmt.Errorf("failed to produce message: %w", err)
447
-
}
448
-
} else if s.ospreyCompat {
449
-
if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
450
-
return fmt.Errorf("failed to produce message: %w", err)
451
-
}
597
+
if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil {
598
+
return fmt.Errorf("failed to produce message: %w", err)
452
599
}
453
600
454
601
return nil
455
602
}
456
-
457
-
type AtKafkaOp struct {
458
-
Action string `json:"action"`
459
-
Collection string `json:"collection"`
460
-
Rkey string `json:"rkey"`
461
-
Uri string `json:"uri"`
462
-
Cid string `json:"cid"`
463
-
Path string `json:"path"`
464
-
Record map[string]any `json:"record"`
465
-
}
466
-
467
-
type AtKafkaIdentity struct {
468
-
Seq int64 `json:"seq"`
469
-
Handle string `json:"handle"`
470
-
}
471
-
472
-
type AtKafkaInfo struct {
473
-
Name string `json:"name"`
474
-
Message *string `json:"message,omitempty"`
475
-
}
476
-
477
-
type AtKafkaAccount struct {
478
-
Active bool `json:"active"`
479
-
Seq int64 `json:"seq"`
480
-
Status *string `json:"status,omitempty"`
481
-
}
482
-
483
-
type AtKafkaEvent struct {
484
-
Did string `json:"did"`
485
-
Timestamp string `json:"timestamp"`
486
-
Metadata *EventMetadata `json:"eventMetadata"`
487
-
488
-
Operation *AtKafkaOp `json:"operation,omitempty"`
489
-
Account *AtKafkaAccount `json:"account,omitempty"`
490
-
Identity *AtKafkaIdentity `json:"identity,omitempty"`
491
-
Info *AtKafkaInfo `json:"info,omitempty"`
492
-
}
493
-
494
-
// Intentionally using snake case since that is what Osprey expects
495
-
type OspreyEventData struct {
496
-
ActionName string `json:"action_name"`
497
-
ActionId int64 `json:"action_id"`
498
-
Data AtKafkaEvent `json:"data"`
499
-
Timestamp string `json:"timestamp"`
500
-
SecretData map[string]string `json:"secret_data"`
501
-
Encoding string `json:"encoding"`
502
-
}
503
-
504
-
type OspreyAtKafkaEvent struct {
505
-
Data OspreyEventData `json:"data"`
506
-
SendTime string `json:"send_time"`
507
-
}
+16
-44
atkafka/plc.go
+16
-44
atkafka/plc.go
···
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
-
"net"
10
9
"net/http"
11
10
"time"
12
11
···
19
18
20
19
type PlcClient struct {
21
20
client *http.Client
22
-
dir *identity.BaseDirectory
23
-
plcHost string
24
-
docCache *lru.LRU[string, *identity.DIDDocument]
21
+
dir *identity.CacheDirectory
25
22
auditCache *lru.LRU[string, *DidAuditEntry]
23
+
plcHost string
26
24
}
27
25
28
26
type PlcClientArgs struct {
···
33
31
client := robusthttp.NewClient(robusthttp.WithMaxRetries(2))
34
32
client.Timeout = 3 * time.Second
35
33
36
-
baseDir := identity.BaseDirectory{
37
-
PLCURL: args.PlcHost,
38
-
PLCLimiter: rate.NewLimiter(rate.Limit(200), 100),
39
-
HTTPClient: *client,
40
-
Resolver: net.Resolver{
41
-
PreferGo: true,
42
-
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
43
-
dialer := net.Dialer{Timeout: time.Second * 5}
44
-
nameserver := address
45
-
return dialer.DialContext(ctx, network, nameserver)
46
-
},
34
+
baseDirectory := identity.BaseDirectory{
35
+
PLCURL: "https://plc.directory",
36
+
HTTPClient: http.Client{
37
+
Timeout: time.Second * 5,
47
38
},
48
-
TryAuthoritativeDNS: true,
49
-
// primary Bluesky PDS instance only supports HTTP resolution method
50
-
SkipDNSDomainSuffixes: []string{".bsky.social"},
39
+
PLCLimiter: rate.NewLimiter(rate.Limit(200), 100),
40
+
TryAuthoritativeDNS: true,
41
+
SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"},
51
42
}
52
-
53
-
docCache := lru.NewLRU(100_000, func(_ string, _ *identity.DIDDocument) {
54
-
cacheSize.WithLabelValues("did_doc").Dec()
55
-
}, 5*time.Minute)
43
+
directory := identity.NewCacheDirectory(&baseDirectory, 100_000, time.Hour*48, time.Minute*15, time.Minute*15)
56
44
57
45
auditCache := lru.NewLRU(100_000, func(_ string, _ *DidAuditEntry) {
58
46
cacheSize.WithLabelValues("audit_log").Dec()
···
60
48
61
49
return &PlcClient{
62
50
client: client,
63
-
dir: &baseDir,
51
+
dir: &directory,
52
+
auditCache: auditCache,
64
53
plcHost: args.PlcHost,
65
-
docCache: docCache,
66
-
auditCache: auditCache,
67
54
}
68
55
}
69
56
···
92
79
93
80
type DidAuditLog []DidAuditEntry
94
81
95
-
func (c *PlcClient) GetDIDDoc(ctx context.Context, did string) (*identity.DIDDocument, error) {
82
+
func (c *PlcClient) GetIdentity(ctx context.Context, did string) (*identity.Identity, error) {
96
83
status := "error"
97
-
cached := false
98
84
99
85
defer func() {
100
-
plcRequests.WithLabelValues("did_doc", status, fmt.Sprintf("%t", cached)).Inc()
86
+
plcRequests.WithLabelValues("did_doc", status, "unknown").Inc()
101
87
}()
102
88
103
-
if val, ok := c.docCache.Get(did); ok {
104
-
status = "ok"
105
-
cached = true
106
-
return val, nil
107
-
}
108
-
109
-
didDoc, err := c.dir.ResolveDID(ctx, syntax.DID(did))
89
+
identity, err := c.dir.LookupDID(ctx, syntax.DID(did))
110
90
if err != nil {
111
91
return nil, fmt.Errorf("failed to lookup DID: %w", err)
112
92
}
113
93
114
-
if didDoc == nil {
115
-
return nil, fmt.Errorf("DID Document not found")
116
-
}
117
-
118
-
if c.docCache != nil {
119
-
c.docCache.Add(did, didDoc)
120
-
}
121
-
122
94
cacheSize.WithLabelValues("did_doc").Inc()
123
95
status = "ok"
124
96
125
-
return didDoc, nil
97
+
return identity, nil
126
98
}
127
99
128
100
var ErrAuditLogNotFound = errors.New("audit log not found for DID")
+63
atkafka/types.go
+63
atkafka/types.go
···
1
+
package atkafka
2
+
3
+
import "github.com/bluesky-social/indigo/atproto/identity"
4
+
5
+
type AtKafkaOp struct {
6
+
Action string `json:"action"`
7
+
Collection string `json:"collection"`
8
+
Rkey string `json:"rkey"`
9
+
Uri string `json:"uri"`
10
+
Cid string `json:"cid"`
11
+
Path string `json:"path"`
12
+
Record map[string]any `json:"record"`
13
+
}
14
+
15
+
type AtKafkaIdentity struct {
16
+
Seq int64 `json:"seq"`
17
+
Handle string `json:"handle"`
18
+
}
19
+
20
+
type AtKafkaInfo struct {
21
+
Name string `json:"name"`
22
+
Message *string `json:"message,omitempty"`
23
+
}
24
+
25
+
type AtKafkaAccount struct {
26
+
Active bool `json:"active"`
27
+
Seq int64 `json:"seq"`
28
+
Status *string `json:"status,omitempty"`
29
+
}
30
+
31
+
type AtKafkaEvent struct {
32
+
Did string `json:"did"`
33
+
Timestamp string `json:"timestamp"`
34
+
Metadata *EventMetadata `json:"eventMetadata"`
35
+
36
+
Operation *AtKafkaOp `json:"operation,omitempty"`
37
+
Account *AtKafkaAccount `json:"account,omitempty"`
38
+
Identity *AtKafkaIdentity `json:"identity,omitempty"`
39
+
Info *AtKafkaInfo `json:"info,omitempty"`
40
+
}
41
+
42
+
// Intentionally using snake case since that is what Osprey expects
43
+
type OspreyEventData struct {
44
+
ActionName string `json:"action_name"`
45
+
ActionId int64 `json:"action_id"`
46
+
Data AtKafkaEvent `json:"data"`
47
+
Timestamp string `json:"timestamp"`
48
+
SecretData map[string]string `json:"secret_data"`
49
+
Encoding string `json:"encoding"`
50
+
}
51
+
52
+
type OspreyAtKafkaEvent struct {
53
+
Data OspreyEventData `json:"data"`
54
+
SendTime string `json:"send_time"`
55
+
}
56
+
57
+
type EventMetadata struct {
58
+
DidDocument identity.DIDDocument `json:"didDocument,omitempty"`
59
+
PdsHost string `json:"pdsHost,omitempty"`
60
+
Handle string `json:"handle,omitempty"`
61
+
DidCreatedAt string `json:"didCreatedAt,omitempty"`
62
+
AccountAge int64 `json:"accountAge"`
63
+
}
+39
-7
cmd/atkafka/main.go
+39
-7
cmd/atkafka/main.go
···
20
20
telemetry.CLIFlagMetricsListenAddress,
21
21
&cli.StringFlag{
22
22
Name: "relay-host",
23
+
Usage: "Websocket host to subscribe to for events",
23
24
Value: "wss://bsky.network",
24
25
EnvVars: []string{"ATKAFKA_RELAY_HOST"},
25
26
},
26
27
&cli.StringSliceFlag{
27
28
Name: "bootstrap-servers",
29
+
Usage: "List of Kafka bootstrap servers",
28
30
EnvVars: []string{"ATKAFKA_BOOTSTRAP_SERVERS"},
29
31
Required: true,
30
32
},
31
33
&cli.StringFlag{
32
34
Name: "output-topic",
35
+
Usage: "The Kafka topic to produce events to",
33
36
EnvVars: []string{"ATKAFKA_OUTPUT_TOPIC"},
34
37
Required: true,
35
38
},
36
39
&cli.BoolFlag{
37
40
Name: "osprey-compatible",
41
+
Usage: "Whether or not events should be formulated in an Osprey-compatible format",
38
42
EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"},
39
43
Value: false,
40
44
},
41
45
&cli.StringFlag{
42
46
Name: "plc-host",
47
+
Usage: "The host of the PLC directory you want to use for event metadata",
43
48
EnvVars: []string{"ATKAFKA_PLC_HOST"},
44
49
},
50
+
&cli.StringSliceFlag{
51
+
Name: "watched-services",
52
+
Usage: "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.",
53
+
EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"},
54
+
},
55
+
&cli.StringSliceFlag{
56
+
Name: "ignored-services",
57
+
Usage: "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.",
58
+
EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"},
59
+
},
60
+
&cli.StringSliceFlag{
61
+
Name: "watched-collections",
62
+
Usage: "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.",
63
+
EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"},
64
+
},
65
+
&cli.StringSliceFlag{
66
+
Name: "ignored-collections",
67
+
Usage: "A list of collections that you want to ignore. Wildcards like *.bsky.app are allowed.",
68
+
EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"},
69
+
},
45
70
},
46
71
Action: func(cmd *cli.Context) error {
47
72
ctx := context.Background()
···
49
74
telemetry.StartMetrics(cmd)
50
75
logger := telemetry.StartLogger(cmd)
51
76
52
-
s := atkafka.NewServer(&atkafka.ServerArgs{
53
-
RelayHost: cmd.String("relay-host"),
54
-
BootstrapServers: cmd.StringSlice("bootstrap-servers"),
55
-
OutputTopic: cmd.String("output-topic"),
56
-
OspreyCompat: cmd.Bool("osprey-compatible"),
57
-
PlcHost: cmd.String("plc-host"),
58
-
Logger: logger,
77
+
s, err := atkafka.NewServer(&atkafka.ServerArgs{
78
+
RelayHost: cmd.String("relay-host"),
79
+
BootstrapServers: cmd.StringSlice("bootstrap-servers"),
80
+
OutputTopic: cmd.String("output-topic"),
81
+
OspreyCompat: cmd.Bool("osprey-compatible"),
82
+
PlcHost: cmd.String("plc-host"),
83
+
WatchedServices: cmd.StringSlice("watched-services"),
84
+
IgnoredServices: cmd.StringSlice("ignored-services"),
85
+
WatchedCollections: cmd.StringSlice("watched-collections"),
86
+
IgnoredCollections: cmd.StringSlice("ignored-collections"),
87
+
Logger: logger,
59
88
})
89
+
if err != nil {
90
+
return fmt.Errorf("failed to create new server: %w", err)
91
+
}
60
92
61
93
if err := s.Run(ctx); err != nil {
62
94
return fmt.Errorf("error running server: %w", err)
+4
docker-compose.yml
+4
docker-compose.yml