a love letter to tangled (android, iOS, and a search API)
1package index
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7
8 "tangled.org/desertthunder.dev/twister/internal/normalize"
9 "tangled.org/desertthunder.dev/twister/internal/store"
10 "tangled.org/desertthunder.dev/twister/internal/xrpc"
11)
12
13type Result struct {
14 Decision string
15 DocumentID string
16 Collection string
17 CID string
18}
19
20type Processor struct {
21 store store.IndexingStore
22 registry *normalize.Registry
23 xrpc *xrpc.Client
24 policy Policy
25 log *slog.Logger
26}
27
28func NewProcessor(
29 st store.IndexingStore, registry *normalize.Registry, xrpcClient *xrpc.Client,
30 policy Policy, log *slog.Logger,
31) *Processor {
32 if log == nil {
33 log = slog.Default()
34 }
35 return &Processor{store: st, registry: registry, xrpc: xrpcClient, policy: policy, log: log}
36}
37
38func (p *Processor) ProcessRecord(
39 ctx context.Context, source string, event normalize.TapRecordEvent,
40) (*Result, error) {
41 if event.Record == nil {
42 return &Result{Decision: "skip_missing_record"}, nil
43 }
44 record := event.Record
45 result := &Result{
46 Decision: "skip_unknown",
47 DocumentID: normalize.StableID(record.DID, record.Collection, record.RKey),
48 Collection: record.Collection,
49 CID: record.CID,
50 }
51 if !p.policy.Allows(source, record.Collection) {
52 result.Decision = "skip_collection"
53 return result, nil
54 }
55 if handler, ok := p.registry.StateHandler(record.Collection); ok {
56 if record.Action == "delete" {
57 result.Decision = "skip_state_delete"
58 return result, nil
59 }
60 update, err := handler.HandleState(event)
61 if err != nil {
62 return result, &PermanentError{Decision: "invalid_state_record", Err: err}
63 }
64 if err := p.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil {
65 return result, fmt.Errorf("update state: %w", err)
66 }
67 result.Decision = "state_updated"
68 return result, nil
69 }
70 adapter, ok := p.registry.Adapter(record.Collection)
71 if !ok {
72 result.Decision = "skip_unsupported_collection"
73 return result, nil
74 }
75 if record.Action == "delete" {
76 if err := p.store.MarkDeleted(ctx, result.DocumentID); err != nil {
77 return result, fmt.Errorf("mark deleted: %w", err)
78 }
79 result.Decision = "document_deleted"
80 return result, nil
81 }
82 if record.Record == nil {
83 return result, &PermanentError{Decision: "missing_record_payload"}
84 }
85 if !adapter.Searchable(record.Record) {
86 result.Decision = "skip_unsearchable"
87 return result, nil
88 }
89 doc, err := adapter.Normalize(event)
90 if err != nil {
91 return result, &PermanentError{Decision: "normalize_failed", Err: err}
92 }
93 if err := p.enrichDocument(ctx, doc, record.Record); err != nil {
94 return result, err
95 }
96 if err := p.store.UpsertDocument(ctx, doc); err != nil {
97 return result, fmt.Errorf("upsert document: %w", err)
98 }
99 result.Decision = "document_upserted"
100 return result, nil
101}