A community based topic aggregation platform built on atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 347 lines 12 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/core/aggregators" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log" 9 "time" 10) 11 12// AggregatorEventConsumer consumes aggregator-related events from Jetstream 13// Following Bluesky's pattern: feed generators (app.bsky.feed.generator) and labelers (app.bsky.labeler.service) 14type AggregatorEventConsumer struct { 15 repo aggregators.Repository // Repository for aggregator operations 16} 17 18// NewAggregatorEventConsumer creates a new Jetstream consumer for aggregator events 19func NewAggregatorEventConsumer(repo aggregators.Repository) *AggregatorEventConsumer { 20 return &AggregatorEventConsumer{ 21 repo: repo, 22 } 23} 24 25// HandleEvent processes a Jetstream event for aggregator records 26// This is called by the main Jetstream consumer when it receives commit events 27func (c *AggregatorEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 28 // We only care about commit events for aggregator records 29 if event.Kind != "commit" || event.Commit == nil { 30 return nil 31 } 32 33 commit := event.Commit 34 35 // Route to appropriate handler based on collection 36 // IMPORTANT: Collection names refer to RECORD TYPES in repositories 37 // - social.coves.aggregator.service: Service declaration (in aggregator's own repo, rkey="self") 38 // - social.coves.aggregator.authorization: Authorization (in community's repo, any rkey) 39 switch commit.Collection { 40 case "social.coves.aggregator.service": 41 return c.handleServiceDeclaration(ctx, event.Did, commit) 42 case "social.coves.aggregator.authorization": 43 return c.handleAuthorization(ctx, event.Did, commit) 44 default: 45 // Not an aggregator-related collection 46 return nil 47 } 48} 49 50// handleServiceDeclaration processes aggregator service declaration events 51// Service declarations are stored at: at://aggregator_did/social.coves.aggregator.service/self 52func (c *AggregatorEventConsumer) handleServiceDeclaration(ctx context.Context, 
did string, commit *CommitEvent) error { 53 switch commit.Operation { 54 case "create", "update": 55 // Both create and update are handled the same way (upsert) 56 return c.upsertAggregator(ctx, did, commit) 57 case "delete": 58 return c.deleteAggregator(ctx, did) 59 default: 60 log.Printf("Unknown operation for aggregator service: %s", commit.Operation) 61 return nil 62 } 63} 64 65// handleAuthorization processes authorization record events 66// Authorizations are stored at: at://community_did/social.coves.aggregator.authorization/{rkey} 67func (c *AggregatorEventConsumer) handleAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 68 switch commit.Operation { 69 case "create", "update": 70 // Both create and update are handled the same way (upsert) 71 return c.upsertAuthorization(ctx, communityDID, commit) 72 case "delete": 73 return c.deleteAuthorization(ctx, communityDID, commit) 74 default: 75 log.Printf("Unknown operation for aggregator authorization: %s", commit.Operation) 76 return nil 77 } 78} 79 80// upsertAggregator indexes or updates an aggregator service declaration 81func (c *AggregatorEventConsumer) upsertAggregator(ctx context.Context, did string, commit *CommitEvent) error { 82 if commit.Record == nil { 83 return fmt.Errorf("aggregator service event missing record data") 84 } 85 86 // Verify rkey is "self" (canonical location for service declaration) 87 // Following Bluesky's pattern: app.bsky.feed.generator and app.bsky.labeler.service use /self 88 if commit.RKey != "self" { 89 return fmt.Errorf("invalid aggregator service rkey: expected 'self', got '%s'", commit.RKey) 90 } 91 92 // Parse the service declaration record 93 service, err := parseAggregatorService(commit.Record) 94 if err != nil { 95 return fmt.Errorf("failed to parse aggregator service: %w", err) 96 } 97 98 // Validate DID matches repo DID (security check) 99 if service.DID != "" && service.DID != did { 100 return fmt.Errorf("service record DID (%s) 
does not match repo DID (%s)", service.DID, did) 101 } 102 103 // Build AT-URI for this record 104 uri := fmt.Sprintf("at://%s/social.coves.aggregator.service/self", did) 105 106 // Parse createdAt from service record 107 var createdAt time.Time 108 if service.CreatedAt != "" { 109 createdAt, err = time.Parse(time.RFC3339, service.CreatedAt) 110 if err != nil { 111 createdAt = time.Now() // Fallback 112 log.Printf("Warning: invalid createdAt format for aggregator %s: %v", did, err) 113 } 114 } else { 115 createdAt = time.Now() 116 } 117 118 // Extract avatar CID from blob if present 119 var avatarCID string 120 if service.Avatar != nil { 121 if cid, ok := extractBlobCID(service.Avatar); ok { 122 avatarCID = cid 123 } 124 } 125 126 // Build aggregator domain model 127 agg := &aggregators.Aggregator{ 128 DID: did, 129 DisplayName: service.DisplayName, 130 Description: service.Description, 131 AvatarURL: avatarCID, // Now contains the CID from blob 132 MaintainerDID: service.MaintainerDID, 133 SourceURL: service.SourceURL, 134 CreatedAt: createdAt, 135 IndexedAt: time.Now(), 136 RecordURI: uri, 137 RecordCID: commit.CID, 138 } 139 140 // Handle config schema (JSONB) 141 if service.ConfigSchema != nil { 142 schemaBytes, err := json.Marshal(service.ConfigSchema) 143 if err != nil { 144 return fmt.Errorf("failed to marshal config schema: %w", err) 145 } 146 agg.ConfigSchema = schemaBytes 147 } 148 149 // Create or update in database 150 if err := c.repo.CreateAggregator(ctx, agg); err != nil { 151 return fmt.Errorf("failed to index aggregator: %w", err) 152 } 153 154 log.Printf("[AGGREGATOR-CONSUMER] Indexed service: %s (%s)", agg.DisplayName, did) 155 return nil 156} 157 158// deleteAggregator removes an aggregator from the index 159func (c *AggregatorEventConsumer) deleteAggregator(ctx context.Context, did string) error { 160 // Delete from database (cascade deletes authorizations and posts via FK) 161 if err := c.repo.DeleteAggregator(ctx, did); err != nil { 162 // 
Log but don't fail if not found (idempotent delete) 163 if aggregators.IsNotFound(err) { 164 log.Printf("[AGGREGATOR-CONSUMER] Aggregator not found for deletion: %s (already deleted?)", did) 165 return nil 166 } 167 return fmt.Errorf("failed to delete aggregator: %w", err) 168 } 169 170 log.Printf("[AGGREGATOR-CONSUMER] Deleted aggregator: %s", did) 171 return nil 172} 173 174// upsertAuthorization indexes or updates an authorization record 175func (c *AggregatorEventConsumer) upsertAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 176 if commit.Record == nil { 177 return fmt.Errorf("authorization event missing record data") 178 } 179 180 // Parse the authorization record 181 authRecord, err := parseAggregatorAuthorization(commit.Record) 182 if err != nil { 183 return fmt.Errorf("failed to parse authorization: %w", err) 184 } 185 186 // Validate communityDid matches repo DID (security check) 187 if authRecord.CommunityDid != "" && authRecord.CommunityDid != communityDID { 188 return fmt.Errorf("authorization record communityDid (%s) does not match repo DID (%s)", 189 authRecord.CommunityDid, communityDID) 190 } 191 192 // Build AT-URI for this record 193 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 194 195 // Parse createdAt from authorization record 196 var createdAt time.Time 197 if authRecord.CreatedAt != "" { 198 createdAt, err = time.Parse(time.RFC3339, authRecord.CreatedAt) 199 if err != nil { 200 createdAt = time.Now() // Fallback 201 log.Printf("Warning: invalid createdAt format for authorization %s: %v", uri, err) 202 } 203 } else { 204 createdAt = time.Now() 205 } 206 207 // Parse disabledAt from authorization record (optional, for modlog/audit) 208 var disabledAt *time.Time 209 if authRecord.DisabledAt != "" { 210 parsed, err := time.Parse(time.RFC3339, authRecord.DisabledAt) 211 if err != nil { 212 log.Printf("Warning: invalid disabledAt format for authorization %s: 
%v", uri, err) 213 } else { 214 disabledAt = &parsed 215 } 216 } 217 218 // Build authorization domain model 219 auth := &aggregators.Authorization{ 220 AggregatorDID: authRecord.Aggregator, 221 CommunityDID: communityDID, 222 Enabled: authRecord.Enabled, 223 CreatedBy: authRecord.CreatedBy, 224 DisabledBy: authRecord.DisabledBy, 225 DisabledAt: disabledAt, 226 CreatedAt: createdAt, 227 IndexedAt: time.Now(), 228 RecordURI: uri, 229 RecordCID: commit.CID, 230 } 231 232 // Handle config (JSONB) 233 if authRecord.Config != nil { 234 configBytes, err := json.Marshal(authRecord.Config) 235 if err != nil { 236 return fmt.Errorf("failed to marshal config: %w", err) 237 } 238 auth.Config = configBytes 239 } 240 241 // Create or update in database 242 if err := c.repo.CreateAuthorization(ctx, auth); err != nil { 243 return fmt.Errorf("failed to index authorization: %w", err) 244 } 245 246 log.Printf("[AGGREGATOR-CONSUMER] Indexed authorization: community=%s, aggregator=%s, enabled=%v", 247 communityDID, authRecord.Aggregator, authRecord.Enabled) 248 return nil 249} 250 251// deleteAuthorization removes an authorization from the index 252func (c *AggregatorEventConsumer) deleteAuthorization(ctx context.Context, communityDID string, commit *CommitEvent) error { 253 // Build AT-URI to find the authorization 254 uri := fmt.Sprintf("at://%s/social.coves.aggregator.authorization/%s", communityDID, commit.RKey) 255 256 // Delete from database 257 if err := c.repo.DeleteAuthorizationByURI(ctx, uri); err != nil { 258 // Log but don't fail if not found (idempotent delete) 259 if aggregators.IsNotFound(err) { 260 log.Printf("[AGGREGATOR-CONSUMER] Authorization not found for deletion: %s (already deleted?)", uri) 261 return nil 262 } 263 return fmt.Errorf("failed to delete authorization: %w", err) 264 } 265 266 log.Printf("[AGGREGATOR-CONSUMER] Deleted authorization: %s", uri) 267 return nil 268} 269 270// ===== Record Parsing Functions ===== 271 272// AggregatorServiceRecord 
// represents the service declaration record structure, i.e. the JSON shape of a
// social.coves.aggregator.service record as decoded by parseAggregatorService.
type AggregatorServiceRecord struct {
	Type          string                 `json:"$type"`
	DID           string                 `json:"did"` // DID of aggregator (must match repo DID)
	DisplayName   string                 `json:"displayName"`
	Description   string                 `json:"description,omitempty"`
	Avatar        map[string]interface{} `json:"avatar,omitempty"`       // Blob reference (CID will be extracted)
	ConfigSchema  map[string]interface{} `json:"configSchema,omitempty"` // JSON Schema
	MaintainerDID string                 `json:"maintainer,omitempty"`   // Fixed: was maintainerDid
	SourceURL     string                 `json:"sourceUrl,omitempty"`    // Fixed: was homepageUrl
	CreatedAt     string                 `json:"createdAt"`
}

// parseAggregatorService parses an aggregator service record
//
// The loosely typed record is round-tripped through JSON into an
// AggregatorServiceRecord. An error is returned when the value cannot be
// encoded, cannot be decoded into the struct, or has an empty displayName.
func parseAggregatorService(record interface{}) (*AggregatorServiceRecord, error) {
	raw, encodeErr := json.Marshal(record)
	if encodeErr != nil {
		return nil, fmt.Errorf("failed to marshal record: %w", encodeErr)
	}

	decoded := new(AggregatorServiceRecord)
	if decodeErr := json.Unmarshal(raw, decoded); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal service record: %w", decodeErr)
	}

	// displayName is the only field enforced at parse time.
	if len(decoded.DisplayName) == 0 {
		return nil, fmt.Errorf("displayName is required")
	}

	return decoded, nil
}

// Note: extractBlobCID is defined in community_consumer.go and shared across consumers

// AggregatorAuthorizationRecord represents the authorization record structure,
// i.e. the JSON shape of a social.coves.aggregator.authorization record as
// decoded by parseAggregatorAuthorization.
type AggregatorAuthorizationRecord struct {
	Config       map[string]interface{} `json:"config,omitempty"`
	Type         string                 `json:"$type"`
	Aggregator   string                 `json:"aggregatorDid"`
	CommunityDid string                 `json:"communityDid"`
	CreatedBy    string                 `json:"createdBy"`
	DisabledBy   string                 `json:"disabledBy,omitempty"`
	DisabledAt   string                 `json:"disabledAt,omitempty"`
	CreatedAt    string                 `json:"createdAt"`
	Enabled      bool                   `json:"enabled"`
}

// parseAggregatorAuthorization parses an aggregator authorization record
//
// The loosely typed record is round-tripped through JSON into an
// AggregatorAuthorizationRecord. An error is returned when the value cannot be
// encoded, cannot be decoded into the struct, or is missing one of the required
// fields; the first missing field, in the order checked below, is the one reported.
func parseAggregatorAuthorization(record interface{}) (*AggregatorAuthorizationRecord, error) {
	raw, encodeErr := json.Marshal(record)
	if encodeErr != nil {
		return nil, fmt.Errorf("failed to marshal record: %w", encodeErr)
	}

	decoded := new(AggregatorAuthorizationRecord)
	if decodeErr := json.Unmarshal(raw, decoded); decodeErr != nil {
		return nil, fmt.Errorf("failed to unmarshal authorization record: %w", decodeErr)
	}

	// Validate required fields per lexicon
	switch {
	case decoded.Aggregator == "":
		return nil, fmt.Errorf("aggregatorDid is required")
	case decoded.CommunityDid == "":
		return nil, fmt.Errorf("communityDid is required")
	case decoded.CreatedAt == "":
		return nil, fmt.Errorf("createdAt is required")
	case decoded.CreatedBy == "":
		return nil, fmt.Errorf("createdBy is required")
	}

	return decoded, nil
}