Monorepo for Tangled tangled.org

appview: update state, ingester, middleware, and resolver for repo DID #1140

open opened by oyster.cafe targeting master from oyster.cafe/tangled-core: master
Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgprvt2eon22
+474 -254
Diff #13
+74 -18
appview/ingester.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 7 + "errors" 6 8 "fmt" 7 9 "log/slog" 8 10 "maps" ··· 116 118 return err 117 119 } 118 120 119 - subjectUri, err = syntax.ParseATURI(record.Subject) 120 - if err != nil { 121 - l.Error("invalid record", "err", err) 122 - return err 121 + star := &models.Star{ 122 + Did: did, 123 + Rkey: e.Commit.RKey, 123 124 } 124 - err = db.AddStar(i.Db, &models.Star{ 125 - Did: did, 126 - RepoAt: subjectUri, 127 - Rkey: e.Commit.RKey, 128 - }) 125 + 126 + switch { 127 + case record.SubjectDid != nil: 128 + repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 129 + if repoErr == nil { 130 + subjectUri = repo.RepoAt() 131 + star.RepoAt = subjectUri 132 + } 133 + case record.Subject != nil: 134 + subjectUri, err = syntax.ParseATURI(*record.Subject) 135 + if err != nil { 136 + l.Error("invalid record", "err", err) 137 + return err 138 + } 139 + star.RepoAt = subjectUri 140 + repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 141 + if repoErr == nil && repo.RepoDid != "" { 142 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 143 + l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 144 + } 145 + } 146 + default: 147 + l.Error("star record has neither subject nor subjectDid") 148 + return fmt.Errorf("star record has neither subject nor subjectDid") 149 + } 150 + err = db.AddStar(i.Db, star) 129 151 case jmodels.CommitOperationDelete: 130 152 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 153 } ··· 220 242 return err 221 243 } 222 244 223 - repoAt, err := syntax.ParseATURI(record.Repo) 224 - if err != nil { 225 - return err 245 + var repo *models.Repo 246 + if record.RepoDid != nil && *record.RepoDid != "" { 247 + repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 248 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 249 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 250 + } 226 251 } 227 - 228 - repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 - if err != nil { 230 - return err 252 + if repo == nil && record.Repo != nil { 253 + repoAt, parseErr := syntax.ParseATURI(*record.Repo) 254 + if parseErr != nil { 255 + return parseErr 256 + } 257 + repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 258 + if err != nil { 259 + return err 260 + } 261 + } 262 + if repo == nil { 263 + return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 231 264 } 232 265 233 - ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 266 + ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 234 267 if err != nil || !ok { 235 268 return err 236 269 } 237 270 271 + repoDid := repo.RepoDid 272 + if repoDid == "" && record.RepoDid != nil { 273 + repoDid = *record.RepoDid 274 + } 275 + if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 276 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 277 + l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 278 + } 279 + } 280 + 238 281 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 282 if err != nil { 240 283 createdAt = time.Now() ··· 243 286 artifact := models.Artifact{ 244 287 Did: did, 245 288 Rkey: e.Commit.RKey, 246 - RepoAt: repoAt, 289 + RepoAt: repo.RepoAt(), 247 290 Tag: plumbing.Hash(record.Tag), 248 291 CreatedAt: createdAt, 249 292 BlobCid: cid.Cid(record.Artifact.Ref), ··· 822 865 823 866 issue := models.IssueFromRecord(did, rkey, record) 824 867 868 + if issue.RepoAt == "" { 869 + return fmt.Errorf("issue record has no repo field") 870 + } 871 + 825 872 if err := i.Validator.ValidateIssue(&issue); err != nil { 826 873 return fmt.Errorf("failed to validate issue: %w", err) 827 874 } 828 875 876 + if record.Repo != nil { 877 + repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 878 + if repoErr == nil && repo.RepoDid != "" { 879 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 880 + l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 881 + } 882 + } 883 + } 884 + 829 885 tx, err := ddb.BeginTx(ctx, nil) 830 886 if err != nil { 831 887 l.Error("failed to begin transaction", "err", err)
+2 -2
appview/issues/issues.go
··· 309 309 return 310 310 } 311 311 312 - roles := repoinfo.RolesInRepo{Roles: rp.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 312 + roles := repoinfo.RolesInRepo{Roles: rp.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 313 313 isRepoOwner := roles.IsOwner() 314 314 isCollaborator := roles.IsCollaborator() 315 315 isIssueOwner := user.Active.Did == issue.Did ··· 357 357 return 358 358 } 359 359 360 - roles := repoinfo.RolesInRepo{Roles: rp.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 360 + roles := repoinfo.RolesInRepo{Roles: rp.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 361 361 isRepoOwner := roles.IsOwner() 362 362 isCollaborator := roles.IsCollaborator() 363 363 isIssueOwner := user.Active.Did == issue.Did
+9 -5
appview/middleware/middleware.go
··· 17 17 "tangled.org/core/appview/pages" 18 18 "tangled.org/core/appview/pagination" 19 19 "tangled.org/core/appview/reporesolver" 20 + "tangled.org/core/appview/state/userutil" 20 21 "tangled.org/core/idresolver" 21 22 "tangled.org/core/orm" 22 23 "tangled.org/core/rbac" ··· 161 162 return 162 163 } 163 164 164 - ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.DidSlashRepo(), requiredPerm) 165 + ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.RepoIdentifier(), requiredPerm) 165 166 if err != nil || !ok { 166 - log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.DidSlashRepo()) 167 + log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.RepoIdentifier()) 167 168 http.Error(w, "Forbiden", http.StatusUnauthorized) 168 169 return 169 170 } ··· 188 189 189 190 id, err := mw.idResolver.ResolveIdent(req.Context(), didOrHandle) 190 191 if err != nil { 191 - // invalid did or handle 192 192 log.Printf("failed to resolve did/handle '%s': %s\n", didOrHandle, err) 193 193 mw.pages.Error404(w) 194 194 return ··· 334 334 335 335 if r.Header.Get("User-Agent") == "Go-http-client/1.1" { 336 336 if r.URL.Query().Get("go-get") == "1" { 337 + modulePath := userutil.FlattenDid(fullName) 338 + if strings.Contains(modulePath, ":") { 339 + modulePath = userutil.FlattenDid(f.Did) + "/" + f.Name 340 + } 337 341 html := fmt.Sprintf( 338 342 `<meta name="go-import" content="tangled.sh/%s git https://tangled.sh/%s"/> 339 343 <meta name="go-import" content="tangled.org/%s git https://tangled.org/%s"/>`, 340 - fullName, fullName, 341 - fullName, fullName, 344 + modulePath, fullName, 345 + modulePath, fullName, 342 346 ) 343 347 w.Header().Set("Content-Type", "text/html") 344 348 w.Write([]byte(html))
+5 -1
appview/models/issue.go
··· 45 45 references[i] = string(uri) 46 46 } 47 47 repoAtStr := i.RepoAt.String() 48 - return tangled.RepoIssue{ 48 + rec := tangled.RepoIssue{ 49 49 Repo: &repoAtStr, 50 50 Title: i.Title, 51 51 Body: &i.Body, ··· 53 53 References: references, 54 54 CreatedAt: i.Created.Format(time.RFC3339), 55 55 } 56 + if i.Repo != nil && i.Repo.RepoDid != "" { 57 + rec.RepoDid = &i.Repo.RepoDid 58 + } 59 + return rec 56 60 } 57 61 58 62 func (i *Issue) State() string {
+26 -20
appview/pulls/pulls.go
··· 409 409 } 410 410 411 411 // user can only delete branch if they are a collaborator in the repo that the branch belongs to 412 - perms := s.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.DidSlashRepo()) 412 + perms := s.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.RepoIdentifier()) 413 413 if !slices.Contains(perms, "repo:push") { 414 414 return nil 415 415 } ··· 433 433 434 434 var sourceRepo syntax.ATURI 435 435 if pull.PullSource.RepoAt != nil { 436 - // fork-based pulls 437 436 sourceRepo = *pull.PullSource.RepoAt 438 437 } else { 439 - // pulls within the same repo 440 438 sourceRepo = repo.RepoAt() 441 439 } 442 440 ··· 930 928 } 931 929 932 930 // Determine PR type based on input parameters 933 - roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 931 + roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 934 932 isPushAllowed := roles.IsPushAllowed() 935 933 isBranchBased := isPushAllowed && sourceBranch != "" && fromFork == "" 936 934 isForkBased := fromFork != "" && sourceBranch != "" ··· 1046 1044 Host: host, 1047 1045 } 1048 1046 1049 - didSlashRepo := fmt.Sprintf("%s/%s", repo.Did, repo.Name) 1050 - xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, didSlashRepo, targetBranch, sourceBranch) 1047 + xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, repo.RepoIdentifier(), targetBranch, sourceBranch) 1051 1048 if err != nil { 1052 1049 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1053 1050 log.Println("failed to call XRPC repo.compare", xrpcerr) ··· 1156 1153 Host: forkHost, 1157 1154 } 1158 1155 1159 - forkRepoId := fmt.Sprintf("%s/%s", fork.Did, fork.Name) 1160 - forkXrpcBytes, err := tangled.RepoCompare(r.Context(), forkXrpcc, forkRepoId, hiddenRef, sourceBranch) 1156 + forkXrpcBytes, err := tangled.RepoCompare(r.Context(), forkXrpcc, fork.RepoIdentifier(), hiddenRef, sourceBranch) 1161 1157 if err != nil { 1162 1158 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1163 1159 log.Println("failed to call XRPC repo.compare for fork", xrpcerr) ··· 1198 1194 Repo: &forkAtUriStr, 1199 1195 Sha: sourceRev, 1200 1196 } 1197 + if fork.RepoDid != "" { 1198 + recordPullSource.RepoDid = &fork.RepoDid 1199 + } 1201 1200 1202 1201 s.createPullRequest(w, r, repo, user, title, body, targetBranch, patch, combined, sourceRev, pullSource, recordPullSource, isStacked) 1203 1202 } ··· 1314 1313 Rkey: rkey, 1315 1314 Record: &lexutil.LexiconTypeDecoder{ 1316 1315 Val: &tangled.RepoPull{ 1317 - Title: title, 1318 - Target: &tangled.RepoPull_Target{ 1319 - Repo: string(repo.RepoAt()), 1320 - Branch: targetBranch, 1321 - }, 1316 + Title: title, 1317 + Target: repoPullTarget(repo, targetBranch), 1322 1318 PatchBlob: blob.Blob, 1323 1319 Source: recordPullSource, 1324 1320 CreatedAt: time.Now().Format(time.RFC3339), ··· 1708 1704 return 1709 1705 } 1710 1706 1711 - roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 1707 + roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 1712 1708 if !roles.IsPushAllowed() { 1713 1709 log.Println("unauthorized user") 1714 1710 w.WriteHeader(http.StatusUnauthorized) ··· 1724 1720 Host: host, 1725 1721 } 1726 1722 1727 - repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 1728 - xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, repo, pull.TargetBranch, pull.PullSource.Branch) 1723 + xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, f.RepoIdentifier(), pull.TargetBranch, pull.PullSource.Branch) 1729 1724 if err != nil { 1730 1725 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1731 1726 log.Println("failed to call XRPC repo.compare", xrpcerr) ··· 1818 1813 forkScheme = "https" 1819 1814 } 1820 1815 forkHost := fmt.Sprintf("%s://%s", forkScheme, forkRepo.Knot) 1821 - forkRepoId := fmt.Sprintf("%s/%s", forkRepo.Did, forkRepo.Name) 1822 - forkXrpcBytes, err := tangled.RepoCompare(r.Context(), &indigoxrpc.Client{Host: forkHost}, forkRepoId, hiddenRef, pull.PullSource.Branch) 1816 + forkXrpcBytes, err := tangled.RepoCompare(r.Context(), &indigoxrpc.Client{Host: forkHost}, forkRepo.RepoIdentifier(), hiddenRef, pull.PullSource.Branch) 1823 1817 if err != nil { 1824 1818 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1825 1819 log.Println("failed to call XRPC repo.compare for fork", xrpcerr) ··· 2297 2291 } 2298 2292 2299 2293 // auth filter: only owner or collaborators can close 2300 - roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 2294 + roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 2301 2295 isOwner := roles.IsOwner() 2302 2296 isCollaborator := roles.IsCollaborator() 2303 2297 isPullAuthor := user.Active.Did == pull.OwnerDid ··· 2371 2365 } 2372 2366 2373 2367 // auth filter: only owner or collaborators can close 2374 - roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.DidSlashRepo())} 2368 + roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 2375 2369 isOwner := roles.IsOwner() 2376 2370 isCollaborator := roles.IsCollaborator() 2377 2371 isPullAuthor := user.Active.Did == pull.OwnerDid ··· 2496 2490 } 2497 2491 2498 2492 func ptrPullState(s models.PullState) *models.PullState { return &s } 2493 + 2494 + func repoPullTarget(repo *models.Repo, branch string) *tangled.RepoPull_Target { 2495 + s := string(repo.RepoAt()) 2496 + t := &tangled.RepoPull_Target{ 2497 + Branch: branch, 2498 + Repo: &s, 2499 + } 2500 + if repo.RepoDid != "" { 2501 + t.RepoDid = &repo.RepoDid 2502 + } 2503 + return t 2504 + }
+1 -1
appview/repo/archive.go
··· 60 60 if link := resp.Header.Get("Link"); link != "" { 61 61 if resolvedRef, err := extractImmutableLink(link); err == nil { 62 62 newLink := fmt.Sprintf("<%s/%s/archive/%s.tar.gz>; rel=\"immutable\"", 63 - rp.config.Core.BaseUrl(), f.DidSlashRepo(), resolvedRef) 63 + rp.config.Core.BaseUrl(), f.RepoIdentifier(), resolvedRef) 64 64 w.Header().Set("Link", newLink) 65 65 } 66 66 }
+16 -7
appview/repo/artifact.go
··· 80 80 Repo: user.Active.Did, 81 81 Rkey: rkey, 82 82 Record: &lexutil.LexiconTypeDecoder{ 83 - Val: &tangled.RepoArtifact{ 84 - Artifact: uploadBlobResp.Blob, 85 - CreatedAt: createdAt.Format(time.RFC3339), 86 - Name: header.Filename, 87 - Repo: f.RepoAt().String(), 88 - Tag: tag.Tag.Hash[:], 89 - }, 83 + Val: repoArtifactRecord(f, uploadBlobResp.Blob, createdAt, header.Filename, tag.Tag.Hash[:]), 90 84 }, 91 85 }) 92 86 if err != nil { ··· 350 344 351 345 return tag, nil 352 346 } 347 + 348 + func repoArtifactRecord(f *models.Repo, blob *lexutil.LexBlob, createdAt time.Time, name string, tag []byte) *tangled.RepoArtifact { 349 + rec := &tangled.RepoArtifact{ 350 + Artifact: blob, 351 + CreatedAt: createdAt.Format(time.RFC3339), 352 + Name: name, 353 + Tag: tag, 354 + } 355 + s := f.RepoAt().String() 356 + rec.Repo = &s 357 + if f.RepoDid != "" { 358 + rec.RepoDid = &f.RepoDid 359 + } 360 + return rec 361 + }
+3 -4
appview/repo/blob.go
··· 58 58 xrpcc := &indigoxrpc.Client{ 59 59 Host: host, 60 60 } 61 - repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 62 - resp, err := tangled.RepoBlob(r.Context(), xrpcc, filePath, false, ref, repo) 61 + resp, err := tangled.RepoBlob(r.Context(), xrpcc, filePath, false, ref, f.RepoIdentifier()) 63 62 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 64 63 l.Error("failed to call XRPC repo.blob", "err", xrpcerr) 65 64 rp.pages.Error503(w) ··· 139 138 if !rp.config.Core.Dev { 140 139 scheme = "https" 141 140 } 142 - repo := f.DidSlashRepo() 141 + repo := f.RepoIdentifier() 143 142 baseURL := &url.URL{ 144 143 Scheme: scheme, 145 144 Host: f.Knot, ··· 290 289 scheme = "https" 291 290 } 292 291 293 - repoName := fmt.Sprintf("%s/%s", repo.Did, repo.Name) 292 + repoName := repo.RepoIdentifier() 294 293 baseURL := &url.URL{ 295 294 Scheme: scheme, 296 295 Host: repo.Knot,
+4 -4
appview/repo/compare.go
··· 141 141 Host: host, 142 142 } 143 143 144 - repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 144 + repoId := f.RepoIdentifier() 145 145 146 - branchBytes, err := tangled.RepoBranches(r.Context(), xrpcc, "", 0, repo) 146 + branchBytes, err := tangled.RepoBranches(r.Context(), xrpcc, "", 0, repoId) 147 147 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 148 148 l.Error("failed to call XRPC repo.branches", "err", xrpcerr) 149 149 rp.pages.Error503(w) ··· 157 157 return 158 158 } 159 159 160 - tagBytes, err := tangled.RepoTags(r.Context(), xrpcc, "", 0, repo) 160 + tagBytes, err := tangled.RepoTags(r.Context(), xrpcc, "", 0, repoId) 161 161 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 162 162 l.Error("failed to call XRPC repo.tags", "err", xrpcerr) 163 163 rp.pages.Error503(w) ··· 171 171 return 172 172 } 173 173 174 - compareBytes, err := tangled.RepoCompare(r.Context(), xrpcc, repo, base, head) 174 + compareBytes, err := tangled.RepoCompare(r.Context(), xrpcc, repoId, base, head) 175 175 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 176 176 l.Error("failed to call XRPC repo.compare", "err", xrpcerr) 177 177 rp.pages.Error503(w)
-1
appview/repo/index.go
··· 239 239 func (rp *Repo) buildIndexResponse(ctx context.Context, repo *models.Repo, ref string) (*types.RepoIndexResponse, error) { 240 240 xrpcc := &indigoxrpc.Client{Host: rp.config.KnotMirror.Url} 241 241 242 - // first get branches to determine the ref if not specified 243 242 branchesBytes, err := tangled.GitTempListBranches(ctx, xrpcc, "", 0, repo.RepoAt().String()) 244 243 if err != nil { 245 244 return nil, fmt.Errorf("calling knotmirror git.listBranches: %w", err)
+1 -2
appview/repo/log.go
··· 164 164 Host: host, 165 165 } 166 166 167 - repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 168 - xrpcBytes, err := tangled.RepoDiff(r.Context(), xrpcc, ref, repo) 167 + xrpcBytes, err := tangled.RepoDiff(r.Context(), xrpcc, ref, f.RepoIdentifier()) 169 168 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 170 169 l.Error("failed to call XRPC repo.diff", "err", xrpcerr) 171 170 rp.pages.Error503(w)
+120 -55
appview/repo/repo.go
··· 36 36 "github.com/bluesky-social/indigo/atproto/atclient" 37 37 "github.com/bluesky-social/indigo/atproto/syntax" 38 38 lexutil "github.com/bluesky-social/indigo/lex/util" 39 - securejoin "github.com/cyphar/filepath-securejoin" 39 + 40 40 "github.com/go-chi/chi/v5" 41 41 ) 42 42 ··· 318 318 return 319 319 } 320 320 321 - err = db.SubscribeLabel(tx, &models.RepoLabel{ 321 + if err = db.SubscribeLabel(tx, &models.RepoLabel{ 322 322 RepoAt: f.RepoAt(), 323 323 LabelAt: label.AtUri(), 324 - }) 324 + }); err != nil { 325 + fail("Failed to subscribe to label.", err) 326 + return 327 + } 325 328 326 329 err = tx.Commit() 327 330 if err != nil { ··· 755 758 Repo: currentUser.Active.Did, 756 759 Rkey: rkey, 757 760 Record: &lexutil.LexiconTypeDecoder{ 758 - Val: &tangled.RepoCollaborator{ 759 - Subject: collaboratorIdent.DID.String(), 760 - Repo: string(f.RepoAt()), 761 - CreatedAt: createdAt.Format(time.RFC3339), 762 - }}, 761 + Val: repoCollaboratorRecord(f, collaboratorIdent.DID.String(), createdAt), 762 + }, 763 763 }) 764 764 // invalid record 765 765 if err != nil { ··· 794 794 } 795 795 defer rollback() 796 796 797 - err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 797 + err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.RepoIdentifier()) 798 798 if err != nil { 799 799 fail("Failed to add collaborator permissions.", err) 800 800 return ··· 900 900 }() 901 901 902 902 // remove collaborator RBAC 903 - repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 903 + repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.RepoIdentifier(), f.Knot) 904 904 if err != nil { 905 905 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 906 906 return 907 907 } 908 908 for _, c := range repoCollaborators { 909 909 did := c[0] 910 - rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 910 + rp.enforcer.RemoveCollaborator(did, f.Knot, f.RepoIdentifier()) 911 911 } 912 912 l.Info("removed collaborators") 913 913 914 914 // remove repo RBAC 915 - err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 915 + err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.RepoIdentifier()) 916 916 if err != nil { 917 917 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 918 918 return ··· 1067 1067 uri = "http" 1068 1068 } 1069 1069 1070 - forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1070 + forkSourceUrl := fmt.Sprintf("%s://%s/%s", uri, f.Knot, f.RepoIdentifier()) 1071 1071 l = l.With("cloneUrl", forkSourceUrl) 1072 1072 1073 - sourceAt := f.RepoAt().String() 1074 - 1075 - // create an atproto record for this fork 1076 1073 rkey := tid.TID() 1074 + 1075 + // TODO: this could coordinate better with the knot to recieve a clone status 1076 + client, err := rp.oauth.ServiceClient( 1077 + r, 1078 + oauth.WithService(targetKnot), 1079 + oauth.WithLxm(tangled.RepoCreateNSID), 1080 + oauth.WithDev(rp.config.Core.Dev), 1081 + oauth.WithTimeout(time.Second*20), 1082 + ) 1083 + if err != nil { 1084 + l.Error("could not create service client", "err", err) 1085 + rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1086 + return 1087 + } 1088 + 1089 + forkInput := &tangled.RepoCreate_Input{ 1090 + Rkey: rkey, 1091 + Name: forkName, 1092 + Source: &forkSourceUrl, 1093 + } 1094 + createResp, createErr := tangled.RepoCreate( 1095 + r.Context(), 1096 + client, 1097 + forkInput, 1098 + ) 1099 + if err := xrpcclient.HandleXrpcErr(createErr); err != nil { 1100 + rp.pages.Notice(w, "repo", err.Error()) 1101 + return 1102 + } 1103 + 1104 + var repoDid string 1105 + if createResp != nil && createResp.RepoDid != nil { 1106 + repoDid = *createResp.RepoDid 1107 + } 1108 + if repoDid == "" { 1109 + l.Error("knot returned empty repo DID for fork") 1110 + rp.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 1111 + return 1112 + } 1113 + 1114 + forkSource := f.RepoAt().String() 1115 + if f.RepoDid != "" { 1116 + forkSource = f.RepoDid 1117 + } 1118 + 1077 1119 repo := &models.Repo{ 1078 1120 Did: user.Active.Did, 1079 1121 Name: forkName, 1080 1122 Knot: targetKnot, 1081 1123 Rkey: rkey, 1082 - Source: sourceAt, 1124 + Source: forkSource, 1083 1125 Description: f.Description, 1084 1126 Created: time.Now(), 1085 1127 Labels: rp.config.Label.DefaultLabelDefs, 1128 + RepoDid: repoDid, 1086 1129 } 1087 1130 record := repo.AsRecord() 1088 1131 1132 + cleanupKnot := func() { 1133 + go func() { 1134 + delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 1135 + for attempt, delay := range delays { 1136 + time.Sleep(delay) 1137 + deleteClient, dErr := rp.oauth.ServiceClient( 1138 + r, 1139 + oauth.WithService(targetKnot), 1140 + oauth.WithLxm(tangled.RepoDeleteNSID), 1141 + oauth.WithDev(rp.config.Core.Dev), 1142 + ) 1143 + if dErr != nil { 1144 + l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 1145 + continue 1146 + } 1147 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1148 + if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 1149 + Did: user.Active.Did, 1150 + Name: forkName, 1151 + Rkey: rkey, 1152 + }); dErr != nil { 1153 + cancel() 1154 + l.Error("failed to clean up fork on knot after rollback", "attempt", attempt+1, "err", dErr) 1155 + continue 1156 + } 1157 + cancel() 1158 + l.Info("successfully cleaned up fork on knot after rollback", "attempt", attempt+1) 1159 + return 1160 + } 1161 + l.Error("exhausted retries for knot cleanup, fork may be orphaned", 1162 + "did", user.Active.Did, "fork", forkName, "knot", targetKnot) 1163 + }() 1164 + } 1165 + 1089 1166 atpClient, err := rp.oauth.AuthorizedClient(r) 1090 1167 if err != nil { 1091 1168 l.Error("failed to create xrpcclient", "err", err) 1169 + cleanupKnot() 1092 1170 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1093 1171 return 1094 1172 } ··· 1103 1181 }) 1104 1182 if err != nil { 1105 1183 l.Error("failed to write to PDS", "err", err) 1184 + cleanupKnot() 1106 1185 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1107 1186 return 1108 1187 } ··· 1118 1197 return 1119 1198 } 1120 1199 1121 - // The rollback function reverts a few things on failure: 1122 - // - the pending txn 1123 - // - the ACLs 1124 - // - the atproto record created 1125 1200 rollback := func() { 1126 1201 err1 := tx.Rollback() 1127 1202 err2 := rp.enforcer.E.LoadPolicy() 1128 1203 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1129 1204 1130 - // ignore txn complete errors, this is okay 1131 1205 if errors.Is(err1, sql.ErrTxDone) { 1132 1206 err1 = nil 1133 1207 } 1134 1208 1135 1209 if errs := errors.Join(err1, err2, err3); errs != nil { 1136 1210 l.Error("failed to rollback changes", "errs", errs) 1137 - return 1138 1211 } 1139 - } 1140 - defer rollback() 1141 1212 1142 - // TODO: this could coordinate better with the knot to recieve a clone status 1143 - client, err := rp.oauth.ServiceClient( 1144 - r, 1145 - oauth.WithService(targetKnot), 1146 - oauth.WithLxm(tangled.RepoCreateNSID), 1147 - oauth.WithDev(rp.config.Core.Dev), 1148 - oauth.WithTimeout(time.Second*20), // big repos take time to clone 1149 - ) 1150 - if err != nil { 1151 - l.Error("could not create service client", "err", err) 1152 - rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1153 - return 1154 - } 1155 - 1156 - err = tangled.RepoCreate( 1157 - r.Context(), 1158 - client, 1159 - &tangled.RepoCreate_Input{ 1160 - Rkey: rkey, 1161 - Source: &forkSourceUrl, 1162 - }, 1163 - ) 1164 - if err := xrpcclient.HandleXrpcErr(err); err != nil { 1165 - rp.pages.Notice(w, "repo", err.Error()) 1166 - return 1213 + if aturi != "" { 1214 + cleanupKnot() 1215 + } 1167 1216 } 1217 + defer rollback() 1168 1218 1169 1219 err = db.AddRepo(tx, repo) 1170 1220 if err != nil { ··· 1173 1223 return 1174 1224 } 1175 1225 1176 - // acls 1177 - p, _ := securejoin.SecureJoin(user.Active.Did, forkName) 1178 - err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p) 1226 + rbacPath := repo.RepoIdentifier() 1227 + err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, rbacPath) 1179 1228 if err != nil { 1180 1229 l.Error("failed to add ACLs", "err", err) 1181 1230 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") ··· 1196 1245 return 1197 1246 } 1198 1247 1199 - // reset the ATURI because the transaction completed successfully 1200 1248 aturi = "" 1201 1249 1202 1250 rp.notifier.NewRepo(r.Context(), repo) 1203 - rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1251 + if repoDid != "" { 1252 + rp.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 1253 + } else { 1254 + rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1255 + } 1204 1256 } 1205 1257 } 1206 1258 ··· 1225 1277 }) 1226 1278 return err 1227 1279 } 1280 + 1281 + func repoCollaboratorRecord(f *models.Repo, subject string, createdAt time.Time) *tangled.RepoCollaborator { 1282 + rec := &tangled.RepoCollaborator{ 1283 + Subject: subject, 1284 + CreatedAt: createdAt.Format(time.RFC3339), 1285 + } 1286 + s := string(f.RepoAt()) 1287 + rec.Repo = &s 1288 + if f.RepoDid != "" { 1289 + rec.RepoDid = &f.RepoDid 1290 + } 1291 + return rec 1292 + }
+7 -7
appview/repo/settings.go
··· 293 293 // Skip entirely if there is no active domain claim — the site cannot be served anyway. 294 294 ownerClaim, _ := db.GetActiveDomainClaimForDid(rp.db, f.Did) 295 295 if ownerClaim == nil { 296 - rp.logger.Info("skipping deploy: no active domain claim", "repo", f.DidSlashRepo()) 296 + rp.logger.Info("skipping deploy: no active domain claim", "repo", f.RepoIdentifier()) 297 297 } else if rp.cfClient.Enabled() { 298 298 scheme := "http" 299 299 if !rp.config.Core.Dev { ··· 313 313 314 314 deployErr := sites.Deploy(ctx, rp.cfClient, knotHost, f.Did, f.Name, branch, dir) 315 315 if deployErr != nil { 316 - l.Error("sites: initial R2 sync failed", "repo", f.DidSlashRepo(), "err", deployErr) 316 + l.Error("sites: initial R2 sync failed", "repo", f.RepoIdentifier(), "err", deployErr) 317 317 deploy.Status = models.SiteDeployStatusFailure 318 318 deploy.Error = deployErr.Error() 319 319 } else { ··· 321 321 } 322 322 323 323 if err := db.AddSiteDeploy(rp.db, deploy); err != nil { 324 - l.Error("sites: failed to record deploy", "repo", f.DidSlashRepo(), "err", err) 324 + l.Error("sites: failed to record deploy", "repo", f.RepoIdentifier(), "err", err) 325 325 } 326 326 327 327 if deployErr == nil { 328 328 if err := sites.PutDomainMapping(ctx, rp.cfClient, ownerClaim.Domain, f.Did, f.Name, isIndex); err != nil { 329 329 l.Error("sites: KV write failed", "domain", ownerClaim.Domain, "err", err) 330 330 } 331 - rp.logger.Info("site deployed to r2", "repo", f.DidSlashRepo(), "is_index", isIndex) 331 + rp.logger.Info("site deployed to r2", "repo", f.RepoIdentifier(), "is_index", isIndex) 332 332 } 333 333 }() 334 334 } else { 335 - rp.logger.Warn("cloudflare integration is disabled; site won't be deployed", "repo", f.DidSlashRepo()) 335 + rp.logger.Warn("cloudflare integration is disabled; site won't be deployed", "repo", f.RepoIdentifier()) 336 336 } 337 337 338 338 rp.pages.HxRefresh(w) ··· 367 367 go func() { 368 368 ctx := context.Background() 369 369 if err := sites.Delete(ctx, rp.cfClient, f.Did, f.Name); err != nil { 370 - l.Error("sites: R2 delete failed", "repo", f.DidSlashRepo(), "err", err) 370 + l.Error("sites: R2 delete failed", "repo", f.RepoIdentifier(), "err", err) 371 371 } 372 372 if ownerClaim != nil { 373 373 if err := sites.DeleteDomainMapping(ctx, rp.cfClient, ownerClaim.Domain, f.Name); err != nil { ··· 459 459 user := rp.oauth.GetMultiAccountUser(r) 460 460 461 461 collaborators, err := func(repo *models.Repo) ([]pages.Collaborator, error) { 462 - repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(repo.DidSlashRepo(), repo.Knot) 462 + repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(repo.RepoIdentifier(), repo.Knot) 463 463 if err != nil { 464 464 return nil, err 465 465 }
+13 -6
appview/reporesolver/resolver.go
··· 36 36 37 37 // NOTE: this... should not even be here. the entire package will be removed in future refactor 38 38 func GetBaseRepoPath(r *http.Request, repo *models.Repo) string { 39 + if repo.RepoDid != "" { 40 + return repo.RepoDid 41 + } 39 42 var ( 40 43 user = chi.URLParam(r, "user") 41 44 name = chi.URLParam(r, "repo") 42 45 ) 43 46 if user == "" || name == "" { 44 - return repo.DidSlashRepo() 47 + return repo.RepoIdentifier() 45 48 } 46 49 return path.Join(user, name) 47 50 } ··· 77 80 roles := repoinfo.RolesInRepo{} 78 81 if user != nil && user.Active != nil { 79 82 isStarred = db.GetStarStatus(rr.execer, user.Active.Did, repoAt) 80 - roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.DidSlashRepo()) 83 + roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.RepoIdentifier()) 81 84 } 82 85 83 86 stats := repo.RepoStats 84 87 if stats == nil { 85 - starCount, err := db.GetStarCount(rr.execer, repoAt) 86 - if err != nil { 88 + starCount, starErr := db.GetStarCount(rr.execer, repoAt) 89 + if starErr != nil { 87 90 log.Println("failed to get star count for ", repoAt) 88 91 } 89 92 issueCount, err := db.GetIssueCount(rr.execer, repoAt) ··· 104 107 var sourceRepo *models.Repo 105 108 var err error 106 109 if repo.Source != "" { 107 - sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 110 + if strings.HasPrefix(repo.Source, "did:") { 111 + sourceRepo, err = db.GetRepoByDid(rr.execer, repo.Source) 112 + } else { 113 + sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 114 + } 108 115 if err != nil { 109 - log.Println("failed to get repo by at uri", err) 116 + log.Println("failed to get source repo", err) 110 117 } 111 118 } 112 119
+7 -22
appview/state/git_http.go
··· 37 37 } 38 38 39 39 func (s *State) InfoRefs(w http.ResponseWriter, r *http.Request) { 40 - user := r.Context().Value("resolvedId").(identity.Identity) 41 40 repo := r.Context().Value("repo").(*models.Repo) 42 41 43 42 scheme := "https" ··· 45 44 scheme = "http" 46 45 } 47 46 48 - // check for the 'service' url param 49 47 service := r.URL.Query().Get("service") 50 48 var contentType string 51 49 switch service { 52 50 case "git-receive-pack": 53 51 contentType = "application/x-git-receive-pack-advertisement" 54 52 default: 55 - // git-upload-pack is the default service for git-clone / git-fetch. 56 53 contentType = "application/x-git-upload-pack-advertisement" 57 54 } 58 55 59 - targetURL := fmt.Sprintf("%s://%s/%s/%s/info/refs?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 56 + targetURL := fmt.Sprintf("%s://%s/%s/info/refs?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 60 57 s.proxyRequest(w, r, targetURL, contentType) 61 58 } 62 59 63 60 func (s *State) UploadArchive(w http.ResponseWriter, r *http.Request) { 64 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 65 - if !ok { 66 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 67 - return 68 - } 69 61 repo := r.Context().Value("repo").(*models.Repo) 70 62 71 63 scheme := "https" ··· 73 65 scheme = "http" 74 66 } 75 67 76 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-archive?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 68 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-archive?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 77 69 s.proxyRequest(w, r, targetURL, "application/x-git-upload-archive-result") 78 70 } 79 71 80 72 func (s *State) UploadPack(w http.ResponseWriter, r *http.Request) { 81 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 82 - if !ok { 83 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 84 - return 85 - } 86 73 repo := r.Context().Value("repo").(*models.Repo) 87 74 88 75 scheme := "https" ··· 90 77 scheme = "http" 91 78 } 92 79 93 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 80 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 94 81 s.proxyRequest(w, r, targetURL, "application/x-git-upload-pack-result") 95 82 } 96 83 97 84 func (s *State) ReceivePack(w http.ResponseWriter, r *http.Request) { 98 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 99 - if !ok { 100 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 101 - return 102 - } 103 85 repo := r.Context().Value("repo").(*models.Repo) 104 86 105 87 scheme := "https" ··· 107 89 scheme = "http" 108 90 } 109 91 110 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-receive-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 92 + targetURL := fmt.Sprintf("%s://%s/%s/git-receive-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 111 93 s.proxyRequest(w, r, targetURL, "application/x-git-receive-pack-result") 112 94 } 113 95 ··· 123 105 proxyReq.Header = r.Header.Clone() 124 106 125 107 repoOwnerHandle := chi.URLParam(r, "user") 108 + if id, ok := r.Context().Value("resolvedId").(identity.Identity); ok && !id.Handle.IsInvalidHandle() { 109 + repoOwnerHandle = id.Handle.String() 110 + } 126 111 proxyReq.Header.Set("x-tangled-repo-owner-handle", repoOwnerHandle) 127 112 128 113 resp, err := client.Do(proxyReq)
+58 -53
appview/state/knotstream.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 6 7 "errors" 7 8 "fmt" ··· 66 67 return ec.NewConsumer(cfg), nil 67 68 } 68 69 70 + func resolveRepo(d *db.DB, repoDid *string, ownerDid, repoName string) (*models.Repo, error) { 71 + if repoDid != nil && *repoDid != "" { 72 + return db.GetRepoByDid(d, *repoDid) 73 + } 74 + repos, err := db.GetRepos(d, 1, orm.FilterEq("did", ownerDid), orm.FilterEq("name", repoName)) 75 + if err != nil { 76 + return nil, err 77 + } 78 + if len(repos) == 0 { 79 + return nil, sql.ErrNoRows 80 + } 81 + return &repos[0], nil 82 + } 83 + 69 84 func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 70 85 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 71 86 switch msg.Nsid { ··· 96 111 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 97 112 } 98 113 114 + ownerDid := "" 115 + if record.OwnerDid != nil { 116 + ownerDid = *record.OwnerDid 117 + } 118 + 119 + repo, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 120 + if lookupErr != nil { 121 + return fmt.Errorf("failed to look up repo: %w", lookupErr) 122 + } 123 + 99 124 logger.Info("processing gitRefUpdate event", 100 - "repo_did", record.RepoDid, 101 - "repo_name", record.RepoName, 125 + "repo", repo.RepoIdentifier(), 102 126 "ref", record.Ref, 103 127 "old_sha", record.OldSha, 104 128 "new_sha", record.NewSha) 105 129 106 - // trigger webhook notifications first (before other ops that might fail) 107 - var errWebhook error 108 - repos, err := db.GetRepos( 109 - d, 110 - 0, 111 - orm.FilterEq("did", record.RepoDid), 112 - orm.FilterEq("name", record.RepoName), 113 - ) 114 - if err != nil { 115 - errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 116 - } else if len(repos) == 1 { 117 - notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 118 - } 130 + notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 119 131 120 132 errPunchcard := populatePunchcard(d, record) 121 133 errLanguages := updateRepoLanguages(d, record) ··· 133 145 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 134 146 } 135 147 136 - return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 148 + return errors.Join(errPunchcard, errLanguages, errPosthog) 137 149 } 138 150 139 151 // triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites ··· 147 159 } 148 160 pushedBranch := ref.Short() 149 161 150 - repos, err := db.GetRepos( 151 - d, 152 - 0, 153 - orm.FilterEq("did", record.RepoDid), 154 - orm.FilterEq("name", record.RepoName), 155 - ) 156 - if err != nil || len(repos) != 1 { 162 + ownerDid := "" 163 + if record.OwnerDid != nil { 164 + ownerDid = *record.OwnerDid 165 + } 166 + 167 + repo, err := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 168 + if err != nil { 157 169 return 158 170 } 159 - repo := repos[0] 160 171 161 172 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String()) 162 173 if err != nil || siteConfig == nil { ··· 180 191 Trigger: models.SiteDeployTriggerPush, 181 192 } 182 193 183 - deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir) 194 + deployErr := sites.Deploy(ctx, cfClient, knotHost, repo.RepoIdentifier(), record.RepoName, siteConfig.Branch, siteConfig.Dir) 184 195 if deployErr != nil { 185 - logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr) 196 + logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 186 197 deploy.Status = models.SiteDeployStatusFailure 187 198 deploy.Error = deployErr.Error() 188 199 } else { ··· 190 201 } 191 202 192 203 if err := db.AddSiteDeploy(d, deploy); err != nil { 193 - logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err) 204 + logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 194 205 } 195 206 196 207 if deployErr == nil { 197 - logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName) 208 + logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 198 209 } 199 210 } 200 211 ··· 236 247 237 248 func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 238 249 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 239 - return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 250 + return fmt.Errorf("empty language data for repo: %v/%s", record.OwnerDid, record.RepoName) 240 251 } 241 252 242 - repos, err := db.GetRepos( 243 - d, 244 - 0, 245 - orm.FilterEq("did", record.RepoDid), 246 - orm.FilterEq("name", record.RepoName), 247 - ) 248 - if err != nil { 249 - return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 253 + ownerDid := "" 254 + if record.OwnerDid != nil { 255 + ownerDid = *record.OwnerDid 250 256 } 251 - if len(repos) != 1 { 252 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 257 + 258 + r, lookupErr := resolveRepo(d, record.RepoDid, ownerDid, record.RepoName) 259 + if lookupErr != nil { 260 + return fmt.Errorf("failed to look up repo: %w", lookupErr) 253 261 } 254 - repo := repos[0] 262 + repo := *r 255 263 256 264 ref := plumbing.ReferenceName(record.Ref) 257 265 if !ref.IsBranch() { ··· 304 312 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 305 313 } 306 314 307 - // does this repo have a spindle configured? 308 - repos, err := db.GetRepos( 309 - d, 310 - 0, 311 - orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 312 - orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 313 - ) 314 - if err != nil { 315 - return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 315 + repoName := "" 316 + if record.TriggerMetadata.Repo.Repo != nil { 317 + repoName = *record.TriggerMetadata.Repo.Repo 316 318 } 317 - if len(repos) != 1 { 318 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 319 + 320 + repo, lookupErr := resolveRepo(d, record.TriggerMetadata.Repo.RepoDid, record.TriggerMetadata.Repo.Did, repoName) 321 + if lookupErr != nil { 322 + return fmt.Errorf("failed to look up repo: %w", lookupErr) 319 323 } 320 - if repos[0].Spindle == "" { 324 + if repo.Spindle == "" { 321 325 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 322 326 } 323 327 ··· 353 357 Rkey: msg.Rkey, 354 358 Knot: source.Key(), 355 359 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 356 - RepoName: record.TriggerMetadata.Repo.Repo, 360 + RepoName: repoName, 361 + RepoDid: repo.RepoDid, 357 362 TriggerId: int(triggerId), 358 363 Sha: sha, 359 364 }
+26 -2
appview/state/router.go
··· 1 1 package state 2 2 3 3 import ( 4 + "database/sql" 5 + "errors" 4 6 "net/http" 5 7 "strings" 6 8 7 9 "github.com/go-chi/chi/v5" 10 + "tangled.org/core/appview/db" 8 11 "tangled.org/core/appview/issues" 9 12 "tangled.org/core/appview/knots" 10 13 "tangled.org/core/appview/labels" ··· 46 49 if len(pathParts) > 0 { 47 50 firstPart := pathParts[0] 48 51 49 - // if using a DID or handle, just continue as per usual 50 - if userutil.IsDid(firstPart) || userutil.IsHandle(firstPart) { 52 + if userutil.IsDid(firstPart) { 53 + repo, err := db.GetRepoByDid(s.db, firstPart) 54 + switch { 55 + case err == nil: 56 + remaining := "" 57 + if len(pathParts) > 1 { 58 + remaining = "/" + pathParts[1] 59 + } 60 + rewritten := "/" + repo.Did + "/" + repo.Name + remaining 61 + r2 := r.Clone(r.Context()) 62 + r2.URL.Path = rewritten 63 + r2.URL.RawPath = rewritten 64 + userRouter.ServeHTTP(w, r2) 65 + case errors.Is(err, sql.ErrNoRows): 66 + userRouter.ServeHTTP(w, r) 67 + default: 68 + s.logger.Error("db error looking up repo DID", "repoDid", firstPart, "err", err) 69 + http.Error(w, "internal server error", http.StatusInternalServerError) 70 + } 71 + return 72 + } 73 + 74 + if userutil.IsHandle(firstPart) { 51 75 userRouter.ServeHTTP(w, r) 52 76 return 53 77 }
+13 -5
appview/state/star.go
··· 12 12 "tangled.org/core/appview/db" 13 13 "tangled.org/core/appview/models" 14 14 "tangled.org/core/appview/pages" 15 + "tangled.org/core/orm" 15 16 "tangled.org/core/tid" 16 17 ) 17 18 ··· 40 41 case http.MethodPost: 41 42 createdAt := time.Now().Format(time.RFC3339) 42 43 rkey := tid.TID() 44 + 45 + subjectStr := subjectUri.String() 46 + starRecord := &tangled.FeedStar{ 47 + CreatedAt: createdAt, 48 + Subject: &subjectStr, 49 + } 50 + repo, err := db.GetRepo(s.db, orm.FilterEq("at_uri", subjectUri.String())) 51 + if err == nil && repo.RepoDid != "" { 52 + starRecord.SubjectDid = &repo.RepoDid 53 + } 54 + 43 55 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 56 Collection: tangled.FeedStarNSID, 45 57 Repo: currentUser.Active.Did, 46 58 Rkey: rkey, 47 - Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 59 + Record: &lexutil.LexiconTypeDecoder{Val: starRecord}, 52 60 }) 53 61 if err != nil { 54 62 log.Println("failed to create atproto record", err)
+88 -38
appview/state/state.go
··· 42 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 44 "github.com/bluesky-social/indigo/xrpc" 45 - securejoin "github.com/cyphar/filepath-securejoin" 45 + 46 46 "github.com/go-chi/chi/v5" 47 47 "github.com/posthog/posthog-go" 48 48 ) ··· 456 456 return 457 457 } 458 458 459 - // create atproto record for this repo 460 459 rkey := tid.TID() 460 + 461 + client, err := s.oauth.ServiceClient( 462 + r, 463 + oauth.WithService(domain), 464 + oauth.WithLxm(tangled.RepoCreateNSID), 465 + oauth.WithDev(s.config.Core.Dev), 466 + ) 467 + if err != nil { 468 + l.Error("service auth failed", "err", err) 469 + s.pages.Notice(w, "repo", "Failed to reach knot server.") 470 + return 471 + } 472 + 473 + input := &tangled.RepoCreate_Input{ 474 + Rkey: rkey, 475 + Name: repoName, 476 + DefaultBranch: &defaultBranch, 477 + } 478 + createResp, xe := tangled.RepoCreate( 479 + r.Context(), 480 + client, 481 + input, 482 + ) 483 + if err := xrpcclient.HandleXrpcErr(xe); err != nil { 484 + l.Error("xrpc error", "xe", xe) 485 + s.pages.Notice(w, "repo", err.Error()) 486 + return 487 + } 488 + 489 + var repoDid string 490 + if createResp != nil && createResp.RepoDid != nil { 491 + repoDid = *createResp.RepoDid 492 + } 493 + if repoDid == "" { 494 + l.Error("knot returned empty repo DID") 495 + s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 496 + return 497 + } 498 + 461 499 repo := &models.Repo{ 462 500 Did: user.Active.Did, 463 501 Name: repoName, ··· 466 504 Description: description, 467 505 Created: time.Now(), 468 506 Labels: s.config.Label.DefaultLabelDefs, 507 + RepoDid: repoDid, 469 508 } 470 509 record := repo.AsRecord() 471 510 511 + cleanupKnot := func() { 512 + go func() { 513 + delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 514 + for attempt, delay := range delays { 515 + time.Sleep(delay) 516 + deleteClient, dErr := s.oauth.ServiceClient( 517 + r, 518 + oauth.WithService(domain), 519 + oauth.WithLxm(tangled.RepoDeleteNSID), 520 + oauth.WithDev(s.config.Core.Dev), 521 + ) 522 + if dErr != nil { 523 + l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 524 + continue 525 + } 526 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 527 + if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 528 + Did: user.Active.Did, 529 + Name: repoName, 530 + Rkey: rkey, 531 + }); dErr != nil { 532 + cancel() 533 + l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 534 + continue 535 + } 536 + cancel() 537 + l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 538 + return 539 + } 540 + l.Error("exhausted retries for knot cleanup, repo may be orphaned", 541 + "did", user.Active.Did, "repo", repoName, "knot", domain) 542 + }() 543 + } 544 + 472 545 atpClient, err := s.oauth.AuthorizedClient(r) 473 546 if err != nil { 474 547 l.Info("PDS write failed", "err", err) 548 + cleanupKnot() 475 549 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 476 550 return 477 551 } ··· 486 560 }) 487 561 if err != nil { 488 562 l.Info("PDS write failed", "err", err) 563 + cleanupKnot() 489 564 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 490 565 return 491 566 } ··· 501 576 return 502 577 } 503 578 504 - // The rollback function reverts a few things on failure: 505 - // - the pending txn 506 - // - the ACLs 507 - // - the atproto record created 508 579 rollback := func() { 509 580 err1 := tx.Rollback() 510 581 err2 := s.enforcer.E.LoadPolicy() 511 582 err3 := rollbackRecord(context.Background(), aturi, atpClient) 512 583 513 - // ignore txn complete errors, this is okay 514 584 if errors.Is(err1, sql.ErrTxDone) { 515 585 err1 = nil 516 586 } 517 587 518 588 if errs := errors.Join(err1, err2, err3); errs != nil { 519 589 l.Error("failed to rollback changes", "errs", errs) 520 - return 521 590 } 522 - } 523 - defer rollback() 524 591 525 - client, err := s.oauth.ServiceClient( 526 - r, 527 - oauth.WithService(domain), 528 - oauth.WithLxm(tangled.RepoCreateNSID), 529 - oauth.WithDev(s.config.Core.Dev), 530 - ) 531 - if err != nil { 532 - l.Error("service auth failed", "err", err) 533 - s.pages.Notice(w, "repo", "Failed to reach PDS.") 534 - return 535 - } 536 - 537 - xe := tangled.RepoCreate( 538 - r.Context(), 539 - client, 540 - &tangled.RepoCreate_Input{ 541 - Rkey: rkey, 542 - }, 543 - ) 544 - if err := xrpcclient.HandleXrpcErr(xe); err != nil { 545 - l.Error("xrpc error", "xe", xe) 546 - s.pages.Notice(w, "repo", err.Error()) 547 - return 592 + if aturi != "" { 593 + cleanupKnot() 594 + } 548 595 } 596 + defer rollback() 549 597 550 598 err = db.AddRepo(tx, repo) 551 599 if err != nil { ··· 554 602 return 555 603 } 556 604 557 - // acls 558 - p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 559 - err = s.enforcer.AddRepo(user.Active.Did, domain, p) 605 + rbacPath := repo.RepoIdentifier() 606 + err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 560 607 if err != nil { 561 608 l.Error("acl setup failed", "err", err) 562 609 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") ··· 577 624 return 578 625 } 579 626 580 - // reset the ATURI because the transaction completed successfully 581 627 aturi = "" 582 628 583 629 s.notifier.NewRepo(r.Context(), repo) 584 - s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 630 + if repoDid != "" { 631 + s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 632 + } else { 633 + s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 634 + } 585 635 } 586 636 } 587 637
+1 -1
appview/validator/label.go
··· 109 109 // validate permissions: only collaborators can apply labels currently 110 110 // 111 111 // TODO: introduce a repo:triage permission 112 - ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.DidSlashRepo()) 112 + ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.RepoIdentifier()) 113 113 if err != nil { 114 114 return fmt.Errorf("failed to enforce permissions: %w", err) 115 115 }

History

14 rounds 2 comments
sign up or login to add to the discussion
1 commit
expand
appview: DID-based routing, state/handler/middleware updates
merge conflicts detected
expand
  • appview/db/repos.go:46
expand 0 comments
1 commit
expand
appview: DID-based routing, state/handler/middleware updates
expand 0 comments
1 commit
expand
appview: DID-based routing, state/handler/middleware updates
expand 2 comments

appview/state/state.go:631 won't this eventually redirect to /{owner}/{reponame}?

wdym by eventually? I was thinking to keep this in, so that we don't simply error out if someone does decide to be clever and link to their git repo by repoDID, we should render the page anyway to reward them for being clever instead of punishing heh

1 commit
expand
appview: DID-based routing, state/handler/middleware updates
expand 0 comments
1 commit
expand
appview: DID-based routing, state/handler/middleware updates
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments
1 commit
expand
appview: update state, ingester, middleware, and resolver for repo DID
expand 0 comments