A community based topic aggregation platform built on atproto

feat(pds): add PutRecord with optimistic locking for comment updates

Implement PutRecord in PDS client with swapRecord CID validation:
- Add ErrConflict error type for HTTP 409 responses
- Add PutRecord method to Client interface with optimistic locking
- Map 409 status to ErrConflict in wrapAPIError

Migrate UpdateComment to use PutRecord:
- Use existingRecord.CID as swapRecord for concurrent modification detection
- Add ErrConcurrentModification error type in comments package
- Return proper error when PDS detects CID mismatch

Testing:
- Add PutRecord unit tests (success, conflict, typed errors)
- Add PutRecord to mockPDSClient for unit test compatibility
- Add integration test for concurrent modification detection

๐Ÿค– Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+463 -131
docs
internal
tests
integration
-124
docs/PRD_BACKLOG.md
··· 649 649 650 650 ## ๐Ÿ”ต P3: Technical Debt 651 651 652 - ### Implement PutRecord in PDS Client 653 - **Added:** 2025-12-04 | **Effort:** 2-3 hours | **Priority:** Technical Debt 654 - **Status:** ๐Ÿ“‹ TODO 655 - 656 - **Problem:** 657 - The PDS client (`internal/atproto/pds/client.go`) only has `CreateRecord` but lacks `PutRecord`. This means updates use `CreateRecord` with an existing rkey, which: 658 - 1. Loses optimistic locking (no CID swap check) 659 - 2. Is semantically incorrect (creates vs updates) 660 - 3. Could cause race conditions on concurrent updates 661 - 662 - **atProto Best Practice:** 663 - - `com.atproto.repo.putRecord` should be used for updates 664 - - Accepts `swapRecord` (expected CID) for optimistic locking 665 - - Returns conflict error if CID doesn't match (concurrent modification detected) 666 - 667 - **Solution:** 668 - Add `PutRecord` method to the PDS client interface: 669 - 670 - ```go 671 - // Client interface addition 672 - type Client interface { 673 - // ... existing methods ... 674 - 675 - // PutRecord creates or updates a record with optional optimistic locking. 676 - // If swapRecord is provided, the operation fails if the current CID doesn't match. 677 - PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (uri string, cid string, err error) 678 - } 679 - 680 - // Implementation 681 - func (c *client) PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (string, string, error) { 682 - payload := map[string]any{ 683 - "repo": c.did, 684 - "collection": collection, 685 - "rkey": rkey, 686 - "record": record, 687 - } 688 - 689 - // Optional: optimistic locking via CID swap check 690 - if swapRecord != "" { 691 - payload["swapRecord"] = swapRecord 692 - } 693 - 694 - var result struct { 695 - URI string `json:"uri"` 696 - CID string `json:"cid"` 697 - } 698 - 699 - err := c.apiClient.Post(ctx, syntax.NSID("com.atproto.repo.putRecord"), payload, &result) 700 - if err != nil { 701 - return "", "", wrapAPIError(err, "putRecord") 702 - } 703 - 704 - return result.URI, result.CID, nil 705 - } 706 - ``` 707 - 708 - **Error Handling:** 709 - Add new error type for conflict detection: 710 - ```go 711 - var ErrConflict = errors.New("record was modified by another operation") 712 - ``` 713 - 714 - Map HTTP 409 in `wrapAPIError`: 715 - ```go 716 - case 409: 717 - return fmt.Errorf("%s: %w: %s", operation, ErrConflict, apiErr.Message) 718 - ``` 719 - 720 - **Files to Modify:** 721 - - `internal/atproto/pds/client.go` - Add `PutRecord` method and interface 722 - - `internal/atproto/pds/errors.go` - Add `ErrConflict` error type 723 - 724 - **Testing:** 725 - - Unit test: Verify payload includes `swapRecord` when provided 726 - - Integration test: Concurrent updates detect conflict 727 - - Integration test: Update without `swapRecord` still works (backwards compatible) 728 - 729 - **Blocked By:** Nothing 730 - **Blocks:** "Migrate UpdateComment to use PutRecord" 731 - 732 - --- 733 - 734 - ### Migrate UpdateComment to Use PutRecord 735 - **Added:** 2025-12-04 | **Effort:** 1 hour | **Priority:** Technical Debt 736 - **Status:** ๐Ÿ“‹ TODO (Blocked) 737 - **Blocked By:** "Implement PutRecord in PDS Client" 738 - 739 - **Problem:** 740 - `UpdateComment` in `internal/core/comments/comment_service.go` uses `CreateRecord` for updates instead of `PutRecord`. This lacks optimistic locking and is semantically incorrect. 741 - 742 - **Current Code (lines 687-690):** 743 - ```go 744 - // TODO: Use PutRecord instead of CreateRecord for proper update semantics with optimistic locking. 745 - // PutRecord should accept the existing CID (existingRecord.CID) to ensure concurrent updates are detected. 746 - // However, PutRecord is not yet implemented in internal/atproto/pds/client.go. 747 - uri, cid, err := pdsClient.CreateRecord(ctx, commentCollection, rkey, updatedRecord) 748 - ``` 749 - 750 - **Solution:** 751 - Once `PutRecord` is implemented in the PDS client, update to: 752 - ```go 753 - // Use PutRecord with optimistic locking via existing CID 754 - uri, cid, err := pdsClient.PutRecord(ctx, commentCollection, rkey, updatedRecord, existingRecord.CID) 755 - if err != nil { 756 - if errors.Is(err, pds.ErrConflict) { 757 - // Record was modified by another operation - return appropriate error 758 - return nil, fmt.Errorf("comment was modified, please refresh and try again: %w", err) 759 - } 760 - // ... existing error handling 761 - } 762 - ``` 763 - 764 - **Files to Modify:** 765 - - `internal/core/comments/comment_service.go` - UpdateComment method 766 - - `internal/core/comments/errors.go` - Add `ErrConcurrentModification` if needed 767 - 768 - **Testing:** 769 - - Unit test: Verify `PutRecord` is called with correct CID 770 - - Integration test: Simulate concurrent update, verify conflict handling 771 - 772 - **Impact:** Proper optimistic locking prevents lost updates from race conditions 773 - 774 - --- 775 - 776 652 ### Consolidate Environment Variable Validation 777 653 **Added:** 2025-10-11 | **Effort:** 2-3 hours 778 654
+33
internal/atproto/pds/client.go
··· 31 31 // GetRecord retrieves a single record by collection and rkey. 32 32 GetRecord(ctx context.Context, collection string, rkey string) (*RecordResponse, error) 33 33 34 + // PutRecord creates or updates a record with optional optimistic locking. 35 + // If swapRecord CID is provided, the operation fails if the current CID doesn't match. 36 + PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (uri string, cid string, err error) 37 + 34 38 // DID returns the authenticated user's DID. 35 39 DID() string 36 40 ··· 89 93 return fmt.Errorf("%s: %w: %s", operation, ErrForbidden, apiErr.Message) 90 94 case 404: 91 95 return fmt.Errorf("%s: %w: %s", operation, ErrNotFound, apiErr.Message) 96 + case 409: 97 + return fmt.Errorf("%s: %w: %s", operation, ErrConflict, apiErr.Message) 92 98 } 93 99 } 94 100 ··· 218 224 Value: result.Value, 219 225 }, nil 220 226 } 227 + 228 + // PutRecord creates or updates a record with optional optimistic locking. 229 + func (c *client) PutRecord(ctx context.Context, collection string, rkey string, record any, swapRecord string) (string, string, error) { 230 + payload := map[string]any{ 231 + "repo": c.did, 232 + "collection": collection, 233 + "rkey": rkey, 234 + "record": record, 235 + } 236 + 237 + // Optional: optimistic locking via CID swap check 238 + if swapRecord != "" { 239 + payload["swapRecord"] = swapRecord 240 + } 241 + 242 + var result struct { 243 + URI string `json:"uri"` 244 + CID string `json:"cid"` 245 + } 246 + 247 + err := c.apiClient.Post(ctx, syntax.NSID("com.atproto.repo.putRecord"), payload, &result) 248 + if err != nil { 249 + return "", "", wrapAPIError(err, "putRecord") 250 + } 251 + 252 + return result.URI, result.CID, nil 253 + }
+231
internal/atproto/pds/client_test.go
··· 959 959 wantTyped: ErrBadRequest, 960 960 }, 961 961 { 962 + name: "409 maps to ErrConflict", 963 + err: &atclient.APIError{StatusCode: 409, Name: "InvalidSwap", Message: "Record CID mismatch"}, 964 + operation: "putRecord", 965 + wantTyped: ErrConflict, 966 + }, 967 + { 962 968 name: "500 wraps without typed error", 963 969 err: &atclient.APIError{StatusCode: 500, Name: "InternalError", Message: "Server error"}, 964 970 operation: "listRecords", ··· 1172 1178 }) 1173 1179 } 1174 1180 } 1181 + 1182 + // TestClient_PutRecord tests the PutRecord method with a mock server. 1183 + func TestClient_PutRecord(t *testing.T) { 1184 + tests := []struct { 1185 + name string 1186 + collection string 1187 + rkey string 1188 + record map[string]any 1189 + swapRecord string 1190 + serverResponse map[string]any 1191 + serverStatus int 1192 + wantURI string 1193 + wantCID string 1194 + wantErr bool 1195 + }{ 1196 + { 1197 + name: "successful put with swapRecord", 1198 + collection: "social.coves.comment", 1199 + rkey: "3kjzl5kcb2s2v", 1200 + record: map[string]any{ 1201 + "$type": "social.coves.comment", 1202 + "content": "Updated comment content", 1203 + }, 1204 + swapRecord: "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua", 1205 + serverResponse: map[string]any{ 1206 + "uri": "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v", 1207 + "cid": "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5", 1208 + }, 1209 + serverStatus: http.StatusOK, 1210 + wantURI: "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v", 1211 + wantCID: "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5", 1212 + wantErr: false, 1213 + }, 1214 + { 1215 + name: "successful put without swapRecord", 1216 + collection: "social.coves.comment", 1217 + rkey: "3kjzl5kcb2s2v", 1218 + record: map[string]any{ 1219 + "$type": "social.coves.comment", 1220 + "content": "Updated comment", 1221 + }, 1222 + swapRecord: "", 1223 + serverResponse: map[string]any{ 1224 + "uri": "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v", 1225 + "cid": "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5", 1226 + }, 1227 + serverStatus: http.StatusOK, 1228 + wantURI: "at://did:plc:test/social.coves.comment/3kjzl5kcb2s2v", 1229 + wantCID: "bafyreihd4q3yqcfvnv5zlp6n4fqzh6z4p4m3mwc7vvr6k2j6y6v2a3b4c5", 1230 + wantErr: false, 1231 + }, 1232 + { 1233 + name: "conflict error (409)", 1234 + collection: "social.coves.comment", 1235 + rkey: "test", 1236 + record: map[string]any{"$type": "social.coves.comment"}, 1237 + swapRecord: "bafyreigbtj4x7ip5legnfznufuopl4sg4knzc2cof6duas4b3q2fy6swua", 1238 + serverResponse: map[string]any{ 1239 + "error": "InvalidSwap", 1240 + "message": "Record CID does not match", 1241 + }, 1242 + serverStatus: http.StatusConflict, 1243 + wantErr: true, 1244 + }, 1245 + { 1246 + name: "server error", 1247 + collection: "social.coves.comment", 1248 + rkey: "test", 1249 + record: map[string]any{"$type": "social.coves.comment"}, 1250 + swapRecord: "", 1251 + serverResponse: map[string]any{ 1252 + "error": "InvalidRequest", 1253 + "message": "Invalid record", 1254 + }, 1255 + serverStatus: http.StatusBadRequest, 1256 + wantErr: true, 1257 + }, 1258 + } 1259 + 1260 + for _, tt := range tests { 1261 + t.Run(tt.name, func(t *testing.T) { 1262 + // Create mock server 1263 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1264 + // Verify method 1265 + if r.Method != http.MethodPost { 1266 + t.Errorf("expected POST request, got %s", r.Method) 1267 + } 1268 + 1269 + // Verify path 1270 + expectedPath := "/xrpc/com.atproto.repo.putRecord" 1271 + if r.URL.Path != expectedPath { 1272 + t.Errorf("path = %q, want %q", r.URL.Path, expectedPath) 1273 + } 1274 + 1275 + // Verify request body 1276 + var payload map[string]any 1277 + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 1278 + t.Fatalf("failed to decode request body: %v", err) 1279 + } 1280 + 1281 + // Check required fields 1282 + if payload["collection"] != tt.collection { 1283 + t.Errorf("collection = %v, want %v", payload["collection"], tt.collection) 1284 + } 1285 + if payload["rkey"] != tt.rkey { 1286 + t.Errorf("rkey = %v, want %v", payload["rkey"], tt.rkey) 1287 + } 1288 + 1289 + // Check swapRecord inclusion 1290 + if tt.swapRecord != "" { 1291 + if payload["swapRecord"] != tt.swapRecord { 1292 + t.Errorf("swapRecord = %v, want %v", payload["swapRecord"], tt.swapRecord) 1293 + } 1294 + } else { 1295 + if _, exists := payload["swapRecord"]; exists { 1296 + t.Error("swapRecord should not be included when empty") 1297 + } 1298 + } 1299 + 1300 + // Send response 1301 + w.Header().Set("Content-Type", "application/json") 1302 + w.WriteHeader(tt.serverStatus) 1303 + json.NewEncoder(w).Encode(tt.serverResponse) 1304 + })) 1305 + defer server.Close() 1306 + 1307 + // Create client 1308 + apiClient := atclient.NewAPIClient(server.URL) 1309 + apiClient.Auth = &bearerAuth{token: "test-token"} 1310 + 1311 + c := &client{ 1312 + apiClient: apiClient, 1313 + did: "did:plc:test", 1314 + host: server.URL, 1315 + } 1316 + 1317 + // Execute PutRecord 1318 + ctx := context.Background() 1319 + uri, cid, err := c.PutRecord(ctx, tt.collection, tt.rkey, tt.record, tt.swapRecord) 1320 + 1321 + if tt.wantErr { 1322 + if err == nil { 1323 + t.Fatal("expected error, got nil") 1324 + } 1325 + return 1326 + } 1327 + 1328 + if err != nil { 1329 + t.Fatalf("unexpected error: %v", err) 1330 + } 1331 + 1332 + if uri != tt.wantURI { 1333 + t.Errorf("uri = %q, want %q", uri, tt.wantURI) 1334 + } 1335 + 1336 + if cid != tt.wantCID { 1337 + t.Errorf("cid = %q, want %q", cid, tt.wantCID) 1338 + } 1339 + }) 1340 + } 1341 + } 1342 + 1343 + // TestClient_TypedErrors_PutRecord tests that PutRecord returns typed errors. 1344 + func TestClient_TypedErrors_PutRecord(t *testing.T) { 1345 + tests := []struct { 1346 + name string 1347 + serverStatus int 1348 + wantErr error 1349 + }{ 1350 + { 1351 + name: "401 returns ErrUnauthorized", 1352 + serverStatus: http.StatusUnauthorized, 1353 + wantErr: ErrUnauthorized, 1354 + }, 1355 + { 1356 + name: "403 returns ErrForbidden", 1357 + serverStatus: http.StatusForbidden, 1358 + wantErr: ErrForbidden, 1359 + }, 1360 + { 1361 + name: "409 returns ErrConflict", 1362 + serverStatus: http.StatusConflict, 1363 + wantErr: ErrConflict, 1364 + }, 1365 + { 1366 + name: "400 returns ErrBadRequest", 1367 + serverStatus: http.StatusBadRequest, 1368 + wantErr: ErrBadRequest, 1369 + }, 1370 + } 1371 + 1372 + for _, tt := range tests { 1373 + t.Run(tt.name, func(t *testing.T) { 1374 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1375 + w.Header().Set("Content-Type", "application/json") 1376 + w.WriteHeader(tt.serverStatus) 1377 + json.NewEncoder(w).Encode(map[string]any{ 1378 + "error": "TestError", 1379 + "message": "Test error message", 1380 + }) 1381 + })) 1382 + defer server.Close() 1383 + 1384 + apiClient := atclient.NewAPIClient(server.URL) 1385 + apiClient.Auth = &bearerAuth{token: "test-token"} 1386 + 1387 + c := &client{ 1388 + apiClient: apiClient, 1389 + did: "did:plc:test", 1390 + host: server.URL, 1391 + } 1392 + 1393 + ctx := context.Background() 1394 + _, _, err := c.PutRecord(ctx, "test.collection", "rkey", map[string]any{}, "") 1395 + 1396 + if err == nil { 1397 + t.Fatal("expected error, got nil") 1398 + } 1399 + 1400 + if !errors.Is(err, tt.wantErr) { 1401 + t.Errorf("expected errors.Is(%v, %v) to be true", err, tt.wantErr) 1402 + } 1403 + }) 1404 + } 1405 + }
+3
internal/atproto/pds/errors.go
··· 17 17 18 18 // ErrBadRequest indicates the request was malformed or invalid (HTTP 400). 19 19 ErrBadRequest = errors.New("bad request") 20 + 21 + // ErrConflict indicates the record was modified by another operation (HTTP 409). 22 + ErrConflict = errors.New("record was modified by another operation") 20 23 ) 21 24 22 25 // IsAuthError returns true if the error is an authentication/authorization error.
+5 -6
internal/core/comments/comment_service.go
··· 750 750 CreatedAt: createdAt, // Preserve original timestamp 751 751 } 752 752 753 - // Update the record on PDS (putRecord) 754 - // Note: This creates a new CID even though the URI stays the same 755 - // TODO: Use PutRecord instead of CreateRecord for proper update semantics with optimistic locking. 756 - // PutRecord should accept the existing CID (existingRecord.CID) to ensure concurrent updates are detected. 757 - // However, PutRecord is not yet implemented in internal/atproto/pds/client.go. 758 - uri, cid, err := pdsClient.CreateRecord(ctx, commentCollection, rkey, updatedRecord) 753 + // Update the record on PDS with optimistic locking via swapRecord CID 754 + uri, cid, err := pdsClient.PutRecord(ctx, commentCollection, rkey, updatedRecord, existingRecord.CID) 759 755 if err != nil { 760 756 s.logger.Error("failed to update comment on PDS", 761 757 "error", err, ··· 763 759 "rkey", rkey) 764 760 if pds.IsAuthError(err) { 765 761 return nil, ErrNotAuthorized 762 + } 763 + if errors.Is(err, pds.ErrConflict) { 764 + return nil, ErrConcurrentModification 766 765 } 767 766 return nil, fmt.Errorf("failed to update comment: %w", err) 768 767 }
+17
internal/core/comments/comment_write_service_test.go
··· 24 24 createError error // Error to return on CreateRecord 25 25 getError error // Error to return on GetRecord 26 26 deleteError error // Error to return on DeleteRecord 27 + putError error // Error to return on PutRecord 27 28 did string // DID of the authenticated user 28 29 hostURL string // PDS host URL 29 30 } ··· 111 112 112 113 func (m *mockPDSClient) ListRecords(ctx context.Context, collection string, limit int, cursor string) (*pds.ListRecordsResponse, error) { 113 114 return &pds.ListRecordsResponse{}, nil 115 + } 116 + 117 + func (m *mockPDSClient) PutRecord(ctx context.Context, collection, rkey string, record any, swapRecord string) (string, string, error) { 118 + if m.putError != nil { 119 + return "", "", m.putError 120 + } 121 + 122 + // Store record (same logic as CreateRecord) 123 + if m.records[collection] == nil { 124 + m.records[collection] = make(map[string]interface{}) 125 + } 126 + m.records[collection][rkey] = record 127 + 128 + uri := fmt.Sprintf("at://%s/%s/%s", m.did, collection, rkey) 129 + cid := fmt.Sprintf("bafytest%d", time.Now().UnixNano()) 130 + return uri, cid, nil 114 131 } 115 132 116 133 // mockPDSClientFactory creates mock PDS clients for testing
+5 -1
internal/core/comments/errors.go
··· 29 29 30 30 // ErrCommentAlreadyExists indicates a comment with this URI already exists 31 31 ErrCommentAlreadyExists = errors.New("comment already exists") 32 + 33 + // ErrConcurrentModification indicates the comment was modified since it was loaded 34 + ErrConcurrentModification = errors.New("comment was modified by another operation") 32 35 ) 33 36 34 37 // IsNotFound checks if an error is a "not found" error ··· 40 43 41 44 // IsConflict checks if an error is a conflict/already exists error 42 45 func IsConflict(err error) bool { 43 - return errors.Is(err, ErrCommentAlreadyExists) 46 + return errors.Is(err, ErrCommentAlreadyExists) || 47 + errors.Is(err, ErrConcurrentModification) 44 48 } 45 49 46 50 // IsValidationError checks if an error is a validation error
+169
tests/integration/comment_write_test.go
··· 806 806 func parseTestDID(did string) (syntax.DID, error) { 807 807 return syntax.ParseDID(did) 808 808 } 809 + 810 + // TestCommentWrite_ConcurrentModificationDetection tests that PutRecord's swapRecord 811 + // CID validation correctly detects concurrent modifications. 812 + // This verifies the optimistic locking mechanism that prevents lost updates. 813 + func TestCommentWrite_ConcurrentModificationDetection(t *testing.T) { 814 + if testing.Short() { 815 + t.Skip("Skipping E2E test in short mode") 816 + } 817 + 818 + db := setupTestDB(t) 819 + defer func() { _ = db.Close() }() 820 + 821 + ctx := context.Background() 822 + pdsURL := getTestPDSURL() 823 + 824 + // Setup repositories and service 825 + commentRepo := postgres.NewCommentRepository(db) 826 + 827 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 828 + if session.AccessToken == "" { 829 + return nil, fmt.Errorf("session has no access token") 830 + } 831 + if session.HostURL == "" { 832 + return nil, fmt.Errorf("session has no host URL") 833 + } 834 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 835 + } 836 + 837 + commentService := comments.NewCommentServiceWithPDSFactory( 838 + commentRepo, 839 + nil, 840 + nil, 841 + nil, 842 + nil, 843 + commentPDSFactory, 844 + ) 845 + 846 + // Create test user 847 + testUserHandle := fmt.Sprintf("concurrency-%d.local.coves.dev", time.Now().Unix()) 848 + testUserEmail := fmt.Sprintf("concurrency-%d@test.local", time.Now().Unix()) 849 + testUserPassword := "test-password-123" 850 + 851 + pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 852 + if err != nil { 853 + t.Skipf("PDS not available: %v", err) 854 + } 855 + 856 + // Setup OAuth 857 + mockStore := NewMockOAuthStore() 858 + mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 859 + 860 + parsedDID, _ := parseTestDID(userDID) 861 + session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 862 + 863 + // Step 1: Create a comment 864 + t.Logf("\n๐Ÿ“ Step 1: Creating initial comment...") 865 + createReq := comments.CreateCommentRequest{ 866 + Reply: comments.ReplyRef{ 867 + Root: comments.StrongRef{ 868 + URI: "at://did:plc:test/social.coves.community.post/test123", 869 + CID: "bafypost", 870 + }, 871 + Parent: comments.StrongRef{ 872 + URI: "at://did:plc:test/social.coves.community.post/test123", 873 + CID: "bafypost", 874 + }, 875 + }, 876 + Content: "Original content for concurrency test", 877 + Langs: []string{"en"}, 878 + } 879 + 880 + createResp, err := commentService.CreateComment(ctx, session, createReq) 881 + if err != nil { 882 + t.Fatalf("Failed to create comment: %v", err) 883 + } 884 + t.Logf("โœ… Comment created: URI=%s, CID=%s", createResp.URI, createResp.CID) 885 + originalCID := createResp.CID 886 + 887 + // Step 2: Update the comment (this changes the CID) 888 + t.Logf("\n๐Ÿ“ Step 2: Updating comment (this changes CID)...") 889 + updateReq := comments.UpdateCommentRequest{ 890 + URI: createResp.URI, 891 + Content: "Updated content - CID has changed", 892 + } 893 + 894 + updateResp, err := commentService.UpdateComment(ctx, session, updateReq) 895 + if err != nil { 896 + t.Fatalf("Failed to update comment: %v", err) 897 + } 898 + t.Logf("โœ… Comment updated: New CID=%s", updateResp.CID) 899 + newCID := updateResp.CID 900 + 901 + // Verify CIDs are different 902 + if originalCID == newCID { 903 + t.Fatalf("CIDs should be different after update: original=%s, new=%s", originalCID, newCID) 904 + } 905 + 906 + // Step 3: Simulate concurrent modification detection using direct PDS client 907 + // Create a PDS client and attempt to update with the stale (original) CID 908 + t.Logf("\n๐Ÿ” Step 3: Testing concurrent modification detection with stale CID...") 909 + 910 + pdsClient, err := pds.NewFromAccessToken(pdsURL, userDID, pdsAccessToken) 911 + if err != nil { 912 + t.Fatalf("Failed to create PDS client: %v", err) 913 + } 914 + 915 + rkey := utils.ExtractRKeyFromURI(createResp.URI) 916 + 917 + // Try to update with the ORIGINAL (now stale) CID - this should fail with 409 918 + staleRecord := map[string]interface{}{ 919 + "$type": "social.coves.community.comment", 920 + "reply": map[string]interface{}{ 921 + "root": map[string]interface{}{ 922 + "uri": "at://did:plc:test/social.coves.community.post/test123", 923 + "cid": "bafypost", 924 + }, 925 + "parent": map[string]interface{}{ 926 + "uri": "at://did:plc:test/social.coves.community.post/test123", 927 + "cid": "bafypost", 928 + }, 929 + }, 930 + "content": "This update should fail - using stale CID", 931 + "createdAt": time.Now().UTC().Format(time.RFC3339), 932 + } 933 + 934 + _, _, err = pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, staleRecord, originalCID) 935 + 936 + // Verify we get ErrConflict 937 + if err == nil { 938 + t.Fatal("Expected ErrConflict when updating with stale CID, got nil") 939 + } 940 + 941 + if !errors.Is(err, pds.ErrConflict) { 942 + t.Errorf("Expected pds.ErrConflict, got: %v", err) 943 + } 944 + 945 + t.Logf("โœ… Correctly detected concurrent modification!") 946 + t.Logf(" Error: %v", err) 947 + 948 + // Step 4: Verify that updating with the correct CID succeeds 949 + t.Logf("\n๐Ÿ“ Step 4: Verifying update with correct CID succeeds...") 950 + correctRecord := map[string]interface{}{ 951 + "$type": "social.coves.community.comment", 952 + "reply": map[string]interface{}{ 953 + "root": map[string]interface{}{ 954 + "uri": "at://did:plc:test/social.coves.community.post/test123", 955 + "cid": "bafypost", 956 + }, 957 + "parent": map[string]interface{}{ 958 + "uri": "at://did:plc:test/social.coves.community.post/test123", 959 + "cid": "bafypost", 960 + }, 961 + }, 962 + "content": "This update should succeed - using correct CID", 963 + "createdAt": time.Now().UTC().Format(time.RFC3339), 964 + } 965 + 966 + _, finalCID, err := pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, correctRecord, newCID) 967 + if err != nil { 968 + t.Fatalf("Update with correct CID should succeed, got: %v", err) 969 + } 970 + 971 + t.Logf("โœ… Update with correct CID succeeded: New CID=%s", finalCID) 972 + 973 + t.Logf("\nโœ… CONCURRENT MODIFICATION DETECTION TEST COMPLETE:") 974 + t.Logf(" โœ“ PutRecord with stale CID correctly returns ErrConflict") 975 + t.Logf(" โœ“ PutRecord with correct CID succeeds") 976 + t.Logf(" โœ“ Optimistic locking prevents lost updates") 977 + }