forked from
evan.jarrett.net/at-container-registry
A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "strings"
10 "time"
11
12 "atcr.io/pkg/appview/db"
13 "atcr.io/pkg/atproto"
14)
15
// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
//
// NewProcessor allocates userCache only when useCache is true; a Processor
// built some other way must keep those two fields consistent.
type Processor struct {
	db         *sql.DB     // database every processed record is written to
	userCache  *UserCache  // Optional - enabled for Worker, disabled for Backfill
	statsCache *StatsCache // In-memory cache for per-hold stats aggregation; nil makes ProcessStats a no-op
	useCache   bool        // true: EnsureUser consults userCache; false: EnsureUser checks the database
}
24
25// NewProcessor creates a new shared processor
26// useCache: true for Worker (live streaming), false for Backfill (batch processing)
27// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
28func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor {
29 p := &Processor{
30 db: database,
31 useCache: useCache,
32 statsCache: statsCache,
33 }
34
35 if useCache {
36 p.userCache = &UserCache{
37 cache: make(map[string]*db.User),
38 }
39 }
40
41 return p
42}
43
44// EnsureUser resolves and upserts a user by DID
45// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill)
46func (p *Processor) EnsureUser(ctx context.Context, did string) error {
47 // Check cache first (if enabled)
48 if p.useCache && p.userCache != nil {
49 if _, ok := p.userCache.cache[did]; ok {
50 // User in cache - just update last seen timestamp
51 return db.UpdateUserLastSeen(p.db, did)
52 }
53 } else if !p.useCache {
54 // No cache - check if user already exists in DB
55 existingUser, err := db.GetUserByDID(p.db, did)
56 if err == nil && existingUser != nil {
57 // User exists - just update last seen timestamp
58 return db.UpdateUserLastSeen(p.db, did)
59 }
60 }
61
62 // Resolve DID to get handle and PDS endpoint
63 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
64 if err != nil {
65 return err
66 }
67
68 // Fetch user's Bluesky profile record from their PDS (including avatar)
69 avatarURL := ""
70 client := atproto.NewClient(pdsEndpoint, "", "")
71 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
72 if err != nil {
73 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
74 // Continue without avatar
75 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
76 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
77 }
78
79 // Create user record
80 user := &db.User{
81 DID: resolvedDID,
82 Handle: handle,
83 PDSEndpoint: pdsEndpoint,
84 Avatar: avatarURL,
85 LastSeen: time.Now(),
86 }
87
88 // Cache if enabled
89 if p.useCache {
90 p.userCache.cache[did] = user
91 }
92
93 // Upsert to database
94 // Use UpsertUser if we successfully fetched an avatar (to update existing users)
95 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
96 if avatarURL != "" {
97 return db.UpsertUser(p.db, user)
98 }
99 return db.UpsertUserIgnoreAvatar(p.db, user)
100}
101
102// ProcessManifest processes a manifest record and stores it in the database
103// Returns the manifest ID for further processing (layers/references)
104func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
105 // Unmarshal manifest record
106 var manifestRecord atproto.ManifestRecord
107 if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
108 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
109 }
110 // Detect manifest type
111 isManifestList := len(manifestRecord.Manifests) > 0
112
113 // Extract hold DID from manifest (with fallback for legacy manifests)
114 // New manifests use holdDid field (DID format)
115 // Old manifests use holdEndpoint field (URL format) - convert to DID
116 holdDID := manifestRecord.HoldDID
117 if holdDID == "" && manifestRecord.HoldEndpoint != "" {
118 // Legacy manifest - convert URL to DID
119 holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
120 }
121
122 // Detect artifact type from config media type
123 artifactType := "container-image"
124 if !isManifestList && manifestRecord.Config != nil {
125 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
126 }
127
128 // Prepare manifest for insertion (WITHOUT annotation fields)
129 manifest := &db.Manifest{
130 DID: did,
131 Repository: manifestRecord.Repository,
132 Digest: manifestRecord.Digest,
133 MediaType: manifestRecord.MediaType,
134 SchemaVersion: manifestRecord.SchemaVersion,
135 HoldEndpoint: holdDID,
136 ArtifactType: artifactType,
137 CreatedAt: manifestRecord.CreatedAt,
138 // Annotations removed - stored separately in repository_annotations table
139 }
140
141 // Set config fields only for image manifests (not manifest lists)
142 if !isManifestList && manifestRecord.Config != nil {
143 manifest.ConfigDigest = manifestRecord.Config.Digest
144 manifest.ConfigSize = manifestRecord.Config.Size
145 }
146
147 // Insert manifest
148 manifestID, err := db.InsertManifest(p.db, manifest)
149 if err != nil {
150 // For backfill: if manifest already exists, get its ID
151 if strings.Contains(err.Error(), "UNIQUE constraint failed") {
152 var existingID int64
153 err := p.db.QueryRow(`
154 SELECT id FROM manifests
155 WHERE did = ? AND repository = ? AND digest = ?
156 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
157
158 if err != nil {
159 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
160 }
161 manifestID = existingID
162 } else {
163 return 0, fmt.Errorf("failed to insert manifest: %w", err)
164 }
165 }
166
167 // Update repository annotations ONLY if manifest has at least one non-empty annotation
168 if manifestRecord.Annotations != nil {
169 hasData := false
170 for _, value := range manifestRecord.Annotations {
171 if value != "" {
172 hasData = true
173 break
174 }
175 }
176
177 if hasData {
178 // Replace all annotations for this repository
179 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
180 if err != nil {
181 return 0, fmt.Errorf("failed to upsert annotations: %w", err)
182 }
183 }
184 }
185
186 // Insert manifest references or layers
187 if isManifestList {
188 // Insert manifest references (for manifest lists/indexes)
189 for i, ref := range manifestRecord.Manifests {
190 platformArch := ""
191 platformOS := ""
192 platformVariant := ""
193 platformOSVersion := ""
194
195 if ref.Platform != nil {
196 platformArch = ref.Platform.Architecture
197 platformOS = ref.Platform.OS
198 platformVariant = ref.Platform.Variant
199 platformOSVersion = ref.Platform.OSVersion
200 }
201
202 // Detect attestation manifests from annotations
203 isAttestation := false
204 if ref.Annotations != nil {
205 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
206 isAttestation = refType == "attestation-manifest"
207 }
208 }
209
210 if err := db.InsertManifestReference(p.db, &db.ManifestReference{
211 ManifestID: manifestID,
212 Digest: ref.Digest,
213 MediaType: ref.MediaType,
214 Size: ref.Size,
215 PlatformArchitecture: platformArch,
216 PlatformOS: platformOS,
217 PlatformVariant: platformVariant,
218 PlatformOSVersion: platformOSVersion,
219 IsAttestation: isAttestation,
220 ReferenceIndex: i,
221 }); err != nil {
222 // Continue on error - reference might already exist
223 continue
224 }
225 }
226 } else {
227 // Insert layers (for image manifests)
228 for i, layer := range manifestRecord.Layers {
229 if err := db.InsertLayer(p.db, &db.Layer{
230 ManifestID: manifestID,
231 Digest: layer.Digest,
232 MediaType: layer.MediaType,
233 Size: layer.Size,
234 LayerIndex: i,
235 }); err != nil {
236 // Continue on error - layer might already exist
237 continue
238 }
239 }
240 }
241
242 return manifestID, nil
243}
244
245// ProcessTag processes a tag record and stores it in the database
246func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
247 // Unmarshal tag record
248 var tagRecord atproto.TagRecord
249 if err := json.Unmarshal(recordData, &tagRecord); err != nil {
250 return fmt.Errorf("failed to unmarshal tag: %w", err)
251 }
252 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
253 manifestDigest, err := tagRecord.GetManifestDigest()
254 if err != nil {
255 return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
256 }
257
258 // Insert or update tag
259 return db.UpsertTag(p.db, &db.Tag{
260 DID: did,
261 Repository: tagRecord.Repository,
262 Tag: tagRecord.Tag,
263 Digest: manifestDigest,
264 CreatedAt: tagRecord.UpdatedAt,
265 })
266}
267
268// ProcessStar processes a star record and stores it in the database
269func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
270 // Unmarshal star record
271 var starRecord atproto.StarRecord
272 if err := json.Unmarshal(recordData, &starRecord); err != nil {
273 return fmt.Errorf("failed to unmarshal star: %w", err)
274 }
275 // Upsert the star record (idempotent - won't duplicate)
276 // The DID here is the starrer (user who starred)
277 // The subject contains the owner DID and repository
278 // Star count will be calculated on demand from the stars table
279 return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
280}
281
282// ProcessSailorProfile processes a sailor profile record
283// This is primarily used by backfill to cache captain records for holds
284func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
285 // Unmarshal sailor profile record
286 var profileRecord atproto.SailorProfileRecord
287 if err := json.Unmarshal(recordData, &profileRecord); err != nil {
288 return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
289 }
290
291 // Skip if no default hold set
292 if profileRecord.DefaultHold == "" {
293 return nil
294 }
295
296 // Convert hold URL/DID to canonical DID
297 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold)
298 if holdDID == "" {
299 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold)
300 return nil
301 }
302
303 // Query and cache the captain record using provided function
304 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here
305 if queryCaptainFn != nil {
306 return queryCaptainFn(ctx, holdDID)
307 }
308
309 return nil
310}
311
312// ProcessRepoPage processes a repository page record
313// This is called when Jetstream receives a repo page create/update event
314func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
315 if isDelete {
316 // Delete the repo page from our cache
317 return db.DeleteRepoPage(p.db, did, rkey)
318 }
319
320 // Unmarshal repo page record
321 var pageRecord atproto.RepoPageRecord
322 if err := json.Unmarshal(recordData, &pageRecord); err != nil {
323 return fmt.Errorf("failed to unmarshal repo page: %w", err)
324 }
325
326 // Extract avatar CID if present
327 avatarCID := ""
328 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
329 avatarCID = pageRecord.Avatar.Ref.Link
330 }
331
332 // Upsert to database
333 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt)
334}
335
336// ProcessIdentity handles identity change events (handle updates)
337// This is called when Jetstream receives an identity event indicating a handle change.
338// The identity cache is invalidated to ensure the next lookup uses the new handle,
339// and the database is updated to reflect the change in the UI.
340//
341// Only processes events for users who already exist in our database (have ATCR activity).
342func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
343 // Check if user exists in our database - only update if they're an ATCR user
344 user, err := db.GetUserByDID(p.db, did)
345 if err != nil {
346 return fmt.Errorf("failed to check user existence: %w", err)
347 }
348
349 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
350 if user == nil {
351 return nil
352 }
353
354 // Update handle in database
355 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
356 slog.Warn("Failed to update user handle in database",
357 "component", "processor",
358 "did", did,
359 "handle", newHandle,
360 "error", err)
361 // Continue to invalidate cache even if DB update fails
362 }
363
364 // Invalidate cached identity data to force re-resolution on next lookup
365 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
366 slog.Warn("Failed to invalidate identity cache",
367 "component", "processor",
368 "did", did,
369 "error", err)
370 return err
371 }
372
373 slog.Info("Processed identity change event",
374 "component", "processor",
375 "did", did,
376 "old_handle", user.Handle,
377 "new_handle", newHandle)
378
379 return nil
380}
381
382// ProcessStats handles stats record events from hold PDSes
383// This is called when Jetstream receives a stats create/update/delete event from a hold
384// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository
385func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
386 // Skip if no stats cache configured
387 if p.statsCache == nil {
388 return nil
389 }
390
391 // Unmarshal stats record
392 var statsRecord atproto.StatsRecord
393 if err := json.Unmarshal(recordData, &statsRecord); err != nil {
394 return fmt.Errorf("failed to unmarshal stats record: %w", err)
395 }
396
397 if isDelete {
398 // Delete from in-memory cache
399 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
400 } else {
401 // Parse timestamps
402 var lastPull, lastPush *time.Time
403 if statsRecord.LastPull != "" {
404 t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
405 if err == nil {
406 lastPull = &t
407 }
408 }
409 if statsRecord.LastPush != "" {
410 t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
411 if err == nil {
412 lastPush = &t
413 }
414 }
415
416 // Update in-memory cache
417 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
418 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
419 }
420
421 // Get aggregated stats across all holds
422 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
423 statsRecord.OwnerDID, statsRecord.Repository)
424
425 // Upsert aggregated stats to repository_stats
426 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
427 DID: statsRecord.OwnerDID,
428 Repository: statsRecord.Repository,
429 PullCount: int(totalPull),
430 PushCount: int(totalPush),
431 LastPull: latestPull,
432 LastPush: latestPush,
433 })
434}
435
436// ProcessAccount handles account status events (deactivation/deletion/etc)
437// This is called when Jetstream receives an account event indicating status changes.
438//
439// Status handling:
440// - "deleted": Account permanently deleted - remove all cached data
441// - "deactivated": Could be PDS migration or permanent - invalidate cache only
442// - "takendown": Moderation action - invalidate cache only
443// - Other: Ignore
444//
445// For "deactivated", we don't delete data because it's ambiguous:
446// - Could be permanent deactivation (user deleted account)
447// - Could be PDS migration (account moves to new PDS)
448// Cache invalidation forces re-resolution on next lookup.
449//
450// Only processes events for users who already exist in our database (have ATCR activity).
451func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
452 // Skip active accounts or unknown statuses
453 if active {
454 return nil
455 }
456
457 // Check if user exists in our database - only process if they're an ATCR user
458 user, err := db.GetUserByDID(p.db, did)
459 if err != nil {
460 return fmt.Errorf("failed to check user existence: %w", err)
461 }
462
463 // Skip if user doesn't exist - they don't have any ATCR activity
464 if user == nil {
465 return nil
466 }
467
468 switch status {
469 case "deleted":
470 // Account permanently deleted - remove all cached data
471 if err := db.DeleteUserData(p.db, did); err != nil {
472 slog.Error("Failed to delete user data for deleted account",
473 "component", "processor",
474 "did", did,
475 "handle", user.Handle,
476 "error", err)
477 return err
478 }
479
480 // Also invalidate identity cache
481 _ = atproto.InvalidateIdentity(ctx, did)
482
483 slog.Info("Deleted user data for deleted account",
484 "component", "processor",
485 "did", did,
486 "handle", user.Handle)
487
488 case "deactivated", "takendown":
489 // Ambiguous status - invalidate cache but keep data
490 // For deactivated: could be PDS migration, will resolve on next lookup
491 // For takendown: moderation action, keep data in case of appeal
492 if err := atproto.InvalidateIdentity(ctx, did); err != nil {
493 slog.Warn("Failed to invalidate identity cache",
494 "component", "processor",
495 "did", did,
496 "status", status,
497 "error", err)
498 return err
499 }
500
501 slog.Info("Processed account status event - cache invalidated",
502 "component", "processor",
503 "did", did,
504 "handle", user.Handle,
505 "status", status)
506
507 default:
508 // Unknown status - ignore
509 slog.Debug("Ignoring unknown account status",
510 "component", "processor",
511 "did", did,
512 "status", status)
513 }
514
515 return nil
516}