package jetstream

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"atcr.io/pkg/appview/db"
	"atcr.io/pkg/atproto"
)

// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
type Processor struct {
	db         *sql.DB
	userCache  *UserCache  // Optional - enabled for Worker, disabled for Backfill
	statsCache *StatsCache // In-memory cache for per-hold stats aggregation
	useCache   bool
}

// NewProcessor creates a new shared processor.
//
// useCache: true for Worker (live streaming), false for Backfill (batch processing).
// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing).
//
// The user cache is only allocated when useCache is true.
func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor {
	p := &Processor{
		db:         database,
		useCache:   useCache,
		statsCache: statsCache,
	}

	if useCache {
		p.userCache = &UserCache{
			cache: make(map[string]*db.User),
		}
	}

	return p
}

// EnsureUser resolves and upserts a user by DID.
//
// With the cache enabled (Worker) a cache hit only refreshes the last-seen
// timestamp. With the cache disabled (Backfill) the database is consulted
// instead. In every other case the identity is resolved, the profile record is
// fetched for the avatar, and the user is upserted.
//
// Returns the identity-resolution error or the database error, if any. A failed
// profile fetch is logged and does not fail the call.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	switch {
	case p.useCache && p.userCache != nil:
		// NOTE(review): the cache map is accessed without visible locking here;
		// confirm UserCache callers are single-goroutine or guarded elsewhere.
		if _, ok := p.userCache.cache[did]; ok {
			// User in cache - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	case !p.useCache:
		// No cache - check if user already exists in DB.
		// A lookup error deliberately falls through to a full resolve + upsert.
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			// User exists - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	}

	// Resolve DID to get handle and PDS endpoint
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Fetch user's Bluesky profile record from their PDS (including avatar)
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
		// Continue without avatar
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	// Create user record
	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// Upsert to database
	// Use UpsertUser if we successfully fetched an avatar (to update existing users)
	// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
	if avatarURL != "" {
		err = db.UpsertUser(p.db, user)
	} else {
		err = db.UpsertUserIgnoreAvatar(p.db, user)
	}
	if err != nil {
		return err
	}

	// Cache only after the upsert succeeded. Caching first would make every
	// later call take the cache-hit path (last-seen update only) and the user
	// row would never be created after a transient database failure.
	if p.useCache && p.userCache != nil {
		p.userCache.cache[did] = user
	}

	return nil
}

// ProcessManifest processes a manifest record and stores it in the database.
//
// It inserts the manifest row (or looks up the existing row when the insert
// hits a UNIQUE constraint), replaces the repository annotations when the
// record carries at least one non-empty annotation, and then inserts either
// the manifest references (manifest lists) or the layers (image manifests).
//
// Returns the manifest ID for further processing (layers/references).
// Failures while inserting individual references or layers are logged at debug
// level and do not fail the call.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	// Unmarshal manifest record
	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}

	// Detect manifest type
	isManifestList := len(manifestRecord.Manifests) > 0

	// Extract hold DID from manifest (with fallback for legacy manifests)
	// New manifests use holdDid field (DID format)
	// Old manifests use holdEndpoint field (URL format) - convert to DID
	holdDID := manifestRecord.HoldDID
	if holdDID == "" && manifestRecord.HoldEndpoint != "" {
		// Legacy manifest - convert URL to DID
		holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
	}

	// Config-derived fields only apply to image manifests (not manifest lists)
	hasConfig := !isManifestList && manifestRecord.Config != nil

	// Detect artifact type from config media type
	artifactType := "container-image"
	if hasConfig {
		artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
	}

	// Prepare manifest for insertion (WITHOUT annotation fields)
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		HoldEndpoint:  holdDID,
		ArtifactType:  artifactType,
		CreatedAt:     manifestRecord.CreatedAt,
		// Annotations removed - stored separately in repository_annotations table
	}
	if hasConfig {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}

	// Insert manifest
	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		if !strings.Contains(err.Error(), "UNIQUE constraint failed") {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}

		// For backfill: if manifest already exists, get its ID.
		// The lookup honours ctx so a cancelled ingestion does not block on the query.
		var existingID int64
		lookupErr := p.db.QueryRowContext(ctx, `
			SELECT id FROM manifests
			WHERE did = ? AND repository = ? AND digest = ?
		`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
		if lookupErr != nil {
			return 0, fmt.Errorf("failed to get existing manifest ID: %w", lookupErr)
		}
		manifestID = existingID
	}

	// Update repository annotations ONLY if manifest has at least one non-empty annotation
	// (ranging over a nil collection is a no-op, so no separate nil check is needed)
	hasData := false
	for _, value := range manifestRecord.Annotations {
		if value != "" {
			hasData = true
			break
		}
	}
	if hasData {
		// Replace all annotations for this repository
		if err := db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations); err != nil {
			return 0, fmt.Errorf("failed to upsert annotations: %w", err)
		}
	}

	// Insert manifest references or layers
	if isManifestList {
		// Insert manifest references (for manifest lists/indexes)
		for i, ref := range manifestRecord.Manifests {
			reference := &db.ManifestReference{
				ManifestID:     manifestID,
				Digest:         ref.Digest,
				MediaType:      ref.MediaType,
				Size:           ref.Size,
				ReferenceIndex: i,
			}
			if ref.Platform != nil {
				reference.PlatformArchitecture = ref.Platform.Architecture
				reference.PlatformOS = ref.Platform.OS
				reference.PlatformVariant = ref.Platform.Variant
				reference.PlatformOSVersion = ref.Platform.OSVersion
			}

			// Detect attestation manifests from annotations
			// (a missing key or nil annotations yields "" and therefore false)
			reference.IsAttestation = ref.Annotations["vnd.docker.reference.type"] == "attestation-manifest"

			if err := db.InsertManifestReference(p.db, reference); err != nil {
				// Best effort - reference might already exist
				slog.Debug("Skipping manifest reference insert", "component", "processor", "did", did, "manifest_id", manifestID, "digest", ref.Digest, "error", err)
			}
		}
	} else {
		// Insert layers (for image manifests)
		for i, layer := range manifestRecord.Layers {
			err := db.InsertLayer(p.db, &db.Layer{
				ManifestID: manifestID,
				Digest:     layer.Digest,
				MediaType:  layer.MediaType,
				Size:       layer.Size,
				LayerIndex: i,
			})
			if err != nil {
				// Best effort - layer might already exist
				slog.Debug("Skipping layer insert", "component", "processor", "did", did, "manifest_id", manifestID, "digest", layer.Digest, "error", err)
			}
		}
	}

	return manifestID, nil
}

// ProcessTag processes a tag record and stores it in the database.
//
// Returns an error when the record cannot be decoded, when no manifest digest
// can be extracted from it, or when the upsert fails.
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
	// Unmarshal tag record
	var tagRecord atproto.TagRecord
	if err := json.Unmarshal(recordData, &tagRecord); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}

	// Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
	manifestDigest, err := tagRecord.GetManifestDigest()
	if err != nil {
		return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
	}

	// Insert or update tag
	return db.UpsertTag(p.db, &db.Tag{
		DID:        did,
		Repository: tagRecord.Repository,
		Tag:        tagRecord.Tag,
		Digest:     manifestDigest,
		CreatedAt:  tagRecord.UpdatedAt,
	})
}

// ProcessStar processes a star record and stores it in the database.
//
// did is the starrer (user who starred); the record's subject carries the
// owner DID and repository being starred.
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
	// Unmarshal star record
	var starRecord atproto.StarRecord
	if err := json.Unmarshal(recordData, &starRecord); err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}

	// Upsert the star record (idempotent - won't duplicate)
	// Star count will be calculated on demand from the stars table
	return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
}

// ProcessSailorProfile processes a sailor profile record.
// This is primarily used by backfill to cache captain records for holds.
//
// Records without a default hold, or with a hold reference that does not
// resolve to a DID, are skipped without error. queryCaptainFn may be nil, in
// which case nothing is queried.
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
	// Unmarshal sailor profile record
	var profileRecord atproto.SailorProfileRecord
	if err := json.Unmarshal(recordData, &profileRecord); err != nil {
		return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
	}

	// Skip if no default hold set
	if profileRecord.DefaultHold == "" {
		return nil
	}

	// Convert hold URL/DID to canonical DID
	holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold)
	if holdDID == "" {
		slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold)
		return nil
	}

	// Query and cache the captain record using provided function
	// This allows backfill-specific logic (retries, test mode handling) without duplicating it here
	if queryCaptainFn == nil {
		return nil
	}
	return queryCaptainFn(ctx, holdDID)
}

// ProcessRepoPage processes a repository page record.
// This is called when Jetstream receives a repo page create/update/delete event.
//
// For deletes the record payload is not read; the page is removed by did and rkey.
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
	if isDelete {
		// Delete the repo page from our cache
		return db.DeleteRepoPage(p.db, did, rkey)
	}

	// Unmarshal repo page record
	var pageRecord atproto.RepoPageRecord
	if err := json.Unmarshal(recordData, &pageRecord); err != nil {
		return fmt.Errorf("failed to unmarshal repo page: %w", err)
	}

	// Extract avatar CID if present
	avatarCID := ""
	if pageRecord.Avatar != nil {
		avatarCID = pageRecord.Avatar.Ref.Link
	}

	// Upsert to database
	return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt)
}

