Monorepo for Tangled tangled.org

knotserver: add repo DID migration on startup #1144

open opened by oyster.cafe targeting master from oyster.cafe/tangled-core: master
Labels

None yet.

Assignee

None yet.

Participants 2
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgprvt2exw22
+311 -51
Diff #0
-20
appview/middleware/middleware.go
··· 226 226 return 227 227 } 228 228 229 - if repo.RepoDid != "" && req.Context().Value("repoDidCanonical") == nil { 230 - gitPaths := []string{"/info/refs", "/git-upload-pack", "/git-receive-pack", "/git-upload-archive"} 231 - user := chi.URLParam(req, "user") 232 - repoParam := chi.URLParam(req, "repo") 233 - remaining := strings.TrimPrefix(req.URL.Path, "/"+user+"/"+repoParam) 234 - 235 - isGitPath := slices.ContainsFunc(gitPaths, func(p string) bool { 236 - return strings.HasSuffix(remaining, p) 237 - }) 238 - 239 - if !isGitPath && req.URL.Query().Get("go-get") != "1" { 240 - target := "/" + repo.RepoDid + remaining 241 - if req.URL.RawQuery != "" { 242 - target += "?" + req.URL.RawQuery 243 - } 244 - http.Redirect(w, req, target, http.StatusFound) 245 - return 246 - } 247 - } 248 - 249 229 ctx := context.WithValue(req.Context(), "repo", repo) 250 230 next.ServeHTTP(w, req.WithContext(ctx)) 251 231 })
-6
appview/pages/repoinfo/repoinfo.go
··· 20 20 } 21 21 22 22 func (r RepoInfo) FullName() string { 23 - if r.RepoDid != "" { 24 - return r.RepoDid 25 - } 26 23 return path.Join(r.owner(), r.Name) 27 24 } 28 25 ··· 35 32 } 36 33 37 34 func (r RepoInfo) FullNameWithoutAt() string { 38 - if r.RepoDid != "" { 39 - return userutil.FlattenDid(r.RepoDid) 40 - } 41 35 return path.Join(r.ownerWithoutAt(), r.Name) 42 36 } 43 37
+1 -3
appview/state/router.go
··· 1 1 package state 2 2 3 3 import ( 4 - "context" 5 4 "database/sql" 6 5 "errors" 7 6 "net/http" ··· 61 60 r2 := r.Clone(r.Context()) 62 61 r2.URL.Path = rewritten 63 62 r2.URL.RawPath = rewritten 64 - ctx := context.WithValue(r2.Context(), "repoDidCanonical", true) 65 - userRouter.ServeHTTP(w, r2.WithContext(ctx)) 63 + userRouter.ServeHTTP(w, r2) 66 64 case errors.Is(err, sql.ErrNoRows): 67 65 userRouter.ServeHTTP(w, r) 68 66 default:
+8 -5
eventconsumer/consumer.go
··· 19 19 type ProcessFunc func(ctx context.Context, source Source, message Message) error 20 20 21 21 type Message struct { 22 - Rkey string 23 - Nsid string 24 - // do not full deserialize this portion of the message, processFunc can do that 22 + Rkey string 23 + Nsid string 24 + Created int64 `json:"created"` 25 25 EventJson json.RawMessage `json:"event"` 26 26 } 27 27 ··· 159 159 return 160 160 } 161 161 162 - // update cursor 163 - c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano()) 162 + cursorVal := msg.Created 163 + if cursorVal == 0 { 164 + cursorVal = time.Now().UnixNano() 165 + } 166 + c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 164 167 165 168 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 166 169 c.logger.Error("error processing message", "source", j.source, "err", err)
+7 -7
knotserver/events.go
··· 44 44 } 45 45 }() 46 46 47 - defaultCursor := time.Now().UnixNano() 47 + var cursor int64 48 48 cursorStr := r.URL.Query().Get("cursor") 49 - cursor, err := strconv.ParseInt(cursorStr, 10, 64) 50 - if err != nil { 51 - l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 52 - } 53 - if cursor == 0 { 54 - cursor = defaultCursor 49 + if cursorStr != "" { 50 + cursor, err = strconv.ParseInt(cursorStr, 10, 64) 51 + if err != nil { 52 + l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 53 + cursor = 0 54 + } 55 55 } 56 56 57 57 l.Debug("going through backfill", "cursor", cursor)
+282
knotserver/migrate.go
··· 1 + package knotserver 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand/v2" 10 + "os" 11 + "path/filepath" 12 + "strings" 13 + "time" 14 + 15 + "tangled.org/core/knotserver/config" 16 + "tangled.org/core/knotserver/db" 17 + "tangled.org/core/knotserver/repodid" 18 + "tangled.org/core/notifier" 19 + "tangled.org/core/rbac" 20 + ) 21 + 22 + type legacyRepo struct { 23 + ownerDid string 24 + repoName string 25 + oldPath string 26 + } 27 + 28 + func scanLegacyRepos(scanPath string, d *db.DB, logger *slog.Logger) []legacyRepo { 29 + entries, err := os.ReadDir(scanPath) 30 + if err != nil { 31 + logger.Error("reading scan path", "error", err) 32 + return nil 33 + } 34 + 35 + return collectLegacyEntries(entries, scanPath, d, logger) 36 + } 37 + 38 + func collectLegacyEntries(entries []os.DirEntry, scanPath string, d *db.DB, logger *slog.Logger) []legacyRepo { 39 + var repos []legacyRepo 40 + for _, entry := range entries { 41 + if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") { 42 + continue 43 + } 44 + 45 + ownerPath := filepath.Join(scanPath, entry.Name()) 46 + 47 + if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil { 48 + continue 49 + } 50 + 51 + subEntries, readErr := os.ReadDir(ownerPath) 52 + if readErr != nil { 53 + logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr) 54 + continue 55 + } 56 + 57 + repos = append(repos, collectReposInOwnerDir(subEntries, entry.Name(), ownerPath, d, logger)...) 
58 + } 59 + return repos 60 + } 61 + 62 + func collectReposInOwnerDir(entries []os.DirEntry, ownerDid, ownerPath string, d *db.DB, logger *slog.Logger) []legacyRepo { 63 + var repos []legacyRepo 64 + for _, sub := range entries { 65 + if !sub.IsDir() { 66 + continue 67 + } 68 + 69 + subPath := filepath.Join(ownerPath, sub.Name()) 70 + if _, err := os.Stat(filepath.Join(subPath, "HEAD")); err != nil { 71 + logger.Warn("skipping non-repo directory", "path", subPath) 72 + continue 73 + } 74 + 75 + repos = append(repos, legacyRepo{ 76 + ownerDid: ownerDid, 77 + repoName: sub.Name(), 78 + oldPath: subPath, 79 + }) 80 + } 81 + return repos 82 + } 83 + 84 + func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) { 85 + repos := scanLegacyRepos(c.Repo.ScanPath, d, logger) 86 + if len(repos) == 0 { 87 + logger.Info("no legacy repos found, migration complete") 88 + return 89 + } 90 + 91 + logger.Info("starting legacy repo migration", "count", len(repos)) 92 + start := time.Now() 93 + 94 + knotServiceUrl := "https://" + c.Server.Hostname 95 + if c.Server.Dev { 96 + knotServiceUrl = "http://" + c.Server.Hostname 97 + } 98 + 99 + migrated := 0 100 + for _, repo := range repos { 101 + select { 102 + case <-ctx.Done(): 103 + logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated) 104 + return 105 + default: 106 + } 107 + 108 + err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl) 109 + if err != nil { 110 + logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err) 111 + continue 112 + } 113 + migrated++ 114 + } 115 + 116 + logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start)) 117 + } 118 + 119 + func migrateOneRepo( 120 + ctx context.Context, 121 + c *config.Config, 122 + d *db.DB, 123 + e *rbac.Enforcer, 124 + n 
*notifier.Notifier, 125 + logger *slog.Logger, 126 + repo legacyRepo, 127 + knotServiceUrl string, 128 + ) error { 129 + l := logger.With("owner", repo.ownerDid, "repo", repo.repoName) 130 + 131 + repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName) 132 + needsMint := errors.Is(err, sql.ErrNoRows) 133 + if err != nil && !needsMint { 134 + return fmt.Errorf("checking repo_keys: %w", err) 135 + } 136 + 137 + if needsMint { 138 + repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl) 139 + if err != nil { 140 + return err 141 + } 142 + } 143 + 144 + if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil { 145 + l.Error("RBAC rewrite failed (non-fatal)", "error", err) 146 + } 147 + 148 + newPath := filepath.Join(c.Repo.ScanPath, repoDid) 149 + 150 + if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil { 151 + return err 152 + } 153 + 154 + ownerDir := filepath.Dir(repo.oldPath) 155 + if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) { 156 + l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err) 157 + } 158 + 159 + if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil { 160 + l.Error("emitting didAssign event failed (non-fatal)", "error", err) 161 + } 162 + 163 + l.Info("migrated repo", "repoDid", repoDid) 164 + return nil 165 + } 166 + 167 + func mintAndStoreRepoDID( 168 + ctx context.Context, 169 + c *config.Config, 170 + d *db.DB, 171 + l *slog.Logger, 172 + repo legacyRepo, 173 + knotServiceUrl string, 174 + ) (string, error) { 175 + prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl) 176 + if err != nil { 177 + return "", fmt.Errorf("preparing DID: %w", err) 178 + } 179 + 180 + if err := submitWithBackoff(ctx, prepared, l); err != nil { 181 + return "", fmt.Errorf("PLC submission: %w", err) 182 + } 183 + 184 + if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName); 
err != nil { 185 + return "", fmt.Errorf("storing repo key: %w", err) 186 + } 187 + 188 + return prepared.RepoDid, nil 189 + } 190 + 191 + func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error { 192 + backoff := 2 * time.Second 193 + const maxBackoff = 5 * time.Minute 194 + const maxAttempts = 20 195 + 196 + for attempt := range maxAttempts { 197 + plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 198 + err := prepared.Submit(plcCtx) 199 + cancel() 200 + 201 + if err == nil { 202 + return nil 203 + } 204 + 205 + errMsg := err.Error() 206 + retryable := strings.Contains(errMsg, "429") || 207 + strings.Contains(errMsg, "500") || 208 + strings.Contains(errMsg, "502") || 209 + strings.Contains(errMsg, "503") || 210 + strings.Contains(errMsg, "504") 211 + 212 + if !retryable || attempt == maxAttempts-1 { 213 + return err 214 + } 215 + 216 + jitter := time.Duration(rand.Int64N(int64(backoff / 4))) 217 + delay := backoff + jitter 218 + 219 + l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1) 220 + 221 + select { 222 + case <-ctx.Done(): 223 + return ctx.Err() 224 + case <-time.After(delay): 225 + } 226 + 227 + backoff = min(backoff*2, maxBackoff) 228 + } 229 + 230 + return fmt.Errorf("unreachable: exceeded max attempts") 231 + } 232 + 233 + func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error { 234 + if _, err := os.Stat(newPath); err == nil { 235 + l.Info("new path already exists, skipping rename", "newPath", newPath) 236 + return nil 237 + } 238 + 239 + if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) { 240 + return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath) 241 + } 242 + 243 + if err := os.Rename(oldPath, newPath); err != nil { 244 + return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err) 245 + } 246 + 247 + return nil 248 + } 249 + 250 + func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid 
string, l *slog.Logger) error { 251 + oldResource := ownerDid + "/" + repoName 252 + 253 + policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource) 254 + if err != nil { 255 + return fmt.Errorf("getting old policies: %w", err) 256 + } 257 + 258 + var addPolicies [][]string 259 + var removePolicies [][]string 260 + for _, p := range policies { 261 + removePolicies = append(removePolicies, p) 262 + newPolicy := make([]string, len(p)) 263 + copy(newPolicy, p) 264 + newPolicy[2] = repoDid 265 + addPolicies = append(addPolicies, newPolicy) 266 + } 267 + 268 + if len(addPolicies) > 0 { 269 + if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil { 270 + return fmt.Errorf("adding new policies: %w", addErr) 271 + } 272 + } 273 + 274 + if len(removePolicies) > 0 { 275 + if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil { 276 + return fmt.Errorf("removing old policies: %w", rmErr) 277 + } 278 + } 279 + 280 + l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies)) 281 + return nil 282 + }
+2
knotserver/server.go
··· 88 88 89 89 notifier := notifier.New() 90 90 91 + go migrateReposOnStartup(ctx, c, db, e, &notifier, log.SubLogger(logger, "migrate")) 92 + 91 93 mux, err := Setup(ctx, c, db, e, jc, &notifier) 92 94 if err != nil { 93 95 return fmt.Errorf("failed to setup server: %w", err)
+11 -10
spindle/stream.go
··· 52 52 } 53 53 }() 54 54 55 - defaultCursor := time.Now().UnixNano() 55 + var cursor int64 56 56 cursorStr := r.URL.Query().Get("cursor") 57 - cursor, err := strconv.ParseInt(cursorStr, 10, 64) 58 - if err != nil { 59 - l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 60 - } 61 - if cursor == 0 { 62 - cursor = defaultCursor 57 + if cursorStr != "" { 58 + cursor, err = strconv.ParseInt(cursorStr, 10, 64) 59 + if err != nil { 60 + l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 61 + cursor = 0 62 + } 63 63 } 64 64 65 65 // complete backfill first before going to live data ··· 239 239 } 240 240 241 241 jsonMsg, err := json.Marshal(map[string]any{ 242 - "rkey": event.Rkey, 243 - "nsid": event.Nsid, 244 - "event": eventJson, 242 + "rkey": event.Rkey, 243 + "nsid": event.Nsid, 244 + "event": eventJson, 245 + "created": event.Created, 245 246 }) 246 247 if err != nil { 247 248 s.l.Error("failed to marshal record", "err", err)

History

12 rounds 1 comment
sign up or login to add to the discussion
1 commit
expand
knotserver: add repo DID migration on startup
merge conflicts detected
expand
  • go.mod:34
  • go.sum:339
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 1 comment
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
2/3 failed, 1/3 succeeded
expand
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
oyster.cafe submitted #0
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments