this repo has no description
at main 3.6 kB view raw
1package cdn 2 3import ( 4 "context" 5 "time" 6 7 "github.com/bluesky-social/indigo/atproto/atdata" 8 vyletkafka "github.com/vylet-app/go/bus/proto" 9 vyletdatabase "github.com/vylet-app/go/database/proto" 10 "google.golang.org/protobuf/types/known/timestamppb" 11) 12 13func (s *Server) handleEvent(ctx context.Context, evt *vyletkafka.FirehoseEvent) error { 14 if evt.Commit != nil { 15 return s.handleCommit(ctx, evt) 16 } 17 18 return nil 19} 20 21func (s *Server) handleCommit(ctx context.Context, evt *vyletkafka.FirehoseEvent) error { 22 logger := s.logger.With("did", evt.Did, "collection", evt.Commit.Collection, "rkey", evt.Commit.Rkey) 23 24 op := evt.Commit 25 switch op.Operation { 26 case vyletkafka.CommitOperation_COMMIT_OPERATION_CREATE, vyletkafka.CommitOperation_COMMIT_OPERATION_UPDATE: 27 // Track record processing 28 recordsProcessed.WithLabelValues(op.Operation.String()).Inc() 29 30 rec, err := atdata.UnmarshalJSON(op.Record) 31 if err != nil { 32 logger.Error("failed to unmarshal record JSON", "err", err) 33 return nil // Don't fail the event processing, just skip blob extraction 34 } 35 36 // Extract blobs from the record 37 blobs := atdata.ExtractBlobs(rec) 38 39 if len(blobs) == 0 { 40 // No blobs in this record, nothing to do 41 return nil 42 } 43 44 // Track blobs extracted 45 blobsExtracted.Add(float64(len(blobs))) 46 logger.Debug("extracted blobs from record", "count", len(blobs)) 47 48 // Store each blob reference in the database 49 now := time.Now().UTC() 50 for _, blob := range blobs { 51 cid := blob.Ref.String() 52 53 // Check if blob ref already exists 54 getResp, err := s.db.BlobRef.GetBlobRef(ctx, &vyletdatabase.GetBlobRefRequest{ 55 Did: evt.Did, 56 Cid: cid, 57 }) 58 if err != nil { 59 logger.Error("failed to check if blob ref exists", "cid", cid, "err", err) 60 continue 61 } 62 63 // If blob ref doesn't exist, create it 64 if getResp.Error != nil { 65 createResp, err := s.db.BlobRef.CreateBlobRef(ctx, &vyletdatabase.CreateBlobRefRequest{ 66 BlobRef: &vyletdatabase.BlobRef{ 67 Did: evt.Did, 68 Cid: cid, 69 FirstSeenAt: timestamppb.New(now), 70 TakenDown: false, 71 }, 72 }) 73 if err != nil { 74 dbOperations.WithLabelValues("create", "error").Inc() 75 logger.Error("failed to create blob ref", "cid", cid, "err", err) 76 continue 77 } 78 if createResp.Error != nil { 79 dbOperations.WithLabelValues("create", "error").Inc() 80 logger.Error("error creating blob ref", "cid", cid, "error", *createResp.Error) 81 continue 82 } 83 84 dbOperations.WithLabelValues("create", "success").Inc() 85 logger.Debug("created blob ref", "cid", cid) 86 } else { 87 // Blob ref already exists, update the updated_at timestamp 88 updateResp, err := s.db.BlobRef.UpdateBlobRef(ctx, &vyletdatabase.UpdateBlobRefRequest{ 89 BlobRef: &vyletdatabase.BlobRef{ 90 Did: evt.Did, 91 Cid: cid, 92 TakenDown: getResp.BlobRef.TakenDown, 93 }, 94 }) 95 if err != nil { 96 dbOperations.WithLabelValues("update", "error").Inc() 97 logger.Error("failed to update blob ref", "cid", cid, "err", err) 98 continue 99 } 100 if updateResp.Error != nil { 101 dbOperations.WithLabelValues("update", "error").Inc() 102 logger.Error("error updating blob ref", "cid", cid, "error", *updateResp.Error) 103 continue 104 } 105 106 dbOperations.WithLabelValues("update", "success").Inc() 107 logger.Debug("updated blob ref", "cid", cid) 108 } 109 } 110 111 case vyletkafka.CommitOperation_COMMIT_OPERATION_DELETE: 112 // For deletes, we don't remove blob refs since they might be referenced elsewhere 113 // We could potentially track reference counts in the future 114 return nil 115 } 116 117 return nil 118}