+10
-3
cmd/hold/main.go
+10
-3
cmd/hold/main.go
···
82
82
slog.Warn("Failed to bootstrap events from repo", "error", err)
83
83
}
84
84
85
-
// Wire up repo event handler to broadcaster
86
-
holdPDS.RepomgrRef().SetEventHandler(broadcaster.SetRepoEventHandler(), true)
85
+
// Backfill records index from existing MST data (one-time on startup)
86
+
if err := holdPDS.BackfillRecordsIndex(ctx); err != nil {
87
+
slog.Warn("Failed to backfill records index", "error", err)
88
+
}
89
+
90
+
// Wire up repo event handler with records indexing + broadcaster
91
+
// The indexing handler wraps the broadcaster handler to keep index in sync
92
+
indexingHandler := holdPDS.CreateRecordsIndexEventHandler(broadcaster.SetRepoEventHandler())
93
+
holdPDS.RepomgrRef().SetEventHandler(indexingHandler, true)
87
94
88
-
slog.Info("Embedded PDS initialized successfully with firehose enabled")
95
+
slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")
89
96
} else {
90
97
slog.Error("Database path is required for embedded PDS authorization")
91
98
os.Exit(1)
+6
-1
pkg/appview/jetstream/backfill.go
+6
-1
pkg/appview/jetstream/backfill.go
···
50
50
return &BackfillWorker{
51
51
db: database,
52
52
client: client, // This points to the relay
53
-
processor: NewProcessor(database, false, nil), // No cache for batch processing, no stats
53
+
processor: NewProcessor(database, false, NewStatsCache()), // Stats cache for aggregation
54
54
defaultHoldDID: defaultHoldDID,
55
55
testMode: testMode,
56
56
refresher: refresher,
···
76
76
atproto.StarCollection, // io.atcr.sailor.star
77
77
atproto.SailorProfileCollection, // io.atcr.sailor.profile
78
78
atproto.RepoPageCollection, // io.atcr.repo.page
79
+
atproto.StatsCollection, // io.atcr.hold.stats (from holds)
79
80
}
80
81
81
82
for _, collection := range collections {
···
311
312
case atproto.RepoPageCollection:
312
313
// rkey is extracted from the record URI, but for repo pages we use Repository field
313
314
return b.processor.ProcessRepoPage(ctx, did, record.URI, record.Value, false)
315
+
case atproto.StatsCollection:
316
+
// Stats are stored in hold PDSes, not user PDSes
317
+
// 'did' here is the hold's DID (e.g., did:web:hold01.atcr.io)
318
+
return b.processor.ProcessStats(ctx, did, record.Value, false)
314
319
default:
315
320
return fmt.Errorf("unsupported collection: %s", collection)
316
321
}
+251
pkg/hold/pds/records.go
+251
pkg/hold/pds/records.go
···
1
+
package pds
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
"strings"
9
+
10
+
"github.com/bluesky-social/indigo/repo"
11
+
"github.com/ipfs/go-cid"
12
+
_ "github.com/mattn/go-sqlite3"
13
+
)
14
+
15
+
// RecordsIndex provides an efficient index for listing records
16
+
// This follows the official ATProto PDS pattern of using SQL for queries
17
+
// while MST is used for sync operations.
18
+
type RecordsIndex struct {
19
+
db *sql.DB
20
+
}
21
+
22
+
// Record represents a record in the index
23
+
type Record struct {
24
+
Collection string
25
+
Rkey string
26
+
Cid string
27
+
}
28
+
29
+
const recordsSchema = `
30
+
CREATE TABLE IF NOT EXISTS records (
31
+
collection TEXT NOT NULL,
32
+
rkey TEXT NOT NULL,
33
+
cid TEXT NOT NULL,
34
+
PRIMARY KEY (collection, rkey)
35
+
);
36
+
CREATE INDEX IF NOT EXISTS idx_records_collection_rkey ON records(collection, rkey);
37
+
`
38
+
39
+
// NewRecordsIndex creates or opens a records index
40
+
func NewRecordsIndex(dbPath string) (*RecordsIndex, error) {
41
+
db, err := sql.Open("sqlite3", dbPath)
42
+
if err != nil {
43
+
return nil, fmt.Errorf("failed to open records database: %w", err)
44
+
}
45
+
46
+
// Create schema
47
+
_, err = db.Exec(recordsSchema)
48
+
if err != nil {
49
+
db.Close()
50
+
return nil, fmt.Errorf("failed to create records schema: %w", err)
51
+
}
52
+
53
+
return &RecordsIndex{db: db}, nil
54
+
}
55
+
56
+
// Close closes the database connection
57
+
func (ri *RecordsIndex) Close() error {
58
+
if ri.db != nil {
59
+
return ri.db.Close()
60
+
}
61
+
return nil
62
+
}
63
+
64
+
// IndexRecord adds or updates a record in the index
65
+
func (ri *RecordsIndex) IndexRecord(collection, rkey, cidStr string) error {
66
+
_, err := ri.db.Exec(`
67
+
INSERT OR REPLACE INTO records (collection, rkey, cid)
68
+
VALUES (?, ?, ?)
69
+
`, collection, rkey, cidStr)
70
+
return err
71
+
}
72
+
73
+
// DeleteRecord removes a record from the index
74
+
func (ri *RecordsIndex) DeleteRecord(collection, rkey string) error {
75
+
_, err := ri.db.Exec(`
76
+
DELETE FROM records WHERE collection = ? AND rkey = ?
77
+
`, collection, rkey)
78
+
return err
79
+
}
80
+
81
+
// ListRecords returns records for a collection with pagination support
82
+
// reverse=false (default): newest first (rkey DESC)
83
+
// reverse=true: oldest first (rkey ASC)
84
+
func (ri *RecordsIndex) ListRecords(collection string, limit int, cursor string, reverse bool) ([]Record, string, error) {
85
+
// Build query based on sort order
86
+
var query string
87
+
var args []any
88
+
89
+
if reverse {
90
+
// Oldest first (ascending order)
91
+
if cursor != "" {
92
+
query = `
93
+
SELECT collection, rkey, cid FROM records
94
+
WHERE collection = ? AND rkey > ?
95
+
ORDER BY rkey ASC
96
+
LIMIT ?
97
+
`
98
+
args = []any{collection, cursor, limit + 1}
99
+
} else {
100
+
query = `
101
+
SELECT collection, rkey, cid FROM records
102
+
WHERE collection = ?
103
+
ORDER BY rkey ASC
104
+
LIMIT ?
105
+
`
106
+
args = []any{collection, limit + 1}
107
+
}
108
+
} else {
109
+
// Newest first (descending order) - default
110
+
if cursor != "" {
111
+
query = `
112
+
SELECT collection, rkey, cid FROM records
113
+
WHERE collection = ? AND rkey < ?
114
+
ORDER BY rkey DESC
115
+
LIMIT ?
116
+
`
117
+
args = []any{collection, cursor, limit + 1}
118
+
} else {
119
+
query = `
120
+
SELECT collection, rkey, cid FROM records
121
+
WHERE collection = ?
122
+
ORDER BY rkey DESC
123
+
LIMIT ?
124
+
`
125
+
args = []any{collection, limit + 1}
126
+
}
127
+
}
128
+
129
+
rows, err := ri.db.Query(query, args...)
130
+
if err != nil {
131
+
return nil, "", fmt.Errorf("failed to query records: %w", err)
132
+
}
133
+
defer rows.Close()
134
+
135
+
var records []Record
136
+
for rows.Next() {
137
+
var rec Record
138
+
if err := rows.Scan(&rec.Collection, &rec.Rkey, &rec.Cid); err != nil {
139
+
return nil, "", fmt.Errorf("failed to scan record: %w", err)
140
+
}
141
+
records = append(records, rec)
142
+
}
143
+
144
+
if err := rows.Err(); err != nil {
145
+
return nil, "", fmt.Errorf("error iterating records: %w", err)
146
+
}
147
+
148
+
// Determine next cursor
149
+
var nextCursor string
150
+
if len(records) > limit {
151
+
// More records available, set cursor to the last included record
152
+
nextCursor = records[limit-1].Rkey
153
+
records = records[:limit]
154
+
}
155
+
156
+
return records, nextCursor, nil
157
+
}
158
+
159
+
// Count returns the number of records in a collection
160
+
func (ri *RecordsIndex) Count(collection string) (int, error) {
161
+
var count int
162
+
err := ri.db.QueryRow(`
163
+
SELECT COUNT(*) FROM records WHERE collection = ?
164
+
`, collection).Scan(&count)
165
+
return count, err
166
+
}
167
+
168
+
// TotalCount returns the total number of records in the index
169
+
func (ri *RecordsIndex) TotalCount() (int, error) {
170
+
var count int
171
+
err := ri.db.QueryRow(`SELECT COUNT(*) FROM records`).Scan(&count)
172
+
return count, err
173
+
}
174
+
175
+
// BackfillFromRepo populates the records index from an existing MST repo
176
+
// Compares MST count with index count - only backfills if they differ
177
+
func (ri *RecordsIndex) BackfillFromRepo(ctx context.Context, repoHandle *repo.Repo) error {
178
+
// Count records in MST
179
+
mstCount := 0
180
+
err := repoHandle.ForEach(ctx, "", func(key string, c cid.Cid) error {
181
+
mstCount++
182
+
return nil
183
+
})
184
+
if err != nil {
185
+
return fmt.Errorf("failed to count MST records: %w", err)
186
+
}
187
+
188
+
// Count records in index
189
+
indexCount, err := ri.TotalCount()
190
+
if err != nil {
191
+
return fmt.Errorf("failed to check index count: %w", err)
192
+
}
193
+
194
+
// Skip if counts match
195
+
if indexCount == mstCount {
196
+
slog.Debug("Records index in sync with MST", "count", indexCount)
197
+
return nil
198
+
}
199
+
200
+
slog.Info("Backfilling records index from MST...", "mstCount", mstCount, "indexCount", indexCount)
201
+
202
+
// Begin transaction for bulk insert
203
+
tx, err := ri.db.BeginTx(ctx, nil)
204
+
if err != nil {
205
+
return fmt.Errorf("failed to begin transaction: %w", err)
206
+
}
207
+
defer tx.Rollback()
208
+
209
+
stmt, err := tx.Prepare(`
210
+
INSERT OR REPLACE INTO records (collection, rkey, cid)
211
+
VALUES (?, ?, ?)
212
+
`)
213
+
if err != nil {
214
+
return fmt.Errorf("failed to prepare statement: %w", err)
215
+
}
216
+
defer stmt.Close()
217
+
218
+
recordCount := 0
219
+
err = repoHandle.ForEach(ctx, "", func(key string, c cid.Cid) error {
220
+
// key format: "collection/rkey"
221
+
parts := strings.SplitN(key, "/", 2)
222
+
if len(parts) != 2 {
223
+
return nil // Skip malformed keys
224
+
}
225
+
collection, rkey := parts[0], parts[1]
226
+
227
+
_, err := stmt.Exec(collection, rkey, c.String())
228
+
if err != nil {
229
+
return fmt.Errorf("failed to index record %s: %w", key, err)
230
+
}
231
+
recordCount++
232
+
233
+
// Log progress every 1000 records
234
+
if recordCount%1000 == 0 {
235
+
slog.Debug("Backfill progress", "count", recordCount)
236
+
}
237
+
238
+
return nil
239
+
})
240
+
241
+
if err != nil {
242
+
return fmt.Errorf("failed to walk repo: %w", err)
243
+
}
244
+
245
+
if err := tx.Commit(); err != nil {
246
+
return fmt.Errorf("failed to commit transaction: %w", err)
247
+
}
248
+
249
+
slog.Info("Backfill complete", "records", recordCount)
250
+
return nil
251
+
}
+627
pkg/hold/pds/records_test.go
+627
pkg/hold/pds/records_test.go
···
1
+
package pds
2
+
3
+
import (
4
+
"context"
5
+
"os"
6
+
"path/filepath"
7
+
"testing"
8
+
9
+
"github.com/bluesky-social/indigo/repo"
10
+
_ "github.com/mattn/go-sqlite3"
11
+
)
12
+
13
+
// Tests for RecordsIndex
14
+
15
+
// TestNewRecordsIndex tests creating a new records index
16
+
func TestNewRecordsIndex(t *testing.T) {
17
+
tmpDir := t.TempDir()
18
+
dbPath := filepath.Join(tmpDir, "records.db")
19
+
20
+
ri, err := NewRecordsIndex(dbPath)
21
+
if err != nil {
22
+
t.Fatalf("NewRecordsIndex() error = %v", err)
23
+
}
24
+
defer ri.Close()
25
+
26
+
if ri.db == nil {
27
+
t.Error("Expected db to be non-nil")
28
+
}
29
+
30
+
// Verify database file was created
31
+
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
32
+
t.Error("Expected database file to be created")
33
+
}
34
+
}
35
+
36
+
// TestNewRecordsIndex_InvalidPath tests error handling for invalid path
37
+
func TestNewRecordsIndex_InvalidPath(t *testing.T) {
38
+
// Try to create in a non-existent directory
39
+
_, err := NewRecordsIndex("/nonexistent/dir/records.db")
40
+
if err == nil {
41
+
t.Error("Expected error for invalid path")
42
+
}
43
+
}
44
+
45
+
// TestRecordsIndex_IndexRecord tests adding records to the index
46
+
func TestRecordsIndex_IndexRecord(t *testing.T) {
47
+
tmpDir := t.TempDir()
48
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
49
+
if err != nil {
50
+
t.Fatalf("NewRecordsIndex() error = %v", err)
51
+
}
52
+
defer ri.Close()
53
+
54
+
// Index a record
55
+
err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123")
56
+
if err != nil {
57
+
t.Fatalf("IndexRecord() error = %v", err)
58
+
}
59
+
60
+
// Verify it was indexed
61
+
count, err := ri.Count("io.atcr.hold.crew")
62
+
if err != nil {
63
+
t.Fatalf("Count() error = %v", err)
64
+
}
65
+
if count != 1 {
66
+
t.Errorf("Expected count 1, got %d", count)
67
+
}
68
+
}
69
+
70
+
// TestRecordsIndex_IndexRecord_Upsert tests updating an existing record
71
+
func TestRecordsIndex_IndexRecord_Upsert(t *testing.T) {
72
+
tmpDir := t.TempDir()
73
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
74
+
if err != nil {
75
+
t.Fatalf("NewRecordsIndex() error = %v", err)
76
+
}
77
+
defer ri.Close()
78
+
79
+
// Index a record
80
+
err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123")
81
+
if err != nil {
82
+
t.Fatalf("IndexRecord() first call error = %v", err)
83
+
}
84
+
85
+
// Update the same record with new CID
86
+
err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei456")
87
+
if err != nil {
88
+
t.Fatalf("IndexRecord() second call error = %v", err)
89
+
}
90
+
91
+
// Count should still be 1 (upsert, not insert)
92
+
count, err := ri.Count("io.atcr.hold.crew")
93
+
if err != nil {
94
+
t.Fatalf("Count() error = %v", err)
95
+
}
96
+
if count != 1 {
97
+
t.Errorf("Expected count 1 after upsert, got %d", count)
98
+
}
99
+
100
+
// Verify the CID was updated
101
+
records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
102
+
if err != nil {
103
+
t.Fatalf("ListRecords() error = %v", err)
104
+
}
105
+
if len(records) != 1 {
106
+
t.Fatalf("Expected 1 record, got %d", len(records))
107
+
}
108
+
if records[0].Cid != "bafyrei456" {
109
+
t.Errorf("Expected CID bafyrei456, got %s", records[0].Cid)
110
+
}
111
+
}
112
+
113
+
// TestRecordsIndex_DeleteRecord tests removing a record from the index
114
+
func TestRecordsIndex_DeleteRecord(t *testing.T) {
115
+
tmpDir := t.TempDir()
116
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
117
+
if err != nil {
118
+
t.Fatalf("NewRecordsIndex() error = %v", err)
119
+
}
120
+
defer ri.Close()
121
+
122
+
// Index a record
123
+
err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123")
124
+
if err != nil {
125
+
t.Fatalf("IndexRecord() error = %v", err)
126
+
}
127
+
128
+
// Delete it
129
+
err = ri.DeleteRecord("io.atcr.hold.crew", "abc123")
130
+
if err != nil {
131
+
t.Fatalf("DeleteRecord() error = %v", err)
132
+
}
133
+
134
+
// Verify it was deleted
135
+
count, err := ri.Count("io.atcr.hold.crew")
136
+
if err != nil {
137
+
t.Fatalf("Count() error = %v", err)
138
+
}
139
+
if count != 0 {
140
+
t.Errorf("Expected count 0 after delete, got %d", count)
141
+
}
142
+
}
143
+
144
+
// TestRecordsIndex_DeleteRecord_NotExists tests deleting a non-existent record
145
+
func TestRecordsIndex_DeleteRecord_NotExists(t *testing.T) {
146
+
tmpDir := t.TempDir()
147
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
148
+
if err != nil {
149
+
t.Fatalf("NewRecordsIndex() error = %v", err)
150
+
}
151
+
defer ri.Close()
152
+
153
+
// Delete a record that doesn't exist - should not error
154
+
err = ri.DeleteRecord("io.atcr.hold.crew", "nonexistent")
155
+
if err != nil {
156
+
t.Errorf("DeleteRecord() should not error for non-existent record, got: %v", err)
157
+
}
158
+
}
159
+
160
+
// TestRecordsIndex_Close tests clean shutdown
161
+
func TestRecordsIndex_Close(t *testing.T) {
162
+
tmpDir := t.TempDir()
163
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
164
+
if err != nil {
165
+
t.Fatalf("NewRecordsIndex() error = %v", err)
166
+
}
167
+
168
+
err = ri.Close()
169
+
if err != nil {
170
+
t.Errorf("Close() error = %v", err)
171
+
}
172
+
173
+
// Double close should not panic (nil check)
174
+
ri.db = nil
175
+
err = ri.Close()
176
+
if err != nil {
177
+
t.Errorf("Close() on nil db error = %v", err)
178
+
}
179
+
}
180
+
181
+
// TestRecordsIndex_ListRecords_Empty tests listing an empty collection
182
+
func TestRecordsIndex_ListRecords_Empty(t *testing.T) {
183
+
tmpDir := t.TempDir()
184
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
185
+
if err != nil {
186
+
t.Fatalf("NewRecordsIndex() error = %v", err)
187
+
}
188
+
defer ri.Close()
189
+
190
+
records, cursor, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
191
+
if err != nil {
192
+
t.Fatalf("ListRecords() error = %v", err)
193
+
}
194
+
195
+
if len(records) != 0 {
196
+
t.Errorf("Expected empty records, got %d", len(records))
197
+
}
198
+
if cursor != "" {
199
+
t.Errorf("Expected empty cursor, got %s", cursor)
200
+
}
201
+
}
202
+
203
+
// TestRecordsIndex_ListRecords_Basic tests basic listing
204
+
func TestRecordsIndex_ListRecords_Basic(t *testing.T) {
205
+
tmpDir := t.TempDir()
206
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
207
+
if err != nil {
208
+
t.Fatalf("NewRecordsIndex() error = %v", err)
209
+
}
210
+
defer ri.Close()
211
+
212
+
// Add some records
213
+
records := []struct {
214
+
rkey string
215
+
cid string
216
+
}{
217
+
{"aaa", "cid1"},
218
+
{"bbb", "cid2"},
219
+
{"ccc", "cid3"},
220
+
}
221
+
for _, r := range records {
222
+
if err := ri.IndexRecord("io.atcr.hold.crew", r.rkey, r.cid); err != nil {
223
+
t.Fatalf("IndexRecord() error = %v", err)
224
+
}
225
+
}
226
+
227
+
// List all
228
+
result, cursor, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
229
+
if err != nil {
230
+
t.Fatalf("ListRecords() error = %v", err)
231
+
}
232
+
233
+
if len(result) != 3 {
234
+
t.Errorf("Expected 3 records, got %d", len(result))
235
+
}
236
+
if cursor != "" {
237
+
t.Errorf("Expected no cursor when all records returned, got %s", cursor)
238
+
}
239
+
}
240
+
241
+
// TestRecordsIndex_ListRecords_DefaultOrder tests newest-first ordering (DESC)
242
+
func TestRecordsIndex_ListRecords_DefaultOrder(t *testing.T) {
243
+
tmpDir := t.TempDir()
244
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
245
+
if err != nil {
246
+
t.Fatalf("NewRecordsIndex() error = %v", err)
247
+
}
248
+
defer ri.Close()
249
+
250
+
// Add records with different rkeys (TIDs are lexicographically ordered by time)
251
+
rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"}
252
+
for _, rkey := range rkeys {
253
+
if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil {
254
+
t.Fatalf("IndexRecord() error = %v", err)
255
+
}
256
+
}
257
+
258
+
// List with default order (newest first = DESC)
259
+
records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
260
+
if err != nil {
261
+
t.Fatalf("ListRecords() error = %v", err)
262
+
}
263
+
264
+
// Should be in descending order
265
+
if len(records) != 3 {
266
+
t.Fatalf("Expected 3 records, got %d", len(records))
267
+
}
268
+
if records[0].Rkey != "3m3ccccccccc" {
269
+
t.Errorf("Expected first record rkey=3m3ccccccccc, got %s", records[0].Rkey)
270
+
}
271
+
if records[1].Rkey != "3m3bbbbbbbbb" {
272
+
t.Errorf("Expected second record rkey=3m3bbbbbbbbb, got %s", records[1].Rkey)
273
+
}
274
+
if records[2].Rkey != "3m3aaaaaaaaa" {
275
+
t.Errorf("Expected third record rkey=3m3aaaaaaaaa, got %s", records[2].Rkey)
276
+
}
277
+
}
278
+
279
+
// TestRecordsIndex_ListRecords_ReverseOrder tests oldest-first ordering (ASC)
280
+
func TestRecordsIndex_ListRecords_ReverseOrder(t *testing.T) {
281
+
tmpDir := t.TempDir()
282
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
283
+
if err != nil {
284
+
t.Fatalf("NewRecordsIndex() error = %v", err)
285
+
}
286
+
defer ri.Close()
287
+
288
+
// Add records
289
+
rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"}
290
+
for _, rkey := range rkeys {
291
+
if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil {
292
+
t.Fatalf("IndexRecord() error = %v", err)
293
+
}
294
+
}
295
+
296
+
// List with reverse=true (oldest first = ASC)
297
+
records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", true)
298
+
if err != nil {
299
+
t.Fatalf("ListRecords() error = %v", err)
300
+
}
301
+
302
+
// Should be in ascending order
303
+
if len(records) != 3 {
304
+
t.Fatalf("Expected 3 records, got %d", len(records))
305
+
}
306
+
if records[0].Rkey != "3m3aaaaaaaaa" {
307
+
t.Errorf("Expected first record rkey=3m3aaaaaaaaa, got %s", records[0].Rkey)
308
+
}
309
+
if records[1].Rkey != "3m3bbbbbbbbb" {
310
+
t.Errorf("Expected second record rkey=3m3bbbbbbbbb, got %s", records[1].Rkey)
311
+
}
312
+
if records[2].Rkey != "3m3ccccccccc" {
313
+
t.Errorf("Expected third record rkey=3m3ccccccccc, got %s", records[2].Rkey)
314
+
}
315
+
}
316
+
317
+
// TestRecordsIndex_ListRecords_Limit tests the limit parameter
318
+
func TestRecordsIndex_ListRecords_Limit(t *testing.T) {
319
+
tmpDir := t.TempDir()
320
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
321
+
if err != nil {
322
+
t.Fatalf("NewRecordsIndex() error = %v", err)
323
+
}
324
+
defer ri.Close()
325
+
326
+
// Add 5 records
327
+
for i := 0; i < 5; i++ {
328
+
rkey := string(rune('a' + i))
329
+
if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil {
330
+
t.Fatalf("IndexRecord() error = %v", err)
331
+
}
332
+
}
333
+
334
+
// List with limit=2
335
+
records, cursor, err := ri.ListRecords("io.atcr.hold.crew", 2, "", false)
336
+
if err != nil {
337
+
t.Fatalf("ListRecords() error = %v", err)
338
+
}
339
+
340
+
if len(records) != 2 {
341
+
t.Errorf("Expected 2 records with limit=2, got %d", len(records))
342
+
}
343
+
if cursor == "" {
344
+
t.Error("Expected cursor when more records exist")
345
+
}
346
+
}
347
+
348
+
// TestRecordsIndex_ListRecords_Cursor tests pagination with cursor
349
+
func TestRecordsIndex_ListRecords_Cursor(t *testing.T) {
350
+
tmpDir := t.TempDir()
351
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
352
+
if err != nil {
353
+
t.Fatalf("NewRecordsIndex() error = %v", err)
354
+
}
355
+
defer ri.Close()
356
+
357
+
// Add 5 records
358
+
rkeys := []string{"a", "b", "c", "d", "e"}
359
+
for _, rkey := range rkeys {
360
+
if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil {
361
+
t.Fatalf("IndexRecord() error = %v", err)
362
+
}
363
+
}
364
+
365
+
// First page (default order = DESC, so e, d first)
366
+
page1, cursor1, err := ri.ListRecords("io.atcr.hold.crew", 2, "", false)
367
+
if err != nil {
368
+
t.Fatalf("ListRecords() page 1 error = %v", err)
369
+
}
370
+
if len(page1) != 2 {
371
+
t.Fatalf("Expected 2 records in page 1, got %d", len(page1))
372
+
}
373
+
if cursor1 == "" {
374
+
t.Fatal("Expected cursor after page 1")
375
+
}
376
+
377
+
// Second page using cursor
378
+
page2, cursor2, err := ri.ListRecords("io.atcr.hold.crew", 2, cursor1, false)
379
+
if err != nil {
380
+
t.Fatalf("ListRecords() page 2 error = %v", err)
381
+
}
382
+
if len(page2) != 2 {
383
+
t.Errorf("Expected 2 records in page 2, got %d", len(page2))
384
+
}
385
+
386
+
// Third page
387
+
page3, cursor3, err := ri.ListRecords("io.atcr.hold.crew", 2, cursor2, false)
388
+
if err != nil {
389
+
t.Fatalf("ListRecords() page 3 error = %v", err)
390
+
}
391
+
if len(page3) != 1 {
392
+
t.Errorf("Expected 1 record in page 3, got %d", len(page3))
393
+
}
394
+
if cursor3 != "" {
395
+
t.Errorf("Expected no cursor after last page, got %s", cursor3)
396
+
}
397
+
398
+
// Verify no duplicates across pages
399
+
seen := make(map[string]bool)
400
+
for _, r := range page1 {
401
+
if seen[r.Rkey] {
402
+
t.Errorf("Duplicate record: %s", r.Rkey)
403
+
}
404
+
seen[r.Rkey] = true
405
+
}
406
+
for _, r := range page2 {
407
+
if seen[r.Rkey] {
408
+
t.Errorf("Duplicate record: %s", r.Rkey)
409
+
}
410
+
seen[r.Rkey] = true
411
+
}
412
+
for _, r := range page3 {
413
+
if seen[r.Rkey] {
414
+
t.Errorf("Duplicate record: %s", r.Rkey)
415
+
}
416
+
seen[r.Rkey] = true
417
+
}
418
+
if len(seen) != 5 {
419
+
t.Errorf("Expected 5 unique records, got %d", len(seen))
420
+
}
421
+
}
422
+
423
+
// TestRecordsIndex_ListRecords_CursorReverse tests pagination with reverse order
424
+
func TestRecordsIndex_ListRecords_CursorReverse(t *testing.T) {
425
+
tmpDir := t.TempDir()
426
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
427
+
if err != nil {
428
+
t.Fatalf("NewRecordsIndex() error = %v", err)
429
+
}
430
+
defer ri.Close()
431
+
432
+
// Add 5 records
433
+
rkeys := []string{"a", "b", "c", "d", "e"}
434
+
for _, rkey := range rkeys {
435
+
if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil {
436
+
t.Fatalf("IndexRecord() error = %v", err)
437
+
}
438
+
}
439
+
440
+
// First page (reverse = ASC, so a, b first)
441
+
page1, cursor1, err := ri.ListRecords("io.atcr.hold.crew", 2, "", true)
442
+
if err != nil {
443
+
t.Fatalf("ListRecords() page 1 error = %v", err)
444
+
}
445
+
if len(page1) != 2 {
446
+
t.Fatalf("Expected 2 records in page 1, got %d", len(page1))
447
+
}
448
+
if page1[0].Rkey != "a" {
449
+
t.Errorf("Expected first record a, got %s", page1[0].Rkey)
450
+
}
451
+
if page1[1].Rkey != "b" {
452
+
t.Errorf("Expected second record b, got %s", page1[1].Rkey)
453
+
}
454
+
455
+
// Second page using cursor
456
+
page2, _, err := ri.ListRecords("io.atcr.hold.crew", 2, cursor1, true)
457
+
if err != nil {
458
+
t.Fatalf("ListRecords() page 2 error = %v", err)
459
+
}
460
+
if len(page2) != 2 {
461
+
t.Errorf("Expected 2 records in page 2, got %d", len(page2))
462
+
}
463
+
if page2[0].Rkey != "c" {
464
+
t.Errorf("Expected first record c, got %s", page2[0].Rkey)
465
+
}
466
+
}
467
+
468
+
// TestRecordsIndex_Count tests counting records in a collection
469
+
func TestRecordsIndex_Count(t *testing.T) {
470
+
tmpDir := t.TempDir()
471
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
472
+
if err != nil {
473
+
t.Fatalf("NewRecordsIndex() error = %v", err)
474
+
}
475
+
defer ri.Close()
476
+
477
+
// Add records to two collections
478
+
for i := 0; i < 3; i++ {
479
+
ri.IndexRecord("io.atcr.hold.crew", string(rune('a'+i)), "cid1")
480
+
}
481
+
for i := 0; i < 5; i++ {
482
+
ri.IndexRecord("io.atcr.hold.captain", string(rune('a'+i)), "cid2")
483
+
}
484
+
485
+
// Count crew
486
+
count, err := ri.Count("io.atcr.hold.crew")
487
+
if err != nil {
488
+
t.Fatalf("Count() error = %v", err)
489
+
}
490
+
if count != 3 {
491
+
t.Errorf("Expected crew count 3, got %d", count)
492
+
}
493
+
494
+
// Count captain
495
+
count, err = ri.Count("io.atcr.hold.captain")
496
+
if err != nil {
497
+
t.Fatalf("Count() error = %v", err)
498
+
}
499
+
if count != 5 {
500
+
t.Errorf("Expected captain count 5, got %d", count)
501
+
}
502
+
}
503
+
504
+
// TestRecordsIndex_Count_Empty tests counting an empty collection
505
+
func TestRecordsIndex_Count_Empty(t *testing.T) {
506
+
tmpDir := t.TempDir()
507
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
508
+
if err != nil {
509
+
t.Fatalf("NewRecordsIndex() error = %v", err)
510
+
}
511
+
defer ri.Close()
512
+
513
+
count, err := ri.Count("io.atcr.nonexistent")
514
+
if err != nil {
515
+
t.Fatalf("Count() error = %v", err)
516
+
}
517
+
if count != 0 {
518
+
t.Errorf("Expected count 0 for empty collection, got %d", count)
519
+
}
520
+
}
521
+
522
+
// TestRecordsIndex_TotalCount tests total count across all collections
523
+
func TestRecordsIndex_TotalCount(t *testing.T) {
524
+
tmpDir := t.TempDir()
525
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
526
+
if err != nil {
527
+
t.Fatalf("NewRecordsIndex() error = %v", err)
528
+
}
529
+
defer ri.Close()
530
+
531
+
// Add records to multiple collections
532
+
ri.IndexRecord("io.atcr.hold.crew", "a", "cid1")
533
+
ri.IndexRecord("io.atcr.hold.crew", "b", "cid2")
534
+
ri.IndexRecord("io.atcr.hold.captain", "self", "cid3")
535
+
ri.IndexRecord("io.atcr.manifest", "abc123", "cid4")
536
+
537
+
count, err := ri.TotalCount()
538
+
if err != nil {
539
+
t.Fatalf("TotalCount() error = %v", err)
540
+
}
541
+
if count != 4 {
542
+
t.Errorf("Expected total count 4, got %d", count)
543
+
}
544
+
}
545
+
546
+
// TestRecordsIndex_BackfillFromRepo_Empty tests backfill with empty repo
547
+
func TestRecordsIndex_BackfillFromRepo_Empty(t *testing.T) {
548
+
// This test requires a mock repo which is complex to set up
549
+
// Skip for now - the integration tests in server_test.go will cover this
550
+
t.Skip("Requires mock repo setup - covered by integration tests")
551
+
}
552
+
553
+
// TestRecordsIndex_BackfillFromRepo tests backfill from MST
554
+
func TestRecordsIndex_BackfillFromRepo(t *testing.T) {
555
+
// This test requires a real repo with MST data
556
+
// Skip unit test - covered by integration tests in server_test.go
557
+
t.Skip("Requires real repo with MST - covered by integration tests")
558
+
}
559
+
560
+
// TestRecordsIndex_BackfillFromRepo_SkipsWhenSynced tests backfill skip logic
561
+
func TestRecordsIndex_BackfillFromRepo_SkipsWhenSynced(t *testing.T) {
562
+
// Create a mock scenario where counts match
563
+
// This is tested via the count comparison logic
564
+
tmpDir := t.TempDir()
565
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
566
+
if err != nil {
567
+
t.Fatalf("NewRecordsIndex() error = %v", err)
568
+
}
569
+
defer ri.Close()
570
+
571
+
// The skip logic depends on count comparison in BackfillFromRepo
572
+
// which requires a real repo. Skip for now.
573
+
t.Skip("Requires mock repo - covered by integration tests")
574
+
}
575
+
576
+
// TestRecordsIndex_MultipleCollections tests isolation between collections
577
+
func TestRecordsIndex_MultipleCollections(t *testing.T) {
578
+
tmpDir := t.TempDir()
579
+
ri, err := NewRecordsIndex(filepath.Join(tmpDir, "records.db"))
580
+
if err != nil {
581
+
t.Fatalf("NewRecordsIndex() error = %v", err)
582
+
}
583
+
defer ri.Close()
584
+
585
+
// Add records to different collections with same rkeys
586
+
ri.IndexRecord("io.atcr.hold.crew", "abc", "cid-crew")
587
+
ri.IndexRecord("io.atcr.hold.captain", "abc", "cid-captain")
588
+
ri.IndexRecord("io.atcr.manifest", "abc", "cid-manifest")
589
+
590
+
// Listing should only return records from requested collection
591
+
records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
592
+
if err != nil {
593
+
t.Fatalf("ListRecords() error = %v", err)
594
+
}
595
+
if len(records) != 1 {
596
+
t.Errorf("Expected 1 crew record, got %d", len(records))
597
+
}
598
+
if records[0].Cid != "cid-crew" {
599
+
t.Errorf("Expected cid-crew, got %s", records[0].Cid)
600
+
}
601
+
602
+
// Delete from one collection shouldn't affect others
603
+
ri.DeleteRecord("io.atcr.hold.crew", "abc")
604
+
605
+
count, _ := ri.Count("io.atcr.hold.captain")
606
+
if count != 1 {
607
+
t.Errorf("Expected captain count 1 after deleting crew, got %d", count)
608
+
}
609
+
}
610
+
611
+
// mockRepo is a minimal mock for testing backfill
612
+
// Note: Full backfill testing requires integration tests with real repo
613
+
type mockRepo struct {
614
+
records map[string]string // key -> cid
615
+
}
616
+
617
+
func (m *mockRepo) ForEach(ctx context.Context, prefix string, fn func(string, interface{}) error) error {
618
+
for k, v := range m.records {
619
+
if err := fn(k, v); err != nil {
620
+
if err == repo.ErrDoneIterating {
621
+
return nil
622
+
}
623
+
return err
624
+
}
625
+
}
626
+
return nil
627
+
}
+97
-2
pkg/hold/pds/server.go
+97
-2
pkg/hold/pds/server.go
···
41
41
uid models.Uid
42
42
signingKey *atcrypto.PrivateKeyK256
43
43
enableBlueskyPosts bool
44
+
recordsIndex *RecordsIndex
44
45
}
45
46
46
47
// NewHoldPDS creates or opens a hold PDS with SQLite carstore
···
98
99
slog.Info("New hold repo - will be initialized in Bootstrap")
99
100
}
100
101
102
+
// Initialize records index for efficient listing queries
103
+
// Uses same database as carstore for simplicity
104
+
var recordsIndex *RecordsIndex
105
+
if dbPath != ":memory:" {
106
+
recordsDbPath := dbPath + "/db.sqlite3"
107
+
recordsIndex, err = NewRecordsIndex(recordsDbPath)
108
+
if err != nil {
109
+
return nil, fmt.Errorf("failed to create records index: %w", err)
110
+
}
111
+
}
112
+
101
113
return &HoldPDS{
102
114
did: did,
103
115
PublicURL: publicURL,
···
107
119
uid: uid,
108
120
signingKey: signingKey,
109
121
enableBlueskyPosts: enableBlueskyPosts,
122
+
recordsIndex: recordsIndex,
110
123
}, nil
111
124
}
112
125
···
123
136
// RepomgrRef returns a reference to the RepoManager for event handler setup
124
137
func (p *HoldPDS) RepomgrRef() *RepoManager {
125
138
return p.repomgr
139
+
}
140
+
141
+
// RecordsIndex returns the records index for efficient listing
142
+
func (p *HoldPDS) RecordsIndex() *RecordsIndex {
143
+
return p.recordsIndex
144
+
}
145
+
146
+
// Carstore returns the carstore for repo operations
147
+
func (p *HoldPDS) Carstore() carstore.CarStore {
148
+
return p.carstore
149
+
}
150
+
151
+
// UID returns the user ID for this hold
152
+
func (p *HoldPDS) UID() models.Uid {
153
+
return p.uid
126
154
}
127
155
128
156
// Bootstrap initializes the hold with the captain record, owner as first crew member, and profile
···
268
296
return result, nil
269
297
}
270
298
271
-
// Close closes the carstore
299
+
// Close closes the carstore and records index
272
300
func (p *HoldPDS) Close() error {
273
-
// TODO: Close session properly
301
+
if p.recordsIndex != nil {
302
+
if err := p.recordsIndex.Close(); err != nil {
303
+
return fmt.Errorf("failed to close records index: %w", err)
304
+
}
305
+
}
274
306
return nil
275
307
}
308
+
309
+
// CreateRecordsIndexEventHandler creates an event handler that indexes records
310
+
// and also calls the provided broadcaster handler
311
+
func (p *HoldPDS) CreateRecordsIndexEventHandler(broadcasterHandler func(context.Context, *RepoEvent)) func(context.Context, *RepoEvent) {
312
+
return func(ctx context.Context, event *RepoEvent) {
313
+
// Index/delete records based on event operations
314
+
if p.recordsIndex != nil {
315
+
for _, op := range event.Ops {
316
+
switch op.Kind {
317
+
case EvtKindCreateRecord, EvtKindUpdateRecord:
318
+
// Index the record
319
+
cidStr := ""
320
+
if op.RecCid != nil {
321
+
cidStr = op.RecCid.String()
322
+
}
323
+
if err := p.recordsIndex.IndexRecord(op.Collection, op.Rkey, cidStr); err != nil {
324
+
slog.Warn("Failed to index record", "collection", op.Collection, "rkey", op.Rkey, "error", err)
325
+
}
326
+
case EvtKindDeleteRecord:
327
+
// Remove from index
328
+
if err := p.recordsIndex.DeleteRecord(op.Collection, op.Rkey); err != nil {
329
+
slog.Warn("Failed to delete record from index", "collection", op.Collection, "rkey", op.Rkey, "error", err)
330
+
}
331
+
}
332
+
}
333
+
}
334
+
335
+
// Call the broadcaster handler
336
+
if broadcasterHandler != nil {
337
+
broadcasterHandler(ctx, event)
338
+
}
339
+
}
340
+
}
341
+
342
+
// BackfillRecordsIndex populates the records index from existing MST data
343
+
func (p *HoldPDS) BackfillRecordsIndex(ctx context.Context) error {
344
+
if p.recordsIndex == nil {
345
+
return nil // No index to backfill
346
+
}
347
+
348
+
// Create session to read repo
349
+
session, err := p.carstore.ReadOnlySession(p.uid)
350
+
if err != nil {
351
+
return fmt.Errorf("failed to create session: %w", err)
352
+
}
353
+
354
+
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
355
+
if err != nil {
356
+
return fmt.Errorf("failed to get repo head: %w", err)
357
+
}
358
+
359
+
if !head.Defined() {
360
+
slog.Debug("No repo head, skipping backfill")
361
+
return nil
362
+
}
363
+
364
+
repoHandle, err := repo.OpenRepo(ctx, session, head)
365
+
if err != nil {
366
+
return fmt.Errorf("failed to open repo: %w", err)
367
+
}
368
+
369
+
return p.recordsIndex.BackfillFromRepo(ctx, repoHandle)
370
+
}
+328
pkg/hold/pds/server_test.go
+328
pkg/hold/pds/server_test.go
···
620
620
}
621
621
}
622
622
}
623
+
624
+
// Tests for RecordsIndex feature
625
+
626
+
// TestHoldPDS_RecordsIndex_Nil tests that RecordsIndex is nil for :memory: database
627
+
func TestHoldPDS_RecordsIndex_Nil(t *testing.T) {
628
+
ctx := context.Background()
629
+
tmpDir := t.TempDir()
630
+
keyPath := filepath.Join(tmpDir, "signing-key")
631
+
632
+
// Create with :memory: database
633
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", ":memory:", keyPath, false)
634
+
if err != nil {
635
+
t.Fatalf("NewHoldPDS failed: %v", err)
636
+
}
637
+
defer pds.Close()
638
+
639
+
// RecordsIndex should be nil for :memory:
640
+
if pds.RecordsIndex() != nil {
641
+
t.Error("Expected RecordsIndex() to be nil for :memory: database")
642
+
}
643
+
}
644
+
645
+
// TestHoldPDS_RecordsIndex_NonNil tests that RecordsIndex is created for file database
646
+
func TestHoldPDS_RecordsIndex_NonNil(t *testing.T) {
647
+
ctx := context.Background()
648
+
tmpDir := t.TempDir()
649
+
dbPath := filepath.Join(tmpDir, "pds.db")
650
+
keyPath := filepath.Join(tmpDir, "signing-key")
651
+
652
+
// Create with file database
653
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
654
+
if err != nil {
655
+
t.Fatalf("NewHoldPDS failed: %v", err)
656
+
}
657
+
defer pds.Close()
658
+
659
+
// RecordsIndex should be non-nil for file database
660
+
if pds.RecordsIndex() == nil {
661
+
t.Error("Expected RecordsIndex() to be non-nil for file database")
662
+
}
663
+
}
664
+
665
+
// TestHoldPDS_Carstore tests the Carstore getter
666
+
func TestHoldPDS_Carstore(t *testing.T) {
667
+
ctx := context.Background()
668
+
tmpDir := t.TempDir()
669
+
keyPath := filepath.Join(tmpDir, "signing-key")
670
+
671
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", ":memory:", keyPath, false)
672
+
if err != nil {
673
+
t.Fatalf("NewHoldPDS failed: %v", err)
674
+
}
675
+
defer pds.Close()
676
+
677
+
if pds.Carstore() == nil {
678
+
t.Error("Expected Carstore() to be non-nil")
679
+
}
680
+
}
681
+
682
+
// TestHoldPDS_UID tests the UID getter
683
+
func TestHoldPDS_UID(t *testing.T) {
684
+
ctx := context.Background()
685
+
tmpDir := t.TempDir()
686
+
keyPath := filepath.Join(tmpDir, "signing-key")
687
+
688
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", ":memory:", keyPath, false)
689
+
if err != nil {
690
+
t.Fatalf("NewHoldPDS failed: %v", err)
691
+
}
692
+
defer pds.Close()
693
+
694
+
// UID should be 1 for single-user PDS
695
+
if pds.UID() != 1 {
696
+
t.Errorf("Expected UID() to be 1, got %d", pds.UID())
697
+
}
698
+
}
699
+
700
+
// TestHoldPDS_CreateRecordsIndexEventHandler tests event handler wrapper
701
+
func TestHoldPDS_CreateRecordsIndexEventHandler(t *testing.T) {
702
+
ctx := context.Background()
703
+
tmpDir := t.TempDir()
704
+
dbPath := filepath.Join(tmpDir, "pds.db")
705
+
keyPath := filepath.Join(tmpDir, "signing-key")
706
+
707
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
708
+
if err != nil {
709
+
t.Fatalf("NewHoldPDS failed: %v", err)
710
+
}
711
+
defer pds.Close()
712
+
713
+
// Track if broadcaster was called
714
+
broadcasterCalled := false
715
+
broadcasterHandler := func(ctx context.Context, event *RepoEvent) {
716
+
broadcasterCalled = true
717
+
}
718
+
719
+
// Create handler
720
+
handler := pds.CreateRecordsIndexEventHandler(broadcasterHandler)
721
+
if handler == nil {
722
+
t.Fatal("Expected handler to be non-nil")
723
+
}
724
+
725
+
// Create a test event with create operation
726
+
event := &RepoEvent{
727
+
Ops: []RepoOp{
728
+
{
729
+
Kind: EvtKindCreateRecord,
730
+
Collection: "io.atcr.hold.crew",
731
+
Rkey: "testrkey",
732
+
RecCid: nil, // Will be nil string
733
+
},
734
+
},
735
+
}
736
+
737
+
// Call handler
738
+
handler(ctx, event)
739
+
740
+
// Verify broadcaster was called
741
+
if !broadcasterCalled {
742
+
t.Error("Expected broadcaster handler to be called")
743
+
}
744
+
745
+
// Verify record was indexed
746
+
if pds.RecordsIndex() != nil {
747
+
count, err := pds.RecordsIndex().Count("io.atcr.hold.crew")
748
+
if err != nil {
749
+
t.Fatalf("Count() error = %v", err)
750
+
}
751
+
if count != 1 {
752
+
t.Errorf("Expected 1 indexed record, got %d", count)
753
+
}
754
+
}
755
+
}
756
+
757
+
// TestHoldPDS_CreateRecordsIndexEventHandler_Delete tests delete operation
758
+
func TestHoldPDS_CreateRecordsIndexEventHandler_Delete(t *testing.T) {
759
+
ctx := context.Background()
760
+
tmpDir := t.TempDir()
761
+
dbPath := filepath.Join(tmpDir, "pds.db")
762
+
keyPath := filepath.Join(tmpDir, "signing-key")
763
+
764
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
765
+
if err != nil {
766
+
t.Fatalf("NewHoldPDS failed: %v", err)
767
+
}
768
+
defer pds.Close()
769
+
770
+
handler := pds.CreateRecordsIndexEventHandler(nil)
771
+
772
+
// First, create a record
773
+
createEvent := &RepoEvent{
774
+
Ops: []RepoOp{
775
+
{
776
+
Kind: EvtKindCreateRecord,
777
+
Collection: "io.atcr.hold.crew",
778
+
Rkey: "testrkey",
779
+
},
780
+
},
781
+
}
782
+
handler(ctx, createEvent)
783
+
784
+
// Verify it was indexed
785
+
count, _ := pds.RecordsIndex().Count("io.atcr.hold.crew")
786
+
if count != 1 {
787
+
t.Fatalf("Expected 1 record after create, got %d", count)
788
+
}
789
+
790
+
// Now delete it
791
+
deleteEvent := &RepoEvent{
792
+
Ops: []RepoOp{
793
+
{
794
+
Kind: EvtKindDeleteRecord,
795
+
Collection: "io.atcr.hold.crew",
796
+
Rkey: "testrkey",
797
+
},
798
+
},
799
+
}
800
+
handler(ctx, deleteEvent)
801
+
802
+
// Verify it was removed from index
803
+
count, _ = pds.RecordsIndex().Count("io.atcr.hold.crew")
804
+
if count != 0 {
805
+
t.Errorf("Expected 0 records after delete, got %d", count)
806
+
}
807
+
}
808
+
809
+
// TestHoldPDS_CreateRecordsIndexEventHandler_NilBroadcaster tests with nil broadcaster
810
+
func TestHoldPDS_CreateRecordsIndexEventHandler_NilBroadcaster(t *testing.T) {
811
+
ctx := context.Background()
812
+
tmpDir := t.TempDir()
813
+
dbPath := filepath.Join(tmpDir, "pds.db")
814
+
keyPath := filepath.Join(tmpDir, "signing-key")
815
+
816
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
817
+
if err != nil {
818
+
t.Fatalf("NewHoldPDS failed: %v", err)
819
+
}
820
+
defer pds.Close()
821
+
822
+
// Create handler with nil broadcaster (should not panic)
823
+
handler := pds.CreateRecordsIndexEventHandler(nil)
824
+
825
+
event := &RepoEvent{
826
+
Ops: []RepoOp{
827
+
{
828
+
Kind: EvtKindCreateRecord,
829
+
Collection: "io.atcr.hold.crew",
830
+
Rkey: "testrkey",
831
+
},
832
+
},
833
+
}
834
+
835
+
// Should not panic
836
+
handler(ctx, event)
837
+
838
+
// Verify record was still indexed
839
+
count, _ := pds.RecordsIndex().Count("io.atcr.hold.crew")
840
+
if count != 1 {
841
+
t.Errorf("Expected 1 indexed record, got %d", count)
842
+
}
843
+
}
844
+
845
+
// TestHoldPDS_BackfillRecordsIndex tests backfilling the records index from MST
846
+
func TestHoldPDS_BackfillRecordsIndex(t *testing.T) {
847
+
ctx := context.Background()
848
+
tmpDir := t.TempDir()
849
+
dbPath := filepath.Join(tmpDir, "pds.db")
850
+
keyPath := filepath.Join(tmpDir, "signing-key")
851
+
852
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
853
+
if err != nil {
854
+
t.Fatalf("NewHoldPDS failed: %v", err)
855
+
}
856
+
defer pds.Close()
857
+
858
+
// Bootstrap to create some records in MST (captain + crew)
859
+
ownerDID := "did:plc:testowner"
860
+
err = pds.Bootstrap(ctx, nil, ownerDID, true, false, "")
861
+
if err != nil {
862
+
t.Fatalf("Bootstrap failed: %v", err)
863
+
}
864
+
865
+
// Clear the index to simulate out-of-sync state
866
+
_, err = pds.RecordsIndex().db.Exec("DELETE FROM records")
867
+
if err != nil {
868
+
t.Fatalf("Failed to clear index: %v", err)
869
+
}
870
+
871
+
// Verify index is empty
872
+
count, _ := pds.RecordsIndex().TotalCount()
873
+
if count != 0 {
874
+
t.Fatalf("Expected empty index, got %d", count)
875
+
}
876
+
877
+
// Backfill
878
+
err = pds.BackfillRecordsIndex(ctx)
879
+
if err != nil {
880
+
t.Fatalf("BackfillRecordsIndex failed: %v", err)
881
+
}
882
+
883
+
// Verify records were backfilled
884
+
// Bootstrap creates: 1 captain + 1 crew + 1 profile = 3 records
885
+
count, _ = pds.RecordsIndex().TotalCount()
886
+
if count < 2 {
887
+
t.Errorf("Expected at least 2 records after backfill (captain + crew), got %d", count)
888
+
}
889
+
}
890
+
891
+
// TestHoldPDS_BackfillRecordsIndex_NilIndex tests backfill with nil index
892
+
func TestHoldPDS_BackfillRecordsIndex_NilIndex(t *testing.T) {
893
+
ctx := context.Background()
894
+
tmpDir := t.TempDir()
895
+
keyPath := filepath.Join(tmpDir, "signing-key")
896
+
897
+
// Use :memory: to get nil index
898
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", ":memory:", keyPath, false)
899
+
if err != nil {
900
+
t.Fatalf("NewHoldPDS failed: %v", err)
901
+
}
902
+
defer pds.Close()
903
+
904
+
// Backfill should be no-op and not error
905
+
err = pds.BackfillRecordsIndex(ctx)
906
+
if err != nil {
907
+
t.Errorf("BackfillRecordsIndex should not error with nil index, got: %v", err)
908
+
}
909
+
}
910
+
911
+
// TestHoldPDS_BackfillRecordsIndex_SkipsWhenSynced tests backfill skip when already synced
912
+
func TestHoldPDS_BackfillRecordsIndex_SkipsWhenSynced(t *testing.T) {
913
+
ctx := context.Background()
914
+
tmpDir := t.TempDir()
915
+
dbPath := filepath.Join(tmpDir, "pds.db")
916
+
keyPath := filepath.Join(tmpDir, "signing-key")
917
+
918
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
919
+
if err != nil {
920
+
t.Fatalf("NewHoldPDS failed: %v", err)
921
+
}
922
+
defer pds.Close()
923
+
924
+
// Bootstrap to create records
925
+
err = pds.Bootstrap(ctx, nil, "did:plc:testowner", true, false, "")
926
+
if err != nil {
927
+
t.Fatalf("Bootstrap failed: %v", err)
928
+
}
929
+
930
+
// Backfill once to sync
931
+
err = pds.BackfillRecordsIndex(ctx)
932
+
if err != nil {
933
+
t.Fatalf("First BackfillRecordsIndex failed: %v", err)
934
+
}
935
+
936
+
count1, _ := pds.RecordsIndex().TotalCount()
937
+
938
+
// Backfill again - should skip (counts match)
939
+
err = pds.BackfillRecordsIndex(ctx)
940
+
if err != nil {
941
+
t.Fatalf("Second BackfillRecordsIndex failed: %v", err)
942
+
}
943
+
944
+
count2, _ := pds.RecordsIndex().TotalCount()
945
+
946
+
// Count should be unchanged
947
+
if count1 != count2 {
948
+
t.Errorf("Expected count to remain %d after second backfill, got %d", count1, count2)
949
+
}
950
+
}
+1
-1
pkg/hold/pds/status_test.go
+1
-1
pkg/hold/pds/status_test.go
+132
-34
pkg/hold/pds/xrpc.go
+132
-34
pkg/hold/pds/xrpc.go
···
479
479
// HandleListRecords lists records in a collection
480
480
// Spec: https://docs.bsky.app/docs/api/com-atproto-repo-list-records
481
481
// Supports pagination via limit, cursor, and reverse parameters
482
+
// Uses SQL index for efficient pagination (following official ATProto PDS pattern)
482
483
func (h *XRPCHandler) HandleListRecords(w http.ResponseWriter, r *http.Request) {
483
484
repoDID := r.URL.Query().Get("repo")
484
485
collection := r.URL.Query().Get("collection")
···
507
508
cursor := r.URL.Query().Get("cursor")
508
509
reverse := r.URL.Query().Get("reverse") == "true"
509
510
510
-
// Generic implementation using repo.ForEach
511
+
// Use records index if available (efficient SQL-based pagination)
512
+
if h.pds.recordsIndex != nil {
513
+
h.handleListRecordsIndexed(w, r, collection, limit, cursor, reverse)
514
+
return
515
+
}
516
+
517
+
// Fallback: MST-based listing (legacy path for tests or in-memory mode)
518
+
h.handleListRecordsMST(w, r, collection, limit, cursor, reverse)
519
+
}
520
+
521
+
// handleListRecordsIndexed uses the SQL records index for efficient pagination
522
+
func (h *XRPCHandler) handleListRecordsIndexed(w http.ResponseWriter, r *http.Request, collection string, limit int, cursor string, reverse bool) {
523
+
// Query the index
524
+
indexedRecords, nextCursor, err := h.pds.recordsIndex.ListRecords(collection, limit, cursor, reverse)
525
+
if err != nil {
526
+
slog.Error("Failed to list records from index", "error", err, "collection", collection)
527
+
http.Error(w, fmt.Sprintf("failed to list records: %v", err), http.StatusInternalServerError)
528
+
return
529
+
}
530
+
531
+
// Create session to fetch full record data
511
532
session, err := h.pds.carstore.ReadOnlySession(h.pds.uid)
512
533
if err != nil {
513
534
http.Error(w, fmt.Sprintf("failed to create session: %v", err), http.StatusInternalServerError)
···
534
555
return
535
556
}
536
557
537
-
// Initialize as empty slice (not nil) to ensure JSON encodes as [] not null
558
+
// Fetch full record data for each indexed record
538
559
records := []map[string]any{}
539
-
var nextCursor string
540
-
skipUntilCursor := cursor != ""
560
+
for _, rec := range indexedRecords {
561
+
// Construct the record path
562
+
recordPath := rec.Collection + "/" + rec.Rkey
563
+
564
+
// Get the record bytes
565
+
recordCID, recBytes, err := repoHandle.GetRecordBytes(r.Context(), recordPath)
566
+
if err != nil {
567
+
slog.Warn("Failed to get indexed record, skipping", "path", recordPath, "error", err)
568
+
continue
569
+
}
570
+
571
+
// Decode using lexutil (type registry handles unmarshaling)
572
+
recordValue, err := lexutil.CborDecodeValue(*recBytes)
573
+
if err != nil {
574
+
slog.Warn("Failed to decode indexed record, skipping", "path", recordPath, "error", err)
575
+
continue
576
+
}
577
+
578
+
records = append(records, map[string]any{
579
+
"uri": fmt.Sprintf("at://%s/%s/%s", h.pds.DID(), rec.Collection, rec.Rkey),
580
+
"cid": recordCID.String(),
581
+
"value": recordValue,
582
+
})
583
+
}
584
+
585
+
response := map[string]any{
586
+
"records": records,
587
+
}
588
+
589
+
// Include cursor in response if there are more records
590
+
if nextCursor != "" {
591
+
response["cursor"] = nextCursor
592
+
}
593
+
594
+
w.Header().Set("Content-Type", "application/json")
595
+
json.NewEncoder(w).Encode(response)
596
+
}
597
+
598
+
// handleListRecordsMST uses the legacy MST-based listing (fallback for tests)
599
+
func (h *XRPCHandler) handleListRecordsMST(w http.ResponseWriter, r *http.Request, collection string, limit int, cursor string, reverse bool) {
600
+
// Generic implementation using repo.ForEach
601
+
session, err := h.pds.carstore.ReadOnlySession(h.pds.uid)
602
+
if err != nil {
603
+
http.Error(w, fmt.Sprintf("failed to create session: %v", err), http.StatusInternalServerError)
604
+
return
605
+
}
541
606
542
-
// Iterate over all records in the collection
607
+
head, err := h.pds.carstore.GetUserRepoHead(r.Context(), h.pds.uid)
608
+
if err != nil {
609
+
http.Error(w, fmt.Sprintf("failed to get repo head: %v", err), http.StatusInternalServerError)
610
+
return
611
+
}
612
+
613
+
if !head.Defined() {
614
+
// Empty repo, return empty list
615
+
response := map[string]any{"records": []any{}}
616
+
w.Header().Set("Content-Type", "application/json")
617
+
json.NewEncoder(w).Encode(response)
618
+
return
619
+
}
620
+
621
+
repoHandle, err := repo.OpenRepo(r.Context(), session, head)
622
+
if err != nil {
623
+
http.Error(w, fmt.Sprintf("failed to open repo: %v", err), http.StatusInternalServerError)
624
+
return
625
+
}
626
+
627
+
// Collect all records in the collection first.
628
+
// MST only supports forward iteration, so for newest-first (default) we must
629
+
// collect all records, reverse, then apply cursor/limit.
630
+
allRecords := []map[string]any{}
631
+
543
632
err = repoHandle.ForEach(r.Context(), collection, func(k string, v cid.Cid) error {
544
633
// k is like "io.atcr.hold.captain/self" or "io.atcr.hold.crew/3m3by7msdln22"
545
634
parts := strings.Split(k, "/")
···
552
641
rkey := parts[len(parts)-1]
553
642
554
643
// Filter: only include records that match the requested collection
555
-
// MST keys are sorted lexicographically, so once we hit a different
556
-
// collection prefix, all remaining keys will also be outside our range
557
644
if actualCollection != collection {
558
645
return repo.ErrDoneIterating // Stop walking the tree
559
646
}
560
647
561
-
// Handle cursor-based pagination
562
-
if skipUntilCursor {
563
-
if rkey == cursor {
564
-
skipUntilCursor = false // Found cursor, start including records after this
565
-
}
566
-
return nil // Skip this record
567
-
}
568
-
569
-
// Check if we've hit the limit
570
-
if len(records) >= limit {
571
-
// Set next cursor to current rkey
572
-
nextCursor = rkey
573
-
return repo.ErrDoneIterating // Stop iteration
574
-
}
575
-
576
648
// Get the record bytes
577
649
recordCID, recBytes, err := repoHandle.GetRecordBytes(r.Context(), k)
578
650
if err != nil {
···
585
657
return fmt.Errorf("failed to decode record: %v", err)
586
658
}
587
659
588
-
records = append(records, map[string]any{
660
+
allRecords = append(allRecords, map[string]any{
589
661
"uri": fmt.Sprintf("at://%s/%s/%s", h.pds.DID(), actualCollection, rkey),
590
662
"cid": recordCID.String(),
591
663
"value": recordValue,
664
+
"rkey": rkey,
592
665
})
593
666
return nil
594
667
})
595
668
596
669
if err != nil {
597
-
// ErrDoneIterating is expected when we stop walking early (reached collection boundary or hit limit)
598
-
// Check using strings.Contains because the error may be wrapped
599
670
if err == repo.ErrDoneIterating || strings.Contains(err.Error(), "done iterating") {
600
-
// Successfully stopped at collection boundary or hit pagination limit, continue with collected records
671
+
// Successfully stopped at collection boundary
601
672
} else if strings.Contains(err.Error(), "not found") {
602
-
// If the collection doesn't exist yet, return empty list
603
-
records = []map[string]any{}
673
+
allRecords = []map[string]any{}
604
674
} else {
605
675
http.Error(w, fmt.Sprintf("failed to list records: %v", err), http.StatusInternalServerError)
606
676
return
607
677
}
608
678
}
609
679
610
-
// Default order is newest-first (reverse chronological), which requires
611
-
// reversing the MST's lexicographic order. When reverse=true, keep MST order.
612
-
if !reverse && len(records) > 0 {
613
-
for i, j := 0, len(records)-1; i < j; i, j = i+1, j-1 {
614
-
records[i], records[j] = records[j], records[i]
680
+
// Default order is newest-first (reverse chronological).
681
+
// MST iterates oldest-first, so reverse for default order.
682
+
if !reverse && len(allRecords) > 0 {
683
+
for i, j := 0, len(allRecords)-1; i < j; i, j = i+1, j-1 {
684
+
allRecords[i], allRecords[j] = allRecords[j], allRecords[i]
685
+
}
686
+
}
687
+
688
+
// Apply cursor and limit
689
+
records := []map[string]any{}
690
+
var nextCursor string
691
+
skipUntilCursor := cursor != ""
692
+
693
+
for _, rec := range allRecords {
694
+
rkey := rec["rkey"].(string)
695
+
696
+
if skipUntilCursor {
697
+
if rkey == cursor {
698
+
skipUntilCursor = false
699
+
}
700
+
continue
615
701
}
702
+
703
+
if len(records) >= limit {
704
+
nextCursor = rkey
705
+
break
706
+
}
707
+
708
+
delete(rec, "rkey")
709
+
records = append(records, rec)
710
+
}
711
+
712
+
if skipUntilCursor {
713
+
records = []map[string]any{}
714
+
nextCursor = ""
616
715
}
617
716
618
717
response := map[string]any{
619
718
"records": records,
620
719
}
621
720
622
-
// Include cursor in response if there are more records
623
721
if nextCursor != "" {
624
722
response["cursor"] = nextCursor
625
723
}
+296
pkg/hold/pds/xrpc_test.go
+296
pkg/hold/pds/xrpc_test.go
···
29
29
30
30
// setupTestXRPCHandler creates a fresh PDS instance and handler for each test
31
31
// Bootstraps the PDS and suppresses logging to avoid log spam
32
+
// Uses :memory: database which disables RecordsIndex (uses MST fallback path)
32
33
func setupTestXRPCHandler(t *testing.T) (*XRPCHandler, context.Context) {
33
34
t.Helper()
34
35
···
72
73
mockClient := &mockPDSClient{}
73
74
74
75
// Create mock s3 service and storage driver (not needed for most PDS tests)
76
+
mockS3 := s3.S3Service{}
77
+
78
+
// Create XRPC handler with mock HTTP client
79
+
handler := NewXRPCHandler(pds, mockS3, nil, nil, mockClient)
80
+
81
+
return handler, ctx
82
+
}
83
+
84
+
// setupTestXRPCHandlerWithIndex creates a handler with file-based database
85
+
// to enable RecordsIndex (vs :memory: which disables it)
86
+
func setupTestXRPCHandlerWithIndex(t *testing.T) (*XRPCHandler, context.Context) {
87
+
t.Helper()
88
+
89
+
ctx := context.Background()
90
+
tmpDir := t.TempDir()
91
+
92
+
// Use file-based database to enable RecordsIndex
93
+
dbPath := filepath.Join(tmpDir, "pds.db")
94
+
keyPath := filepath.Join(tmpDir, "signing-key")
95
+
96
+
// Copy shared signing key instead of generating a new one
97
+
if err := os.WriteFile(keyPath, sharedTestKey, 0600); err != nil {
98
+
t.Fatalf("Failed to copy shared signing key: %v", err)
99
+
}
100
+
101
+
pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath, false)
102
+
if err != nil {
103
+
t.Fatalf("Failed to create test PDS: %v", err)
104
+
}
105
+
106
+
// Verify RecordsIndex is enabled
107
+
if pds.RecordsIndex() == nil {
108
+
t.Fatal("Expected RecordsIndex to be non-nil for file-based database")
109
+
}
110
+
111
+
// Bootstrap with a test owner, suppressing stdout to avoid log spam
112
+
ownerDID := "did:plc:testowner123"
113
+
114
+
// Redirect stdout to suppress bootstrap logging
115
+
oldStdout := os.Stdout
116
+
r, w, _ := os.Pipe()
117
+
os.Stdout = w
118
+
119
+
err = pds.Bootstrap(ctx, nil, ownerDID, true, false, "")
120
+
121
+
// Restore stdout
122
+
w.Close()
123
+
os.Stdout = oldStdout
124
+
io.ReadAll(r) // Drain the pipe
125
+
126
+
if err != nil {
127
+
t.Fatalf("Failed to bootstrap PDS: %v", err)
128
+
}
129
+
130
+
// Wire up records indexing event handler
131
+
indexingHandler := pds.CreateRecordsIndexEventHandler(nil)
132
+
pds.RepomgrRef().SetEventHandler(indexingHandler, true)
133
+
134
+
// Backfill index from MST (bootstrap created records but didn't index them)
135
+
if err := pds.BackfillRecordsIndex(ctx); err != nil {
136
+
t.Fatalf("Failed to backfill records index: %v", err)
137
+
}
138
+
139
+
// Create mock PDS client for DPoP validation
140
+
mockClient := &mockPDSClient{}
141
+
142
+
// Create mock s3 service and storage driver
75
143
mockS3 := s3.S3Service{}
76
144
77
145
// Create XRPC handler with mock HTTP client
···
744
812
t.Errorf("Expected status 400, got %d", w.Code)
745
813
}
746
814
})
815
+
}
816
+
}
817
+
818
+
// Tests for HandleListRecords with RecordsIndex (indexed path)
819
+
// These tests use file-based database to enable the SQL-based indexing
820
+
821
+
// TestHandleListRecords_Indexed tests listing with RecordsIndex enabled
822
+
func TestHandleListRecords_Indexed(t *testing.T) {
823
+
handler, ctx := setupTestXRPCHandlerWithIndex(t)
824
+
holdDID := "did:web:hold.example.com"
825
+
826
+
// Add crew members (will be indexed via event handler)
827
+
memberDIDs := []string{
828
+
"did:plc:member1",
829
+
"did:plc:member2",
830
+
"did:plc:member3",
831
+
}
832
+
833
+
for _, did := range memberDIDs {
834
+
_, err := handler.pds.AddCrewMember(ctx, did, "reader", []string{"blob:read"})
835
+
if err != nil {
836
+
t.Fatalf("Failed to add crew member %s: %v", did, err)
837
+
}
838
+
}
839
+
840
+
// Test listing crew records via indexed path
841
+
req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
842
+
"repo": holdDID,
843
+
"collection": atproto.CrewCollection,
844
+
})
845
+
w := httptest.NewRecorder()
846
+
847
+
handler.HandleListRecords(w, req)
848
+
849
+
result := assertJSONResponse(t, w, http.StatusOK)
850
+
851
+
// Should have 4 crew records: 1 from bootstrap + 3 added
852
+
expectedCount := len(memberDIDs) + 1
853
+
if records, ok := result["records"].([]any); !ok {
854
+
t.Error("Expected records array in response")
855
+
} else if len(records) != expectedCount {
856
+
t.Errorf("Expected %d crew records, got %d", expectedCount, len(records))
857
+
} else {
858
+
// Verify each record has required fields
859
+
for i, rec := range records {
860
+
record, ok := rec.(map[string]any)
861
+
if !ok {
862
+
t.Errorf("Record %d: expected map, got %T", i, rec)
863
+
continue
864
+
}
865
+
866
+
if uri, ok := record["uri"].(string); !ok || uri == "" {
867
+
t.Errorf("Record %d: expected uri string", i)
868
+
}
869
+
870
+
if cid, ok := record["cid"].(string); !ok || cid == "" {
871
+
t.Errorf("Record %d: expected cid string", i)
872
+
}
873
+
874
+
if value, ok := record["value"].(map[string]any); !ok {
875
+
t.Errorf("Record %d: expected value object", i)
876
+
} else {
877
+
if recordType, ok := value["$type"].(string); !ok || recordType != atproto.CrewCollection {
878
+
t.Errorf("Record %d: expected $type=%s, got %v", i, atproto.CrewCollection, value["$type"])
879
+
}
880
+
}
881
+
}
882
+
}
883
+
}
884
+
885
+
// TestHandleListRecords_Indexed_Pagination tests pagination with indexed path
886
+
func TestHandleListRecords_Indexed_Pagination(t *testing.T) {
887
+
handler, ctx := setupTestXRPCHandlerWithIndex(t)
888
+
holdDID := "did:web:hold.example.com"
889
+
890
+
// Add 4 more crew members for total of 5
891
+
for i := 0; i < 4; i++ {
892
+
_, err := handler.pds.AddCrewMember(ctx, fmt.Sprintf("did:plc:member%d", i), "reader", []string{"blob:read"})
893
+
if err != nil {
894
+
t.Fatalf("Failed to add crew member: %v", err)
895
+
}
896
+
}
897
+
898
+
// Test with limit=2
899
+
req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
900
+
"repo": holdDID,
901
+
"collection": atproto.CrewCollection,
902
+
"limit": "2",
903
+
})
904
+
w := httptest.NewRecorder()
905
+
906
+
handler.HandleListRecords(w, req)
907
+
908
+
result := assertJSONResponse(t, w, http.StatusOK)
909
+
910
+
// Verify we got exactly 2 records
911
+
records, ok := result["records"].([]any)
912
+
if !ok {
913
+
t.Fatal("Expected records array in response")
914
+
}
915
+
916
+
if len(records) != 2 {
917
+
t.Errorf("Expected 2 records with limit=2, got %d", len(records))
918
+
}
919
+
920
+
// Verify cursor is present (there are more records)
921
+
cursor, ok := result["cursor"].(string)
922
+
if !ok || cursor == "" {
923
+
t.Fatal("Expected cursor in response when there are more records")
924
+
}
925
+
926
+
// Test pagination with cursor
927
+
req2 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
928
+
"repo": holdDID,
929
+
"collection": atproto.CrewCollection,
930
+
"limit": "2",
931
+
"cursor": cursor,
932
+
})
933
+
w2 := httptest.NewRecorder()
934
+
935
+
handler.HandleListRecords(w2, req2)
936
+
937
+
result2 := assertJSONResponse(t, w2, http.StatusOK)
938
+
939
+
records2, ok := result2["records"].([]any)
940
+
if !ok {
941
+
t.Fatal("Expected records array in paginated response")
942
+
}
943
+
944
+
// Should get the next page of records
945
+
if len(records2) == 0 {
946
+
t.Error("Expected records in paginated response")
947
+
}
948
+
949
+
// Verify no duplicates
950
+
seen := make(map[string]bool)
951
+
for _, r := range records {
952
+
rec := r.(map[string]any)
953
+
uri := rec["uri"].(string)
954
+
seen[uri] = true
955
+
}
956
+
for _, r := range records2 {
957
+
rec := r.(map[string]any)
958
+
uri := rec["uri"].(string)
959
+
if seen[uri] {
960
+
t.Errorf("Duplicate record in pagination: %s", uri)
961
+
}
962
+
}
963
+
}
964
+
965
+
// TestHandleListRecords_Indexed_Reverse tests reverse ordering with indexed path
966
+
func TestHandleListRecords_Indexed_Reverse(t *testing.T) {
967
+
handler, ctx := setupTestXRPCHandlerWithIndex(t)
968
+
holdDID := "did:web:hold.example.com"
969
+
970
+
// Add crew members
971
+
for i := 0; i < 3; i++ {
972
+
_, err := handler.pds.AddCrewMember(ctx, fmt.Sprintf("did:plc:member%d", i), "reader", []string{"blob:read"})
973
+
if err != nil {
974
+
t.Fatalf("Failed to add crew member: %v", err)
975
+
}
976
+
}
977
+
978
+
// Get normal order (default = newest first)
979
+
req1 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
980
+
"repo": holdDID,
981
+
"collection": atproto.CrewCollection,
982
+
})
983
+
w1 := httptest.NewRecorder()
984
+
handler.HandleListRecords(w1, req1)
985
+
result1 := assertJSONResponse(t, w1, http.StatusOK)
986
+
987
+
// Get reverse order (oldest first)
988
+
req2 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
989
+
"repo": holdDID,
990
+
"collection": atproto.CrewCollection,
991
+
"reverse": "true",
992
+
})
993
+
w2 := httptest.NewRecorder()
994
+
handler.HandleListRecords(w2, req2)
995
+
result2 := assertJSONResponse(t, w2, http.StatusOK)
996
+
997
+
records1 := result1["records"].([]any)
998
+
records2 := result2["records"].([]any)
999
+
1000
+
if len(records1) != len(records2) {
1001
+
t.Fatalf("Expected same number of records, got %d vs %d", len(records1), len(records2))
1002
+
}
1003
+
1004
+
if len(records1) > 1 {
1005
+
// First record in normal order should be last in reverse order
1006
+
first1 := records1[0].(map[string]any)["uri"].(string)
1007
+
last2 := records2[len(records2)-1].(map[string]any)["uri"].(string)
1008
+
1009
+
if first1 != last2 {
1010
+
t.Error("Expected first record in default order to be last in reverse order")
1011
+
}
1012
+
}
1013
+
}
1014
+
1015
+
// TestHandleListRecords_Indexed_EmptyCollection tests empty collection with indexed path
1016
+
func TestHandleListRecords_Indexed_EmptyCollection(t *testing.T) {
1017
+
handler, _ := setupTestXRPCHandlerWithIndex(t)
1018
+
holdDID := "did:web:hold.example.com"
1019
+
1020
+
// List a collection that doesn't exist
1021
+
req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{
1022
+
"repo": holdDID,
1023
+
"collection": "io.atcr.nonexistent",
1024
+
})
1025
+
w := httptest.NewRecorder()
1026
+
1027
+
handler.HandleListRecords(w, req)
1028
+
1029
+
result := assertJSONResponse(t, w, http.StatusOK)
1030
+
1031
+
records, ok := result["records"].([]any)
1032
+
if !ok {
1033
+
t.Fatal("Expected records array in response")
1034
+
}
1035
+
1036
+
if len(records) != 0 {
1037
+
t.Errorf("Expected 0 records for empty collection, got %d", len(records))
1038
+
}
1039
+
1040
+
// Should not have cursor for empty results
1041
+
if _, ok := result["cursor"]; ok {
1042
+
t.Error("Expected no cursor for empty collection")
747
1043
}
748
1044
}
749
1045