
add ozone events to kafka (#4)

* add ozone event stream

* update readme

authored by hailey.at and committed by GitHub 8c84f512 f275487f

+2
.gitignore
··· 1 + .env 2 + *cursor.txt
+61 -29
README.md
··· 1 1 # at-kafka 2 2 3 - A small service that receives events from the AT firehose and produces them to Kafka. Supports standard JSON outputs as well as [Osprey](https://github.com/roostorg/osprey) 4 - formatted events. 5 - 6 - Additionally, at-kafka supports subscribing to [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) if youare attempting to perform a network backfill. 3 + A small service that receives events from ATProto and produces them to Kafka. Supports: 4 + - Firehose events from relay or [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) (for network backfill) 5 + - Ozone moderation events 6 + - Standard JSON and [Osprey](https://github.com/roostorg/osprey)-compatible event formats 7 7 8 8 ## Usage 9 9 10 10 ### Docker Compose 11 11 12 - The included `docker-compose.yml` provides a complete local stack. Edit the environment variables in the file to customize: 12 + Three compose files are provided: 13 + 14 + ```bash 15 + # Firehose relay mode (default) 16 + docker compose up -d 17 + 18 + # Firehose tap mode (for backfill) 19 + docker compose -f docker-compose.tap.yml up -d 20 + 21 + # Ozone moderation events 22 + docker compose -f docker-compose.ozone.yml up -d 23 + ``` 24 + 25 + #### Firehose Configuration 13 26 14 27 ```yaml 15 28 environment: 16 - # For relay mode 17 - ATKAFKA_RELAY_HOST: "wss://bsky.network" # ATProto relay to subscribe to for events 29 + # Relay/Tap connection 30 + ATKAFKA_RELAY_HOST: "wss://bsky.network" 31 + ATKAFKA_TAP_HOST: "ws://localhost:2480" 32 + ATKAFKA_DISABLE_ACKS: false 18 33 19 - # For tap mode 20 - ATKAFKA_TAP_HOST: "ws://localhost:2480" # Tap websocket host to subscribe to for events 21 - ATKAFKA_DISABLE_ACKS: false # Whether to disable sending of acks to Tap 34 + # Kafka 35 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 36 + ATKAFKA_OUTPUT_TOPIC: "atproto-events" 37 + ATKAFKA_OSPREY_COMPATIBLE: false 22 38 23 - # Kafka configuration 24 - ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" # Kafka 
bootstrap servers, comma separated 25 - ATKAFKA_OUTPUT_TOPIC: "atproto-events" # The output topic for events 26 - ATKAFKA_OSPREY_COMPATIBLE: false # Whether to produce Osprey-compatible events 39 + # Filtering 40 + ATKAFKA_WATCHED_SERVICES: "*.bsky.network" 41 + ATKAFKA_IGNORED_SERVICES: "blacksky.app" 42 + ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*" 43 + ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*" 44 + ``` 27 45 28 - # Match only Blacksky PDS users 29 - ATKAFKA_MATCHED_SERVICES: "blacksky.app" # A comma-separated list of PDSes to emit events for 30 - # OR ignore anyone on Bluesky PBC PDSes 31 - ATKAFKA_IGNORED_SERVICES: "*.bsky.network" # OR a comma-separated list of PDSes to _not_ emit events for 46 + #### Ozone Configuration 47 + 48 + ```yaml 49 + environment: 50 + # Ozone connection 51 + ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com" 52 + ATKAFKA_OZONE_IDENTIFIER: "your.handle" 53 + ATKAFKA_OZONE_PASSWORD: "password" 54 + ATKAFKA_OZONE_LABELER_DID: "did:plc:..." 32 55 33 - # Match only Teal.fm records 34 - ATKAFKA_MATCHED_COLLECTIONS: "fm.teal.*" # A comma-separated list of collections to emit events for 35 - # OR ignore all Bluesky records 36 - ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" # OR a comma-separated list of collections to ignore events for 56 + # Kafka 57 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 58 + ATKAFKA_OUTPUT_TOPIC: "ozone-events" 37 59 ``` 38 60 39 - Then start: 61 + ### CLI 40 62 41 63 ```bash 42 - # For normal mode 43 - docker compose up -d 44 - 45 - # For tap mode 46 - docker compose -f docker-compose.tap.yml up -d 64 + # Firehose modes 65 + atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events 66 + atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events 47 67 68 + # Ozone mode 69 + atkafka ozone-events \ 70 + --pds-host https://pds.example.com \ 71 + --identifier admin@example.com \ 72 + --password password \ 73 + --labeler-did did:plc:... 
\ 74 + --bootstrap-servers localhost:9092 \ 75 + --output-topic ozone-events 48 76 ``` 49 77 50 78 ## Event Structure 51 79 52 - ### Standard Mode 80 + ### Firehose Events 53 81 54 82 Events are structured similarly to the raw AT Protocol firehose, with one key difference: **commit events are split into individual operation events**. 55 83 ··· 128 156 - `account` - Account status changes 129 157 - `identity` - Identity/handle changes 130 158 - `info` - Informational messages 159 + 160 + ### Ozone Events 161 + 162 + Ozone events are produced as-is from the `tools.ozone.moderation.queryEvents` API response. Events include moderation actions, reports, and other moderation activity. The cursor is persisted to disk and automatically resumed on restart. 131 163 132 164 ## Monitoring 133 165
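The filtering variables in the README diff above (`ATKAFKA_WATCHED_*` and `ATKAFKA_IGNORED_*`) take wildcard patterns such as `app.bsky.*` and `*.bsky.network`. The diff does not show how at-kafka evaluates them, so the following is only a minimal sketch of one plausible reading, using the standard library's `path.Match`. The helper names `matchesAny` and `shouldEmit` are hypothetical, and so is the precedence (watch list first, then ignore list).

```go
package main

import (
	"fmt"
	"path"
)

// matchesAny reports whether value matches at least one glob pattern.
// Hypothetical helper: at-kafka's real matching logic is not shown in this diff.
func matchesAny(patterns []string, value string) bool {
	for _, p := range patterns {
		// path.Match lets "*" span any run of characters other than "/",
		// so it crosses the dots in NSIDs and hostnames.
		if ok, err := path.Match(p, value); err == nil && ok {
			return true
		}
	}
	return false
}

// shouldEmit applies the watch list (empty means "watch everything")
// and then the ignore list. This ordering is an assumption.
func shouldEmit(watched, ignored []string, value string) bool {
	if len(watched) > 0 && !matchesAny(watched, value) {
		return false
	}
	return !matchesAny(ignored, value)
}

func main() {
	watched := []string{"app.bsky.*"}
	ignored := []string{"fm.teal.*"}

	fmt.Println(shouldEmit(watched, ignored, "app.bsky.feed.post"))      // true
	fmt.Println(shouldEmit(watched, ignored, "fm.teal.alpha.feed.play")) // false
}
```

Malformed patterns are treated as non-matching here; a real service might prefer to reject them at startup.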
+337
atkafka/ozone.go
··· 1 + package atkafka 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "os" 10 + "os/signal" 11 + "strconv" 12 + "strings" 13 + "sync" 14 + "syscall" 15 + "time" 16 + 17 + "github.com/bluesky-social/indigo/api/atproto" 18 + "github.com/bluesky-social/indigo/api/ozone" 19 + "github.com/bluesky-social/indigo/xrpc" 20 + "github.com/twmb/franz-go/pkg/kgo" 21 + ) 22 + 23 + type OzoneServer struct { 24 + logger *slog.Logger 25 + bootstrapServers []string 26 + outputTopic string 27 + ozoneClientArgs *OzoneClientArgs 28 + 29 + cursorMu sync.Mutex 30 + cursor string 31 + } 32 + 33 + type OzoneServerArgs struct { 34 + BootstrapServers []string 35 + OutputTopic string 36 + Logger *slog.Logger 37 + OzoneClientArgs *OzoneClientArgs 38 + } 39 + 40 + func NewOzoneServer(args *OzoneServerArgs) (*OzoneServer, error) { 41 + s := OzoneServer{ 42 + logger: args.Logger, 43 + bootstrapServers: args.BootstrapServers, 44 + outputTopic: args.OutputTopic, 45 + ozoneClientArgs: args.OzoneClientArgs, 46 + } 47 + 48 + return &s, nil 49 + } 50 + 51 + func (s *OzoneServer) Run(ctx context.Context) error { 52 + logger := s.logger.With("name", "Run") 53 + 54 + if err := s.LoadCursor(); err != nil { 55 + logger.Error("error loading cursor", "err", err) 56 + } 57 + 58 + ozoneClient, err := NewOzoneClient(s.ozoneClientArgs) 59 + if err != nil { 60 + return err 61 + } 62 + 63 + logger.Info("creating producer", "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic) 64 + busProducer, err := NewProducer( 65 + ctx, 66 + s.logger.With("component", "producer"), 67 + s.bootstrapServers, 68 + s.outputTopic, 69 + WithEnsureTopic(true), 70 + WithTopicPartitions(4), 71 + ) 72 + if err != nil { 73 + return fmt.Errorf("failed to create producer: %w", err) 74 + } 75 + defer busProducer.close() 76 + 77 + shutdownPoll := make(chan struct{}, 1) 78 + pollShutdown := make(chan struct{}, 1) 79 + go func() { 80 + ticker := time.NewTicker(1 * time.Second) 81 + 
82 + defer func() { 83 + ticker.Stop() 84 + close(pollShutdown) 85 + }() 86 + 87 + for { 88 + select { 89 + case <-ctx.Done(): 90 + logger.Info("shutting down poller on context cancel") 91 + return 92 + case <-shutdownPoll: 93 + logger.Info("shutting down poller") 94 + return 95 + case <-ticker.C: 96 + func() { 97 + ozoneClient.mu.Lock() 98 + defer ozoneClient.mu.Unlock() 99 + 100 + cursor := s.GetCursor() 101 + 102 + logger.Info("attempting to fetch next batch of events") 103 + 104 + _, events, err := ozoneClient.GetEvents(ctx, cursor) 105 + if err != nil { 106 + logger.Error("error getting events", "err", err) 107 + return 108 + } 109 + 110 + if len(events) == 0 { 111 + logger.Info("no new events to emit") 112 + return 113 + } 114 + 115 + var newCursor string 116 + for _, evt := range events { 117 + func() { 118 + status := "error" 119 + 120 + defer func() { 121 + handledEvents.WithLabelValues("ozone", status).Inc() 122 + }() 123 + 124 + b, err := json.Marshal(evt) 125 + if err != nil { 126 + logger.Error("error marshaling event", "err", err) 127 + return 128 + } 129 + 130 + if err := busProducer.ProduceAsync(ctx, strconv.FormatInt(evt.Id, 10), b, func(r *kgo.Record, err error) { 131 + produceStatus := "error" 132 + 133 + defer func() { 134 + producedEvents.WithLabelValues(produceStatus).Inc() 135 + }() 136 + 137 + if err != nil { 138 + logger.Error("error producing event", "err", err) 139 + return 140 + } 141 + produceStatus = "ok" 142 + }); err != nil { 143 + logger.Error("error attempting to produce event", "err", err) 144 + } 145 + 146 + newCursor = evt.CreatedAt 147 + 148 + status = "ok" 149 + }() 150 + } 151 + 152 + if err := s.UpdateCursor(newCursor); err != nil { 153 + logger.Error("error updating cursor", "err", err) 154 + } 155 + 156 + logger.Info("received events to emit", "length", len(events), "new-cursor", newCursor) 157 + }() 158 + } 159 + } 160 + }() 161 + 162 + signals := make(chan os.Signal, 1) 163 + signal.Notify(signals, syscall.SIGTERM, 
syscall.SIGINT) 164 + 165 + select { 166 + case <-ctx.Done(): 167 + logger.Warn("context cancelled") 168 + close(shutdownPoll) 169 + case sig := <-signals: 170 + logger.Info("received exit signal", "signal", sig) 171 + close(shutdownPoll) 172 + } 173 + 174 + select { 175 + case <-pollShutdown: 176 + logger.Info("shutdown successfully") 177 + case <-time.After(5 * time.Second): 178 + logger.Error("poller did not shut down within five seconds, forcing shutdown") 179 + } 180 + 181 + return nil 182 + } 183 + 184 + func (s *OzoneServer) UpdateCursor(cursor string) error { 185 + s.cursorMu.Lock() 186 + defer s.cursorMu.Unlock() 187 + s.cursor = cursor 188 + if err := os.WriteFile("ozone-cursor.txt", []byte(cursor), 0644); err != nil { 189 + return fmt.Errorf("failed to write cursor: %w", err) 190 + } 191 + return nil 192 + } 193 + 194 + func (s *OzoneServer) LoadCursor() error { 195 + logger := s.logger.With("name", "LoadCursor") 196 + 197 + s.cursorMu.Lock() 198 + defer s.cursorMu.Unlock() 199 + 200 + logger.Info("attempting to load stored cursor") 201 + 202 + b, err := os.ReadFile("ozone-cursor.txt") 203 + if err != nil { 204 + if errors.Is(err, os.ErrNotExist) { 205 + logger.Info("no previous cursor found") 206 + return nil 207 + } 208 + return fmt.Errorf("failed to load cursor: %w", err) 209 + } 210 + s.cursor = strings.TrimSpace(string(b)) 211 + logger.Info("loaded old cursor", "cursor", s.cursor) 212 + return nil 213 + } 214 + 215 + func (s *OzoneServer) GetCursor() string { 216 + s.cursorMu.Lock() 217 + defer s.cursorMu.Unlock() 218 + return s.cursor 219 + } 220 + 221 + type OzoneClient struct { 222 + xrpcc *xrpc.Client 223 + mu sync.Mutex 224 + logger *slog.Logger 225 + } 226 + 227 + type OzoneClientArgs struct { 228 + OzonePdsHost string 229 + OzoneIdentifier string 230 + OzonePassword string 231 + OzoneLabelerDid string 232 + Logger *slog.Logger 233 + } 234 + 235 + func NewOzoneClient(args *OzoneClientArgs) (*OzoneClient, error) { 236 + xrpcc := &xrpc.Client{ 
237 + Host: args.OzonePdsHost, 238 + Headers: map[string]string{ 239 + "atproto-proxy": fmt.Sprintf("%s#atproto_labeler", args.OzoneLabelerDid), 240 + }, 241 + } 242 + 243 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 244 + defer cancel() 245 + 246 + resp, err := atproto.ServerCreateSession(ctx, xrpcc, &atproto.ServerCreateSession_Input{ 247 + Identifier: args.OzoneIdentifier, 248 + Password: args.OzonePassword, 249 + }) 250 + if err != nil { 251 + return nil, err 252 + } 253 + 254 + xrpcc.Auth = &xrpc.AuthInfo{ 255 + Handle: resp.Handle, 256 + Did: resp.Did, 257 + AccessJwt: resp.AccessJwt, 258 + RefreshJwt: resp.RefreshJwt, 259 + } 260 + 261 + oc := OzoneClient{ 262 + xrpcc: xrpcc, 263 + logger: args.Logger, 264 + } 265 + 266 + go func() { 267 + logger := args.Logger.With("component", "refresh-routine") 268 + 269 + ticker := time.NewTicker(1 * time.Hour) 270 + 271 + defer func() { 272 + ticker.Stop() 273 + }() 274 + 275 + for range ticker.C { 276 + func() { 277 + logger.Info("attempting to refresh session") 278 + 279 + oc.mu.Lock() 280 + defer oc.mu.Unlock() 281 + 282 + xrpcc.Auth.AccessJwt = xrpcc.Auth.RefreshJwt 283 + 284 + refreshCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 285 + defer cancel() 286 + 287 + resp, err := atproto.ServerRefreshSession(refreshCtx, xrpcc) 288 + if err != nil { 289 + logger.Error("error refreshing session", "err", err) 290 + return 291 + } 292 + 293 + xrpcc.Auth.AccessJwt = resp.AccessJwt 294 + xrpcc.Auth.RefreshJwt = resp.RefreshJwt 295 + 296 + logger.Info("session refreshed successfully") 297 + }() 298 + } 299 + }() 300 + 301 + return &oc, nil 302 + } 303 + 304 + func (oc *OzoneClient) GetEvents(ctx context.Context, before string) (*string, []*ozone.ModerationDefs_ModEventView, error) { 305 + resp, err := ozone.ModerationQueryEvents( 306 + ctx, 307 + oc.xrpcc, 308 + nil, // labels array 309 + nil, // tags array 310 + "", // age assurance state 311 + "", // batch id 312 + nil, 
// collections array 313 + "", // comment filter 314 + before, // created before 315 + "", // created after 316 + "", // created by 317 + "", // cursor 318 + false, // has comment 319 + true, // include all records 320 + 100, // limit 321 + nil, // mod tool array 322 + nil, // policies array 323 + nil, // removed labels array 324 + nil, // removed tags array 325 + nil, // report types array 326 + "asc", // sort direction 327 + "", // subject 328 + "", // subject type 329 + nil, // types array 330 + false, // withStrike bool 331 + ) 332 + if err != nil { 333 + return nil, nil, fmt.Errorf("error querying ozone events: %w", err) 334 + } 335 + 336 + return resp.Cursor, resp.Events, nil 337 + }
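Two details of the cursor handling in `ozone.go` above may deserve a second look. `newCursor` is only assigned after an event marshals successfully, so a batch in which every event fails to marshal would store an empty cursor; and `os.WriteFile` rewrites `ozone-cursor.txt` in place, so a crash mid-write could leave a truncated file. The sketch below shows one possible way to guard against both. It is not the author's implementation: the `event` type is a hypothetical stand-in for `ozone.ModerationDefs_ModEventView`, and `nextCursor`, `saveCursor`, `loadCursor`, and `demo` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// event is a hypothetical stand-in for ozone.ModerationDefs_ModEventView.
type event struct {
	ID        int64
	CreatedAt string
}

// nextCursor returns the CreatedAt of the last event that carries one,
// or the current cursor when the batch provides no timestamp at all.
func nextCursor(current string, batch []event) string {
	next := current
	for _, e := range batch {
		if e.CreatedAt != "" {
			next = e.CreatedAt
		}
	}
	return next
}

// saveCursor writes to a temp file in the same directory and renames it
// over the target, so readers never observe a partially written cursor.
// Note: os.CreateTemp creates the file with mode 0600, not 0644.
func saveCursor(path, cursor string) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), "cursor-*.tmp")
	if err != nil {
		return fmt.Errorf("failed to create temp cursor file: %w", err)
	}
	defer os.Remove(tmp.Name()) // harmless once the rename has succeeded

	if _, err := tmp.WriteString(cursor); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to write cursor: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("failed to close cursor file: %w", err)
	}
	if err := os.Rename(tmp.Name(), path); err != nil {
		return fmt.Errorf("failed to replace cursor file: %w", err)
	}
	return nil
}

// loadCursor mirrors LoadCursor above: a missing file means "no cursor yet".
func loadCursor(path string) (string, error) {
	b, err := os.ReadFile(path)
	if errors.Is(err, os.ErrNotExist) {
		return "", nil
	}
	if err != nil {
		return "", fmt.Errorf("failed to load cursor: %w", err)
	}
	return strings.TrimSpace(string(b)), nil
}

// demo stores a cursor, then processes an empty batch and reloads it.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "ozone-cursor")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "ozone-cursor.txt")

	batch := []event{
		{ID: 1, CreatedAt: "2024-05-01T11:59:00Z"},
		{ID: 2, CreatedAt: "2024-05-01T12:00:00Z"},
	}
	if err := saveCursor(path, nextCursor("", batch)); err != nil {
		return "", err
	}

	// A batch without usable timestamps must leave the stored cursor intact.
	cur, err := loadCursor(path)
	if err != nil {
		return "", err
	}
	if err := saveCursor(path, nextCursor(cur, nil)); err != nil {
		return "", err
	}
	return loadCursor(path)
}

func main() {
	cursor, err := demo()
	if err != nil {
		fmt.Fprintln(os.Stderr, "error:", err)
		return
	}
	fmt.Println("cursor after restart:", cursor)
}
```

The rename is atomic only when the temp file and the target live on the same filesystem, which is why the temp file is created next to the cursor file rather than in the system temp directory.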
+154 -92
cmd/atkafka/main.go
··· 18 18 Flags: []cli.Flag{ 19 19 telemetry.CLIFlagDebug, 20 20 telemetry.CLIFlagMetricsListenAddress, 21 - &cli.StringFlag{ 22 - Name: "relay-host", 23 - Usage: "Websocket host to subscribe to for events", 24 - Value: "wss://bsky.network", 25 - EnvVars: []string{"ATKAFKA_RELAY_HOST"}, 26 - }, 27 - &cli.StringFlag{ 28 - Name: "plc-host", 29 - Usage: "The host of the PLC directory you want to use for event metadata", 30 - EnvVars: []string{"ATKAFKA_PLC_HOST"}, 31 - }, 32 - &cli.StringFlag{ 33 - Name: "api-host", 34 - Usage: "The API host for making XRPC calls. Recommended to use https://public.api.bsky.app", 35 - EnvVars: []string{"ATKAFKA_API_HOST"}, 36 - }, 37 21 &cli.StringSliceFlag{ 38 22 Name: "bootstrap-servers", 39 23 Usage: "List of Kafka bootstrap servers", ··· 46 30 EnvVars: []string{"ATKAFKA_OUTPUT_TOPIC"}, 47 31 Required: true, 48 32 }, 49 - &cli.BoolFlag{ 50 - Name: "osprey-compatible", 51 - Usage: "Whether or not events should be formulated in an Osprey-compatible format", 52 - EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"}, 53 - Value: false, 54 - }, 55 - &cli.StringSliceFlag{ 56 - Name: "watched-services", 57 - Usage: "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.", 58 - EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"}, 59 - }, 60 - &cli.StringSliceFlag{ 61 - Name: "ignored-services", 62 - Usage: "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.", 63 - EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"}, 64 - }, 65 - &cli.StringSliceFlag{ 66 - Name: "watched-collections", 67 - Usage: "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.", 68 - EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"}, 69 - }, 70 - &cli.StringSliceFlag{ 71 - Name: "ignored-collections", 72 - Usage: "A list of collections that you want to ignore. 
Wildcards like *.bsky.app are allowed.", 73 - EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"}, 74 - }, 75 - }, 76 - Action: func(cmd *cli.Context) error { 77 - ctx := context.Background() 78 - 79 - telemetry.StartMetrics(cmd) 80 - logger := telemetry.StartLogger(cmd) 81 - 82 - s, err := atkafka.NewServer(&atkafka.ServerArgs{ 83 - RelayHost: cmd.String("relay-host"), 84 - PlcHost: cmd.String("plc-host"), 85 - ApiHost: cmd.String("api-host"), 86 - BootstrapServers: cmd.StringSlice("bootstrap-servers"), 87 - OutputTopic: cmd.String("output-topic"), 88 - OspreyCompat: cmd.Bool("osprey-compatible"), 89 - WatchedServices: cmd.StringSlice("watched-services"), 90 - IgnoredServices: cmd.StringSlice("ignored-services"), 91 - WatchedCollections: cmd.StringSlice("watched-collections"), 92 - IgnoredCollections: cmd.StringSlice("ignored-collections"), 93 - Logger: logger, 94 - }) 95 - if err != nil { 96 - return fmt.Errorf("failed to create new server: %w", err) 97 - } 98 - 99 - if err := s.Run(ctx); err != nil { 100 - return fmt.Errorf("error running server: %w", err) 101 - } 102 - 103 - return nil 104 33 }, 105 34 Commands: cli.Commands{ 106 35 &cli.Command{ 107 - Name: "tap-mode", 36 + Name: "firehose", 108 37 Flags: []cli.Flag{ 109 38 &cli.StringFlag{ 110 - Name: "tap-host", 111 - Usage: "Tap host to subscribe to for events", 112 - Value: "ws://localhost:2480", 113 - EnvVars: []string{"ATKAFKA_TAP_HOST"}, 39 + Name: "relay-host", 40 + Usage: "Websocket host to subscribe to for events", 41 + Value: "wss://bsky.network", 42 + EnvVars: []string{"ATKAFKA_RELAY_HOST"}, 43 + }, 44 + &cli.StringFlag{ 45 + Name: "plc-host", 46 + Usage: "The host of the PLC directory you want to use for event metadata", 47 + EnvVars: []string{"ATKAFKA_PLC_HOST"}, 48 + }, 49 + &cli.StringFlag{ 50 + Name: "api-host", 51 + Usage: "The API host for making XRPC calls. 
Recommended to use https://public.api.bsky.app", 52 + EnvVars: []string{"ATKAFKA_API_HOST"}, 114 53 }, 115 54 &cli.IntFlag{ 116 55 Name: "tap-workers", ··· 119 58 EnvVars: []string{"ATKAFKA_TAP_WORKERS"}, 120 59 }, 121 60 &cli.BoolFlag{ 122 - Name: "disable-acks", 123 - Usage: "Set to `true` to disable sending of event acks to Tap. May result in the loss of events.", 124 - EnvVars: []string{"ATKAFKA_DISABLE_ACKS"}, 61 + Name: "osprey-compatible", 62 + Usage: "Whether or not events should be formulated in an Osprey-compatible format", 63 + EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"}, 64 + Value: false, 65 + }, 66 + &cli.StringSliceFlag{ 67 + Name: "watched-services", 68 + Usage: "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.", 69 + EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"}, 70 + }, 71 + &cli.StringSliceFlag{ 72 + Name: "ignored-services", 73 + Usage: "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.", 74 + EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"}, 75 + }, 76 + &cli.StringSliceFlag{ 77 + Name: "watched-collections", 78 + Usage: "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.", 79 + EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"}, 80 + }, 81 + &cli.StringSliceFlag{ 82 + Name: "ignored-collections", 83 + Usage: "A list of collections that you want to ignore. 
Wildcards like *.bsky.app are allowed.", 84 + EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"}, 125 85 }, 126 86 }, 127 - Action: func(cmd *cli.Context) error { 87 + Subcommands: cli.Commands{ 88 + &cli.Command{ 89 + Name: "relay", 90 + Action: func(cmd *cli.Context) error { 91 + ctx := context.Background() 128 92 93 + telemetry.StartMetrics(cmd) 94 + logger := telemetry.StartLogger(cmd) 95 + 96 + s, err := atkafka.NewServer(&atkafka.ServerArgs{ 97 + RelayHost: cmd.String("relay-host"), 98 + PlcHost: cmd.String("plc-host"), 99 + ApiHost: cmd.String("api-host"), 100 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 101 + OutputTopic: cmd.String("output-topic"), 102 + OspreyCompat: cmd.Bool("osprey-compatible"), 103 + WatchedServices: cmd.StringSlice("watched-services"), 104 + IgnoredServices: cmd.StringSlice("ignored-services"), 105 + WatchedCollections: cmd.StringSlice("watched-collections"), 106 + IgnoredCollections: cmd.StringSlice("ignored-collections"), 107 + Logger: logger, 108 + }) 109 + if err != nil { 110 + return fmt.Errorf("failed to create new server: %w", err) 111 + } 112 + 113 + if err := s.Run(ctx); err != nil { 114 + return fmt.Errorf("error running server: %w", err) 115 + } 116 + 117 + return nil 118 + }, 119 + }, 120 + &cli.Command{ 121 + Name: "tap", 122 + Flags: []cli.Flag{ 123 + &cli.StringFlag{ 124 + Name: "tap-host", 125 + Usage: "Tap host to subscribe to for events", 126 + Value: "ws://localhost:2480", 127 + EnvVars: []string{"ATKAFKA_TAP_HOST"}, 128 + }, 129 + &cli.BoolFlag{ 130 + Name: "disable-acks", 131 + Usage: "Set to `true` to disable sending of event acks to Tap. 
May result in the loss of events.", 132 + EnvVars: []string{"ATKAFKA_DISABLE_ACKS"}, 133 + }, 134 + }, 135 + Action: func(cmd *cli.Context) error { 136 + 137 + ctx := context.Background() 138 + 139 + telemetry.StartMetrics(cmd) 140 + logger := telemetry.StartLogger(cmd) 141 + 142 + s, err := atkafka.NewServer(&atkafka.ServerArgs{ 143 + TapHost: cmd.String("tap-host"), 144 + DisableAcks: cmd.Bool("disable-acks"), 145 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 146 + OutputTopic: cmd.String("output-topic"), 147 + WatchedCollections: cmd.StringSlice("watched-collections"), 148 + IgnoredCollections: cmd.StringSlice("ignored-collections"), 149 + Logger: logger, 150 + }) 151 + if err != nil { 152 + return fmt.Errorf("failed to create new server: %w", err) 153 + } 154 + 155 + if err := s.RunTapMode(ctx); err != nil { 156 + return fmt.Errorf("error running server: %w", err) 157 + } 158 + 159 + return nil 160 + }, 161 + }, 162 + }, 163 + }, 164 + &cli.Command{ 165 + Name: "ozone-events", 166 + Flags: []cli.Flag{ 167 + &cli.StringFlag{ 168 + Name: "pds-host", 169 + EnvVars: []string{"ATKAFKA_OZONE_PDS_HOST"}, 170 + Required: true, 171 + }, 172 + &cli.StringFlag{ 173 + Name: "identifier", 174 + EnvVars: []string{"ATKAFKA_OZONE_IDENTIFIER"}, 175 + Required: true, 176 + }, 177 + &cli.StringFlag{ 178 + Name: "password", 179 + EnvVars: []string{"ATKAFKA_OZONE_PASSWORD"}, 180 + Required: true, 181 + }, 182 + &cli.StringFlag{ 183 + Name: "labeler-did", 184 + EnvVars: []string{"ATKAFKA_OZONE_LABELER_DID"}, 185 + Required: true, 186 + }, 187 + }, 188 + Action: func(cmd *cli.Context) error { 129 189 ctx := context.Background() 130 190 131 191 telemetry.StartMetrics(cmd) 132 192 logger := telemetry.StartLogger(cmd) 133 193 134 - s, err := atkafka.NewServer(&atkafka.ServerArgs{ 135 - TapHost: cmd.String("tap-host"), 136 - TapWorkers: cmd.Int("tap-workers"), 137 - DisableAcks: cmd.Bool("disable-acks"), 138 - BootstrapServers: cmd.StringSlice("bootstrap-servers"), 139 - 
OutputTopic: cmd.String("output-topic"), 140 - WatchedCollections: cmd.StringSlice("watched-collections"), 141 - IgnoredCollections: cmd.StringSlice("ignored-collections"), 142 - Logger: logger, 194 + s, err := atkafka.NewOzoneServer(&atkafka.OzoneServerArgs{ 195 + Logger: logger, 196 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 197 + OutputTopic: cmd.String("output-topic"), 198 + OzoneClientArgs: &atkafka.OzoneClientArgs{ 199 + OzonePdsHost: cmd.String("pds-host"), 200 + OzoneIdentifier: cmd.String("identifier"), 201 + OzonePassword: cmd.String("password"), 202 + OzoneLabelerDid: cmd.String("labeler-did"), 203 + Logger: logger, 204 + }, 143 205 }) 144 206 if err != nil { 145 - return fmt.Errorf("failed to create new server: %w", err) 207 + return fmt.Errorf("failed to create new ozone server: %w", err) 146 208 } 147 209 148 - if err := s.RunTapMode(ctx); err != nil { 149 - return fmt.Errorf("error running server: %w", err) 210 + if err := s.Run(ctx); err != nil { 211 + return fmt.Errorf("error running ozone server: %w", err) 150 212 } 151 213 152 214 return nil
+39
docker-compose.ozone.yml
··· 1 + services: 2 + zookeeper: 3 + image: confluentinc/cp-zookeeper:7.6.0 4 + hostname: zookeeper 5 + container_name: zookeeper 6 + ports: 7 + - "2181:2181" 8 + environment: 9 + ZOOKEEPER_CLIENT_PORT: 2181 10 + ZOOKEEPER_TICK_TIME: 2000 11 + volumes: 12 + - zookeeper-data:/var/lib/zookeeper/data 13 + - zookeeper-logs:/var/lib/zookeeper/log 14 + 15 + atkafka: 16 + build: 17 + context: . 18 + dockerfile: Dockerfile 19 + container_name: atkafka 20 + command: ["ozone-events"] 21 + ports: 22 + - "2112:2112" 23 + environment: 24 + ATKAFKA_RELAY_HOST: "wss://bsky.network" 25 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 26 + ATKAFKA_OUTPUT_TOPIC: "atproto-events" 27 + ATKAFKA_OSPREY_COMPATIBLE: "false" 28 + ATKAFKA_PLC_HOST: "https://plc.directory" 29 + ATKAFKA_API_HOST: "https://public.api.bsky.app" 30 + volumes: 31 + - atkafka-data:/data 32 + working_dir: /data 33 + restart: unless-stopped 34 + 35 + volumes: 36 + zookeeper-data: 37 + zookeeper-logs: 38 + kafka-data: 39 + atkafka-data:
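As shown, the `atkafka` service in `docker-compose.ozone.yml` sets relay-oriented variables and none of the four `ATKAFKA_OZONE_*` variables that the `ozone-events` flags in `cmd/atkafka/main.go` mark as `Required: true`. A small preflight check can surface that kind of gap before the container starts. The sketch below is illustrative only: `missingEnv` and `ozoneEnvVars` are hypothetical names, and the variable list is copied from the flag definitions in this diff.

```go
package main

import (
	"fmt"
	"os"
)

// ozoneEnvVars lists the environment variables bound to the required
// flags of the ozone-events command in this diff.
var ozoneEnvVars = []string{
	"ATKAFKA_OZONE_PDS_HOST",
	"ATKAFKA_OZONE_IDENTIFIER",
	"ATKAFKA_OZONE_PASSWORD",
	"ATKAFKA_OZONE_LABELER_DID",
}

// missingEnv returns the names for which lookup reports no value or an
// empty value. lookup has the same shape as os.LookupEnv so that tests
// can supply a map-backed function instead.
func missingEnv(names []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, n := range names {
		if v, ok := lookup(n); !ok || v == "" {
			missing = append(missing, n)
		}
	}
	return missing
}

func main() {
	if missing := missingEnv(ozoneEnvVars, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing required environment variables:", missing)
		return
	}
	fmt.Println("ozone environment looks complete")
}
```

Passing the lookup function as a parameter keeps the check independent of the process environment, which makes it easy to exercise with fixed inputs.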
+1 -1
docker-compose.tap.yml
··· 75 75 ports: 76 76 # metrics port 77 77 - "6009:6009" 78 - command: ["tap-mode"] 78 + command: ["firehose", "tap"] 79 79 environment: 80 80 ATKAFKA_TAP_HOST: "ws://tap:2480" 81 81 ATKAFKA_DISABLE_ACKS: false
+2 -6
docker-compose.yml
··· 41 41 context: . 42 42 dockerfile: Dockerfile 43 43 container_name: atkafka 44 - depends_on: 45 - - kafka 44 + command: ["firehose", "relay"] 46 45 ports: 47 46 - "2112:2112" 48 47 environment: ··· 51 50 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 52 51 ATKAFKA_OSPREY_COMPATIBLE: "false" 53 52 ATKAFKA_PLC_HOST: "https://plc.directory" 54 - 55 - ATKAFKA_WATCHED_SERVICES: "*.bsky.network" 56 - 57 - ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" 53 + ATKAFKA_API_HOST: "https://public.api.bsky.app" 58 54 restart: unless-stopped 59 55 60 56 volumes: