A community based topic aggregation platform built on atproto

fix(auth): use DPoP authentication for user write operations

Replace plain Bearer token authentication with proper DPoP-authenticated
OAuth sessions for subscribe, unsubscribe, block, and unblock operations.

The issue was that atProto OAuth tokens are DPoP-bound and require both
an access token AND a DPoP proof header. The community service was using
plain Bearer authentication which caused "Malformed token" errors.

Changes:
- Update Service interface to use *oauth.ClientSessionData
- Add PDSClientFactory pattern for testability
- Update handlers to get OAuth session from middleware
- Update unit tests to inject OAuth session into context

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+3 -2
cmd/server/main.go
··· 276 276 log.Printf(" - Communities will be created at: %s", defaultPDS) 277 277 log.Printf(" - PDS will generate and manage all DIDs and keys") 278 278 279 - // Initialize community service (no longer needs didGenerator directly) 280 - communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 279 + // Initialize community service with OAuth client for user DPoP authentication 280 + // OAuth client is required for subscribe/unsubscribe/block/unblock operations 281 + communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner, oauthClient, oauthStore) 281 282 282 283 // Authenticate Coves instance with PDS to enable community record writes 283 284 // The instance needs a PDS account to write community records it owns
+14 -56
internal/api/handlers/community/block.go
··· 47 47 return 48 48 } 49 49 50 - // Extract authenticated user DID and access token from request context (injected by auth middleware) 51 - userDID := middleware.GetUserDID(r) 52 - if userDID == "" { 50 + // Get OAuth session from context (injected by auth middleware) 51 + // The session contains the user's DID and credentials needed for DPoP authentication 52 + session := middleware.GetOAuthSession(r) 53 + if session == nil { 53 54 writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 54 55 return 55 56 } 56 57 57 - userAccessToken := middleware.GetUserAccessToken(r) 58 - if userAccessToken == "" { 59 - writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 60 - return 61 - } 62 - 63 - // Resolve community identifier (handle or DID) to DID 64 - // This allows users to block by handle: @gaming.community.coves.social or !gaming@coves.social 65 - communityDID, err := h.service.ResolveCommunityIdentifier(r.Context(), req.Community) 66 - if err != nil { 67 - if communities.IsNotFound(err) { 68 - writeError(w, http.StatusNotFound, "CommunityNotFound", "Community not found") 69 - return 70 - } 71 - if communities.IsValidationError(err) { 72 - writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error()) 73 - return 74 - } 75 - log.Printf("Failed to resolve community identifier %s: %v", req.Community, err) 76 - writeError(w, http.StatusInternalServerError, "InternalError", "Failed to resolve community") 77 - return 78 - } 79 - 80 - // Block via service (write-forward to PDS) using resolved DID 81 - block, err := h.service.BlockCommunity(r.Context(), userDID, userAccessToken, communityDID) 58 + // Block via service (write-forward to PDS with DPoP authentication) 59 + // Service handles identifier resolution (DIDs, handles, scoped identifiers) 60 + block, err := h.service.BlockCommunity(r.Context(), session, req.Community) 82 61 if err != nil { 83 62 handleServiceError(w, err) 84 63 return ··· 125 104 return 126 105 } 127 106 128 - // Extract authenticated user DID and access token from request context (injected by auth middleware) 129 - userDID := middleware.GetUserDID(r) 130 - if userDID == "" { 107 + // Get OAuth session from context (injected by auth middleware) 108 + // The session contains the user's DID and credentials needed for DPoP authentication 109 + session := middleware.GetOAuthSession(r) 110 + if session == nil { 131 111 writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 132 112 return 133 113 } 134 114 135 - userAccessToken := middleware.GetUserAccessToken(r) 136 - if userAccessToken == "" { 137 - writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 138 - return 139 - } 140 - 141 - // Resolve community identifier (handle or DID) to DID 142 - // This allows users to unblock by handle: @gaming.community.coves.social or !gaming@coves.social 143 - communityDID, err := h.service.ResolveCommunityIdentifier(r.Context(), req.Community) 144 - if err != nil { 145 - if communities.IsNotFound(err) { 146 - writeError(w, http.StatusNotFound, "CommunityNotFound", "Community not found") 147 - return 148 - } 149 - if communities.IsValidationError(err) { 150 - writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error()) 151 - return 152 - } 153 - log.Printf("Failed to resolve community identifier %s: %v", req.Community, err) 154 - writeError(w, http.StatusInternalServerError, "InternalError", "Failed to resolve community") 155 - return 156 - } 157 - 158 - // Unblock via service (delete record on PDS) using resolved DID 159 - err = h.service.UnblockCommunity(r.Context(), userDID, userAccessToken, communityDID) 115 + // Unblock via service (delete record on PDS with DPoP authentication) 116 + // Service handles identifier resolution (DIDs, handles, scoped identifiers) 117 + err := h.service.UnblockCommunity(r.Context(), session, req.Community) 160 118 if err != nil { 161 119 handleServiceError(w, err) 162 120 return
+6 -4
internal/api/handlers/community/create_test.go
··· 10 10 "net/http/httptest" 11 11 "testing" 12 12 "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 13 15 ) 14 16 15 17 // mockCommunityService implements communities.Service for testing ··· 49 51 return nil, 0, nil 50 52 } 51 53 52 - func (m *mockCommunityService) SubscribeToCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 54 + func (m *mockCommunityService) SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 53 55 return nil, nil 54 56 } 55 57 56 - func (m *mockCommunityService) UnsubscribeFromCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 58 + func (m *mockCommunityService) UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 57 59 return nil 58 60 } 59 61 ··· 65 67 return nil, nil 66 68 } 67 69 68 - func (m *mockCommunityService) BlockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) (*communities.CommunityBlock, error) { 70 + func (m *mockCommunityService) BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*communities.CommunityBlock, error) { 69 71 return nil, nil 70 72 } 71 73 72 - func (m *mockCommunityService) UnblockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 74 + func (m *mockCommunityService) UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 73 75 return nil 74 76 } 75 77
+12 -23
internal/api/handlers/community/subscribe.go
··· 51 51 return 52 52 } 53 53 54 - // Extract authenticated user DID and access token from request context (injected by auth middleware) 55 - // Note: contentVisibility defaults and clamping handled by service layer 56 - userDID := middleware.GetUserDID(r) 57 - if userDID == "" { 54 + // Get OAuth session from context (injected by auth middleware) 55 + // The session contains the user's DID and credentials needed for DPoP authentication 56 + session := middleware.GetOAuthSession(r) 57 + if session == nil { 58 58 writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 59 59 return 60 60 } 61 61 62 - userAccessToken := middleware.GetUserAccessToken(r) 63 - if userAccessToken == "" { 64 - writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 65 - return 66 - } 67 - 68 - // Subscribe via service (write-forward to PDS) 62 + // Subscribe via service (write-forward to PDS with DPoP authentication) 69 63 // Service handles identifier resolution (DIDs, handles, scoped identifiers) 70 - subscription, err := h.service.SubscribeToCommunity(r.Context(), userDID, userAccessToken, req.Community, req.ContentVisibility) 64 + subscription, err := h.service.SubscribeToCommunity(r.Context(), session, req.Community, req.ContentVisibility) 71 65 if err != nil { 72 66 handleServiceError(w, err) 73 67 return ··· 117 111 return 118 112 } 119 113 120 - // Extract authenticated user DID and access token from request context (injected by auth middleware) 121 - userDID := middleware.GetUserDID(r) 122 - if userDID == "" { 114 + // Get OAuth session from context (injected by auth middleware) 115 + // The session contains the user's DID and credentials needed for DPoP authentication 116 + session := middleware.GetOAuthSession(r) 117 + if session == nil { 123 118 writeError(w, http.StatusUnauthorized, "AuthRequired", "Authentication required") 124 119 return 125 120 } 126 121 127 - userAccessToken := middleware.GetUserAccessToken(r) 128 - if userAccessToken == "" { 129 - writeError(w, http.StatusUnauthorized, "AuthRequired", "Missing access token") 130 - return 131 - } 132 - 133 - // Unsubscribe via service (delete record on PDS) 122 + // Unsubscribe via service (delete record on PDS with DPoP authentication) 134 123 // Service handles identifier resolution (DIDs, handles, scoped identifiers) 135 - err := h.service.UnsubscribeFromCommunity(r.Context(), userDID, userAccessToken, req.Community) 124 + err := h.service.UnsubscribeFromCommunity(r.Context(), session, req.Community) 136 125 if err != nil { 137 126 handleServiceError(w, err) 138 127 return
+49 -29
internal/api/handlers/community/subscribe_test.go
··· 11 11 "net/http/httptest" 12 12 "testing" 13 13 "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 16 + "github.com/bluesky-social/indigo/atproto/syntax" 14 17 ) 15 18 19 + // createTestOAuthSession creates a mock OAuth session for testing 20 + func createTestOAuthSession(did string) *oauth.ClientSessionData { 21 + parsedDID, _ := syntax.ParseDID(did) 22 + return &oauth.ClientSessionData{ 23 + AccountDID: parsedDID, 24 + SessionID: "test-session", 25 + HostURL: "http://localhost:3001", 26 + AccessToken: "test-access-token", 27 + } 28 + } 29 + 16 30 // subscribeTestService implements communities.Service for subscribe handler tests 17 31 type subscribeTestService struct { 18 - subscribeFunc func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) 19 - unsubscribeFunc func(ctx context.Context, userDID, accessToken, communityIdentifier string) error 32 + subscribeFunc func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) 33 + unsubscribeFunc func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error 20 34 } 21 35 22 36 func (m *subscribeTestService) CreateCommunity(ctx context.Context, req communities.CreateCommunityRequest) (*communities.Community, error) { ··· 39 53 return nil, 0, nil 40 54 } 41 55 42 - func (m *subscribeTestService) SubscribeToCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 56 + func (m *subscribeTestService) SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 43 57 if m.subscribeFunc != nil { 44 - return m.subscribeFunc(ctx, userDID, accessToken, communityIdentifier, contentVisibility) 58 + return m.subscribeFunc(ctx, session, communityIdentifier, contentVisibility) 59 + } 60 + userDID := "" 61 + if session != nil { 62 + userDID = session.AccountDID.String() 45 63 } 46 64 return &communities.Subscription{ 47 65 UserDID: userDID, ··· 52 70 }, nil 53 71 } 54 72 55 - func (m *subscribeTestService) UnsubscribeFromCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 73 + func (m *subscribeTestService) UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 56 74 if m.unsubscribeFunc != nil { 57 - return m.unsubscribeFunc(ctx, userDID, accessToken, communityIdentifier) 75 + return m.unsubscribeFunc(ctx, session, communityIdentifier) 58 76 } 59 77 return nil 60 78 } ··· 67 85 return nil, nil 68 86 } 69 87 70 - func (m *subscribeTestService) BlockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) (*communities.CommunityBlock, error) { 88 + func (m *subscribeTestService) BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*communities.CommunityBlock, error) { 71 89 return nil, nil 72 90 } 73 91 74 - func (m *subscribeTestService) UnblockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 92 + func (m *subscribeTestService) UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 75 93 return nil 76 94 } 77 95 ··· 144 162 t.Run(tc.name, func(t *testing.T) { 145 163 var receivedIdentifier string 146 164 mockService := &subscribeTestService{ 147 - subscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 165 + subscribeFunc: func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 148 166 receivedIdentifier = communityIdentifier 167 + userDID := "" 168 + if session != nil { 169 + userDID = session.AccountDID.String() 170 + } 149 171 return &communities.Subscription{ 150 172 UserDID: userDID, 151 173 CommunityDID: "did:plc:resolved", ··· 167 189 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes)) 168 190 req.Header.Set("Content-Type", "application/json") 169 191 170 - // Inject auth context 171 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 172 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 192 + // Inject OAuth session into context 193 + session := createTestOAuthSession("did:plc:testuser") 194 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 173 195 req = req.WithContext(ctx) 174 196 175 197 w := httptest.NewRecorder() ··· 244 266 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes)) 245 267 req.Header.Set("Content-Type", "application/json") 246 268 247 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 248 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 269 + session := createTestOAuthSession("did:plc:testuser") 270 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 249 271 req = req.WithContext(ctx) 250 272 251 273 w := httptest.NewRecorder() ··· 286 308 for _, tc := range tests { 287 309 t.Run(tc.name, func(t *testing.T) { 288 310 mockService := &subscribeTestService{ 289 - subscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 311 + subscribeFunc: func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) { 290 312 return nil, tc.serviceErr 291 313 }, 292 314 } ··· 302 324 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes)) 303 325 req.Header.Set("Content-Type", "application/json") 304 326 305 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 306 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 327 + session := createTestOAuthSession("did:plc:testuser") 328 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 307 329 req = req.WithContext(ctx) 308 330 309 331 w := httptest.NewRecorder() ··· 353 375 t.Run(tc.name, func(t *testing.T) { 354 376 var receivedIdentifier string 355 377 mockService := &subscribeTestService{ 356 - unsubscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 378 + unsubscribeFunc: func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 357 379 receivedIdentifier = communityIdentifier 358 380 return nil 359 381 }, ··· 369 391 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.unsubscribe", bytes.NewBuffer(bodyBytes)) 370 392 req.Header.Set("Content-Type", "application/json") 371 393 372 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 373 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 394 + session := createTestOAuthSession("did:plc:testuser") 395 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 374 396 req = req.WithContext(ctx) 375 397 376 398 w := httptest.NewRecorder() ··· 399 421 400 422 func TestSubscribeHandler_Unsubscribe_SubscriptionNotFound(t *testing.T) { 401 423 mockService := &subscribeTestService{ 402 - unsubscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string) error { 424 + unsubscribeFunc: func(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 403 425 return communities.ErrSubscriptionNotFound 404 426 }, 405 427 } ··· 414 436 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.unsubscribe", bytes.NewBuffer(bodyBytes)) 415 437 req.Header.Set("Content-Type", "application/json") 416 438 417 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 418 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 439 + session := createTestOAuthSession("did:plc:testuser") 440 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 419 441 req = req.WithContext(ctx) 420 442 421 443 w := httptest.NewRecorder() ··· 456 478 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBufferString("invalid json")) 457 479 req.Header.Set("Content-Type", "application/json") 458 480 459 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 460 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 481 + session := createTestOAuthSession("did:plc:testuser") 482 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 461 483 req = req.WithContext(ctx) 462 484 463 485 w := httptest.NewRecorder() ··· 468 490 } 469 491 } 470 492 471 - func TestSubscribeHandler_RequiresAccessToken(t *testing.T) { 493 + func TestSubscribeHandler_RequiresOAuthSession(t *testing.T) { 472 494 mockService := &subscribeTestService{} 473 495 handler := NewSubscribeHandler(mockService) 474 496 ··· 480 502 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes)) 481 503 req.Header.Set("Content-Type", "application/json") 482 504 483 - // User DID but no access token 484 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:testuser") 485 - req = req.WithContext(ctx) 505 + // No OAuth session in context 486 506 487 507 w := httptest.NewRecorder() 488 508 handler.HandleSubscribe(w, req)
+5
internal/atproto/pds/errors.go
··· 27 27 func IsAuthError(err error) bool { 28 28 return errors.Is(err, ErrUnauthorized) || errors.Is(err, ErrForbidden) 29 29 } 30 + 31 + // IsConflictError returns true if the error indicates a conflict (e.g., duplicate record). 32 + func IsConflictError(err error) bool { 33 + return errors.Is(err, ErrConflict) 34 + }
+11 -5
internal/core/communities/interfaces.go
··· 1 1 package communities 2 2 3 - import "context" 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 7 + ) 4 8 5 9 // Repository defines the interface for community data persistence 6 10 // This is the AppView's indexed view of communities from the firehose ··· 66 70 SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) 67 71 68 72 // Subscription operations (write-forward: creates record in user's PDS) 69 - SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) 70 - UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error 73 + // OAuth session is passed for DPoP authentication to the user's PDS 74 + SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*Subscription, error) 75 + UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error 71 76 GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) 72 77 GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) 73 78 74 79 // Block operations (write-forward: creates record in user's PDS) 75 - BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) 76 - UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error 80 + // OAuth session is passed for DPoP authentication to the user's PDS 81 + BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*CommunityBlock, error) 82 + UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error 77 83 GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*CommunityBlock, error) 78 84 IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) 79 85
+142 -44
internal/core/communities/service.go
··· 1 1 package communities 2 2 3 3 import ( 4 + oauthclient "Coves/internal/atproto/oauth" 5 + "Coves/internal/atproto/pds" 4 6 "Coves/internal/atproto/utils" 5 7 "bytes" 6 8 "context" ··· 14 16 "strings" 15 17 "sync" 16 18 "time" 19 + 20 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 21 + "github.com/bluesky-social/indigo/atproto/syntax" 17 22 ) 18 23 19 24 // Community handle validation regex (DNS-valid handle: name.community.instance.com) ··· 25 30 26 31 // Domain validation (simplified - checks for valid DNS hostname structure) 27 32 var domainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 33 + 34 + // PDSClientFactory creates PDS clients from session data. 35 + // Used to allow injection of different auth mechanisms (OAuth for production, password for tests). 36 + type PDSClientFactory func(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) 28 37 29 38 type communityService struct { 30 39 // Interfaces and pointers first (better alignment) 31 40 repo Repository 32 41 provisioner *PDSAccountProvisioner 33 42 43 + // OAuth client/store for user PDS authentication (DPoP-based) 44 + oauthClient *oauthclient.OAuthClient 45 + oauthStore oauth.ClientAuthStore 46 + pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth. 47 + 34 48 // Token refresh concurrency control 35 49 // Each community gets its own mutex to prevent concurrent refresh attempts 36 50 refreshMutexes map[string]*sync.Mutex ··· 52 66 maxMutexCacheSize = 10000 53 67 ) 54 68 55 - // NewCommunityService creates a new community service 56 - func NewCommunityService(repo Repository, pdsURL, instanceDID, instanceDomain string, provisioner *PDSAccountProvisioner) Service { 69 + // NewCommunityService creates a new community service with OAuth client for user authentication 70 + func NewCommunityService( 71 + repo Repository, 72 + pdsURL, instanceDID, instanceDomain string, 73 + provisioner *PDSAccountProvisioner, 74 + oauthClient *oauthclient.OAuthClient, 75 + oauthStore oauth.ClientAuthStore, 76 + ) Service { 57 77 // SECURITY: Basic validation that did:web domain matches configured instanceDomain 58 78 // This catches honest configuration mistakes but NOT malicious code modifications 59 79 // Full verification (Phase 2) requires fetching DID document from domain ··· 74 94 instanceDID: instanceDID, 75 95 instanceDomain: instanceDomain, 76 96 provisioner: provisioner, 97 + oauthClient: oauthClient, 98 + oauthStore: oauthStore, 77 99 refreshMutexes: make(map[string]*sync.Mutex), 78 100 } 79 101 } 80 102 103 + // NewCommunityServiceWithPDSFactory creates a community service with a custom PDS client factory. 104 + // This is primarily for testing with password-based authentication. 105 + func NewCommunityServiceWithPDSFactory( 106 + repo Repository, 107 + pdsURL, instanceDID, instanceDomain string, 108 + provisioner *PDSAccountProvisioner, 109 + factory PDSClientFactory, 110 + ) Service { 111 + return &communityService{ 112 + repo: repo, 113 + pdsURL: pdsURL, 114 + instanceDID: instanceDID, 115 + instanceDomain: instanceDomain, 116 + provisioner: provisioner, 117 + pdsClientFactory: factory, 118 + refreshMutexes: make(map[string]*sync.Mutex), 119 + } 120 + } 121 + 81 122 // SetPDSAccessToken sets the PDS access token for authentication 82 123 // This should be called after creating a session for the Coves instance DID on the PDS 83 124 func (s *communityService) SetPDSAccessToken(token string) { 84 125 s.pdsAccessToken = token 126 + } 127 + 128 + // getPDSClient creates a PDS client from an OAuth session. 129 + // If a custom factory was provided (for testing), uses that. 130 + // Otherwise, uses DPoP authentication via indigo's APIClient for proper OAuth token handling. 131 + func (s *communityService) getPDSClient(ctx context.Context, session *oauth.ClientSessionData) (pds.Client, error) { 132 + // Use custom factory if provided (e.g., for testing with password auth) 133 + if s.pdsClientFactory != nil { 134 + return s.pdsClientFactory(ctx, session) 135 + } 136 + 137 + // Production path: use OAuth with DPoP 138 + if s.oauthClient == nil || s.oauthClient.ClientApp == nil { 139 + return nil, fmt.Errorf("OAuth client not configured") 140 + } 141 + 142 + client, err := pds.NewFromOAuthSession(ctx, s.oauthClient.ClientApp, session) 143 + if err != nil { 144 + return nil, fmt.Errorf("failed to create PDS client: %w", err) 145 + } 146 + 147 + return client, nil 85 148 } 86 149 87 150 // CreateCommunity creates a new community via write-forward to PDS ··· 585 648 } 586 649 587 650 // SubscribeToCommunity creates a subscription via write-forward to PDS 588 - func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string, contentVisibility int) (*Subscription, error) { 589 - if userDID == "" { 590 - return nil, NewValidationError("userDid", "required") 591 - } 592 - if userAccessToken == "" { 593 - return nil, NewValidationError("userAccessToken", "required") 651 + // Uses OAuth session with DPoP authentication for secure PDS communication 652 + func (s *communityService) SubscribeToCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string, contentVisibility int) (*Subscription, error) { 653 + if session == nil { 654 + return nil, NewValidationError("session", "required") 594 655 } 656 + 657 + userDID := session.AccountDID.String() 595 658 596 659 // Clamp contentVisibility to valid range (1-5), default to 3 if 0 or invalid 597 660 if contentVisibility <= 0 || contentVisibility > 5 { ··· 615 678 return nil, ErrUnauthorized 616 679 } 617 680 681 + // Create PDS client for this session (DPoP authentication) 682 + pdsClient, err := s.getPDSClient(ctx, session) 683 + if err != nil { 684 + return nil, fmt.Errorf("failed to create PDS client: %w", err) 685 + } 686 + 687 + // Generate TID for record key 688 + tid := syntax.NewTIDNow(0) 689 + 618 690 // Build subscription record 619 691 // CRITICAL: Collection is social.coves.community.subscription (RECORD TYPE), not social.coves.community.subscribe (XRPC procedure) 620 692 // This record will be created in the USER's repository: at://user_did/social.coves.community.subscription/{tid} ··· 626 698 "contentVisibility": contentVisibility, 627 699 } 628 700 629 - // Write-forward: create subscription record in user's repo using their access token 630 - // The collection parameter refers to the record type in the repository 631 - recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", "", subRecord, userAccessToken) 701 + // Write-forward: create subscription record in user's repo using DPoP-authenticated client 702 + recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.subscription", tid.String(), subRecord) 632 703 if err != nil { 704 + if pds.IsAuthError(err) { 705 + return nil, ErrUnauthorized 706 + } 633 707 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 634 708 } 635 709 ··· 647 721 } 648 722 649 723 // UnsubscribeFromCommunity removes a subscription via PDS delete 650 - func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 651 - if userDID == "" { 652 - return NewValidationError("userDid", "required") 724 + // Uses OAuth session with DPoP authentication for secure PDS communication 725 + func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 726 + if session == nil { 727 + return NewValidationError("session", "required") 653 728 } 654 - if userAccessToken == "" { 655 - return NewValidationError("userAccessToken", "required") 656 - } 729 + 730 + userDID := session.AccountDID.String() 657 731 658 732 // Resolve community identifier 659 733 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) ··· 673 747 return fmt.Errorf("invalid subscription record URI") 674 748 } 675 749 676 - // Write-forward: delete record from PDS using user's access token 750 + // Create PDS client for this session (DPoP authentication) 751 + pdsClient, err := s.getPDSClient(ctx, session) 752 + if err != nil { 753 + return fmt.Errorf("failed to create PDS client: %w", err) 754 + } 755 + 756 + // Write-forward: delete record from PDS using DPoP-authenticated client 677 757 // CRITICAL: Delete from social.coves.community.subscription (RECORD TYPE), not social.coves.community.unsubscribe 678 - if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.subscription", rkey, userAccessToken); err != nil { 758 + if err := pdsClient.DeleteRecord(ctx, "social.coves.community.subscription", rkey); err != nil { 759 + if pds.IsAuthError(err) { 760 + return ErrUnauthorized 761 + } 679 762 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 680 763 } 681 764 ··· 730 813 } 731 814 732 815 // BlockCommunity blocks a community via write-forward to PDS 733 - func (s *communityService) BlockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) (*CommunityBlock, error) { 734 - if userDID == "" { 735 - return nil, NewValidationError("userDid", "required") 816 + // Uses OAuth session with DPoP authentication for secure PDS communication 817 + func (s *communityService) BlockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) (*CommunityBlock, error) { 818 + if session == nil { 819 + return nil, NewValidationError("session", "required") 736 820 } 737 - if userAccessToken == "" { 738 - return nil, NewValidationError("userAccessToken", "required") 739 - } 821 + 822 + userDID := session.AccountDID.String() 740 823 741 824 // Resolve community identifier (also verifies community exists) 742 825 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 743 826 if err != nil { 744 827 return nil, err 745 828 } 829 + 830 + // Create PDS client for this session (DPoP authentication) 831 + pdsClient, err := s.getPDSClient(ctx, session) 832 + if err != nil { 833 + return nil, fmt.Errorf("failed to create PDS client: %w", err) 834 + } 835 + 836 + // Generate TID for record key 837 + tid := syntax.NewTIDNow(0) 746 838 747 839 // Build block record 748 840 // CRITICAL: Collection is social.coves.community.block (RECORD TYPE) ··· 754 846 "createdAt": time.Now().Format(time.RFC3339), 755 847 } 756 848 757 - // Write-forward: create block record in user's repo using their access token 849 + // Write-forward: create block record in user's repo using DPoP-authenticated client 758 850 // Note: We don't check for existing blocks first because: 759 851 // 1. The PDS may reject duplicates (depending on implementation) 760 852 // 2. The repository layer handles idempotency with ON CONFLICT DO NOTHING 761 853 // 3. This avoids a race condition where two concurrent requests both pass the check 762 - recordURI, recordCID, err := s.createRecordOnPDSAs(ctx, userDID, "social.coves.community.block", "", blockRecord, userAccessToken) 854 + recordURI, recordCID, err := pdsClient.CreateRecord(ctx, "social.coves.community.block", tid.String(), blockRecord) 763 855 if err != nil { 856 + // Check for auth errors first 857 + if pds.IsAuthError(err) { 858 + return nil, ErrUnauthorized 859 + } 860 + 764 861 // Check if this is a duplicate/conflict error from PDS 765 - // PDS should return 409 Conflict for duplicate records, but we also check common error messages 766 - // for compatibility with different PDS implementations 767 - errMsg := err.Error() 768 - isDuplicate := strings.Contains(errMsg, "status 409") || // HTTP 409 Conflict 769 - strings.Contains(errMsg, "duplicate") || 770 - strings.Contains(errMsg, "already exists") || 771 - strings.Contains(errMsg, "AlreadyExists") 772 - 773 - if isDuplicate { 862 + if pds.IsConflictError(err) { 774 863 // Fetch and return existing block from our indexed view 775 864 existingBlock, getErr := s.repo.GetBlock(ctx, userDID, communityDID) 776 865 if getErr == nil { ··· 804 893 } 805 894 806 895 // UnblockCommunity removes a block via PDS delete 807 - func (s *communityService) UnblockCommunity(ctx context.Context, userDID, userAccessToken, communityIdentifier string) error { 808 - if userDID == "" { 809 - return NewValidationError("userDid", "required") 896 + // Uses OAuth session with DPoP authentication for secure PDS communication 897 + func (s *communityService) UnblockCommunity(ctx context.Context, session *oauth.ClientSessionData, communityIdentifier string) error { 898 + if session == nil { 899 + return NewValidationError("session", "required") 810 900 } 811 - if userAccessToken == "" { 812 - return NewValidationError("userAccessToken", "required") 813 - } 901 + 902 + userDID := session.AccountDID.String() 814 903 815 904 // Resolve community identifier 816 905 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) ··· 830 919 return fmt.Errorf("invalid block record URI") 831 920 } 832 921 833 - // Write-forward: delete record from PDS using user's access token 834 - if err := s.deleteRecordOnPDSAs(ctx, userDID, "social.coves.community.block", rkey, userAccessToken); err != nil { 922 + // Create PDS client for this session (DPoP authentication) 923 + pdsClient, err := s.getPDSClient(ctx, session) 924 + if err != nil { 925 + return fmt.Errorf("failed to create PDS client: %w", err) 926 + } 927 + 928 + // Write-forward: delete record from PDS using DPoP-authenticated client 929 + if err := pdsClient.DeleteRecord(ctx, "social.coves.community.block", rkey); err != nil { 930 + if pds.IsAuthError(err) { 931 + return ErrUnauthorized 932 + } 835 933 return fmt.Errorf("failed to delete block on PDS: %w", err) 836 934 } 837 935
+3 -5
tests/e2e/user_signup_test.go
··· 391 391 } 392 392 393 393 var result struct { 394 - DID string `json:"did"` 395 - Profile struct { 396 - Handle string `json:"handle"` 397 - } `json:"profile"` 394 + DID string `json:"did"` 395 + Handle string `json:"handle"` 398 396 } 399 397 400 398 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 401 399 return "", "", fmt.Errorf("failed to decode response: %w", err) 402 400 } 403 401 404 - return result.DID, result.Profile.Handle, nil 402 + return result.DID, result.Handle, nil 405 403 }
+1 -1
tests/integration/aggregator_e2e_test.go
··· 68 68 identityConfig := identity.DefaultConfig() 69 69 identityResolver := identity.NewResolver(db, identityConfig) 70 70 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 71 - communityService := communities.NewCommunityService(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil) 71 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, "http://localhost:3001", "did:web:test.coves.social", "coves.social", nil, nil) 72 72 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 73 73 postService := posts.NewPostService(postRepo, communityService, aggregatorService, nil, nil, nil, "http://localhost:3001") 74 74
+5 -5
tests/integration/author_posts_e2e_test.go
··· 71 71 // Setup services 72 72 resolver := identity.NewResolver(db, identity.DefaultConfig()) 73 73 userService := users.NewUserService(userRepo, resolver, pdsURL) 74 - communityService := communities.NewCommunityService(communityRepo, pdsURL, getTestInstanceDID(), "", nil) 74 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, pdsURL, getTestInstanceDID(), "", nil, nil) 75 75 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 76 76 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 77 77 ··· 289 289 290 290 resolver := identity.NewResolver(db, identity.DefaultConfig()) 291 291 userService := users.NewUserService(userRepo, resolver, getTestPDSURL()) 292 - communityService := communities.NewCommunityService(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil) 292 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil, nil) 293 293 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, getTestPDSURL()) 294 294 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 295 295 ··· 428 428 429 429 resolver := identity.NewResolver(db, identity.DefaultConfig()) 430 430 userService := users.NewUserService(userRepo, resolver, getTestPDSURL()) 431 - communityService := communities.NewCommunityService(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil) 431 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil, nil) 432 432 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, getTestPDSURL()) 433 433 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 434 434 ··· 548 548 // Setup services 549 549 resolver := identity.NewResolver(db, identity.DefaultConfig()) 550 550 userService := users.NewUserService(userRepo, resolver, pdsURL) 551 - communityService := communities.NewCommunityService(communityRepo, pdsURL, getTestInstanceDID(), "", nil) 551 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, pdsURL, getTestInstanceDID(), "", nil, nil) 552 552 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 553 553 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 554 554 ··· 658 658 659 659 resolver := identity.NewResolver(db, identity.DefaultConfig()) 660 660 userService := users.NewUserService(userRepo, resolver, getTestPDSURL()) 661 - communityService := communities.NewCommunityService(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil) 661 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, getTestPDSURL(), getTestInstanceDID(), "", nil, nil) 662 662 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, getTestPDSURL()) 663 663 voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 664 664
+21 -5
tests/integration/block_handle_resolution_test.go
··· 13 13 "testing" 14 14 15 15 postgresRepo "Coves/internal/db/postgres" 16 + 17 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 18 + "github.com/bluesky-social/indigo/atproto/syntax" 16 19 ) 17 20 21 + // createTestOAuthSessionForBlock creates a mock OAuth session for block handler tests 22 + func createTestOAuthSessionForBlock(did string) *oauth.ClientSessionData { 23 + parsedDID, _ := syntax.ParseDID(did) 24 + return &oauth.ClientSessionData{ 25 + AccountDID: parsedDID, 26 + SessionID: "test-session", 27 + HostURL: "http://localhost:3001", 28 + AccessToken: "test-access-token", 29 + } 30 + } 31 + 18 32 // TestBlockHandler_HandleResolution tests that the block handler accepts handles 19 33 // in addition to DIDs and resolves them correctly 20 34 func TestBlockHandler_HandleResolution(t *testing.T) { ··· 29 43 30 44 // Set up repositories and services 31 45 communityRepo := postgresRepo.NewCommunityRepository(db) 32 - communityService := communities.NewCommunityService( 46 + communityService := communities.NewCommunityServiceWithPDSFactory( 33 47 communityRepo, 34 48 getTestPDSURL(), 35 49 getTestInstanceDID(), 36 50 "coves.social", 37 51 nil, // No PDS HTTP client for this test 52 + nil, // No PDS factory needed for this test 38 53 ) 39 54 40 55 blockHandler := community.NewBlockHandler(communityService) ··· 193 208 req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.blockCommunity", bytes.NewBuffer(reqJSON)) 194 209 req.Header.Set("Content-Type", "application/json") 195 210 196 - // Add auth context so we get past auth checks and test resolution validation 197 - ctx := context.WithValue(req.Context(), middleware.UserDIDKey, "did:plc:test123") 198 - ctx = context.WithValue(ctx, middleware.UserAccessToken, "test-token") 211 + // Add OAuth session context so we get past auth checks and test resolution validation 212 + session := createTestOAuthSessionForBlock("did:plc:test123") 213 + ctx := context.WithValue(req.Context(), middleware.OAuthSessionKey, session) 199 214 req = req.WithContext(ctx) 200 215 201 216 w := httptest.NewRecorder() ··· 265 280 266 281 // Set up repositories and services 267 282 communityRepo := postgresRepo.NewCommunityRepository(db) 268 - communityService := communities.NewCommunityService( 283 + communityService := communities.NewCommunityServiceWithPDSFactory( 269 284 communityRepo, 270 285 getTestPDSURL(), 271 286 getTestInstanceDID(), 272 287 "coves.social", 273 288 nil, 289 + nil, // No PDS factory needed for this test 274 290 ) 275 291 276 292 blockHandler := community.NewBlockHandler(communityService)
+13 -3
tests/integration/community_e2e_test.go
··· 22 22 "testing" 23 23 "time" 24 24 25 + oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth" 26 + "github.com/bluesky-social/indigo/atproto/syntax" 25 27 "github.com/go-chi/chi/v5" 26 28 "github.com/gorilla/websocket" 27 29 _ "github.com/lib/pq" ··· 151 153 // PDS handles all DID generation and registration automatically 152 154 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 153 155 154 - // Create service (no longer needs didGen directly - provisioner owns it) 155 - communityService := communities.NewCommunityService(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner) 156 + // Create service with PDS factory for password-based auth in tests 157 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner, CommunityPasswordAuthPDSClientFactory()) 156 158 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 157 159 svc.SetPDSAccessToken(accessToken) 158 160 } ··· 950 952 t.Logf("Initial subscriber count: %d", initialSubscriberCount) 951 953 952 954 // Subscribe first (using instance access token for instance user, with contentVisibility=3) 953 - subscription, err := communityService.SubscribeToCommunity(ctx, instanceDID, accessToken, community.DID, 3) 955 + // Create a session for the instance user 956 + parsedDID, _ := syntax.ParseDID(instanceDID) 957 + instanceSession := &oauthlib.ClientSessionData{ 958 + AccountDID: parsedDID, 959 + SessionID: "test-session-e2e", 960 + HostURL: pdsURL, 961 + AccessToken: accessToken, 962 + } 963 + subscription, err := communityService.SubscribeToCommunity(ctx, instanceSession, community.DID, 3) 954 964 if err != nil { 955 965 t.Fatalf("Failed to subscribe: %v", err) 956 966 }
+8 -4
tests/integration/community_identifier_resolution_test.go
··· 50 50 instanceDID = "did:web:" + instanceDomain 51 51 } 52 52 53 - service := communities.NewCommunityService( 53 + service := communities.NewCommunityServiceWithPDSFactory( 54 54 repo, 55 55 pdsURL, 56 56 instanceDID, 57 57 instanceDomain, 58 58 provisioner, 59 + nil, 59 60 ) 60 61 61 62 // Create a test community to resolve ··· 244 245 } 245 246 246 247 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 247 - service := communities.NewCommunityService( 248 + service := communities.NewCommunityServiceWithPDSFactory( 248 249 repo, 249 250 pdsURL, 250 251 instanceDID, 251 252 instanceDomain, 252 253 provisioner, 254 + nil, 253 255 ) 254 256 255 257 tests := []struct { ··· 421 423 } 422 424 423 425 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 424 - service := communities.NewCommunityService( 426 + service := communities.NewCommunityServiceWithPDSFactory( 425 427 repo, 426 428 pdsURL, 427 429 instanceDID, 428 430 instanceDomain, 429 431 provisioner, 432 + nil, 430 433 ) 431 434 432 435 t.Run("DID error includes identifier", func(t *testing.T) { ··· 486 489 } 487 490 488 491 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 489 - service := communities.NewCommunityService( 492 + service := communities.NewCommunityServiceWithPDSFactory( 490 493 repo, 491 494 pdsURL, 492 495 instanceDID, 493 496 instanceDomain, 494 497 provisioner, 498 + nil, 495 499 ) 496 500 497 501 // Create a test community
+2 -1
tests/integration/community_provisioning_test.go
··· 146 146 147 147 repo := postgres.NewCommunityRepository(db) 148 148 provisioner := communities.NewPDSAccountProvisioner("test.local", "http://localhost:3001") 149 - service := communities.NewCommunityService( 149 + service := communities.NewCommunityServiceWithPDSFactory( 150 150 repo, 151 151 "http://localhost:3001", // pdsURL 152 152 "did:web:test.local", // instanceDID 153 153 "test.local", // instanceDomain 154 154 provisioner, 155 + nil, 155 156 ) 156 157 ctx := context.Background() 157 158
+10 -5
tests/integration/community_service_integration_test.go
··· 57 57 // Create provisioner and service (production code path) 58 58 // Use coves.social domain (configured in PDS_SERVICE_HANDLE_DOMAINS as c-{name}.coves.social) 59 59 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 60 - service := communities.NewCommunityService( 60 + service := communities.NewCommunityServiceWithPDSFactory( 61 61 repo, 62 62 pdsURL, 63 63 "did:web:coves.social", 64 64 "coves.social", 65 65 provisioner, 66 + nil, 66 67 ) 67 68 68 69 // Generate unique community name (keep short for DNS label limit) ··· 201 202 202 203 t.Run("handles PDS errors gracefully", func(t *testing.T) { 203 204 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 204 - service := communities.NewCommunityService( 205 + service := communities.NewCommunityServiceWithPDSFactory( 205 206 repo, 206 207 pdsURL, 207 208 "did:web:coves.social", 208 209 "coves.social", 209 210 provisioner, 211 + nil, 210 212 ) 211 213 212 214 // Try to create community with invalid name (should fail validation before PDS) ··· 232 234 233 235 t.Run("validates DNS label limits", func(t *testing.T) { 234 236 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 235 - service := communities.NewCommunityService( 237 + service := communities.NewCommunityServiceWithPDSFactory( 236 238 repo, 237 239 pdsURL, 238 240 "did:web:coves.social", 239 241 "coves.social", 240 242 provisioner, 243 + nil, 241 244 ) 242 245 243 246 // Try 64-char name (exceeds DNS limit of 63) ··· 301 304 repo := postgres.NewCommunityRepository(db) 302 305 303 306 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 304 - service := communities.NewCommunityService( 307 + service := communities.NewCommunityServiceWithPDSFactory( 305 308 repo, 306 309 pdsURL, 307 310 "did:web:coves.social", 308 311 "coves.social", 309 312 provisioner, 313 + nil, 310 314 ) 311 315 312 316 t.Run("updates community with real PDS", func(t *testing.T) { ··· 492 496 repo := postgres.NewCommunityRepository(db) 493 497 494 498 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 495 - service := communities.NewCommunityService( 499 + service := communities.NewCommunityServiceWithPDSFactory( 496 500 repo, 497 501 pdsURL, 498 502 "did:web:coves.social", 499 503 "coves.social", 500 504 provisioner, 505 + nil, 501 506 ) 502 507 503 508 t.Run("generated password works for session creation", func(t *testing.T) {
+2 -1
tests/integration/community_update_e2e_test.go
··· 88 88 // Setup services 89 89 communityRepo := postgres.NewCommunityRepository(db) 90 90 provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 91 - communityService := communities.NewCommunityService( 91 + communityService := communities.NewCommunityServiceWithPDSFactory( 92 92 communityRepo, 93 93 pdsURL, 94 94 instanceDID, 95 95 "coves.social", 96 96 provisioner, 97 + nil, 97 98 ) 98 99 99 100 consumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, true, identityResolver)
+24 -12
tests/integration/feed_test.go
··· 29 29 // Setup services 30 30 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 31 31 communityRepo := postgres.NewCommunityRepository(db) 32 - communityService := communities.NewCommunityService( 32 + communityService := communities.NewCommunityServiceWithPDSFactory( 33 33 communityRepo, 34 34 "http://localhost:3001", 35 35 "did:web:test.coves.social", 36 36 "test.coves.social", 37 + nil, 37 38 nil, 38 39 ) 39 40 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 106 107 // Setup services 107 108 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 108 109 communityRepo := postgres.NewCommunityRepository(db) 109 - communityService := communities.NewCommunityService( 110 + communityService := communities.NewCommunityServiceWithPDSFactory( 110 111 communityRepo, 111 112 "http://localhost:3001", 112 113 "did:web:test.coves.social", 113 114 "test.coves.social", 114 115 nil, 116 + nil, 115 117 ) 116 118 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 117 119 handler := communityFeed.NewGetCommunityHandler(feedService, nil, nil) ··· 182 184 // Setup services 183 185 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 184 186 communityRepo := postgres.NewCommunityRepository(db) 185 - communityService := communities.NewCommunityService( 187 + communityService := communities.NewCommunityServiceWithPDSFactory( 186 188 communityRepo, 187 189 "http://localhost:3001", 188 190 "did:web:test.coves.social", 189 191 "test.coves.social", 192 + nil, 190 193 nil, 191 194 ) 192 195 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 238 241 // Setup services 239 242 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 240 243 communityRepo := postgres.NewCommunityRepository(db) 241 - communityService := communities.NewCommunityService( 244 + communityService := communities.NewCommunityServiceWithPDSFactory( 242 245 communityRepo, 243 246 "http://localhost:3001", 244 247 "did:web:test.coves.social", 245 248 "test.coves.social", 249 + nil, 246 250 nil, 247 251 ) 248 252 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 329 333 // Setup services 330 334 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 331 335 communityRepo := postgres.NewCommunityRepository(db) 332 - communityService := communities.NewCommunityService( 336 + communityService := communities.NewCommunityServiceWithPDSFactory( 333 337 communityRepo, 334 338 "http://localhost:3001", 335 339 "did:web:test.coves.social", 336 340 "test.coves.social", 341 + nil, 337 342 nil, 338 343 ) 339 344 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 365 370 // Setup services 366 371 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 367 372 communityRepo := postgres.NewCommunityRepository(db) 368 - communityService := communities.NewCommunityService( 373 + communityService := communities.NewCommunityServiceWithPDSFactory( 369 374 communityRepo, 370 375 "http://localhost:3001", 371 376 "did:web:test.coves.social", 372 377 "test.coves.social", 378 + nil, 373 379 nil, 374 380 ) 375 381 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 421 427 // Setup services 422 428 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 423 429 communityRepo := postgres.NewCommunityRepository(db) 424 - communityService := communities.NewCommunityService( 430 + communityService := communities.NewCommunityServiceWithPDSFactory( 425 431 communityRepo, 426 432 "http://localhost:3001", 427 433 "did:web:test.coves.social", 428 434 "test.coves.social", 435 + nil, 429 436 nil, 430 437 ) 431 438 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 465 472 // Setup services 466 473 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 467 474 communityRepo := postgres.NewCommunityRepository(db) 468 - communityService := communities.NewCommunityService( 475 + communityService := communities.NewCommunityServiceWithPDSFactory( 469 476 communityRepo, 470 477 "http://localhost:3001", 471 478 "did:web:test.coves.social", 472 479 "test.coves.social", 473 480 nil, 481 + nil, 474 482 ) 475 483 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 476 484 handler := communityFeed.NewGetCommunityHandler(feedService, nil, nil) ··· 518 526 // Setup services 519 527 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 520 528 communityRepo := postgres.NewCommunityRepository(db) 521 - communityService := communities.NewCommunityService( 529 + communityService := communities.NewCommunityServiceWithPDSFactory( 522 530 communityRepo, 523 531 "http://localhost:3001", 524 532 "did:web:test.coves.social", 525 533 "test.coves.social", 534 + nil, 526 535 nil, 527 536 ) 528 537 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 619 628 // Setup services 620 629 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 621 630 communityRepo := postgres.NewCommunityRepository(db) 622 - communityService := communities.NewCommunityService( 631 + communityService := communities.NewCommunityServiceWithPDSFactory( 623 632 communityRepo, 624 633 "http://localhost:3001", 625 634 "did:web:test.coves.social", 626 635 "test.coves.social", 636 + nil, 627 637 nil, 628 638 ) 629 639 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) ··· 721 731 // Setup services 722 732 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 723 733 communityRepo := postgres.NewCommunityRepository(db) 724 - communityService := communities.NewCommunityService( 734 + communityService := communities.NewCommunityServiceWithPDSFactory( 725 735 communityRepo, 726 736 "http://localhost:3001", 727 737 "did:web:test.coves.social", 728 738 "test.coves.social", 729 739 nil, 740 + nil, 730 741 ) 731 742 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 732 743 handler := communityFeed.NewGetCommunityHandler(feedService, nil, nil) ··· 822 833 // Setup services 823 834 feedRepo := postgres.NewCommunityFeedRepository(db, "test-cursor-secret") 824 835 communityRepo := postgres.NewCommunityRepository(db) 825 - communityService := communities.NewCommunityService( 836 + communityService := communities.NewCommunityServiceWithPDSFactory( 826 837 communityRepo, 827 838 "http://localhost:3001", 828 839 "did:web:test.coves.social", 829 840 "test.coves.social", 841 + nil, 830 842 nil, 831 843 ) 832 844 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
+17
tests/integration/helpers.go
··· 4 4 "Coves/internal/api/middleware" 5 5 "Coves/internal/atproto/oauth" 6 6 "Coves/internal/atproto/pds" 7 + "Coves/internal/core/communities" 7 8 "Coves/internal/core/users" 8 9 "Coves/internal/core/votes" 9 10 "bytes" ··· 443 444 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 444 445 } 445 446 } 447 + 448 + // CommunityPasswordAuthPDSClientFactory creates a PDSClientFactory for communities that uses password-based Bearer auth. 449 + // This is for E2E tests that use createSession instead of OAuth. 450 + // The factory extracts the access token and host URL from the session data. 451 + func CommunityPasswordAuthPDSClientFactory() communities.PDSClientFactory { 452 + return func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 453 + if session.AccessToken == "" { 454 + return nil, fmt.Errorf("session has no access token") 455 + } 456 + if session.HostURL == "" { 457 + return nil, fmt.Errorf("session has no host URL") 458 + } 459 + 460 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 461 + } 462 + }
+2 -1
tests/integration/post_creation_test.go
··· 35 35 36 36 communityRepo := postgres.NewCommunityRepository(db) 37 37 // Note: Provisioner not needed for this test (we're not actually creating communities) 38 - communityService := communities.NewCommunityService( 38 + communityService := communities.NewCommunityServiceWithPDSFactory( 39 39 communityRepo, 40 40 "http://localhost:3001", 41 41 "did:web:test.coves.social", 42 42 "test.coves.social", 43 43 nil, // provisioner 44 + nil, // pdsClientFactory 44 45 ) 45 46 46 47 postRepo := postgres.NewPostRepository(db)
+2 -1
tests/integration/post_e2e_test.go
··· 394 394 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 395 395 396 396 // Setup community service with real PDS provisioner 397 - communityService := communities.NewCommunityService( 397 + communityService := communities.NewCommunityServiceWithPDSFactory( 398 398 communityRepo, 399 399 pdsURL, 400 400 instanceDID, 401 401 instanceDomain, 402 402 provisioner, // ✅ Real provisioner for creating communities on PDS 403 + nil, // No PDS factory needed - no subscribe/block in this test 403 404 ) 404 405 405 406 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) // nil aggregatorService, blobService, unfurlService, blueskyService for user-only tests
+6 -3
tests/integration/post_handler_test.go
··· 32 32 33 33 // Setup services 34 34 communityRepo := postgres.NewCommunityRepository(db) 35 - communityService := communities.NewCommunityService( 35 + communityService := communities.NewCommunityServiceWithPDSFactory( 36 36 communityRepo, 37 37 "http://localhost:3001", 38 38 "did:web:test.coves.social", 39 39 "test.coves.social", 40 + nil, 40 41 nil, 41 42 ) 42 43 ··· 400 401 401 402 // Setup services 402 403 communityRepo := postgres.NewCommunityRepository(db) 403 - communityService := communities.NewCommunityService( 404 + communityService := communities.NewCommunityServiceWithPDSFactory( 404 405 communityRepo, 405 406 "http://localhost:3001", 406 407 "did:web:test.coves.social", 407 408 "test.coves.social", 409 + nil, 408 410 nil, 409 411 ) 410 412 ··· 484 486 485 487 // Setup services 486 488 communityRepo := postgres.NewCommunityRepository(db) 487 - communityService := communities.NewCommunityService( 489 + communityService := communities.NewCommunityServiceWithPDSFactory( 488 490 communityRepo, 489 491 "http://localhost:3001", 490 492 "did:web:test.coves.social", 491 493 "test.coves.social", 494 + nil, 492 495 nil, 493 496 ) 494 497
+2 -1
tests/integration/post_thumb_validation_test.go
··· 55 55 56 56 // Setup services 57 57 communityRepo := postgres.NewCommunityRepository(db) 58 - communityService := communities.NewCommunityService( 58 + communityService := communities.NewCommunityServiceWithPDSFactory( 59 59 communityRepo, 60 60 "http://localhost:3001", 61 61 "did:web:test.coves.social", 62 62 "test.coves.social", 63 + nil, 63 64 nil, 64 65 ) 65 66
+8 -4
tests/integration/post_unfurl_test.go
··· 51 51 unfurl.WithCacheTTL(24*time.Hour), 52 52 ) 53 53 54 - communityService := communities.NewCommunityService( 54 + communityService := communities.NewCommunityServiceWithPDSFactory( 55 55 communityRepo, 56 56 "http://localhost:3001", 57 57 "did:web:test.coves.social", 58 58 "test.coves.social", 59 + nil, 59 60 nil, 60 61 ) 61 62 ··· 348 349 identityResolver := identity.NewResolver(db, identityConfig) 349 350 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 350 351 351 - communityService := communities.NewCommunityService( 352 + communityService := communities.NewCommunityServiceWithPDSFactory( 352 353 communityRepo, 353 354 "http://localhost:3001", 354 355 "did:web:test.coves.social", 355 356 "test.coves.social", 357 + nil, 356 358 nil, 357 359 ) 358 360 ··· 456 458 unfurl.WithCacheTTL(24*time.Hour), 457 459 ) 458 460 459 - communityService := communities.NewCommunityService( 461 + communityService := communities.NewCommunityServiceWithPDSFactory( 460 462 communityRepo, 461 463 "http://localhost:3001", 462 464 "did:web:test.coves.social", 463 465 "test.coves.social", 464 466 nil, 467 + nil, 465 468 ) 466 469 467 470 postService := posts.NewPostService( ··· 568 571 unfurl.WithTimeout(30*time.Second), 569 572 ) 570 573 571 - communityService := communities.NewCommunityService( 574 + communityService := communities.NewCommunityServiceWithPDSFactory( 572 575 communityRepo, 573 576 "http://localhost:3001", 574 577 "did:web:test.coves.social", 575 578 "test.coves.social", 579 + nil, 576 580 nil, 577 581 ) 578 582
+1 -1
tests/integration/user_journey_e2e_test.go
··· 128 128 } 129 129 130 130 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 131 - communityService := communities.NewCommunityService(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner) 131 + communityService := communities.NewCommunityServiceWithPDSFactory(communityRepo, pdsURL, instanceDID, instanceDomain, provisioner, CommunityPasswordAuthPDSClientFactory()) 132 132 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 133 133 timelineService := timelineCore.NewTimelineService(timelineRepo) 134 134