1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/spindle/db"
13 "tangled.org/core/tap"
14)
15
16func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error {
17 l := s.l.With("component", "tapIndexer")
18
19 var err error
20 switch evt.Type {
21 case tap.EvtRecord:
22 switch evt.Record.Collection.String() {
23 case tangled.SpindleMemberNSID:
24 err = s.processMember(ctx, evt)
25 case tangled.RepoNSID:
26 err = s.processRepo(ctx, evt)
27 case tangled.RepoCollaboratorNSID:
28 err = s.processCollaborator(ctx, evt)
29 case tangled.RepoPullNSID:
30 err = s.processPull(ctx, evt)
31 }
32 case tap.EvtIdentity:
33 // no-op
34 }
35
36 if err != nil {
37 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
38 return err
39 }
40 return nil
41}
42
43// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated)
44
45func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error {
46 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
47
48 l.Info("processing spindle.member record")
49
50 // check perms for this user
51 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
52 l.Warn("forbidden request", "did", evt.Record.Did, "error", err)
53 return nil
54 }
55
56 switch evt.Record.Action {
57 case tap.RecordCreateAction, tap.RecordUpdateAction:
58 record := tangled.SpindleMember{}
59 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
60 return fmt.Errorf("parsing record: %w", err)
61 }
62
63 domain := s.cfg.Server.Hostname
64 if record.Instance != domain {
65 l.Info("domain mismatch", "domain", record.Instance, "expected", domain)
66 return nil
67 }
68
69 created, err := time.Parse(record.CreatedAt, time.RFC3339)
70 if err != nil {
71 created = time.Now()
72 }
73 if err := db.AddSpindleMember(s.db, db.SpindleMember{
74 Did: evt.Record.Did,
75 Rkey: evt.Record.Rkey.String(),
76 Instance: record.Instance,
77 Subject: syntax.DID(record.Subject),
78 Created: created,
79 }); err != nil {
80 l.Error("failed to add member", "error", err)
81 return fmt.Errorf("adding member to db: %w", err)
82 }
83 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil {
84 return fmt.Errorf("adding member to rbac: %w", err)
85 }
86 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
87 return fmt.Errorf("adding did to tap", err)
88 }
89
90 l.Info("added member", "member", record.Subject)
91 return nil
92
93 case tap.RecordDeleteAction:
94 var (
95 did = evt.Record.Did.String()
96 rkey = evt.Record.Rkey.String()
97 )
98 member, err := db.GetSpindleMember(s.db, did, rkey)
99 if err != nil {
100 return fmt.Errorf("finding member: %w", err)
101 }
102
103 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
104 return fmt.Errorf("removing member from db: %w", err)
105 }
106 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil {
107 return fmt.Errorf("removing member from rbac: %w", err)
108 }
109 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil {
110 return fmt.Errorf("removing did from tap: %w", err)
111 }
112
113 l.Info("removed member", "member", member.Subject)
114 return nil
115 }
116 return nil
117}
118
119func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error {
120 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
121
122 l.Info("processing collaborator record")
123 switch evt.Record.Action {
124 case tap.RecordCreateAction, tap.RecordUpdateAction:
125 record := tangled.RepoCollaborator{}
126 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
127 l.Error("invalid record", "err", err)
128 return fmt.Errorf("parsing record: %w", err)
129 }
130
131 // check perms for this user
132 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil {
133 l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
134 return nil
135 }
136
137 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{
138 Did: evt.Record.Did,
139 Rkey: evt.Record.Rkey,
140 Repo: syntax.ATURI(record.Repo),
141 Subject: syntax.DID(record.Subject),
142 }); err != nil {
143 return fmt.Errorf("adding collaborator to db: %w", err)
144 }
145 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil {
146 return fmt.Errorf("adding collaborator to rbac: %w", err)
147 }
148 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil {
149 return fmt.Errorf("adding did to tap: %w", err)
150 }
151
152 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo)
153 return nil
154
155 case tap.RecordDeleteAction:
156 // get existing collaborator
157 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey)
158 if err != nil {
159 return fmt.Errorf("failed to get existing collaborator info: %w", err)
160 }
161
162 // check perms for this user
163 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil {
164 l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
165 return nil
166 }
167
168 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil {
169 return fmt.Errorf("removing collaborator from db: %w", err)
170 }
171 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil {
172 return fmt.Errorf("removing collaborator from rbac: %w", err)
173 }
174 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil {
175 return fmt.Errorf("removing did from tap: %w", err)
176 }
177
178 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo)
179 return nil
180 }
181 return nil
182}
183
184func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error {
185 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
186
187 l.Info("processing repo record")
188
189 // check perms for this user
190 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil {
191 l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
192 return nil
193 }
194
195 switch evt.Record.Action {
196 case tap.RecordCreateAction, tap.RecordUpdateAction:
197 record := tangled.Repo{}
198 if err := json.Unmarshal(evt.Record.Record, &record); err != nil {
199 return fmt.Errorf("parsing record: %w", err)
200 }
201
202 domain := s.cfg.Server.Hostname
203 if record.Spindle == nil || *record.Spindle != domain {
204 if record.Spindle == nil {
205 l.Info("spindle isn't configured", "name", record.Name)
206 } else {
207 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
208 }
209 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
210 return fmt.Errorf("deleting repo from db: %w", err)
211 }
212 return nil
213 }
214
215 if err := s.db.PutRepo(&db.Repo{
216 Did: evt.Record.Did,
217 Rkey: evt.Record.Rkey,
218 Name: record.Name,
219 Knot: record.Knot,
220 }); err != nil {
221 return fmt.Errorf("adding repo to db: %w", err)
222 }
223
224 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil {
225 return fmt.Errorf("adding repo to rbac")
226 }
227
228 // add this knot to the event consumer
229 src := eventconsumer.NewKnotSource(record.Knot)
230 s.ks.AddSource(context.Background(), src)
231
232 l.Info("added repo", "repo", evt.Record.AtUri())
233 return nil
234
235 case tap.RecordDeleteAction:
236 // check perms for this user
237 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil {
238 l.Warn("forbidden request", "did", evt.Record.Did, "err", err)
239 return nil
240 }
241
242 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil {
243 return fmt.Errorf("deleting repo from db: %w", err)
244 }
245
246 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil {
247 return fmt.Errorf("deleting repo from rbac: %w", err)
248 }
249
250 l.Info("deleted repo", "repo", evt.Record.AtUri())
251 return nil
252 }
253 return nil
254}
255
256func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error {
257 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri())
258
259 l.Info("processing pull record")
260
261 switch evt.Record.Action {
262 case tap.RecordCreateAction, tap.RecordUpdateAction:
263 // TODO
264 case tap.RecordDeleteAction:
265 // TODO
266 }
267 return nil
268}
269
270func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error {
271 known, err := s.db.IsKnownDid(syntax.DID(did))
272 if err != nil {
273 return fmt.Errorf("ensuring did known state: %w", err)
274 }
275 if !known {
276 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil {
277 return fmt.Errorf("removing did from tap: %w", err)
278 }
279 }
280 return nil
281}