Monorepo for Tangled tangled.org

knotserver: add repo DID migration on startup #1144

open opened by oyster.cafe targeting master from oyster.cafe/tangled-core: master
Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgprvt2exw22
+303 -25
Diff #9
+1
appview/db/db.go
··· 34 34 "_journal_mode=WAL", 35 35 "_synchronous=NORMAL", 36 36 "_auto_vacuum=incremental", 37 + "_busy_timeout=5000", 37 38 } 38 39 39 40 logger := log.FromContext(ctx)
+1 -3
appview/state/router.go
··· 1 1 package state 2 2 3 3 import ( 4 - "context" 5 4 "database/sql" 6 5 "errors" 7 6 "net/http" ··· 62 61 r2 := r.Clone(r.Context()) 63 62 r2.URL.Path = rewritten 64 63 r2.URL.RawPath = rewritten 65 - ctx := context.WithValue(r2.Context(), "repoDidCanonical", true) 66 - userRouter.ServeHTTP(w, r2.WithContext(ctx)) 64 + userRouter.ServeHTTP(w, r2) 67 65 case errors.Is(err, sql.ErrNoRows): 68 66 userRouter.ServeHTTP(w, r) 69 67 default:
+8 -5
eventconsumer/consumer.go
··· 19 19 type ProcessFunc func(ctx context.Context, source Source, message Message) error 20 20 21 21 type Message struct { 22 - Rkey string 23 - Nsid string 24 - // do not full deserialize this portion of the message, processFunc can do that 22 + Rkey string 23 + Nsid string 24 + Created int64 `json:"created"` 25 25 EventJson json.RawMessage `json:"event"` 26 26 } 27 27 ··· 159 159 return 160 160 } 161 161 162 - // update cursor 163 - c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano()) 162 + cursorVal := msg.Created 163 + if cursorVal == 0 { 164 + cursorVal = time.Now().UnixNano() 165 + } 166 + c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 164 167 165 168 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 166 169 c.logger.Error("error processing message", "source", j.source, "err", err)
+1
knotserver/db/db.go
··· 25 25 "_journal_mode=WAL", 26 26 "_synchronous=NORMAL", 27 27 "_auto_vacuum=incremental", 28 + "_busy_timeout=5000", 28 29 } 29 30 30 31 logger := log.FromContext(ctx)
+7 -7
knotserver/events.go
··· 44 44 } 45 45 }() 46 46 47 - defaultCursor := time.Now().UnixNano() 47 + var cursor int64 48 48 cursorStr := r.URL.Query().Get("cursor") 49 - cursor, err := strconv.ParseInt(cursorStr, 10, 64) 50 - if err != nil { 51 - l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 52 - } 53 - if cursor == 0 { 54 - cursor = defaultCursor 49 + if cursorStr != "" { 50 + cursor, err = strconv.ParseInt(cursorStr, 10, 64) 51 + if err != nil { 52 + l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 53 + cursor = 0 54 + } 55 55 } 56 56 57 57 l.Debug("going through backfill", "cursor", cursor)
+271
knotserver/migrate.go
··· 1 + package knotserver 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand/v2" 10 + "os" 11 + "path/filepath" 12 + "strings" 13 + "time" 14 + 15 + "tangled.org/core/knotserver/config" 16 + "tangled.org/core/knotserver/db" 17 + "tangled.org/core/knotserver/repodid" 18 + "tangled.org/core/notifier" 19 + "tangled.org/core/rbac" 20 + ) 21 + 22 + type legacyRepo struct { 23 + ownerDid string 24 + repoName string 25 + oldPath string 26 + } 27 + 28 + func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo { 29 + topEntries, err := os.ReadDir(scanPath) 30 + if err != nil { 31 + logger.Error("reading scan path", "error", err) 32 + return nil 33 + } 34 + 35 + var repos []legacyRepo 36 + for _, entry := range topEntries { 37 + if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") { 38 + continue 39 + } 40 + 41 + ownerPath := filepath.Join(scanPath, entry.Name()) 42 + 43 + if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil { 44 + continue 45 + } 46 + 47 + subEntries, readErr := os.ReadDir(ownerPath) 48 + if readErr != nil { 49 + logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr) 50 + continue 51 + } 52 + 53 + ownerDid := entry.Name() 54 + for _, sub := range subEntries { 55 + if !sub.IsDir() { 56 + continue 57 + } 58 + subPath := filepath.Join(ownerPath, sub.Name()) 59 + if _, statErr := os.Stat(filepath.Join(subPath, "HEAD")); statErr != nil { 60 + logger.Warn("skipping non-repo directory", "path", subPath) 61 + continue 62 + } 63 + repos = append(repos, legacyRepo{ 64 + ownerDid: ownerDid, 65 + repoName: sub.Name(), 66 + oldPath: subPath, 67 + }) 68 + } 69 + } 70 + return repos 71 + } 72 + 73 + func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) { 74 + repos := scanLegacyRepos(c.Repo.ScanPath, logger) 75 + if len(repos) == 0 { 76 + logger.Info("no legacy repos found, migration complete") 77 + return 78 + } 79 + 80 + logger.Info("starting legacy repo migration", "count", len(repos)) 81 + start := time.Now() 82 + 83 + knotServiceUrl := "https://" + c.Server.Hostname 84 + if c.Server.Dev { 85 + knotServiceUrl = "http://" + c.Server.Hostname 86 + } 87 + 88 + migrated := 0 89 + for _, repo := range repos { 90 + select { 91 + case <-ctx.Done(): 92 + logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated) 93 + return 94 + default: 95 + } 96 + 97 + err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl) 98 + if err != nil { 99 + logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err) 100 + continue 101 + } 102 + migrated++ 103 + } 104 + 105 + logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start)) 106 + } 107 + 108 + func migrateOneRepo( 109 + ctx context.Context, 110 + c *config.Config, 111 + d *db.DB, 112 + e *rbac.Enforcer, 113 + n *notifier.Notifier, 114 + logger *slog.Logger, 115 + repo legacyRepo, 116 + knotServiceUrl string, 117 + ) error { 118 + l := logger.With("owner", repo.ownerDid, "repo", repo.repoName) 119 + 120 + repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName) 121 + needsMint := errors.Is(err, sql.ErrNoRows) 122 + if err != nil && !needsMint { 123 + return fmt.Errorf("checking repo_keys: %w", err) 124 + } 125 + 126 + if needsMint { 127 + repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl) 128 + if err != nil { 129 + return err 130 + } 131 + } 132 + 133 + if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil { 134 + l.Error("RBAC rewrite failed (non-fatal)", "error", err) 135 + } 136 + 137 + newPath := filepath.Join(c.Repo.ScanPath, repoDid) 138 + 139 + if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil { 140 + return err 141 + } 142 + 143 + ownerDir := filepath.Dir(repo.oldPath) 144 + if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) { 145 + l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err) 146 + } 147 + 148 + if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil { 149 + l.Error("emitting didAssign event failed (non-fatal)", "error", err) 150 + } 151 + 152 + l.Info("migrated repo", "repoDid", repoDid) 153 + return nil 154 + } 155 + 156 + func mintAndStoreRepoDID( 157 + ctx context.Context, 158 + c *config.Config, 159 + d *db.DB, 160 + l *slog.Logger, 161 + repo legacyRepo, 162 + knotServiceUrl string, 163 + ) (string, error) { 164 + prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl) 165 + if err != nil { 166 + return "", fmt.Errorf("preparing DID: %w", err) 167 + } 168 + 169 + if err := submitWithBackoff(ctx, prepared, l); err != nil { 170 + return "", fmt.Errorf("PLC submission: %w", err) 171 + } 172 + 173 + if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName); err != nil { 174 + return "", fmt.Errorf("storing repo key: %w", err) 175 + } 176 + 177 + return prepared.RepoDid, nil 178 + } 179 + 180 + func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error { 181 + backoff := 2 * time.Second 182 + const maxBackoff = 5 * time.Minute 183 + const maxAttempts = 20 184 + 185 + for attempt := range maxAttempts { 186 + plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 187 + err := prepared.Submit(plcCtx) 188 + cancel() 189 + 190 + if err == nil { 191 + return nil 192 + } 193 + 194 + errMsg := err.Error() 195 + retryable := strings.Contains(errMsg, "429") || 196 + strings.Contains(errMsg, "500") || 197 + strings.Contains(errMsg, "502") || 198 + strings.Contains(errMsg, "503") || 199 + strings.Contains(errMsg, "504") 200 + 201 + if !retryable || attempt == maxAttempts-1 { 202 + return err 203 + } 204 + 205 + jitter := time.Duration(rand.Int64N(int64(backoff / 4))) 206 + delay := backoff + jitter 207 + 208 + l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1) 209 + 210 + select { 211 + case <-ctx.Done(): 212 + return ctx.Err() 213 + case <-time.After(delay): 214 + } 215 + 216 + backoff = min(backoff*2, maxBackoff) 217 + } 218 + 219 + return fmt.Errorf("unreachable: exceeded max attempts") 220 + } 221 + 222 + func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error { 223 + if _, err := os.Stat(newPath); err == nil { 224 + l.Info("new path already exists, skipping rename", "newPath", newPath) 225 + return nil 226 + } 227 + 228 + if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) { 229 + return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath) 230 + } 231 + 232 + if err := os.Rename(oldPath, newPath); err != nil { 233 + return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err) 234 + } 235 + 236 + return nil 237 + } 238 + 239 + func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error { 240 + oldResource := ownerDid + "/" + repoName 241 + 242 + policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource) 243 + if err != nil { 244 + return fmt.Errorf("getting old policies: %w", err) 245 + } 246 + 247 + var addPolicies [][]string 248 + var removePolicies [][]string 249 + for _, p := range policies { 250 + removePolicies = append(removePolicies, p) 251 + newPolicy := make([]string, len(p)) 252 + copy(newPolicy, p) 253 + newPolicy[2] = repoDid 254 + addPolicies = append(addPolicies, newPolicy) 255 + } 256 + 257 + if len(addPolicies) > 0 { 258 + if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil { 259 + return fmt.Errorf("adding new policies: %w", addErr) 260 + } 261 + } 262 + 263 + if len(removePolicies) > 0 { 264 + if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil { 265 + return fmt.Errorf("removing old policies: %w", rmErr) 266 + } 267 + } 268 + 269 + l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies)) 270 + return nil 271 + }
+2
knotserver/server.go
··· 88 88 89 89 notifier := notifier.New() 90 90 91 + go migrateReposOnStartup(ctx, c, db, e, &notifier, log.SubLogger(logger, "migrate")) 92 + 91 93 mux, err := Setup(ctx, c, db, e, jc, &notifier) 92 94 if err != nil { 93 95 return fmt.Errorf("failed to setup server: %w", err)
+1
spindle/db/db.go
··· 18 18 "_journal_mode=WAL", 19 19 "_synchronous=NORMAL", 20 20 "_auto_vacuum=incremental", 21 + "_busy_timeout=5000", 21 22 } 22 23 23 24 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&"))
+11 -10
spindle/stream.go
··· 52 52 } 53 53 }() 54 54 55 - defaultCursor := time.Now().UnixNano() 55 + var cursor int64 56 56 cursorStr := r.URL.Query().Get("cursor") 57 - cursor, err := strconv.ParseInt(cursorStr, 10, 64) 58 - if err != nil { 59 - l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor) 60 - } 61 - if cursor == 0 { 62 - cursor = defaultCursor 57 + if cursorStr != "" { 58 + cursor, err = strconv.ParseInt(cursorStr, 10, 64) 59 + if err != nil { 60 + l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr) 61 + cursor = 0 62 + } 63 63 } 64 64 65 65 // complete backfill first before going to live data ··· 239 239 } 240 240 241 241 jsonMsg, err := json.Marshal(map[string]any{ 242 - "rkey": event.Rkey, 243 - "nsid": event.Nsid, 244 - "event": eventJson, 242 + "rkey": event.Rkey, 243 + "nsid": event.Nsid, 244 + "event": eventJson, 245 + "created": event.Created, 245 246 }) 246 247 if err != nil { 247 248 s.l.Error("failed to marshal record", "err", err)

History

12 rounds 1 comment
sign up or login to add to the discussion
1 commit
expand
knotserver: add repo DID migration on startup
merge conflicts detected
expand
  • go.mod:34
  • go.sum:339
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 1 comment
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
2/3 failed, 1/3 success
expand
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments
1 commit
expand
knotserver: add repo DID migration on startup
expand 0 comments