Monorepo for Tangled tangled.org

appview: background pds data migration #1309

open opened by boltless.me targeting master from sl/mnznnmqwysmz
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mjak3un7jc22
+404
Diff #1
+22
appview/db/db.go
··· 1409 1409 return err 1410 1410 }) 1411 1411 1412 + orm.RunMigration(conn, logger, "add-pds-migration", func(tx *sql.Tx) error { 1413 + _, err := tx.Exec(` 1414 + create table if not exists pds_migration ( 1415 + name text not null, 1416 + 1417 + -- record at_uri 1418 + did text not null, 1419 + collection text not null, 1420 + rkey text not null, 1421 + 1422 + status text not null default 'pending', 1423 + error_msg text, 1424 + retry_count integer not null default 0, 1425 + retry_after integer not null default 0, 1426 + updated_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 1427 + 1428 + unique(name, did, collection, rkey) 1429 + ); 1430 + `) 1431 + return err 1432 + }) 1433 + 1412 1434 return &DB{ 1413 1435 db, 1414 1436 logger,
+89
appview/db/migration.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + "tangled.org/core/appview/models" 9 + ) 10 + 11 + // "migration" for records stored in user's PDS, not AppView DB 12 + 13 + // ListPendingPdsRecordMigrations queries list of pending PDS migrations for given user. 14 + // Only pending migrations whose `retry_after` has elapsed are returned. 15 + func ListPendingPdsRecordMigrations(ctx context.Context, e Execer, user syntax.DID) ([]*models.PDSMigration, error) { 16 + rows, err := e.QueryContext(ctx, 17 + `with picked as ( 18 + select rowid 19 + from pds_migration 20 + where did = ? 21 + and status = 'pending' 22 + and retry_after < ? 23 + ) 24 + update pds_migration 25 + set status = ? 26 + where rowid in (select rowid from picked) 27 + returning name, did, collection, rkey, status, error_msg, retry_count, retry_after`, 28 + user, 29 + time.Now().Unix(), 30 + models.PDSMigrationStatusRunning, 31 + ) 32 + if err != nil { 33 + return nil, err 34 + } 35 + defer rows.Close() 36 + 37 + var migrations []*models.PDSMigration 38 + for rows.Next() { 39 + var migration models.PDSMigration 40 + if err := rows.Scan( 41 + &migration.Name, 42 + &migration.Did, 43 + &migration.Collection, 44 + &migration.Rkey, 45 + &migration.Status, 46 + &migration.ErrorMsg, 47 + &migration.RetryCount, 48 + &migration.RetryAfter, 49 + ); err != nil { 50 + return nil, err 51 + } 52 + migrations = append(migrations, &migration) 53 + } 54 + if err := rows.Err(); err != nil { 55 + return nil, err 56 + } 57 + 58 + return migrations, nil 59 + } 60 + 61 + func EnqueuePdsRecordMigration(ctx context.Context, e Execer, name string, did syntax.DID, collection syntax.NSID, rkey syntax.RecordKey) error { 62 + _, err := e.ExecContext(ctx, 63 + `insert into pds_migration (name, did, collection, rkey) 64 + values (?, ?, ?, ?)`, 65 + name, did, collection, rkey, 66 + ) 67 + return err 68 + } 69 + 70 + func UpdatePdsRecordMigration(ctx context.Context, e Execer, migration *models.PDSMigration) error { 71 + _, err := e.ExecContext(ctx, 72 + `update pds_migration 73 + set status = ?, 74 + error_msg = ?, 75 + retry_count = ?, 76 + retry_after = ?, 77 + updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 78 + where name = ? and did = ? and collection = ? and rkey = ?`, 79 + migration.Status, 80 + migration.ErrorMsg, 81 + migration.RetryCount, 82 + migration.RetryAfter, 83 + migration.Name, 84 + migration.Did, 85 + migration.Collection, 86 + migration.Rkey, 87 + ) 88 + return err 89 + }
+151
appview/migration/migrate_add_repo_did.go
··· 1 + package migration 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/atproto/atclient" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + lexutil "github.com/bluesky-social/indigo/lex/util" 12 + "tangled.org/core/api/tangled" 13 + "tangled.org/core/appview/db" 14 + ) 15 + 16 + func (s *Migration) migrateAddRepoDid(ctx context.Context, client *atclient.APIClient, did syntax.DID, record syntax.ATURI) error { 17 + // TODO: use agnostic.RepoGetRecord instead 18 + ex, err := comatproto.RepoGetRecord(ctx, client, "", record.Collection().String(), did.String(), record.RecordKey().String()) 19 + if err != nil { 20 + return fmt.Errorf("pds: %w", err) 21 + } 22 + 23 + val := ex.Value.Val 24 + 25 + switch record.Collection() { 26 + case tangled.RepoNSID: 27 + rec, ok := val.(*tangled.Repo) 28 + if !ok { 29 + return fmt.Errorf("unexpected type for repo record") 30 + } 31 + repo, err := db.GetRepoByAtUri(s.db, record.String()) 32 + if err != nil { 33 + return fmt.Errorf("db: failed to query repo: %w", err) 34 + } 35 + rec.RepoDid = &repo.RepoDid 36 + 37 + case tangled.RepoIssueNSID: 38 + rec, ok := val.(*tangled.RepoIssue) 39 + if !ok { 40 + return fmt.Errorf("unexpected type for issue record") 41 + } 42 + if rec.Repo != nil { 43 + repoAt := *rec.Repo 44 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 45 + if err != nil { 46 + return fmt.Errorf("db: failed to query repo: %w", err) 47 + } 48 + rec.RepoDid = &repo.RepoDid 49 + } 50 + 51 + case tangled.RepoPullNSID: 52 + rec, ok := val.(*tangled.RepoPull) 53 + if !ok { 54 + return fmt.Errorf("unexpected type for pull record") 55 + } 56 + if rec.Target != nil && rec.Target.Repo != nil { 57 + repoAt := *rec.Target.Repo 58 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 59 + if err != nil { 60 + return fmt.Errorf("db: failed to query repo: %w", err) 61 + } 62 + rec.Target.RepoDid = &repo.RepoDid 63 + } 64 + if rec.Source != nil && rec.Source.Repo != nil { 65 + repoAt := *rec.Source.Repo 66 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 67 + if err != nil { 68 + return fmt.Errorf("db: failed to query repo: %w", err) 69 + } 70 + rec.Source.RepoDid = &repo.RepoDid 71 + } 72 + 73 + case tangled.RepoCollaboratorNSID: 74 + rec, ok := val.(*tangled.RepoCollaborator) 75 + if !ok { 76 + return fmt.Errorf("unexpected type for collaborator record") 77 + } 78 + if rec.Repo != nil { 79 + repoAt := *rec.Repo 80 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 81 + if err != nil { 82 + return fmt.Errorf("db: failed to query repo: %w", err) 83 + } 84 + rec.RepoDid = &repo.RepoDid 85 + } 86 + 87 + case tangled.RepoArtifactNSID: 88 + rec, ok := val.(*tangled.RepoArtifact) 89 + if !ok { 90 + return fmt.Errorf("unexpected type for artifact record") 91 + } 92 + if rec.Repo != nil { 93 + repoAt := *rec.Repo 94 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 95 + if err != nil { 96 + return fmt.Errorf("db: failed to query repo: %w", err) 97 + } 98 + rec.RepoDid = &repo.RepoDid 99 + } 100 + 101 + case tangled.FeedStarNSID: 102 + rec, ok := val.(*tangled.FeedStar) 103 + if !ok { 104 + return fmt.Errorf("unexpected type for star record") 105 + } 106 + if rec.Subject != nil { 107 + repoAt := *rec.Subject 108 + repo, err := db.GetRepoByAtUri(s.db, repoAt) 109 + if err != nil { 110 + return fmt.Errorf("db: failed to query repo: %w", err) 111 + } 112 + rec.SubjectDid = &repo.RepoDid 113 + } 114 + 115 + case tangled.ActorProfileNSID: 116 + rec, ok := val.(*tangled.ActorProfile) 117 + if !ok { 118 + return fmt.Errorf("unexpected type for profile record") 119 + } 120 + rewritten := make([]string, 0, len(rec.PinnedRepositories)) 121 + for _, pin := range rec.PinnedRepositories { 122 + if strings.HasPrefix(pin, "did:") { 123 + rewritten = append(rewritten, pin) 124 + continue 125 + } 126 + repo, repoErr := db.GetRepoByAtUri(s.db, pin) 127 + if repoErr != nil || repo.RepoDid == "" { 128 + rewritten = append(rewritten, pin) 129 + continue 130 + } 131 + rewritten = append(rewritten, repo.RepoDid) 132 + } 133 + rec.PinnedRepositories = rewritten 134 + 135 + default: 136 + return fmt.Errorf("unexpected collection: '%s'", record.Collection()) 137 + } 138 + 139 + _, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{ 140 + Repo: did.String(), 141 + Collection: record.Collection().String(), 142 + Rkey: record.RecordKey().String(), 143 + SwapRecord: ex.Cid, 144 + Record: &lexutil.LexiconTypeDecoder{Val: val}, 145 + }) 146 + if err != nil { 147 + return fmt.Errorf("put record: %w", err) 148 + } 149 + 150 + return nil 151 + }
+100
appview/migration/migration.go
··· 1 + package migration 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "strings" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/atproto/atclient" 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + 15 + "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/models" 17 + "tangled.org/core/appview/oauth" 18 + ) 19 + 20 + type Migration struct { 21 + db *db.DB 22 + oauth *oauth.OAuth 23 + dir identity.Directory 24 + logger *slog.Logger 25 + } 26 + 27 + func NewMigration(db *db.DB, oauth *oauth.OAuth, dir identity.Directory, logger *slog.Logger) *Migration { 28 + return &Migration{ 29 + db, oauth, dir, logger, 30 + } 31 + } 32 + 33 + func (s *Migration) BackgroundMigrationMiddleware(next http.Handler) http.Handler { 34 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 35 + defer next.ServeHTTP(w, r) 36 + 37 + client, err := s.oauth.AuthorizedClient(r) 38 + if err != nil { 39 + return 40 + } 41 + if client.AccountDID == nil { 42 + return 43 + } 44 + 45 + go s.runPendingMigrations(context.Background(), *client.AccountDID, client) 46 + }) 47 + } 48 + 49 + func (s *Migration) runPendingMigrations(ctx context.Context, did syntax.DID, client *atclient.APIClient) { 50 + l := s.logger.With("did", did) 51 + migrations, err := db.ListPendingPdsRecordMigrations(ctx, s.db, did) 52 + if err != nil { 53 + l.Error("failed to query pending migrations", "err", err) 54 + return 55 + } 56 + 57 + for _, migration := range migrations { 58 + if err := s.migrate(ctx, client, migration); err != nil { 59 + l.Error("migration failed", "err", err) 60 + } 61 + } 62 + } 63 + 64 + func (s *Migration) migrate(ctx context.Context, client *atclient.APIClient, migration *models.PDSMigration) error { 65 + l := s.logger.With( 66 + "name", migration.Name, 67 + "aturi", migration.RecordAtUri(), 68 + ) 69 + 70 + var err error 71 + switch migration.Name { 72 + case "add-repo-did": 73 + err = s.migrateAddRepoDid(ctx, client, migration.Did, migration.RecordAtUri()) 74 + default: 75 + return fmt.Errorf("unexpected migration name %s", migration.Name) 76 + } 77 + 78 + if err == nil { 79 + l.Info("migrated") 80 + migration.Status = models.PDSMigrationStatusDone 81 + } else { 82 + l.Warn("failed to migrate", "err", err) 83 + 84 + errMsg := err.Error() 85 + var retryCount = migration.RetryCount + 1 86 + var retryAfter = time.Now().Add(3 * time.Second).Unix() 87 + 88 + // remove null bytes 89 + errMsg = strings.ReplaceAll(errMsg, "\x00", "") 90 + 91 + migration.Status = models.PDSMigrationStatusPending 92 + migration.ErrorMsg = &errMsg 93 + migration.RetryCount = retryCount 94 + migration.RetryAfter = retryAfter 95 + } 96 + if err := db.UpdatePdsRecordMigration(ctx, s.db, migration); err != nil { 97 + return fmt.Errorf("failed to update migration status: %w", err) 98 + } 99 + return nil 100 + }
+37
appview/models/migration.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + ) 8 + 9 + type PDSRecordMigration struct { 10 + Did syntax.DID 11 + Name string // name of the migration 12 + Records []syntax.ATURI // records that need a migration 13 + ErrorMsg *string // error message from previous attempt 14 + } 15 + 16 + type PDSMigration struct { 17 + Name string // name of the migration 18 + Did syntax.DID // record owner 19 + Collection syntax.NSID // record collection 20 + Rkey syntax.RecordKey // record rkey 21 + Status PDSMigrationStatus 22 + ErrorMsg *string // error message from previous attempt 23 + RetryCount int 24 + RetryAfter int64 // Unix timestamp (seconds) 25 + } 26 + 27 + type PDSMigrationStatus string 28 + 29 + const ( 30 + PDSMigrationStatusPending PDSMigrationStatus = "pending" 31 + PDSMigrationStatusRunning PDSMigrationStatus = "running" 32 + PDSMigrationStatusDone PDSMigrationStatus = "done" 33 + ) 34 + 35 + func (m *PDSMigration) RecordAtUri() syntax.ATURI { 36 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", m.Did, m.Collection, m.Rkey)) 37 + }
+5
appview/state/router.go
··· 12 12 "tangled.org/core/appview/knots" 13 13 "tangled.org/core/appview/labels" 14 14 "tangled.org/core/appview/middleware" 15 + "tangled.org/core/appview/migration" 15 16 "tangled.org/core/appview/notifications" 16 17 "tangled.org/core/appview/pipelines" 17 18 "tangled.org/core/appview/pulls" ··· 36 37 s.logger, 37 38 ) 38 39 40 + // TODO(boltless): merge this into BackgroundMigrationMiddleware 39 41 router.Use(s.oauth.PdsRewriteMiddleware) 40 42 43 + m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 44 + router.Use(m.BackgroundMigrationMiddleware) 45 + 41 46 router.Get("/pwa-manifest.json", s.WebAppManifest) 42 47 router.Get("/robots.txt", s.RobotsTxt) 43 48 router.Get("/.well-known/security.txt", s.SecurityTxt)

History

2 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
appview: background pds data migration
3/3 failed
expand
no conflicts, ready to merge
expand 0 comments
1 commit
expand
appview: background pds data migration
3/3 failed
expand
expand 0 comments