forked from
tangled.org/core
Monorepo for Tangled
1package knotserver
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand/v2"
10 "os"
11 "path/filepath"
12 "strings"
13 "time"
14
15 "tangled.org/core/knotserver/config"
16 "tangled.org/core/knotserver/db"
17 "tangled.org/core/knotserver/repodid"
18 "tangled.org/core/notifier"
19 "tangled.org/core/rbac"
20)
21
// legacyRepo describes a repository that is still stored under the
// <ownerDid>/<repoName> directory layout and has yet to be migrated.
type legacyRepo struct {
	// ownerDid is the name of the "did:"-prefixed owner directory and
	// repoName is the name of the repository directory inside it.
	ownerDid, repoName string
	// oldPath is the repository's current location on disk.
	oldPath string
}
27
28func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo {
29 topEntries, err := os.ReadDir(scanPath)
30 if err != nil {
31 logger.Error("reading scan path", "error", err)
32 return nil
33 }
34
35 var repos []legacyRepo
36 for _, entry := range topEntries {
37 if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") {
38 continue
39 }
40
41 ownerPath := filepath.Join(scanPath, entry.Name())
42
43 if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil {
44 continue
45 }
46
47 subEntries, readErr := os.ReadDir(ownerPath)
48 if readErr != nil {
49 logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr)
50 continue
51 }
52
53 ownerDid := entry.Name()
54 for _, sub := range subEntries {
55 if !sub.IsDir() {
56 continue
57 }
58 subPath := filepath.Join(ownerPath, sub.Name())
59 if _, statErr := os.Stat(filepath.Join(subPath, "HEAD")); statErr != nil {
60 logger.Warn("skipping non-repo directory", "path", subPath)
61 continue
62 }
63 repos = append(repos, legacyRepo{
64 ownerDid: ownerDid,
65 repoName: sub.Name(),
66 oldPath: subPath,
67 })
68 }
69 }
70 return repos
71}
72
73func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) {
74 repos := scanLegacyRepos(c.Repo.ScanPath, logger)
75 if len(repos) == 0 {
76 logger.Info("no legacy repos found, migration complete")
77 return
78 }
79
80 logger.Info("starting legacy repo migration", "count", len(repos))
81 start := time.Now()
82
83 knotServiceUrl := "https://" + c.Server.Hostname
84 if c.Server.Dev {
85 knotServiceUrl = "http://" + c.Server.Hostname
86 }
87
88 migrated := 0
89 for _, repo := range repos {
90 select {
91 case <-ctx.Done():
92 logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated)
93 return
94 default:
95 }
96
97 err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl)
98 if err != nil {
99 logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err)
100 continue
101 }
102 migrated++
103 }
104
105 logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start))
106}
107
108func migrateOneRepo(
109 ctx context.Context,
110 c *config.Config,
111 d *db.DB,
112 e *rbac.Enforcer,
113 n *notifier.Notifier,
114 logger *slog.Logger,
115 repo legacyRepo,
116 knotServiceUrl string,
117) error {
118 l := logger.With("owner", repo.ownerDid, "repo", repo.repoName)
119
120 repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName)
121 needsMint := errors.Is(err, sql.ErrNoRows)
122 if err != nil && !needsMint {
123 return fmt.Errorf("checking repo_keys: %w", err)
124 }
125
126 if needsMint {
127 repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl)
128 if err != nil {
129 return err
130 }
131 }
132
133 if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil {
134 l.Error("RBAC rewrite failed (non-fatal)", "error", err)
135 }
136
137 newPath := filepath.Join(c.Repo.ScanPath, repoDid)
138
139 if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil {
140 return err
141 }
142
143 ownerDir := filepath.Dir(repo.oldPath)
144 if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) {
145 l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err)
146 }
147
148 if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil {
149 l.Error("emitting didAssign event failed (non-fatal)", "error", err)
150 }
151
152 l.Info("migrated repo", "repoDid", repoDid)
153 return nil
154}
155
156func mintAndStoreRepoDID(
157 ctx context.Context,
158 c *config.Config,
159 d *db.DB,
160 l *slog.Logger,
161 repo legacyRepo,
162 knotServiceUrl string,
163) (string, error) {
164 prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl)
165 if err != nil {
166 return "", fmt.Errorf("preparing DID: %w", err)
167 }
168
169 if err := submitWithBackoff(ctx, prepared, l); err != nil {
170 return "", fmt.Errorf("PLC submission: %w", err)
171 }
172
173 if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName, ""); err != nil {
174 return "", fmt.Errorf("storing repo key: %w", err)
175 }
176
177 return prepared.RepoDid, nil
178}
179
180func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error {
181 backoff := 2 * time.Second
182 const maxBackoff = 5 * time.Minute
183 const maxAttempts = 20
184
185 for attempt := range maxAttempts {
186 plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
187 err := prepared.Submit(plcCtx)
188 cancel()
189
190 if err == nil {
191 return nil
192 }
193
194 errMsg := err.Error()
195 retryable := strings.Contains(errMsg, "429") ||
196 strings.Contains(errMsg, "500") ||
197 strings.Contains(errMsg, "502") ||
198 strings.Contains(errMsg, "503") ||
199 strings.Contains(errMsg, "504")
200
201 if !retryable || attempt == maxAttempts-1 {
202 return err
203 }
204
205 jitter := time.Duration(rand.Int64N(int64(backoff / 4)))
206 delay := backoff + jitter
207
208 l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1)
209
210 select {
211 case <-ctx.Done():
212 return ctx.Err()
213 case <-time.After(delay):
214 }
215
216 backoff = min(backoff*2, maxBackoff)
217 }
218
219 return fmt.Errorf("unreachable: exceeded max attempts")
220}
221
// moveRepoOnDisk renames oldPath to newPath.
//
// If newPath can already be stat'ed, the rename is skipped, the fact is
// logged, and nil is returned. If oldPath does not exist (and newPath was not
// found), an error naming both paths is returned. A failed rename is
// returned wrapped with both paths.
func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error {
	_, newErr := os.Stat(newPath)
	if newErr == nil {
		l.Info("new path already exists, skipping rename", "newPath", newPath)
		return nil
	}

	_, oldErr := os.Stat(oldPath)
	if errors.Is(oldErr, os.ErrNotExist) {
		return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath)
	}

	renameErr := os.Rename(oldPath, newPath)
	if renameErr == nil {
		return nil
	}
	return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, renameErr)
}
238
239func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error {
240 oldResource := ownerDid + "/" + repoName
241
242 policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource)
243 if err != nil {
244 return fmt.Errorf("getting old policies: %w", err)
245 }
246
247 var addPolicies [][]string
248 var removePolicies [][]string
249 for _, p := range policies {
250 removePolicies = append(removePolicies, p)
251 newPolicy := make([]string, len(p))
252 copy(newPolicy, p)
253 newPolicy[2] = repoDid
254 addPolicies = append(addPolicies, newPolicy)
255 }
256
257 if len(addPolicies) > 0 {
258 if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil {
259 return fmt.Errorf("adding new policies: %w", addErr)
260 }
261 }
262
263 if len(removePolicies) > 0 {
264 if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil {
265 return fmt.Errorf("removing old policies: %w", rmErr)
266 }
267 }
268
269 l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies))
270 return nil
271}