this repo has no description

Compare changes

Choose any two refs to compare.

Changed files
+404 -1
atkafka
cmd
atkafka
+3
atkafka/atkafka.go
··· 28 28 29 29 type Server struct { 30 30 relayHost string 31 + tapHost string 31 32 bootstrapServers []string 32 33 outputTopic string 33 34 ospreyCompat bool ··· 47 48 type ServerArgs struct { 48 49 // network params 49 50 RelayHost string 51 + TapHost string 50 52 PlcHost string 51 53 ApiHost string 52 54 ··· 113 115 114 116 s := &Server{ 115 117 relayHost: args.RelayHost, 118 + tapHost: args.TapHost, 116 119 plcClient: plcClient, 117 120 apiClient: apiClient, 118 121 bootstrapServers: args.BootstrapServers,
+268
atkafka/tap.go
··· 1 + package atkafka 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "os" 10 + "os/signal" 11 + "strings" 12 + "syscall" 13 + "time" 14 + 15 + "github.com/araddon/dateparse" 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + "github.com/gorilla/websocket" 18 + "golang.org/x/sync/semaphore" 19 + ) 20 + 21 + func (s *Server) RunTapMode(ctx context.Context) error { 22 + sema := semaphore.NewWeighted(1_000) 23 + 24 + s.logger.Info("starting tap consumer", "tap-host", s.tapHost, "bootstrap-servers", s.bootstrapServers, "output-topic", s.outputTopic) 25 + 26 + createCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 27 + defer cancel() 28 + 29 + producerLogger := s.logger.With("component", "producer") 30 + kafProducer, err := NewProducer(createCtx, producerLogger, s.bootstrapServers, s.outputTopic, 31 + WithEnsureTopic(true), 32 + WithTopicPartitions(200), 33 + ) 34 + if err != nil { 35 + return fmt.Errorf("failed to create producer: %w", err) 36 + } 37 + defer kafProducer.Close() 38 + s.producer = kafProducer 39 + s.logger.Info("created producer") 40 + 41 + wsDialer := websocket.DefaultDialer 42 + u, err := url.Parse(s.tapHost) 43 + if err != nil { 44 + return fmt.Errorf("invalid tapHost: %w", err) 45 + } 46 + u.Path = "/channel" 47 + s.logger.Info("created dialer") 48 + 49 + wsErr := make(chan error, 1) 50 + shutdownWs := make(chan struct{}, 1) 51 + go func() { 52 + logger := s.logger.With("component", "websocket") 53 + 54 + logger.Info("subscribing to tap stream", "upstream", s.tapHost) 55 + 56 + conn, _, err := wsDialer.Dial(u.String(), http.Header{ 57 + "User-Agent": []string{"at-kafka/0.0.0"}, 58 + }) 59 + if err != nil { 60 + wsErr <- err 61 + return 62 + } 63 + 64 + // handle events! 65 + for { 66 + var evt TapEvent 67 + err := conn.ReadJSON(&evt) 68 + if err != nil { 69 + logger.Error("error reading json from websocket", "err", err) 70 + break 71 + } 72 + 73 + if err := sema.Acquire(ctx, 1); err != nil { 74 + logger.Error("error acquring sema", "err", err) 75 + break 76 + } 77 + 78 + go func() { 79 + defer sema.Release(1) 80 + s.handleTapEvent(ctx, &evt) 81 + }() 82 + } 83 + 84 + <-shutdownWs 85 + 86 + wsErr <- nil 87 + }() 88 + s.logger.Info("created tap consumer") 89 + 90 + signals := make(chan os.Signal, 1) 91 + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 92 + 93 + select { 94 + case sig := <-signals: 95 + s.logger.Info("shutting down on signal", "signal", sig) 96 + case err := <-wsErr: 97 + if err != nil { 98 + s.logger.Error("websocket error", "err", err) 99 + } else { 100 + s.logger.Info("websocket shutdown unexpectedly") 101 + } 102 + } 103 + 104 + close(shutdownWs) 105 + 106 + return nil 107 + } 108 + 109 + func (s *Server) handleTapEvent(ctx context.Context, evt *TapEvent) error { 110 + logger := s.logger.With("component", "handleEvent") 111 + 112 + var collection string 113 + var actionName string 114 + 115 + var evtKey string 116 + var evtsToProduce [][]byte 117 + 118 + if evt.Record != nil { 119 + // key events by DID 120 + evtKey = evt.Record.Did 121 + did := evt.Record.Did 122 + kind := evt.Record.Action 123 + collection = evt.Record.Collection 124 + rkey := evt.Record.Rkey 125 + atUri := fmt.Sprintf("at://%s/%s/%s", did, collection, rkey) 126 + 127 + skip := false 128 + if len(s.watchedCollections) > 0 { 129 + skip = true 130 + for _, watchedCollection := range s.watchedCollections { 131 + if watchedCollection == collection || strings.HasPrefix(collection, watchedCollection+".") { 132 + skip = false 133 + break 134 + } 135 + } 136 + } else if len(s.ignoredCollections) > 0 { 137 + for _, ignoredCollection := range s.ignoredCollections { 138 + if ignoredCollection == collection || strings.HasPrefix(collection, ignoredCollection+".") { 139 + skip = true 140 + break 141 + } 142 + } 143 + } 144 + 145 + if skip { 146 + logger.Debug("skipping event based on collection", "collection", collection) 147 + return nil 148 + } 149 + 150 + actionName = "operation#" + kind 151 + 152 + handledEvents.WithLabelValues(actionName, collection).Inc() 153 + 154 + // create the formatted operation 155 + atkOp := AtKafkaOp{ 156 + Action: evt.Record.Action, 157 + Collection: collection, 158 + Rkey: rkey, 159 + Uri: atUri, 160 + Cid: evt.Record.Cid, 161 + Path: fmt.Sprintf("%s/%s", collection, rkey), 162 + } 163 + 164 + if evt.Record.Record != nil { 165 + atkOp.Record = *evt.Record.Record 166 + } 167 + 168 + kafkaEvt := AtKafkaEvent{ 169 + Did: did, 170 + Operation: &atkOp, 171 + } 172 + 173 + if evt.Record.Record != nil { 174 + timestamp, err := parseTimeFromRecord(collection, *evt.Record.Record, rkey) 175 + if err != nil { 176 + return fmt.Errorf("error getting timestamp from record: %w", err) 177 + } 178 + kafkaEvt.Timestamp = timestamp.Format(time.RFC3339Nano) 179 + } 180 + 181 + evtBytes, err := json.Marshal(&kafkaEvt) 182 + if err != nil { 183 + return fmt.Errorf("failed to marshal kafka event: %w", err) 184 + } 185 + 186 + evtsToProduce = append(evtsToProduce, evtBytes) 187 + } 188 + 189 + for _, evtBytes := range evtsToProduce { 190 + if err := s.produceAsync(ctx, evtKey, evtBytes); err != nil { 191 + return err 192 + } 193 + } 194 + 195 + return nil 196 + } 197 + 198 + func parseTimeFromRecord(collection string, rec map[string]any, rkey string) (*time.Time, error) { 199 + var rkeyTime time.Time 200 + if rkey != "self" { 201 + rt, err := syntax.ParseTID(rkey) 202 + if err == nil { 203 + rkeyTime = rt.Time() 204 + } 205 + } 206 + 207 + switch collection { 208 + case "app.bsky.feed.post": 209 + cat, ok := rec["createdAt"].(string) 210 + if ok { 211 + t, err := dateparse.ParseAny(cat) 212 + if err == nil { 213 + return &t, nil 214 + } 215 + 216 + if rkeyTime.IsZero() { 217 + return timePtr(time.Now()), nil 218 + } 219 + } 220 + 221 + return &rkeyTime, nil 222 + case "app.bsky.feed.repost": 223 + cat, ok := rec["createdAt"].(string) 224 + if ok { 225 + t, err := dateparse.ParseAny(cat) 226 + if err == nil { 227 + return &t, nil 228 + } 229 + 230 + if rkeyTime.IsZero() { 231 + return nil, fmt.Errorf("failed to get a useful timestamp from record") 232 + } 233 + } 234 + 235 + return &rkeyTime, nil 236 + case "app.bsky.feed.like": 237 + cat, ok := rec["createdAt"].(string) 238 + if ok { 239 + t, err := dateparse.ParseAny(cat) 240 + if err == nil { 241 + return &t, nil 242 + } 243 + 244 + if rkeyTime.IsZero() { 245 + return nil, fmt.Errorf("failed to get a useful timestamp from record") 246 + } 247 + } 248 + 249 + return &rkeyTime, nil 250 + case "app.bsky.actor.profile": 251 + // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 252 + return timePtr(time.Now()), nil 253 + case "app.bsky.feed.generator": 254 + if !rkeyTime.IsZero() { 255 + return &rkeyTime, nil 256 + } 257 + return timePtr(time.Now()), nil 258 + default: 259 + if !rkeyTime.IsZero() { 260 + return &rkeyTime, nil 261 + } 262 + return timePtr(time.Now()), nil 263 + } 264 + } 265 + 266 + func timePtr(t time.Time) *time.Time { 267 + return &t 268 + }
+25
atkafka/types.go
··· 65 65 AccountAge int64 `json:"accountAge"` 66 66 Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"` 67 67 } 68 + 69 + type TapEvent struct { 70 + Id int64 `json:"id"` 71 + Type string `json:"type"` 72 + Record *TapEventRecord `json:"record,omitempty"` 73 + Identity *TapEventIdentity `json:"identity,omitempty"` 74 + } 75 + 76 + type TapEventRecord struct { 77 + Live bool `json:"live"` 78 + Rev string `json:"rev"` 79 + Did string `json:"did"` 80 + Collection string `json:"collection"` 81 + Rkey string `json:"rkey"` 82 + Action string `json:"action"` 83 + Cid string `json:"cid"` 84 + Record *map[string]any `json:"record,omitempty"` 85 + } 86 + 87 + type TapEventIdentity struct { 88 + Did string `json:"did"` 89 + Handle string `json:"handle"` 90 + IsActive bool `json:"isActive"` 91 + Status string `json:"status"` 92 + }
+38
cmd/atkafka/main.go
··· 102 102 103 103 return nil 104 104 }, 105 + Commands: cli.Commands{ 106 + &cli.Command{ 107 + Name: "tap-mode", 108 + Flags: []cli.Flag{ 109 + &cli.StringFlag{ 110 + Name: "tap-host", 111 + Usage: "Tap host to subscribe to for events", 112 + Value: "ws://localhost:2480", 113 + EnvVars: []string{"ATKAFKA_TAP_HOST"}, 114 + }, 115 + }, 116 + Action: func(cmd *cli.Context) error { 117 + 118 + ctx := context.Background() 119 + 120 + telemetry.StartMetrics(cmd) 121 + logger := telemetry.StartLogger(cmd) 122 + 123 + s, err := atkafka.NewServer(&atkafka.ServerArgs{ 124 + TapHost: cmd.String("tap-host"), 125 + BootstrapServers: cmd.StringSlice("bootstrap-servers"), 126 + OutputTopic: cmd.String("output-topic"), 127 + WatchedCollections: cmd.StringSlice("watched-collections"), 128 + IgnoredCollections: cmd.StringSlice("ignored-collections"), 129 + Logger: logger, 130 + }) 131 + if err != nil { 132 + return fmt.Errorf("failed to create new server: %w", err) 133 + } 134 + 135 + if err := s.RunTapMode(ctx); err != nil { 136 + return fmt.Errorf("error running server: %w", err) 137 + } 138 + 139 + return nil 140 + }, 141 + }, 142 + }, 105 143 } 106 144 107 145 if err := app.Run(os.Args); err != nil {
+63
docker-compose.tap.yml
··· 1 + services: 2 + zookeeper: 3 + image: confluentinc/cp-zookeeper:7.6.0 4 + hostname: zookeeper 5 + container_name: zookeeper 6 + ports: 7 + - "2181:2181" 8 + environment: 9 + ZOOKEEPER_CLIENT_PORT: 2181 10 + ZOOKEEPER_TICK_TIME: 2000 11 + volumes: 12 + - zookeeper-data:/var/lib/zookeeper/data 13 + - zookeeper-logs:/var/lib/zookeeper/log 14 + 15 + kafka: 16 + image: confluentinc/cp-kafka:7.6.0 17 + hostname: kafka 18 + container_name: kafka 19 + depends_on: 20 + - zookeeper 21 + ports: 22 + - "9092:9092" 23 + - "9101:9101" 24 + environment: 25 + KAFKA_BROKER_ID: 1 26 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' 27 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT 28 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 29 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 30 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 31 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 32 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 33 + KAFKA_JMX_PORT: 9101 34 + KAFKA_JMX_HOSTNAME: localhost 35 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' 36 + volumes: 37 + - kafka-data:/var/lib/kafka/data 38 + 39 + atkafka: 40 + build: 41 + context: . 42 + dockerfile: Dockerfile 43 + container_name: atkafka 44 + depends_on: 45 + - kafka 46 + ports: 47 + - "2112:2112" 48 + environment: 49 + ATKAFKA_RELAY_HOST: "wss://bsky.network" 50 + ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 51 + ATKAFKA_OUTPUT_TOPIC: "atproto-events" 52 + ATKAFKA_OSPREY_COMPATIBLE: "false" 53 + ATKAFKA_PLC_HOST: "https://plc.directory" 54 + 55 + ATKAFKA_WATCHED_SERVICES: "*.bsky.network" 56 + 57 + ATKAFKA_IGNORED_COLLECTIONS: "app.bsky.*" 58 + restart: unless-stopped 59 + 60 + volumes: 61 + zookeeper-data: 62 + zookeeper-logs: 63 + kafka-data:
+2 -1
go.mod
··· 3 3 go 1.25.4 4 4 5 5 require ( 6 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de 6 7 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 7 8 github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f 8 9 github.com/gorilla/websocket v1.5.3 ··· 11 12 github.com/prometheus/client_golang v1.23.2 12 13 github.com/twmb/franz-go v1.19.5 13 14 github.com/urfave/cli/v2 v2.25.7 15 + golang.org/x/sync v0.16.0 14 16 golang.org/x/time v0.6.0 15 17 ) 16 18 ··· 95 97 go.yaml.in/yaml/v2 v2.4.2 // indirect 96 98 golang.org/x/crypto v0.40.0 // indirect 97 99 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect 98 - golang.org/x/sync v0.16.0 // indirect 99 100 golang.org/x/sys v0.35.0 // indirect 100 101 golang.org/x/text v0.28.0 // indirect 101 102 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
+5
go.sum
··· 3 3 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= 4 4 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM= 5 5 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA= 6 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA= 7 + github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= 6 8 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 7 9 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= 8 10 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= ··· 200 202 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= 201 203 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 202 204 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 205 + github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= 203 206 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 204 207 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 205 208 github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= ··· 249 252 github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 250 253 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 251 254 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 255 + github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= 252 256 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= 253 257 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= 254 258 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= 255 259 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 256 260 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= 257 261 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 262 + github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= 258 263 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= 259 264 github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= 260 265 github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=