forked from tangled.org/core
this repo has no description

knotserver,spindle: ingest collaborator record and update rbac

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled f4c32b12 74cab56f

Changed files
+179
jetstream
knotserver
spindle
+13
jetstream/jetstream.go
··· 52 52 j.mu.Unlock() 53 53 } 54 54 55 + func (j *JetstreamClient) RemoveDid(did string) { 56 + if did == "" { 57 + return 58 + } 59 + 60 + if j.logDids { 61 + j.l.Info("removing did from in-memory filter", "did", did) 62 + } 63 + j.mu.Lock() 64 + delete(j.wantedDids, did) 65 + j.mu.Unlock() 66 + } 67 + 55 68 type processor func(context.Context, *models.Event) error 56 69 57 70 func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
+61
knotserver/ingester.go
··· 213 213 return h.db.InsertEvent(event, h.n) 214 214 } 215 215 216 + // duplicated from add collaborator 217 + func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 218 + repoAt, err := syntax.ParseATURI(record.Repo) 219 + if err != nil { 220 + return err 221 + } 222 + 223 + resolver := idresolver.DefaultResolver() 224 + 225 + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 226 + if err != nil || subjectId.Handle.IsInvalidHandle() { 227 + return err 228 + } 229 + 230 + // TODO: fix this for good, we need to fetch the record here unfortunately 231 + // resolve this aturi to extract the repo record 232 + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 233 + if err != nil || owner.Handle.IsInvalidHandle() { 234 + return fmt.Errorf("failed to resolve handle: %w", err) 235 + } 236 + 237 + xrpcc := xrpc.Client{ 238 + Host: owner.PDSEndpoint(), 239 + } 240 + 241 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 242 + if err != nil { 243 + return err 244 + } 245 + 246 + repo := resp.Value.Val.(*tangled.Repo) 247 + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 248 + 249 + // check perms for this user 250 + if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 251 + return fmt.Errorf("insufficient permissions: %w", err) 252 + } 253 + 254 + if err := h.db.AddDid(subjectId.DID.String()); err != nil { 255 + return err 256 + } 257 + h.jc.AddDid(subjectId.DID.String()) 258 + 259 + if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 260 + return err 261 + } 262 + 263 + return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 264 + } 265 + 216 266 func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 217 267 l := log.FromContext(ctx) 218 268 ··· 292 342 if err := h.processKnotMember(ctx, did, record); err != nil { 293 343 return fmt.Errorf("failed to process knot member: %w", err) 294 344 } 345 + 295 346 case tangled.RepoPullNSID: 296 347 var record tangled.RepoPull 297 348 if err := json.Unmarshal(raw, &record); err != nil { ··· 300 351 if err := h.processPull(ctx, did, record); err != nil { 301 352 return fmt.Errorf("failed to process knot member: %w", err) 302 353 } 354 + 355 + case tangled.RepoCollaboratorNSID: 356 + var record tangled.RepoCollaborator 357 + if err := json.Unmarshal(raw, &record); err != nil { 358 + return fmt.Errorf("failed to unmarshal record: %w", err) 359 + } 360 + if err := h.processCollaborator(ctx, did, record); err != nil { 361 + return fmt.Errorf("failed to process knot member: %w", err) 362 + } 363 + 303 364 } 304 365 305 366 return err
+1
knotserver/server.go
··· 76 76 tangled.PublicKeyNSID, 77 77 tangled.KnotMemberNSID, 78 78 tangled.RepoPullNSID, 79 + tangled.RepoCollaboratorNSID, 79 80 }, nil, logger, db, true, c.Server.LogDids) 80 81 if err != nil { 81 82 logger.Error("failed to setup jetstream", "error", err)
+103
spindle/ingester.go
··· 8 8 9 9 "tangled.sh/tangled.sh/core/api/tangled" 10 10 "tangled.sh/tangled.sh/core/eventconsumer" 11 + "tangled.sh/tangled.sh/core/idresolver" 11 12 "tangled.sh/tangled.sh/core/rbac" 12 13 14 + comatproto "github.com/bluesky-social/indigo/api/atproto" 15 + "github.com/bluesky-social/indigo/atproto/identity" 16 + "github.com/bluesky-social/indigo/atproto/syntax" 17 + "github.com/bluesky-social/indigo/xrpc" 13 18 "github.com/bluesky-social/jetstream/pkg/models" 19 + securejoin "github.com/cyphar/filepath-securejoin" 14 20 ) 15 21 16 22 type Ingester func(ctx context.Context, e *models.Event) error ··· 35 41 s.ingestMember(ctx, e) 36 42 case tangled.RepoNSID: 37 43 s.ingestRepo(ctx, e) 44 + case tangled.RepoCollaboratorNSID: 45 + s.ingestCollaborator(ctx, e) 38 46 } 39 47 40 48 return err ··· 144 152 } 145 153 return nil 146 154 } 155 + 156 + func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 157 + var err error 158 + 159 + l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 160 + 161 + l.Info("ingesting collaborator record") 162 + 163 + switch e.Commit.Operation { 164 + case models.CommitOperationCreate, models.CommitOperationUpdate: 165 + raw := e.Commit.Record 166 + record := tangled.RepoCollaborator{} 167 + err = json.Unmarshal(raw, &record) 168 + if err != nil { 169 + l.Error("invalid record", "error", err) 170 + return err 171 + } 172 + 173 + resolver := idresolver.DefaultResolver() 174 + 175 + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 176 + if err != nil || subjectId.Handle.IsInvalidHandle() { 177 + return err 178 + } 179 + 180 + repoAt, err := syntax.ParseATURI(record.Repo) 181 + if err != nil { 182 + l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 183 + return nil 184 + } 185 + 186 + // TODO: get rid of this entirely 187 + // resolve this aturi to extract the repo record 188 + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 189 + if err != nil || owner.Handle.IsInvalidHandle() { 190 + return fmt.Errorf("failed to resolve handle: %w", err) 191 + } 192 + 193 + xrpcc := xrpc.Client{ 194 + Host: owner.PDSEndpoint(), 195 + } 196 + 197 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 198 + if err != nil { 199 + return err 200 + } 201 + 202 + repo := resp.Value.Val.(*tangled.Repo) 203 + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 204 + 205 + // check perms for this user 206 + if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 207 + return fmt.Errorf("insufficient permissions: %w", err) 208 + } 209 + 210 + // add collaborator to rbac 211 + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 212 + l.Error("failed to add repo to enforcer", "error", err) 213 + return fmt.Errorf("failed to add repo: %w", err) 214 + } 215 + 216 + return nil 217 + } 218 + return nil 219 + } 220 + 221 + func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 222 + l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 223 + 224 + l.Info("fetching and adding existing collaborators") 225 + 226 + xrpcc := xrpc.Client{ 227 + Host: owner.PDSEndpoint(), 228 + } 229 + 230 + resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 231 + if err != nil { 232 + return err 233 + } 234 + 235 + var errs error 236 + for _, r := range resp.Records { 237 + if r == nil { 238 + continue 239 + } 240 + record := r.Value.Val.(*tangled.RepoCollaborator) 241 + 242 + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 243 + l.Error("failed to add repo to enforcer", "error", err) 244 + errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 245 + } 246 + } 247 + 248 + return errs 249 + }
+1
spindle/server.go
··· 103 103 collections := []string{ 104 104 tangled.SpindleMemberNSID, 105 105 tangled.RepoNSID, 106 + tangled.RepoCollaboratorNSID, 106 107 } 107 108 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 108 109 if err != nil {