Signed-off-by: oppiliappan me@oppi.li
+13
jetstream/jetstream.go
+13
jetstream/jetstream.go
···
52
j.mu.Unlock()
53
}
54
55
+
func (j *JetstreamClient) RemoveDid(did string) {
56
+
if did == "" {
57
+
return
58
+
}
59
+
60
+
if j.logDids {
61
+
j.l.Info("removing did from in-memory filter", "did", did)
62
+
}
63
+
j.mu.Lock()
64
+
delete(j.wantedDids, did)
65
+
j.mu.Unlock()
66
+
}
67
+
68
type processor func(context.Context, *models.Event) error
69
70
func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
+61
knotserver/ingester.go
+61
knotserver/ingester.go
···
213
return h.db.InsertEvent(event, h.n)
214
}
215
216
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
217
l := log.FromContext(ctx)
218
···
292
if err := h.processKnotMember(ctx, did, record); err != nil {
293
return fmt.Errorf("failed to process knot member: %w", err)
294
}
295
case tangled.RepoPullNSID:
296
var record tangled.RepoPull
297
if err := json.Unmarshal(raw, &record); err != nil {
···
300
if err := h.processPull(ctx, did, record); err != nil {
301
return fmt.Errorf("failed to process knot member: %w", err)
302
}
303
}
304
305
return err
···
213
return h.db.InsertEvent(event, h.n)
214
}
215
216
+
// duplicated from add collaborator
217
+
func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
218
+
repoAt, err := syntax.ParseATURI(record.Repo)
219
+
if err != nil {
220
+
return err
221
+
}
222
+
223
+
resolver := idresolver.DefaultResolver()
224
+
225
+
subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
226
+
if err != nil || subjectId.Handle.IsInvalidHandle() {
227
+
return err
228
+
}
229
+
230
+
// TODO: fix this for good, we need to fetch the record here unfortunately
231
+
// resolve this aturi to extract the repo record
232
+
owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
233
+
if err != nil || owner.Handle.IsInvalidHandle() {
234
+
return fmt.Errorf("failed to resolve handle: %w", err)
235
+
}
236
+
237
+
xrpcc := xrpc.Client{
238
+
Host: owner.PDSEndpoint(),
239
+
}
240
+
241
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
242
+
if err != nil {
243
+
return err
244
+
}
245
+
246
+
repo := resp.Value.Val.(*tangled.Repo)
247
+
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
248
+
249
+
// check perms for this user
250
+
if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
251
+
return fmt.Errorf("insufficient permissions: %w", err)
252
+
}
253
+
254
+
if err := h.db.AddDid(subjectId.DID.String()); err != nil {
255
+
return err
256
+
}
257
+
h.jc.AddDid(subjectId.DID.String())
258
+
259
+
if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
260
+
return err
261
+
}
262
+
263
+
return h.fetchAndAddKeys(ctx, subjectId.DID.String())
264
+
}
265
+
266
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
267
l := log.FromContext(ctx)
268
···
342
if err := h.processKnotMember(ctx, did, record); err != nil {
343
return fmt.Errorf("failed to process knot member: %w", err)
344
}
345
+
346
case tangled.RepoPullNSID:
347
var record tangled.RepoPull
348
if err := json.Unmarshal(raw, &record); err != nil {
···
351
if err := h.processPull(ctx, did, record); err != nil {
352
return fmt.Errorf("failed to process knot member: %w", err)
353
}
354
+
355
+
case tangled.RepoCollaboratorNSID:
356
+
var record tangled.RepoCollaborator
357
+
if err := json.Unmarshal(raw, &record); err != nil {
358
+
return fmt.Errorf("failed to unmarshal record: %w", err)
359
+
}
360
+
if err := h.processCollaborator(ctx, did, record); err != nil {
361
+
return fmt.Errorf("failed to process knot member: %w", err)
362
+
}
363
+
364
}
365
366
return err
+1
knotserver/server.go
+1
knotserver/server.go
+103
spindle/ingester.go
+103
spindle/ingester.go
···
8
9
"tangled.sh/tangled.sh/core/api/tangled"
10
"tangled.sh/tangled.sh/core/eventconsumer"
11
"tangled.sh/tangled.sh/core/rbac"
12
13
"github.com/bluesky-social/jetstream/pkg/models"
14
)
15
16
type Ingester func(ctx context.Context, e *models.Event) error
···
35
s.ingestMember(ctx, e)
36
case tangled.RepoNSID:
37
s.ingestRepo(ctx, e)
38
}
39
40
return err
···
144
}
145
return nil
146
}
···
8
9
"tangled.sh/tangled.sh/core/api/tangled"
10
"tangled.sh/tangled.sh/core/eventconsumer"
11
+
"tangled.sh/tangled.sh/core/idresolver"
12
"tangled.sh/tangled.sh/core/rbac"
13
14
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
+
"github.com/bluesky-social/indigo/atproto/identity"
16
+
"github.com/bluesky-social/indigo/atproto/syntax"
17
+
"github.com/bluesky-social/indigo/xrpc"
18
"github.com/bluesky-social/jetstream/pkg/models"
19
+
securejoin "github.com/cyphar/filepath-securejoin"
20
)
21
22
type Ingester func(ctx context.Context, e *models.Event) error
···
41
s.ingestMember(ctx, e)
42
case tangled.RepoNSID:
43
s.ingestRepo(ctx, e)
44
+
case tangled.RepoCollaboratorNSID:
45
+
s.ingestCollaborator(ctx, e)
46
}
47
48
return err
···
152
}
153
return nil
154
}
155
+
156
+
func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error {
157
+
var err error
158
+
159
+
l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did)
160
+
161
+
l.Info("ingesting collaborator record")
162
+
163
+
switch e.Commit.Operation {
164
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
165
+
raw := e.Commit.Record
166
+
record := tangled.RepoCollaborator{}
167
+
err = json.Unmarshal(raw, &record)
168
+
if err != nil {
169
+
l.Error("invalid record", "error", err)
170
+
return err
171
+
}
172
+
173
+
resolver := idresolver.DefaultResolver()
174
+
175
+
subjectId, err := resolver.ResolveIdent(ctx, record.Subject)
176
+
if err != nil || subjectId.Handle.IsInvalidHandle() {
177
+
return err
178
+
}
179
+
180
+
repoAt, err := syntax.ParseATURI(record.Repo)
181
+
if err != nil {
182
+
l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo)
183
+
return nil
184
+
}
185
+
186
+
// TODO: get rid of this entirely
187
+
// resolve this aturi to extract the repo record
188
+
owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String())
189
+
if err != nil || owner.Handle.IsInvalidHandle() {
190
+
return fmt.Errorf("failed to resolve handle: %w", err)
191
+
}
192
+
193
+
xrpcc := xrpc.Client{
194
+
Host: owner.PDSEndpoint(),
195
+
}
196
+
197
+
resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
198
+
if err != nil {
199
+
return err
200
+
}
201
+
202
+
repo := resp.Value.Val.(*tangled.Repo)
203
+
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
204
+
205
+
// check perms for this user
206
+
if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
207
+
return fmt.Errorf("insufficient permissions: %w", err)
208
+
}
209
+
210
+
// add collaborator to rbac
211
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
212
+
l.Error("failed to add repo to enforcer", "error", err)
213
+
return fmt.Errorf("failed to add repo: %w", err)
214
+
}
215
+
216
+
return nil
217
+
}
218
+
return nil
219
+
}
220
+
221
+
func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error {
222
+
l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators")
223
+
224
+
l.Info("fetching and adding existing collaborators")
225
+
226
+
xrpcc := xrpc.Client{
227
+
Host: owner.PDSEndpoint(),
228
+
}
229
+
230
+
resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false)
231
+
if err != nil {
232
+
return err
233
+
}
234
+
235
+
var errs error
236
+
for _, r := range resp.Records {
237
+
if r == nil {
238
+
continue
239
+
}
240
+
record := r.Value.Val.(*tangled.RepoCollaborator)
241
+
242
+
if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil {
243
+
l.Error("failed to add repo to enforcer", "error", err)
244
+
errors.Join(errs, fmt.Errorf("failed to add repo: %w", err))
245
+
}
246
+
}
247
+
248
+
return errs
249
+
}
+1
spindle/server.go
+1
spindle/server.go