forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/eventconsumer"
12 "tangled.org/core/rbac"
13 "tangled.org/core/spindle/db"
14
15 comatproto "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/atproto/identity"
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 "github.com/bluesky-social/indigo/xrpc"
19 "github.com/bluesky-social/jetstream/pkg/models"
20 securejoin "github.com/cyphar/filepath-securejoin"
21)
22
23type Ingester func(ctx context.Context, e *models.Event) error
24
25func (s *Spindle) ingest() Ingester {
26 return func(ctx context.Context, e *models.Event) error {
27 var err error
28 defer func() {
29 eventTime := e.TimeUS
30 lastTimeUs := eventTime + 1
31 if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil {
32 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
33 }
34 }()
35
36 if e.Kind != models.EventKindCommit {
37 return nil
38 }
39
40 switch e.Commit.Collection {
41 case tangled.SpindleMemberNSID:
42 err = s.ingestMember(ctx, e)
43 case tangled.RepoNSID:
44 err = s.ingestRepo(ctx, e)
45 case tangled.RepoCollaboratorNSID:
46 err = s.ingestCollaborator(ctx, e)
47 }
48
49 if err != nil {
50 s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
51 }
52
53 return nil
54 }
55}
56
57func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error {
58 var err error
59 did := e.Did
60 rkey := e.Commit.RKey
61
62 l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID)
63
64 switch e.Commit.Operation {
65 case models.CommitOperationCreate, models.CommitOperationUpdate:
66 raw := e.Commit.Record
67 record := tangled.SpindleMember{}
68 err = json.Unmarshal(raw, &record)
69 if err != nil {
70 l.Error("invalid record", "error", err)
71 return err
72 }
73
74 domain := s.cfg.Server.Hostname
75 recordInstance := record.Instance
76
77 if recordInstance != domain {
78 l.Error("domain mismatch", "domain", recordInstance, "expected", domain)
79 return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain)
80 }
81
82 ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain)
83 if err != nil || !ok {
84 l.Error("failed to add member", "did", did, "error", err)
85 return fmt.Errorf("failed to enforce permissions: %w", err)
86 }
87
88 if err := db.AddSpindleMember(s.db, db.SpindleMember{
89 Did: syntax.DID(did),
90 Rkey: rkey,
91 Instance: recordInstance,
92 Subject: syntax.DID(record.Subject),
93 Created: time.Now(),
94 }); err != nil {
95 l.Error("failed to add member", "error", err)
96 return fmt.Errorf("failed to add member: %w", err)
97 }
98
99 if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil {
100 l.Error("failed to add member", "error", err)
101 return fmt.Errorf("failed to add member: %w", err)
102 }
103 l.Info("added member from firehose", "member", record.Subject)
104
105 if err := s.db.AddDid(record.Subject); err != nil {
106 l.Error("failed to add did", "error", err)
107 return fmt.Errorf("failed to add did: %w", err)
108 }
109 s.jc.AddDid(record.Subject)
110
111 return nil
112
113 case models.CommitOperationDelete:
114 record, err := db.GetSpindleMember(s.db, did, rkey)
115 if err != nil {
116 l.Error("failed to find member", "error", err)
117 return fmt.Errorf("failed to find member: %w", err)
118 }
119
120 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil {
121 l.Error("failed to remove member", "error", err)
122 return fmt.Errorf("failed to remove member: %w", err)
123 }
124
125 if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil {
126 l.Error("failed to add member", "error", err)
127 return fmt.Errorf("failed to add member: %w", err)
128 }
129 l.Info("added member from firehose", "member", record.Subject)
130
131 if err := s.db.RemoveDid(record.Subject.String()); err != nil {
132 l.Error("failed to add did", "error", err)
133 return fmt.Errorf("failed to add did: %w", err)
134 }
135 s.jc.RemoveDid(record.Subject.String())
136
137 }
138 return nil
139}
140
141func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error {
142 var err error
143 did := e.Did
144
145 l := s.l.With("component", "ingester", "record", tangled.RepoNSID)
146
147 l.Info("ingesting repo record", "did", did)
148
149 switch e.Commit.Operation {
150 case models.CommitOperationCreate, models.CommitOperationUpdate:
151 raw := e.Commit.Record
152 record := tangled.Repo{}
153 err = json.Unmarshal(raw, &record)
154 if err != nil {
155 l.Error("invalid record", "error", err)
156 return err
157 }
158
159 domain := s.cfg.Server.Hostname
160
161 // no spindle configured for this repo
162 if record.Spindle == nil {
163 l.Info("no spindle configured", "name", record.Name)
164 return nil
165 }
166
167 // this repo did not want this spindle
168 if *record.Spindle != domain {
169 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain)
170 return nil
171 }
172
173 // add this repo to the watch list
174 if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil {
175 l.Error("failed to add repo", "error", err)
176 return fmt.Errorf("failed to add repo: %w", err)
177 }
178
179 didSlashRepo, err := securejoin.SecureJoin(did, record.Name)
180 if err != nil {
181 return err
182 }
183
184 // add repo to rbac
185 if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil {
186 l.Error("failed to add repo to enforcer", "error", err)
187 return fmt.Errorf("failed to add repo: %w", err)
188 }
189
190 // add collaborators to rbac
191 owner, err := s.res.ResolveIdent(ctx, did)
192 if err != nil || owner.Handle.IsInvalidHandle() {
193 return err
194 }
195 if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil {
196 return err
197 }
198
199 // add this knot to the event consumer
200 src := eventconsumer.NewKnotSource(record.Knot)
201 s.ks.AddSource(context.Background(), src)
202
203 return nil
204
205 }
206 return nil
207}
208
209func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
210 var err error
211
212 l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
213
214 l.Info("ingesting collaborator record")
215
216 switch e.Commit.Operation {
217 case models.CommitOperationCreate, models.CommitOperationUpdate:
218 raw := e.Commit.Record
219 record := tangled.RepoCollaborator{}
220 err = json.Unmarshal(raw, &record)
221 if err != nil {
222 l.Error("invalid record", "error", err)
223 return err
224 }
225
226 subjectId, err := s.res.ResolveIdent(ctx, record.Subject)
227 if err != nil || subjectId.Handle.IsInvalidHandle() {
228 return err
229 }
230
231 repoAt, err := syntax.ParseATURI(record.Repo)
232 if err != nil {
233 l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
234 return nil
235 }
236
237 // TODO: get rid of this entirely
238 // resolve this aturi to extract the repo record
239 owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String())
240 if err != nil || owner.Handle.IsInvalidHandle() {
241 return fmt.Errorf("failed to resolve handle: %w", err)
242 }
243
244 xrpcc := xrpc.Client{
245 Host: owner.PDSEndpoint(),
246 }
247
248 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
249 if err != nil {
250 return err
251 }
252
253 repo := resp.Value.Val.(*tangled.Repo)
254 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
255
256 // check perms for this user
257 if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
258 return fmt.Errorf("insufficient permissions: %w", err)
259 }
260
261 // add collaborator to rbac
262 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
263 l.Error("failed to add repo to enforcer", "error", err)
264 return fmt.Errorf("failed to add repo: %w", err)
265 }
266
267 return nil
268 }
269 return nil
270}
271
272func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
273 l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
274
275 l.Info("fetching and adding existing collaborators")
276
277 xrpcc := xrpc.Client{
278 Host: owner.PDSEndpoint(),
279 }
280
281 resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
282 if err != nil {
283 return err
284 }
285
286 var errs error
287 for _, r := range resp.Records {
288 if r == nil {
289 continue
290 }
291 record := r.Value.Val.(*tangled.RepoCollaborator)
292
293 if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
294 l.Error("failed to add repo to enforcer", "error", err)
295 errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
296 }
297 }
298
299 return errs
300}