Signed-off-by: Lewis <lewis@tangled.org>
+302
-22
Diff
round #11
+1
appview/db/db.go
+1
appview/db/db.go
+8
-5
eventconsumer/consumer.go
+8
-5
eventconsumer/consumer.go
···
19
19
type ProcessFunc func(ctx context.Context, source Source, message Message) error
20
20
21
21
type Message struct {
22
-
Rkey string
23
-
Nsid string
24
-
// do not full deserialize this portion of the message, processFunc can do that
22
+
Rkey string
23
+
Nsid string
24
+
Created int64 `json:"created"`
25
25
EventJson json.RawMessage `json:"event"`
26
26
}
27
27
···
159
159
return
160
160
}
161
161
162
-
// update cursor
163
-
c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano())
162
+
cursorVal := msg.Created
163
+
if cursorVal == 0 {
164
+
cursorVal = time.Now().UnixNano()
165
+
}
166
+
c.cfg.CursorStore.Set(j.source.Key(), cursorVal)
164
167
165
168
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
166
169
c.logger.Error("error processing message", "source", j.source, "err", err)
+1
knotserver/db/db.go
+1
knotserver/db/db.go
+7
-7
knotserver/events.go
+7
-7
knotserver/events.go
···
44
44
}
45
45
}()
46
46
47
-
defaultCursor := time.Now().UnixNano()
47
+
var cursor int64
48
48
cursorStr := r.URL.Query().Get("cursor")
49
-
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
50
-
if err != nil {
51
-
l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
52
-
}
53
-
if cursor == 0 {
54
-
cursor = defaultCursor
49
+
if cursorStr != "" {
50
+
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
51
+
if err != nil {
52
+
l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr)
53
+
cursor = 0
54
+
}
55
55
}
56
56
57
57
l.Debug("going through backfill", "cursor", cursor)
+271
knotserver/migrate.go
+271
knotserver/migrate.go
···
1
+
package knotserver
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"errors"
7
+
"fmt"
8
+
"log/slog"
9
+
"math/rand/v2"
10
+
"os"
11
+
"path/filepath"
12
+
"strings"
13
+
"time"
14
+
15
+
"tangled.org/core/knotserver/config"
16
+
"tangled.org/core/knotserver/db"
17
+
"tangled.org/core/knotserver/repodid"
18
+
"tangled.org/core/notifier"
19
+
"tangled.org/core/rbac"
20
+
)
21
+
22
+
// legacyRepo describes a single repository discovered under the legacy
// on-disk layout, i.e. <scanPath>/<ownerDid>/<repoName>.
type legacyRepo struct {
	// ownerDid is the name of the owner directory (it starts with "did:"),
	// repoName is the name of the repository directory inside it, and
	// oldPath is the joined filesystem path of that repository directory.
	ownerDid, repoName, oldPath string
}
27
+
28
+
func scanLegacyRepos(scanPath string, logger *slog.Logger) []legacyRepo {
29
+
topEntries, err := os.ReadDir(scanPath)
30
+
if err != nil {
31
+
logger.Error("reading scan path", "error", err)
32
+
return nil
33
+
}
34
+
35
+
var repos []legacyRepo
36
+
for _, entry := range topEntries {
37
+
if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") {
38
+
continue
39
+
}
40
+
41
+
ownerPath := filepath.Join(scanPath, entry.Name())
42
+
43
+
if _, headErr := os.Stat(filepath.Join(ownerPath, "HEAD")); headErr == nil {
44
+
continue
45
+
}
46
+
47
+
subEntries, readErr := os.ReadDir(ownerPath)
48
+
if readErr != nil {
49
+
logger.Error("reading owner dir", "ownerDir", ownerPath, "error", readErr)
50
+
continue
51
+
}
52
+
53
+
ownerDid := entry.Name()
54
+
for _, sub := range subEntries {
55
+
if !sub.IsDir() {
56
+
continue
57
+
}
58
+
subPath := filepath.Join(ownerPath, sub.Name())
59
+
if _, statErr := os.Stat(filepath.Join(subPath, "HEAD")); statErr != nil {
60
+
logger.Warn("skipping non-repo directory", "path", subPath)
61
+
continue
62
+
}
63
+
repos = append(repos, legacyRepo{
64
+
ownerDid: ownerDid,
65
+
repoName: sub.Name(),
66
+
oldPath: subPath,
67
+
})
68
+
}
69
+
}
70
+
return repos
71
+
}
72
+
73
+
func migrateReposOnStartup(ctx context.Context, c *config.Config, d *db.DB, e *rbac.Enforcer, n *notifier.Notifier, logger *slog.Logger) {
74
+
repos := scanLegacyRepos(c.Repo.ScanPath, logger)
75
+
if len(repos) == 0 {
76
+
logger.Info("no legacy repos found, migration complete")
77
+
return
78
+
}
79
+
80
+
logger.Info("starting legacy repo migration", "count", len(repos))
81
+
start := time.Now()
82
+
83
+
knotServiceUrl := "https://" + c.Server.Hostname
84
+
if c.Server.Dev {
85
+
knotServiceUrl = "http://" + c.Server.Hostname
86
+
}
87
+
88
+
migrated := 0
89
+
for _, repo := range repos {
90
+
select {
91
+
case <-ctx.Done():
92
+
logger.Info("migration interrupted by shutdown", "migrated", migrated, "remaining", len(repos)-migrated)
93
+
return
94
+
default:
95
+
}
96
+
97
+
err := migrateOneRepo(ctx, c, d, e, n, logger, repo, knotServiceUrl)
98
+
if err != nil {
99
+
logger.Error("migration failed for repo", "owner", repo.ownerDid, "repo", repo.repoName, "error", err)
100
+
continue
101
+
}
102
+
migrated++
103
+
}
104
+
105
+
logger.Info("legacy repo migration complete", "migrated", migrated, "total", len(repos), "duration", time.Since(start))
106
+
}
107
+
108
+
func migrateOneRepo(
109
+
ctx context.Context,
110
+
c *config.Config,
111
+
d *db.DB,
112
+
e *rbac.Enforcer,
113
+
n *notifier.Notifier,
114
+
logger *slog.Logger,
115
+
repo legacyRepo,
116
+
knotServiceUrl string,
117
+
) error {
118
+
l := logger.With("owner", repo.ownerDid, "repo", repo.repoName)
119
+
120
+
repoDid, err := d.GetRepoDid(repo.ownerDid, repo.repoName)
121
+
needsMint := errors.Is(err, sql.ErrNoRows)
122
+
if err != nil && !needsMint {
123
+
return fmt.Errorf("checking repo_keys: %w", err)
124
+
}
125
+
126
+
if needsMint {
127
+
repoDid, err = mintAndStoreRepoDID(ctx, c, d, l, repo, knotServiceUrl)
128
+
if err != nil {
129
+
return err
130
+
}
131
+
}
132
+
133
+
if err := rewriteRBACPolicies(e, repo.ownerDid, repo.repoName, repoDid, l); err != nil {
134
+
l.Error("RBAC rewrite failed (non-fatal)", "error", err)
135
+
}
136
+
137
+
newPath := filepath.Join(c.Repo.ScanPath, repoDid)
138
+
139
+
if err := moveRepoOnDisk(repo.oldPath, newPath, l); err != nil {
140
+
return err
141
+
}
142
+
143
+
ownerDir := filepath.Dir(repo.oldPath)
144
+
if err := os.Remove(ownerDir); err != nil && !errors.Is(err, os.ErrNotExist) {
145
+
l.Warn("could not remove empty owner dir", "path", ownerDir, "error", err)
146
+
}
147
+
148
+
if err := d.EmitDIDAssign(n, repo.ownerDid, repo.repoName, repoDid, ""); err != nil {
149
+
l.Error("emitting didAssign event failed (non-fatal)", "error", err)
150
+
}
151
+
152
+
l.Info("migrated repo", "repoDid", repoDid)
153
+
return nil
154
+
}
155
+
156
+
func mintAndStoreRepoDID(
157
+
ctx context.Context,
158
+
c *config.Config,
159
+
d *db.DB,
160
+
l *slog.Logger,
161
+
repo legacyRepo,
162
+
knotServiceUrl string,
163
+
) (string, error) {
164
+
prepared, err := repodid.PrepareRepoDID(c.Server.PlcUrl, knotServiceUrl)
165
+
if err != nil {
166
+
return "", fmt.Errorf("preparing DID: %w", err)
167
+
}
168
+
169
+
if err := submitWithBackoff(ctx, prepared, l); err != nil {
170
+
return "", fmt.Errorf("PLC submission: %w", err)
171
+
}
172
+
173
+
if err := d.StoreRepoKey(prepared.RepoDid, prepared.SigningKeyRaw, repo.ownerDid, repo.repoName, ""); err != nil {
174
+
return "", fmt.Errorf("storing repo key: %w", err)
175
+
}
176
+
177
+
return prepared.RepoDid, nil
178
+
}
179
+
180
+
func submitWithBackoff(ctx context.Context, prepared *repodid.PreparedDID, l *slog.Logger) error {
181
+
backoff := 2 * time.Second
182
+
const maxBackoff = 5 * time.Minute
183
+
const maxAttempts = 20
184
+
185
+
for attempt := range maxAttempts {
186
+
plcCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
187
+
err := prepared.Submit(plcCtx)
188
+
cancel()
189
+
190
+
if err == nil {
191
+
return nil
192
+
}
193
+
194
+
errMsg := err.Error()
195
+
retryable := strings.Contains(errMsg, "429") ||
196
+
strings.Contains(errMsg, "500") ||
197
+
strings.Contains(errMsg, "502") ||
198
+
strings.Contains(errMsg, "503") ||
199
+
strings.Contains(errMsg, "504")
200
+
201
+
if !retryable || attempt == maxAttempts-1 {
202
+
return err
203
+
}
204
+
205
+
jitter := time.Duration(rand.Int64N(int64(backoff / 4)))
206
+
delay := backoff + jitter
207
+
208
+
l.Info("PLC rate limited, backing off", "delay", delay, "attempt", attempt+1)
209
+
210
+
select {
211
+
case <-ctx.Done():
212
+
return ctx.Err()
213
+
case <-time.After(delay):
214
+
}
215
+
216
+
backoff = min(backoff*2, maxBackoff)
217
+
}
218
+
219
+
return fmt.Errorf("unreachable: exceeded max attempts")
220
+
}
221
+
222
+
// moveRepoOnDisk renames the repository directory oldPath to newPath.
//
// The call is safe to repeat: when newPath already exists the rename is
// skipped and nil is returned. An error is returned when newPath cannot be
// inspected for a reason other than it not existing, when neither path
// exists, or when the rename itself fails.
func moveRepoOnDisk(oldPath, newPath string, l *slog.Logger) error {
	switch _, err := os.Stat(newPath); {
	case err == nil:
		l.Info("new path already exists, skipping rename", "newPath", newPath)
		return nil
	case !errors.Is(err, os.ErrNotExist):
		// Anything other than "not found" (permissions, I/O, ...) means we
		// cannot tell whether the destination exists; report it rather than
		// proceeding on a guess.
		return fmt.Errorf("checking new path %s: %w", newPath, err)
	}

	if _, err := os.Stat(oldPath); errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("old path %s does not exist and new path %s not found", oldPath, newPath)
	}

	if err := os.Rename(oldPath, newPath); err != nil {
		return fmt.Errorf("renaming %s -> %s: %w", oldPath, newPath, err)
	}

	return nil
}
238
+
239
+
func rewriteRBACPolicies(e *rbac.Enforcer, ownerDid, repoName, repoDid string, l *slog.Logger) error {
240
+
oldResource := ownerDid + "/" + repoName
241
+
242
+
policies, err := e.E.GetFilteredPolicy(1, rbac.ThisServer, oldResource)
243
+
if err != nil {
244
+
return fmt.Errorf("getting old policies: %w", err)
245
+
}
246
+
247
+
var addPolicies [][]string
248
+
var removePolicies [][]string
249
+
for _, p := range policies {
250
+
removePolicies = append(removePolicies, p)
251
+
newPolicy := make([]string, len(p))
252
+
copy(newPolicy, p)
253
+
newPolicy[2] = repoDid
254
+
addPolicies = append(addPolicies, newPolicy)
255
+
}
256
+
257
+
if len(addPolicies) > 0 {
258
+
if _, addErr := e.E.AddPolicies(addPolicies); addErr != nil {
259
+
return fmt.Errorf("adding new policies: %w", addErr)
260
+
}
261
+
}
262
+
263
+
if len(removePolicies) > 0 {
264
+
if _, rmErr := e.E.RemovePolicies(removePolicies); rmErr != nil {
265
+
return fmt.Errorf("removing old policies: %w", rmErr)
266
+
}
267
+
}
268
+
269
+
l.Info("rewrote RBAC policies", "old", oldResource, "new", repoDid, "count", len(policies))
270
+
return nil
271
+
}
+2
knotserver/server.go
+2
knotserver/server.go
+1
spindle/db/db.go
+1
spindle/db/db.go
+11
-10
spindle/stream.go
+11
-10
spindle/stream.go
···
52
52
}
53
53
}()
54
54
55
-
defaultCursor := time.Now().UnixNano()
55
+
var cursor int64
56
56
cursorStr := r.URL.Query().Get("cursor")
57
-
cursor, err := strconv.ParseInt(cursorStr, 10, 64)
58
-
if err != nil {
59
-
l.Error("empty or invalid cursor", "invalidCursor", cursorStr, "default", defaultCursor)
60
-
}
61
-
if cursor == 0 {
62
-
cursor = defaultCursor
57
+
if cursorStr != "" {
58
+
cursor, err = strconv.ParseInt(cursorStr, 10, 64)
59
+
if err != nil {
60
+
l.Error("invalid cursor, starting from beginning", "invalidCursor", cursorStr)
61
+
cursor = 0
62
+
}
63
63
}
64
64
65
65
// complete backfill first before going to live data
···
239
239
}
240
240
241
241
jsonMsg, err := json.Marshal(map[string]any{
242
-
"rkey": event.Rkey,
243
-
"nsid": event.Nsid,
244
-
"event": eventJson,
242
+
"rkey": event.Rkey,
243
+
"nsid": event.Nsid,
244
+
"event": eventJson,
245
+
"created": event.Created,
245
246
})
246
247
if err != nil {
247
248
s.l.Error("failed to marshal record", "err", err)
History
12 rounds
1 comment
oyster.cafe
submitted
#11
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- go.mod:34
- go.sum:339
expand 0 comments
oyster.cafe
submitted
#10
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#9
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 1 comment
oyster.cafe
submitted
#8
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
2/3 failed, 1/3 success
expand
collapse
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
knotserver: add repo DID migration on startup
Signed-off-by: Lewis <lewis@tangled.org>
appview/state/router.go:65 — Can you absorb this?
knotserver/server.go:91 — I remember you tested this against knot1's ~4k repos. Is it OK to run it in the background without a progress indicator?