A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
at codeberg-source 516 lines 17 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "strings" 10 "time" 11 12 "atcr.io/pkg/appview/db" 13 "atcr.io/pkg/atproto" 14) 15 16// Processor handles shared database operations for both Worker (live) and Backfill (sync) 17// This eliminates code duplication between the two data ingestion paths 18type Processor struct { 19 db *sql.DB 20 userCache *UserCache // Optional - enabled for Worker, disabled for Backfill 21 statsCache *StatsCache // In-memory cache for per-hold stats aggregation 22 useCache bool 23} 24 25// NewProcessor creates a new shared processor 26// useCache: true for Worker (live streaming), false for Backfill (batch processing) 27// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing) 28func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor { 29 p := &Processor{ 30 db: database, 31 useCache: useCache, 32 statsCache: statsCache, 33 } 34 35 if useCache { 36 p.userCache = &UserCache{ 37 cache: make(map[string]*db.User), 38 } 39 } 40 41 return p 42} 43 44// EnsureUser resolves and upserts a user by DID 45// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill) 46func (p *Processor) EnsureUser(ctx context.Context, did string) error { 47 // Check cache first (if enabled) 48 if p.useCache && p.userCache != nil { 49 if _, ok := p.userCache.cache[did]; ok { 50 // User in cache - just update last seen timestamp 51 return db.UpdateUserLastSeen(p.db, did) 52 } 53 } else if !p.useCache { 54 // No cache - check if user already exists in DB 55 existingUser, err := db.GetUserByDID(p.db, did) 56 if err == nil && existingUser != nil { 57 // User exists - just update last seen timestamp 58 return db.UpdateUserLastSeen(p.db, did) 59 } 60 } 61 62 // Resolve DID to get handle and PDS endpoint 63 resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did) 64 if err != nil { 65 return err 66 } 67 68 // Fetch user's Bluesky profile record from their PDS (including avatar) 69 avatarURL := "" 70 client := atproto.NewClient(pdsEndpoint, "", "") 71 profileRecord, err := client.GetProfileRecord(ctx, resolvedDID) 72 if err != nil { 73 slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err) 74 // Continue without avatar 75 } else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" { 76 avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link) 77 } 78 79 // Create user record 80 user := &db.User{ 81 DID: resolvedDID, 82 Handle: handle, 83 PDSEndpoint: pdsEndpoint, 84 Avatar: avatarURL, 85 LastSeen: time.Now(), 86 } 87 88 // Cache if enabled 89 if p.useCache { 90 p.userCache.cache[did] = user 91 } 92 93 // Upsert to database 94 // Use UpsertUser if we successfully fetched an avatar (to update existing users) 95 // Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars) 96 if avatarURL != "" { 97 return db.UpsertUser(p.db, user) 98 } 99 return db.UpsertUserIgnoreAvatar(p.db, user) 100} 101 102// ProcessManifest processes a manifest record and stores it in the database 103// Returns the manifest ID for further processing (layers/references) 104func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) { 105 // Unmarshal manifest record 106 var manifestRecord atproto.ManifestRecord 107 if err := json.Unmarshal(recordData, &manifestRecord); err != nil { 108 return 0, fmt.Errorf("failed to unmarshal manifest: %w", err) 109 } 110 // Detect manifest type 111 isManifestList := len(manifestRecord.Manifests) > 0 112 113 // Extract hold DID from manifest (with fallback for legacy manifests) 114 // New manifests use holdDid field (DID format) 115 // Old manifests use holdEndpoint field (URL format) - convert to DID 116 holdDID := manifestRecord.HoldDID 117 if holdDID == "" && manifestRecord.HoldEndpoint != "" { 118 // Legacy manifest - convert URL to DID 119 holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint) 120 } 121 122 // Detect artifact type from config media type 123 artifactType := "container-image" 124 if !isManifestList && manifestRecord.Config != nil { 125 artifactType = db.GetArtifactType(manifestRecord.Config.MediaType) 126 } 127 128 // Prepare manifest for insertion (WITHOUT annotation fields) 129 manifest := &db.Manifest{ 130 DID: did, 131 Repository: manifestRecord.Repository, 132 Digest: manifestRecord.Digest, 133 MediaType: manifestRecord.MediaType, 134 SchemaVersion: manifestRecord.SchemaVersion, 135 HoldEndpoint: holdDID, 136 ArtifactType: artifactType, 137 CreatedAt: manifestRecord.CreatedAt, 138 // Annotations removed - stored separately in repository_annotations table 139 } 140 141 // Set config fields only for image manifests (not manifest lists) 142 if !isManifestList && manifestRecord.Config != nil { 143 manifest.ConfigDigest = manifestRecord.Config.Digest 144 manifest.ConfigSize = manifestRecord.Config.Size 145 } 146 147 // Insert manifest 148 manifestID, err := db.InsertManifest(p.db, manifest) 149 if err != nil { 150 // For backfill: if manifest already exists, get its ID 151 if strings.Contains(err.Error(), "UNIQUE constraint failed") { 152 var existingID int64 153 err := p.db.QueryRow(` 154 SELECT id FROM manifests 155 WHERE did = ? AND repository = ? AND digest = ? 156 `, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID) 157 158 if err != nil { 159 return 0, fmt.Errorf("failed to get existing manifest ID: %w", err) 160 } 161 manifestID = existingID 162 } else { 163 return 0, fmt.Errorf("failed to insert manifest: %w", err) 164 } 165 } 166 167 // Update repository annotations ONLY if manifest has at least one non-empty annotation 168 if manifestRecord.Annotations != nil { 169 hasData := false 170 for _, value := range manifestRecord.Annotations { 171 if value != "" { 172 hasData = true 173 break 174 } 175 } 176 177 if hasData { 178 // Replace all annotations for this repository 179 err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations) 180 if err != nil { 181 return 0, fmt.Errorf("failed to upsert annotations: %w", err) 182 } 183 } 184 } 185 186 // Insert manifest references or layers 187 if isManifestList { 188 // Insert manifest references (for manifest lists/indexes) 189 for i, ref := range manifestRecord.Manifests { 190 platformArch := "" 191 platformOS := "" 192 platformVariant := "" 193 platformOSVersion := "" 194 195 if ref.Platform != nil { 196 platformArch = ref.Platform.Architecture 197 platformOS = ref.Platform.OS 198 platformVariant = ref.Platform.Variant 199 platformOSVersion = ref.Platform.OSVersion 200 } 201 202 // Detect attestation manifests from annotations 203 isAttestation := false 204 if ref.Annotations != nil { 205 if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok { 206 isAttestation = refType == "attestation-manifest" 207 } 208 } 209 210 if err := db.InsertManifestReference(p.db, &db.ManifestReference{ 211 ManifestID: manifestID, 212 Digest: ref.Digest, 213 MediaType: ref.MediaType, 214 Size: ref.Size, 215 PlatformArchitecture: platformArch, 216 PlatformOS: platformOS, 217 PlatformVariant: platformVariant, 218 PlatformOSVersion: platformOSVersion, 219 IsAttestation: isAttestation, 220 ReferenceIndex: i, 221 }); err != nil { 222 // Continue on error - reference might already exist 223 continue 224 } 225 } 226 } else { 227 // Insert layers (for image manifests) 228 for i, layer := range manifestRecord.Layers { 229 if err := db.InsertLayer(p.db, &db.Layer{ 230 ManifestID: manifestID, 231 Digest: layer.Digest, 232 MediaType: layer.MediaType, 233 Size: layer.Size, 234 LayerIndex: i, 235 }); err != nil { 236 // Continue on error - layer might already exist 237 continue 238 } 239 } 240 } 241 242 return manifestID, nil 243} 244 245// ProcessTag processes a tag record and stores it in the database 246func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error { 247 // Unmarshal tag record 248 var tagRecord atproto.TagRecord 249 if err := json.Unmarshal(recordData, &tagRecord); err != nil { 250 return fmt.Errorf("failed to unmarshal tag: %w", err) 251 } 252 // Extract digest from tag record (tries manifest field first, falls back to manifestDigest) 253 manifestDigest, err := tagRecord.GetManifestDigest() 254 if err != nil { 255 return fmt.Errorf("failed to get manifest digest from tag record: %w", err) 256 } 257 258 // Insert or update tag 259 return db.UpsertTag(p.db, &db.Tag{ 260 DID: did, 261 Repository: tagRecord.Repository, 262 Tag: tagRecord.Tag, 263 Digest: manifestDigest, 264 CreatedAt: tagRecord.UpdatedAt, 265 }) 266} 267 268// ProcessStar processes a star record and stores it in the database 269func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error { 270 // Unmarshal star record 271 var starRecord atproto.StarRecord 272 if err := json.Unmarshal(recordData, &starRecord); err != nil { 273 return fmt.Errorf("failed to unmarshal star: %w", err) 274 } 275 // Upsert the star record (idempotent - won't duplicate) 276 // The DID here is the starrer (user who starred) 277 // The subject contains the owner DID and repository 278 // Star count will be calculated on demand from the stars table 279 return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 280} 281 282// ProcessSailorProfile processes a sailor profile record 283// This is primarily used by backfill to cache captain records for holds 284func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error { 285 // Unmarshal sailor profile record 286 var profileRecord atproto.SailorProfileRecord 287 if err := json.Unmarshal(recordData, &profileRecord); err != nil { 288 return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 289 } 290 291 // Skip if no default hold set 292 if profileRecord.DefaultHold == "" { 293 return nil 294 } 295 296 // Convert hold URL/DID to canonical DID 297 holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 298 if holdDID == "" { 299 slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold) 300 return nil 301 } 302 303 // Query and cache the captain record using provided function 304 // This allows backfill-specific logic (retries, test mode handling) without duplicating it here 305 if queryCaptainFn != nil { 306 return queryCaptainFn(ctx, holdDID) 307 } 308 309 return nil 310} 311 312// ProcessRepoPage processes a repository page record 313// This is called when Jetstream receives a repo page create/update event 314func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error { 315 if isDelete { 316 // Delete the repo page from our cache 317 return db.DeleteRepoPage(p.db, did, rkey) 318 } 319 320 // Unmarshal repo page record 321 var pageRecord atproto.RepoPageRecord 322 if err := json.Unmarshal(recordData, &pageRecord); err != nil { 323 return fmt.Errorf("failed to unmarshal repo page: %w", err) 324 } 325 326 // Extract avatar CID if present 327 avatarCID := "" 328 if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" { 329 avatarCID = pageRecord.Avatar.Ref.Link 330 } 331 332 // Upsert to database 333 return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt) 334} 335 336// ProcessIdentity handles identity change events (handle updates) 337// This is called when Jetstream receives an identity event indicating a handle change. 338// The identity cache is invalidated to ensure the next lookup uses the new handle, 339// and the database is updated to reflect the change in the UI. 340// 341// Only processes events for users who already exist in our database (have ATCR activity). 342func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error { 343 // Check if user exists in our database - only update if they're an ATCR user 344 user, err := db.GetUserByDID(p.db, did) 345 if err != nil { 346 return fmt.Errorf("failed to check user existence: %w", err) 347 } 348 349 // Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.) 350 if user == nil { 351 return nil 352 } 353 354 // Update handle in database 355 if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil { 356 slog.Warn("Failed to update user handle in database", 357 "component", "processor", 358 "did", did, 359 "handle", newHandle, 360 "error", err) 361 // Continue to invalidate cache even if DB update fails 362 } 363 364 // Invalidate cached identity data to force re-resolution on next lookup 365 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 366 slog.Warn("Failed to invalidate identity cache", 367 "component", "processor", 368 "did", did, 369 "error", err) 370 return err 371 } 372 373 slog.Info("Processed identity change event", 374 "component", "processor", 375 "did", did, 376 "old_handle", user.Handle, 377 "new_handle", newHandle) 378 379 return nil 380} 381 382// ProcessStats handles stats record events from hold PDSes 383// This is called when Jetstream receives a stats create/update/delete event from a hold 384// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository 385func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error { 386 // Skip if no stats cache configured 387 if p.statsCache == nil { 388 return nil 389 } 390 391 // Unmarshal stats record 392 var statsRecord atproto.StatsRecord 393 if err := json.Unmarshal(recordData, &statsRecord); err != nil { 394 return fmt.Errorf("failed to unmarshal stats record: %w", err) 395 } 396 397 if isDelete { 398 // Delete from in-memory cache 399 p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository) 400 } else { 401 // Parse timestamps 402 var lastPull, lastPush *time.Time 403 if statsRecord.LastPull != "" { 404 t, err := time.Parse(time.RFC3339, statsRecord.LastPull) 405 if err == nil { 406 lastPull = &t 407 } 408 } 409 if statsRecord.LastPush != "" { 410 t, err := time.Parse(time.RFC3339, statsRecord.LastPush) 411 if err == nil { 412 lastPush = &t 413 } 414 } 415 416 // Update in-memory cache 417 p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository, 418 statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush) 419 } 420 421 // Get aggregated stats across all holds 422 totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated( 423 statsRecord.OwnerDID, statsRecord.Repository) 424 425 // Upsert aggregated stats to repository_stats 426 return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{ 427 DID: statsRecord.OwnerDID, 428 Repository: statsRecord.Repository, 429 PullCount: int(totalPull), 430 PushCount: int(totalPush), 431 LastPull: latestPull, 432 LastPush: latestPush, 433 }) 434} 435 436// ProcessAccount handles account status events (deactivation/deletion/etc) 437// This is called when Jetstream receives an account event indicating status changes. 438// 439// Status handling: 440// - "deleted": Account permanently deleted - remove all cached data 441// - "deactivated": Could be PDS migration or permanent - invalidate cache only 442// - "takendown": Moderation action - invalidate cache only 443// - Other: Ignore 444// 445// For "deactivated", we don't delete data because it's ambiguous: 446// - Could be permanent deactivation (user deleted account) 447// - Could be PDS migration (account moves to new PDS) 448// Cache invalidation forces re-resolution on next lookup. 449// 450// Only processes events for users who already exist in our database (have ATCR activity). 451func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { 452 // Skip active accounts or unknown statuses 453 if active { 454 return nil 455 } 456 457 // Check if user exists in our database - only process if they're an ATCR user 458 user, err := db.GetUserByDID(p.db, did) 459 if err != nil { 460 return fmt.Errorf("failed to check user existence: %w", err) 461 } 462 463 // Skip if user doesn't exist - they don't have any ATCR activity 464 if user == nil { 465 return nil 466 } 467 468 switch status { 469 case "deleted": 470 // Account permanently deleted - remove all cached data 471 if err := db.DeleteUserData(p.db, did); err != nil { 472 slog.Error("Failed to delete user data for deleted account", 473 "component", "processor", 474 "did", did, 475 "handle", user.Handle, 476 "error", err) 477 return err 478 } 479 480 // Also invalidate identity cache 481 _ = atproto.InvalidateIdentity(ctx, did) 482 483 slog.Info("Deleted user data for deleted account", 484 "component", "processor", 485 "did", did, 486 "handle", user.Handle) 487 488 case "deactivated", "takendown": 489 // Ambiguous status - invalidate cache but keep data 490 // For deactivated: could be PDS migration, will resolve on next lookup 491 // For takendown: moderation action, keep data in case of appeal 492 if err := atproto.InvalidateIdentity(ctx, did); err != nil { 493 slog.Warn("Failed to invalidate identity cache", 494 "component", "processor", 495 "did", did, 496 "status", status, 497 "error", err) 498 return err 499 } 500 501 slog.Info("Processed account status event - cache invalidated", 502 "component", "processor", 503 "did", did, 504 "handle", user.Handle, 505 "status", status) 506 507 default: 508 // Unknown status - ignore 509 slog.Debug("Ignoring unknown account status", 510 "component", "processor", 511 "did", did, 512 "status", status) 513 } 514 515 return nil 516}