-215
pkg/appview/db/stats_migration.go
-215
pkg/appview/db/stats_migration.go
···
1
-
package db
2
-
3
-
import (
4
-
"bytes"
5
-
"context"
6
-
"database/sql"
7
-
"encoding/json"
8
-
"fmt"
9
-
"io"
10
-
"log/slog"
11
-
"net/http"
12
-
"time"
13
-
14
-
"atcr.io/pkg/atproto"
15
-
)
16
-
17
-
// MigrateStatsToHolds migrates existing repository_stats data to hold services.
18
-
// This is a one-time migration that runs on startup.
19
-
//
20
-
// The migration:
21
-
// 1. Checks if migration has already completed
22
-
// 2. Reads all repository_stats entries
23
-
// 3. For each entry, looks up the hold DID from manifests table
24
-
// 4. Calls the hold's setStats endpoint (no auth required - temporary migration endpoint)
25
-
// 5. Marks migration complete after all entries are processed
26
-
//
27
-
// If a hold is offline, the migration logs a warning and continues.
28
-
// The hold will receive real-time stats updates via Jetstream once online.
29
-
func MigrateStatsToHolds(ctx context.Context, db *sql.DB) error {
30
-
// Check if migration already done
31
-
var migrationDone bool
32
-
err := db.QueryRowContext(ctx, `
33
-
SELECT EXISTS(
34
-
SELECT 1 FROM schema_migrations WHERE version = 1000
35
-
)
36
-
`).Scan(&migrationDone)
37
-
38
-
// Table might not exist yet on fresh install
39
-
if err == sql.ErrNoRows {
40
-
migrationDone = false
41
-
} else if err != nil {
42
-
// Check if it's a "no such table" error (fresh install)
43
-
if err.Error() != "no such table: schema_migrations" {
44
-
return fmt.Errorf("failed to check migration status: %w", err)
45
-
}
46
-
migrationDone = false
47
-
}
48
-
49
-
if migrationDone {
50
-
slog.Debug("Stats migration already complete, skipping", "component", "migration")
51
-
return nil
52
-
}
53
-
54
-
slog.Info("Starting stats migration to holds", "component", "migration")
55
-
56
-
// Get all repository_stats entries
57
-
rows, err := db.QueryContext(ctx, `
58
-
SELECT did, repository, pull_count, last_pull, push_count, last_push
59
-
FROM repository_stats
60
-
WHERE pull_count > 0 OR push_count > 0
61
-
`)
62
-
if err != nil {
63
-
// Table might not exist on fresh install
64
-
if err.Error() == "no such table: repository_stats" {
65
-
slog.Info("No repository_stats table found, skipping migration", "component", "migration")
66
-
return markMigrationComplete(db)
67
-
}
68
-
return fmt.Errorf("failed to query repository_stats: %w", err)
69
-
}
70
-
defer rows.Close()
71
-
72
-
var stats []struct {
73
-
DID string
74
-
Repository string
75
-
PullCount int64
76
-
LastPull sql.NullString
77
-
PushCount int64
78
-
LastPush sql.NullString
79
-
}
80
-
81
-
for rows.Next() {
82
-
var stat struct {
83
-
DID string
84
-
Repository string
85
-
PullCount int64
86
-
LastPull sql.NullString
87
-
PushCount int64
88
-
LastPush sql.NullString
89
-
}
90
-
if err := rows.Scan(&stat.DID, &stat.Repository, &stat.PullCount, &stat.LastPull, &stat.PushCount, &stat.LastPush); err != nil {
91
-
return fmt.Errorf("failed to scan stat: %w", err)
92
-
}
93
-
stats = append(stats, stat)
94
-
}
95
-
96
-
if len(stats) == 0 {
97
-
slog.Info("No stats to migrate", "component", "migration")
98
-
return markMigrationComplete(db)
99
-
}
100
-
101
-
slog.Info("Found stats entries to migrate", "component", "migration", "count", len(stats))
102
-
103
-
// Process each stat
104
-
successCount := 0
105
-
skipCount := 0
106
-
errorCount := 0
107
-
108
-
for _, stat := range stats {
109
-
// Look up hold DID from manifests table
110
-
holdDID, err := GetLatestHoldDIDForRepo(db, stat.DID, stat.Repository)
111
-
if err != nil || holdDID == "" {
112
-
slog.Debug("No hold DID found for repo, skipping", "component", "migration",
113
-
"did", stat.DID, "repository", stat.Repository)
114
-
skipCount++
115
-
continue
116
-
}
117
-
118
-
// Resolve hold DID to HTTP URL
119
-
holdURL := atproto.ResolveHoldURL(holdDID)
120
-
if holdURL == "" {
121
-
slog.Warn("Failed to resolve hold DID, skipping", "component", "migration",
122
-
"hold_did", holdDID)
123
-
errorCount++
124
-
continue
125
-
}
126
-
127
-
// Call hold's setStats endpoint (no auth required for migration)
128
-
err = callSetStats(ctx, holdURL, stat.DID, stat.Repository,
129
-
stat.PullCount, stat.PushCount, stat.LastPull.String, stat.LastPush.String)
130
-
if err != nil {
131
-
slog.Warn("Failed to migrate stats to hold, continuing", "component", "migration",
132
-
"did", stat.DID, "repository", stat.Repository, "hold", holdDID, "error", err)
133
-
errorCount++
134
-
continue
135
-
}
136
-
137
-
successCount++
138
-
slog.Debug("Migrated stats", "component", "migration",
139
-
"did", stat.DID, "repository", stat.Repository, "hold", holdDID,
140
-
"pull_count", stat.PullCount, "push_count", stat.PushCount)
141
-
}
142
-
143
-
slog.Info("Stats migration completed", "component", "migration",
144
-
"success", successCount, "skipped", skipCount, "errors", errorCount, "total", len(stats))
145
-
146
-
// Only mark complete if there were no errors
147
-
// Skipped repos (no hold DID) will never migrate - that's fine
148
-
// Errors are transient failures that should be retried
149
-
if errorCount == 0 {
150
-
return markMigrationComplete(db)
151
-
}
152
-
153
-
slog.Warn("Stats migration had errors, will retry on next startup", "component", "migration",
154
-
"errors", errorCount)
155
-
return nil
156
-
}
157
-
158
-
// markMigrationComplete records that the stats migration has been done
159
-
func markMigrationComplete(db *sql.DB) error {
160
-
_, err := db.Exec(`
161
-
INSERT INTO schema_migrations (version, applied_at)
162
-
VALUES (1000, datetime('now'))
163
-
ON CONFLICT(version) DO NOTHING
164
-
`)
165
-
if err != nil {
166
-
return fmt.Errorf("failed to mark migration complete: %w", err)
167
-
}
168
-
return nil
169
-
}
170
-
171
-
// callSetStats calls the hold's io.atcr.hold.setStats endpoint
172
-
// No authentication required - this is a temporary migration endpoint
173
-
func callSetStats(ctx context.Context, holdURL, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error {
174
-
// Build request
175
-
reqBody := map[string]any{
176
-
"ownerDid": ownerDID,
177
-
"repository": repository,
178
-
"pullCount": pullCount,
179
-
"pushCount": pushCount,
180
-
}
181
-
if lastPull != "" {
182
-
reqBody["lastPull"] = lastPull
183
-
}
184
-
if lastPush != "" {
185
-
reqBody["lastPush"] = lastPush
186
-
}
187
-
188
-
body, err := json.Marshal(reqBody)
189
-
if err != nil {
190
-
return fmt.Errorf("failed to marshal request: %w", err)
191
-
}
192
-
193
-
// Create HTTP request
194
-
req, err := http.NewRequestWithContext(ctx, "POST", holdURL+atproto.HoldSetStats, bytes.NewReader(body))
195
-
if err != nil {
196
-
return fmt.Errorf("failed to create request: %w", err)
197
-
}
198
-
199
-
req.Header.Set("Content-Type", "application/json")
200
-
201
-
// Send request with timeout
202
-
client := &http.Client{Timeout: 10 * time.Second}
203
-
resp, err := client.Do(req)
204
-
if err != nil {
205
-
return fmt.Errorf("request failed: %w", err)
206
-
}
207
-
defer resp.Body.Close()
208
-
209
-
if resp.StatusCode != http.StatusOK {
210
-
body, _ := io.ReadAll(resp.Body)
211
-
return fmt.Errorf("setStats failed: status %d, body: %s", resp.StatusCode, body)
212
-
}
213
-
214
-
return nil
215
-
}
-46
pkg/hold/oci/xrpc.go
-46
pkg/hold/oci/xrpc.go
···
40
40
41
41
// RegisterHandlers registers all OCI XRPC endpoints with the chi router
42
42
func (h *XRPCHandler) RegisterHandlers(r chi.Router) {
43
-
// Temporary migration endpoint - no auth required
44
-
// TODO: Remove after stats migration is complete
45
-
r.Post(atproto.HoldSetStats, h.HandleSetStats)
46
-
47
43
// All multipart upload endpoints require blob:write permission
48
44
r.Group(func(r chi.Router) {
49
45
r.Use(h.requireBlobWriteAccess)
···
383
379
}
384
380
385
381
RespondJSON(w, http.StatusOK, resp)
386
-
}
387
-
388
-
// HandleSetStats sets absolute stats values for a repository (used by migration)
389
-
// This is a temporary migration-only endpoint that allows AppView to sync existing stats to holds.
390
-
// No authentication required - this endpoint will be removed after migration is complete.
391
-
// TODO: Remove this endpoint after stats migration is complete
392
-
func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) {
393
-
ctx := r.Context()
394
-
395
-
// Parse request
396
-
var req struct {
397
-
OwnerDID string `json:"ownerDid"`
398
-
Repository string `json:"repository"`
399
-
PullCount int64 `json:"pullCount"`
400
-
PushCount int64 `json:"pushCount"`
401
-
LastPull string `json:"lastPull,omitempty"`
402
-
LastPush string `json:"lastPush,omitempty"`
403
-
}
404
-
405
-
if err := DecodeJSON(r, &req); err != nil {
406
-
RespondError(w, http.StatusBadRequest, err.Error())
407
-
return
408
-
}
409
-
410
-
// Validate required fields
411
-
if req.OwnerDID == "" || req.Repository == "" {
412
-
RespondError(w, http.StatusBadRequest, "ownerDid and repository are required")
413
-
return
414
-
}
415
-
416
-
// Set stats using the SetStats method
417
-
if err := h.pds.SetStats(ctx, req.OwnerDID, req.Repository, req.PullCount, req.PushCount, req.LastPull, req.LastPush); err != nil {
418
-
slog.Error("Failed to set stats", "error", err)
419
-
RespondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to set stats: %v", err))
420
-
return
421
-
}
422
-
423
-
slog.Info("Stats set via migration", "owner_did", req.OwnerDID, "repository", req.Repository, "pull_count", req.PullCount, "push_count", req.PushCount)
424
-
425
-
RespondJSON(w, http.StatusOK, map[string]any{
426
-
"success": true,
427
-
})
428
382
}
429
383
430
384
// requireBlobWriteAccess middleware - validates DPoP + OAuth and checks for blob:write permission