Monorepo for Tangled
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: update state, ingester, middleware, and resolver for repo DID

Signed-off-by: Lewis <lewis@tangled.org>

oyster.cafe bef1ed88 0fbf1c38

verified
+301 -145
+78 -18
appview/ingester.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 7 + "errors" 6 8 "fmt" 7 9 "log/slog" 8 10 "maps" ··· 116 118 return err 117 119 } 118 120 119 - subjectUri, err = syntax.ParseATURI(record.Subject) 120 - if err != nil { 121 - l.Error("invalid record", "err", err) 122 - return err 121 + star := &models.Star{ 122 + Did: did, 123 + Rkey: e.Commit.RKey, 124 + } 125 + 126 + switch { 127 + case record.SubjectDid != nil: 128 + star.SubjectDid = *record.SubjectDid 129 + repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 130 + if repoErr == nil { 131 + subjectUri = repo.RepoAt() 132 + star.RepoAt = subjectUri 133 + } 134 + case record.Subject != nil: 135 + subjectUri, err = syntax.ParseATURI(*record.Subject) 136 + if err != nil { 137 + l.Error("invalid record", "err", err) 138 + return err 139 + } 140 + star.RepoAt = subjectUri 141 + repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 142 + if repoErr == nil && repo.RepoDid != "" { 143 + star.SubjectDid = repo.RepoDid 144 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 145 + l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 146 + } 147 + } 148 + default: 149 + l.Error("star record has neither subject nor subjectDid") 150 + return fmt.Errorf("star record has neither subject nor subjectDid") 123 151 } 124 - err = db.AddStar(i.Db, &models.Star{ 125 - Did: did, 126 - RepoAt: subjectUri, 127 - Rkey: e.Commit.RKey, 128 - }) 152 + err = db.AddStar(i.Db, star) 129 153 case jmodels.CommitOperationDelete: 130 154 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 131 155 } ··· 220 244 return err 221 245 } 222 246 223 - repoAt, err := syntax.ParseATURI(record.Repo) 224 - if err != nil { 225 - return err 247 + var repo *models.Repo 248 + if record.RepoDid != nil && *record.RepoDid != "" { 249 + repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 250 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 251 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 252 + } 226 253 } 227 - 228 - repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 229 - if err != nil { 230 - return err 254 + if repo == nil && record.Repo != nil { 255 + repoAt, parseErr := syntax.ParseATURI(*record.Repo) 256 + if parseErr != nil { 257 + return parseErr 258 + } 259 + repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 260 + if err != nil { 261 + return err 262 + } 263 + } 264 + if repo == nil { 265 + return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 231 266 } 232 267 233 - ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 268 + ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 234 269 if err != nil || !ok { 235 270 return err 236 271 } 237 272 273 + repoDid := repo.RepoDid 274 + if repoDid == "" && record.RepoDid != nil { 275 + repoDid = *record.RepoDid 276 + } 277 + if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 278 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 279 + l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 280 + } 281 + } 282 + 238 283 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 239 284 if err != nil { 240 285 createdAt = time.Now() ··· 243 288 artifact := models.Artifact{ 244 289 Did: did, 245 290 Rkey: e.Commit.RKey, 246 - RepoAt: repoAt, 291 + RepoAt: repo.RepoAt(), 292 + RepoDid: repoDid, 247 293 Tag: plumbing.Hash(record.Tag), 248 294 CreatedAt: createdAt, 249 295 BlobCid: cid.Cid(record.Artifact.Ref), ··· 822 868 823 869 issue := models.IssueFromRecord(did, rkey, record) 824 870 871 + if issue.RepoDid == "" && issue.RepoAt == "" { 872 + return fmt.Errorf("issue record has neither repo nor repoDid") 873 + } 874 + 825 875 if err := i.Validator.ValidateIssue(&issue); err != nil { 826 876 return fmt.Errorf("failed to validate issue: %w", err) 877 + } 878 + 879 + if issue.RepoDid == "" && record.Repo != nil { 880 + repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 881 + if repoErr == nil && repo.RepoDid != "" { 882 + issue.RepoDid = repo.RepoDid 883 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 884 + l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 885 + } 886 + } 827 887 } 828 888 829 889 tx, err := ddb.BeginTx(ctx, nil)
+9 -5
appview/middleware/middleware.go
··· 17 17 "tangled.org/core/appview/pages" 18 18 "tangled.org/core/appview/pagination" 19 19 "tangled.org/core/appview/reporesolver" 20 + "tangled.org/core/appview/state/userutil" 20 21 "tangled.org/core/idresolver" 21 22 "tangled.org/core/orm" 22 23 "tangled.org/core/rbac" ··· 161 162 return 162 163 } 163 164 164 - ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.DidSlashRepo(), requiredPerm) 165 + ok, err := mw.enforcer.E.Enforce(actor.Active.Did, f.Knot, f.RepoIdentifier(), requiredPerm) 165 166 if err != nil || !ok { 166 - log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.DidSlashRepo()) 167 + log.Printf("%s does not have perms of a %s in repo %s", actor.Active.Did, requiredPerm, f.RepoIdentifier()) 167 168 http.Error(w, "Forbiden", http.StatusUnauthorized) 168 169 return 169 170 } ··· 188 189 189 190 id, err := mw.idResolver.ResolveIdent(req.Context(), didOrHandle) 190 191 if err != nil { 191 - // invalid did or handle 192 192 log.Printf("failed to resolve did/handle '%s': %s\n", didOrHandle, err) 193 193 mw.pages.Error404(w) 194 194 return ··· 334 334 335 335 if r.Header.Get("User-Agent") == "Go-http-client/1.1" { 336 336 if r.URL.Query().Get("go-get") == "1" { 337 + modulePath := userutil.FlattenDid(fullName) 338 + if strings.Contains(modulePath, ":") { 339 + modulePath = userutil.FlattenDid(f.Did) + "/" + f.Name 340 + } 337 341 html := fmt.Sprintf( 338 342 `<meta name="go-import" content="tangled.sh/%s git https://tangled.sh/%s"/> 339 343 <meta name="go-import" content="tangled.org/%s git https://tangled.org/%s"/>`, 340 - fullName, fullName, 341 - fullName, fullName, 344 + modulePath, fullName, 345 + modulePath, fullName, 342 346 ) 343 347 w.Header().Set("Content-Type", "text/html") 344 348 w.Write([]byte(html))
+14 -6
appview/reporesolver/resolver.go
··· 36 36 37 37 // NOTE: this... should not even be here. the entire package will be removed in future refactor 38 38 func GetBaseRepoPath(r *http.Request, repo *models.Repo) string { 39 + if repo.RepoDid != "" { 40 + return repo.RepoDid 41 + } 39 42 var ( 40 43 user = chi.URLParam(r, "user") 41 44 name = chi.URLParam(r, "repo") 42 45 ) 43 46 if user == "" || name == "" { 44 - return repo.DidSlashRepo() 47 + return repo.RepoIdentifier() 45 48 } 46 49 return path.Join(user, name) 47 50 } ··· 77 80 roles := repoinfo.RolesInRepo{} 78 81 if user != nil && user.Active != nil { 79 82 isStarred = db.GetStarStatus(rr.execer, user.Active.Did, repoAt) 80 - roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.DidSlashRepo()) 83 + roles.Roles = rr.enforcer.GetPermissionsInRepo(user.Active.Did, repo.Knot, repo.RepoIdentifier()) 81 84 } 82 85 83 86 stats := repo.RepoStats 84 87 if stats == nil { 85 - starCount, err := db.GetStarCount(rr.execer, repoAt) 86 - if err != nil { 88 + starCount, starErr := db.GetStarCount(rr.execer, repo.RepoDid, repoAt) 89 + if starErr != nil { 87 90 log.Println("failed to get star count for ", repoAt) 88 91 } 89 92 issueCount, err := db.GetIssueCount(rr.execer, repoAt) ··· 104 107 var sourceRepo *models.Repo 105 108 var err error 106 109 if repo.Source != "" { 107 - sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 110 + if strings.HasPrefix(repo.Source, "did:") { 111 + sourceRepo, err = db.GetRepoByDid(rr.execer, repo.Source) 112 + } else { 113 + sourceRepo, err = db.GetRepoByAtUri(rr.execer, repo.Source) 114 + } 108 115 if err != nil { 109 - log.Println("failed to get repo by at uri", err) 116 + log.Println("failed to get source repo", err) 110 117 } 111 118 } 112 119 ··· 114 121 // this is basically a models.Repo 115 122 OwnerDid: ownerId.DID.String(), 116 123 OwnerHandle: ownerId.Handle.String(), 124 + RepoDid: repo.RepoDid, 117 125 Name: repo.Name, 118 126 Rkey: repo.Rkey, 119 127 Description: repo.Description,
+7 -20
appview/state/git_http.go
··· 12 12 ) 13 13 14 14 func (s *State) InfoRefs(w http.ResponseWriter, r *http.Request) { 15 - user := r.Context().Value("resolvedId").(identity.Identity) 16 15 repo := r.Context().Value("repo").(*models.Repo) 17 16 18 17 scheme := "https" ··· 20 19 scheme = "http" 21 20 } 22 21 23 - targetURL := fmt.Sprintf("%s://%s/%s/%s/info/refs?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 22 + targetURL := fmt.Sprintf("%s://%s/%s/info/refs?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 24 23 s.proxyRequest(w, r, targetURL) 25 24 26 25 } 27 26 28 27 func (s *State) UploadArchive(w http.ResponseWriter, r *http.Request) { 29 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 30 - if !ok { 31 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 32 - return 33 - } 34 28 repo := r.Context().Value("repo").(*models.Repo) 35 29 36 30 scheme := "https" ··· 38 32 scheme = "http" 39 33 } 40 34 41 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-archive?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 35 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-archive?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 42 36 s.proxyRequest(w, r, targetURL) 43 37 } 44 38 45 39 func (s *State) UploadPack(w http.ResponseWriter, r *http.Request) { 46 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 47 - if !ok { 48 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 49 - return 50 - } 51 40 repo := r.Context().Value("repo").(*models.Repo) 52 41 53 42 scheme := "https" ··· 55 44 scheme = "http" 56 45 } 57 46 58 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-upload-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 47 + targetURL := fmt.Sprintf("%s://%s/%s/git-upload-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 59 48 s.proxyRequest(w, r, targetURL) 60 49 } 61 50 62 51 func (s *State) ReceivePack(w http.ResponseWriter, r *http.Request) { 63 - user, ok := r.Context().Value("resolvedId").(identity.Identity) 64 - if !ok { 65 - http.Error(w, "failed to resolve user", http.StatusInternalServerError) 66 - return 67 - } 68 52 repo := r.Context().Value("repo").(*models.Repo) 69 53 70 54 scheme := "https" ··· 72 56 scheme = "http" 73 57 } 74 58 75 - targetURL := fmt.Sprintf("%s://%s/%s/%s/git-receive-pack?%s", scheme, repo.Knot, user.DID, repo.Name, r.URL.RawQuery) 59 + targetURL := fmt.Sprintf("%s://%s/%s/git-receive-pack?%s", scheme, repo.Knot, repo.RepoIdentifier(), r.URL.RawQuery) 76 60 s.proxyRequest(w, r, targetURL) 77 61 } 78 62 ··· 90 74 proxyReq.Header = r.Header 91 75 92 76 repoOwnerHandle := chi.URLParam(r, "user") 77 + if id, ok := r.Context().Value("resolvedId").(identity.Identity); ok && !id.Handle.IsInvalidHandle() { 78 + repoOwnerHandle = id.Handle.String() 79 + } 93 80 proxyReq.Header.Add("x-tangled-repo-owner-handle", repoOwnerHandle) 94 81 95 82 // Execute request
+52 -48
appview/state/knotstream.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 6 7 "errors" 7 8 "fmt" ··· 88 89 return err 89 90 } 90 91 92 + if record.RepoDid == nil || *record.RepoDid == "" { 93 + logger.Error("gitRefUpdate missing repoDid, skipping", "repo_name", record.RepoName) 94 + return fmt.Errorf("gitRefUpdate missing repoDid") 95 + } 96 + 91 97 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 92 98 if err != nil { 93 99 return err ··· 97 103 } 98 104 99 105 logger.Info("processing gitRefUpdate event", 100 - "repo_did", record.RepoDid, 101 - "repo_name", record.RepoName, 106 + "repo_did", *record.RepoDid, 102 107 "ref", record.Ref, 103 108 "old_sha", record.OldSha, 104 109 "new_sha", record.NewSha) 105 110 106 - // trigger webhook notifications first (before other ops that might fail) 107 111 var errWebhook error 108 - repos, err := db.GetRepos( 109 - d, 110 - 0, 111 - orm.FilterEq("did", record.RepoDid), 112 - orm.FilterEq("name", record.RepoName), 113 - ) 114 - if err != nil { 115 - errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 116 - } else if len(repos) == 1 { 112 + 113 + repo, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 114 + if lookupErr != nil && !errors.Is(lookupErr, sql.ErrNoRows) { 115 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr) 116 + } 117 + 118 + var repos []models.Repo 119 + if lookupErr == nil { 120 + repos = []models.Repo{*repo} 121 + } 122 + 123 + if errWebhook == nil && len(repos) == 1 { 117 124 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 118 125 } 119 126 ··· 147 154 } 148 155 pushedBranch := ref.Short() 149 156 150 - repos, err := db.GetRepos( 151 - d, 152 - 0, 153 - orm.FilterEq("did", record.RepoDid), 154 - orm.FilterEq("name", record.RepoName), 155 - ) 156 - if err != nil || len(repos) != 1 { 157 + if record.RepoDid == nil || *record.RepoDid == "" { 158 + return 159 + } 160 + 161 + repo, err := db.GetRepoByDid(d, *record.RepoDid) 162 + if err != nil { 157 163 return 158 164 } 159 - repo := repos[0] 160 165 161 166 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String()) 162 167 if err != nil || siteConfig == nil { ··· 180 185 Trigger: models.SiteDeployTriggerPush, 181 186 } 182 187 183 - deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir) 188 + deployErr := sites.Deploy(ctx, cfClient, knotHost, *record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir) 184 189 if deployErr != nil { 185 - logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr) 190 + logger.Error("sites: R2 sync failed on push", "repo", repo.RepoIdentifier(), "err", deployErr) 186 191 deploy.Status = models.SiteDeployStatusFailure 187 192 deploy.Error = deployErr.Error() 188 193 } else { ··· 190 195 } 191 196 192 197 if err := db.AddSiteDeploy(d, deploy); err != nil { 193 - logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err) 198 + logger.Error("sites: failed to record deploy", "repo", repo.RepoIdentifier(), "err", err) 194 199 } 195 200 196 201 if deployErr == nil { 197 - logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName) 202 + logger.Info("site deployed to r2", "repo", repo.RepoIdentifier()) 198 203 } 199 204 } 200 205 ··· 236 241 237 242 func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 238 243 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 239 - return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 244 + return fmt.Errorf("empty language data for repo: %v/%s", record.OwnerDid, record.RepoName) 240 245 } 241 246 242 - repos, err := db.GetRepos( 243 - d, 244 - 0, 245 - orm.FilterEq("did", record.RepoDid), 246 - orm.FilterEq("name", record.RepoName), 247 - ) 248 - if err != nil { 249 - return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 247 + if record.RepoDid == nil || *record.RepoDid == "" { 248 + return fmt.Errorf("gitRefUpdate missing repoDid for language update") 250 249 } 251 - if len(repos) != 1 { 252 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 250 + 251 + r, lookupErr := db.GetRepoByDid(d, *record.RepoDid) 252 + if lookupErr != nil { 253 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, lookupErr) 253 254 } 254 - repo := repos[0] 255 + repo := *r 255 256 256 257 ref := plumbing.ReferenceName(record.Ref) 257 258 if !ref.IsBranch() { ··· 266 267 267 268 langs = append(langs, models.RepoLanguage{ 268 269 RepoAt: repo.RepoAt(), 270 + RepoDid: repo.RepoDid, 269 271 Ref: ref.Short(), 270 272 IsDefaultRef: record.Meta.IsDefaultRef, 271 273 Language: l.Lang, ··· 304 306 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 305 307 } 306 308 307 - // does this repo have a spindle configured? 308 - repos, err := db.GetRepos( 309 - d, 310 - 0, 311 - orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 312 - orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 313 - ) 314 - if err != nil { 315 - return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 309 + if record.TriggerMetadata.Repo.RepoDid == nil || *record.TriggerMetadata.Repo.RepoDid == "" { 310 + return fmt.Errorf("pipeline missing repoDid: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 316 311 } 317 - if len(repos) != 1 { 318 - return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 312 + 313 + repo, lookupErr := db.GetRepoByDid(d, *record.TriggerMetadata.Repo.RepoDid) 314 + if lookupErr != nil { 315 + return fmt.Errorf("failed to look up repo by DID %s: %w", *record.TriggerMetadata.Repo.RepoDid, lookupErr) 319 316 } 317 + repos := []models.Repo{*repo} 320 318 if repos[0].Spindle == "" { 321 319 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 322 320 } ··· 349 347 return fmt.Errorf("failed to add trigger entry: %w", err) 350 348 } 351 349 350 + repoName := "" 351 + if record.TriggerMetadata.Repo.Repo != nil { 352 + repoName = *record.TriggerMetadata.Repo.Repo 353 + } 354 + 352 355 pipeline := models.Pipeline{ 353 356 Rkey: msg.Rkey, 354 357 Knot: source.Key(), 355 358 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 356 - RepoName: record.TriggerMetadata.Repo.Repo, 359 + RepoName: repoName, 360 + RepoDid: repos[0].RepoDid, 357 361 TriggerId: int(triggerId), 358 362 Sha: sha, 359 363 }
+28 -2
appview/state/router.go
··· 1 1 package state 2 2 3 3 import ( 4 + "context" 5 + "database/sql" 6 + "errors" 4 7 "net/http" 5 8 "strings" 6 9 7 10 "github.com/go-chi/chi/v5" 11 + "tangled.org/core/appview/db" 8 12 "tangled.org/core/appview/issues" 9 13 "tangled.org/core/appview/knots" 10 14 "tangled.org/core/appview/labels" ··· 45 49 if len(pathParts) > 0 { 46 50 firstPart := pathParts[0] 47 51 48 - // if using a DID or handle, just continue as per usual 49 - if userutil.IsDid(firstPart) || userutil.IsHandle(firstPart) { 52 + if userutil.IsDid(firstPart) { 53 + repo, err := db.GetRepoByDid(s.db, firstPart) 54 + switch { 55 + case err == nil: 56 + remaining := "" 57 + if len(pathParts) > 1 { 58 + remaining = "/" + pathParts[1] 59 + } 60 + rewritten := "/" + repo.Did + "/" + repo.Name + remaining 61 + r2 := r.Clone(r.Context()) 62 + r2.URL.Path = rewritten 63 + r2.URL.RawPath = rewritten 64 + ctx := context.WithValue(r2.Context(), "repoDidCanonical", true) 65 + userRouter.ServeHTTP(w, r2.WithContext(ctx)) 66 + case errors.Is(err, sql.ErrNoRows): 67 + userRouter.ServeHTTP(w, r) 68 + default: 69 + s.logger.Error("db error looking up repo DID", "repoDid", firstPart, "err", err) 70 + http.Error(w, "internal server error", http.StatusInternalServerError) 71 + } 72 + return 73 + } 74 + 75 + if userutil.IsHandle(firstPart) { 50 76 userRouter.ServeHTTP(w, r) 51 77 return 52 78 }
+20 -7
appview/state/star.go
··· 12 12 "tangled.org/core/appview/db" 13 13 "tangled.org/core/appview/models" 14 14 "tangled.org/core/appview/pages" 15 + "tangled.org/core/orm" 15 16 "tangled.org/core/tid" 16 17 ) 17 18 ··· 40 41 case http.MethodPost: 41 42 createdAt := time.Now().Format(time.RFC3339) 42 43 rkey := tid.TID() 44 + 45 + starRecord := &tangled.FeedStar{ 46 + CreatedAt: createdAt, 47 + } 48 + repo, err := db.GetRepo(s.db, orm.FilterEq("at_uri", subjectUri.String())) 49 + repoHasDid := err == nil && repo.RepoDid != "" 50 + if repoHasDid { 51 + starRecord.SubjectDid = &repo.RepoDid 52 + } else { 53 + s := subjectUri.String() 54 + starRecord.Subject = &s 55 + } 56 + 43 57 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 58 Collection: tangled.FeedStarNSID, 45 59 Repo: currentUser.Active.Did, 46 60 Rkey: rkey, 47 - Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 61 + Record: &lexutil.LexiconTypeDecoder{Val: starRecord}, 52 62 }) 53 63 if err != nil { 54 64 log.Println("failed to create atproto record", err) ··· 61 71 RepoAt: subjectUri, 62 72 Rkey: rkey, 63 73 } 74 + if repoHasDid { 75 + star.SubjectDid = repo.RepoDid 76 + } 64 77 65 78 err = db.AddStar(s.db, star) 66 79 if err != nil { ··· 68 81 return 69 82 } 70 83 71 - starCount, err := db.GetStarCount(s.db, subjectUri) 84 + starCount, err := db.GetStarCount(s.db, "", subjectUri) 72 85 if err != nil { 73 86 log.Println("failed to get star count for ", subjectUri) 74 87 } ··· 107 120 // this is not an issue, the firehose event might have already done this 108 121 } 109 122 110 - starCount, err := db.GetStarCount(s.db, subjectUri) 123 + starCount, err := db.GetStarCount(s.db, "", subjectUri) 111 124 if err != nil { 112 125 log.Println("failed to get star count for ", subjectUri) 113 126 return
+92 -38
appview/state/state.go
··· 42 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 44 "github.com/bluesky-social/indigo/xrpc" 45 - securejoin "github.com/cyphar/filepath-securejoin" 45 + 46 46 "github.com/go-chi/chi/v5" 47 47 "github.com/posthog/posthog-go" 48 48 ) ··· 444 444 return 445 445 } 446 446 447 - // create atproto record for this repo 448 447 rkey := tid.TID() 448 + 449 + client, err := s.oauth.ServiceClient( 450 + r, 451 + oauth.WithService(domain), 452 + oauth.WithLxm(tangled.RepoCreateNSID), 453 + oauth.WithDev(s.config.Core.Dev), 454 + ) 455 + if err != nil { 456 + l.Error("service auth failed", "err", err) 457 + s.pages.Notice(w, "repo", "Failed to reach knot server.") 458 + return 459 + } 460 + 461 + input := &tangled.RepoCreate_Input{ 462 + Rkey: rkey, 463 + Name: repoName, 464 + DefaultBranch: &defaultBranch, 465 + } 466 + if rd := strings.TrimSpace(r.FormValue("repo_did")); rd != "" { 467 + input.RepoDid = &rd 468 + } 469 + 470 + createResp, xe := tangled.RepoCreate( 471 + r.Context(), 472 + client, 473 + input, 474 + ) 475 + if err := xrpcclient.HandleXrpcErr(xe); err != nil { 476 + l.Error("xrpc error", "xe", xe) 477 + s.pages.Notice(w, "repo", err.Error()) 478 + return 479 + } 480 + 481 + var repoDid string 482 + if createResp != nil && createResp.RepoDid != nil { 483 + repoDid = *createResp.RepoDid 484 + } 485 + if repoDid == "" { 486 + l.Error("knot returned empty repo DID") 487 + s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.") 488 + return 489 + } 490 + 449 491 repo := &models.Repo{ 450 492 Did: user.Active.Did, 451 493 Name: repoName, ··· 454 496 Description: description, 455 497 Created: time.Now(), 456 498 Labels: s.config.Label.DefaultLabelDefs, 499 + RepoDid: repoDid, 457 500 } 458 501 record := repo.AsRecord() 459 502 503 + cleanupKnot := func() { 504 + go func() { 505 + delays := []time.Duration{0, 2 * time.Second, 5 * time.Second} 506 + for attempt, delay := range delays { 507 + time.Sleep(delay) 508 + deleteClient, dErr := s.oauth.ServiceClient( 509 + r, 510 + oauth.WithService(domain), 511 + oauth.WithLxm(tangled.RepoDeleteNSID), 512 + oauth.WithDev(s.config.Core.Dev), 513 + ) 514 + if dErr != nil { 515 + l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr) 516 + continue 517 + } 518 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 519 + if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{ 520 + Did: user.Active.Did, 521 + Name: repoName, 522 + Rkey: rkey, 523 + }); dErr != nil { 524 + cancel() 525 + l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr) 526 + continue 527 + } 528 + cancel() 529 + l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1) 530 + return 531 + } 532 + l.Error("exhausted retries for knot cleanup, repo may be orphaned", 533 + "did", user.Active.Did, "repo", repoName, "knot", domain) 534 + }() 535 + } 536 + 460 537 atpClient, err := s.oauth.AuthorizedClient(r) 461 538 if err != nil { 462 539 l.Info("PDS write failed", "err", err) 540 + cleanupKnot() 463 541 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 464 542 return 465 543 } ··· 474 552 }) 475 553 if err != nil { 476 554 l.Info("PDS write failed", "err", err) 555 + cleanupKnot() 477 556 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 478 557 return 479 558 } ··· 489 568 return 490 569 } 491 570 492 - // The rollback function reverts a few things on failure: 493 - // - the pending txn 494 - // - the ACLs 495 - // - the atproto record created 496 571 rollback := func() { 497 572 err1 := tx.Rollback() 498 573 err2 := s.enforcer.E.LoadPolicy() 499 574 err3 := rollbackRecord(context.Background(), aturi, atpClient) 500 575 501 - // ignore txn complete errors, this is okay 502 576 if errors.Is(err1, sql.ErrTxDone) { 503 577 err1 = nil 504 578 } 505 579 506 580 if errs := errors.Join(err1, err2, err3); errs != nil { 507 581 l.Error("failed to rollback changes", "errs", errs) 508 - return 509 582 } 510 - } 511 - defer rollback() 512 583 513 - client, err := s.oauth.ServiceClient( 514 - r, 515 - oauth.WithService(domain), 516 - oauth.WithLxm(tangled.RepoCreateNSID), 517 - oauth.WithDev(s.config.Core.Dev), 518 - ) 519 - if err != nil { 520 - l.Error("service auth failed", "err", err) 521 - s.pages.Notice(w, "repo", "Failed to reach PDS.") 522 - return 523 - } 524 - 525 - xe := tangled.RepoCreate( 526 - r.Context(), 527 - client, 528 - &tangled.RepoCreate_Input{ 529 - Rkey: rkey, 530 - }, 531 - ) 532 - if err := xrpcclient.HandleXrpcErr(xe); err != nil { 533 - l.Error("xrpc error", "xe", xe) 534 - s.pages.Notice(w, "repo", err.Error()) 535 - return 584 + if aturi != "" { 585 + cleanupKnot() 586 + } 536 587 } 588 + defer rollback() 537 589 538 590 err = db.AddRepo(tx, repo) 539 591 if err != nil { ··· 542 594 return 543 595 } 544 596 545 - // acls 546 - p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 547 - err = s.enforcer.AddRepo(user.Active.Did, domain, p) 597 + rbacPath := repo.RepoIdentifier() 598 + err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath) 548 599 if err != nil { 549 600 l.Error("acl setup failed", "err", err) 550 601 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") ··· 565 616 return 566 617 } 567 618 568 - // reset the ATURI because the transaction completed successfully 569 619 aturi = "" 570 620 571 621 s.notifier.NewRepo(r.Context(), repo) 572 - s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 622 + if repoDid != "" { 623 + s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid)) 624 + } else { 625 + s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 626 + } 573 627 } 574 628 } 575 629
+1 -1
appview/validator/label.go
··· 109 109 // validate permissions: only collaborators can apply labels currently 110 110 // 111 111 // TODO: introduce a repo:triage permission 112 - ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.DidSlashRepo()) 112 + ok, err := v.enforcer.IsPushAllowed(labelOp.Did, repo.Knot, repo.RepoIdentifier()) 113 113 if err != nil { 114 114 return fmt.Errorf("failed to enforce permissions: %w", err) 115 115 }