1package cdn
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/atproto/atdata"
8 vyletkafka "github.com/vylet-app/go/bus/proto"
9 vyletdatabase "github.com/vylet-app/go/database/proto"
10 "google.golang.org/protobuf/types/known/timestamppb"
11)
12
13func (s *Server) handleEvent(ctx context.Context, evt *vyletkafka.FirehoseEvent) error {
14 if evt.Commit != nil {
15 return s.handleCommit(ctx, evt)
16 }
17
18 return nil
19}
20
21func (s *Server) handleCommit(ctx context.Context, evt *vyletkafka.FirehoseEvent) error {
22 logger := s.logger.With("did", evt.Did, "collection", evt.Commit.Collection, "rkey", evt.Commit.Rkey)
23
24 op := evt.Commit
25 switch op.Operation {
26 case vyletkafka.CommitOperation_COMMIT_OPERATION_CREATE, vyletkafka.CommitOperation_COMMIT_OPERATION_UPDATE:
27 // Track record processing
28 recordsProcessed.WithLabelValues(op.Operation.String()).Inc()
29
30 rec, err := atdata.UnmarshalJSON(op.Record)
31 if err != nil {
32 logger.Error("failed to unmarshal record JSON", "err", err)
33 return nil // Don't fail the event processing, just skip blob extraction
34 }
35
36 // Extract blobs from the record
37 blobs := atdata.ExtractBlobs(rec)
38
39 if len(blobs) == 0 {
40 // No blobs in this record, nothing to do
41 return nil
42 }
43
44 // Track blobs extracted
45 blobsExtracted.Add(float64(len(blobs)))
46 logger.Debug("extracted blobs from record", "count", len(blobs))
47
48 // Store each blob reference in the database
49 now := time.Now().UTC()
50 for _, blob := range blobs {
51 cid := blob.Ref.String()
52
53 // Check if blob ref already exists
54 getResp, err := s.db.BlobRef.GetBlobRef(ctx, &vyletdatabase.GetBlobRefRequest{
55 Did: evt.Did,
56 Cid: cid,
57 })
58 if err != nil {
59 logger.Error("failed to check if blob ref exists", "cid", cid, "err", err)
60 continue
61 }
62
63 // If blob ref doesn't exist, create it
64 if getResp.Error != nil {
65 createResp, err := s.db.BlobRef.CreateBlobRef(ctx, &vyletdatabase.CreateBlobRefRequest{
66 BlobRef: &vyletdatabase.BlobRef{
67 Did: evt.Did,
68 Cid: cid,
69 FirstSeenAt: timestamppb.New(now),
70 TakenDown: false,
71 },
72 })
73 if err != nil {
74 dbOperations.WithLabelValues("create", "error").Inc()
75 logger.Error("failed to create blob ref", "cid", cid, "err", err)
76 continue
77 }
78 if createResp.Error != nil {
79 dbOperations.WithLabelValues("create", "error").Inc()
80 logger.Error("error creating blob ref", "cid", cid, "error", *createResp.Error)
81 continue
82 }
83
84 dbOperations.WithLabelValues("create", "success").Inc()
85 logger.Debug("created blob ref", "cid", cid)
86 } else {
87 // Blob ref already exists, update the updated_at timestamp
88 updateResp, err := s.db.BlobRef.UpdateBlobRef(ctx, &vyletdatabase.UpdateBlobRefRequest{
89 BlobRef: &vyletdatabase.BlobRef{
90 Did: evt.Did,
91 Cid: cid,
92 TakenDown: getResp.BlobRef.TakenDown,
93 },
94 })
95 if err != nil {
96 dbOperations.WithLabelValues("update", "error").Inc()
97 logger.Error("failed to update blob ref", "cid", cid, "err", err)
98 continue
99 }
100 if updateResp.Error != nil {
101 dbOperations.WithLabelValues("update", "error").Inc()
102 logger.Error("error updating blob ref", "cid", cid, "error", *updateResp.Error)
103 continue
104 }
105
106 dbOperations.WithLabelValues("update", "success").Inc()
107 logger.Debug("updated blob ref", "cid", cid)
108 }
109 }
110
111 case vyletkafka.CommitOperation_COMMIT_OPERATION_DELETE:
112 // For deletes, we don't remove blob refs since they might be referenced elsewhere
113 // We could potentially track reference counts in the future
114 return nil
115 }
116
117 return nil
118}