a love letter to tangled (android, iOS, and a search API)
at main 101 lines 2.8 kB view raw
1package index 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 8 "tangled.org/desertthunder.dev/twister/internal/normalize" 9 "tangled.org/desertthunder.dev/twister/internal/store" 10 "tangled.org/desertthunder.dev/twister/internal/xrpc" 11) 12 13type Result struct { 14 Decision string 15 DocumentID string 16 Collection string 17 CID string 18} 19 20type Processor struct { 21 store store.IndexingStore 22 registry *normalize.Registry 23 xrpc *xrpc.Client 24 policy Policy 25 log *slog.Logger 26} 27 28func NewProcessor( 29 st store.IndexingStore, registry *normalize.Registry, xrpcClient *xrpc.Client, 30 policy Policy, log *slog.Logger, 31) *Processor { 32 if log == nil { 33 log = slog.Default() 34 } 35 return &Processor{store: st, registry: registry, xrpc: xrpcClient, policy: policy, log: log} 36} 37 38func (p *Processor) ProcessRecord( 39 ctx context.Context, source string, event normalize.TapRecordEvent, 40) (*Result, error) { 41 if event.Record == nil { 42 return &Result{Decision: "skip_missing_record"}, nil 43 } 44 record := event.Record 45 result := &Result{ 46 Decision: "skip_unknown", 47 DocumentID: normalize.StableID(record.DID, record.Collection, record.RKey), 48 Collection: record.Collection, 49 CID: record.CID, 50 } 51 if !p.policy.Allows(source, record.Collection) { 52 result.Decision = "skip_collection" 53 return result, nil 54 } 55 if handler, ok := p.registry.StateHandler(record.Collection); ok { 56 if record.Action == "delete" { 57 result.Decision = "skip_state_delete" 58 return result, nil 59 } 60 update, err := handler.HandleState(event) 61 if err != nil { 62 return result, &PermanentError{Decision: "invalid_state_record", Err: err} 63 } 64 if err := p.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil { 65 return result, fmt.Errorf("update state: %w", err) 66 } 67 result.Decision = "state_updated" 68 return result, nil 69 } 70 adapter, ok := p.registry.Adapter(record.Collection) 71 if !ok { 72 result.Decision = "skip_unsupported_collection" 73 return result, nil 74 } 75 if record.Action == "delete" { 76 if err := p.store.MarkDeleted(ctx, result.DocumentID); err != nil { 77 return result, fmt.Errorf("mark deleted: %w", err) 78 } 79 result.Decision = "document_deleted" 80 return result, nil 81 } 82 if record.Record == nil { 83 return result, &PermanentError{Decision: "missing_record_payload"} 84 } 85 if !adapter.Searchable(record.Record) { 86 result.Decision = "skip_unsearchable" 87 return result, nil 88 } 89 doc, err := adapter.Normalize(event) 90 if err != nil { 91 return result, &PermanentError{Decision: "normalize_failed", Err: err} 92 } 93 if err := p.enrichDocument(ctx, doc, record.Record); err != nil { 94 return result, err 95 } 96 if err := p.store.UpsertDocument(ctx, doc); err != nil { 97 return result, fmt.Errorf("upsert document: %w", err) 98 } 99 result.Decision = "document_upserted" 100 return result, nil 101}