this repo has no description

add plc event metadata

Changed files
+261 -7
.github
workflows
atkafka
cmd
atkafka
+2 -2
.github/workflows/docker.yml
··· 29 29 uses: docker/setup-buildx-action@v3 30 30 31 31 - name: Log in to GitHub Container Registry 32 - if: github.event_name != 'pull_request' 32 + if: github.event_name != 'pull_request' 33 33 uses: docker/login-action@v3 34 34 with: 35 35 registry: ${{ env.REGISTRY }} ··· 59 59 labels: ${{ steps.meta.outputs.labels }} 60 60 cache-from: type=gha 61 61 cache-to: type=gha,mode=max 62 - platforms: linux/amd64,linux/arm64 62 + platforms: linux/amd64
+98 -3
atkafka/atkafka.go
··· 11 11 "os" 12 12 "os/signal" 13 13 "strings" 14 + "sync" 14 15 "syscall" 15 16 "time" 16 17 17 18 "github.com/bluesky-social/indigo/atproto/atdata" 19 + "github.com/bluesky-social/indigo/atproto/identity" 18 20 "github.com/bluesky-social/indigo/events" 19 21 "github.com/bluesky-social/indigo/events/schedulers/parallel" 20 22 "github.com/bluesky-social/indigo/repo" ··· 32 34 33 35 atkProducer *Producer 34 36 ospProducer *Producer 37 + 38 + plcClient *PlcClient 35 39 } 36 40 37 41 type ServerArgs struct { 38 42 RelayHost string 43 + PlcHost string 39 44 BootstrapServers []string 40 45 OutputTopic string 41 46 OspreyCompat bool ··· 47 52 args.Logger = slog.Default() 48 53 } 49 54 55 + var plcClient *PlcClient 56 + if args.PlcHost != "" { 57 + plcClient = NewPlcClient(&PlcClientArgs{ 58 + PlcHost: args.PlcHost, 59 + }) 60 + } 61 + 50 62 return &Server{ 51 63 relayHost: args.RelayHost, 64 + plcClient: plcClient, 52 65 bootstrapServers: args.BootstrapServers, 53 66 outputTopic: args.OutputTopic, 54 67 ospreyCompat: args.OspreyCompat, ··· 139 152 return nil 140 153 } 141 154 155 + type EventMetadata struct { 156 + DidDocument *identity.DIDDocument `json:"didDocument,omitempty"` 157 + PdsHost string `json:"pdsHost,omitempty"` 158 + DidCreatedAt string `json:"didCreatedAt,omitempty"` 159 + AccountAge int64 `json:"accountAge"` 160 + } 161 + 162 + func (s *Server) FetchEventMetadata(ctx context.Context, did string) (*EventMetadata, error) { 163 + var didDocument *identity.DIDDocument 164 + var pdsHost string 165 + var didCreatedAt string 166 + accountAge := int64(-1) 167 + 168 + var wg sync.WaitGroup 169 + 170 + if s.plcClient != nil { 171 + wg.Go(func() { 172 + logger := s.logger.With("component", "didDoc") 173 + doc, err := s.plcClient.GetDIDDoc(ctx, did) 174 + if err != nil { 175 + logger.Error("error fetching did doc", "did", did, "err", err) 176 + return 177 + } 178 + didDocument = doc 179 + 180 + for _, svc := range doc.Service { 181 + if svc.ID == "#atproto_pds" { 
182 + pdsHost = svc.ServiceEndpoint 183 + break 184 + } 185 + } 186 + }) 187 + 188 + wg.Go(func() { 189 + logger := s.logger.With("component", "auditLog") 190 + auditLog, err := s.plcClient.GetDIDAuditLog(ctx, did) 191 + if err != nil { 192 + logger.Error("error fetching did audit log", "did", did, "err", err) 193 + return 194 + } 195 + 196 + didCreatedAt = auditLog.CreatedAt 197 + 198 + createdAt, err := time.Parse(time.RFC3339Nano, auditLog.CreatedAt) 199 + if err != nil { 200 + logger.Error("error parsing timestamp in audit log", "did", did, "timestamp", auditLog.CreatedAt, "err", err) 201 + return 202 + } 203 + 204 + accountAge = int64(time.Since(createdAt).Seconds()) 205 + }) 206 + } 207 + 208 + wg.Wait() 209 + 210 + return &EventMetadata{ 211 + DidDocument: didDocument, 212 + PdsHost: pdsHost, 213 + DidCreatedAt: didCreatedAt, 214 + AccountAge: accountAge, 215 + }, nil 216 + } 217 + 142 218 func (s *Server) handleEvent(ctx context.Context, evt *events.XRPCStreamEvent) error { 219 + dispatchCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 220 + defer cancel() 221 + 143 222 logger := s.logger.With("component", "handleEvent") 144 223 logger.Debug("event", "seq", evt.Sequence()) 145 224 ··· 214 293 Operation: &atkOp, 215 294 } 216 295 296 + eventMetadata, err := s.FetchEventMetadata(dispatchCtx, evt.RepoCommit.Repo) 297 + if err != nil { 298 + logger.Error("error fetching event metadata", "err", err) 299 + } else { 300 + kafkaEvt.Metadata = eventMetadata 301 + } 302 + 217 303 var kafkaEvtBytes []byte 218 304 if s.ospreyCompat { 219 - 220 305 // create the wrapper event for osprey 221 306 ospreyKafkaEvent := OspreyAtKafkaEvent{ 222 307 Data: OspreyEventData{ ··· 293 378 return fmt.Errorf("unhandled event received") 294 379 } 295 380 381 + if did != "" { 382 + eventMetadata, err := s.FetchEventMetadata(dispatchCtx, did) 383 + if err != nil { 384 + logger.Error("error fetching event metadata", "err", err) 385 + } else { 386 + kafkaEvt.Metadata = eventMetadata 
387 + } 388 + } 389 + 296 390 // create the kafka event bytes 297 391 var kafkaEvtBytes []byte 298 392 var err error ··· 384 478 } 385 479 386 480 type AtKafkaEvent struct { 387 - Did string `json:"did"` 388 - Timestamp string `json:"timestamp"` 481 + Did string `json:"did"` 482 + Timestamp string `json:"timestamp"` 483 + Metadata *EventMetadata `json:"eventMetadata"` 389 484 390 485 Operation *AtKafkaOp `json:"operation,omitempty"` 391 486 Account *AtKafkaAccount `json:"account,omitempty"`
+153
atkafka/plc.go
··· 1 + package atkafka 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "net" 10 + "net/http" 11 + "time" 12 + 13 + "github.com/bluesky-social/indigo/atproto/identity" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/pkg/robusthttp" 16 + lru "github.com/hashicorp/golang-lru/v2/expirable" 17 + "golang.org/x/time/rate" 18 + ) 19 + 20 + type PlcClient struct { 21 + client *http.Client 22 + dir *identity.BaseDirectory 23 + plcHost string 24 + docCache *lru.LRU[string, *identity.DIDDocument] 25 + auditCache *lru.LRU[string, *DidAuditEntry] 26 + } 27 + 28 + type PlcClientArgs struct { 29 + PlcHost string 30 + } 31 + 32 + func NewPlcClient(args *PlcClientArgs) *PlcClient { 33 + client := robusthttp.NewClient(robusthttp.WithMaxRetries(2)) 34 + client.Timeout = 3 * time.Second 35 + 36 + baseDir := identity.BaseDirectory{ 37 + PLCURL: args.PlcHost, 38 + PLCLimiter: rate.NewLimiter(rate.Limit(200), 100), 39 + HTTPClient: *client, 40 + Resolver: net.Resolver{ 41 + PreferGo: true, 42 + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { 43 + dialer := net.Dialer{Timeout: time.Second * 5} 44 + nameserver := address 45 + return dialer.DialContext(ctx, network, nameserver) 46 + }, 47 + }, 48 + TryAuthoritativeDNS: true, 49 + // primary Bluesky PDS instance only supports HTTP resolution method 50 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 51 + } 52 + 53 + docCache := lru.NewLRU[string, *identity.DIDDocument](100_000, nil, 5*time.Minute) 54 + 55 + auditCache := lru.NewLRU[string, *DidAuditEntry](100_000, nil, 1*time.Hour) 56 + 57 + return &PlcClient{ 58 + client: client, 59 + dir: &baseDir, 60 + plcHost: args.PlcHost, 61 + docCache: docCache, 62 + auditCache: auditCache, 63 + } 64 + } 65 + 66 + type OperationService struct { 67 + Type string `json:"type"` 68 + Endpoint string `json:"endpoint"` 69 + } 70 + 71 + type DidLogEntry struct { 72 + Sig string `json:"sig"` 73 
+ Prev *string `json:"prev"` 74 + Type string `json:"type"` 75 + Services map[string]OperationService `json:"services"` 76 + AlsoKnownAs []string `json:"alsoKnownAs"` 77 + RotationKeys []string `json:"rotationKeys"` 78 + VerificationMethods map[string]string `json:"verificationMethods"` 79 + } 80 + 81 + type DidAuditEntry struct { 82 + Did string `json:"did"` 83 + Operation DidLogEntry `json:"operation"` 84 + Cid string `json:"cid"` 85 + Nullified bool `json:"nullified"` 86 + CreatedAt string `json:"createdAt"` 87 + } 88 + 89 + type DidAuditLog []DidAuditEntry 90 + 91 + func (c *PlcClient) GetDIDDoc(ctx context.Context, did string) (*identity.DIDDocument, error) { 92 + if val, ok := c.docCache.Get(did); ok { 93 + return val, nil 94 + } 95 + 96 + didDoc, err := c.dir.ResolveDID(ctx, syntax.DID(did)) 97 + if err != nil { 98 + return nil, fmt.Errorf("failed to lookup DID: %w", err) 99 + } 100 + 101 + if didDoc == nil { 102 + return nil, fmt.Errorf("DID Document not found") 103 + } 104 + 105 + c.docCache.Add(did, didDoc) 106 + 107 + return didDoc, nil 108 + } 109 + 110 + var ErrAuditLogNotFound = errors.New("audit log not found for DID") 111 + 112 + func (c *PlcClient) GetDIDAuditLog(ctx context.Context, did string) (*DidAuditEntry, error) { 113 + if val, ok := c.auditCache.Get(did); ok { 114 + return val, nil 115 + } 116 + 117 + ustr := fmt.Sprintf("%s/%s/log/audit", c.plcHost, did) 118 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ustr, nil) 119 + if err != nil { 120 + return nil, fmt.Errorf("failed to create http request for DID audit log: %w", err) 121 + } 122 + 123 + resp, err := c.client.Do(req) 124 + if err != nil { 125 + return nil, fmt.Errorf("failed to fetch DID audit log: %w", err) 126 + } 127 + defer resp.Body.Close() 128 + 129 + if resp.StatusCode != http.StatusOK { 130 + io.Copy(io.Discard, resp.Body) 131 + if resp.StatusCode == http.StatusNotFound { 132 + return nil, ErrAuditLogNotFound 133 + } 134 + return nil, fmt.Errorf("DID audit log 
fetch returned unexpected status: %s", resp.Status) 135 + } 136 + 137 + var entries []DidAuditEntry 138 + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { 139 + return nil, fmt.Errorf("failed to read DID audit log response bytes into DidAuditLog: %w", err) 140 + } 141 + 142 + if len(entries) == 0 { 143 + return nil, fmt.Errorf("empty did audit log for did %s", did) 144 + } 145 + 146 + entry := entries[0] 147 + 148 + if c.auditCache != nil { 149 + c.auditCache.Add(did, &entry) 150 + } 151 + 152 + return &entry, nil 153 + }
+5
cmd/atkafka/main.go
··· 36 36 EnvVars: []string{"ATKAFKA_OSPREY_COMPATIBLE"}, 37 37 Value: false, 38 38 }, 39 + &cli.StringFlag{ 40 + Name: "plc-host", 41 + EnvVars: []string{"ATKAFKA_PLC_HOST"}, 42 + }, 39 43 }, 40 44 Action: func(cmd *cli.Context) error { 41 45 ctx := context.Background() ··· 48 52 BootstrapServers: cmd.StringSlice("bootstrap-servers"), 49 53 OutputTopic: cmd.String("output-topic"), 50 54 OspreyCompat: cmd.Bool("osprey-compatible"), 55 + PlcHost: cmd.String("plc-host"), 51 56 Logger: logger, 52 57 }) 53 58
+1
docker-compose.yml
··· 50 50 ATKAFKA_BOOTSTRAP_SERVERS: "kafka:29092" 51 51 ATKAFKA_OUTPUT_TOPIC: "atproto-events" 52 52 ATKAFKA_OSPREY_COMPATIBLE: "false" 53 + ATKAFKA_PLC_HOST: "https://plc.directory" 53 54 restart: unless-stopped 54 55 55 56 volumes:
+2 -2
go.mod
··· 6 6 github.com/bluesky-social/go-util v0.0.0-20251012040650-2ebbf57f5934 7 7 github.com/bluesky-social/indigo v0.0.0-20251125184450-35c1e15d2e5f 8 8 github.com/gorilla/websocket v1.5.3 9 + github.com/hashicorp/golang-lru/v2 v2.0.7 9 10 github.com/joho/godotenv v1.5.1 10 11 github.com/prometheus/client_golang v1.23.2 11 12 github.com/twmb/franz-go v1.19.5 12 13 github.com/urfave/cli/v2 v2.25.7 14 + golang.org/x/time v0.6.0 13 15 ) 14 16 15 17 require ( ··· 29 31 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect 30 32 github.com/hashicorp/go-retryablehttp v0.7.7 // indirect 31 33 github.com/hashicorp/golang-lru v1.0.2 // indirect 32 - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 33 34 github.com/ipfs/bbloom v0.0.4 // indirect 34 35 github.com/ipfs/go-block-format v0.2.0 // indirect 35 36 github.com/ipfs/go-blockservice v0.5.2 // indirect ··· 97 98 golang.org/x/sync v0.16.0 // indirect 98 99 golang.org/x/sys v0.35.0 // indirect 99 100 golang.org/x/text v0.28.0 // indirect 100 - golang.org/x/time v0.6.0 // indirect 101 101 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect 102 102 google.golang.org/protobuf v1.36.9 // indirect 103 103 gopkg.in/inf.v0 v0.9.1 // indirect