Signed-off-by: Seongmin Lee git@boltless.me
+404
Diff
round #1
+22
appview/db/db.go
+22
appview/db/db.go
···
1409
1409
return err
1410
1410
})
1411
1411
1412
+
orm.RunMigration(conn, logger, "add-pds-migration", func(tx *sql.Tx) error {
1413
+
_, err := tx.Exec(`
1414
+
create table if not exists pds_migration (
1415
+
name text not null,
1416
+
1417
+
-- record at_uri
1418
+
did text not null,
1419
+
collection text not null,
1420
+
rkey text not null,
1421
+
1422
+
status text not null default 'pending',
1423
+
error_msg text,
1424
+
retry_count integer not null default 0,
1425
+
retry_after integer not null default 0,
1426
+
updated_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
1427
+
1428
+
unique(name, did, collection, rkey)
1429
+
);
1430
+
`)
1431
+
return err
1432
+
})
1433
+
1412
1434
return &DB{
1413
1435
db,
1414
1436
logger,
+89
appview/db/migration.go
+89
appview/db/migration.go
···
1
+
package db
2
+
3
+
import (
4
+
"context"
5
+
"time"
6
+
7
+
"github.com/bluesky-social/indigo/atproto/syntax"
8
+
"tangled.org/core/appview/models"
9
+
)
10
+
11
+
// "migration" for records stored in user's PDS, not AppView DB
12
+
13
+
// ListPendingPdsRecordMigrations queries list of pending PDS migrations for given user.
14
+
// Only pending migrations whose `retry_after` has elapsed are returned.
15
+
func ListPendingPdsRecordMigrations(ctx context.Context, e Execer, user syntax.DID) ([]*models.PDSMigration, error) {
16
+
rows, err := e.QueryContext(ctx,
17
+
`with picked as (
18
+
select rowid
19
+
from pds_migration
20
+
where did = ?
21
+
and status = 'pending'
22
+
and retry_after < ?
23
+
)
24
+
update pds_migration
25
+
set status = ?
26
+
where rowid in (select rowid from picked)
27
+
returning name, did, collection, rkey, status, error_msg, retry_count, retry_after`,
28
+
user,
29
+
time.Now().Unix(),
30
+
models.PDSMigrationStatusRunning,
31
+
)
32
+
if err != nil {
33
+
return nil, err
34
+
}
35
+
defer rows.Close()
36
+
37
+
var migrations []*models.PDSMigration
38
+
for rows.Next() {
39
+
var migration models.PDSMigration
40
+
if err := rows.Scan(
41
+
&migration.Name,
42
+
&migration.Did,
43
+
&migration.Collection,
44
+
&migration.Rkey,
45
+
&migration.Status,
46
+
&migration.ErrorMsg,
47
+
&migration.RetryCount,
48
+
&migration.RetryAfter,
49
+
); err != nil {
50
+
return nil, err
51
+
}
52
+
migrations = append(migrations, &migration)
53
+
}
54
+
if err := rows.Err(); err != nil {
55
+
return nil, err
56
+
}
57
+
58
+
return migrations, nil
59
+
}
60
+
61
+
func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error {
62
+
_, err := e.ExecContext(ctx,
63
+
`insert into pds_migration (name, did, collection, rkey)
64
+
values (?, ?, ?, ?)`,
65
+
name, did, collection, rkey,
66
+
)
67
+
return err
68
+
}
69
+
70
+
func UpdatePdsRecordMigration(ctx context.Context, e Execer, migration *models.PDSMigration) error {
71
+
_, err := e.ExecContext(ctx,
72
+
`update pds_migration
73
+
set status = ?,
74
+
error_msg = ?,
75
+
retry_count = ?,
76
+
retry_after = ?,
77
+
updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
78
+
where name = ? and did = ? and collection = ? and rkey = ?`,
79
+
migration.Status,
80
+
migration.ErrorMsg,
81
+
migration.RetryCount,
82
+
migration.RetryAfter,
83
+
migration.Name,
84
+
migration.Did,
85
+
migration.Collection,
86
+
migration.Rkey,
87
+
)
88
+
return err
89
+
}
+151
appview/migration/migrate_add_repo_did.go
+151
appview/migration/migrate_add_repo_did.go
···
1
+
package migration
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"strings"
7
+
8
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
9
+
"github.com/bluesky-social/indigo/atproto/atclient"
10
+
"github.com/bluesky-social/indigo/atproto/syntax"
11
+
lexutil "github.com/bluesky-social/indigo/lex/util"
12
+
"tangled.org/core/api/tangled"
13
+
"tangled.org/core/appview/db"
14
+
)
15
+
16
+
func (s *Migration) migrateAddRepoDid(ctx context.Context, client *atclient.APIClient, did syntax.DID, record syntax.ATURI) error {
17
+
// TODO: use agnostic.RepoGetRecord instead
18
+
ex, err := comatproto.RepoGetRecord(ctx, client, "", record.Collection().String(), did.String(), record.RecordKey().String())
19
+
if err != nil {
20
+
return fmt.Errorf("pds: %w", err)
21
+
}
22
+
23
+
val := ex.Value.Val
24
+
25
+
switch record.Collection() {
26
+
case tangled.RepoNSID:
27
+
rec, ok := val.(*tangled.Repo)
28
+
if !ok {
29
+
return fmt.Errorf("unexpected type for repo record")
30
+
}
31
+
repo, err := db.GetRepoByAtUri(s.db, record.String())
32
+
if err != nil {
33
+
return fmt.Errorf("db: failed to query repo: %w", err)
34
+
}
35
+
rec.RepoDid = &repo.RepoDid
36
+
37
+
case tangled.RepoIssueNSID:
38
+
rec, ok := val.(*tangled.RepoIssue)
39
+
if !ok {
40
+
return fmt.Errorf("unexpected type for issue record")
41
+
}
42
+
if rec.Repo != nil {
43
+
repoAt := *rec.Repo
44
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
45
+
if err != nil {
46
+
return fmt.Errorf("db: failed to query repo: %w", err)
47
+
}
48
+
rec.RepoDid = &repo.RepoDid
49
+
}
50
+
51
+
case tangled.RepoPullNSID:
52
+
rec, ok := val.(*tangled.RepoPull)
53
+
if !ok {
54
+
return fmt.Errorf("unexpected type for pull record")
55
+
}
56
+
if rec.Target != nil && rec.Target.Repo != nil {
57
+
repoAt := *rec.Target.Repo
58
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
59
+
if err != nil {
60
+
return fmt.Errorf("db: failed to query repo: %w", err)
61
+
}
62
+
rec.Target.RepoDid = &repo.RepoDid
63
+
}
64
+
if rec.Source != nil && rec.Source.Repo != nil {
65
+
repoAt := *rec.Source.Repo
66
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
67
+
if err != nil {
68
+
return fmt.Errorf("db: failed to query repo: %w", err)
69
+
}
70
+
rec.Source.RepoDid = &repo.RepoDid
71
+
}
72
+
73
+
case tangled.RepoCollaboratorNSID:
74
+
rec, ok := val.(*tangled.RepoCollaborator)
75
+
if !ok {
76
+
return fmt.Errorf("unexpected type for collaborator record")
77
+
}
78
+
if rec.Repo != nil {
79
+
repoAt := *rec.Repo
80
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
81
+
if err != nil {
82
+
return fmt.Errorf("db: failed to query repo: %w", err)
83
+
}
84
+
rec.RepoDid = &repo.RepoDid
85
+
}
86
+
87
+
case tangled.RepoArtifactNSID:
88
+
rec, ok := val.(*tangled.RepoArtifact)
89
+
if !ok {
90
+
return fmt.Errorf("unexpected type for artifact record")
91
+
}
92
+
if rec.Repo != nil {
93
+
repoAt := *rec.Repo
94
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
95
+
if err != nil {
96
+
return fmt.Errorf("db: failed to query repo: %w", err)
97
+
}
98
+
rec.RepoDid = &repo.RepoDid
99
+
}
100
+
101
+
case tangled.FeedStarNSID:
102
+
rec, ok := val.(*tangled.FeedStar)
103
+
if !ok {
104
+
return fmt.Errorf("unexpected type for star record")
105
+
}
106
+
if rec.Subject != nil {
107
+
repoAt := *rec.Subject
108
+
repo, err := db.GetRepoByAtUri(s.db, repoAt)
109
+
if err != nil {
110
+
return fmt.Errorf("db: failed to query repo: %w", err)
111
+
}
112
+
rec.SubjectDid = &repo.RepoDid
113
+
}
114
+
115
+
case tangled.ActorProfileNSID:
116
+
rec, ok := val.(*tangled.ActorProfile)
117
+
if !ok {
118
+
return fmt.Errorf("unexpected type for profile record")
119
+
}
120
+
rewritten := make([]string, 0, len(rec.PinnedRepositories))
121
+
for _, pin := range rec.PinnedRepositories {
122
+
if strings.HasPrefix(pin, "did:") {
123
+
rewritten = append(rewritten, pin)
124
+
continue
125
+
}
126
+
repo, repoErr := db.GetRepoByAtUri(s.db, pin)
127
+
if repoErr != nil || repo.RepoDid == "" {
128
+
rewritten = append(rewritten, pin)
129
+
continue
130
+
}
131
+
rewritten = append(rewritten, repo.RepoDid)
132
+
}
133
+
rec.PinnedRepositories = rewritten
134
+
135
+
default:
136
+
return fmt.Errorf("unexpected collection: '%s'", record.Collection())
137
+
}
138
+
139
+
_, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{
140
+
Repo: did.String(),
141
+
Collection: record.Collection().String(),
142
+
Rkey: record.RecordKey().String(),
143
+
SwapRecord: ex.Cid,
144
+
Record: &lexutil.LexiconTypeDecoder{Val: val},
145
+
})
146
+
if err != nil {
147
+
return fmt.Errorf("put record: %w", err)
148
+
}
149
+
150
+
return nil
151
+
}
+100
appview/migration/migration.go
+100
appview/migration/migration.go
···
1
+
package migration
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"log/slog"
7
+
"net/http"
8
+
"strings"
9
+
"time"
10
+
11
+
"github.com/bluesky-social/indigo/atproto/atclient"
12
+
"github.com/bluesky-social/indigo/atproto/identity"
13
+
"github.com/bluesky-social/indigo/atproto/syntax"
14
+
15
+
"tangled.org/core/appview/db"
16
+
"tangled.org/core/appview/models"
17
+
"tangled.org/core/appview/oauth"
18
+
)
19
+
20
+
type Migration struct {
21
+
db *db.DB
22
+
oauth *oauth.OAuth
23
+
dir identity.Directory
24
+
logger *slog.Logger
25
+
}
26
+
27
+
func NewMigration(db *db.DB, oauth *oauth.OAuth, dir identity.Directory, logger *slog.Logger) *Migration {
28
+
return &Migration{
29
+
db, oauth, dir, logger,
30
+
}
31
+
}
32
+
33
+
func (s *Migration) BackgroundMigrationMiddleware(next http.Handler) http.Handler {
34
+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
35
+
defer next.ServeHTTP(w, r)
36
+
37
+
client, err := s.oauth.AuthorizedClient(r)
38
+
if err != nil {
39
+
return
40
+
}
41
+
if client.AccountDID == nil {
42
+
return
43
+
}
44
+
45
+
go s.runPendingMigrations(context.Background(), *client.AccountDID, client)
46
+
})
47
+
}
48
+
49
+
func (s *Migration) runPendingMigrations(ctx context.Context, did syntax.DID, client *atclient.APIClient) {
50
+
l := s.logger.With("did", did)
51
+
migrations, err := db.ListPendingPdsRecordMigrations(ctx, s.db, did)
52
+
if err != nil {
53
+
l.Error("failed to query pending migrations", "err", err)
54
+
return
55
+
}
56
+
57
+
for _, migration := range migrations {
58
+
if err := s.migrate(ctx, client, migration); err != nil {
59
+
l.Error("migration failed", "err", err)
60
+
}
61
+
}
62
+
}
63
+
64
+
func (s *Migration) migrate(ctx context.Context, client *atclient.APIClient, migration *models.PDSMigration) error {
65
+
l := s.logger.With(
66
+
"name", migration.Name,
67
+
"aturi", migration.RecordAtUri(),
68
+
)
69
+
70
+
var err error
71
+
switch migration.Name {
72
+
case "add-repo-did":
73
+
err = s.migrateAddRepoDid(ctx, client, migration.Did, migration.RecordAtUri())
74
+
default:
75
+
return fmt.Errorf("unexpected migration name %s", migration.Name)
76
+
}
77
+
78
+
if err == nil {
79
+
l.Info("migrated")
80
+
migration.Status = models.PDSMigrationStatusDone
81
+
} else {
82
+
l.Warn("failed to migrate", "err", err)
83
+
84
+
errMsg := err.Error()
85
+
var retryCount = migration.RetryCount + 1
86
+
var retryAfter = time.Now().Add(3 * time.Second).Unix()
87
+
88
+
// remove null bytes
89
+
errMsg = strings.ReplaceAll(errMsg, "\x00", "")
90
+
91
+
migration.Status = models.PDSMigrationStatusPending
92
+
migration.ErrorMsg = &errMsg
93
+
migration.RetryCount = retryCount
94
+
migration.RetryAfter = retryAfter
95
+
}
96
+
if err := db.UpdatePdsRecordMigration(ctx, s.db, migration); err != nil {
97
+
return fmt.Errorf("failed to update migration status: %w", err)
98
+
}
99
+
return nil
100
+
}
+37
appview/models/migration.go
+37
appview/models/migration.go
···
1
+
package models
2
+
3
+
import (
4
+
"fmt"
5
+
6
+
"github.com/bluesky-social/indigo/atproto/syntax"
7
+
)
8
+
9
+
type PDSRecordMigration struct {
10
+
Did syntax.DID
11
+
Name string // name of the migration
12
+
Records []syntax.ATURI // records that need a migration
13
+
ErrorMsg *string // error message from previous attempt
14
+
}
15
+
16
+
type PDSMigration struct {
17
+
Name string // name of the migration
18
+
Did syntax.DID // record owner
19
+
Collection syntax.NSID // record collection
20
+
Rkey syntax.RecordKey // record rkey
21
+
Status PDSMigrationStatus
22
+
ErrorMsg *string // error message from previous attempt
23
+
RetryCount int
24
+
RetryAfter int64 // Unix timestamp (seconds)
25
+
}
26
+
27
+
type PDSMigrationStatus string
28
+
29
+
const (
30
+
PDSMigrationStatusPending PDSMigrationStatus = "pending"
31
+
PDSMigrationStatusRunning PDSMigrationStatus = "running"
32
+
PDSMigrationStatusDone PDSMigrationStatus = "done"
33
+
)
34
+
35
+
func (m *PDSMigration) RecordAtUri() syntax.ATURI {
36
+
return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", m.Did, m.Collection, m.Rkey))
37
+
}
+5
appview/state/router.go
+5
appview/state/router.go
···
12
12
"tangled.org/core/appview/knots"
13
13
"tangled.org/core/appview/labels"
14
14
"tangled.org/core/appview/middleware"
15
+
"tangled.org/core/appview/migration"
15
16
"tangled.org/core/appview/notifications"
16
17
"tangled.org/core/appview/pipelines"
17
18
"tangled.org/core/appview/pulls"
···
36
37
s.logger,
37
38
)
38
39
40
+
// TODO(boltless): merge this into BackgroundMigrationMiddleware
39
41
router.Use(s.oauth.PdsRewriteMiddleware)
40
42
43
+
m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger)
44
+
router.Use(m.BackgroundMigrationMiddleware)
45
+
41
46
router.Get("/pwa-manifest.json", s.WebAppManifest)
42
47
router.Get("/robots.txt", s.RobotsTxt)
43
48
router.Get("/.well-known/security.txt", s.SecurityTxt)
History
2 rounds
0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
appview: background pds data migration
Signed-off-by: Seongmin Lee <git@boltless.me>
3/3 failed
expand
collapse
no conflicts, ready to merge
expand 0 comments
boltless.me
submitted
#0
1 commit
expand
collapse
appview: background pds data migration
Signed-off-by: Seongmin Lee <git@boltless.me>