Monorepo for Tangled tangled.org

appview, knotserver: add didAssign event and PDS rewrite on login #1143

open opened by oyster.cafe targeting master from oyster.cafe/tangled-core: master
Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgprvt2evm22
+526 -41
Diff #5
+167 -2
appview/db/repos.go
··· 596 596 return GetRepo(e, orm.FilterEq("repo_did", repoDid)) 597 597 } 598 598 599 + func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error { 600 + type record struct { 601 + userDidCol string 602 + table string 603 + nsid string 604 + fkCol string 605 + } 606 + sources := []record{ 607 + {"did", "repos", "sh.tangled.repo", "at_uri"}, 608 + {"did", "issues", "sh.tangled.repo.issue", "repo_at"}, 609 + {"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"}, 610 + {"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"}, 611 + {"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"}, 612 + {"did", "stars", "sh.tangled.feed.star", "subject_at"}, 613 + } 614 + 615 + for _, src := range sources { 616 + rows, err := tx.Query( 617 + fmt.Sprintf(`SELECT %s, rkey FROM %s WHERE %s = ?`, src.userDidCol, src.table, src.fkCol), 618 + repoAtUri, 619 + ) 620 + if err != nil { 621 + return fmt.Errorf("query %s for pds rewrites: %w", src.table, err) 622 + } 623 + 624 + var pairs []struct{ did, rkey string } 625 + for rows.Next() { 626 + var d, r string 627 + if scanErr := rows.Scan(&d, &r); scanErr != nil { 628 + rows.Close() 629 + return fmt.Errorf("scan %s for pds rewrites: %w", src.table, scanErr) 630 + } 631 + pairs = append(pairs, struct{ did, rkey string }{d, r}) 632 + } 633 + rows.Close() 634 + if rowsErr := rows.Err(); rowsErr != nil { 635 + return fmt.Errorf("iterate %s for pds rewrites: %w", src.table, rowsErr) 636 + } 637 + 638 + for _, p := range pairs { 639 + if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil { 640 + return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err) 641 + } 642 + } 643 + } 644 + 645 + profileRows, err := tx.Query( 646 + `SELECT DISTINCT did FROM profile_pinned_repositories WHERE at_uri = ?`, 647 + repoAtUri, 648 + ) 649 + if err != nil { 650 + return fmt.Errorf("query profile_pinned_repositories for pds rewrites: %w", err) 651 + } 652 + var profileDids []string 653 + for profileRows.Next() { 654 + var d string 655 + if scanErr := profileRows.Scan(&d); scanErr != nil { 656 + profileRows.Close() 657 + return fmt.Errorf("scan profile_pinned_repositories for pds rewrites: %w", scanErr) 658 + } 659 + profileDids = append(profileDids, d) 660 + } 661 + profileRows.Close() 662 + if profileRowsErr := profileRows.Err(); profileRowsErr != nil { 663 + return fmt.Errorf("iterate profile_pinned_repositories for pds rewrites: %w", profileRowsErr) 664 + } 665 + 666 + for _, d := range profileDids { 667 + if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil { 668 + return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err) 669 + } 670 + } 671 + 672 + return nil 673 + } 674 + 675 + type PdsRewrite struct { 676 + Id int 677 + RepoDid string 678 + RecordNsid string 679 + RecordRkey string 680 + OldRepoAt string 681 + } 682 + 683 + func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) { 684 + rows, err := e.Query( 685 + `SELECT id, repo_did, record_nsid, record_rkey, old_repo_at 686 + FROM pds_rewrite_status 687 + WHERE user_did = ? AND status = 'pending'`, 688 + userDid, 689 + ) 690 + if err != nil { 691 + return nil, err 692 + } 693 + defer rows.Close() 694 + 695 + var rewrites []PdsRewrite 696 + for rows.Next() { 697 + var r PdsRewrite 698 + if err := rows.Scan(&r.Id, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil { 699 + return nil, err 700 + } 701 + rewrites = append(rewrites, r) 702 + } 703 + return rewrites, rows.Err() 704 + } 705 + 706 + func CompletePdsRewrite(e Execer, id int) error { 707 + _, err := e.Exec( 708 + `UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`, 709 + id, 710 + ) 711 + return err 712 + } 713 + 599 714 func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error { 600 715 _, err := e.Exec( 601 - `INSERT OR IGNORE INTO pds_rewrite_status 716 + `INSERT INTO pds_rewrite_status 602 717 (user_did, repo_did, record_nsid, record_rkey, old_repo_at, status) 603 - VALUES (?, ?, ?, ?, ?, 'pending')`, 718 + VALUES (?, ?, ?, ?, ?, 'pending') 719 + ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET 720 + status = 'pending', 721 + repo_did = excluded.repo_did, 722 + old_repo_at = excluded.old_repo_at, 723 + updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`, 604 724 userDid, repoDid, recordNsid, recordRkey, oldRepoAt, 605 725 ) 606 726 return err 607 727 } 608 728 729 + func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error { 730 + updates := []struct{ table, column string }{ 731 + {"repos", "at_uri"}, 732 + {"issues", "repo_at"}, 733 + {"pulls", "repo_at"}, 734 + {"collaborators", "repo_at"}, 735 + {"artifacts", "repo_at"}, 736 + {"webhooks", "repo_at"}, 737 + {"pull_comments", "repo_at"}, 738 + {"repo_issue_seqs", "repo_at"}, 739 + {"repo_pull_seqs", "repo_at"}, 740 + {"repo_languages", "repo_at"}, 741 + {"repo_labels", "repo_at"}, 742 + {"profile_pinned_repositories", "at_uri"}, 743 + } 744 + 745 + for _, u := range updates { 746 + _, err := tx.Exec( 747 + fmt.Sprintf(`UPDATE %s SET repo_did = ? WHERE %s = ?`, u.table, u.column), 748 + repoDid, repoAtUri, 749 + ) 750 + if err != nil { 751 + return fmt.Errorf("cascade repo_did to %s: %w", u.table, err) 752 + } 753 + } 754 + 755 + _, err := tx.Exec( 756 + `UPDATE stars SET subject_did = ? WHERE subject_at = ?`, 757 + repoDid, repoAtUri, 758 + ) 759 + if err != nil { 760 + return fmt.Errorf("cascade subject_did to stars: %w", err) 761 + } 762 + 763 + _, err = tx.Exec( 764 + `UPDATE repos SET source = ? WHERE source = ?`, 765 + repoDid, repoAtUri, 766 + ) 767 + if err != nil { 768 + return fmt.Errorf("cascade repo_did to repos.source: %w", err) 769 + } 770 + 771 + return nil 772 + } 773 + 609 774 func UpdateDescription(e Execer, repoAt, newDescription string) error { 610 775 _, err := e.Exec( 611 776 `update repos set description = ? where at_uri = ?`, newDescription, repoAt)
+140
appview/oauth/handler.go
··· 14 14 15 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 16 "github.com/bluesky-social/indigo/atproto/auth/oauth" 17 + atpclient "github.com/bluesky-social/indigo/atproto/client" 17 18 lexutil "github.com/bluesky-social/indigo/lex/util" 18 19 xrpc "github.com/bluesky-social/indigo/xrpc" 19 20 "github.com/go-chi/chi/v5" ··· 95 96 go o.addToDefaultSpindle(sessData.AccountDID.String()) 96 97 go o.ensureTangledProfile(sessData) 97 98 go o.autoClaimTnglShDomain(sessData.AccountDID.String()) 99 + go o.drainPdsRewrites(sessData) 98 100 99 101 if !o.Config.Core.Dev { 100 102 err = o.Posthog.Enqueue(posthog.Capture{ ··· 273 275 l.Debug("successfully created empty Tangled profile on PDS and DB") 274 276 } 275 277 278 + func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) { 279 + ctx := context.Background() 280 + did := sessData.AccountDID.String() 281 + l := o.Logger.With("did", did, "handler", "drainPdsRewrites") 282 + 283 + rewrites, err := db.GetPendingPdsRewrites(o.Db, did) 284 + if err != nil { 285 + l.Error("failed to get pending rewrites", "err", err) 286 + return 287 + } 288 + if len(rewrites) == 0 { 289 + return 290 + } 291 + 292 + l.Info("draining pending PDS rewrites", "count", len(rewrites)) 293 + 294 + sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID) 295 + if err != nil { 296 + l.Error("failed to resume session for PDS rewrites", "err", err) 297 + return 298 + } 299 + client := sess.APIClient() 300 + 301 + for _, rw := range rewrites { 302 + if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil { 303 + l.Error("failed to rewrite PDS record", 304 + "nsid", rw.RecordNsid, 305 + "rkey", rw.RecordRkey, 306 + "repo_did", rw.RepoDid, 307 + "err", err) 308 + continue 309 + } 310 + 311 + if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil { 312 + l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err) 313 + } 314 + } 315 + } 316 + 317 + func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error { 318 + ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey) 319 + if err != nil { 320 + return fmt.Errorf("get record: %w", err) 321 + } 322 + 323 + val := ex.Value.Val 324 + repoDid := rw.RepoDid 325 + 326 + switch rw.RecordNsid { 327 + case tangled.RepoNSID: 328 + rec, ok := val.(*tangled.Repo) 329 + if !ok { 330 + return fmt.Errorf("unexpected type for repo record") 331 + } 332 + rec.RepoDid = &repoDid 333 + 334 + case tangled.RepoIssueNSID: 335 + rec, ok := val.(*tangled.RepoIssue) 336 + if !ok { 337 + return fmt.Errorf("unexpected type for issue record") 338 + } 339 + rec.RepoDid = &repoDid 340 + rec.Repo = nil 341 + 342 + case tangled.RepoPullNSID: 343 + rec, ok := val.(*tangled.RepoPull) 344 + if !ok { 345 + return fmt.Errorf("unexpected type for pull record") 346 + } 347 + if rec.Target != nil { 348 + rec.Target.RepoDid = &repoDid 349 + rec.Target.Repo = nil 350 + } 351 + if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt { 352 + rec.Source.RepoDid = &repoDid 353 + rec.Source.Repo = nil 354 + } 355 + 356 + case tangled.RepoCollaboratorNSID: 357 + rec, ok := val.(*tangled.RepoCollaborator) 358 + if !ok { 359 + return fmt.Errorf("unexpected type for collaborator record") 360 + } 361 + rec.RepoDid = &repoDid 362 + rec.Repo = nil 363 + 364 + case tangled.RepoArtifactNSID: 365 + rec, ok := val.(*tangled.RepoArtifact) 366 + if !ok { 367 + return fmt.Errorf("unexpected type for artifact record") 368 + } 369 + rec.RepoDid = &repoDid 370 + rec.Repo = nil 371 + 372 + case tangled.FeedStarNSID: 373 + rec, ok := val.(*tangled.FeedStar) 374 + if !ok { 375 + return fmt.Errorf("unexpected type for star record") 376 + } 377 + rec.SubjectDid = &repoDid 378 + rec.Subject = nil 379 + 380 + case tangled.ActorProfileNSID: 381 + rec, ok := val.(*tangled.ActorProfile) 382 + if !ok { 383 + return fmt.Errorf("unexpected type for profile record") 384 + } 385 + var dids []string 386 + var remaining []string 387 + for _, pinUri := range rec.PinnedRepositories { 388 + repo, repoErr := db.GetRepoByAtUri(o.Db, pinUri) 389 + if repoErr != nil || repo.RepoDid == "" { 390 + remaining = append(remaining, pinUri) 391 + continue 392 + } 393 + dids = append(dids, repo.RepoDid) 394 + } 395 + rec.PinnedRepositoryDids = append(rec.PinnedRepositoryDids, dids...) 396 + rec.PinnedRepositories = remaining 397 + 398 + default: 399 + return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid) 400 + } 401 + 402 + _, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{ 403 + Collection: rw.RecordNsid, 404 + Repo: userDid, 405 + Rkey: rw.RecordRkey, 406 + SwapRecord: ex.Cid, 407 + Record: &lexutil.LexiconTypeDecoder{Val: val}, 408 + }) 409 + if err != nil { 410 + return fmt.Errorf("put record: %w", err) 411 + } 412 + 413 + return nil 414 + } 415 + 276 416 // create a AppPasswordSession using apppasswords 277 417 type AppPasswordSession struct { 278 418 AccessJwt string `json:"accessJwt"`
+85
appview/state/knotstream.go
··· 20 20 "tangled.org/core/appview/sites" 21 21 ec "tangled.org/core/eventconsumer" 22 22 "tangled.org/core/eventconsumer/cursor" 23 + knotdb "tangled.org/core/knotserver/db" 23 24 "tangled.org/core/log" 24 25 "tangled.org/core/orm" 25 26 "tangled.org/core/rbac" ··· 74 75 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 75 76 case tangled.PipelineNSID: 76 77 return ingestPipeline(d, source, msg) 78 + case knotdb.RepoDIDAssignNSID: 79 + return ingestDIDAssign(d, enforcer, source, msg, ctx) 77 80 } 78 81 79 82 return nil ··· 374 377 375 378 return nil 376 379 } 380 + 381 + func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error { 382 + logger := log.FromContext(ctx) 383 + 384 + var record knotdb.RepoDIDAssign 385 + if err := json.Unmarshal(msg.EventJson, &record); err != nil { 386 + return fmt.Errorf("unmarshal didAssign: %w", err) 387 + } 388 + 389 + if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 390 + return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 391 + record.RepoDid, record.OwnerDid, record.RepoName) 392 + } 393 + 394 + logger.Info("processing didAssign event", 395 + "repo_did", record.RepoDid, 396 + "owner_did", record.OwnerDid, 397 + "repo_name", record.RepoName) 398 + 399 + repos, err := db.GetRepos(d, 1, 400 + orm.FilterEq("did", record.OwnerDid), 401 + orm.FilterEq("name", record.RepoName), 402 + ) 403 + if err != nil || len(repos) == 0 { 404 + logger.Warn("didAssign for unknown repo, skipping", 405 + "owner_did", record.OwnerDid, 406 + "repo_name", record.RepoName) 407 + return nil 408 + } 409 + repo := repos[0] 410 + repoAtUri := repo.RepoAt().String() 411 + knot := source.Key() 412 + legacyResource := record.OwnerDid + "/" + record.RepoName 413 + 414 + if repo.RepoDid != record.RepoDid { 415 + tx, err := d.Begin() 416 + if err != nil { 417 + return fmt.Errorf("begin didAssign txn: %w", err) 418 + } 419 + defer tx.Rollback() 420 + 421 + if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 422 + return fmt.Errorf("cascade repo_did: %w", err) 423 + } 424 + 425 + if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 426 + return fmt.Errorf("enqueue pds rewrites: %w", err) 427 + } 428 + 429 + if err := tx.Commit(); err != nil { 430 + return fmt.Errorf("commit didAssign txn: %w", err) 431 + } 432 + } 433 + 434 + if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 435 + return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 436 + } 437 + if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 438 + return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 439 + } 440 + 441 + collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri)) 442 + if collabErr != nil { 443 + return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 444 + } 445 + for _, c := range collabs { 446 + collabDid := c.SubjectDid.String() 447 + if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 448 + return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 449 + } 450 + if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 451 + return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 452 + } 453 + } 454 + 455 + logger.Info("didAssign processed successfully", 456 + "repo_did", record.RepoDid, 457 + "owner_did", record.OwnerDid, 458 + "repo_name", record.RepoName) 459 + 460 + return nil 461 + }
+10
knotserver/db/didassign.go
··· 1 + package db 2 + 3 + const RepoDIDAssignNSID = "sh.tangled.repo.didAssign" 4 + 5 + type RepoDIDAssign struct { 6 + OwnerDid string `json:"ownerDid"` 7 + RepoName string `json:"repoName"` 8 + RepoDid string `json:"repoDid"` 9 + OldRepoAt string `json:"oldRepoAt,omitempty"` 10 + }
+24
knotserver/db/events.go
··· 1 1 package db 2 2 3 3 import ( 4 + "encoding/json" 4 5 "fmt" 5 6 "time" 6 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 7 9 "tangled.org/core/notifier" 8 10 ) 9 11 12 + var tidClock = syntax.NewTIDClock(0) 13 + 10 14 type Event struct { 11 15 Rkey string `json:"rkey"` 12 16 Nsid string `json:"nsid"` ··· 29 33 return err 30 34 } 31 35 36 + func (d *DB) EmitDIDAssign(n *notifier.Notifier, ownerDid, repoName, repoDid, oldRepoAt string) error { 37 + payload := RepoDIDAssign{ 38 + OwnerDid: ownerDid, 39 + RepoName: repoName, 40 + RepoDid: repoDid, 41 + OldRepoAt: oldRepoAt, 42 + } 43 + 44 + eventJson, err := json.Marshal(payload) 45 + if err != nil { 46 + return fmt.Errorf("marshal didAssign event: %w", err) 47 + } 48 + 49 + return d.InsertEvent(Event{ 50 + Rkey: tidClock.Next().String(), 51 + Nsid: RepoDIDAssignNSID, 52 + EventJson: string(eventJson), 53 + }, n) 54 + } 55 + 32 56 func (d *DB) GetEvents(cursor int64) ([]Event, error) { 33 57 whereClause := "" 34 58 args := []any{}
+26 -14
knotserver/events.go
··· 54 54 cursor = defaultCursor 55 55 } 56 56 57 - // complete backfill first before going to live data 58 57 l.Debug("going through backfill", "cursor", cursor) 59 - if err := h.streamOps(conn, &cursor); err != nil { 58 + if err := h.drainBackfill(conn, &cursor, 10_000); err != nil { 60 59 l.Error("failed to backfill", "err", err) 61 60 return 62 61 } ··· 68 67 l.Debug("stopping stream: client closed connection") 69 68 return 70 69 case <-ch: 71 - // we have been notified of new data 72 70 l.Debug("going through live data", "cursor", cursor) 73 - if err := h.streamOps(conn, &cursor); err != nil { 71 + if _, err := h.streamOps(conn, &cursor); err != nil { 74 72 l.Error("failed to stream", "err", err) 75 73 return 76 74 } ··· 83 81 } 84 82 } 85 83 86 - func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error { 84 + func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error { 85 + for range maxBatches { 86 + n, err := h.streamOps(conn, cursor) 87 + if err != nil { 88 + return err 89 + } 90 + if n < 100 { 91 + return nil 92 + } 93 + } 94 + h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor) 95 + return nil 96 + } 97 + 98 + func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) { 87 99 events, err := h.db.GetEvents(*cursor) 88 100 if err != nil { 89 101 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 90 - return err 102 + return 0, err 91 103 } 92 104 93 105 for _, event := range events { 94 - // first extract the inner json into a map 95 106 var eventJson map[string]any 96 107 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 97 108 if err != nil { 98 109 h.l.Error("failed to unmarshal event", "err", err) 99 - return err 110 + return 0, err 100 111 } 101 112 102 113 jsonMsg, err := json.Marshal(map[string]any{ 103 - "rkey": event.Rkey, 104 - "nsid": event.Nsid, 105 - "event": eventJson, 114 + "rkey": event.Rkey, 115 + "nsid": event.Nsid, 116 + "event": eventJson, 117 + "created": event.Created, 106 118 }) 107 119 if err != nil { 108 120 h.l.Error("failed to marshal record", "err", err) 109 - return err 121 + return 0, err 110 122 } 111 123 112 124 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 113 125 h.l.Debug("err", "err", err) 114 - return err 126 + return 0, err 115 127 } 116 128 *cursor = event.Created 117 129 } 118 130 119 - return nil 131 + return len(events), nil 120 132 }
+15 -5
knotserver/git.go
··· 5 5 "fmt" 6 6 "io" 7 7 "net/http" 8 + "os" 9 + "path/filepath" 8 10 "strings" 9 11 12 + securejoin "github.com/cyphar/filepath-securejoin" 10 13 "github.com/go-chi/chi/v5" 11 14 "tangled.org/core/knotserver/git/service" 12 15 ) ··· 24 27 } 25 28 26 29 repoDid, err := h.db.GetRepoDid(did, name) 27 - if err != nil { 28 - return "", "", fmt.Errorf("repo not found: %w", err) 30 + if err == nil { 31 + repoPath, _, _, resolveErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 32 + if resolveErr == nil { 33 + return repoPath, name, nil 34 + } 29 35 } 30 - repoPath, _, _, err := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 31 - if err != nil { 32 - return "", "", fmt.Errorf("repo not found: %w", err) 36 + 37 + repoPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(did, name)) 38 + if joinErr != nil { 39 + return "", "", fmt.Errorf("repo not found: %w", joinErr) 40 + } 41 + if _, statErr := os.Stat(repoPath); statErr != nil { 42 + return "", "", fmt.Errorf("repo not found: %w", statErr) 33 43 } 34 44 return repoPath, name, nil 35 45 }
+28 -13
knotserver/internal.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "net/http" 9 + "os" 9 10 "path/filepath" 10 11 "strings" 11 12 13 + securejoin "github.com/cyphar/filepath-securejoin" 12 14 "github.com/go-chi/chi/v5" 13 15 "github.com/go-chi/chi/v5/middleware" 14 16 "github.com/go-git/go-git/v5/plumbing" ··· 124 126 ownerDid := repoOwnerIdent.DID.String() 125 127 repoName := components[1] 126 128 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 127 - if didErr != nil { 128 - w.WriteHeader(http.StatusNotFound) 129 - l.Error("repo DID not found", "owner", ownerDid, "name", repoName, "err", didErr) 130 - fmt.Fprintln(w, "repo not found") 131 - return 132 - } 133 - repoPath, _, _, lookupErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 134 - if lookupErr != nil { 135 - w.WriteHeader(http.StatusNotFound) 136 - l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr) 137 - fmt.Fprintln(w, "repo not found") 138 - return 129 + var repoPath string 130 + if didErr == nil { 131 + var lookupErr error 132 + repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 133 + if lookupErr != nil { 134 + w.WriteHeader(http.StatusNotFound) 135 + l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr) 136 + fmt.Fprintln(w, "repo not found") 137 + return 138 + } 139 + rbacResource = repoDid 140 + } else { 141 + legacyPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(ownerDid, repoName)) 142 + if joinErr != nil { 143 + w.WriteHeader(http.StatusNotFound) 144 + fmt.Fprintln(w, "repo not found") 145 + return 146 + } 147 + if _, statErr := os.Stat(legacyPath); statErr != nil { 148 + w.WriteHeader(http.StatusNotFound) 149 + l.Error("repo not found on disk (legacy)", "owner", ownerDid, "name", repoName) 150 + fmt.Fprintln(w, "repo not found") 151 + return 152 + } 153 + repoPath = legacyPath 154 + rbacResource = ownerDid + "/" + repoName 139 155 } 140 - rbacResource = repoDid 141 156 rel, relErr := filepath.Rel(h.c.Repo.ScanPath, repoPath) 142 157 if relErr != nil { 143 158 w.WriteHeader(http.StatusInternalServerError)
+28 -4
knotserver/xrpc/xrpc.go
··· 4 4 "encoding/json" 5 5 "log/slog" 6 6 "net/http" 7 + "os" 8 + "path/filepath" 7 9 "strings" 8 10 11 + securejoin "github.com/cyphar/filepath-securejoin" 12 + "github.com/go-chi/chi/v5" 9 13 "tangled.org/core/api/tangled" 10 14 "tangled.org/core/idresolver" 11 15 "tangled.org/core/jetstream" ··· 15 19 "tangled.org/core/rbac" 16 20 xrpcerr "tangled.org/core/xrpc/errors" 17 21 "tangled.org/core/xrpc/serviceauth" 18 - 19 - "github.com/go-chi/chi/v5" 20 22 ) 21 23 22 24 type Xrpc struct { ··· 85 87 ) 86 88 } 87 89 88 - repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo) 89 - if err != nil { 90 + if !strings.Contains(repo, "/") { 91 + repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo) 92 + if err != nil { 93 + return "", xrpcerr.RepoNotFoundError 94 + } 95 + return repoPath, nil 96 + } 97 + 98 + parts := strings.SplitN(repo, "/", 2) 99 + ownerDid, repoName := parts[0], parts[1] 100 + 101 + repoDid, err := x.Db.GetRepoDid(ownerDid, repoName) 102 + if err == nil { 103 + repoPath, _, _, resolveErr := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repoDid) 104 + if resolveErr == nil { 105 + return repoPath, nil 106 + } 107 + } 108 + 109 + repoPath, joinErr := securejoin.SecureJoin(x.Config.Repo.ScanPath, filepath.Join(ownerDid, repoName)) 110 + if joinErr != nil { 111 + return "", xrpcerr.RepoNotFoundError 112 + } 113 + if _, statErr := os.Stat(repoPath); statErr != nil { 90 114 return "", xrpcerr.RepoNotFoundError 91 115 } 92 116 return repoPath, nil
+2 -2
rbac/rbac.go
··· 34 34 ) 35 35 36 36 type Enforcer struct { 37 - E *casbin.Enforcer 37 + E *casbin.SyncedEnforcer 38 38 } 39 39 40 40 func NewEnforcer(path string) (*Enforcer, error) { ··· 53 53 return nil, err 54 54 } 55 55 56 - e, err := casbin.NewEnforcer(m, a) 56 + e, err := casbin.NewSyncedEnforcer(m, a) 57 57 if err != nil { 58 58 return nil, err 59 59 }
+1 -1
rbac/rbac_test.go
··· 23 23 m, err := model.NewModelFromString(rbac.Model) 24 24 assert.NoError(t, err) 25 25 26 - e, err := casbin.NewEnforcer(m, a) 26 + e, err := casbin.NewSyncedEnforcer(m, a) 27 27 assert.NoError(t, err) 28 28 29 29 e.EnableAutoSave(false)

History

12 rounds 1 comment
sign up or login to add to the discussion
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
merge conflicts detected
expand
  • go.mod:34
  • go.sum:339
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 1 comment

everything else lgtm!

1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments