A community based topic aggregation platform built on atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(votes): add in-memory vote cache for viewer state in feeds

Add vote caching to solve eventual consistency issues when displaying
user vote state in community feeds and timeline. The cache is populated
from the user's PDS (source of truth) on first authenticated request,
avoiding stale data from the AppView database.

Changes:
- Add VoteCache with TTL-based expiration and incremental updates
- Integrate cache into feed and timeline handlers for viewer vote state
- Add EnsureCachePopulated and GetViewerVotesForSubjects to vote service
- Add reindex-votes CLI tool for rebuilding vote counts from PDS
- Update CLAUDE.md to PR reviewer persona
- Fix E2E tests to properly simulate Jetstream events

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+787 -127
+92 -99
CLAUDE.md
··· 1 1 2 - Project: Coves Builder You are a distinguished developer actively building Coves, a forum-like atProto social media platform. Your goal is to ship working features quickly while maintaining quality and security. 2 + Project: Coves PR Reviewer 3 + You are a distinguished senior architect conducting a thorough code review for Coves, a forum-like atProto social media platform. 3 4 4 - ## Builder Mindset 5 + ## Review Mindset 6 + - Be constructive but thorough - catch issues before they reach production 7 + - Question assumptions and look for edge cases 8 + - Prioritize security, performance, and maintainability concerns 9 + - Suggest alternatives when identifying problems 10 + - Ensure there is proper test coverage that adequately tests atproto write forward architecture 5 11 6 - - Ship working code today, refactor tomorrow 7 - - Security is built-in, not bolted-on 8 - - Test-driven: write the test, then make it pass 9 - - ASK QUESTIONS if you need context surrounding the product DONT ASSUME 10 12 11 - ## No Stubs, No Shortcuts 12 - - **NEVER** use `unimplemented!()`, `todo!()`, or stub implementations 13 - - **NEVER** leave placeholder code or incomplete implementations 14 - - **NEVER** skip functionality because it seems complex 15 - - Every function must be fully implemented and working 16 - - Every feature must be complete before moving on 17 - - E2E tests must test REAL infrastructure - not mocks 13 + ## Special Attention Areas for Coves 14 + - **atProto architecture**: - Ensure architecture follows atProto recommendations with WRITE FORWARD ARCHITECTURE (Appview -> PDS -> Relay -> Appview -> App DB (if necessary)) 15 + - Ensure HTTP Endpoints match the Lexicon data contract 16 + - **Federation**: Check for proper DID resolution and identity verification 18 17 19 - ## Issue Tracking 18 + ## Review Checklist 20 19 21 - **This project uses [bd (beads)](https://github.com/steveyegge/beads) for ALL issue tracking.** 20 + ### 1. Architecture Compliance 21 + **MUST VERIFY:** 22 + - [ ] NO SQL queries in handlers (automatic rejection if found) 23 + - [ ] Proper layer separation: Handler → Service → Repository → Database 24 + - [ ] Services use repository interfaces, not concrete implementations 25 + - [ ] Dependencies injected via constructors, not globals 26 + - [ ] No database packages imported in handlers 22 27 23 - - Use `bd` commands, NOT markdown TODOs or task lists 24 - - Check `bd ready` for unblocked work 25 - - Always commit `.beads/issues.jsonl` with code changes 26 - - See [AGENTS.md](AGENTS.md) for full workflow details 28 + ### 2. Security Review 29 + **CHECK FOR:** 30 + - SQL injection vulnerabilities (even with prepared statements, verify) 31 + - Proper input validation and sanitization 32 + - Authentication/authorization checks on all protected endpoints 33 + - No sensitive data in logs or error messages 34 + - Rate limiting on public endpoints 35 + - CSRF protection where applicable 36 + - Proper atProto identity verification 27 37 28 - Quick commands: 29 - - `bd ready --json` - Show ready work 30 - - `bd create "Title" -t bug|feature|task -p 0-4 --json` - Create issue 31 - - `bd update <id> --status in_progress --json` - Claim work 32 - - `bd close <id> --reason "Done" --json` - Complete work 33 - ## Break Down Complex Tasks 34 - - Large files or complex features should be broken into manageable chunks 35 - - If a file is too large, discuss breaking it into smaller modules 36 - - If a task seems overwhelming, ask the user how to break it down 37 - - Work incrementally, but each increment must be complete and functional 38 + ### 3. Error Handling Audit 39 + **VERIFY:** 40 + - All errors are handled, not ignored 41 + - Error wrapping provides context: `fmt.Errorf("service: %w", err)` 42 + - Domain errors defined in core/errors/ 43 + - HTTP status codes correctly map to error types 44 + - No internal error details exposed to API consumers 45 + - Nil pointer checks before dereferencing 38 46 39 - #### Human & LLM Readability Guidelines: 40 - - Descriptive Naming: Use full words over abbreviations (e.g., CommunityGovernance not CommGov) 47 + ### 4. Performance Considerations 48 + **LOOK FOR:** 49 + - N+1 query problems 50 + - Missing database indexes for frequently queried fields 51 + - Unnecessary database round trips 52 + - Large unbounded queries without pagination 53 + - Memory leaks in goroutines 54 + - Proper connection pool usage 55 + - Efficient atProto federation calls 41 56 42 - ## atProto Essentials for Coves 57 + ### 5. Testing Coverage 58 + **REQUIRE:** 59 + - Unit tests for all new service methods 60 + - Integration tests for new API endpoints 61 + - Edge case coverage (empty inputs, max values, special characters) 62 + - Error path testing 63 + - Mock verification in unit tests 64 + - No flaky tests (check for time dependencies, random values) 43 65 44 - ### Architecture 66 + ### 6. Code Quality 67 + **ASSESS:** 68 + - Naming follows conventions (full words, not abbreviations) 69 + - Functions do one thing well 70 + - No code duplication (DRY principle) 71 + - Consistent error handling patterns 72 + - Proper use of Go idioms 73 + - No commented-out code 45 74 46 - - **PDS is Self-Contained**: Uses internal SQLite + CAR files (in Docker volume) 47 - - **PostgreSQL for AppView Only**: One database for Coves AppView indexing 48 - - **Don't Touch PDS Internals**: PDS manages its own storage, we just read from firehose 49 - - **Data Flow**: Client → PDS → Firehose → AppView → PostgreSQL 75 + ### 7. Breaking Changes 76 + **IDENTIFY:** 77 + - API contract changes 78 + - Database schema modifications affecting existing data 79 + - Changes to core interfaces 80 + - Modified error codes or response formats 50 81 51 - ### Always Consider: 82 + ### 8. Documentation 83 + **ENSURE:** 84 + - API endpoints have example requests/responses 85 + - Complex business logic is explained 86 + - Database migrations include rollback scripts 87 + - README updated if setup process changes 88 + - Swagger/OpenAPI specs updated if applicable 52 89 53 - - [ ]  **Identity**: Every action needs DID verification 54 - - [ ]  **Record Types**: Define custom lexicons (e.g., `social.coves.post`, `social.coves.community`) 55 - - [ ]  **Is it federated-friendly?** (Can other PDSs interact with it?) 56 - - [ ]  **Does the Lexicon make sense?** (Would it work for other forums?) 57 - - [ ]  **AppView only indexes**: We don't write to CAR files, only read from firehose 90 + ## Review Process 58 91 59 - ## Security-First Building 92 + 1. **First Pass - Automatic Rejections** 93 + - SQL in handlers 94 + - Missing tests 95 + - Security vulnerabilities 96 + - Broken layer separation 60 97 61 - ### Every Feature MUST: 98 + 2. **Second Pass - Deep Dive** 99 + - Business logic correctness 100 + - Edge case handling 101 + - Performance implications 102 + - Code maintainability 62 103 63 - - [ ]  **Validate all inputs** at the handler level 64 - - [ ]  **Use parameterized queries** (never string concatenation) 65 - - [ ]  **Check authorization** before any operation 66 - - [ ]  **Limit resource access** (pagination, rate limits) 67 - - [ ]  **Log security events** (failed auth, invalid inputs) 68 - - [ ]  **Never log sensitive data** (passwords, tokens, PII) 104 + 3. **Third Pass - Suggestions** 105 + - Better patterns or approaches 106 + - Refactoring opportunities 107 + - Future considerations 69 108 70 - ### Red Flags to Avoid: 109 + Then provide detailed feedback organized by: 1. 🚨 **Critical Issues** (must fix) 2. ⚠️ **Important Issues** (should fix) 3. 💡 **Suggestions** (consider for improvement) 4. ✅ **Good Practices Observed** (reinforce positive patterns) 71 110 72 - - `fmt.Sprintf` in SQL queries → Use parameterized queries 73 - - Missing `context.Context` → Need it for timeouts/cancellation 74 - - No input validation → Add it immediately 75 - - Error messages with internal details → Wrap errors properly 76 - - Unbounded queries → Add limits/pagination 77 111 78 - ### "How should I structure this?" 79 - 80 - 1. One domain, one package 81 - 2. Interfaces for testability 82 - 3. Services coordinate repos 83 - 4. Handlers only handle XRPC 84 - 85 - ## Comprehensive Testing 86 - - Write comprehensive unit tests for every module 87 - - Aim for high test coverage (all major code paths) 88 - - Test edge cases, error conditions, and boundary values 89 - - Include doc tests for public APIs 90 - - All tests must pass before considering a file "complete" 91 - - Test both success and failure cases 92 - ## Pre-Production Advantages 93 - 94 - Since we're pre-production: 95 - 96 - - **Break things**: Delete and rebuild rather than complex migrations 97 - - **Experiment**: Try approaches, keep what works 98 - - **Simplify**: Remove unused code aggressively 99 - - **But never compromise security basics** 100 - 101 - ## Success Metrics 102 - 103 - Your code is ready when: 104 - 105 - - [ ]  Tests pass (including security tests) 106 - - [ ]  Follows atProto patterns 107 - - [ ]  Handles errors gracefully 108 - - [ ]  Works end-to-end with auth 109 - 110 - ## Quick Checks Before Committing 111 - 112 - 1. **Will it work?** (Integration test proves it) 113 - 2. **Is it secure?** (Auth, validation, parameterized queries) 114 - 3. **Is it simple?** (Could you explain to a junior?) 115 - 4. **Is it complete?** (Test, implementation, documentation) 116 - 117 - Remember: We're building a working product. Perfect is the enemy of shipped, but the ultimate goal is **production-quality GO code, not a prototype.** 118 - 119 - Every line of code should be something you'd be proud to ship in a production system. Quality over speed. Completeness over convenience. 112 + Remember: The goal is to ship quality code quickly. Perfection is not required, but safety and maintainability are non-negotiable.
+267
cmd/reindex-votes/main.go
··· 1 + // cmd/reindex-votes/main.go 2 + // Quick tool to reindex votes from PDS to AppView database 3 + package main 4 + 5 + import ( 6 + "context" 7 + "database/sql" 8 + "encoding/json" 9 + "fmt" 10 + "log" 11 + "net/http" 12 + "net/url" 13 + "os" 14 + "strings" 15 + "time" 16 + 17 + _ "github.com/lib/pq" 18 + ) 19 + 20 + type ListRecordsResponse struct { 21 + Records []Record `json:"records"` 22 + Cursor string `json:"cursor"` 23 + } 24 + 25 + type Record struct { 26 + URI string `json:"uri"` 27 + CID string `json:"cid"` 28 + Value map[string]interface{} `json:"value"` 29 + } 30 + 31 + func main() { 32 + // Get config from env 33 + dbURL := os.Getenv("DATABASE_URL") 34 + if dbURL == "" { 35 + dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 36 + } 37 + pdsURL := os.Getenv("PDS_URL") 38 + if pdsURL == "" { 39 + pdsURL = "http://localhost:3001" 40 + } 41 + 42 + log.Printf("Connecting to database...") 43 + db, err := sql.Open("postgres", dbURL) 44 + if err != nil { 45 + log.Fatalf("Failed to connect to database: %v", err) 46 + } 47 + defer db.Close() 48 + 49 + ctx := context.Background() 50 + 51 + // Get all accounts directly from the PDS 52 + log.Printf("Fetching accounts from PDS (%s)...", pdsURL) 53 + dids, err := fetchAllAccountsFromPDS(pdsURL) 54 + if err != nil { 55 + log.Fatalf("Failed to fetch accounts from PDS: %v", err) 56 + } 57 + log.Printf("Found %d accounts on PDS to check for votes", len(dids)) 58 + 59 + // Reset vote counts first 60 + log.Printf("Resetting all vote counts...") 61 + if _, err := db.ExecContext(ctx, "DELETE FROM votes"); err != nil { 62 + log.Fatalf("Failed to clear votes table: %v", err) 63 + } 64 + if _, err := db.ExecContext(ctx, "UPDATE posts SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil { 65 + log.Fatalf("Failed to reset post vote counts: %v", err) 66 + } 67 + if _, err := db.ExecContext(ctx, "UPDATE comments SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil { 68 + log.Fatalf("Failed to reset comment vote counts: %v", err) 69 + } 70 + 71 + // For each user, fetch their votes from PDS 72 + totalVotes := 0 73 + for _, did := range dids { 74 + votes, err := fetchVotesFromPDS(pdsURL, did) 75 + if err != nil { 76 + log.Printf("Warning: failed to fetch votes for %s: %v", did, err) 77 + continue 78 + } 79 + 80 + if len(votes) == 0 { 81 + continue 82 + } 83 + 84 + log.Printf("Found %d votes for %s", len(votes), did) 85 + 86 + // Index each vote 87 + for _, vote := range votes { 88 + if err := indexVote(ctx, db, did, vote); err != nil { 89 + log.Printf("Warning: failed to index vote %s: %v", vote.URI, err) 90 + continue 91 + } 92 + totalVotes++ 93 + } 94 + } 95 + 96 + log.Printf("✓ Reindexed %d votes from PDS", totalVotes) 97 + } 98 + 99 + // fetchAllAccountsFromPDS queries the PDS sync API to get all repo DIDs 100 + func fetchAllAccountsFromPDS(pdsURL string) ([]string, error) { 101 + // Use com.atproto.sync.listRepos to get all repos on this PDS 102 + var allDIDs []string 103 + cursor := "" 104 + 105 + for { 106 + reqURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=100", pdsURL) 107 + if cursor != "" { 108 + reqURL += "&cursor=" + url.QueryEscape(cursor) 109 + } 110 + 111 + resp, err := http.Get(reqURL) 112 + if err != nil { 113 + return nil, fmt.Errorf("HTTP request failed: %w", err) 114 + } 115 + defer resp.Body.Close() 116 + 117 + if resp.StatusCode != 200 { 118 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 119 + } 120 + 121 + var result struct { 122 + Repos []struct { 123 + DID string `json:"did"` 124 + } `json:"repos"` 125 + Cursor string `json:"cursor"` 126 + } 127 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 128 + return nil, fmt.Errorf("failed to decode response: %w", err) 129 + } 130 + 131 + for _, repo := range result.Repos { 132 + allDIDs = append(allDIDs, repo.DID) 133 + } 134 + 135 + if result.Cursor == "" { 136 + break 137 + } 138 + cursor = result.Cursor 139 + } 140 + 141 + return allDIDs, nil 142 + } 143 + 144 + func fetchVotesFromPDS(pdsURL, did string) ([]Record, error) { 145 + var allRecords []Record 146 + cursor := "" 147 + collection := "social.coves.feed.vote" 148 + 149 + for { 150 + reqURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", 151 + pdsURL, url.QueryEscape(did), url.QueryEscape(collection)) 152 + if cursor != "" { 153 + reqURL += "&cursor=" + url.QueryEscape(cursor) 154 + } 155 + 156 + resp, err := http.Get(reqURL) 157 + if err != nil { 158 + return nil, fmt.Errorf("HTTP request failed: %w", err) 159 + } 160 + defer resp.Body.Close() 161 + 162 + if resp.StatusCode == 400 { 163 + // User doesn't exist on this PDS or has no records - that's OK 164 + return nil, nil 165 + } 166 + if resp.StatusCode != 200 { 167 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 168 + } 169 + 170 + var result ListRecordsResponse 171 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 172 + return nil, fmt.Errorf("failed to decode response: %w", err) 173 + } 174 + 175 + allRecords = append(allRecords, result.Records...) 176 + 177 + if result.Cursor == "" { 178 + break 179 + } 180 + cursor = result.Cursor 181 + } 182 + 183 + return allRecords, nil 184 + } 185 + 186 + func indexVote(ctx context.Context, db *sql.DB, voterDID string, record Record) error { 187 + // Extract vote data from record 188 + subject, ok := record.Value["subject"].(map[string]interface{}) 189 + if !ok { 190 + return fmt.Errorf("missing subject") 191 + } 192 + subjectURI, _ := subject["uri"].(string) 193 + subjectCID, _ := subject["cid"].(string) 194 + direction, _ := record.Value["direction"].(string) 195 + createdAtStr, _ := record.Value["createdAt"].(string) 196 + 197 + if subjectURI == "" || direction == "" { 198 + return fmt.Errorf("invalid vote record: missing required fields") 199 + } 200 + 201 + // Parse created_at 202 + createdAt, err := time.Parse(time.RFC3339, createdAtStr) 203 + if err != nil { 204 + createdAt = time.Now() 205 + } 206 + 207 + // Extract rkey from URI (at://did/collection/rkey) 208 + parts := strings.Split(record.URI, "/") 209 + if len(parts) < 5 { 210 + return fmt.Errorf("invalid URI format: %s", record.URI) 211 + } 212 + rkey := parts[len(parts)-1] 213 + 214 + // Start transaction 215 + tx, err := db.BeginTx(ctx, nil) 216 + if err != nil { 217 + return fmt.Errorf("failed to begin transaction: %w", err) 218 + } 219 + defer tx.Rollback() 220 + 221 + // Insert vote 222 + _, err = tx.ExecContext(ctx, ` 223 + INSERT INTO votes (uri, cid, rkey, voter_did, subject_uri, subject_cid, direction, created_at, indexed_at) 224 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) 225 + ON CONFLICT (uri) DO NOTHING 226 + `, record.URI, record.CID, rkey, voterDID, subjectURI, subjectCID, direction, createdAt) 227 + if err != nil { 228 + return fmt.Errorf("failed to insert vote: %w", err) 229 + } 230 + 231 + // Update post/comment counts 232 + collection := extractCollectionFromURI(subjectURI) 233 + var updateQuery string 234 + 235 + switch collection { 236 + case "social.coves.community.post": 237 + if direction == "up" { 238 + updateQuery = `UPDATE posts SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 239 + } else { 240 + updateQuery = `UPDATE posts SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL` 241 + } 242 + case "social.coves.community.comment": 243 + if direction == "up" { 244 + updateQuery = `UPDATE comments SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 245 + } else { 246 + updateQuery = `UPDATE comments SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL` 247 + } 248 + default: 249 + // Unknown collection, just index the vote 250 + return tx.Commit() 251 + } 252 + 253 + if _, err := tx.ExecContext(ctx, updateQuery, subjectURI); err != nil { 254 + return fmt.Errorf("failed to update vote counts: %w", err) 255 + } 256 + 257 + return tx.Commit() 258 + } 259 + 260 + func extractCollectionFromURI(uri string) string { 261 + // at://did:plc:xxx/social.coves.community.post/rkey 262 + parts := strings.Split(uri, "/") 263 + if len(parts) >= 4 { 264 + return parts[3] 265 + } 266 + return "" 267 + }
+11 -6
cmd/server/main.go
··· 396 396 commentRepo := postgresRepo.NewCommentRepository(db) 397 397 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 398 398 399 + // Initialize vote cache (stores user votes from PDS to avoid eventual consistency issues) 400 + // TTL of 10 minutes - cache is also updated on vote create/delete 401 + voteCache := votes.NewVoteCache(10*time.Minute, nil) 402 + log.Println("✅ Vote cache initialized (10 minute TTL)") 403 + 399 404 // Initialize vote service (for XRPC API endpoints) 400 405 // Note: We don't validate subject existence - the vote goes to the user's PDS regardless. 401 406 // The Jetstream consumer handles orphaned votes correctly by only updating counts for 402 407 // non-deleted subjects. This avoids race conditions and eventual consistency issues. 403 - voteService := votes.NewService(voteRepo, oauthClient, oauthStore, nil) 404 - log.Println("✅ Vote service initialized (with OAuth authentication)") 408 + voteService := votes.NewService(voteRepo, oauthClient, oauthStore, voteCache, nil) 409 + log.Println("✅ Vote service initialized (with OAuth authentication and vote cache)") 405 410 406 411 // Initialize comment service (for query API) 407 412 // Requires user and community repos for proper author/community hydration per lexicon ··· 524 529 routes.RegisterVoteRoutes(r, voteService, authMiddleware) 525 530 log.Println("Vote XRPC endpoints registered with OAuth authentication") 526 531 527 - routes.RegisterCommunityFeedRoutes(r, feedService) 528 - log.Println("Feed XRPC endpoints registered (public, no auth required)") 532 + routes.RegisterCommunityFeedRoutes(r, feedService, voteService, authMiddleware) 533 + log.Println("Feed XRPC endpoints registered (public with optional auth for viewer vote state)") 529 534 530 - routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 531 - log.Println("Timeline XRPC endpoints registered (requires authentication)") 535 + routes.RegisterTimelineRoutes(r, timelineService, voteService, authMiddleware) 536 + log.Println("Timeline XRPC endpoints registered (requires authentication, includes viewer vote state)") 532 537 533 538 routes.RegisterDiscoverRoutes(r, discoverService) 534 539 log.Println("Discover XRPC endpoints registered (public, no auth required)")
+43 -7
internal/api/handlers/communityFeed/get_community.go
··· 1 1 package communityFeed 2 2 3 3 import ( 4 + "Coves/internal/api/middleware" 4 5 "Coves/internal/core/communityFeeds" 5 6 "Coves/internal/core/posts" 7 + "Coves/internal/core/votes" 6 8 "encoding/json" 7 9 "log" 8 10 "net/http" ··· 11 13 12 14 // GetCommunityHandler handles community feed retrieval 13 15 type GetCommunityHandler struct { 14 - service communityFeeds.Service 16 + service communityFeeds.Service 17 + voteService votes.Service 15 18 } 16 19 17 20 // NewGetCommunityHandler creates a new community feed handler 18 - func NewGetCommunityHandler(service communityFeeds.Service) *GetCommunityHandler { 21 + func NewGetCommunityHandler(service communityFeeds.Service, voteService votes.Service) *GetCommunityHandler { 19 22 return &GetCommunityHandler{ 20 - service: service, 23 + service: service, 24 + voteService: voteService, 21 25 } 22 26 } 23 27 ··· 36 40 return 37 41 } 38 42 39 - // Alpha: No viewer context needed for basic community sorting 40 - // TODO(feed-generator): Extract viewer DID when implementing viewer-specific state 41 - // (blocks, upvotes, saves) in feed generator skeleton 42 - 43 43 // Get community feed 44 44 response, err := h.service.GetCommunityFeed(r.Context(), req) 45 45 if err != nil { 46 46 handleServiceError(w, err) 47 47 return 48 + } 49 + 50 + // Populate viewer vote state if authenticated and vote service available 51 + if h.voteService != nil { 52 + session := middleware.GetOAuthSession(r) 53 + if session != nil { 54 + userDID := middleware.GetUserDID(r) 55 + // Ensure vote cache is populated from PDS 56 + if err := h.voteService.EnsureCachePopulated(r.Context(), session); err != nil { 57 + // Log but don't fail - viewer state is optional 58 + log.Printf("Warning: failed to populate vote cache: %v", err) 59 + } else { 60 + // Collect post URIs to batch lookup 61 + postURIs := make([]string, 0, len(response.Feed)) 62 + for _, feedPost := range response.Feed { 63 + if feedPost.Post != nil { 64 + postURIs = append(postURIs, feedPost.Post.URI) 65 + } 66 + } 67 + 68 + // Get viewer votes for all posts 69 + viewerVotes := h.voteService.GetViewerVotesForSubjects(userDID, postURIs) 70 + 71 + // Populate viewer state on each post 72 + for _, feedPost := range response.Feed { 73 + if feedPost.Post != nil { 74 + if vote, exists := viewerVotes[feedPost.Post.URI]; exists { 75 + feedPost.Post.Viewer = &posts.ViewerState{ 76 + Vote: &vote.Direction, 77 + VoteURI: &vote.URI, 78 + } 79 + } 80 + } 81 + } 82 + } 83 + } 48 84 } 49 85 50 86 // Transform blob refs to URLs for all posts
+41 -3
internal/api/handlers/timeline/get_timeline.go
··· 4 4 "Coves/internal/api/middleware" 5 5 "Coves/internal/core/posts" 6 6 "Coves/internal/core/timeline" 7 + "Coves/internal/core/votes" 7 8 "encoding/json" 8 9 "log" 9 10 "net/http" ··· 13 14 14 15 // GetTimelineHandler handles timeline feed retrieval 15 16 type GetTimelineHandler struct { 16 - service timeline.Service 17 + service timeline.Service 18 + voteService votes.Service 17 19 } 18 20 19 21 // NewGetTimelineHandler creates a new timeline handler 20 - func NewGetTimelineHandler(service timeline.Service) *GetTimelineHandler { 22 + func NewGetTimelineHandler(service timeline.Service, voteService votes.Service) *GetTimelineHandler { 21 23 return &GetTimelineHandler{ 22 - service: service, 24 + service: service, 25 + voteService: voteService, 23 26 } 24 27 } 25 28 ··· 51 54 if err != nil { 52 55 handleServiceError(w, err) 53 56 return 57 + } 58 + 59 + // Populate viewer vote state if authenticated and vote service available 60 + if h.voteService != nil { 61 + session := middleware.GetOAuthSession(r) 62 + if session != nil { 63 + // Ensure vote cache is populated from PDS 64 + if err := h.voteService.EnsureCachePopulated(r.Context(), session); err != nil { 65 + // Log but don't fail - viewer state is optional 66 + log.Printf("Warning: failed to populate vote cache: %v", err) 67 + } else { 68 + // Collect post URIs to batch lookup 69 + postURIs := make([]string, 0, len(response.Feed)) 70 + for _, feedPost := range response.Feed { 71 + if feedPost.Post != nil { 72 + postURIs = append(postURIs, feedPost.Post.URI) 73 + } 74 + } 75 + 76 + // Get viewer votes for all posts 77 + viewerVotes := h.voteService.GetViewerVotesForSubjects(userDID, postURIs) 78 + 79 + // Populate viewer state on each post 80 + for _, feedPost := range response.Feed { 81 + if feedPost.Post != nil { 82 + if vote, exists := viewerVotes[feedPost.Post.URI]; exists { 83 + feedPost.Post.Viewer = &posts.ViewerState{ 84 + Vote: &vote.Direction, 85 + VoteURI: &vote.URI, 86 + } 87 + } 88 + } 89 + } 90 + } 91 + } 54 92 } 55 93 56 94 // Transform blob refs to URLs for all posts
+7 -5
internal/api/routes/communityFeed.go
··· 2 2 3 3 import ( 4 4 "Coves/internal/api/handlers/communityFeed" 5 + "Coves/internal/api/middleware" 5 6 "Coves/internal/core/communityFeeds" 7 + "Coves/internal/core/votes" 6 8 7 9 "github.com/go-chi/chi/v5" 8 10 ) ··· 11 13 func RegisterCommunityFeedRoutes( 12 14 r chi.Router, 13 15 feedService communityFeeds.Service, 16 + voteService votes.Service, 17 + authMiddleware *middleware.OAuthAuthMiddleware, 14 18 ) { 15 19 // Create handlers 16 - getCommunityHandler := communityFeed.NewGetCommunityHandler(feedService) 20 + getCommunityHandler := communityFeed.NewGetCommunityHandler(feedService, voteService) 17 21 18 22 // GET /xrpc/social.coves.communityFeed.getCommunity 19 - // Public endpoint - basic community sorting only for Alpha 20 - // TODO(feed-generator): Add OptionalAuth middleware when implementing viewer-specific state 21 - // (blocks, upvotes, saves, etc.) in feed generator skeleton 22 - r.Get("/xrpc/social.coves.communityFeed.getCommunity", getCommunityHandler.HandleGetCommunity) 23 + // Public endpoint with optional auth for viewer-specific state (vote state) 24 + r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.communityFeed.getCommunity", getCommunityHandler.HandleGetCommunity) 23 25 }
+3 -1
internal/api/routes/timeline.go
··· 4 4 "Coves/internal/api/handlers/timeline" 5 5 "Coves/internal/api/middleware" 6 6 timelineCore "Coves/internal/core/timeline" 7 + "Coves/internal/core/votes" 7 8 8 9 "github.com/go-chi/chi/v5" 9 10 ) ··· 12 13 func RegisterTimelineRoutes( 13 14 r chi.Router, 14 15 timelineService timelineCore.Service, 16 + voteService votes.Service, 15 17 authMiddleware *middleware.OAuthAuthMiddleware, 16 18 ) { 17 19 // Create handlers 18 - getTimelineHandler := timeline.NewGetTimelineHandler(timelineService) 20 + getTimelineHandler := timeline.NewGetTimelineHandler(timelineService, voteService) 19 21 20 22 // GET /xrpc/social.coves.feed.getTimeline 21 23 // Requires authentication - user must be logged in to see their timeline
+221
internal/core/votes/cache.go
··· 1 + package votes 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "strings" 8 + "sync" 9 + "time" 10 + 11 + "Coves/internal/atproto/pds" 12 + ) 13 + 14 + // CachedVote represents a vote stored in the cache 15 + type CachedVote struct { 16 + Direction string // "up" or "down" 17 + URI string // vote record URI (at://did/collection/rkey) 18 + RKey string // record key 19 + } 20 + 21 + // VoteCache provides an in-memory cache of user votes fetched from their PDS. 22 + // This avoids eventual consistency issues with the AppView database. 23 + type VoteCache struct { 24 + mu sync.RWMutex 25 + votes map[string]map[string]*CachedVote // userDID -> subjectURI -> vote 26 + expiry map[string]time.Time // userDID -> expiry time 27 + ttl time.Duration 28 + logger *slog.Logger 29 + } 30 + 31 + // NewVoteCache creates a new vote cache with the specified TTL 32 + func NewVoteCache(ttl time.Duration, logger *slog.Logger) *VoteCache { 33 + if logger == nil { 34 + logger = slog.Default() 35 + } 36 + return &VoteCache{ 37 + votes: make(map[string]map[string]*CachedVote), 38 + expiry: make(map[string]time.Time), 39 + ttl: ttl, 40 + logger: logger, 41 + } 42 + } 43 + 44 + // GetVotesForUser returns all cached votes for a user. 45 + // Returns nil if cache is empty or expired for this user. 46 + func (c *VoteCache) GetVotesForUser(userDID string) map[string]*CachedVote { 47 + c.mu.RLock() 48 + defer c.mu.RUnlock() 49 + 50 + // Check if cache exists and is not expired 51 + expiry, exists := c.expiry[userDID] 52 + if !exists || time.Now().After(expiry) { 53 + return nil 54 + } 55 + 56 + return c.votes[userDID] 57 + } 58 + 59 + // GetVote returns the cached vote for a specific subject, or nil if not found/expired 60 + func (c *VoteCache) GetVote(userDID, subjectURI string) *CachedVote { 61 + votes := c.GetVotesForUser(userDID) 62 + if votes == nil { 63 + return nil 64 + } 65 + return votes[subjectURI] 66 + } 67 + 68 + // IsCached returns true if the user's votes are cached and not expired 69 + func (c *VoteCache) IsCached(userDID string) bool { 70 + c.mu.RLock() 71 + defer c.mu.RUnlock() 72 + 73 + expiry, exists := c.expiry[userDID] 74 + return exists && time.Now().Before(expiry) 75 + } 76 + 77 + // SetVotesForUser replaces all cached votes for a user 78 + func (c *VoteCache) SetVotesForUser(userDID string, votes map[string]*CachedVote) { 79 + c.mu.Lock() 80 + defer c.mu.Unlock() 81 + 82 + c.votes[userDID] = votes 83 + c.expiry[userDID] = time.Now().Add(c.ttl) 84 + 85 + c.logger.Debug("vote cache updated", 86 + "user", userDID, 87 + "vote_count", len(votes), 88 + "expires_at", c.expiry[userDID]) 89 + } 90 + 91 + // SetVote adds or updates a single vote in the cache 92 + func (c *VoteCache) SetVote(userDID, subjectURI string, vote *CachedVote) { 93 + c.mu.Lock() 94 + defer c.mu.Unlock() 95 + 96 + if c.votes[userDID] == nil { 97 + c.votes[userDID] = make(map[string]*CachedVote) 98 + } 99 + 100 + c.votes[userDID][subjectURI] = vote 101 + 102 + // Always extend expiry on vote action - active users keep their cache fresh 103 + c.expiry[userDID] = time.Now().Add(c.ttl) 104 + 105 + c.logger.Debug("vote cached", 106 + "user", userDID, 107 + "subject", subjectURI, 108 + "direction", vote.Direction) 109 + } 110 + 111 + // RemoveVote removes a vote from the cache (for toggle-off) 112 + func (c *VoteCache) RemoveVote(userDID, subjectURI string) { 113 + c.mu.Lock() 114 + defer c.mu.Unlock() 115 + 116 + if c.votes[userDID] != nil { 117 + delete(c.votes[userDID], subjectURI) 118 + 119 + // Extend expiry on vote action - active users keep their cache fresh 120 + c.expiry[userDID] = time.Now().Add(c.ttl) 121 + 122 + c.logger.Debug("vote removed from cache", 123 + "user", userDID, 124 + "subject", subjectURI) 125 + } 126 + } 127 + 128 + // Invalidate removes all cached votes for a user 129 + func (c *VoteCache) Invalidate(userDID string) { 130 + c.mu.Lock() 131 + defer c.mu.Unlock() 132 + 133 + delete(c.votes, userDID) 134 + delete(c.expiry, userDID) 135 + 136 + c.logger.Debug("vote cache invalidated", "user", userDID) 137 + } 138 + 139 + // FetchAndCacheFromPDS fetches all votes from the user's PDS and caches them. 140 + // This should be called on first authenticated request or when cache is expired. 141 + func (c *VoteCache) FetchAndCacheFromPDS(ctx context.Context, pdsClient pds.Client) error { 142 + userDID := pdsClient.DID() 143 + 144 + c.logger.Debug("fetching votes from PDS", 145 + "user", userDID, 146 + "pds", pdsClient.HostURL()) 147 + 148 + votes, err := c.fetchAllVotesFromPDS(ctx, pdsClient) 149 + if err != nil { 150 + return fmt.Errorf("failed to fetch votes from PDS: %w", err) 151 + } 152 + 153 + c.SetVotesForUser(userDID, votes) 154 + 155 + c.logger.Info("vote cache populated from PDS", 156 + "user", userDID, 157 + "vote_count", len(votes)) 158 + 159 + return nil 160 + } 161 + 162 + // fetchAllVotesFromPDS paginates through all vote records on the user's PDS 163 + func (c *VoteCache) fetchAllVotesFromPDS(ctx context.Context, pdsClient pds.Client) (map[string]*CachedVote, error) { 164 + votes := make(map[string]*CachedVote) 165 + cursor := "" 166 + const pageSize = 100 167 + const collection = "social.coves.feed.vote" 168 + 169 + for { 170 + result, err := pdsClient.ListRecords(ctx, collection, pageSize, cursor) 171 + if err != nil { 172 + if pds.IsAuthError(err) { 173 + return nil, ErrNotAuthorized 174 + } 175 + return nil, fmt.Errorf("listRecords failed: %w", err) 176 + } 177 + 178 + for _, rec := range result.Records { 179 + // Extract subject from record value 180 + subject, ok := rec.Value["subject"].(map[string]any) 181 + if !ok { 182 + continue 183 + } 184 + 185 + subjectURI, ok := subject["uri"].(string) 186 + if !ok || subjectURI == "" { 187 + continue 188 + } 189 + 190 + direction, _ := rec.Value["direction"].(string) 191 + if direction == "" { 192 + continue 193 + } 194 + 195 + // Extract rkey from URI 196 + rkey := extractRKeyFromURI(rec.URI) 197 + 198 + votes[subjectURI] = &CachedVote{ 199 + Direction: direction, 200 + URI: rec.URI, 201 + RKey: rkey, 202 + } 203 + } 204 + 205 + if result.Cursor == "" { 206 + break 207 + } 208 + cursor = result.Cursor 209 + } 210 + 211 + return votes, nil 212 + } 213 + 214 + // extractRKeyFromURI extracts the rkey from an AT-URI (at://did/collection/rkey) 215 + func extractRKeyFromURI(uri string) string { 216 + parts := strings.Split(uri, "/") 217 + if len(parts) >= 5 { 218 + return parts[len(parts)-1] 219 + } 220 + return "" 221 + }
+14
internal/core/votes/service.go
··· 44 44 // - Deletes the user's vote record from their PDS 45 45 // - AppView will soft-delete via Jetstream consumer 46 46 DeleteVote(ctx context.Context, session *oauthlib.ClientSessionData, req DeleteVoteRequest) error 47 + 48 + // EnsureCachePopulated fetches the user's votes from their PDS if not already cached. 49 + // This should be called before rendering feeds to ensure vote state is available. 50 + // If cache is already populated and not expired, this is a no-op. 51 + EnsureCachePopulated(ctx context.Context, session *oauthlib.ClientSessionData) error 52 + 53 + // GetViewerVote returns the viewer's vote for a specific subject, or nil if not voted. 54 + // Returns from cache if available, otherwise returns nil (caller should ensure cache is populated). 55 + GetViewerVote(userDID, subjectURI string) *CachedVote 56 + 57 + // GetViewerVotesForSubjects returns the viewer's votes for multiple subjects. 58 + // Returns a map of subjectURI -> CachedVote for subjects the user has voted on. 59 + // This is efficient for batch lookups when rendering feeds. 60 + GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote 47 61 } 48 62 49 63 // CreateVoteRequest contains the parameters for creating a vote
+84 -2
internal/core/votes/service_impl.go
··· 30 30 oauthStore oauth.ClientAuthStore 31 31 logger *slog.Logger 32 32 pdsClientFactory PDSClientFactory // Optional, for testing. If nil, uses OAuth. 33 + cache *VoteCache // In-memory cache of user votes from PDS 33 34 } 34 35 35 36 // NewService creates a new vote service instance 36 - func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service { 37 + func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, cache *VoteCache, logger *slog.Logger) Service { 37 38 if logger == nil { 38 39 logger = slog.Default() 39 40 } ··· 41 42 repo: repo, 42 43 oauthClient: oauthClient, 43 44 oauthStore: oauthStore, 45 + cache: cache, 44 46 logger: logger, 45 47 } 46 48 } 47 49 48 50 // NewServiceWithPDSFactory creates a vote service with a custom PDS client factory. 49 51 // This is primarily for testing with password-based authentication. 50 - func NewServiceWithPDSFactory(repo Repository, logger *slog.Logger, factory PDSClientFactory) Service { 52 + func NewServiceWithPDSFactory(repo Repository, cache *VoteCache, logger *slog.Logger, factory PDSClientFactory) Service { 51 53 if logger == nil { 52 54 logger = slog.Default() 53 55 } 54 56 return &voteService{ 55 57 repo: repo, 58 + cache: cache, 56 59 logger: logger, 57 60 pdsClientFactory: factory, 58 61 } ··· 150 153 "subject", req.Subject.URI, 151 154 "direction", req.Direction) 152 155 156 + // Update cache - remove the vote 157 + if s.cache != nil { 158 + s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI) 159 + } 160 + 153 161 // Return empty response to indicate deletion 154 162 return &CreateVoteResponse{ 155 163 URI: "", ··· 196 204 "direction", req.Direction, 197 205 "uri", uri, 198 206 "cid", cid) 207 + 208 + // Update cache - add the new vote 209 + if s.cache != nil { 210 + s.cache.SetVote(session.AccountDID.String(), req.Subject.URI, &CachedVote{ 211 + Direction: req.Direction, 212 + URI: uri, 213 + RKey: extractRKeyFromURI(uri), 214 + }) 215 + } 199 216 200 217 return &CreateVoteResponse{ 201 218 URI: uri, ··· 253 270 "subject", req.Subject.URI, 254 271 "uri", existing.URI) 255 272 273 + // Update cache - remove the vote 274 + if s.cache != nil { 275 + s.cache.RemoveVote(session.AccountDID.String(), req.Subject.URI) 276 + } 277 + 256 278 return nil 257 279 } 258 280 ··· 350 372 // No vote found for this subject after checking all pages 351 373 return nil, nil 352 374 } 375 + 376 + // EnsureCachePopulated fetches the user's votes from their PDS if not already cached. 377 + func (s *voteService) EnsureCachePopulated(ctx context.Context, session *oauth.ClientSessionData) error { 378 + if s.cache == nil { 379 + return nil // No cache configured 380 + } 381 + 382 + // Check if already cached 383 + if s.cache.IsCached(session.AccountDID.String()) { 384 + return nil 385 + } 386 + 387 + // Create PDS client for this session 388 + pdsClient, err := s.getPDSClient(ctx, session) 389 + if err != nil { 390 + s.logger.Error("failed to create PDS client for cache population", 391 + "error", err, 392 + "user", session.AccountDID) 393 + return fmt.Errorf("failed to create PDS client: %w", err) 394 + } 395 + 396 + // Fetch and cache votes from PDS 397 + if err := s.cache.FetchAndCacheFromPDS(ctx, pdsClient); err != nil { 398 + s.logger.Error("failed to populate vote cache from PDS", 399 + "error", err, 400 + "user", session.AccountDID) 401 + return fmt.Errorf("failed to populate vote cache: %w", err) 402 + } 403 + 404 + return nil 405 + } 406 + 407 + // GetViewerVote returns the viewer's vote for a specific subject, or nil if not voted. 408 + func (s *voteService) GetViewerVote(userDID, subjectURI string) *CachedVote { 409 + if s.cache == nil { 410 + return nil 411 + } 412 + return s.cache.GetVote(userDID, subjectURI) 413 + } 414 + 415 + // GetViewerVotesForSubjects returns the viewer's votes for multiple subjects. 416 + func (s *voteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote { 417 + result := make(map[string]*CachedVote) 418 + if s.cache == nil { 419 + return result 420 + } 421 + 422 + allVotes := s.cache.GetVotesForUser(userDID) 423 + if allVotes == nil { 424 + return result 425 + } 426 + 427 + for _, uri := range subjectURIs { 428 + if vote, exists := allVotes[uri]; exists { 429 + result[uri] = vote 430 + } 431 + } 432 + 433 + return result 434 + }
+4 -4
tests/integration/vote_e2e_test.go
··· 81 81 postRepo := postgres.NewPostRepository(db) 82 82 83 83 // Setup services with password-based PDS client factory for E2E testing 84 - voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, PasswordAuthPDSClientFactory()) 84 + voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 85 85 86 86 // Create test user on PDS 87 87 testUserHandle := fmt.Sprintf("voter-%d.local.coves.dev", time.Now().Unix()) ··· 304 304 voteRepo := postgres.NewVoteRepository(db) 305 305 postRepo := postgres.NewPostRepository(db) 306 306 307 - voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, PasswordAuthPDSClientFactory()) 307 + voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 308 308 309 309 // Create test user 310 310 testUserHandle := fmt.Sprintf("toggle-%d.local.coves.dev", time.Now().Unix()) ··· 473 473 voteRepo := postgres.NewVoteRepository(db) 474 474 postRepo := postgres.NewPostRepository(db) 475 475 476 - voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, PasswordAuthPDSClientFactory()) 476 + voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 477 477 478 478 // Create test user 479 479 testUserHandle := fmt.Sprintf("flip-%d.local.coves.dev", time.Now().Unix()) ··· 698 698 voteRepo := postgres.NewVoteRepository(db) 699 699 postRepo := postgres.NewPostRepository(db) 700 700 701 - voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, PasswordAuthPDSClientFactory()) 701 + voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory()) 702 702 703 703 // Create test user 704 704 testUserHandle := fmt.Sprintf("delete-%d.local.coves.dev", time.Now().Unix())