Monorepo for Tangled tangled.org

appview, knotserver: add didAssign event and PDS rewrite on login #1143

open opened by oyster.cafe targeting master from oyster.cafe/tangled-core: master
Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgprvt2evm22
+527 -41
Diff #1
+168 -2
appview/db/repos.go
··· 596 596 return GetRepo(e, orm.FilterEq("repo_did", repoDid)) 597 597 } 598 598 599 + func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error { 600 + type record struct { 601 + userDidCol string 602 + table string 603 + nsid string 604 + fkCol string 605 + } 606 + sources := []record{ 607 + {"did", "repos", "sh.tangled.repo", "at_uri"}, 608 + {"did", "issues", "sh.tangled.repo.issue", "repo_at"}, 609 + {"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"}, 610 + {"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"}, 611 + {"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"}, 612 + {"did", "stars", "sh.tangled.feed.star", "subject_at"}, 613 + } 614 + 615 + for _, src := range sources { 616 + rows, err := tx.Query( 617 + fmt.Sprintf(`SELECT %s, rkey FROM %s WHERE %s = ?`, src.userDidCol, src.table, src.fkCol), 618 + repoAtUri, 619 + ) 620 + if err != nil { 621 + return fmt.Errorf("query %s for pds rewrites: %w", src.table, err) 622 + } 623 + 624 + var pairs []struct{ did, rkey string } 625 + for rows.Next() { 626 + var d, r string 627 + if scanErr := rows.Scan(&d, &r); scanErr != nil { 628 + rows.Close() 629 + return fmt.Errorf("scan %s for pds rewrites: %w", src.table, scanErr) 630 + } 631 + pairs = append(pairs, struct{ did, rkey string }{d, r}) 632 + } 633 + rows.Close() 634 + if rowsErr := rows.Err(); rowsErr != nil { 635 + return fmt.Errorf("iterate %s for pds rewrites: %w", src.table, rowsErr) 636 + } 637 + 638 + for _, p := range pairs { 639 + if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil { 640 + return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err) 641 + } 642 + } 643 + } 644 + 645 + profileRows, err := tx.Query( 646 + `SELECT DISTINCT did FROM profile_pinned_repositories WHERE at_uri = ?`, 647 + repoAtUri, 648 + ) 649 + if err != nil { 650 + return fmt.Errorf("query profile_pinned_repositories for pds rewrites: %w", err) 651 + } 652 + var profileDids []string 653 + for profileRows.Next() { 654 + var d string 655 + if scanErr := profileRows.Scan(&d); scanErr != nil { 656 + profileRows.Close() 657 + return fmt.Errorf("scan profile_pinned_repositories for pds rewrites: %w", scanErr) 658 + } 659 + profileDids = append(profileDids, d) 660 + } 661 + profileRows.Close() 662 + if profileRowsErr := profileRows.Err(); profileRowsErr != nil { 663 + return fmt.Errorf("iterate profile_pinned_repositories for pds rewrites: %w", profileRowsErr) 664 + } 665 + 666 + for _, d := range profileDids { 667 + if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil { 668 + return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err) 669 + } 670 + } 671 + 672 + return nil 673 + } 674 + 675 + type PdsRewrite struct { 676 + Id int 677 + UserDid string 678 + RepoDid string 679 + RecordNsid string 680 + RecordRkey string 681 + OldRepoAt string 682 + } 683 + 684 + func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) { 685 + rows, err := e.Query( 686 + `SELECT id, user_did, repo_did, record_nsid, record_rkey, old_repo_at 687 + FROM pds_rewrite_status 688 + WHERE user_did = ? AND status = 'pending'`, 689 + userDid, 690 + ) 691 + if err != nil { 692 + return nil, err 693 + } 694 + defer rows.Close() 695 + 696 + var rewrites []PdsRewrite 697 + for rows.Next() { 698 + var r PdsRewrite 699 + if err := rows.Scan(&r.Id, &r.UserDid, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil { 700 + return nil, err 701 + } 702 + rewrites = append(rewrites, r) 703 + } 704 + return rewrites, rows.Err() 705 + } 706 + 707 + func CompletePdsRewrite(e Execer, id int) error { 708 + _, err := e.Exec( 709 + `UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`, 710 + id, 711 + ) 712 + return err 713 + } 714 + 599 715 func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error { 600 716 _, err := e.Exec( 601 - `INSERT OR IGNORE INTO pds_rewrite_status 717 + `INSERT INTO pds_rewrite_status 602 718 (user_did, repo_did, record_nsid, record_rkey, old_repo_at, status) 603 - VALUES (?, ?, ?, ?, ?, 'pending')`, 719 + VALUES (?, ?, ?, ?, ?, 'pending') 720 + ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET 721 + status = 'pending', 722 + repo_did = excluded.repo_did, 723 + old_repo_at = excluded.old_repo_at, 724 + updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`, 604 725 userDid, repoDid, recordNsid, recordRkey, oldRepoAt, 605 726 ) 606 727 return err 607 728 } 608 729 730 + func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error { 731 + updates := []struct{ table, column string }{ 732 + {"repos", "at_uri"}, 733 + {"issues", "repo_at"}, 734 + {"pulls", "repo_at"}, 735 + {"collaborators", "repo_at"}, 736 + {"artifacts", "repo_at"}, 737 + {"webhooks", "repo_at"}, 738 + {"pull_comments", "repo_at"}, 739 + {"repo_issue_seqs", "repo_at"}, 740 + {"repo_pull_seqs", "repo_at"}, 741 + {"repo_languages", "repo_at"}, 742 + {"repo_labels", "repo_at"}, 743 + {"profile_pinned_repositories", "at_uri"}, 744 + } 745 + 746 + for _, u := range updates { 747 + _, err := tx.Exec( 748 + fmt.Sprintf(`UPDATE %s SET repo_did = ? WHERE %s = ?`, u.table, u.column), 749 + repoDid, repoAtUri, 750 + ) 751 + if err != nil { 752 + return fmt.Errorf("cascade repo_did to %s: %w", u.table, err) 753 + } 754 + } 755 + 756 + _, err := tx.Exec( 757 + `UPDATE stars SET subject_did = ? WHERE subject_at = ?`, 758 + repoDid, repoAtUri, 759 + ) 760 + if err != nil { 761 + return fmt.Errorf("cascade subject_did to stars: %w", err) 762 + } 763 + 764 + _, err = tx.Exec( 765 + `UPDATE repos SET source = ? WHERE source = ?`, 766 + repoDid, repoAtUri, 767 + ) 768 + if err != nil { 769 + return fmt.Errorf("cascade repo_did to repos.source: %w", err) 770 + } 771 + 772 + return nil 773 + } 774 + 609 775 func UpdateDescription(e Execer, repoAt, newDescription string) error { 610 776 _, err := e.Exec( 611 777 `update repos set description = ? where at_uri = ?`, newDescription, repoAt)
+140
appview/oauth/handler.go
··· 13 13 14 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 15 "github.com/bluesky-social/indigo/atproto/auth/oauth" 16 + atpclient "github.com/bluesky-social/indigo/atproto/client" 16 17 lexutil "github.com/bluesky-social/indigo/lex/util" 17 18 "github.com/go-chi/chi/v5" 18 19 "github.com/posthog/posthog-go" ··· 91 92 go o.addToDefaultKnot(sessData.AccountDID.String()) 92 93 go o.addToDefaultSpindle(sessData.AccountDID.String()) 93 94 go o.ensureTangledProfile(sessData) 95 + go o.drainPdsRewrites(sessData) 94 96 95 97 if !o.Config.Core.Dev { 96 98 err = o.Posthog.Enqueue(posthog.Capture{ ··· 243 245 l.Debug("successfully created empty Tangled profile on PDS and DB") 244 246 } 245 247 248 + func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) { 249 + ctx := context.Background() 250 + did := sessData.AccountDID.String() 251 + l := o.Logger.With("did", did, "handler", "drainPdsRewrites") 252 + 253 + rewrites, err := db.GetPendingPdsRewrites(o.Db, did) 254 + if err != nil { 255 + l.Error("failed to get pending rewrites", "err", err) 256 + return 257 + } 258 + if len(rewrites) == 0 { 259 + return 260 + } 261 + 262 + l.Info("draining pending PDS rewrites", "count", len(rewrites)) 263 + 264 + sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID) 265 + if err != nil { 266 + l.Error("failed to resume session for PDS rewrites", "err", err) 267 + return 268 + } 269 + client := sess.APIClient() 270 + 271 + for _, rw := range rewrites { 272 + if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil { 273 + l.Error("failed to rewrite PDS record", 274 + "nsid", rw.RecordNsid, 275 + "rkey", rw.RecordRkey, 276 + "repo_did", rw.RepoDid, 277 + "err", err) 278 + continue 279 + } 280 + 281 + if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil { 282 + l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err) 283 + } 284 + } 285 + } 286 + 287 + func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error { 288 + ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey) 289 + if err != nil { 290 + return fmt.Errorf("get record: %w", err) 291 + } 292 + 293 + val := ex.Value.Val 294 + repoDid := rw.RepoDid 295 + 296 + switch rw.RecordNsid { 297 + case tangled.RepoNSID: 298 + rec, ok := val.(*tangled.Repo) 299 + if !ok { 300 + return fmt.Errorf("unexpected type for repo record") 301 + } 302 + rec.RepoDid = &repoDid 303 + 304 + case tangled.RepoIssueNSID: 305 + rec, ok := val.(*tangled.RepoIssue) 306 + if !ok { 307 + return fmt.Errorf("unexpected type for issue record") 308 + } 309 + rec.RepoDid = &repoDid 310 + rec.Repo = nil 311 + 312 + case tangled.RepoPullNSID: 313 + rec, ok := val.(*tangled.RepoPull) 314 + if !ok { 315 + return fmt.Errorf("unexpected type for pull record") 316 + } 317 + if rec.Target != nil { 318 + rec.Target.RepoDid = &repoDid 319 + rec.Target.Repo = nil 320 + } 321 + if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt { 322 + rec.Source.RepoDid = &repoDid 323 + rec.Source.Repo = nil 324 + } 325 + 326 + case tangled.RepoCollaboratorNSID: 327 + rec, ok := val.(*tangled.RepoCollaborator) 328 + if !ok { 329 + return fmt.Errorf("unexpected type for collaborator record") 330 + } 331 + rec.RepoDid = &repoDid 332 + rec.Repo = nil 333 + 334 + case tangled.RepoArtifactNSID: 335 + rec, ok := val.(*tangled.RepoArtifact) 336 + if !ok { 337 + return fmt.Errorf("unexpected type for artifact record") 338 + } 339 + rec.RepoDid = &repoDid 340 + rec.Repo = nil 341 + 342 + case tangled.FeedStarNSID: 343 + rec, ok := val.(*tangled.FeedStar) 344 + if !ok { 345 + return fmt.Errorf("unexpected type for star record") 346 + } 347 + rec.SubjectDid = &repoDid 348 + rec.Subject = nil 349 + 350 + case tangled.ActorProfileNSID: 351 + rec, ok := val.(*tangled.ActorProfile) 352 + if !ok { 353 + return fmt.Errorf("unexpected type for profile record") 354 + } 355 + var dids []string 356 + var remaining []string 357 + for _, pinUri := range rec.PinnedRepositories { 358 + repo, repoErr := db.GetRepoByAtUri(o.Db, pinUri) 359 + if repoErr != nil || repo.RepoDid == "" { 360 + remaining = append(remaining, pinUri) 361 + continue 362 + } 363 + dids = append(dids, repo.RepoDid) 364 + } 365 + rec.PinnedRepositoryDids = append(rec.PinnedRepositoryDids, dids...) 366 + rec.PinnedRepositories = remaining 367 + 368 + default: 369 + return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid) 370 + } 371 + 372 + _, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{ 373 + Collection: rw.RecordNsid, 374 + Repo: userDid, 375 + Rkey: rw.RecordRkey, 376 + SwapRecord: ex.Cid, 377 + Record: &lexutil.LexiconTypeDecoder{Val: val}, 378 + }) 379 + if err != nil { 380 + return fmt.Errorf("put record: %w", err) 381 + } 382 + 383 + return nil 384 + } 385 + 246 386 // create a AppPasswordSession using apppasswords 247 387 type AppPasswordSession struct { 248 388 AccessJwt string `json:"accessJwt"`
+85
appview/state/knotstream.go
··· 18 18 "tangled.org/core/appview/models" 19 19 ec "tangled.org/core/eventconsumer" 20 20 "tangled.org/core/eventconsumer/cursor" 21 + knotdb "tangled.org/core/knotserver/db" 21 22 "tangled.org/core/log" 22 23 "tangled.org/core/orm" 23 24 "tangled.org/core/rbac" ··· 72 73 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 73 74 case tangled.PipelineNSID: 74 75 return ingestPipeline(d, source, msg) 76 + case knotdb.RepoDIDAssignNSID: 77 + return ingestDIDAssign(d, enforcer, source, msg, ctx) 75 78 } 76 79 77 80 return nil ··· 307 310 308 311 return nil 309 312 } 313 + 314 + func ingestDIDAssign(d *db.DB, enforcer *rbac.Enforcer, source ec.Source, msg ec.Message, ctx context.Context) error { 315 + logger := log.FromContext(ctx) 316 + 317 + var record knotdb.RepoDIDAssign 318 + if err := json.Unmarshal(msg.EventJson, &record); err != nil { 319 + return fmt.Errorf("unmarshal didAssign: %w", err) 320 + } 321 + 322 + if record.RepoDid == "" || record.OwnerDid == "" || record.RepoName == "" { 323 + return fmt.Errorf("didAssign missing required fields: repoDid=%q ownerDid=%q repoName=%q", 324 + record.RepoDid, record.OwnerDid, record.RepoName) 325 + } 326 + 327 + logger.Info("processing didAssign event", 328 + "repo_did", record.RepoDid, 329 + "owner_did", record.OwnerDid, 330 + "repo_name", record.RepoName) 331 + 332 + repos, err := db.GetRepos(d, 1, 333 + orm.FilterEq("did", record.OwnerDid), 334 + orm.FilterEq("name", record.RepoName), 335 + ) 336 + if err != nil || len(repos) == 0 { 337 + logger.Warn("didAssign for unknown repo, skipping", 338 + "owner_did", record.OwnerDid, 339 + "repo_name", record.RepoName) 340 + return nil 341 + } 342 + repo := repos[0] 343 + repoAtUri := repo.RepoAt().String() 344 + knot := source.Key() 345 + legacyResource := record.OwnerDid + "/" + record.RepoName 346 + 347 + if repo.RepoDid != record.RepoDid { 348 + tx, err := d.Begin() 349 + if err != nil { 350 + return fmt.Errorf("begin didAssign txn: %w", err) 351 + } 352 + defer tx.Rollback() 353 + 354 + if err := db.CascadeRepoDid(tx, repoAtUri, record.RepoDid); err != nil { 355 + return fmt.Errorf("cascade repo_did: %w", err) 356 + } 357 + 358 + if err := db.EnqueuePdsRewritesForRepo(tx, record.RepoDid, repoAtUri); err != nil { 359 + return fmt.Errorf("enqueue pds rewrites: %w", err) 360 + } 361 + 362 + if err := tx.Commit(); err != nil { 363 + return fmt.Errorf("commit didAssign txn: %w", err) 364 + } 365 + } 366 + 367 + if err := enforcer.RemoveRepo(record.OwnerDid, knot, legacyResource); err != nil { 368 + return fmt.Errorf("remove legacy RBAC policies for %s: %w", legacyResource, err) 369 + } 370 + if err := enforcer.AddRepo(record.OwnerDid, knot, record.RepoDid); err != nil { 371 + return fmt.Errorf("add RBAC policies for %s: %w", record.RepoDid, err) 372 + } 373 + 374 + collabs, collabErr := db.GetCollaborators(d, orm.FilterEq("repo_at", repoAtUri)) 375 + if collabErr != nil { 376 + return fmt.Errorf("get collaborators for RBAC update: %w", collabErr) 377 + } 378 + for _, c := range collabs { 379 + collabDid := c.SubjectDid.String() 380 + if err := enforcer.RemoveCollaborator(collabDid, knot, legacyResource); err != nil { 381 + return fmt.Errorf("remove collaborator RBAC for %s: %w", collabDid, err) 382 + } 383 + if err := enforcer.AddCollaborator(collabDid, knot, record.RepoDid); err != nil { 384 + return fmt.Errorf("add collaborator RBAC for %s: %w", collabDid, err) 385 + } 386 + } 387 + 388 + logger.Info("didAssign processed successfully", 389 + "repo_did", record.RepoDid, 390 + "owner_did", record.OwnerDid, 391 + "repo_name", record.RepoName) 392 + 393 + return nil 394 + }
+10
knotserver/db/didassign.go
··· 1 + package db 2 + 3 + const RepoDIDAssignNSID = "sh.tangled.repo.didAssign" 4 + 5 + type RepoDIDAssign struct { 6 + OwnerDid string `json:"ownerDid"` 7 + RepoName string `json:"repoName"` 8 + RepoDid string `json:"repoDid"` 9 + OldRepoAt string `json:"oldRepoAt,omitempty"` 10 + }
+24
knotserver/db/events.go
··· 1 1 package db 2 2 3 3 import ( 4 + "encoding/json" 4 5 "fmt" 5 6 "time" 6 7 8 + "github.com/bluesky-social/indigo/atproto/syntax" 7 9 "tangled.org/core/notifier" 8 10 ) 9 11 12 + var tidClock = syntax.NewTIDClock(0) 13 + 10 14 type Event struct { 11 15 Rkey string `json:"rkey"` 12 16 Nsid string `json:"nsid"` ··· 29 33 return err 30 34 } 31 35 36 + func (d *DB) EmitDIDAssign(n *notifier.Notifier, ownerDid, repoName, repoDid, oldRepoAt string) error { 37 + payload := RepoDIDAssign{ 38 + OwnerDid: ownerDid, 39 + RepoName: repoName, 40 + RepoDid: repoDid, 41 + OldRepoAt: oldRepoAt, 42 + } 43 + 44 + eventJson, err := json.Marshal(payload) 45 + if err != nil { 46 + return fmt.Errorf("marshal didAssign event: %w", err) 47 + } 48 + 49 + return d.InsertEvent(Event{ 50 + Rkey: tidClock.Next().String(), 51 + Nsid: RepoDIDAssignNSID, 52 + EventJson: string(eventJson), 53 + }, n) 54 + } 55 + 32 56 func (d *DB) GetEvents(cursor int64) ([]Event, error) { 33 57 whereClause := "" 34 58 args := []any{}
+26 -14
knotserver/events.go
··· 54 54 cursor = defaultCursor 55 55 } 56 56 57 - // complete backfill first before going to live data 58 57 l.Debug("going through backfill", "cursor", cursor) 59 - if err := h.streamOps(conn, &cursor); err != nil { 58 + if err := h.drainBackfill(conn, &cursor, 10_000); err != nil { 60 59 l.Error("failed to backfill", "err", err) 61 60 return 62 61 } ··· 68 67 l.Debug("stopping stream: client closed connection") 69 68 return 70 69 case <-ch: 71 - // we have been notified of new data 72 70 l.Debug("going through live data", "cursor", cursor) 73 - if err := h.streamOps(conn, &cursor); err != nil { 71 + if _, err := h.streamOps(conn, &cursor); err != nil { 74 72 l.Error("failed to stream", "err", err) 75 73 return 76 74 } ··· 83 81 } 84 82 } 85 83 86 - func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) error { 84 + func (h *Knot) drainBackfill(conn *websocket.Conn, cursor *int64, maxBatches int) error { 85 + for range maxBatches { 86 + n, err := h.streamOps(conn, cursor) 87 + if err != nil { 88 + return err 89 + } 90 + if n < 100 { 91 + return nil 92 + } 93 + } 94 + h.l.Warn("backfill hit batch limit", "maxBatches", maxBatches, "cursor", *cursor) 95 + return nil 96 + } 97 + 98 + func (h *Knot) streamOps(conn *websocket.Conn, cursor *int64) (int, error) { 87 99 events, err := h.db.GetEvents(*cursor) 88 100 if err != nil { 89 101 h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor) 90 - return err 102 + return 0, err 91 103 } 92 104 93 105 for _, event := range events { 94 - // first extract the inner json into a map 95 106 var eventJson map[string]any 96 107 err := json.Unmarshal([]byte(event.EventJson), &eventJson) 97 108 if err != nil { 98 109 h.l.Error("failed to unmarshal event", "err", err) 99 - return err 110 + return 0, err 100 111 } 101 112 102 113 jsonMsg, err := json.Marshal(map[string]any{ 103 - "rkey": event.Rkey, 104 - "nsid": event.Nsid, 105 - "event": eventJson, 114 + "rkey": event.Rkey, 115 + "nsid": event.Nsid, 116 + "event": eventJson, 117 + "created": event.Created, 106 118 }) 107 119 if err != nil { 108 120 h.l.Error("failed to marshal record", "err", err) 109 - return err 121 + return 0, err 110 122 } 111 123 112 124 if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 113 125 h.l.Debug("err", "err", err) 114 - return err 126 + return 0, err 115 127 } 116 128 *cursor = event.Created 117 129 } 118 130 119 - return nil 131 + return len(events), nil 120 132 }
+15 -5
knotserver/git.go
··· 5 5 "fmt" 6 6 "io" 7 7 "net/http" 8 + "os" 9 + "path/filepath" 8 10 "strings" 9 11 12 + securejoin "github.com/cyphar/filepath-securejoin" 10 13 "github.com/go-chi/chi/v5" 11 14 "tangled.org/core/knotserver/git/service" 12 15 ) ··· 24 27 } 25 28 26 29 repoDid, err := h.db.GetRepoDid(did, name) 27 - if err != nil { 28 - return "", "", fmt.Errorf("repo not found: %w", err) 30 + if err == nil { 31 + repoPath, _, _, resolveErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 32 + if resolveErr == nil { 33 + return repoPath, name, nil 34 + } 29 35 } 30 - repoPath, _, _, err := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 31 - if err != nil { 32 - return "", "", fmt.Errorf("repo not found: %w", err) 36 + 37 + repoPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(did, name)) 38 + if joinErr != nil { 39 + return "", "", fmt.Errorf("repo not found: %w", joinErr) 40 + } 41 + if _, statErr := os.Stat(repoPath); statErr != nil { 42 + return "", "", fmt.Errorf("repo not found: %w", statErr) 33 43 } 34 44 return repoPath, name, nil 35 45 }
+28 -13
knotserver/internal.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "net/http" 9 + "os" 9 10 "path/filepath" 10 11 "strings" 11 12 13 + securejoin "github.com/cyphar/filepath-securejoin" 12 14 "github.com/go-chi/chi/v5" 13 15 "github.com/go-chi/chi/v5/middleware" 14 16 "github.com/go-git/go-git/v5/plumbing" ··· 124 126 ownerDid := repoOwnerIdent.DID.String() 125 127 repoName := components[1] 126 128 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 127 - if didErr != nil { 128 - w.WriteHeader(http.StatusNotFound) 129 - l.Error("repo DID not found", "owner", ownerDid, "name", repoName, "err", didErr) 130 - fmt.Fprintln(w, "repo not found") 131 - return 132 - } 133 - repoPath, _, _, lookupErr := h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 134 - if lookupErr != nil { 135 - w.WriteHeader(http.StatusNotFound) 136 - l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr) 137 - fmt.Fprintln(w, "repo not found") 138 - return 129 + var repoPath string 130 + if didErr == nil { 131 + var lookupErr error 132 + repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 133 + if lookupErr != nil { 134 + w.WriteHeader(http.StatusNotFound) 135 + l.Error("repo not found on disk", "repoDid", repoDid, "err", lookupErr) 136 + fmt.Fprintln(w, "repo not found") 137 + return 138 + } 139 + rbacResource = repoDid 140 + } else { 141 + legacyPath, joinErr := securejoin.SecureJoin(h.c.Repo.ScanPath, filepath.Join(ownerDid, repoName)) 142 + if joinErr != nil { 143 + w.WriteHeader(http.StatusNotFound) 144 + fmt.Fprintln(w, "repo not found") 145 + return 146 + } 147 + if _, statErr := os.Stat(legacyPath); statErr != nil { 148 + w.WriteHeader(http.StatusNotFound) 149 + l.Error("repo not found on disk (legacy)", "owner", ownerDid, "name", repoName) 150 + fmt.Fprintln(w, "repo not found") 151 + return 152 + } 153 + repoPath = legacyPath 154 + rbacResource = ownerDid + "/" + repoName 139 155 } 140 - rbacResource = repoDid 141 156 rel, relErr := filepath.Rel(h.c.Repo.ScanPath, repoPath) 142 157 if relErr != nil { 143 158 w.WriteHeader(http.StatusInternalServerError)
+28 -4
knotserver/xrpc/xrpc.go
··· 4 4 "encoding/json" 5 5 "log/slog" 6 6 "net/http" 7 + "os" 8 + "path/filepath" 7 9 "strings" 8 10 11 + securejoin "github.com/cyphar/filepath-securejoin" 12 + "github.com/go-chi/chi/v5" 9 13 "tangled.org/core/api/tangled" 10 14 "tangled.org/core/idresolver" 11 15 "tangled.org/core/jetstream" ··· 15 19 "tangled.org/core/rbac" 16 20 xrpcerr "tangled.org/core/xrpc/errors" 17 21 "tangled.org/core/xrpc/serviceauth" 18 - 19 - "github.com/go-chi/chi/v5" 20 22 ) 21 23 22 24 type Xrpc struct { ··· 85 87 ) 86 88 } 87 89 88 - repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo) 89 - if err != nil { 90 + if !strings.Contains(repo, "/") { 91 + repoPath, _, _, err := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repo) 92 + if err != nil { 93 + return "", xrpcerr.RepoNotFoundError 94 + } 95 + return repoPath, nil 96 + } 97 + 98 + parts := strings.SplitN(repo, "/", 2) 99 + ownerDid, repoName := parts[0], parts[1] 100 + 101 + repoDid, err := x.Db.GetRepoDid(ownerDid, repoName) 102 + if err == nil { 103 + repoPath, _, _, resolveErr := x.Db.ResolveRepoDIDOnDisk(x.Config.Repo.ScanPath, repoDid) 104 + if resolveErr == nil { 105 + return repoPath, nil 106 + } 107 + } 108 + 109 + repoPath, joinErr := securejoin.SecureJoin(x.Config.Repo.ScanPath, filepath.Join(ownerDid, repoName)) 110 + if joinErr != nil { 111 + return "", xrpcerr.RepoNotFoundError 112 + } 113 + if _, statErr := os.Stat(repoPath); statErr != nil { 90 114 return "", xrpcerr.RepoNotFoundError 91 115 } 92 116 return repoPath, nil
+2 -2
rbac/rbac.go
··· 34 34 ) 35 35 36 36 type Enforcer struct { 37 - E *casbin.Enforcer 37 + E *casbin.SyncedEnforcer 38 38 } 39 39 40 40 func NewEnforcer(path string) (*Enforcer, error) { ··· 53 53 return nil, err 54 54 } 55 55 56 - e, err := casbin.NewEnforcer(m, a) 56 + e, err := casbin.NewSyncedEnforcer(m, a) 57 57 if err != nil { 58 58 return nil, err 59 59 }
+1 -1
rbac/rbac_test.go
··· 23 23 m, err := model.NewModelFromString(rbac.Model) 24 24 assert.NoError(t, err) 25 25 26 - e, err := casbin.NewEnforcer(m, a) 26 + e, err := casbin.NewSyncedEnforcer(m, a) 27 27 assert.NoError(t, err) 28 28 29 29 e.EnableAutoSave(false)

History

12 rounds 1 comment
sign up or login to add to the discussion
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
merge conflicts detected
expand
  • go.mod:34
  • go.sum:339
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 1 comment

everything else lgtm!

1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments
1 commit
expand
appview, knotserver: add didAssign event and PDS rewrite on login
expand 0 comments