Monorepo for Tangled
at master 271 lines 7.0 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand/v2" 10 "os" 11 "path/filepath" 12 "strings" 13 "time" 14 15 "tangled.org/core/knotserver/config" 16 "tangled.org/core/knotserver/db" 17 "tangled.org/core/knotserver/repodid" 18 "tangled.org/core/notifier" 19 "tangled.org/core/rbac" 20) 21 22type legacyRepo struct { 23 ownerDid string 24 repoName string 25 oldPath string 26} 27 28func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo { 29 topEntries, err := os.ReadDir(scanPath) 30 if err != nil { 31 logger.Error("reading scan path", "error", err) 32 return nil 33 } 34 35 var repos []legacyRepo 36 for _, entry := range topEntries { 37 if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") { 38 continue 39 } 40 41 ownerPath := filepath.Join(scanPath, entry.Name()) 42 43 if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil { 44 continue 45 } 46 47 subEntries, readErr := os.ReadDir(ownerPath) 48 if readErr != nil { 49 logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr) 50 continue 51 } 52 53 ownerDid := entry.Name() 54 for _, sub := range subEntries { 55 if !sub.IsDir() { 56 continue 57 } 58 subPath := filepath.Join(ownerPath, sub.Name()) 59 if _, statErr := os.Stat(filepath.Join(subPath, "HEAD")); statErr != nil { 60 logger.Warn("skipping non-repo directory", "path", subPath) 61 continue 62 } 63 repos = append(repos, legacyRepo{ 64 ownerDid: ownerDid, 65 repoName: sub.Name(), 66 oldPath: subPath, 67 }) 68 } 69 } 70 return repos 71} 72 73func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) { 74 repos := scanLegacyRepos(c.Repo.ScanPath, logger) 75 if len(repos) == 0 { 76 logger.Info("no legacy repos found, migration complete") 77 return 78 } 79 80 logger.Info("starting legacy repo migration", "count", len(repos)) 81 start := time.Now() 82 83 knotServiceUrl := "https://" + c.Server.Hostname 84 if c.Server.Dev { 85 knotServiceUrl = "http://" + c.Server.Hostname 86 } 87 88 migrated := 0 89 for _, repo := range repos { 90 select { 91 case <-ctx.Done(): 92 logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated) 93 return 94 default: 95 } 96 97 err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl) 98 if err != nil { 99 logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err) 100 continue 101 } 102 migrated++ 103 } 104 105 logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start)) 106} 107 108func migrateOneRepo( 109 ctx context.Context, 110 c *config.Config, 111 d *db.DB, 112 e *rbac.Enforcer, 113 n *notifier.Notifier, 114 logger *slog.Logger, 115 repo legacyRepo, 116 knotServiceUrl string, 117) error { 118 l := logger.With("owner", repo.ownerDid, "repo", repo.repoName) 119 120 repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName) 121 needsMint := errors.Is(err, sql.ErrNoRows) 122 if err != nil && !needsMint { 123 return fmt.Errorf("checking repo_keys: %w", err) 124 } 125 126 if needsMint { 127 repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl) 128 if err != nil { 129 return err 130 } 131 } 132 133 if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil { 134 l.Error("RBAC rewrite failed (non-fatal)", "error", err) 135 } 136 137 newPath := filepath.Join(c.Repo.ScanPath, repoDid) 138 139 if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil { 140 return err 141 } 142 143 ownerDir := filepath.Dir(repo.oldPath) 144 if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) { 145 l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err) 146 } 147 148 if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil { 149 l.Error("emitting didAssign event failed (non-fatal)", "error", err) 150 } 151 152 l.Info("migrated repo", "repoDid", repoDid) 153 return nil 154} 155 156func mintAndStoreRepoDID( 157 ctx context.Context, 158 c *config.Config, 159 d *db.DB, 160 l *slog.Logger, 161 repo legacyRepo, 162 knotServiceUrl string, 163) (string, error) { 164 prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl) 165 if err != nil { 166 return "", fmt.Errorf("preparing DID: %w", err) 167 } 168 169 if err := submitWithBackoff(ctx, prepared, l); err != nil { 170 return "", fmt.Errorf("PLC submission: %w", err) 171 } 172 173 if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName, ""); err != nil { 174 return "", fmt.Errorf("storing repo key: %w", err) 175 } 176 177 return prepared.RepoDid, nil 178} 179 180func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error { 181 backoff := 2 * time.Second 182 const maxBackoff = 5 * time.Minute 183 const maxAttempts = 20 184 185 for attempt := range maxAttempts { 186 plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second) 187 err := prepared.Submit(plcCtx) 188 cancel() 189 190 if err == nil { 191 return nil 192 } 193 194 errMsg := err.Error() 195 retryable := strings.Contains(errMsg, "429") || 196 strings.Contains(errMsg, "500") || 197 strings.Contains(errMsg, "502") || 198 strings.Contains(errMsg, "503") || 199 strings.Contains(errMsg, "504") 200 201 if !retryable || attempt == maxAttempts-1 { 202 return err 203 } 204 205 jitter := time.Duration(rand.Int64N(int64(backoff / 4))) 206 delay := backoff + jitter 207 208 l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1) 209 210 select { 211 case <-ctx.Done(): 212 return ctx.Err() 213 case <-time.After(delay): 214 } 215 216 backoff = min(backoff*2, maxBackoff) 217 } 218 219 return fmt.Errorf("unreachable: exceeded max attempts") 220} 221 222func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error { 223 if _, err := os.Stat(newPath); err == nil { 224 l.Info("new path already exists, skipping rename", "newPath", newPath) 225 return nil 226 } 227 228 if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) { 229 return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath) 230 } 231 232 if err := os.Rename(oldPath, newPath); err != nil { 233 return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err) 234 } 235 236 return nil 237} 238 239func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error { 240 oldResource := ownerDid + "/" + repoName 241 242 policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource) 243 if err != nil { 244 return fmt.Errorf("getting old policies: %w", err) 245 } 246 247 var addPolicies [][]string 248 var removePolicies [][]string 249 for _, p := range policies { 250 removePolicies = append(removePolicies, p) 251 newPolicy := make([]string, len(p)) 252 copy(newPolicy, p) 253 newPolicy[2] = repoDid 254 addPolicies = append(addPolicies, newPolicy) 255 } 256 257 if len(addPolicies) > 0 { 258 if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil { 259 return fmt.Errorf("adding new policies: %w", addErr) 260 } 261 } 262 263 if len(removePolicies) > 0 { 264 if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil { 265 return fmt.Errorf("removing old policies: %w", rmErr) 266 } 267 } 268 269 l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies)) 270 return nil 271}