Signed-off-by: Lewis <lewis@tangled.org>
+311
-25
Diff
round #2
+1
-3
appview/state/router.go
+1
-3
appview/state/router.go
···
1
1
package state
2
2
3
3
import (
4
-
"context"
5
4
"database/sql"
6
5
"errors"
7
6
"net/http"
···
61
60
r2 := r.Clone(r.Context())
62
61
r2.URL.Path = rewritten
63
62
r2.URL.RawPath = rewritten
64
-
ctx := context.WithValue(r2.Context(), "repoDidCanonical", true)
65
-
userRouter.ServeHTTP(w, r2.WithContext(ctx))
63
+
userRouter.ServeHTTP(w, r2)
66
64
case errors.Is(err, sql.ErrNoRows):
67
65
userRouter.ServeHTTP(w, r)
68
66
default:
+8
-5
eventconsumer/consumer.go
+8
-5
eventconsumer/consumer.go
···
19
19
type ProcessFunc func(ctx context.Context, source Source, message Message) error
20
20
21
21
type Message struct {
22
-
Rkey string
23
-
Nsid string
24
-
// do not full deserialize this portion of the message, processFunc can do that
22
+
Rkey string
23
+
Nsid string
24
+
Created int64 `json:"created"`
25
25
EventJson json.RawMessage `json:"event"`
26
26
}
27
27
···
159
159
return
160
160
}
161
161
162
-
// update cursor
163
-
c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano())
162
+
cursorVal := msg.Created
163
+
if cursorVal == 0 {
164
+
cursorVal = time.Now().UnixNano()
165
+
}
166
+
c.cfg.CursorStore.Set(j.source.Key(), cursorVal)
164
167
165
168
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
166
169
c.logger.Error("error processing message", "source", j.source, "err", err)
+7
-7
knotserver/events.go
+7
-7
knotserver/events.go
···
44
44
}
45
45
}()
46
46
47
-
defaultCursor := time.Now().UnixNano()
47
+
var cursor int64
48
48
cursorStr := r.URL.Query().Get("cursor")
49
-
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
50
-
if err != nil {
51
-
l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
52
-
}
53
-
if cursor == 0 {
54
-
cursor = defaultCursor
49
+
if cursorStr != "" {
50
+
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
51
+
if err != nil {
52
+
l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr)
53
+
cursor = 0
54
+
}
55
55
}
56
56
57
57
l.Debug("going through backfill", "cursor", cursor)
+282
knotserver/migrate.go
+282
knotserver/migrate.go
···
1
+
package knotserver
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"errors"
7
+
"fmt"
8
+
"log/slog"
9
+
"math/rand/v2"
10
+
"os"
11
+
"path/filepath"
12
+
"strings"
13
+
"time"
14
+
15
+
"tangled.org/core/knotserver/config"
16
+
"tangled.org/core/knotserver/db"
17
+
"tangled.org/core/knotserver/repodid"
18
+
"tangled.org/core/notifier"
19
+
"tangled.org/core/rbac"
20
+
)
21
+
22
+
type legacyRepo struct {
23
+
ownerDid string
24
+
repoName string
25
+
oldPath string
26
+
}
27
+
28
+
func scanLegacyRepos(scanPath string, d *db.DB, logger *slog.Logger) []legacyRepo {
29
+
entries, err := os.ReadDir(scanPath)
30
+
if err != nil {
31
+
logger.Error("reading scan path", "error", err)
32
+
return nil
33
+
}
34
+
35
+
return collectLegacyEntries(entries, scanPath, d, logger)
36
+
}
37
+
38
+
func collectLegacyEntries(entries []os.DirEntry, scanPath string, d *db.DB, logger *slog.Logger) []legacyRepo {
39
+
var repos []legacyRepo
40
+
for _, entry := range entries {
41
+
if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") {
42
+
continue
43
+
}
44
+
45
+
ownerPath := filepath.Join(scanPath, entry.Name())
46
+
47
+
if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil {
48
+
continue
49
+
}
50
+
51
+
subEntries, readErr := os.ReadDir(ownerPath)
52
+
if readErr != nil {
53
+
logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr)
54
+
continue
55
+
}
56
+
57
+
repos = append(repos, collectReposInOwnerDir(subEntries, entry.Name(), ownerPath, d, logger)...)
58
+
}
59
+
return repos
60
+
}
61
+
62
+
func collectReposInOwnerDir(entries []os.DirEntry, ownerDid, ownerPath string, d *db.DB, logger *slog.Logger) []legacyRepo {
63
+
var repos []legacyRepo
64
+
for _, sub := range entries {
65
+
if !sub.IsDir() {
66
+
continue
67
+
}
68
+
69
+
subPath := filepath.Join(ownerPath, sub.Name())
70
+
if _, err := os.Stat(filepath.Join(subPath, "HEAD")); err != nil {
71
+
logger.Warn("skipping non-repo directory", "path", subPath)
72
+
continue
73
+
}
74
+
75
+
repos = append(repos, legacyRepo{
76
+
ownerDid: ownerDid,
77
+
repoName: sub.Name(),
78
+
oldPath: subPath,
79
+
})
80
+
}
81
+
return repos
82
+
}
83
+
84
+
func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) {
85
+
repos := scanLegacyRepos(c.Repo.ScanPath, d, logger)
86
+
if len(repos) == 0 {
87
+
logger.Info("no legacy repos found, migration complete")
88
+
return
89
+
}
90
+
91
+
logger.Info("starting legacy repo migration", "count", len(repos))
92
+
start := time.Now()
93
+
94
+
knotServiceUrl := "https://" + c.Server.Hostname
95
+
if c.Server.Dev {
96
+
knotServiceUrl = "http://" + c.Server.Hostname
97
+
}
98
+
99
+
migrated := 0
100
+
for _, repo := range repos {
101
+
select {
102
+
case <-ctx.Done():
103
+
logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated)
104
+
return
105
+
default:
106
+
}
107
+
108
+
err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl)
109
+
if err != nil {
110
+
logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err)
111
+
continue
112
+
}
113
+
migrated++
114
+
}
115
+
116
+
logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start))
117
+
}
118
+
119
+
func migrateOneRepo(
120
+
ctx context.Context,
121
+
c *config.Config,
122
+
d *db.DB,
123
+
e *rbac.Enforcer,
124
+
n *notifier.Notifier,
125
+
logger *slog.Logger,
126
+
repo legacyRepo,
127
+
knotServiceUrl string,
128
+
) error {
129
+
l := logger.With("owner", repo.ownerDid, "repo", repo.repoName)
130
+
131
+
repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName)
132
+
needsMint := errors.Is(err, sql.ErrNoRows)
133
+
if err != nil && !needsMint {
134
+
return fmt.Errorf("checking repo_keys: %w", err)
135
+
}
136
+
137
+
if needsMint {
138
+
repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl)
139
+
if err != nil {
140
+
return err
141
+
}
142
+
}
143
+
144
+
if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil {
145
+
l.Error("RBAC rewrite failed (non-fatal)", "error", err)
146
+
}
147
+
148
+
newPath := filepath.Join(c.Repo.ScanPath, repoDid)
149
+
150
+
if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil {
151
+
return err
152
+
}
153
+
154
+
ownerDir := filepath.Dir(repo.oldPath)
155
+
if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) {
156
+
l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err)
157
+
}
158
+
159
+
if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil {
160
+
l.Error("emitting didAssign event failed (non-fatal)", "error", err)
161
+
}
162
+
163
+
l.Info("migrated repo", "repoDid", repoDid)
164
+
return nil
165
+
}
166
+
167
+
func mintAndStoreRepoDID(
168
+
ctx context.Context,
169
+
c *config.Config,
170
+
d *db.DB,
171
+
l *slog.Logger,
172
+
repo legacyRepo,
173
+
knotServiceUrl string,
174
+
) (string, error) {
175
+
prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl)
176
+
if err != nil {
177
+
return "", fmt.Errorf("preparing DID: %w", err)
178
+
}
179
+
180
+
if err := submitWithBackoff(ctx, prepared, l); err != nil {
181
+
return "", fmt.Errorf("PLC submission: %w", err)
182
+
}
183
+
184
+
if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName); err != nil {
185
+
return "", fmt.Errorf("storing repo key: %w", err)
186
+
}
187
+
188
+
return prepared.RepoDid, nil
189
+
}
190
+
191
+
func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error {
192
+
backoff := 2 * time.Second
193
+
const maxBackoff = 5 * time.Minute
194
+
const maxAttempts = 20
195
+
196
+
for attempt := range maxAttempts {
197
+
plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
198
+
err := prepared.Submit(plcCtx)
199
+
cancel()
200
+
201
+
if err == nil {
202
+
return nil
203
+
}
204
+
205
+
errMsg := err.Error()
206
+
retryable := strings.Contains(errMsg, "429") ||
207
+
strings.Contains(errMsg, "500") ||
208
+
strings.Contains(errMsg, "502") ||
209
+
strings.Contains(errMsg, "503") ||
210
+
strings.Contains(errMsg, "504")
211
+
212
+
if !retryable || attempt == maxAttempts-1 {
213
+
return err
214
+
}
215
+
216
+
jitter := time.Duration(rand.Int64N(int64(backoff / 4)))
217
+
delay := backoff + jitter
218
+
219
+
l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1)
220
+
221
+
select {
222
+
case <-ctx.Done():
223
+
return ctx.Err()
224
+
case <-time.After(delay):
225
+
}
226
+
227
+
backoff = min(backoff*2, maxBackoff)
228
+
}
229
+
230
+
return fmt.Errorf("unreachable: exceeded max attempts")
231
+
}
232
+
233
+
func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error {
234
+
if _, err := os.Stat(newPath); err == nil {
235
+
l.Info("new path already exists, skipping rename", "newPath", newPath)
236
+
return nil
237
+
}
238
+
239
+
if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) {
240
+
return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath)
241
+
}
242
+
243
+
if err := os.Rename(oldPath, newPath); err != nil {
244
+
return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err)
245
+
}
246
+
247
+
return nil
248
+
}
249
+
250
+
func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error {
251
+
oldResource := ownerDid + "/" + repoName
252
+
253
+
policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource)
254
+
if err != nil {
255
+
return fmt.Errorf("getting old policies: %w", err)
256
+
}
257
+
258
+
var addPolicies [][]string
259
+
var removePolicies [][]string
260
+
for _, p := range policies {
261
+
removePolicies = append(removePolicies, p)
262
+
newPolicy := make([]string, len(p))
263
+
copy(newPolicy, p)
264
+
newPolicy[2] = repoDid
265
+
addPolicies = append(addPolicies, newPolicy)
266
+
}
267
+
268
+
if len(addPolicies) > 0 {
269
+
if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil {
270
+
return fmt.Errorf("adding new policies: %w", addErr)
271
+
}
272
+
}
273
+
274
+
if len(removePolicies) > 0 {
275
+
if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil {
276
+
return fmt.Errorf("removing old policies: %w", rmErr)
277
+
}
278
+
}
279
+
280
+
l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies))
281
+
return nil
282
+
}
+2
knotserver/server.go
+2
knotserver/server.go
+11
-10
spindle/stream.go
+11
-10
spindle/stream.go
···
52
52
}
53
53
}()
54
54
55
-
defaultCursor := time.Now().UnixNano()
55
+
var cursor int64
56
56
cursorStr := r.URL.Query().Get("cursor")
57
-
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
58
-
if err != nil {
59
-
l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
60
-
}
61
-
if cursor == 0 {
62
-
cursor = defaultCursor
57
+
if cursorStr != "" {
58
+
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
59
+
if err != nil {
60
+
l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr)
61
+
cursor = 0
62
+
}
63
63
}
64
64
65
65
// complete backfill first before going to live data
···
239
239
}
240
240
241
241
jsonMsg, err := json.Marshal(map[string]any{
242
-
"rkey": event.Rkey,
243
-
"nsid": event.Nsid,
244
-
"event": eventJson,
242
+
"rkey": event.Rkey,
243
+
"nsid": event.Nsid,
244
+
"event": eventJson,
245
+
"created": event.Created,
245
246
})
246
247
if err != nil {
247
248
s.l.Error("failed to marshal record", "err", err)
History
12 rounds
1 comment
oyster.cafe
submitted
#11
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- go.mod:34
- go.sum:339
expand 0 comments
oyster.cafe
submitted
#10
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#9
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 1 comment
oyster.cafe
submitted
#8
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
2/3 failed, 1/3 success
expand
collapse
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
appview/state/router.go:65 — can you absorb this?
knotserver/server.go:91 — I remember you have tested this against knot1's ~4k repos. Is it OK to run in the background without a progress indicator?