forked from tangled.org/core
Monorepo for Tangled

knotserver: improve ingester to handle deletions/updates

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi 4246a57c 6a968a8f

verified
Changed files
+146 -73
knotserver
-1
knotserver/handler.go
··· 86 86 }) 87 87 88 88 r.Route("/languages", func(r chi.Router) { 89 - r.With(h.VerifySignature) 90 89 r.Get("/", h.RepoLanguages) 91 90 r.Get("/{ref}", h.RepoLanguages) 92 91 })
+146 -72
knotserver/ingester.go
··· 25 25 "tangled.sh/tangled.sh/core/workflow" 26 26 ) 27 27 28 - func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 28 + func (h *Handle) processPublicKey(ctx context.Context, did string, operation string, record tangled.PublicKey) error { 29 29 l := log.FromContext(ctx) 30 - pk := db.PublicKey{ 31 - Did: did, 32 - PublicKey: record, 30 + 31 + switch operation { 32 + case models.CommitOperationCreate, models.CommitOperationUpdate: 33 + pk := db.PublicKey{ 34 + Did: did, 35 + PublicKey: record, 36 + } 37 + if err := h.db.AddPublicKey(pk); err != nil { 38 + l.Error("failed to add public key", "error", err) 39 + return fmt.Errorf("failed to add public key: %w", err) 40 + } 41 + l.Info("added public key from firehose", "did", did) 42 + 43 + case models.CommitOperationDelete: 44 + if err := h.db.RemovePublicKey(did); err != nil { 45 + l.Error("failed to remove public key", "error", err) 46 + return fmt.Errorf("failed to remove public key: %w", err) 47 + } 48 + l.Info("removed public key (delete triggered from firehose)", "did", did) 33 49 } 34 - if err := h.db.AddPublicKey(pk); err != nil { 35 - l.Error("failed to add public key", "error", err) 36 - return fmt.Errorf("failed to add public key: %w", err) 37 - } 38 - l.Info("added public key from firehose", "did", did) 50 + 39 51 return nil 40 52 } 41 53 42 - func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 54 + func (h *Handle) processKnotMember(ctx context.Context, did string, operation string, record tangled.KnotMember) error { 43 55 l := log.FromContext(ctx) 44 56 45 - if record.Domain != h.c.Server.Hostname { 46 - l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 47 - return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 48 - } 57 + switch operation { 58 + case models.CommitOperationCreate, models.CommitOperationUpdate: 59 + if record.Domain != h.c.Server.Hostname { 60 + l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 61 + return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 62 + } 49 63 50 - ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 51 - if err != nil || !ok { 52 - l.Error("failed to add member", "did", did) 53 - return fmt.Errorf("failed to enforce permissions: %w", err) 54 - } 64 + ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 65 + if err != nil || !ok { 66 + l.Error("failed to add member", "did", did) 67 + return fmt.Errorf("failed to enforce permissions: %w", err) 68 + } 69 + 70 + if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 71 + l.Error("failed to add member", "error", err) 72 + return fmt.Errorf("failed to add member: %w", err) 73 + } 74 + l.Info("added member from firehose", "member", record.Subject) 75 + 76 + if err := h.db.AddDid(did); err != nil { 77 + l.Error("failed to add did", "error", err) 78 + return fmt.Errorf("failed to add did: %w", err) 79 + } 80 + h.jc.AddDid(did) 55 81 56 - if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 57 - l.Error("failed to add member", "error", err) 58 - return fmt.Errorf("failed to add member: %w", err) 59 - } 60 - l.Info("added member from firehose", "member", record.Subject) 82 + if err := h.fetchAndAddKeys(ctx, did); err != nil { 83 + return fmt.Errorf("failed to fetch and add keys: %w", err) 84 + } 61 85 62 - if err := h.db.AddDid(did); err != nil { 63 - l.Error("failed to add did", "error", err) 64 - return fmt.Errorf("failed to add did: %w", err) 65 - } 66 - h.jc.AddDid(did) 86 + case models.CommitOperationDelete: 87 + if err := h.e.RemoveKnotMember(rbac.ThisServer, record.Subject); err != nil { 88 + l.Error("failed to remove member", "error", err) 89 + return fmt.Errorf("failed to remove member: %w", err) 90 + } 91 + l.Info("removed member (delete triggered from firehose)", "member", record.Subject) 67 92 68 - if err := h.fetchAndAddKeys(ctx, did); err != nil { 69 - return fmt.Errorf("failed to fetch and add keys: %w", err) 93 + if err := h.db.RemoveDid(record.Subject); err != nil { 94 + l.Error("failed to remove did", "error", err) 95 + return fmt.Errorf("failed to remove did: %w", err) 96 + } 97 + h.jc.RemoveDid(record.Subject) 70 98 } 71 99 72 100 return nil ··· 214 242 } 215 243 216 244 // duplicated from add collaborator 217 - func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { 218 - repoAt, err := syntax.ParseATURI(record.Repo) 219 - if err != nil { 220 - return err 221 - } 245 + func (h *Handle) processCollaborator(ctx context.Context, did string, operation string, record tangled.RepoCollaborator) error { 246 + l := log.FromContext(ctx) 247 + l = l.With("handler", "processCollaborator", "did", did) 222 248 223 - resolver := idresolver.DefaultResolver() 249 + switch operation { 250 + case models.CommitOperationCreate, models.CommitOperationUpdate: 251 + repoAt, err := syntax.ParseATURI(record.Repo) 252 + if err != nil { 253 + return err 254 + } 224 255 225 - subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 226 - if err != nil || subjectId.Handle.IsInvalidHandle() { 227 - return err 228 - } 256 + resolver := h.resolver 229 257 230 - // TODO: fix this for good, we need to fetch the record here unfortunately 231 - // resolve this aturi to extract the repo record 232 - owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 233 - if err != nil || owner.Handle.IsInvalidHandle() { 234 - return fmt.Errorf("failed to resolve handle: %w", err) 235 - } 258 + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 259 + if err != nil || subjectId.Handle.IsInvalidHandle() { 260 + return err 261 + } 262 + 263 + // TODO: fix this for good, we need to fetch the record here unfortunately 264 + // resolve this aturi to extract the repo record 265 + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 266 + if err != nil || owner.Handle.IsInvalidHandle() { 267 + return fmt.Errorf("failed to resolve handle: %w", err) 268 + } 269 + 270 + xrpcc := xrpc.Client{ 271 + Host: owner.PDSEndpoint(), 272 + } 273 + 274 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 275 + if err != nil { 276 + return err 277 + } 278 + 279 + repo := resp.Value.Val.(*tangled.Repo) 280 + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 281 + 282 + // check perms for this user 283 + if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 284 + return fmt.Errorf("insufficient permissions: %w", err) 285 + } 286 + 287 + if err := h.db.AddDid(subjectId.DID.String()); err != nil { 288 + return err 289 + } 290 + h.jc.AddDid(subjectId.DID.String()) 291 + 292 + if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 293 + return err 294 + } 295 + 296 + l.Info("added collaborator from firehose", "subject", record.Subject, "repo", record.Repo) 297 + 298 + return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 299 + 300 + case models.CommitOperationDelete: 301 + repoAt, err := syntax.ParseATURI(record.Repo) 302 + if err != nil { 303 + return err 304 + } 305 + 306 + resolver := h.resolver 307 + 308 + subjectId, err := resolver.ResolveIdent(ctx, record.Subject) 309 + if err != nil || subjectId.Handle.IsInvalidHandle() { 310 + return err 311 + } 236 312 237 - xrpcc := xrpc.Client{ 238 - Host: owner.PDSEndpoint(), 239 - } 313 + owner, err := resolver.ResolveIdent(ctx, repoAt.Authority().String()) 314 + if err != nil || owner.Handle.IsInvalidHandle() { 315 + return fmt.Errorf("failed to resolve handle: %w", err) 316 + } 240 317 241 - resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 242 - if err != nil { 243 - return err 244 - } 318 + xrpcc := xrpc.Client{ 319 + Host: owner.PDSEndpoint(), 320 + } 245 321 246 - repo := resp.Value.Val.(*tangled.Repo) 247 - didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 322 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 323 + if err != nil { 324 + return err 325 + } 248 326 249 - // check perms for this user 250 - if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 251 - return fmt.Errorf("insufficient permissions: %w", err) 252 - } 327 + repo := resp.Value.Val.(*tangled.Repo) 328 + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 253 329 254 - if err := h.db.AddDid(subjectId.DID.String()); err != nil { 255 - return err 256 - } 257 - h.jc.AddDid(subjectId.DID.String()) 330 + if err := h.e.RemoveCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 331 + l.Error("failed to remove collaborator", "error", err) 332 + return fmt.Errorf("failed to remove collaborator: %w", err) 333 + } 258 334 259 - if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 260 - return err 335 + l.Info("removed collaborator from firehose", "subject", record.Subject, "repo", record.Repo) 261 336 } 262 337 263 - return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 338 + return nil 264 339 } 265 340 266 341 func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { ··· 329 404 if err := json.Unmarshal(raw, &record); err != nil { 330 405 return fmt.Errorf("failed to unmarshal record: %w", err) 331 406 } 332 - if err := h.processPublicKey(ctx, did, record); err != nil { 407 + if err := h.processPublicKey(ctx, did, event.Commit.Operation, record); err != nil { 333 408 return fmt.Errorf("failed to process public key: %w", err) 334 409 } 335 410 ··· 338 413 if err := json.Unmarshal(raw, &record); err != nil { 339 414 return fmt.Errorf("failed to unmarshal record: %w", err) 340 415 } 341 - if err := h.processKnotMember(ctx, did, record); err != nil { 416 + if err := h.processKnotMember(ctx, did, event.Commit.Operation, record); err != nil { 342 417 return fmt.Errorf("failed to process knot member: %w", err) 343 418 } 344 419 ··· 356 431 if err := json.Unmarshal(raw, &record); err != nil { 357 432 return fmt.Errorf("failed to unmarshal record: %w", err) 358 433 } 359 - if err := h.processCollaborator(ctx, did, record); err != nil { 360 - return fmt.Errorf("failed to process knot member: %w", err) 434 + if err := h.processCollaborator(ctx, did, event.Commit.Operation, record); err != nil { 435 + return fmt.Errorf("failed to process collaborator: %w", err) 361 436 } 362 - 363 437 } 364 438 365 439 return err