A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

add ability to toggle debug. refactor hold pds logic to allow crew record lookups by rkey rather than a list

evan.jarrett.net e0a2dda1 482d921c

verified
Changed files
+638 -130
deploy
lexicons
io
atcr
hold
pkg
+20
deploy/README.md
··· 243 243 docker logs -f atcr-appview 244 244 ``` 245 245 246 + #### Enable debug logging 247 + 248 + Toggle debug logging at runtime without restarting the container: 249 + 250 + ```bash 251 + # Enable debug logging (auto-reverts after 30 minutes) 252 + docker kill -s SIGUSR1 atcr-appview 253 + docker kill -s SIGUSR1 atcr-hold 254 + 255 + # Manually disable before timeout 256 + docker kill -s SIGUSR1 atcr-appview 257 + ``` 258 + 259 + When toggled, you'll see: 260 + ``` 261 + level=INFO msg="Log level changed" from=INFO to=DEBUG trigger=SIGUSR1 auto_revert_in=30m0s 262 + ``` 263 + 264 + **Note:** Despite the command name, `docker kill -s SIGUSR1` does NOT stop the container. It sends a user-defined signal that the application handles to toggle debug mode. 265 + 246 266 #### Restart services 247 267 248 268 ```bash
+52
lexicons/io/atcr/hold/stats.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "io.atcr.hold.stats", 4 + "defs": { 5 + "main": { 6 + "type": "record", 7 + "key": "any", 8 + "description": "Repository statistics stored in the hold's embedded PDS. Tracks pull/push counts per owner+repository combination. Record key is deterministic: base32(sha256(ownerDID + \"/\" + repository)[:16]).", 9 + "record": { 10 + "type": "object", 11 + "required": ["ownerDid", "repository", "pullCount", "pushCount", "updatedAt"], 12 + "properties": { 13 + "ownerDid": { 14 + "type": "string", 15 + "format": "did", 16 + "description": "DID of the image owner (e.g., did:plc:xyz123)" 17 + }, 18 + "repository": { 19 + "type": "string", 20 + "description": "Repository name (e.g., myapp)", 21 + "maxLength": 256 22 + }, 23 + "pullCount": { 24 + "type": "integer", 25 + "minimum": 0, 26 + "description": "Number of manifest downloads" 27 + }, 28 + "pushCount": { 29 + "type": "integer", 30 + "minimum": 0, 31 + "description": "Number of manifest uploads" 32 + }, 33 + "lastPull": { 34 + "type": "string", 35 + "format": "datetime", 36 + "description": "RFC3339 timestamp of last pull" 37 + }, 38 + "lastPush": { 39 + "type": "string", 40 + "format": "datetime", 41 + "description": "RFC3339 timestamp of last push" 42 + }, 43 + "updatedAt": { 44 + "type": "string", 45 + "format": "datetime", 46 + "description": "RFC3339 timestamp of when this record was last updated" 47 + } 48 + } 49 + } 50 + } 51 + } 52 + }
+10
pkg/atproto/lexicon.go
··· 665 665 return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:16])) 666 666 } 667 667 668 + // CrewRecordKey generates a deterministic rkey from member DID 669 + // Uses same pattern as StatsRecordKey for consistency 670 + // This enables O(1) crew membership lookups via getRecord instead of O(n) pagination 671 + func CrewRecordKey(memberDID string) string { 672 + hash := sha256.Sum256([]byte(memberDID)) 673 + // Use first 16 bytes (128 bits) for collision resistance 674 + // Encode with base32 (alphanumeric, lowercase, no padding) for ATProto rkey compatibility 675 + return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:16])) 676 + } 677 + 668 678 // TangledProfileRecord represents a Tangled profile for the hold 669 679 // Collection: sh.tangled.actor.profile (singleton record at rkey "self") 670 680 // Stored in the hold's embedded PDS
+44 -56
pkg/auth/hold_remote.go
··· 324 324 } 325 325 326 326 // isCrewMemberNoCache queries XRPC without caching (internal helper) 327 - // Handles pagination to check all crew records, not just the first page 327 + // Uses O(1) lookup via getRecord with hash-based rkey instead of pagination 328 328 func (a *RemoteHoldAuthorizer) isCrewMemberNoCache(ctx context.Context, holdDID, userDID string) (bool, error) { 329 329 // Resolve DID to URL 330 330 holdURL := atproto.ResolveHoldURL(holdDID) 331 331 332 - // Paginate through all crew records 333 - cursor := "" 334 - for { 335 - // Build XRPC request URL with pagination 336 - // GET /xrpc/com.atproto.repo.listRecords?repo={did}&collection=io.atcr.hold.crew&limit=100 337 - xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&limit=100", 338 - holdURL, atproto.RepoListRecords, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection)) 339 - if cursor != "" { 340 - xrpcURL += "&cursor=" + url.QueryEscape(cursor) 341 - } 332 + // Generate deterministic rkey from member DID (hash-based) 333 + rkey := atproto.CrewRecordKey(userDID) 342 334 343 - req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 344 - if err != nil { 345 - return false, err 346 - } 335 + // Build XRPC request URL for direct record lookup 336 + // GET /xrpc/com.atproto.repo.getRecord?repo={did}&collection=io.atcr.hold.crew&rkey={hash} 337 + xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=%s", 338 + holdURL, atproto.RepoGetRecord, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection), url.QueryEscape(rkey)) 347 339 348 - resp, err := a.httpClient.Do(req) 349 - if err != nil { 350 - return false, fmt.Errorf("XRPC request failed: %w", err) 351 - } 340 + req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 341 + if err != nil { 342 + return false, err 343 + } 352 344 353 - if resp.StatusCode != http.StatusOK { 354 - body, _ := io.ReadAll(resp.Body) 355 - resp.Body.Close() 356 - return false, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 357 - } 345 + resp, err := a.httpClient.Do(req) 346 + if err != nil { 347 + return false, fmt.Errorf("XRPC request failed: %w", err) 348 + } 349 + defer resp.Body.Close() 358 350 359 - // Parse response 360 - var xrpcResp struct { 361 - Cursor string `json:"cursor"` 362 - Records []struct { 363 - URI string `json:"uri"` 364 - CID string `json:"cid"` 365 - Value struct { 366 - Type string `json:"$type"` 367 - Member string `json:"member"` 368 - Role string `json:"role"` 369 - Permissions []string `json:"permissions"` 370 - AddedAt string `json:"addedAt"` 371 - } `json:"value"` 372 - } `json:"records"` 373 - } 351 + // 404 means not a crew member (record doesn't exist) 352 + if resp.StatusCode == http.StatusNotFound { 353 + return false, nil 354 + } 374 355 375 - if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 376 - resp.Body.Close() 377 - return false, fmt.Errorf("failed to decode XRPC response: %w", err) 378 - } 379 - resp.Body.Close() 356 + if resp.StatusCode != http.StatusOK { 357 + body, _ := io.ReadAll(resp.Body) 358 + return false, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 359 + } 380 360 381 - // Check if userDID is in this page of crew records 382 - for _, record := range xrpcResp.Records { 383 - if record.Value.Member == userDID { 384 - // TODO: Check expiration if set 385 - return true, nil 386 - } 387 - } 361 + // Parse response to verify the member DID matches 362 + var xrpcResp struct { 363 + URI string `json:"uri"` 364 + CID string `json:"cid"` 365 + Value struct { 366 + Type string `json:"$type"` 367 + Member string `json:"member"` 368 + Role string `json:"role"` 369 + Permissions []string `json:"permissions"` 370 + AddedAt string `json:"addedAt"` 371 + } `json:"value"` 372 + } 388 373 389 - // Check if there are more pages 390 - if xrpcResp.Cursor == "" || len(xrpcResp.Records) == 0 { 391 - break 392 - } 393 - cursor = xrpcResp.Cursor 374 + if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 375 + return false, fmt.Errorf("failed to decode XRPC response: %w", err) 394 376 } 395 377 378 + // Verify the member DID matches (sanity check) 379 + if xrpcResp.Value.Member == userDID { 380 + return true, nil 381 + } 382 + 383 + // Hash collision or invalid record - treat as not a member 396 384 return false, nil 397 385 } 398 386
+6 -1
pkg/hold/admin/static/css/admin.css
··· 76 76 77 77 /* Container */ 78 78 .container { 79 - max-width: 1200px; 79 + max-width: 1600px; 80 80 margin: 0 auto; 81 81 padding: 2rem; 82 82 } ··· 299 299 } 300 300 301 301 .tier-limit { 302 + color: var(--gray-500); 303 + } 304 + 305 + .date-cell { 306 + white-space: nowrap; 302 307 color: var(--gray-500); 303 308 } 304 309
+2
pkg/hold/admin/templates/pages/crew.html
··· 34 34 <th>Permissions</th> 35 35 <th>Tier</th> 36 36 <th>Usage</th> 37 + <th>Added</th> 37 38 <th class="actions-header">Actions</th> 38 39 </tr> 39 40 </thead> ··· 56 57 <small>{{.UsagePercent}}%</small> 57 58 </div> 58 59 </td> 60 + <td class="date-cell">{{formatTime .AddedAt}}</td> 59 61 <td class="actions"> 60 62 <a href="/admin/crew/{{.RKey}}" class="btn btn-icon" title="Edit"> 61 63 <i data-lucide="pencil"></i>
+127 -35
pkg/hold/pds/crew.go
··· 5 5 "context" 6 6 "errors" 7 7 "fmt" 8 + "log/slog" 8 9 "strings" 9 10 "time" 10 11 ··· 14 15 ) 15 16 16 17 // AddCrewMember adds a new crew member to the hold and commits to carstore 18 + // Uses deterministic rkey based on member DID hash for O(1) lookups and automatic deduplication 19 + // If the member already exists, updates their record (upsert behavior) 17 20 func (p *HoldPDS) AddCrewMember(ctx context.Context, memberDID, role string, permissions []string) (cid.Cid, error) { 18 21 crewRecord := &atproto.CrewRecord{ 19 22 Type: atproto.CrewCollection, ··· 23 26 AddedAt: time.Now().Format(time.RFC3339), 24 27 } 25 28 26 - // Use repomgr for crew operations - auto-generated rkey is fine 27 - _, recordCID, err := p.repomgr.CreateRecord(ctx, p.uid, atproto.CrewCollection, crewRecord) 29 + // Use deterministic rkey based on member DID hash 30 + // UpsertRecord handles create-or-update automatically 31 + rkey := atproto.CrewRecordKey(memberDID) 32 + _, recordCID, _, err := p.repomgr.UpsertRecord(ctx, p.uid, atproto.CrewCollection, rkey, crewRecord) 28 33 if err != nil { 29 - return cid.Undef, fmt.Errorf("failed to create crew record: %w", err) 34 + return cid.Undef, fmt.Errorf("failed to upsert crew record: %w", err) 30 35 } 31 36 32 37 return recordCID, nil ··· 47 52 } 48 53 49 54 return recordCID, crewRecord, nil 55 + } 56 + 57 + // GetCrewMemberByDID retrieves a crew member by their DID using O(1) lookup 58 + // Uses deterministic rkey based on member DID hash 59 + func (p *HoldPDS) GetCrewMemberByDID(ctx context.Context, memberDID string) (cid.Cid, *atproto.CrewRecord, error) { 60 + rkey := atproto.CrewRecordKey(memberDID) 61 + return p.GetCrewMember(ctx, rkey) 50 62 } 51 63 52 64 // CrewMemberWithKey pairs a crew record with its rkey and CID ··· 138 150 return crew, nil 139 151 } 140 152 141 - // RemoveCrewMember removes a crew member 153 + // RemoveCrewMember removes a crew member by rkey 142 154 func (p *HoldPDS) RemoveCrewMember(ctx context.Context, rkey string) error { 143 155 // Use repomgr.DeleteRecord - it will automatically commit! 144 156 // This fixes the bug where deletions weren't being committed ··· 150 162 return nil 151 163 } 152 164 165 + // RemoveCrewMemberByDID removes a crew member by their DID using O(1) lookup 166 + func (p *HoldPDS) RemoveCrewMemberByDID(ctx context.Context, memberDID string) error { 167 + rkey := atproto.CrewRecordKey(memberDID) 168 + return p.RemoveCrewMember(ctx, rkey) 169 + } 170 + 153 171 // UpdateCrewMemberTier updates a crew member's tier 154 - // Since ATProto records are immutable, this finds the member's record by DID, 155 - // deletes it, and recreates it with the new tier value. 172 + // Uses O(1) lookup via hash-based rkey and PutRecord for atomic upsert 156 173 func (p *HoldPDS) UpdateCrewMemberTier(ctx context.Context, memberDID, tier string) error { 157 - // Find the crew member's record by iterating over crew records 158 - members, err := p.ListCrewMembers(ctx) 174 + // O(1) lookup using hash-based rkey 175 + _, existing, err := p.GetCrewMemberByDID(ctx, memberDID) 159 176 if err != nil { 160 - return fmt.Errorf("failed to list crew members: %w", err) 161 - } 162 - 163 - // Find the member with matching DID 164 - var targetMember *CrewMemberWithKey 165 - for _, m := range members { 166 - if m.Record.Member == memberDID { 167 - targetMember = m 168 - break 169 - } 170 - } 171 - 172 - if targetMember == nil { 173 - return fmt.Errorf("crew member not found: %s", memberDID) 177 + return fmt.Errorf("crew member not found: %w", err) 174 178 } 175 179 176 180 // If tier is already the same, no update needed 177 - if targetMember.Record.Tier == tier { 181 + if existing.Tier == tier { 178 182 return nil 179 183 } 180 184 181 - // Delete the old record 182 - if err := p.RemoveCrewMember(ctx, targetMember.Rkey); err != nil { 183 - return fmt.Errorf("failed to remove old crew record: %w", err) 184 - } 185 - 186 - // Create new record with updated tier 185 + // Create updated record (PutRecord handles upsert with same rkey) 187 186 newRecord := &atproto.CrewRecord{ 188 187 Type: atproto.CrewCollection, 189 - Member: targetMember.Record.Member, 190 - Role: targetMember.Record.Role, 191 - Permissions: targetMember.Record.Permissions, 188 + Member: existing.Member, 189 + Role: existing.Role, 190 + Permissions: existing.Permissions, 192 191 Tier: tier, 193 - AddedAt: targetMember.Record.AddedAt, // Preserve original add time 192 + AddedAt: existing.AddedAt, // Preserve original add time 194 193 } 195 194 196 - _, _, err = p.repomgr.CreateRecord(ctx, p.uid, atproto.CrewCollection, newRecord) 195 + rkey := atproto.CrewRecordKey(memberDID) 196 + _, _, err = p.repomgr.PutRecord(ctx, p.uid, atproto.CrewCollection, rkey, newRecord) 197 197 if err != nil { 198 - return fmt.Errorf("failed to create updated crew record: %w", err) 198 + return fmt.Errorf("failed to update crew record: %w", err) 199 199 } 200 200 201 201 return nil 202 202 } 203 + 204 + // TODO(crew-migration): Remove this migration code after all holds have been upgraded (added 2026-01-06) 205 + // This migrates TID-based crew records to hash-based rkeys for O(1) lookups 206 + 207 + // MigrateCrewRecordsToHashRkeys migrates old TID-based crew records to hash-based rkeys 208 + // This is idempotent - records that already have hash-based rkeys are skipped 209 + // Returns the number of records migrated 210 + func (p *HoldPDS) MigrateCrewRecordsToHashRkeys(ctx context.Context) (int, error) { 211 + // List all crew members (includes both TID and hash-based rkeys) 212 + members, err := p.ListCrewMembers(ctx) 213 + if err != nil { 214 + return 0, fmt.Errorf("failed to list crew members: %w", err) 215 + } 216 + 217 + slog.Info("Starting crew record migration", "totalRecords", len(members)) 218 + 219 + migrated := 0 220 + duplicatesDeleted := 0 221 + alreadyHashBased := 0 222 + seen := make(map[string]bool) // Track seen member DIDs to handle duplicates 223 + 224 + for _, m := range members { 225 + memberDID := m.Record.Member 226 + expectedRkey := atproto.CrewRecordKey(memberDID) 227 + 228 + // Skip if already using hash-based rkey 229 + if m.Rkey == expectedRkey { 230 + seen[memberDID] = true 231 + alreadyHashBased++ 232 + continue 233 + } 234 + 235 + // This is a TID-based record that needs migration 236 + slog.Info("Migrating crew record to hash-based rkey", 237 + "memberDID", memberDID, 238 + "oldRkey", m.Rkey, 239 + "newRkey", expectedRkey) 240 + 241 + // Check if we already have a hash-based record for this DID (duplicate handling) 242 + if seen[memberDID] { 243 + // Already migrated this DID, just delete the old TID record 244 + slog.Info("Deleting duplicate TID-based crew record", 245 + "memberDID", memberDID, 246 + "rkey", m.Rkey) 247 + if err := p.RemoveCrewMember(ctx, m.Rkey); err != nil { 248 + slog.Warn("Failed to delete duplicate crew record", 249 + "rkey", m.Rkey, 250 + "error", err) 251 + } else { 252 + duplicatesDeleted++ 253 + } 254 + continue 255 + } 256 + 257 + // Create new record with hash-based rkey (PutRecord handles upsert) 258 + newRecord := &atproto.CrewRecord{ 259 + Type: atproto.CrewCollection, 260 + Member: m.Record.Member, 261 + Role: m.Record.Role, 262 + Permissions: m.Record.Permissions, 263 + Tier: m.Record.Tier, 264 + AddedAt: m.Record.AddedAt, 265 + } 266 + 267 + _, _, err := p.repomgr.PutRecord(ctx, p.uid, atproto.CrewCollection, expectedRkey, newRecord) 268 + if err != nil { 269 + slog.Error("Failed to create hash-based crew record", 270 + "memberDID", memberDID, 271 + "error", err) 272 + continue 273 + } 274 + 275 + // Delete the old TID-based record 276 + if err := p.RemoveCrewMember(ctx, m.Rkey); err != nil { 277 + slog.Warn("Failed to delete old TID-based crew record", 278 + "rkey", m.Rkey, 279 + "error", err) 280 + // Continue anyway - the new record is created 281 + } 282 + 283 + seen[memberDID] = true 284 + migrated++ 285 + } 286 + 287 + slog.Info("Crew record migration complete", 288 + "migrated", migrated, 289 + "duplicatesDeleted", duplicatesDeleted, 290 + "alreadyHashBased", alreadyHashBased, 291 + "totalRecords", len(members)) 292 + 293 + return migrated, nil 294 + }
+127 -12
pkg/hold/pds/records.go
··· 1 1 package pds 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "database/sql" 6 7 "fmt" 7 8 "log/slog" 8 9 "strings" 9 10 11 + "atcr.io/pkg/atproto" 10 12 "github.com/bluesky-social/indigo/repo" 11 13 "github.com/ipfs/go-cid" 12 14 _ "github.com/mattn/go-sqlite3" ··· 24 26 Collection string 25 27 Rkey string 26 28 Cid string 29 + Did string // Associated DID (member for crew, userDid for layers, ownerDid for stats) 27 30 } 28 31 29 32 const recordsSchema = ` ··· 31 34 collection TEXT NOT NULL, 32 35 rkey TEXT NOT NULL, 33 36 cid TEXT NOT NULL, 37 + did TEXT, 34 38 PRIMARY KEY (collection, rkey) 35 39 ); 36 40 CREATE INDEX IF NOT EXISTS idx_records_collection_rkey ON records(collection, rkey); 41 + CREATE INDEX IF NOT EXISTS idx_records_collection_did ON records(collection, did); 37 42 ` 38 43 44 + // Schema version for migration detection 45 + const recordsSchemaVersion = 2 46 + 39 47 // NewRecordsIndex creates or opens a records index 48 + // If the schema is outdated (missing did column), drops and rebuilds the table 40 49 func NewRecordsIndex(dbPath string) (*RecordsIndex, error) { 41 50 db, err := sql.Open("sqlite3", dbPath) 42 51 if err != nil { 43 52 return nil, fmt.Errorf("failed to open records database: %w", err) 44 53 } 45 54 55 + // Check if table exists and has the did column 56 + needsRebuild := false 57 + var tableName string 58 + err = db.QueryRow(`SELECT name FROM sqlite_master WHERE type='table' AND name='records'`).Scan(&tableName) 59 + if err == nil { 60 + // Table exists, check for did column 61 + var colCount int 62 + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('records') WHERE name='did'`).Scan(&colCount) 63 + if err != nil || colCount == 0 { 64 + needsRebuild = true 65 + slog.Info("Records index schema outdated, rebuilding with did column") 66 + } 67 + } 68 + 69 + if needsRebuild { 70 + // Drop old table 71 + _, err = db.Exec(`DROP TABLE IF EXISTS records`) 72 + if err != nil { 73 + db.Close() 74 + return nil, fmt.Errorf("failed to drop old records table: %w", err) 75 + } 76 + } 77 + 46 78 // Create schema 47 79 _, err = db.Exec(recordsSchema) 48 80 if err != nil { ··· 62 94 } 63 95 64 96 // IndexRecord adds or updates a record in the index 65 - func (ri *RecordsIndex) IndexRecord(collection, rkey, cidStr string) error { 97 + // did parameter is optional - pass empty string if not applicable 98 + func (ri *RecordsIndex) IndexRecord(collection, rkey, cidStr, did string) error { 66 99 _, err := ri.db.Exec(` 67 - INSERT OR REPLACE INTO records (collection, rkey, cid) 68 - VALUES (?, ?, ?) 69 - `, collection, rkey, cidStr) 100 + INSERT OR REPLACE INTO records (collection, rkey, cid, did) 101 + VALUES (?, ?, ?, ?) 102 + `, collection, rkey, cidStr, sql.NullString{String: did, Valid: did != ""}) 70 103 return err 71 104 } 72 105 ··· 90 123 // Oldest first (ascending order) 91 124 if cursor != "" { 92 125 query = ` 93 - SELECT collection, rkey, cid FROM records 126 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 94 127 WHERE collection = ? AND rkey > ? 95 128 ORDER BY rkey ASC 96 129 LIMIT ? ··· 98 131 args = []any{collection, cursor, limit + 1} 99 132 } else { 100 133 query = ` 101 - SELECT collection, rkey, cid FROM records 134 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 102 135 WHERE collection = ? 103 136 ORDER BY rkey ASC 104 137 LIMIT ? ··· 109 142 // Newest first (descending order) - default 110 143 if cursor != "" { 111 144 query = ` 112 - SELECT collection, rkey, cid FROM records 145 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 113 146 WHERE collection = ? AND rkey < ? 114 147 ORDER BY rkey DESC 115 148 LIMIT ? ··· 117 150 args = []any{collection, cursor, limit + 1} 118 151 } else { 119 152 query = ` 120 - SELECT collection, rkey, cid FROM records 153 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 121 154 WHERE collection = ? 122 155 ORDER BY rkey DESC 123 156 LIMIT ? ··· 135 168 var records []Record 136 169 for rows.Next() { 137 170 var rec Record 138 - if err := rows.Scan(&rec.Collection, &rec.Rkey, &rec.Cid); err != nil { 171 + if err := rows.Scan(&rec.Collection, &rec.Rkey, &rec.Cid, &rec.Did); err != nil { 139 172 return nil, "", fmt.Errorf("failed to scan record: %w", err) 140 173 } 141 174 records = append(records, rec) ··· 156 189 return records, nextCursor, nil 157 190 } 158 191 192 + // ListRecordsByDID returns records for a collection filtered by DID with pagination support 193 + func (ri *RecordsIndex) ListRecordsByDID(collection, did string, limit int, cursor string) ([]Record, string, error) { 194 + var query string 195 + var args []any 196 + 197 + if cursor != "" { 198 + query = ` 199 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 200 + WHERE collection = ? AND did = ? AND rkey < ? 201 + ORDER BY rkey DESC 202 + LIMIT ? 203 + ` 204 + args = []any{collection, did, cursor, limit + 1} 205 + } else { 206 + query = ` 207 + SELECT collection, rkey, cid, COALESCE(did, '') FROM records 208 + WHERE collection = ? AND did = ? 209 + ORDER BY rkey DESC 210 + LIMIT ? 211 + ` 212 + args = []any{collection, did, limit + 1} 213 + } 214 + 215 + rows, err := ri.db.Query(query, args...) 216 + if err != nil { 217 + return nil, "", fmt.Errorf("failed to query records: %w", err) 218 + } 219 + defer rows.Close() 220 + 221 + var records []Record 222 + for rows.Next() { 223 + var rec Record 224 + if err := rows.Scan(&rec.Collection, &rec.Rkey, &rec.Cid, &rec.Did); err != nil { 225 + return nil, "", fmt.Errorf("failed to scan record: %w", err) 226 + } 227 + records = append(records, rec) 228 + } 229 + 230 + if err := rows.Err(); err != nil { 231 + return nil, "", fmt.Errorf("error iterating records: %w", err) 232 + } 233 + 234 + // Determine next cursor 235 + var nextCursor string 236 + if len(records) > limit { 237 + nextCursor = records[limit-1].Rkey 238 + records = records[:limit] 239 + } 240 + 241 + return records, nextCursor, nil 242 + } 243 + 159 244 // Count returns the number of records in a collection 160 245 func (ri *RecordsIndex) Count(collection string) (int, error) { 161 246 var count int ··· 174 259 175 260 // BackfillFromRepo populates the records index from an existing MST repo 176 261 // Compares MST count with index count - only backfills if they differ 262 + // Extracts DID from record content for crew, layer, and stats records 177 263 func (ri *RecordsIndex) BackfillFromRepo(ctx context.Context, repoHandle *repo.Repo) error { 178 264 // Count records in MST 179 265 mstCount := 0 ··· 207 293 defer tx.Rollback() 208 294 209 295 stmt, err := tx.Prepare(` 210 - INSERT OR REPLACE INTO records (collection, rkey, cid) 211 - VALUES (?, ?, ?) 296 + INSERT OR REPLACE INTO records (collection, rkey, cid, did) 297 + VALUES (?, ?, ?, ?) 212 298 `) 213 299 if err != nil { 214 300 return fmt.Errorf("failed to prepare statement: %w", err) ··· 224 310 } 225 311 collection, rkey := parts[0], parts[1] 226 312 227 - _, err := stmt.Exec(collection, rkey, c.String()) 313 + // Extract DID from record content based on collection type 314 + var did string 315 + _, recBytes, err := repoHandle.GetRecordBytes(ctx, key) 316 + if err == nil && recBytes != nil { 317 + did = extractDIDFromRecord(collection, *recBytes) 318 + } 319 + 320 + _, err = stmt.Exec(collection, rkey, c.String(), sql.NullString{String: did, Valid: did != ""}) 228 321 if err != nil { 229 322 return fmt.Errorf("failed to index record %s: %w", key, err) 230 323 } ··· 249 342 slog.Info("Backfill complete", "records", recordCount) 250 343 return nil 251 344 } 345 + 346 + // extractDIDFromRecord extracts the associated DID from a record based on its collection type 347 + func extractDIDFromRecord(collection string, recBytes []byte) string { 348 + switch collection { 349 + case atproto.CrewCollection: 350 + var rec atproto.CrewRecord 351 + if err := rec.UnmarshalCBOR(bytes.NewReader(recBytes)); err == nil { 352 + return rec.Member 353 + } 354 + case atproto.LayerCollection: 355 + var rec atproto.LayerRecord 356 + if err := rec.UnmarshalCBOR(bytes.NewReader(recBytes)); err == nil { 357 + return rec.UserDID 358 + } 359 + case atproto.StatsCollection: 360 + var rec atproto.StatsRecord 361 + if err := rec.UnmarshalCBOR(bytes.NewReader(recBytes)); err == nil { 362 + return rec.OwnerDID 363 + } 364 + } 365 + return "" 366 + }
+19 -19
pkg/hold/pds/records_test.go
··· 50 50 defer ri.Close() 51 51 52 52 // Index a record 53 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123") 53 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 54 54 if err != nil { 55 55 t.Fatalf("IndexRecord() error = %v", err) 56 56 } ··· 75 75 defer ri.Close() 76 76 77 77 // Index a record 78 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123") 78 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 79 79 if err != nil { 80 80 t.Fatalf("IndexRecord() first call error = %v", err) 81 81 } 82 82 83 83 // Update the same record with new CID 84 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei456") 84 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei456", "") 85 85 if err != nil { 86 86 t.Fatalf("IndexRecord() second call error = %v", err) 87 87 } ··· 118 118 defer ri.Close() 119 119 120 120 // Index a record 121 - err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123") 121 + err = ri.IndexRecord("io.atcr.hold.crew", "abc123", "bafyrei123", "") 122 122 if err != nil { 123 123 t.Fatalf("IndexRecord() error = %v", err) 124 124 } ··· 217 217 {"ccc", "cid3"}, 218 218 } 219 219 for _, r := range records { 220 - if err := ri.IndexRecord("io.atcr.hold.crew", r.rkey, r.cid); err != nil { 220 + if err := ri.IndexRecord("io.atcr.hold.crew", r.rkey, r.cid, ""); err != nil { 221 221 t.Fatalf("IndexRecord() error = %v", err) 222 222 } 223 223 } ··· 248 248 // Add records with different rkeys (TIDs are lexicographically ordered by time) 249 249 rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"} 250 250 for _, rkey := range rkeys { 251 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil { 251 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 252 252 t.Fatalf("IndexRecord() error = %v", err) 253 253 } 254 254 } ··· 286 286 // Add records 287 287 rkeys := []string{"3m3aaaaaaaaa", "3m3bbbbbbbbb", "3m3ccccccccc"} 288 288 for _, rkey := range rkeys { 289 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil { 289 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 290 290 t.Fatalf("IndexRecord() error = %v", err) 291 291 } 292 292 } ··· 324 324 // Add 5 records 325 325 for i := range 5 { 326 326 rkey := string(rune('a' + i)) 327 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil { 327 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 328 328 t.Fatalf("IndexRecord() error = %v", err) 329 329 } 330 330 } ··· 355 355 // Add 5 records 356 356 rkeys := []string{"a", "b", "c", "d", "e"} 357 357 for _, rkey := range rkeys { 358 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil { 358 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 359 359 t.Fatalf("IndexRecord() error = %v", err) 360 360 } 361 361 } ··· 430 430 // Add 5 records 431 431 rkeys := []string{"a", "b", "c", "d", "e"} 432 432 for _, rkey := range rkeys { 433 - if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey); err != nil { 433 + if err := ri.IndexRecord("io.atcr.hold.crew", rkey, "cid-"+rkey, ""); err != nil { 434 434 t.Fatalf("IndexRecord() error = %v", err) 435 435 } 436 436 } ··· 474 474 475 475 // Add records to two collections 476 476 for i := range 3 { 477 - ri.IndexRecord("io.atcr.hold.crew", string(rune('a'+i)), "cid1") 477 + ri.IndexRecord("io.atcr.hold.crew", string(rune('a'+i)), "cid1", "") 478 478 } 479 479 for i := range 5 { 480 - ri.IndexRecord("io.atcr.hold.captain", string(rune('a'+i)), "cid2") 480 + ri.IndexRecord("io.atcr.hold.captain", string(rune('a'+i)), "cid2", "") 481 481 } 482 482 483 483 // Count crew ··· 527 527 defer ri.Close() 528 528 529 529 // Add records to multiple collections 530 - ri.IndexRecord("io.atcr.hold.crew", "a", "cid1") 531 - ri.IndexRecord("io.atcr.hold.crew", "b", "cid2") 532 - ri.IndexRecord("io.atcr.hold.captain", "self", "cid3") 533 - ri.IndexRecord("io.atcr.manifest", "abc123", "cid4") 530 + ri.IndexRecord("io.atcr.hold.crew", "a", "cid1", "") 531 + ri.IndexRecord("io.atcr.hold.crew", "b", "cid2", "") 532 + ri.IndexRecord("io.atcr.hold.captain", "self", "cid3", "") 533 + ri.IndexRecord("io.atcr.manifest", "abc123", "cid4", "") 534 534 535 535 count, err := ri.TotalCount() 536 536 if err != nil { ··· 581 581 defer ri.Close() 582 582 583 583 // Add records to different collections with same rkeys 584 - ri.IndexRecord("io.atcr.hold.crew", "abc", "cid-crew") 585 - ri.IndexRecord("io.atcr.hold.captain", "abc", "cid-captain") 586 - ri.IndexRecord("io.atcr.manifest", "abc", "cid-manifest") 584 + ri.IndexRecord("io.atcr.hold.crew", "abc", "cid-crew", "") 585 + ri.IndexRecord("io.atcr.hold.captain", "abc", "cid-captain", "") 586 + ri.IndexRecord("io.atcr.manifest", "abc", "cid-manifest", "") 587 587 588 588 // Listing should only return records from requested collection 589 589 records, _, err := ri.ListRecords("io.atcr.hold.crew", 10, "", false)
+88
pkg/hold/pds/repomgr.go
··· 382 382 return rpath, cc, nil 383 383 } 384 384 385 + // UpsertRecord creates or updates a record with an explicit rkey. 386 + // If the record doesn't exist, it creates it. If it exists, it updates it. 387 + // Returns the collection path (e.g., "io.atcr.captain/self"), CID, and whether it was created (true) or updated (false). 388 + func (rm *RepoManager) UpsertRecord(ctx context.Context, user models.Uid, collection, rkey string, rec cbg.CBORMarshaler) (string, cid.Cid, bool, error) { 389 + ctx, span := otel.Tracer("repoman").Start(ctx, "UpsertRecord") 390 + defer span.End() 391 + 392 + unlock := rm.lockUser(ctx, user) 393 + defer unlock() 394 + 395 + rev, err := rm.cs.GetUserRepoRev(ctx, user) 396 + if err != nil { 397 + return "", cid.Undef, false, err 398 + } 399 + 400 + ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) 401 + if err != nil { 402 + return "", cid.Undef, false, err 403 + } 404 + 405 + head := ds.BaseCid() 406 + r, err := repo.OpenRepo(ctx, ds, head) 407 + if err != nil { 408 + return "", cid.Undef, false, err 409 + } 410 + 411 + rpath := collection + "/" + rkey 412 + 413 + // Check if record exists 414 + _, _, err = r.GetRecordBytes(ctx, rpath) 415 + recordExists := err == nil 416 + 417 + var cc cid.Cid 418 + var evtKind EventKind 419 + if recordExists { 420 + // Update existing record 421 + cc, err = r.UpdateRecord(ctx, rpath, rec) 422 + evtKind = EvtKindUpdateRecord 423 + } else { 424 + // Create new record 425 + cc, err = r.PutRecord(ctx, rpath, rec) 426 + evtKind = EvtKindCreateRecord 427 + } 428 + if err != nil { 429 + return "", cid.Undef, false, err 430 + } 431 + 432 + nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 433 + if err != nil { 434 + return "", cid.Undef, false, err 435 + } 436 + 437 + rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) 438 + if err != nil { 439 + return "", cid.Undef, false, fmt.Errorf("close with root: %w", err) 440 + } 441 + 442 + var oldroot *cid.Cid 443 + if head.Defined() { 444 + oldroot = &head 445 + } 446 + 447 + if rm.events != nil { 448 + op := RepoOp{ 449 + Kind: evtKind, 450 + Collection: collection, 451 + Rkey: rkey, 452 + RecCid: &cc, 453 + } 454 + 455 + if rm.hydrateRecords { 456 + op.Record = rec 457 + } 458 + 459 + rm.events(ctx, &RepoEvent{ 460 + User: user, 461 + OldRoot: oldroot, 462 + NewRoot: nroot, 463 + Rev: nrev, 464 + Since: &rev, 465 + Ops: []RepoOp{op}, 466 + RepoSlice: rslice, 467 + }) 468 + } 469 + 470 + return rpath, cc, !recordExists, nil 471 + } 472 + 385 473 func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collection, rkey string) error { 386 474 ctx, span := otel.Tracer("repoman").Start(ctx, "DeleteRecord") 387 475 defer span.End()
+33 -1
pkg/hold/pds/server.go
··· 225 225 } 226 226 } 227 227 228 + // TODO(crew-migration): Remove this call after all holds have been upgraded (added 2026-01-06) 229 + // Migrate TID-based crew records to hash-based rkeys for O(1) lookups 230 + if migrated, err := p.MigrateCrewRecordsToHashRkeys(ctx); err != nil { 231 + slog.Warn("Crew record migration failed", "error", err) 232 + } else if migrated > 0 { 233 + slog.Info("Migrated crew records to hash-based rkeys", "count", migrated) 234 + } 235 + 228 236 // Create Bluesky profile record (idempotent - check if exists first) 229 237 // This runs even if captain exists (for existing holds being upgraded) 230 238 // Skip if no storage driver (e.g., in tests) ··· 319 327 if op.RecCid != nil { 320 328 cidStr = op.RecCid.String() 321 329 } 322 - if err := p.recordsIndex.IndexRecord(op.Collection, op.Rkey, cidStr); err != nil { 330 + // Extract DID from record based on collection type 331 + did := extractDIDFromOp(op) 332 + if err := p.recordsIndex.IndexRecord(op.Collection, op.Rkey, cidStr, did); err != nil { 323 333 slog.Warn("Failed to index record", "collection", op.Collection, "rkey", op.Rkey, "error", err) 324 334 } 325 335 case EvtKindDeleteRecord: ··· 336 346 broadcasterHandler(ctx, event) 337 347 } 338 348 } 349 + } 350 + 351 + // extractDIDFromOp extracts the associated DID from a repo operation based on collection type 352 + func extractDIDFromOp(op RepoOp) string { 353 + if op.Record == nil { 354 + return "" 355 + } 356 + switch op.Collection { 357 + case atproto.CrewCollection: 358 + if rec, ok := op.Record.(*atproto.CrewRecord); ok { 359 + return rec.Member 360 + } 361 + case atproto.LayerCollection: 362 + if rec, ok := op.Record.(*atproto.LayerRecord); ok { 363 + return rec.UserDID 364 + } 365 + case atproto.StatsCollection: 366 + if rec, ok := op.Record.(*atproto.StatsRecord); ok { 367 + return rec.OwnerDID 368 + } 369 + } 370 + return "" 339 371 } 340 372 341 373 // BackfillRecordsIndex populates the records index from existing MST data
+4 -5
pkg/hold/pds/server_test.go
··· 524 524 } 525 525 526 526 // Verify crew wasn't duplicated (Bootstrap adds owner as crew, but they already exist) 527 + // With hash-based rkeys, AddCrewMember uses PutRecord which upserts - no duplicates possible 527 528 crewAfter, err := pds.ListCrewMembers(ctx) 528 529 if err != nil { 529 530 t.Fatalf("ListCrewMembers failed after bootstrap: %v", err) 530 531 } 531 532 532 - // Should have 2 crew members now: original + one added by bootstrap 533 - // (Bootstrap doesn't check for duplicates currently) 534 - if len(crewAfter) != 2 { 535 - t.Logf("Note: Bootstrap added owner as crew even though they already existed") 536 - t.Logf("Crew count after bootstrap: %d", len(crewAfter)) 533 + // Should still have 1 crew member (hash-based rkey ensures upsert, not duplicate) 534 + if len(crewAfter) != 1 { 535 + t.Errorf("Expected 1 crew member after bootstrap (upsert), got %d", len(crewAfter)) 537 536 } 538 537 } 539 538
+106 -1
pkg/logging/logger.go
··· 1 1 // Package logging provides centralized structured logging using slog 2 2 // with configurable log levels. Call InitLogger() from main() to configure. 3 + // 4 + // Dynamic debug logging: 5 + // Send SIGUSR1 to toggle debug mode at runtime (auto-reverts after 30 minutes). 6 + // Example: docker kill -s SIGUSR1 <container> 3 7 package logging 4 8 5 9 import ( 6 10 "io" 7 11 "log/slog" 8 12 "os" 13 + "os/signal" 9 14 "strings" 15 + "sync" 16 + "sync/atomic" 17 + "syscall" 18 + "time" 19 + ) 20 + 21 + const debugTimeout = 30 * time.Minute 22 + 23 + var ( 24 + levelVar *slog.LevelVar 25 + originalLevel slog.Level 26 + debugEnabled atomic.Bool 27 + revertTimer *time.Timer 28 + revertMu sync.Mutex 10 29 ) 11 30 12 31 // InitLogger initializes the global slog default logger with the specified log level. 13 32 // Valid levels: debug, info, warn, error (case-insensitive) 14 33 // If level is empty or invalid, defaults to INFO. 15 34 // Call this from main() at startup. 35 + // 36 + // Also starts a signal handler for SIGUSR1 to toggle debug mode at runtime. 16 37 func InitLogger(level string) { 17 38 var logLevel slog.Level 18 39 ··· 29 50 logLevel = slog.LevelInfo 30 51 } 31 52 53 + // Store original level for toggle-back and use LevelVar for dynamic changes 54 + originalLevel = logLevel 55 + levelVar = new(slog.LevelVar) 56 + levelVar.Set(logLevel) 57 + 32 58 opts := &slog.HandlerOptions{ 33 - Level: logLevel, 59 + Level: levelVar, 34 60 } 35 61 36 62 handler := slog.NewTextHandler(os.Stdout, opts) 37 63 slog.SetDefault(slog.New(handler)) 64 + 65 + // Start signal handler for dynamic debug toggle 66 + go handleDebugSignal() 67 + } 68 + 69 + func handleDebugSignal() { 70 + sigChan := make(chan os.Signal, 1) 71 + signal.Notify(sigChan, syscall.SIGUSR1) 72 + 73 + for range sigChan { 74 + ToggleDebug() 75 + } 76 + } 77 + 78 + // ToggleDebug toggles between the original log level and DEBUG. 79 + // When enabling debug, starts a 30-minute timer that auto-reverts. 80 + // Typically called via SIGUSR1 signal. 81 + func ToggleDebug() { 82 + revertMu.Lock() 83 + defer revertMu.Unlock() 84 + 85 + wasDebug := debugEnabled.Swap(!debugEnabled.Load()) 86 + 87 + // Cancel any existing revert timer 88 + if revertTimer != nil { 89 + revertTimer.Stop() 90 + revertTimer = nil 91 + } 92 + 93 + if wasDebug { 94 + // Turning debug OFF 95 + levelVar.Set(originalLevel) 96 + slog.Info("Log level changed", 97 + "from", "DEBUG", 98 + "to", levelToString(originalLevel), 99 + "trigger", "SIGUSR1") 100 + } else { 101 + // Turning debug ON - start auto-revert timer 102 + levelVar.Set(slog.LevelDebug) 103 + revertTimer = time.AfterFunc(debugTimeout, autoRevert) 104 + slog.Info("Log level changed", 105 + "from", levelToString(originalLevel), 106 + "to", "DEBUG", 107 + "trigger", "SIGUSR1", 108 + "auto_revert_in", debugTimeout) 109 + } 110 + } 111 + 112 + func autoRevert() { 113 + revertMu.Lock() 114 + defer revertMu.Unlock() 115 + 116 + if !debugEnabled.Load() { 117 + return // Already reverted manually 118 + } 119 + 120 + debugEnabled.Store(false) 121 + levelVar.Set(originalLevel) 122 + revertTimer = nil 123 + 124 + slog.Info("Log level changed", 125 + "from", "DEBUG", 126 + "to", levelToString(originalLevel), 127 + "trigger", "auto-revert") 128 + } 129 + 130 + func levelToString(l slog.Level) string { 131 + switch l { 132 + case slog.LevelDebug: 133 + return "DEBUG" 134 + case slog.LevelInfo: 135 + return "INFO" 136 + case slog.LevelWarn: 137 + return "WARN" 138 + case slog.LevelError: 139 + return "ERROR" 140 + default: 141 + return l.String() 142 + } 38 143 } 39 144 40 145 // SetupTestLogger configures logging for tests to reduce noise.