// ProcessIdentity handles identity change events (handle updates).
// This is called when Jetstream receives an identity event indicating a handle change.
// The identity cache is invalidated to ensure the next lookup uses the new handle,
// and the database is updated to reflect the change in the UI.
//
// Only processes events for users who already exist in our database (have ATCR activity).
// A failed handle update is logged but does not fail the call; a failed cache
// invalidation is logged and returned.
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
	// Check if user exists in our database - only update if they're an ATCR user
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}

	// Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
	if user == nil {
		return nil
	}

	// Update handle in database
	if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
		slog.Warn("Failed to update user handle in database", "component", "processor", "did", did, "handle", newHandle, "error", err)
		// Continue to invalidate cache even if DB update fails
	}

	// Invalidate cached identity data to force re-resolution on next lookup
	if err := atproto.InvalidateIdentity(ctx, did); err != nil {
		slog.Warn("Failed to invalidate identity cache", "component", "processor", "did", did, "error", err)
		return err
	}

	slog.Info("Processed identity change event", "component", "processor", "did", did, "old_handle", user.Handle, "new_handle", newHandle)

	return nil
}

// ProcessStats handles stats record events from hold PDSes.
// This is called when Jetstream receives a stats create/update/delete event from a hold.
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository.
//
// The per-hold entry in the in-memory stats cache is updated (or deleted), and
// the totals aggregated across holds are then upserted into repository_stats.
// When no stats cache is configured the call is a no-op.
func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	// Skip if no stats cache configured
	if p.statsCache == nil {
		return nil
	}

	// Unmarshal stats record
	// NOTE(review): this also runs for deletes, so delete events must still
	// carry a decodable record (ownerDID + repository) - confirm callers provide it.
	var statsRecord atproto.StatsRecord
	if err := json.Unmarshal(recordData, &statsRecord); err != nil {
		return fmt.Errorf("failed to unmarshal stats record: %w", err)
	}

	if isDelete {
		// Delete from in-memory cache
		p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
	} else {
		// Parse timestamps; empty or malformed values are treated as unknown (nil)
		parseTimestamp := func(raw string) *time.Time {
			if raw == "" {
				return nil
			}
			t, err := time.Parse(time.RFC3339, raw)
			if err != nil {
				return nil
			}
			return &t
		}
		lastPull := parseTimestamp(statsRecord.LastPull)
		lastPush := parseTimestamp(statsRecord.LastPush)

		// Update in-memory cache
		p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
			statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
	}

	// Get aggregated stats across all holds
	totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
		statsRecord.OwnerDID, statsRecord.Repository)

	// Upsert aggregated stats to repository_stats
	return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
		DID:        statsRecord.OwnerDID,
		Repository: statsRecord.Repository,
		PullCount:  int(totalPull),
		PushCount:  int(totalPush),
		LastPull:   latestPull,
		LastPush:   latestPush,
	})
}

// ProcessAccount handles account status events (deactivation/deletion/etc)
// This is called when Jetstream receives an account event indicating status changes.
//
// Status handling:
//   - "deleted": Account permanently deleted - remove all cached data
//   - "deactivated": Could be PDS migration or permanent - invalidate cache only
//   - "takendown": Moderation action - invalidate cache only
//   - Other: Ignore
//
// For "deactivated", we don't delete data because it's ambiguous:
//   - Could be permanent deactivation (user deleted account)
//   - Could be PDS migration (account moves to new PDS)
//
// Cache invalidation forces re-resolution on next lookup.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { // Skip active accounts or unknown statuses if active { return nil } // Check if user exists in our database - only process if they're an ATCR user user, err := db.GetUserByDID(p.db, did) if err != nil { return fmt.Errorf("failed to check user existence: %w", err) } // Skip if user doesn't exist - they don't have any ATCR activity if user == nil { return nil } switch status { case "deleted": // Account permanently deleted - remove all cached data if err := db.DeleteUserData(p.db, did); err != nil { slog.Error("Failed to delete user data for deleted account", "component", "processor", "did", did, "handle", user.Handle, "error", err) return err } // Also invalidate identity cache _ = atproto.InvalidateIdentity(ctx, did) slog.Info("Deleted user data for deleted account", "component", "processor", "did", did, "handle", user.Handle) case "deactivated", "takendown": // Ambiguous status - invalidate cache but keep data // For deactivated: could be PDS migration, will resolve on next lookup // For takendown: moderation action, keep data in case of appeal if err := atproto.InvalidateIdentity(ctx, did); err != nil { slog.Warn("Failed to invalidate identity cache", "component", "processor", "did", did, "status", status, "error", err) return err } slog.Info("Processed account status event - cache invalidated", "component", "processor", "did", did, "handle", user.Handle, "status", status) default: // Unknown status - ignore slog.Debug("Ignoring unknown account status", "component", "processor", "did", did, "status", status) } return nil }