this repo has no description

Compare changes

Choose any two refs to compare.

+1
.github/workflows/docker.yml
··· 59 labels: ${{ steps.meta.outputs.labels }} 60 cache-from: type=gha 61 cache-to: type=gha,mode=max
··· 59 labels: ${{ steps.meta.outputs.labels }} 60 cache-from: type=gha 61 cache-to: type=gha,mode=max 62 + platforms: linux/amd64,linux/arm64
+2
.gitignore
···
··· 1 + .env 2 + *cursor.txt
+62 -34
README.md
··· 1 # at-kafka 2 3 - A small service that receives events from the AT firehose and produces them to Kafka. Supports standard JSON outputs as well as [Osprey](https://github.com/roostorg/osprey) 4 - formatted events. 5 6 ## Usage 7 8 ### Docker Compose 9 10 - The included `docker-compose.yml` provides a complete local stack. Edit the environment variables in the file to customize: 11 12 ```yaml 13 environment: 14 ATKAFKA_RELAY_HOST: "wss://bsky.network" 15 ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 16 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 17 - ATKAFKA_OSPREY_COMPATIBLE: "false" 18 - ``` 19 - 20 - Then start: 21 22 - ```bash 23 - docker compose up -d # Start services 24 ``` 25 26 - ### Docker Run 27 28 - For standard mode: 29 30 - ```bash 31 - docker run -d \ 32 - -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \ 33 - -e ATKAFKA_OUTPUT_TOPIC=atproto-events \ 34 - -p 2112:2112 \ 35 - ghcr.io/haileyok/at-kafka:latest 36 ``` 37 38 - For Osprey-compatible mode: 39 40 ```bash 41 - docker run -d \ 42 - -e ATKAFKA_BOOTSTRAP_SERVERS=kafka:9092 \ 43 - -e ATKAFKA_OUTPUT_TOPIC=atproto-events \ 44 - -e ATKAFKA_OSPREY_COMPATIBLE=true \ 45 - -p 2112:2112 \ 46 - ghcr.io/haileyok/at-kafka:latest 47 - ``` 48 - 49 - ## Configuration 50 51 - | Flag | Environment Variable | Default | Description | 52 - |------|---------------------|---------|-------------| 53 - | `--relay-host` | `ATKAFKA_RELAY_HOST` | `wss://bsky.network` | AT Protocol relay host to connect to | 54 - | `--bootstrap-servers` | `ATKAFKA_BOOTSTRAP_SERVERS` | (required) | Comma-separated list of Kafka bootstrap servers | 55 - | `--output-topic` | `ATKAFKA_OUTPUT_TOPIC` | (required) | Kafka topic to publish events to | 56 - | `--osprey-compatible` | `ATKAFKA_OSPREY_COMPATIBLE` | `false` | Enable Osprey-compatible event format | 57 58 ## Event Structure 59 60 - ### Standard Mode 61 62 Events are structured similarly to the raw AT Protocol firehose, with one key difference: **commit events are split into individual operation events**. 63 ··· 137 - `identity` - Identity/handle changes 138 - `info` - Informational messages 139 140 ## Monitoring 141 142 The service exposes Prometheus metrics on the default metrics port. 143 144 - `atkafka_handled_events` - Total events that are received on the firehose and handled 145 - `atkafka_produced_events` - Total messages that are output on the bus
··· 1 # at-kafka 2 3 + A small service that receives events from ATProto and produces them to Kafka. Supports: 4 + - Firehose events from relay or [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) (for network backfill) 5 + - Ozone moderation events 6 + - Standard JSON and [Osprey](https://github.com/roostorg/osprey)-compatible event formats 7 8 ## Usage 9 10 ### Docker Compose 11 12 + Three compose files are provided: 13 + 14 + ```bash 15 + # Firehose relay mode (default) 16 + docker compose up -d 17 + 18 + # Firehose tap mode (for backfill) 19 + docker compose -f docker-compose.tap.yml up -d 20 + 21 + # Ozone moderation events 22 + docker compose -f docker-compose.ozone.yml up -d 23 + ``` 24 + 25 + #### Firehose Configuration 26 27 ```yaml 28 environment: 29 + # Relay/Tap connection 30 ATKAFKA_RELAY_HOST: "wss://bsky.network" 31 + ATKAFKA_TAP_HOST: "ws://localhost:2480" 32 + ATKAFKA_DISABLE_ACKS: false 33 + 34 + # Kafka 35 ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 36 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 37 + ATKAFKA_OSPREY_COMPATIBLE: false 38 39 + # Filtering 40 + ATKAFKA_WATCHED_SERVICES: "*.bsky.network" 41 + ATKAFKA_IGNORED_SERVICES: "blacksky.app" 42 + ATKAFKA_WATCHED_COLLECTIONS: "app.bsky.*" 43 + ATKAFKA_IGNORED_COLLECTIONS: "fm.teal.*" 44 ``` 45 46 + #### Ozone Configuration 47 48 + ```yaml 49 + environment: 50 + # Ozone connection 51 + ATKAFKA_OZONE_PDS_HOST: "https://pds.example.com" 52 + ATKAFKA_OZONE_IDENTIFIER: "your.handle" 53 + ATKAFKA_OZONE_PASSWORD: "password" 54 + ATKAFKA_OZONE_LABELER_DID: "did:plc:..." 55 56 + # Kafka 57 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 58 + ATKAFKA_OUTPUT_TOPIC: "ozone-events" 59 ``` 60 61 + ### CLI 62 63 ```bash 64 + # Firehose modes 65 + atkafka firehose relay --bootstrap-servers localhost:9092 --output-topic events 66 + atkafka firehose tap --tap-host ws://localhost:2480 --bootstrap-servers localhost:9092 --output-topic events 67 68 + # Ozone mode 69 + atkafka ozone-events \ 70 + --pds-host https://pds.example.com \ 71 + --identifier admin@example.com \ 72 + --password password \ 73 + --labeler-did did:plc:... \ 74 + --bootstrap-servers localhost:9092 \ 75 + --output-topic ozone-events 76 + ``` 77 78 ## Event Structure 79 80 + ### Firehose Events 81 82 Events are structured similarly to the raw AT Protocol firehose, with one key difference: **commit events are split into individual operation events**. 83 ··· 157 - `identity` - Identity/handle changes 158 - `info` - Informational messages 159 160 + ### Ozone Events 161 + 162 + Ozone events are produced as-is from the `tools.ozone.moderation.queryEvents` API response. Events include moderation actions, reports, and other moderation activity. The cursor is persisted to disk and automatically resumed on restart. 163 + 164 ## Monitoring 165 166 The service exposes Prometheus metrics on the default metrics port. 167 168 - `atkafka_handled_events` - Total events that are received on the firehose and handled 169 - `atkafka_produced_events` - Total messages that are output on the bus 170 + - `atkafka_plc_requests` - Total number of PLC requests that were made, if applicable, and whether they were cached 171 + - `atkafka_api_requests` - Total number of API requests that were made, if applicable, and whether they were cached 172 + - `atkafka_cache_size` - The size of the PLC and API caches 173 + - `atkafka_acks_sent` - Total acks that were sent to Tap, if applicable
+14 -3
atkafka/atkafka.go
··· 28 29 type Server struct { 30 relayHost string 31 bootstrapServers []string 32 outputTopic string 33 ospreyCompat bool ··· 42 plcClient *PlcClient 43 apiClient *ApiClient 44 logger *slog.Logger 45 } 46 47 type ServerArgs struct { 48 // network params 49 - RelayHost string 50 - PlcHost string 51 - ApiHost string 52 53 // for watched and ignoed services or collections, only one list may be supplied 54 // for both services and collections, wildcards are acceptable. for example: ··· 113 114 s := &Server{ 115 relayHost: args.RelayHost, 116 plcClient: plcClient, 117 apiClient: apiClient, 118 bootstrapServers: args.BootstrapServers,
··· 28 29 type Server struct { 30 relayHost string 31 + tapHost string 32 + tapWorkers int 33 + disableAcks bool 34 bootstrapServers []string 35 outputTopic string 36 ospreyCompat bool ··· 45 plcClient *PlcClient 46 apiClient *ApiClient 47 logger *slog.Logger 48 + ws *websocket.Conn 49 + ackQueue chan uint 50 } 51 52 type ServerArgs struct { 53 // network params 54 + RelayHost string 55 + TapHost string 56 + TapWorkers int 57 + DisableAcks bool 58 + PlcHost string 59 + ApiHost string 60 61 // for watched and ignoed services or collections, only one list may be supplied 62 // for both services and collections, wildcards are acceptable. for example: ··· 121 122 s := &Server{ 123 relayHost: args.RelayHost, 124 + tapHost: args.TapHost, 125 + tapWorkers: args.TapWorkers, 126 + disableAcks: args.DisableAcks, 127 plcClient: plcClient, 128 apiClient: apiClient, 129 bootstrapServers: args.BootstrapServers,
+15
atkafka/metrics.go
··· 30 Namespace: "atkafka", 31 Name: "api_requests", 32 }, []string{"kind", "status", "cached"}) 33 )
··· 30 Namespace: "atkafka", 31 Name: "api_requests", 32 }, []string{"kind", "status", "cached"}) 33 + 34 + acksSent = promauto.NewCounterVec(prometheus.CounterOpts{ 35 + Namespace: "atkafka", 36 + Name: "acks_sent", 37 + }, []string{"status"}) 38 + 39 + tapEvtBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ 40 + Namespace: "atkafka", 41 + Name: "tap_event_buffer_size", 42 + }) 43 + 44 + tapAckBufferSize = promauto.NewGauge(prometheus.GaugeOpts{ 45 + Namespace: "atkafka", 46 + Name: "tap_ack_event_buffer_size", 47 + }) 48 )
+337
atkafka/ozone.go
···
··· 1 + package atkafka 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "os" 10 + "os/signal" 11 + "strconv" 12 + "strings" 13 + "sync" 14 + "syscall" 15 + "time" 16 + 17 + "github.com/bluesky-social/indigo/api/atproto" 18 + "github.com/bluesky-social/indigo/api/ozone" 19 + "github.com/bluesky-social/indigo/xrpc" 20 + "github.com/twmb/franz-go/pkg/kgo" 21 + ) 22 + 23 + type OzoneServer struct { 24 + logger *slog.Logger 25 + bootstrapServers []string 26 + outputTopic string 27 + ozoneClientArgs *OzoneClientArgs 28 + 29 + cursorMu sync.Mutex 30 + cursor string 31 + } 32 + 33 + type OzoneServerArgs struct { 34 + BootstrapServers []string 35 + OutputTopic string 36 + Logger *slog.Logger 37 + OzoneClientArgs *OzoneClientArgs 38 + } 39 + 40 + func NewOzoneServer(args *OzoneServerArgs) (*OzoneServer, error) { 41 + s := OzoneServer{ 42 + logger: args.Logger, 43 + bootstrapServers: args.BootstrapServers, 44 + outputTopic: args.OutputTopic, 45 + ozoneClientArgs: args.OzoneClientArgs, 46 + } 47 + 48 + return &s, nil 49 + } 50 + 51 + func (s *OzoneServer) Run(ctx context.Context) error { 52 + logger := s.logger.With("name", "Run") 53 + 54 + if err := s.LoadCursor(); err != nil { 55 + logger.Error("error loading cursor", "err", err) 56 + } 57 + 58 + ozoneClient, err := NewOzoneClient(s.ozoneClientArgs) 59 + if err != nil { 60 + return err 61 + } 62 + 63 + logger.Info("creating producer", "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic) 64 + busProducer, err := NewProducer( 65 + ctx, 66 + s.logger.With("component", "producer"), 67 + s.bootstrapServers, 68 + s.outputTopic, 69 + WithEnsureTopic(true), 70 + WithTopicPartitions(4), 71 + ) 72 + if err != nil { 73 + return fmt.Errorf("failed to create producer: %w", err) 74 + } 75 + defer busProducer.close() 76 + 77 + shutdownPoll := make(chan struct{}, 1) 78 + pollShutdown := make(chan struct{}, 1) 79 + go func() { 80 + ticker := time.NewTicker(1 * time.Second) 81 + 82 + defer func() { 83 + ticker.Stop() 84 + close(pollShutdown) 85 + }() 86 + 87 + for { 88 + select { 89 + case <-ctx.Done(): 90 + logger.Info("shutting down poller on context cancel") 91 + return 92 + case <-shutdownPoll: 93 + logger.Info("shutting down poller") 94 + return 95 + case <-ticker.C: 96 + func() { 97 + ozoneClient.mu.Lock() 98 + defer ozoneClient.mu.Unlock() 99 + 100 + cursor := s.GetCursor() 101 + 102 + logger.Info("attempting to fetch next batch of events") 103 + 104 + _, events, err := ozoneClient.GetEvents(ctx, cursor) 105 + if err != nil { 106 + logger.Error("error getting events", "err", err) 107 + return 108 + } 109 + 110 + if len(events) == 0 { 111 + logger.Info("no new events to emit") 112 + return 113 + } 114 + 115 + var newCursor string 116 + for _, evt := range events { 117 + func() { 118 + status := "error" 119 + 120 + defer func() { 121 + handledEvents.WithLabelValues("ozone", status).Inc() 122 + }() 123 + 124 + b, err := json.Marshal(evt) 125 + if err != nil { 126 + logger.Error("error marshaling event", "err", err) 127 + return 128 + } 129 + 130 + if err := busProducer.ProduceAsync(ctx, strconv.FormatInt(evt.Id, 10), b, func(r *kgo.Record, err error) { 131 + produceStatus := "error" 132 + 133 + defer func() { 134 + producedEvents.WithLabelValues(produceStatus).Inc() 135 + }() 136 + 137 + if err != nil { 138 + logger.Error("error producing event", "err", err) 139 + return 140 + } 141 + produceStatus = "ok" 142 + }); err != nil { 143 + logger.Error("error attempting to produce event", "err", err) 144 + } 145 + 146 + newCursor = evt.CreatedAt 147 + 148 + status = "ok" 149 + }() 150 + } 151 + 152 + if err := s.UpdateCursor(newCursor); err != nil { 153 + logger.Error("error updating cursor", "err", err) 154 + } 155 + 156 + logger.Info("received events to emit", "length", len(events), "new-cursor", newCursor) 157 + }() 158 + } 159 + } 160 + }() 161 + 162 + signals := make(chan os.Signal, 1) 163 + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 164 + 165 + select { 166 + case <-ctx.Done(): 167 + logger.Warn("context cancelled") 168 + close(shutdownPoll) 169 + case sig := <-signals: 170 + logger.Info("received exit signal", "signal", sig) 171 + close(shutdownPoll) 172 + } 173 + 174 + select { 175 + case <-pollShutdown: 176 + logger.Info("shutdown successfully") 177 + case <-time.After(5 * time.Second): 178 + logger.Error("poller did not shut down within five seconds, forcing shutdown") 179 + } 180 + 181 + return nil 182 + } 183 + 184 + func (s *OzoneServer) UpdateCursor(cursor string) error { 185 + s.cursorMu.Lock() 186 + defer s.cursorMu.Unlock() 187 + s.cursor = cursor 188 + if err := os.WriteFile("ozone-cursor.txt", []byte(cursor), 0644); err != nil { 189 + return fmt.Errorf("failed to write cursor: %w", err) 190 + } 191 + return nil 192 + } 193 + 194 + func (s *OzoneServer) LoadCursor() error { 195 + logger := s.logger.With("name", "LoadCursor") 196 + 197 + s.cursorMu.Lock() 198 + defer s.cursorMu.Unlock() 199 + 200 + logger.Info("attempting to load stored cursor") 201 + 202 + b, err := os.ReadFile("ozone-cursor.txt") 203 + if err != nil { 204 + if errors.Is(err, os.ErrNotExist) { 205 + logger.Info("no previous cursor found") 206 + return nil 207 + } 208 + return fmt.Errorf("failed to load cursor: %w", err) 209 + } 210 + s.cursor = strings.TrimSpace(string(b)) 211 + logger.Info("loaded old cursor", "cursor", s.cursor) 212 + return nil 213 + } 214 + 215 + func (s *OzoneServer) GetCursor() string { 216 + s.cursorMu.Lock() 217 + defer s.cursorMu.Unlock() 218 + return s.cursor 219 + } 220 + 221 + type OzoneClient struct { 222 + xrpcc *xrpc.Client 223 + mu sync.Mutex 224 + logger *slog.Logger 225 + } 226 + 227 + type OzoneClientArgs struct { 228 + OzonePdsHost string 229 + OzoneIdentifier string 230 + OzonePassword string 231 + OzoneLabelerDid string 232 + Logger *slog.Logger 233 + } 234 + 235 + func NewOzoneClient(args *OzoneClientArgs) (*OzoneClient, error) { 236 + xrpcc := &xrpc.Client{ 237 + Host: args.OzonePdsHost, 238 + Headers: map[string]string{ 239 + "atproto-proxy": fmt.Sprintf("%s#atproto_labeler", args.OzoneLabelerDid), 240 + }, 241 + } 242 + 243 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 244 + defer cancel() 245 + 246 + resp, err := atproto.ServerCreateSession(ctx, xrpcc, &atproto.ServerCreateSession_Input{ 247 + Identifier: args.OzoneIdentifier, 248 + Password: args.OzonePassword, 249 + }) 250 + if err != nil { 251 + return nil, err 252 + } 253 + 254 + xrpcc.Auth = &xrpc.AuthInfo{ 255 + Handle: resp.Handle, 256 + Did: resp.Did, 257 + AccessJwt: resp.AccessJwt, 258 + RefreshJwt: resp.RefreshJwt, 259 + } 260 + 261 + oc := OzoneClient{ 262 + xrpcc: xrpcc, 263 + logger: args.Logger, 264 + } 265 + 266 + go func() { 267 + logger := args.Logger.With("component", "refresh-routine") 268 + 269 + ticker := time.NewTicker(1 * time.Hour) 270 + 271 + defer func() { 272 + ticker.Stop() 273 + }() 274 + 275 + for range ticker.C { 276 + func() { 277 + logger.Info("attempting to refresh session") 278 + 279 + oc.mu.Lock() 280 + defer oc.mu.Unlock() 281 + 282 + xrpcc.Auth.AccessJwt = xrpcc.Auth.RefreshJwt 283 + 284 + refreshCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 285 + defer cancel() 286 + 287 + resp, err := atproto.ServerRefreshSession(refreshCtx, xrpcc) 288 + if err != nil { 289 + logger.Error("error refreshing session", "err", err) 290 + return 291 + } 292 + 293 + xrpcc.Auth.AccessJwt = resp.AccessJwt 294 + xrpcc.Auth.RefreshJwt = resp.RefreshJwt 295 + 296 + logger.Info("session refreshed successfully") 297 + }() 298 + } 299 + }() 300 + 301 + return &oc, nil 302 + } 303 + 304 + func (oc *OzoneClient) GetEvents(ctx context.Context, before string) (*string, []*ozone.ModerationDefs_ModEventView, error) { 305 + resp, err := ozone.ModerationQueryEvents( 306 + ctx, 307 + oc.xrpcc, 308 + nil, // labels array 309 + nil, // tags array 310 + "", // age assurance state 311 + "", // batch id 312 + nil, // collections array 313 + "", // comment filter 314 + before, // created before 315 + "", // created after 316 + "", // created by 317 + "", // cursor 318 + false, // has comment 319 + true, // include all records 320 + 100, // limit 321 + nil, // mod tool array 322 + nil, // policies array 323 + nil, // removed labels array 324 + nil, // removed tags array 325 + nil, // report types array 326 + "asc", // sort direction 327 + "", // subject 328 + "", // subject type 329 + nil, // types array 330 + false, // withStrike bool 331 + ) 332 + if err != nil { 333 + return nil, nil, fmt.Errorf("error querying ozone events: %w", err) 334 + } 335 + 336 + return resp.Cursor, resp.Events, nil 337 + }
+327
atkafka/tap.go
···
··· 1 + package atkafka 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "os" 10 + "os/signal" 11 + "strings" 12 + "syscall" 13 + "time" 14 + 15 + "github.com/araddon/dateparse" 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + "github.com/gorilla/websocket" 18 + "github.com/twmb/franz-go/pkg/kgo" 19 + ) 20 + 21 + func (s *Server) RunTapMode(ctx context.Context) error { 22 + s.logger.Info("starting tap consumer", "tap-host", s.tapHost, "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic) 23 + 24 + createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 25 + defer cancel() 26 + 27 + producerLogger := s.logger.With("component", "producer") 28 + kafProducer, err := NewProducer(createCtx, producerLogger, s.bootstrapServers, s.outputTopic, 29 + WithEnsureTopic(true), 30 + WithTopicPartitions(200), 31 + ) 32 + if err != nil { 33 + return fmt.Errorf("failed to create producer: %w", err) 34 + } 35 + defer kafProducer.Close() 36 + s.producer = kafProducer 37 + s.logger.Info("created producer") 38 + 39 + wsDialer := websocket.DefaultDialer 40 + u, err := url.Parse(s.tapHost) 41 + if err != nil { 42 + return fmt.Errorf("invalid tapHost: %w", err) 43 + } 44 + u.Path = "/channel" 45 + s.logger.Info("created dialer") 46 + 47 + evtQueue := make(chan TapEvent, 10_000) 48 + for range s.tapWorkers { 49 + go func() { 50 + for evt := range evtQueue { 51 + s.handleTapEvent(ctx, &evt) 52 + } 53 + }() 54 + } 55 + 56 + ackQueue := make(chan uint, 10_000) 57 + go func() { 58 + for id := range ackQueue { 59 + func() { 60 + status := "ok" 61 + defer func() { 62 + acksSent.WithLabelValues(status).Inc() 63 + }() 64 + 65 + if !s.disableAcks { 66 + if err := s.ws.WriteJSON(TapAck{ 67 + Type: "ack", 68 + Id: id, 69 + }); err != nil { 70 + s.logger.Error("error sending ack", "err", err) 71 + status = "error" 72 + } 73 + } 74 + }() 75 + } 76 + }() 77 + s.ackQueue = ackQueue 78 + 79 + bufferSizeTicker := time.NewTicker(5 * time.Second) 80 + go func() { 81 + for range bufferSizeTicker.C { 82 + tapEvtBufferSize.Set(float64(len(evtQueue))) 83 + tapAckBufferSize.Set(float64(len(ackQueue))) 84 + } 85 + }() 86 + 87 + wsErr := make(chan error, 1) 88 + go func() { 89 + logger := s.logger.With("component", "websocket") 90 + 91 + logger.Info("subscribing to tap stream", "upstream", s.tapHost) 92 + 93 + conn, _, err := wsDialer.Dial(u.String(), http.Header{ 94 + "User-Agent": []string{"at-kafka/0.0.0"}, 95 + }) 96 + if err != nil { 97 + wsErr <- err 98 + return 99 + } 100 + s.ws = conn 101 + defer conn.Close() 102 + 103 + for { 104 + var evt TapEvent 105 + err := conn.ReadJSON(&evt) 106 + if err != nil { 107 + logger.Error("error reading json from websocket", "err", err) 108 + wsErr <- err 109 + return 110 + } 111 + 112 + select { 113 + case evtQueue <- evt: 114 + case <-ctx.Done(): 115 + wsErr <- ctx.Err() 116 + return 117 + } 118 + } 119 + }() 120 + 121 + s.logger.Info("created tap consumer") 122 + 123 + signals := make(chan os.Signal, 1) 124 + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 125 + 126 + select { 127 + case sig := <-signals: 128 + s.logger.Info("shutting down on signal", "signal", sig) 129 + case err := <-wsErr: 130 + if err != nil { 131 + s.logger.Error("websocket error", "err", err) 132 + } else { 133 + s.logger.Info("websocket shutdown unexpectedly") 134 + } 135 + } 136 + 137 + bufferSizeTicker.Stop() 138 + close(evtQueue) 139 + close(ackQueue) 140 + 141 + return nil 142 + } 143 + 144 + func (s *Server) handleTapEvent(ctx context.Context, evt *TapEvent) error { 145 + logger := s.logger.With("component", "handleEvent") 146 + 147 + defer func() { 148 + s.ackEvent(ctx, evt.Id) 149 + }() 150 + 151 + if evt.Record != nil { 152 + did := evt.Record.Did 153 + kind := evt.Record.Action 154 + collection := evt.Record.Collection 155 + rkey := evt.Record.Rkey 156 + atUri := fmt.Sprintf("at://%s/%s/%s", did, collection, rkey) 157 + 158 + skip := false 159 + if len(s.watchedCollections) > 0 { 160 + skip = true 161 + for _, watchedCollection := range s.watchedCollections { 162 + if watchedCollection == collection || strings.HasPrefix(collection, watchedCollection+".") { 163 + skip = false 164 + break 165 + } 166 + } 167 + } else if len(s.ignoredCollections) > 0 { 168 + for _, ignoredCollection := range s.ignoredCollections { 169 + if ignoredCollection == collection || strings.HasPrefix(collection, ignoredCollection+".") { 170 + skip = true 171 + break 172 + } 173 + } 174 + } 175 + 176 + if skip { 177 + logger.Debug("skipping event based on collection", "collection", collection) 178 + return nil 179 + } 180 + 181 + actionName := "operation#" + kind 182 + 183 + handledEvents.WithLabelValues(actionName, collection).Inc() 184 + 185 + // create the formatted operation 186 + atkOp := AtKafkaOp{ 187 + Action: evt.Record.Action, 188 + Collection: collection, 189 + Rkey: rkey, 190 + Uri: atUri, 191 + Cid: evt.Record.Cid, 192 + Path: fmt.Sprintf("%s/%s", collection, rkey), 193 + } 194 + 195 + if evt.Record.Record != nil { 196 + atkOp.Record = *evt.Record.Record 197 + } 198 + 199 + kafkaEvt := AtKafkaEvent{ 200 + Did: did, 201 + Operation: &atkOp, 202 + } 203 + 204 + if evt.Record.Record != nil { 205 + timestamp, err := parseTimeFromRecord(collection, *evt.Record.Record, rkey) 206 + if err != nil { 207 + return fmt.Errorf("error getting timestamp from record: %w", err) 208 + } 209 + kafkaEvt.Timestamp = timestamp.Format(time.RFC3339Nano) 210 + } 211 + 212 + evtBytes, err := json.Marshal(&kafkaEvt) 213 + if err != nil { 214 + return fmt.Errorf("failed to marshal kafka event: %w", err) 215 + } 216 + 217 + if err := s.produceAsyncTap(ctx, did, evtBytes); err != nil { 218 + return err 219 + } 220 + } 221 + 222 + return nil 223 + } 224 + 225 + func (s *Server) produceAsyncTap(ctx context.Context, key string, msg []byte) error { 226 + logger := s.logger.With("name", "produceAsyncTap", "key", key) 227 + callback := func(r *kgo.Record, err error) { 228 + status := "ok" 229 + defer func() { 230 + producedEvents.WithLabelValues(status).Inc() 231 + }() 232 + if err != nil { 233 + logger.Error("error producing message", "err", err) 234 + status = "error" 235 + } 236 + } 237 + 238 + if err := s.producer.ProduceAsync(ctx, key, msg, callback); err != nil { 239 + return fmt.Errorf("failed to produce message: %w", err) 240 + } 241 + 242 + return nil 243 + } 244 + 245 + func (s *Server) ackEvent(ctx context.Context, id uint) { 246 + ackCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 247 + defer cancel() 248 + 249 + select { 250 + case s.ackQueue <- id: 251 + case <-ackCtx.Done(): 252 + s.logger.Warn("dropped ack event", "id", id, "err", ackCtx.Err()) 253 + acksSent.WithLabelValues("dropped").Inc() 254 + } 255 + } 256 + 257 + func parseTimeFromRecord(collection string, rec map[string]any, rkey string) (*time.Time, error) { 258 + var rkeyTime time.Time 259 + if rkey != "self" { 260 + rt, err := syntax.ParseTID(rkey) 261 + if err == nil { 262 + rkeyTime = rt.Time() 263 + } 264 + } 265 + 266 + switch collection { 267 + case "app.bsky.feed.post": 268 + cat, ok := rec["createdAt"].(string) 269 + if ok { 270 + t, err := dateparse.ParseAny(cat) 271 + if err == nil { 272 + return &t, nil 273 + } 274 + 275 + if rkeyTime.IsZero() { 276 + return timePtr(time.Now()), nil 277 + } 278 + } 279 + 280 + return &rkeyTime, nil 281 + case "app.bsky.feed.repost": 282 + cat, ok := rec["createdAt"].(string) 283 + if ok { 284 + t, err := dateparse.ParseAny(cat) 285 + if err == nil { 286 + return &t, nil 287 + } 288 + 289 + if rkeyTime.IsZero() { 290 + return nil, fmt.Errorf("failed to get a useful timestamp from record") 291 + } 292 + } 293 + 294 + return &rkeyTime, nil 295 + case "app.bsky.feed.like": 296 + cat, ok := rec["createdAt"].(string) 297 + if ok { 298 + t, err := dateparse.ParseAny(cat) 299 + if err == nil { 300 + return &t, nil 301 + } 302 + 303 + if rkeyTime.IsZero() { 304 + return nil, fmt.Errorf("failed to get a useful timestamp from record") 305 + } 306 + } 307 + 308 + return &rkeyTime, nil 309 + case "app.bsky.actor.profile": 310 + // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 311 + return timePtr(time.Now()), nil 312 + case "app.bsky.feed.generator": 313 + if !rkeyTime.IsZero() { 314 + return &rkeyTime, nil 315 + } 316 + return timePtr(time.Now()), nil 317 + default: 318 + if !rkeyTime.IsZero() { 319 + return &rkeyTime, nil 320 + } 321 + return timePtr(time.Now()), nil 322 + } 323 + } 324 + 325 + func timePtr(t time.Time) *time.Time { 326 + return &t 327 + }
+30
atkafka/types.go
··· 65 AccountAge int64 `json:"accountAge"` 66 Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"` 67 }
··· 65 AccountAge int64 `json:"accountAge"` 66 Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"` 67 } 68 + 69 + type TapEvent struct { 70 + Id uint `json:"id"` 71 + Type string `json:"type"` 72 + Record *TapEventRecord `json:"record,omitempty"` 73 + Identity *TapEventIdentity `json:"identity,omitempty"` 74 + } 75 + 76 + type TapEventRecord struct { 77 + Live bool `json:"live"` 78 + Rev string `json:"rev"` 79 + Did string `json:"did"` 80 + Collection string `json:"collection"` 81 + Rkey string `json:"rkey"` 82 + Action string `json:"action"` 83 + Cid string `json:"cid"` 84 + Record *map[string]any `json:"record,omitempty"` 85 + } 86 + 87 + type TapEventIdentity struct { 88 + Did string `json:"did"` 89 + Handle string `json:"handle"` 90 + IsActive bool `json:"isActive"` 91 + Status string `json:"status"` 92 + } 93 + 94 + type TapAck struct { 95 + Type string `json:"type"` 96 + Id uint `json:"id"` 97 + }
+499
atkafka-grafana.json
···
··· 1 + { 2 + "annotations": { 3 + "list": [ 4 + { 5 + "builtIn": 1, 6 + "datasource": { 7 + "type": "grafana", 8 + "uid": "-- Grafana --" 9 + }, 10 + "enable": true, 11 + "hide": true, 12 + "iconColor": "rgba(0, 211, 255, 1)", 13 + "name": "Annotations & Alerts", 14 + "type": "dashboard" 15 + } 16 + ] 17 + }, 18 + "editable": true, 19 + "fiscalYearStartMonth": 0, 20 + "graphTooltip": 0, 21 + "id": 0, 22 + "links": [], 23 + "panels": [ 24 + { 25 + "datasource": { 26 + "type": "prometheus", 27 + "uid": "ef7prazof3yf4d" 28 + }, 29 + "fieldConfig": { 30 + "defaults": { 31 + "color": { 32 + "mode": "palette-classic" 33 + }, 34 + "custom": { 35 + "axisBorderShow": false, 36 + "axisCenteredZero": false, 37 + "axisColorMode": "text", 38 + "axisLabel": "", 39 + "axisPlacement": "auto", 40 + "barAlignment": 0, 41 + "barWidthFactor": 0.6, 42 + "drawStyle": "line", 43 + "fillOpacity": 0, 44 + "gradientMode": "none", 45 + "hideFrom": { 46 + "legend": false, 47 + "tooltip": false, 48 + "viz": false 49 + }, 50 + "insertNulls": 1800000, 51 + "lineInterpolation": "linear", 52 + "lineWidth": 1, 53 + "pointSize": 5, 54 + "scaleDistribution": { 55 + "type": "linear" 56 + }, 57 + "showPoints": "auto", 58 + "showValues": false, 59 + "spanNulls": false, 60 + "stacking": { 61 + "group": "A", 62 + "mode": "none" 63 + }, 64 + "thresholdsStyle": { 65 + "mode": "off" 66 + } 67 + }, 68 + "mappings": [], 69 + "thresholds": { 70 + "mode": "absolute", 71 + "steps": [ 72 + { 73 + "color": "green", 74 + "value": 0 75 + }, 76 + { 77 + "color": "red", 78 + "value": 80 79 + } 80 + ] 81 + } 82 + }, 83 + "overrides": [] 84 + }, 85 + "gridPos": { 86 + "h": 8, 87 + "w": 12, 88 + "x": 0, 89 + "y": 0 90 + }, 91 + "id": 2, 92 + "options": { 93 + "legend": { 94 + "calcs": [], 95 + "displayMode": "table", 96 + "placement": "right", 97 + "showLegend": true 98 + }, 99 + "tooltip": { 100 + "hideZeros": false, 101 + "mode": "single", 102 + "sort": "none" 103 + } 104 + }, 105 + "pluginVersion": "12.3.1", 106 + "targets": [ 107 + { 108 + "editorMode": "builder", 109 + "expr": "sum by(collection, action_name) (rate(atkafka_handled_events{instance=\"$instance\"}[$__rate_interval]))", 110 + "legendFormat": "{{collection}} ({{action_name}})", 111 + "range": true, 112 + "refId": "A" 113 + } 114 + ], 115 + "title": "Firehose Received Events", 116 + "type": "timeseries" 117 + }, 118 + { 119 + "datasource": { 120 + "type": "prometheus", 121 + "uid": "ef7prazof3yf4d" 122 + }, 123 + "fieldConfig": { 124 + "defaults": { 125 + "color": { 126 + "mode": "palette-classic" 127 + }, 128 + "custom": { 129 + "axisBorderShow": false, 130 + "axisCenteredZero": false, 131 + "axisColorMode": "text", 132 + "axisLabel": "", 133 + "axisPlacement": "auto", 134 + "barAlignment": 0, 135 + "barWidthFactor": 0.6, 136 + "drawStyle": "line", 137 + "fillOpacity": 0, 138 + "gradientMode": "none", 139 + "hideFrom": { 140 + "legend": false, 141 + "tooltip": false, 142 + "viz": false 143 + }, 144 + "insertNulls": 1800000, 145 + "lineInterpolation": "linear", 146 + "lineWidth": 1, 147 + "pointSize": 5, 148 + "scaleDistribution": { 149 + "type": "linear" 150 + }, 151 + "showPoints": "auto", 152 + "showValues": false, 153 + "spanNulls": false, 154 + "stacking": { 155 + "group": "A", 156 + "mode": "none" 157 + }, 158 + "thresholdsStyle": { 159 + "mode": "off" 160 + } 161 + }, 162 + "mappings": [], 163 + "thresholds": { 164 + "mode": "absolute", 165 + "steps": [ 166 + { 167 + "color": "green", 168 + "value": 0 169 + }, 170 + { 171 + "color": "red", 172 + "value": 80 173 + } 174 + ] 175 + } 176 + }, 177 + "overrides": [] 178 + }, 179 + "gridPos": { 180 + "h": 8, 181 + "w": 12, 182 + "x": 12, 183 + "y": 0 184 + }, 185 + "id": 1, 186 + "options": { 187 + "legend": { 188 + "calcs": [], 189 + "displayMode": "table", 190 + "placement": "right", 191 + "showLegend": true 192 + }, 193 + "tooltip": { 194 + "hideZeros": false, 195 + "mode": "single", 196 + "sort": "none" 197 + } 198 + }, 199 + "pluginVersion": "12.3.1", 200 + "targets": [ 201 + { 202 + "editorMode": "builder", 203 + "expr": "sum by(status) (rate(atkafka_produced_events{instance=\"$instance\"}[$__rate_interval]))", 204 + "legendFormat": "__auto", 205 + "range": true, 206 + "refId": "A" 207 + } 208 + ], 209 + "title": "Produced Events", 210 + "type": "timeseries" 211 + }, 212 + { 213 + "datasource": { 214 + "type": "prometheus", 215 + "uid": "ef7prazof3yf4d" 216 + }, 217 + "fieldConfig": { 218 + "defaults": { 219 + "color": { 220 + "mode": "palette-classic" 221 + }, 222 + "custom": { 223 + "axisBorderShow": false, 224 + "axisCenteredZero": false, 225 + "axisColorMode": "text", 226 + "axisLabel": "", 227 + "axisPlacement": "auto", 228 + "barAlignment": 0, 229 + "barWidthFactor": 0.6, 230 + "drawStyle": "line", 231 + "fillOpacity": 0, 232 + "gradientMode": "none", 233 + "hideFrom": { 234 + "legend": false, 235 + "tooltip": false, 236 + "viz": false 237 + }, 238 + "insertNulls": 1800000, 239 + "lineInterpolation": "linear", 240 + "lineWidth": 1, 241 + "pointSize": 5, 242 + "scaleDistribution": { 243 + "type": "linear" 244 + }, 245 + "showPoints": "auto", 246 + "showValues": false, 247 + "spanNulls": false, 248 + "stacking": { 249 + "group": "A", 250 + "mode": "none" 251 + }, 252 + "thresholdsStyle": { 253 + "mode": "off" 254 + } 255 + }, 256 + "mappings": [], 257 + "thresholds": { 258 + "mode": "absolute", 259 + "steps": [ 260 + { 261 + "color": "green", 262 + "value": 0 263 + }, 264 + { 265 + "color": "red", 266 + "value": 80 267 + } 268 + ] 269 + } 270 + }, 271 + "overrides": [] 272 + }, 273 + "gridPos": { 274 + "h": 8, 275 + "w": 12, 276 + "x": 0, 277 + "y": 8 278 + }, 279 + "id": 3, 280 + "options": { 281 + "legend": { 282 + "calcs": [], 283 + "displayMode": "table", 284 + "placement": "right", 285 + "showLegend": true 286 + }, 287 + "tooltip": { 288 + "hideZeros": false, 289 + "mode": "single", 290 + "sort": "none" 291 + } 292 + }, 293 + "pluginVersion": "12.3.1", 294 + "targets": [ 295 + { 296 + "editorMode": "builder", 297 + "expr": "sum by(status, cached, kind) (rate(atkafka_api_requests{instance=\"$instance\"}[$__rate_interval]))", 298 + "legendFormat": "{{kind}} cached={{cached}} status={{status}}", 299 + "range": true, 300 + "refId": "A" 301 + } 302 + ], 303 + "title": "API Requests", 304 + "type": "timeseries" 305 + }, 306 + { 307 + "datasource": { 308 + "type": "prometheus", 309 + "uid": "ef7prazof3yf4d" 310 + }, 311 + "fieldConfig": { 312 + "defaults": { 313 + "color": { 314 + "mode": "palette-classic" 315 + }, 316 + "custom": { 317 + "axisBorderShow": false, 318 + "axisCenteredZero": false, 319 + "axisColorMode": "text", 320 + "axisLabel": "", 321 + "axisPlacement": "auto", 322 + "barAlignment": 0, 323 + "barWidthFactor": 0.6, 324 + "drawStyle": "line", 325 + "fillOpacity": 0, 326 + "gradientMode": "none", 327 + "hideFrom": { 328 + "legend": false, 329 + "tooltip": false, 330 + "viz": false 331 + }, 332 + "insertNulls": 3600000, 333 + "lineInterpolation": "linear", 334 + "lineWidth": 1, 335 + "pointSize": 5, 336 + "scaleDistribution": { 337 + "type": "linear" 338 + }, 339 + "showPoints": "auto", 340 + "showValues": false, 341 + "spanNulls": false, 342 + "stacking": { 343 + "group": "A", 344 + "mode": "none" 345 + }, 346 + "thresholdsStyle": { 347 + "mode": "off" 348 + } 349 + }, 350 + "mappings": [], 351 + "thresholds": { 352 + "mode": "absolute", 353 + "steps": [ 354 + { 355 + "color": "green", 356 + "value": 0 357 + }, 358 + { 359 + "color": "red", 360 + "value": 80 361 + } 362 + ] 363 + } 364 + }, 365 + "overrides": [] 366 + }, 367 + "gridPos": { 368 + "h": 8, 369 + "w": 12, 370 + "x": 12, 371 + "y": 8 372 + }, 373 + "id": 4, 374 + "options": { 375 + "legend": { 376 + "calcs": [], 377 + "displayMode": "table", 378 + "placement": "right", 379 + "showLegend": true 380 + }, 381 + "tooltip": { 382 + "hideZeros": false, 383 + "mode": "single", 384 + "sort": "none" 385 + } 386 + }, 387 + "pluginVersion": "12.3.1", 388 + "targets": [ 389 + { 390 + "editorMode": "builder", 391 + "expr": "sum by(kind, status, cached) (rate(atkafka_plc_requests{instance=\"$instance\"}[$__rate_interval]))", 392 + "legendFormat": "{{kind}} cached={{cached}} status={{status}}", 393 + "range": true, 394 + "refId": "A" 395 + } 396 + ], 397 + "title": "New panel", 398 + "type": "timeseries" 399 + }, 400 + { 401 + "datasource": { 402 + "type": "prometheus", 403 + "uid": "ef7prazof3yf4d" 404 + }, 405 + "fieldConfig": { 406 + "defaults": { 407 + "color": { 408 + "mode": "thresholds" 409 + }, 410 + "mappings": [], 411 + "thresholds": { 412 + "mode": "absolute", 413 + "steps": [ 414 + { 415 + "color": "green", 416 + "value": 0 417 + } 418 + ] 419 + } 420 + }, 421 + "overrides": [] 422 + }, 423 + "gridPos": { 424 + "h": 7, 425 + "w": 6, 426 + "x": 0, 427 + "y": 16 428 + }, 429 + "id": 5, 430 + "options": { 431 + "colorMode": "value", 432 + "graphMode": "none", 433 + "justifyMode": "center", 434 + "orientation": "auto", 435 + "percentChangeColorMode": "standard", 436 + "reduceOptions": { 437 + "calcs": ["lastNotNull"], 438 + "fields": "", 439 + "values": false 440 + }, 441 + "showPercentChange": false, 442 + "text": { 443 + "titleSize": 40, 444 + "valueSize": 100 445 + }, 446 + "textMode": "auto", 447 + "wideLayout": true 448 + }, 449 + "pluginVersion": "12.3.1", 450 + "targets": [ 451 + { 452 + "editorMode": "builder", 453 + "expr": "atkafka_cache_size{instance=\"$instance\"}", 454 + "legendFormat": "{{kind}}", 455 + "range": true, 456 + "refId": "A" 457 + } 458 + ], 459 + "title": "Cache Sizes", 460 + "transparent": true, 461 + "type": "stat" 462 + } 463 + ], 464 + "preload": false, 465 + "schemaVersion": 42, 466 + "tags": [], 467 + "templating": { 468 + "list": [ 469 + { 470 + "current": { 471 + "text": "panda", 472 + "value": "panda" 473 + }, 474 + "definition": "label_values(atkafka_handled_events,instance)", 475 + "description": "", 476 + "label": "Instance", 477 + "name": "instance", 478 + "options": [], 479 + "query": { 480 + "qryType": 1, 481 + "query": "label_values(atkafka_handled_events,instance)", 482 + "refId": "PrometheusVariableQueryEditor-VariableQuery" 483 + }, 484 + "refresh": 1, 485 + "regex": "", 486 + "type": "query" 487 + } 488 + ] 489 + }, 490 + "time": { 491 + "from": "now-3h", 492 + "to": "now" 493 + }, 494 + "timepicker": {}, 495 + "timezone": "browser", 496 + "title": "ATKafka", 497 + "uid": "adn8c6k", 498 + "version": 17 499 + }
+179 -66
cmd/atkafka/main.go
··· 18 Flags: []cli.Flag{ 19 telemetry.CLIFlagDebug, 20 telemetry.CLIFlagMetricsListenAddress, 21 - &cli.StringFlag{ 22 - Name: "relay-host", 23 - Usage: "Websocket host to subscribe to for events", 24 - Value: "wss://bsky.network", 25 - EnvVars: []string{"ATKAFKA_RELAY_HOST"}, 26 - }, 27 - &cli.StringFlag{ 28 - Name: "plc-host", 29 - Usage: "The host of the PLC directory you want to use for event metadata", 30 - EnvVars: []string{"ATKAFKA_PLC_HOST"}, 31 - }, 32 - &cli.StringFlag{ 33 - Name: "api-host", 34 - Usage: "The API host for making XRPC calls. Recommended to use https://public.api.bsky.app", 35 - EnvVars: []string{"ATKAFKA_API_HOST"}, 36 - }, 37 &cli.StringSliceFlag{ 38 Name: "bootstrap-servers", 39 Usage: "List of Kafka bootstrap servers", ··· 46 EnvVars: []string{"ATKAFKA_OUTPUT_TOPIC"}, 47 Required: true, 48 }, 49 - &cli.BoolFlag{ 50 - Name: "osprey-compatible", 51 - Usage: "Whether or not events should be formulated in an Osprey-compatible format", 52 - EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"}, 53 - Value: false, 54 }, 55 - &cli.StringSliceFlag{ 56 - Name: "watched-services", 57 - Usage: "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.", 58 - EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"}, 59 - }, 60 - &cli.StringSliceFlag{ 61 - Name: "ignored-services", 62 - Usage: "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.", 63 - EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"}, 64 - }, 65 - &cli.StringSliceFlag{ 66 - Name: "watched-collections", 67 - Usage: "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.", 68 - EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"}, 69 - }, 70 - &cli.StringSliceFlag{ 71 - Name: "ignored-collections", 72 - Usage: "A list of collections that you want to ignore. Wildcards like *.bsky.app are allowed.", 73 - EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"}, 74 - }, 75 - }, 76 - Action: func(cmd *cli.Context) error { 77 - ctx := context.Background() 78 79 - telemetry.StartMetrics(cmd) 80 - logger := telemetry.StartLogger(cmd) 81 82 - s, err := atkafka.NewServer(&atkafka.ServerArgs{ 83 - RelayHost: cmd.String("relay-host"), 84 - PlcHost: cmd.String("plc-host"), 85 - ApiHost: cmd.String("api-host"), 86 - BootstrapServers: cmd.StringSlice("bootstrap-servers"), 87 - OutputTopic: cmd.String("output-topic"), 88 - OspreyCompat: cmd.Bool("osprey-compatible"), 89 - WatchedServices: cmd.StringSlice("watched-services"), 90 - IgnoredServices: cmd.StringSlice("ignored-services"), 91 - WatchedCollections: cmd.StringSlice("watched-collections"), 92 - IgnoredCollections: cmd.StringSlice("ignored-collections"), 93 - Logger: logger, 94 - }) 95 - if err != nil { 96 - return fmt.Errorf("failed to create new server: %w", err) 97 - } 98 99 - if err := s.Run(ctx); err != nil { 100 - return fmt.Errorf("error running server: %w", err) 101 - } 102 103 - return nil 104 }, 105 } 106
··· 18 Flags: []cli.Flag{ 19 telemetry.CLIFlagDebug, 20 telemetry.CLIFlagMetricsListenAddress, 21 &cli.StringSliceFlag{ 22 Name: "bootstrap-servers", 23 Usage: "List of Kafka bootstrap servers", ··· 30 EnvVars: []string{"ATKAFKA_OUTPUT_TOPIC"}, 31 Required: true, 32 }, 33 + }, 34 + Commands: cli.Commands{ 35 + &cli.Command{ 36 + Name: "firehose", 37 + Flags: []cli.Flag{ 38 + &cli.StringFlag{ 39 + Name: "relay-host", 40 + Usage: "Websocket host to subscribe to for events", 41 + Value: "wss://bsky.network", 42 + EnvVars: []string{"ATKAFKA_RELAY_HOST"}, 43 + }, 44 + &cli.StringFlag{ 45 + Name: "plc-host", 46 + Usage: "The host of the PLC directory you want to use for event metadata", 47 + EnvVars: []string{"ATKAFKA_PLC_HOST"}, 48 + }, 49 + &cli.StringFlag{ 50 + Name: "api-host", 51 + Usage: "The API host for making XRPC calls. Recommended to use https://public.api.bsky.app", 52 + EnvVars: []string{"ATKAFKA_API_HOST"}, 53 + }, 54 + &cli.IntFlag{ 55 + Name: "tap-workers", 56 + Usage: "Number of workers to process events from Tap", 57 + Value: 10, 58 + EnvVars: []string{"ATKAFKA_TAP_WORKERS"}, 59 + }, 60 + &cli.BoolFlag{ 61 + Name: "osprey-compatible", 62 + Usage: "Whether or not events should be formulated in an Osprey-compatible format", 63 + EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"}, 64 + Value: false, 65 + }, 66 + &cli.StringSliceFlag{ 67 + Name: "watched-services", 68 + Usage: "A list of ATProto services inside a user's DID document that you want to watch. Wildcards like *.bsky.network are allowed.", 69 + EnvVars: []string{"ATKAFKA_WATCHED_SERVICES"}, 70 + }, 71 + &cli.StringSliceFlag{ 72 + Name: "ignored-services", 73 + Usage: "A list of ATProto services inside a user's DID document that you want to ignore. Wildcards like *.bsky.network are allowed.", 74 + EnvVars: []string{"ATKAFKA_IGNORED_SERVICES"}, 75 + }, 76 + &cli.StringSliceFlag{ 77 + Name: "watched-collections", 78 + Usage: "A list of collections that you want to watch. Wildcards like *.bsky.app are allowed.", 79 + EnvVars: []string{"ATKAFKA_WATCHED_COLLECTIONS"}, 80 + }, 81 + &cli.StringSliceFlag{ 82 + Name: "ignored-collections", 83 + Usage: "A list of collections that you want to ignore. Wildcards like *.bsky.app are allowed.", 84 + EnvVars: []string{"ATKAFKA_IGNORED_COLLECTIONS"}, 85 + }, 86 + }, 87 + Subcommands: cli.Commands{ 88 + &cli.Command{ 89 + Name: "relay", 90 + Action: func(cmd *cli.Context) error { 91 + ctx := context.Background() 92 + 93 + telemetry.StartMetrics(cmd) 94 + logger := telemetry.StartLogger(cmd) 95 + 96 + s, err := atkafka.NewServer(&atkafka.ServerArgs{ 97 + RelayHost: cmd.String("relay-host"), 98 + PlcHost: cmd.String("plc-host"), 99 + ApiHost: cmd.String("api-host"), 100 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 101 + OutputTopic: cmd.String("output-topic"), 102 + OspreyCompat: cmd.Bool("osprey-compatible"), 103 + WatchedServices: cmd.StringSlice("watched-services"), 104 + IgnoredServices: cmd.StringSlice("ignored-services"), 105 + WatchedCollections: cmd.StringSlice("watched-collections"), 106 + IgnoredCollections: cmd.StringSlice("ignored-collections"), 107 + Logger: logger, 108 + }) 109 + if err != nil { 110 + return fmt.Errorf("failed to create new server: %w", err) 111 + } 112 + 113 + if err := s.Run(ctx); err != nil { 114 + return fmt.Errorf("error running server: %w", err) 115 + } 116 + 117 + return nil 118 + }, 119 + }, 120 + &cli.Command{ 121 + Name: "tap", 122 + Flags: []cli.Flag{ 123 + &cli.StringFlag{ 124 + Name: "tap-host", 125 + Usage: "Tap host to subscribe to for events", 126 + Value: "ws://localhost:2480", 127 + EnvVars: []string{"ATKAFKA_TAP_HOST"}, 128 + }, 129 + &cli.BoolFlag{ 130 + Name: "disable-acks", 131 + Usage: "Set to `true` to disable sending of event acks to Tap. May result in the loss of events.", 132 + EnvVars: []string{"ATKAFKA_DISABLE_ACKS"}, 133 + }, 134 + }, 135 + Action: func(cmd *cli.Context) error { 136 + 137 + ctx := context.Background() 138 + 139 + telemetry.StartMetrics(cmd) 140 + logger := telemetry.StartLogger(cmd) 141 + 142 + s, err := atkafka.NewServer(&atkafka.ServerArgs{ 143 + TapHost: cmd.String("tap-host"), 144 + DisableAcks: cmd.Bool("disable-acks"), 145 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 146 + OutputTopic: cmd.String("output-topic"), 147 + WatchedCollections: cmd.StringSlice("watched-collections"), 148 + IgnoredCollections: cmd.StringSlice("ignored-collections"), 149 + Logger: logger, 150 + }) 151 + if err != nil { 152 + return fmt.Errorf("failed to create new server: %w", err) 153 + } 154 + 155 + if err := s.RunTapMode(ctx); err != nil { 156 + return fmt.Errorf("error running server: %w", err) 157 + } 158 + 159 + return nil 160 + }, 161 + }, 162 + }, 163 }, 164 + &cli.Command{ 165 + Name: "ozone-events", 166 + Flags: []cli.Flag{ 167 + &cli.StringFlag{ 168 + Name: "pds-host", 169 + EnvVars: []string{"ATKAFKA_OZONE_PDS_HOST"}, 170 + Required: true, 171 + }, 172 + &cli.StringFlag{ 173 + Name: "identifier", 174 + EnvVars: []string{"ATKAFKA_OZONE_IDENTIFIER"}, 175 + Required: true, 176 + }, 177 + &cli.StringFlag{ 178 + Name: "password", 179 + EnvVars: []string{"ATKAFKA_OZONE_PASSWORD"}, 180 + Required: true, 181 + }, 182 + &cli.StringFlag{ 183 + Name: "labeler-did", 184 + EnvVars: []string{"ATKAFKA_OZONE_LABELER_DID"}, 185 + Required: true, 186 + }, 187 + }, 188 + Action: func(cmd *cli.Context) error { 189 + ctx := context.Background() 190 191 + telemetry.StartMetrics(cmd) 192 + logger := telemetry.StartLogger(cmd) 193 194 + s, err := atkafka.NewOzoneServer(&atkafka.OzoneServerArgs{ 195 + Logger: logger, 196 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 197 + OutputTopic: cmd.String("output-topic"), 198 + OzoneClientArgs: &atkafka.OzoneClientArgs{ 199 + OzonePdsHost: cmd.String("pds-host"), 200 + OzoneIdentifier: cmd.String("identifier"), 201 + OzonePassword: cmd.String("password"), 202 + OzoneLabelerDid: cmd.String("labeler-did"), 203 + Logger: logger, 204 + }, 205 + }) 206 + if err != nil { 207 + return fmt.Errorf("failed to create new ozone server: %w", err) 208 + } 209 210 + if err := s.Run(ctx); err != nil { 211 + return fmt.Errorf("error running ozone server: %w", err) 212 + } 213 214 + return nil 215 + }, 216 + }, 217 }, 218 } 219
+39
docker-compose.ozone.yml
···
··· 1 + services: 2 + zookeeper: 3 + image: confluentinc/cp-zookeeper:7.6.0 4 + hostname: zookeeper 5 + container_name: zookeeper 6 + ports: 7 + - "2181:2181" 8 + environment: 9 + ZOOKEEPER_CLIENT_PORT: 2181 10 + ZOOKEEPER_TICK_TIME: 2000 11 + volumes: 12 + - zookeeper-data:/var/lib/zookeeper/data 13 + - zookeeper-logs:/var/lib/zookeeper/log 14 + 15 + atkafka: 16 + build: 17 + context: . 18 + dockerfile: Dockerfile 19 + container_name: atkafka 20 + command: ["ozone-events"] 21 + ports: 22 + - "2112:2112" 23 + environment: 24 + ATKAFKA_RELAY_HOST: "wss://bsky.network" 25 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 26 + ATKAFKA_OUTPUT_TOPIC: "atproto-events" 27 + ATKAFKA_OSPREY_COMPATIBLE: "false" 28 + ATKAFKA_PLC_HOST: "https://plc.directory" 29 + ATKAFKA_API_HOST: "https://public.api.bsky.app" 30 + volumes: 31 + - atkafka-data:/data 32 + working_dir: /data 33 + restart: unless-stopped 34 + 35 + volumes: 36 + zookeeper-data: 37 + zookeeper-logs: 38 + kafka-data: 39 + atkafka-data:
+91
docker-compose.tap.yml
···
··· 1 + services: 2 + zookeeper: 3 + image: confluentinc/cp-zookeeper:7.6.0 4 + hostname: zookeeper 5 + container_name: zookeeper 6 + ports: 7 + - "2181:2181" 8 + environment: 9 + ZOOKEEPER_CLIENT_PORT: 2181 10 + ZOOKEEPER_TICK_TIME: 2000 11 + volumes: 12 + - zookeeper-data:/var/lib/zookeeper/data 13 + - zookeeper-logs:/var/lib/zookeeper/log 14 + 15 + kafka: 16 + image: confluentinc/cp-kafka:7.6.0 17 + hostname: kafka 18 + container_name: kafka 19 + depends_on: 20 + - zookeeper 21 + ports: 22 + - "9092:9092" 23 + - "9101:9101" 24 + environment: 25 + KAFKA_BROKER_ID: 1 26 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' 27 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT 28 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 29 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 30 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 31 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 32 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 33 + KAFKA_JMX_PORT: 9101 34 + KAFKA_JMX_HOSTNAME: localhost 35 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' 36 + healthcheck: 37 + test: 38 + [ 39 + "CMD", 40 + "bash", 41 + "-c", 42 + "kafka-topics --bootstrap-server kafka:29092 --list", 43 + ] 44 + volumes: 45 + - kafka-data:/var/lib/kafka/data 46 + 47 + 48 + tap: 49 + image: ghcr.io/bluesky-social/indigo/tap:latest 50 + hostname: tap 51 + container_name: tap 52 + depends_on: 53 + kafka: 54 + condition: service_healthy 55 + ports: 56 + - "2480:2480" 57 + - "6010:6010" 58 + environment: 59 + TAP_BIND: ":2480" 60 + TAP_FULL_NETWORK: true 61 + TAP_DISABLE_ACKS: false 62 + TAP_COLLECTION_FILTERS: "app.bsky.graph.follow" 63 + TAP_METRICS_LISTEN: ":6010" 64 + volumes: 65 + - tap-data:/data 66 + 67 + atkafka: 68 + build: 69 + context: . 70 + dockerfile: Dockerfile 71 + container_name: atkafka 72 + depends_on: 73 + - kafka 74 + - tap 75 + ports: 76 + # metrics port 77 + - "6009:6009" 78 + command: ["firehose", "tap"] 79 + environment: 80 + ATKAFKA_TAP_HOST: "ws://tap:2480" 81 + ATKAFKA_DISABLE_ACKS: false 82 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 83 + ATKAFKA_OUTPUT_TOPIC: "tap-events" 84 + 85 + restart: unless-stopped 86 + 87 + volumes: 88 + zookeeper-data: 89 + zookeeper-logs: 90 + kafka-data: 91 + tap-data:
+2 -6
docker-compose.yml
··· 41 context: . 42 dockerfile: Dockerfile 43 container_name: atkafka 44 - depends_on: 45 - - kafka 46 ports: 47 - "2112:2112" 48 environment: ··· 51 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 52 ATKAFKA_OSPREY_COMPATIBLE: "false" 53 ATKAFKA_PLC_HOST: "https://plc.directory" 54 - 55 - ATKAFKA_WATCHED_SERVICES: "*.bsky.network" 56 - 57 - ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" 58 restart: unless-stopped 59 60 volumes:
··· 41 context: . 42 dockerfile: Dockerfile 43 container_name: atkafka 44 + command: ["firehose", "relay"] 45 ports: 46 - "2112:2112" 47 environment: ··· 50 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 51 ATKAFKA_OSPREY_COMPATIBLE: "false" 52 ATKAFKA_PLC_HOST: "https://plc.directory" 53 + ATKAFKA_API_HOST: "https://public.api.bsky.app" 54 restart: unless-stopped 55 56 volumes:
+2 -1
go.mod
··· 3 go 1.25.4 4 5 require ( 6 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 7 github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f 8 github.com/gorilla/websocket v1.5.3 ··· 11 github.com/prometheus/client_golang v1.23.2 12 github.com/twmb/franz-go v1.19.5 13 github.com/urfave/cli/v2 v2.25.7 14 golang.org/x/time v0.6.0 15 ) 16 ··· 95 go.yaml.in/yaml/v2 v2.4.2 // indirect 96 golang.org/x/crypto v0.40.0 // indirect 97 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect 98 - golang.org/x/sync v0.16.0 // indirect 99 golang.org/x/sys v0.35.0 // indirect 100 golang.org/x/text v0.28.0 // indirect 101 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
··· 3 go 1.25.4 4 5 require ( 6 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de 7 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 8 github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f 9 github.com/gorilla/websocket v1.5.3 ··· 12 github.com/prometheus/client_golang v1.23.2 13 github.com/twmb/franz-go v1.19.5 14 github.com/urfave/cli/v2 v2.25.7 15 + golang.org/x/sync v0.16.0 16 golang.org/x/time v0.6.0 17 ) 18 ··· 97 go.yaml.in/yaml/v2 v2.4.2 // indirect 98 golang.org/x/crypto v0.40.0 // indirect 99 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect 100 golang.org/x/sys v0.35.0 // indirect 101 golang.org/x/text v0.28.0 // indirect 102 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
+5
go.sum
··· 3 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= 4 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM= 5 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= 6 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 7 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= 8 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= ··· 200 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= 201 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 202 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 203 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 204 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 205 github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= ··· 249 github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 250 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 251 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 252 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= 253 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= 254 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= 255 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 256 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= 257 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 258 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= 259 github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= 260 github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
··· 3 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= 4 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM= 5 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= 6 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA= 7 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= 8 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 9 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= 10 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= ··· 202 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= 203 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 204 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 205 + github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= 206 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 207 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 208 github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= ··· 252 github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 253 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 254 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 255 + github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= 256 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= 257 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= 258 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= 259 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 260 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= 261 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 262 + github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= 263 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= 264 github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= 265 github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=