+6
-5
pkg/appview/db/device_store.go
+6
-5
pkg/appview/db/device_store.go
···
6
6
"database/sql"
7
7
"encoding/base64"
8
8
"fmt"
9
+
"log/slog"
9
10
"time"
10
11
11
12
"github.com/google/uuid"
···
108
109
return nil, false
109
110
}
110
111
if err != nil {
111
-
fmt.Printf("Warning: Failed to query pending auth: %v\n", err)
112
+
slog.Warn("Failed to query pending auth", "component", "device_store", "error", err)
112
113
return nil, false
113
114
}
114
115
···
144
145
return nil, false
145
146
}
146
147
if err != nil {
147
-
fmt.Printf("Warning: Failed to query pending auth: %v\n", err)
148
+
slog.Warn("Failed to query pending auth", "component", "device_store", "error", err)
148
149
return nil, false
149
150
}
150
151
···
382
383
`)
383
384
384
385
if err != nil {
385
-
fmt.Printf("Warning: Failed to cleanup expired pending auths: %v\n", err)
386
+
slog.Warn("Failed to cleanup expired pending auths", "component", "device_store", "error", err)
386
387
return
387
388
}
388
389
389
390
deleted, _ := result.RowsAffected()
390
391
if deleted > 0 {
391
-
fmt.Printf("Cleaned up %d expired pending device auths\n", deleted)
392
+
slog.Info("Cleaned up expired pending device auths", "count", deleted)
392
393
}
393
394
}
394
395
···
405
406
406
407
deleted, _ := result.RowsAffected()
407
408
if deleted > 0 {
408
-
fmt.Printf("Cleaned up %d expired pending device auths\n", deleted)
409
+
slog.Info("Cleaned up expired pending device auths", "count", deleted)
409
410
}
410
411
411
412
return nil
+8
-7
pkg/appview/db/oauth_store.go
+8
-7
pkg/appview/db/oauth_store.go
···
5
5
"database/sql"
6
6
"encoding/json"
7
7
"fmt"
8
+
"log/slog"
8
9
"time"
9
10
10
11
"github.com/bluesky-social/indigo/atproto/auth/oauth"
···
105
106
106
107
deleted, _ := result.RowsAffected()
107
108
if deleted > 0 {
108
-
fmt.Printf("Deleted %d OAuth session(s) for DID %s\n", deleted, did)
109
+
slog.Info("Deleted OAuth sessions for DID", "count", deleted, "did", did)
109
110
}
110
111
111
112
return nil
···
207
208
208
209
deleted, _ := result.RowsAffected()
209
210
if deleted > 0 {
210
-
fmt.Printf("Cleaned up %d old OAuth sessions (older than %v)\n", deleted, olderThan)
211
+
slog.Info("Cleaned up old OAuth sessions", "count", deleted, "older_than", olderThan)
211
212
}
212
213
213
214
return nil
···
228
229
229
230
deleted, _ := result.RowsAffected()
230
231
if deleted > 0 {
231
-
fmt.Printf("Cleaned up %d expired auth requests\n", deleted)
232
+
slog.Info("Cleaned up expired auth requests", "count", deleted)
232
233
}
233
234
234
235
return nil
···
252
253
for rows.Next() {
253
254
var sessionKey, accountDID, sessionID, sessionDataJSON string
254
255
if err := rows.Scan(&sessionKey, &accountDID, &sessionID, &sessionDataJSON); err != nil {
255
-
fmt.Printf("WARNING [oauth/store]: Failed to scan session row: %v\n", err)
256
+
slog.Warn("Failed to scan session row", "component", "oauth/store", "error", err)
256
257
continue
257
258
}
258
259
259
260
// Parse session data
260
261
var sessionData oauth.ClientSessionData
261
262
if err := json.Unmarshal([]byte(sessionDataJSON), &sessionData); err != nil {
262
-
fmt.Printf("WARNING [oauth/store]: Failed to parse session data for %s: %v\n", sessionKey, err)
263
+
slog.Warn("Failed to parse session data", "component", "oauth/store", "session_key", sessionKey, "error", err)
263
264
// Delete malformed sessions
264
265
sessionsToDelete = append(sessionsToDelete, sessionKey)
265
266
continue
···
284
285
DELETE FROM oauth_sessions WHERE session_key = ?
285
286
`, key)
286
287
if err != nil {
287
-
fmt.Printf("WARNING [oauth/store]: Failed to delete session %s: %v\n", key, err)
288
+
slog.Warn("Failed to delete session", "component", "oauth/store", "session_key", key, "error", err)
288
289
}
289
290
}
290
-
fmt.Printf("Invalidated %d OAuth session(s) with mismatched scopes\n", len(sessionsToDelete))
291
+
slog.Info("Invalidated OAuth sessions with mismatched scopes", "count", len(sessionsToDelete))
291
292
}
292
293
293
294
return len(sessionsToDelete), nil
+2
-1
pkg/appview/handlers/device.go
+2
-1
pkg/appview/handlers/device.go
···
4
4
"encoding/json"
5
5
"fmt"
6
6
"html/template"
7
+
"log/slog"
7
8
"net/http"
8
9
"net/url"
9
10
"strings"
···
271
272
// Approve the device
272
273
_, err := h.Store.ApprovePending(req.UserCode, sess.DID, sess.Handle)
273
274
if err != nil {
274
-
fmt.Printf("ERROR [device/approve]: Failed to approve: %v\n", err)
275
+
slog.Error("Failed to approve device", "component", "device/approve", "error", err)
275
276
http.Error(w, fmt.Sprintf("failed to approve: %v", err), http.StatusInternalServerError)
276
277
return
277
278
}
+22
-22
pkg/appview/jetstream/backfill.go
+22
-22
pkg/appview/jetstream/backfill.go
···
57
57
58
58
// First, query and cache the default hold's captain record
59
59
if b.defaultHoldDID != "" {
60
-
slog.Info("Backfill: Querying default hold captain record: %s\n", b.defaultHoldDID)
60
+
slog.Info("Backfill querying default hold captain record", "hold_did", b.defaultHoldDID)
61
61
if err := b.queryCaptainRecord(ctx, b.defaultHoldDID); err != nil {
62
-
slog.Warn("Backfill: Failed to query default hold captain record: %v\n", err)
62
+
slog.Warn("Backfill failed to query default hold captain record", "error", err)
63
63
// Don't fail the whole backfill - just warn
64
64
}
65
65
}
···
72
72
}
73
73
74
74
for _, collection := range collections {
75
-
slog.Info("Backfill: Processing collection: %s\n", collection)
75
+
slog.Info("Backfill processing collection", "collection", collection)
76
76
77
77
if err := b.backfillCollection(ctx, collection); err != nil {
78
78
return fmt.Errorf("failed to backfill collection %s: %w", collection, err)
79
79
}
80
80
81
-
slog.Info("Backfill: Completed collection: %s\n", collection)
81
+
slog.Info("Backfill completed collection", "collection", collection)
82
82
}
83
83
84
84
slog.Info("Backfill: All collections completed!")
···
99
99
return fmt.Errorf("failed to list repos: %w", err)
100
100
}
101
101
102
-
slog.Info("Backfill: Found %d repos with %s (cursor: %s)\n", len(result.Repos), collection, repoCursor)
102
+
slog.Info("Backfill found repos with collection", "count", len(result.Repos), "collection", collection, "cursor", repoCursor)
103
103
104
104
// Process each repo (DID)
105
105
for _, repo := range result.Repos {
106
106
recordCount, err := b.backfillRepo(ctx, repo.DID, collection)
107
107
if err != nil {
108
-
slog.Warn("Backfill: Failed to backfill repo %s: %v\n", repo.DID, err)
108
+
slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err)
109
109
continue
110
110
}
111
111
···
113
113
processedRecords += recordCount
114
114
115
115
if processedRepos%10 == 0 {
116
-
slog.Info("Backfill: Progress - %d repos, %d records\n", processedRepos, processedRecords)
116
+
slog.Info("Backfill progress", "repos", processedRepos, "records", processedRecords)
117
117
}
118
118
}
119
119
···
125
125
repoCursor = result.Cursor
126
126
}
127
127
128
-
slog.Info("Backfill: Collection %s complete - %d repos, %d records\n", collection, processedRepos, processedRecords)
128
+
slog.Info("Backfill collection complete", "collection", collection, "repos", processedRepos, "records", processedRecords)
129
129
return nil
130
130
}
131
131
···
197
197
}
198
198
199
199
if err := b.processRecord(ctx, did, collection, &record); err != nil {
200
-
slog.Warn("Backfill: Failed to process record %s: %v\n", record.URI, err)
200
+
slog.Warn("Backfill failed to process record", "uri", record.URI, "error", err)
201
201
continue
202
202
}
203
203
recordCount++
···
213
213
214
214
// Reconcile deletions - remove records from DB that no longer exist on PDS
215
215
if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags, foundStars); err != nil {
216
-
slog.Warn("Backfill: Failed to reconcile deletions for %s: %v\n", did, err)
216
+
slog.Warn("Backfill failed to reconcile deletions", "did", did, "error", err)
217
217
}
218
218
219
219
// After processing manifests, clean up orphaned tags (tags pointing to non-existent manifests)
220
220
if collection == atproto.ManifestCollection {
221
221
if err := db.CleanupOrphanedTags(b.db, did); err != nil {
222
-
slog.Warn("Backfill: Failed to cleanup orphaned tags for %s: %v\n", did, err)
222
+
slog.Warn("Backfill failed to cleanup orphaned tags", "did", did, "error", err)
223
223
}
224
224
225
225
// Reconcile annotations - ensure they come from newest manifest per repository
226
226
// This fixes out-of-order backfill where older manifests can overwrite newer annotations
227
227
if err := b.reconcileAnnotations(ctx, did, pdsClient); err != nil {
228
-
slog.Warn("Backfill: Failed to reconcile annotations for %s: %v\n", did, err)
228
+
slog.Warn("Backfill failed to reconcile annotations", "did", did, "error", err)
229
229
}
230
230
}
231
231
···
250
250
// Log deletions
251
251
deleted := len(dbDigests) - len(foundManifestDigests)
252
252
if deleted > 0 {
253
-
slog.Info("Backfill: Deleted %d orphaned manifests for %s\n", deleted, did)
253
+
slog.Info("Backfill deleted orphaned manifests", "count", deleted, "did", did)
254
254
}
255
255
256
256
case atproto.TagCollection:
···
268
268
// Log deletions
269
269
deleted := len(dbTags) - len(foundTags)
270
270
if deleted > 0 {
271
-
slog.Info("Backfill: Deleted %d orphaned tags for %s\n", deleted, did)
271
+
slog.Info("Backfill deleted orphaned tags", "count", deleted, "did", did)
272
272
}
273
273
274
274
case atproto.StarCollection:
···
308
308
// Suppress warning for external holds in test mode
309
309
return nil
310
310
}
311
-
fmt.Printf("WARNING [backfill]: Failed to query captain record for hold %s: %v\n", holdDID, err)
311
+
slog.Warn("Backfill failed to query captain record for hold", "hold_did", holdDID, "error", err)
312
312
// Don't fail the whole backfill - just skip this hold
313
313
return nil
314
314
}
···
343
343
344
344
// Retry on connection errors (hold service might still be starting)
345
345
if attempt < maxRetries && strings.Contains(err.Error(), "connection refused") {
346
-
slog.Info("Backfill: Hold not ready (attempt %d/%d), retrying in 2s...\n", attempt, maxRetries)
346
+
slog.Info("Backfill hold not ready, retrying", "attempt", attempt, "max_retries", maxRetries)
347
347
time.Sleep(2 * time.Second)
348
348
continue
349
349
}
···
365
365
return fmt.Errorf("failed to cache captain record: %w", err)
366
366
}
367
367
368
-
slog.Info("Backfill: Cached captain record for hold %s (owner: %s)\n", holdDID, captainRecord.OwnerDID)
368
+
slog.Info("Backfill cached captain record for hold", "hold_did", holdDID, "owner_did", captainRecord.OwnerDID)
369
369
return nil
370
370
}
371
371
···
382
382
// Find newest manifest for this repository
383
383
newestManifest, err := db.GetNewestManifestForRepo(b.db, did, repo)
384
384
if err != nil {
385
-
fmt.Printf("WARNING [backfill]: Failed to get newest manifest for %s/%s: %v\n", did, repo, err)
385
+
slog.Warn("Backfill failed to get newest manifest for repo", "did", did, "repository", repo, "error", err)
386
386
continue // Skip on error
387
387
}
388
388
···
390
390
rkey := strings.TrimPrefix(newestManifest.Digest, "sha256:")
391
391
record, err := pdsClient.GetRecord(ctx, atproto.ManifestCollection, rkey)
392
392
if err != nil {
393
-
fmt.Printf("WARNING [backfill]: Failed to fetch manifest record for %s/%s: %v\n", did, repo, err)
393
+
slog.Warn("Backfill failed to fetch manifest record for repo", "did", did, "repository", repo, "error", err)
394
394
continue // Skip on error
395
395
}
396
396
397
397
// Parse manifest record
398
398
var manifestRecord atproto.ManifestRecord
399
399
if err := json.Unmarshal(record.Value, &manifestRecord); err != nil {
400
-
fmt.Printf("WARNING [backfill]: Failed to parse manifest record for %s/%s: %v\n", did, repo, err)
400
+
slog.Warn("Backfill failed to parse manifest record for repo", "did", did, "repository", repo, "error", err)
401
401
continue
402
402
}
403
403
···
415
415
if hasData {
416
416
err = db.UpsertRepositoryAnnotations(b.db, did, repo, manifestRecord.Annotations)
417
417
if err != nil {
418
-
fmt.Printf("WARNING [backfill]: Failed to reconcile annotations for %s/%s: %v\n", did, repo, err)
418
+
slog.Warn("Backfill failed to reconcile annotations for repo", "did", did, "repository", repo, "error", err)
419
419
} else {
420
-
slog.Info("Backfill: Reconciled annotations for %s/%s from newest manifest %s\n", did, repo, newestManifest.Digest)
420
+
slog.Info("Backfill reconciled annotations for repo from newest manifest", "did", did, "repository", repo, "digest", newestManifest.Digest)
421
421
}
422
422
}
423
423
}
+14
-26
pkg/appview/jetstream/worker.go
+14
-26
pkg/appview/jetstream/worker.go
···
90
90
// Calculate lag (cursor is in microseconds)
91
91
now := time.Now().UnixMicro()
92
92
lagSeconds := float64(now-w.startCursor) / 1_000_000.0
93
-
slog.Info("Jetstream: Starting from cursor %d (%.1f seconds behind live)\n", w.startCursor, lagSeconds)
93
+
slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
94
94
}
95
95
96
96
// Disable compression for now to debug
97
97
// q.Set("compress", "true")
98
98
u.RawQuery = q.Encode()
99
99
100
-
fmt.Printf("Connecting to Jetstream: %s\n", u.String())
100
+
slog.Info("Connecting to Jetstream", "url", u.String())
101
101
102
102
// Connect to Jetstream
103
103
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
···
170
170
171
171
// If no pong for 60 seconds, connection is likely dead
172
172
if timeSinceLastPong > 60*time.Second {
173
-
slog.Info("Jetstream: No pong received for %s (sent %d pings, got %d pongs), closing connection\n",
174
-
timeSinceLastPong, pingsTotal, pongsTotal)
173
+
slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
175
174
conn.Close()
176
175
return
177
176
}
···
179
178
// Send ping with write deadline
180
179
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
181
180
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
182
-
slog.Info("Jetstream: Failed to send ping: %v\n", err)
181
+
slog.Warn("Jetstream failed to send ping", "error", err)
183
182
conn.Close()
184
183
return
185
184
}
···
201
200
return ctx.Err()
202
201
case <-heartbeatTicker.C:
203
202
elapsed := time.Since(lastHeartbeat)
204
-
slog.Info("Jetstream: Alive (processed %d events in last %.0fs)\n", eventCount, elapsed.Seconds())
203
+
slog.Info("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
205
204
eventCount = 0
206
205
lastHeartbeat = time.Now()
207
206
default:
···
237
236
}
238
237
239
238
// Log detailed context about the failure
240
-
slog.Info("Jetstream: Connection closed after %s\n", connDuration)
241
-
fmt.Printf(" - Events in last 30s: %d\n", eventCount)
242
-
fmt.Printf(" - Time since last event: %s\n", timeSinceLastEvent)
243
-
fmt.Printf(" - Ping/Pong: %d/%d (%.1f%% success)\n", pongsTotal, pingsTotal, pongRate)
244
-
fmt.Printf(" - Last pong: %s ago\n", timeSinceLastPong)
245
-
fmt.Printf(" - Error: %v\n", err)
246
-
fmt.Printf(" - Diagnosis: %s\n", diagnosis)
239
+
slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)
247
240
248
241
return fmt.Errorf("failed to read message: %w", err)
249
242
}
···
253
246
_ = decoder // Keep decoder to avoid unused variable error
254
247
255
248
if err := w.processMessage(message); err != nil {
256
-
fmt.Printf("ERROR processing message: %v\n", err)
249
+
slog.Error("ERROR processing message", "error", err)
257
250
// Continue processing other messages
258
251
} else {
259
252
eventCount++
···
306
299
307
300
// Debug: log first few collections we see to understand what's coming through
308
301
if w.debugCollectionCount < 5 {
309
-
fmt.Printf("Jetstream DEBUG: Received collection=%s, did=%s\n", commit.Collection, commit.DID)
302
+
slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
310
303
w.debugCollectionCount++
311
304
}
312
305
313
306
// Process based on collection
314
307
switch commit.Collection {
315
308
case atproto.ManifestCollection:
316
-
slog.Info("Jetstream: Processing manifest event: did=%s, operation=%s, rkey=%s\n",
317
-
commit.DID, commit.Operation, commit.RKey)
309
+
slog.Info("Jetstream processing manifest event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
318
310
return w.processManifest(commit)
319
311
case atproto.TagCollection:
320
-
slog.Info("Jetstream: Processing tag event: did=%s, operation=%s, rkey=%s\n",
321
-
commit.DID, commit.Operation, commit.RKey)
312
+
slog.Info("Jetstream processing tag event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
322
313
return w.processTag(commit)
323
314
case atproto.StarCollection:
324
-
slog.Info("Jetstream: Processing star event: did=%s, operation=%s, rkey=%s\n",
325
-
commit.DID, commit.Operation, commit.RKey)
315
+
slog.Info("Jetstream processing star event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
326
316
return w.processStar(commit)
327
317
default:
328
318
// Ignore other collections
···
373
363
if commit.Operation == "delete" {
374
364
// Delete tag - decode rkey back to repository and tag
375
365
repo, tag := atproto.RKeyToRepositoryTag(commit.RKey)
376
-
slog.Info("Jetstream: Deleting tag: did=%s, repository=%s, tag=%s (from rkey=%s)\n",
377
-
commit.DID, repo, tag, commit.RKey)
366
+
slog.Info("Jetstream deleting tag", "did", commit.DID, "repository", repo, "tag", tag, "rkey", commit.RKey)
378
367
if err := db.DeleteTag(w.db, commit.DID, repo, tag); err != nil {
379
-
slog.Info("Jetstream: ERROR deleting tag: %v\n", err)
368
+
slog.Error("Jetstream ERROR deleting tag", "error", err)
380
369
return err
381
370
}
382
-
slog.Info("Jetstream: Successfully deleted tag: did=%s, repository=%s, tag=%s\n",
383
-
commit.DID, repo, tag)
371
+
slog.Info("Jetstream successfully deleted tag", "did", commit.DID, "repository", repo, "tag", tag)
384
372
return nil
385
373
}
386
374