a love letter to tangled (android, iOS, and a search API)

feat: JetStream event caching and management

+699 -146
+3 -3
docs/roadmap.md
··· 23 23 - [x] Reuse the existing normalization and upsert path for on-demand indexing jobs 24 24 - [x] Trigger indexing jobs from repo, issue, PR, profile, and similar fetch handlers 25 25 - [x] Add dedupe, retries, and observability for indexing jobs 26 - - [ ] Add a JetStream cache consumer with a persisted timestamp cursor 27 - - [ ] Seed the JetStream cursor to `now - 24h` on first boot and rewind slightly on reconnect 28 - - [ ] Store and serve bounded recent activity from the local cache 26 + - [x] Add a JetStream cache consumer with a persisted timestamp cursor 27 + - [x] Seed the JetStream cursor to `now - 24h` on first boot and rewind slightly on reconnect 28 + - [x] Store and serve bounded recent activity from the local cache 29 29 - [ ] Keep Tap as the authoritative indexing and bulk backfill path 30 30 - [ ] Define a controlled backfill and repo-resync playbook for recovery (`docs/references/resync.md`) 31 31
-51
packages/api/internal/api/actors.go
··· 178 178 } 179 179 } 180 180 181 - // isError is a type-safe errors.As replacement for pointer receiver targets. 182 - func isError[T error](err error, target *T) bool { 183 - if err == nil { 184 - return false 185 - } 186 - 187 - type unwrapper interface{ Unwrap() error } 188 - for e := err; e != nil; { 189 - if t, ok := e.(T); ok { 190 - *target = t 191 - return true 192 - } 193 - if u, ok := e.(unwrapper); ok { 194 - e = u.Unwrap() 195 - } else { 196 - break 197 - } 198 - } 199 - return false 200 - } 201 - 202 181 // handleGetActor returns the actor's Tangled profile + optional Bluesky info. 203 182 // GET /actors/{handle} 204 183 func (s *Server) handleGetActor(w http.ResponseWriter, r *http.Request) { ··· 872 851 return pulls, statusMap, nil 873 852 } 874 853 875 - func resolveIssueState(stateMap map[string]string, issueURI string) string { 876 - raw := stateMap[issueURI] 877 - if strings.HasSuffix(raw, ".closed") { 878 - return "closed" 879 - } 880 - return "open" 881 - } 882 - 883 - func resolvePullStatus(statusMap map[string]string, pullURI string) string { 884 - raw := statusMap[pullURI] 885 - switch { 886 - case strings.HasSuffix(raw, ".merged"): 887 - return "merged" 888 - case strings.HasSuffix(raw, ".closed"): 889 - return "closed" 890 - default: 891 - return "open" 892 - } 893 - } 894 - 895 854 type bskyProfileResponse struct { 896 855 DisplayName string `json:"displayName,omitempty"` 897 856 Avatar string `json:"avatar,omitempty"` ··· 910 869 } 911 870 return &p 912 871 } 913 - 914 - // parseATURI splits an AT URI (at://did/collection/rkey) into its components. 915 - func parseATURI(uri string) (did, collection, rkey string, err error) { 916 - trimmed := strings.TrimPrefix(uri, "at://") 917 - parts := strings.SplitN(trimmed, "/", 3) 918 - if len(parts) != 3 { 919 - return "", "", "", fmt.Errorf("invalid AT URI: %q", uri) 920 - } 921 - return parts[0], parts[1], parts[2], nil 922 - }
+69 -59
packages/api/internal/api/api.go
··· 8 8 "log/slog" 9 9 "net" 10 10 "net/http" 11 - "strconv" 12 11 "strings" 13 12 "sync" 14 13 "time" ··· 48 47 } 49 48 50 49 // Handler returns the HTTP handler with all routes registered. 50 + // 51 + // TODO: refactor this to have a func router() that returns [http.Handler] (mux), 52 + // then registerXXX routes 51 53 func (s *Server) Handler() http.Handler { 52 54 mux := http.NewServeMux() 53 55 ··· 63 65 mux.HandleFunc("GET /profiles/{did}/summary", s.handleProfileSummary) 64 66 65 67 mux.HandleFunc("GET /backlinks/count", s.handleBacklinksCount) 68 + mux.HandleFunc("GET /activity", s.handleActivity) 66 69 mux.HandleFunc("GET /activity/stream", s.handleActivityStream) 67 70 mux.HandleFunc("GET /identity/resolve", s.handleResolveHandle) 68 71 mux.HandleFunc("GET /identity/did/{did}", s.handleDidDocument) ··· 120 123 121 124 errCh := make(chan error, 1) 122 125 go s.runReadThroughIndexer(ctx) 126 + go s.runJetstreamConsumer(ctx) 123 127 124 128 go func() { 125 129 s.log.Info("listening", slog.String("addr", s.cfg.HTTPBindAddr)) ··· 407 411 writeJSON(w, http.StatusOK, summary) 408 412 } 409 413 410 - func (s *Server) handleNotImplemented(w http.ResponseWriter, _ *http.Request) { 411 - writeJSON(w, http.StatusNotImplemented, errorBody("not_implemented", "this endpoint is not yet available")) 414 + // knownActivityParams is the whitelist of accepted query parameters for the activity endpoint. 415 + var knownActivityParams = map[string]bool{ 416 + "limit": true, "offset": true, 417 + "collection": true, "operation": true, "did": true, 412 418 } 413 419 414 - func writeJSON(w http.ResponseWriter, status int, v any) { 415 - w.Header().Set("Content-Type", "application/json") 416 - w.WriteHeader(status) 417 - _ = json.NewEncoder(w).Encode(v) 418 - } 420 + // handleActivity returns bounded recent activity from the JetStream cache. 421 + // Route: GET /activity 422 + func (s *Server) handleActivity(w http.ResponseWriter, r *http.Request) { 423 + for key := range r.URL.Query() { 424 + if !knownActivityParams[key] { 425 + writeJSON(w, http.StatusBadRequest, errorBody("unknown_parameter", fmt.Sprintf("unknown parameter: %s", key))) 426 + return 427 + } 428 + } 419 429 420 - func errorBody(code, message string) map[string]string { 421 - return map[string]string{"error": code, "message": message} 422 - } 430 + limit, err := intParam(r, "limit", 50) 431 + if err != nil || limit < 1 || limit > 200 { 432 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "limit must be between 1 and 200")) 433 + return 434 + } 435 + offset, err := intParam(r, "offset", 0) 436 + if err != nil || offset < 0 { 437 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "offset must be >= 0")) 438 + return 439 + } 423 440 424 - func intParam(r *http.Request, key string, def int) (int, error) { 425 - v := r.URL.Query().Get(key) 426 - if v == "" { 427 - return def, nil 441 + filter := store.JetstreamEventFilter{ 442 + Collection: r.URL.Query().Get("collection"), 443 + Operation: r.URL.Query().Get("operation"), 444 + DID: r.URL.Query().Get("did"), 445 + Limit: limit, 446 + Offset: offset, 428 447 } 429 - n, err := strconv.Atoi(v) 448 + 449 + events, err := s.store.ListJetstreamEvents(r.Context(), filter) 430 450 if err != nil { 431 - return 0, err 451 + s.log.Error("list activity failed", slog.String("error", err.Error())) 452 + writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to fetch activity")) 453 + return 432 454 } 433 - return n, nil 434 - } 435 455 436 - type documentJSON struct { 437 - ID string `json:"id"` 438 - DID string `json:"did"` 439 - Collection string `json:"collection"` 440 - RKey string `json:"rkey"` 441 - ATURI string `json:"at_uri"` 442 - CID string `json:"cid"` 443 - RecordType string `json:"record_type"` 444 - Title string `json:"title"` 445 - Body string `json:"body"` 446 - Summary string `json:"summary,omitempty"` 447 - RepoName string `json:"repo_name,omitempty"` 448 - AuthorHandle string `json:"author_handle,omitempty"` 449 - TagsJSON string `json:"tags_json,omitempty"` 450 - Language string `json:"language,omitempty"` 451 - WebURL string `json:"web_url,omitempty"` 452 - CreatedAt string `json:"created_at,omitempty"` 453 - UpdatedAt string `json:"updated_at,omitempty"` 454 - IndexedAt string `json:"indexed_at"` 456 + type eventJSON struct { 457 + TimeUS int64 `json:"time_us"` 458 + DID string `json:"did"` 459 + Kind string `json:"kind"` 460 + Collection string `json:"collection,omitempty"` 461 + RKey string `json:"rkey,omitempty"` 462 + Operation string `json:"operation,omitempty"` 463 + Payload json.RawMessage `json:"payload"` 464 + } 465 + 466 + out := make([]eventJSON, 0, len(events)) 467 + for _, e := range events { 468 + out = append(out, eventJSON{ 469 + TimeUS: e.TimeUS, 470 + DID: e.DID, 471 + Kind: e.Kind, 472 + Collection: e.Collection, 473 + RKey: e.RKey, 474 + Operation: e.Operation, 475 + Payload: json.RawMessage(e.Payload), 476 + }) 477 + } 478 + 479 + writeJSON(w, http.StatusOK, map[string]any{ 480 + "limit": limit, 481 + "offset": offset, 482 + "events": out, 483 + }) 455 484 } 456 485 457 - func documentResponse(doc *store.Document) documentJSON { 458 - return documentJSON{ 459 - ID: doc.ID, 460 - DID: doc.DID, 461 - Collection: doc.Collection, 462 - RKey: doc.RKey, 463 - ATURI: doc.ATURI, 464 - CID: doc.CID, 465 - RecordType: doc.RecordType, 466 - Title: doc.Title, 467 - Body: doc.Body, 468 - Summary: doc.Summary, 469 - RepoName: doc.RepoName, 470 - AuthorHandle: doc.AuthorHandle, 471 - TagsJSON: doc.TagsJSON, 472 - Language: doc.Language, 473 - WebURL: doc.WebURL, 474 - CreatedAt: doc.CreatedAt, 475 - UpdatedAt: doc.UpdatedAt, 476 - IndexedAt: doc.IndexedAt, 477 - } 486 + func (s *Server) handleNotImplemented(w http.ResponseWriter, _ *http.Request) { 487 + writeJSON(w, http.StatusNotImplemented, errorBody("not_implemented", "this endpoint is not yet available")) 478 488 }
+151
packages/api/internal/api/helpers.go
··· 1 + package api 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strconv" 8 + "strings" 9 + "time" 10 + 11 + "tangled.org/desertthunder.dev/twister/internal/store" 12 + ) 13 + 14 + func writeJSON(w http.ResponseWriter, status int, v any) { 15 + w.Header().Set("Content-Type", "application/json") 16 + w.WriteHeader(status) 17 + _ = json.NewEncoder(w).Encode(v) 18 + } 19 + 20 + func errorBody(code, message string) map[string]string { 21 + return map[string]string{"error": code, "message": message} 22 + } 23 + 24 + func intParam(r *http.Request, key string, def int) (int, error) { 25 + v := r.URL.Query().Get(key) 26 + if v == "" { 27 + return def, nil 28 + } 29 + n, err := strconv.Atoi(v) 30 + if err != nil { 31 + return 0, err 32 + } 33 + return n, nil 34 + } 35 + 36 + type documentJSON struct { 37 + ID string `json:"id"` 38 + DID string `json:"did"` 39 + Collection string `json:"collection"` 40 + RKey string `json:"rkey"` 41 + ATURI string `json:"at_uri"` 42 + CID string `json:"cid"` 43 + RecordType string `json:"record_type"` 44 + Title string `json:"title"` 45 + Body string `json:"body"` 46 + Summary string `json:"summary,omitempty"` 47 + RepoName string `json:"repo_name,omitempty"` 48 + AuthorHandle string `json:"author_handle,omitempty"` 49 + TagsJSON string `json:"tags_json,omitempty"` 50 + Language string `json:"language,omitempty"` 51 + WebURL string `json:"web_url,omitempty"` 52 + CreatedAt string `json:"created_at,omitempty"` 53 + UpdatedAt string `json:"updated_at,omitempty"` 54 + IndexedAt string `json:"indexed_at"` 55 + } 56 + 57 + func documentResponse(doc *store.Document) documentJSON { 58 + return documentJSON{ 59 + ID: doc.ID, 60 + DID: doc.DID, 61 + Collection: doc.Collection, 62 + RKey: doc.RKey, 63 + ATURI: doc.ATURI, 64 + CID: doc.CID, 65 + RecordType: doc.RecordType, 66 + Title: doc.Title, 67 + Body: doc.Body, 68 + Summary: doc.Summary, 69 + RepoName: doc.RepoName, 70 + AuthorHandle: doc.AuthorHandle, 71 + TagsJSON: doc.TagsJSON, 72 + Language: doc.Language, 73 + WebURL: doc.WebURL, 74 + CreatedAt: doc.CreatedAt, 75 + UpdatedAt: doc.UpdatedAt, 76 + IndexedAt: doc.IndexedAt, 77 + } 78 + } 79 + 80 + // isError is a type-safe errors.As replacement for pointer receiver targets. 81 + func isError[T error](err error, target *T) bool { 82 + if err == nil { 83 + return false 84 + } 85 + 86 + type unwrapper interface{ Unwrap() error } 87 + for e := err; e != nil; { 88 + if t, ok := e.(T); ok { 89 + *target = t 90 + return true 91 + } 92 + if u, ok := e.(unwrapper); ok { 93 + e = u.Unwrap() 94 + } else { 95 + break 96 + } 97 + } 98 + return false 99 + } 100 + 101 + func resolveIssueState(stateMap map[string]string, issueURI string) string { 102 + raw := stateMap[issueURI] 103 + if strings.HasSuffix(raw, ".closed") { 104 + return "closed" 105 + } 106 + return "open" 107 + } 108 + 109 + func resolvePullStatus(statusMap map[string]string, pullURI string) string { 110 + raw := statusMap[pullURI] 111 + switch { 112 + case strings.HasSuffix(raw, ".merged"): 113 + return "merged" 114 + case strings.HasSuffix(raw, ".closed"): 115 + return "closed" 116 + default: 117 + return "open" 118 + } 119 + } 120 + 121 + // parseATURI splits an AT URI (at://did/collection/rkey) into its components. 122 + func parseATURI(uri string) (did, collection, rkey string, err error) { 123 + trimmed := strings.TrimPrefix(uri, "at://") 124 + parts := strings.SplitN(trimmed, "/", 3) 125 + if len(parts) != 3 { 126 + return "", "", "", fmt.Errorf("invalid AT URI: %q", uri) 127 + } 128 + return parts[0], parts[1], parts[2], nil 129 + } 130 + 131 + func retryDelay(attempt int) time.Duration { 132 + if attempt < 1 { 133 + attempt = 1 134 + } 135 + base := time.Second * time.Duration(1<<min(attempt-1, 8)) 136 + if base > 5*time.Minute { 137 + return 5 * time.Minute 138 + } 139 + return base 140 + } 141 + 142 + func truncateErr(err error) string { 143 + if err == nil { 144 + return "" 145 + } 146 + msg := err.Error() 147 + if len(msg) > 500 { 148 + return msg[:500] 149 + } 150 + return msg 151 + }
+174
packages/api/internal/api/jetstream.go
··· 1 + package api 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log/slog" 8 + "strconv" 9 + "strings" 10 + "time" 11 + 12 + "github.com/coder/websocket" 13 + "tangled.org/desertthunder.dev/twister/internal/store" 14 + ) 15 + 16 + const ( 17 + jetstreamConsumerName = "jetstream-cache-v1" 18 + jetstreamReconnectDelay = 5 * time.Second 19 + // persist cursor every N events 20 + jetstreamCursorInterval = 50 21 + ) 22 + 23 + type jetstreamMessage struct { 24 + DID string `json:"did"` 25 + TimeUS int64 `json:"time_us"` 26 + Kind string `json:"kind"` 27 + Commit *struct { 28 + Operation string `json:"operation"` 29 + Collection string `json:"collection"` 30 + RKey string `json:"rkey"` 31 + } `json:"commit"` 32 + } 33 + 34 + // runJetstreamConsumer connects to JetStream and caches sh.tangled.* events. 35 + // Reconnects automatically after any disconnect. 36 + func (s *Server) runJetstreamConsumer(ctx context.Context) { 37 + s.log.Info("jetstream cache consumer starting") 38 + for { 39 + if ctx.Err() != nil { 40 + s.log.Info("jetstream cache consumer stopped") 41 + return 42 + } 43 + if err := s.consumeJetstream(ctx); err != nil && ctx.Err() == nil { 44 + s.log.Warn("jetstream consumer disconnected; reconnecting", 45 + slog.String("error", err.Error()), 46 + slog.Duration("retry_in", jetstreamReconnectDelay), 47 + ) 48 + } 49 + select { 50 + case <-ctx.Done(): 51 + return 52 + case <-time.After(jetstreamReconnectDelay): 53 + } 54 + } 55 + } 56 + 57 + func (s *Server) consumeJetstream(ctx context.Context) error { 58 + cursorUS, err := s.loadJetstreamCursor(ctx) 59 + if err != nil { 60 + return fmt.Errorf("load jetstream cursor: %w", err) 61 + } 62 + 63 + u := buildJetstreamURL(s.cfg.JetstreamURL, s.cfg.JetstreamWantedCollections, cursorUS) 64 + conn, _, err := websocket.Dial(ctx, u, nil) 65 + if err != nil { 66 + return fmt.Errorf("jetstream dial: %w", err) 67 + } 68 + defer conn.CloseNow() 69 + 70 + s.log.Info("jetstream consumer connected", slog.Int64("cursor_us", cursorUS)) 71 + 72 + var count int64 73 + var lastTimeUS int64 = cursorUS 74 + 75 + for { 76 + if ctx.Err() != nil { 77 + return nil 78 + } 79 + 80 + _, msg, err := conn.Read(ctx) 81 + if err != nil { 82 + if ctx.Err() != nil { 83 + return nil 84 + } 85 + return fmt.Errorf("jetstream read: %w", err) 86 + } 87 + 88 + var jmsg jetstreamMessage 89 + if err := json.Unmarshal(msg, &jmsg); err != nil { 90 + s.log.Debug("jetstream parse failed", slog.String("error", err.Error())) 91 + continue 92 + } 93 + if jmsg.DID == "" || jmsg.TimeUS == 0 { 94 + continue 95 + } 96 + 97 + evt := &store.JetstreamEvent{ 98 + TimeUS: jmsg.TimeUS, 99 + DID: jmsg.DID, 100 + Kind: jmsg.Kind, 101 + Payload: string(msg), 102 + ReceivedAt: time.Now().UTC().Format(time.RFC3339), 103 + } 104 + if jmsg.Commit != nil { 105 + evt.Collection = jmsg.Commit.Collection 106 + evt.RKey = jmsg.Commit.RKey 107 + evt.Operation = jmsg.Commit.Operation 108 + } 109 + 110 + if err := s.store.InsertJetstreamEvent(ctx, evt, s.cfg.ActivityMaxEvents); err != nil { 111 + s.log.Warn("jetstream insert failed", slog.String("error", err.Error())) 112 + } 113 + 114 + lastTimeUS = jmsg.TimeUS 115 + count++ 116 + 117 + if count%jetstreamCursorInterval == 0 { 118 + cursor := strconv.FormatInt(lastTimeUS, 10) 119 + if err := s.store.SetSyncState(ctx, jetstreamConsumerName, cursor); err != nil { 120 + s.log.Warn("jetstream cursor persist failed", slog.String("error", err.Error())) 121 + } 122 + } 123 + } 124 + } 125 + 126 + // loadJetstreamCursor returns the cursor to use when connecting to JetStream. 127 + // First boot: seeds to now-24h. Subsequent connects: rewinds by ActivityRewindDuration. 128 + func (s *Server) loadJetstreamCursor(ctx context.Context) (int64, error) { 129 + state, err := s.store.GetSyncState(ctx, jetstreamConsumerName) 130 + if err != nil { 131 + return 0, err 132 + } 133 + 134 + if state == nil || strings.TrimSpace(state.Cursor) == "" { 135 + seed := time.Now().UTC().Add(-24 * time.Hour).UnixMicro() 136 + s.log.Info("jetstream cursor: first boot, seeding to now-24h", slog.Int64("cursor_us", seed)) 137 + return seed, nil 138 + } 139 + 140 + cursor, err := strconv.ParseInt(strings.TrimSpace(state.Cursor), 10, 64) 141 + if err != nil { 142 + seed := time.Now().UTC().Add(-24 * time.Hour).UnixMicro() 143 + s.log.Warn("jetstream cursor parse failed; reseeding to now-24h", 144 + slog.String("raw", state.Cursor), slog.String("error", err.Error())) 145 + return seed, nil 146 + } 147 + 148 + rewound := cursor - s.cfg.ActivityRewindDuration.Microseconds() 149 + s.log.Info("jetstream cursor: rewinding on reconnect", 150 + slog.Int64("prev_us", cursor), 151 + slog.Duration("rewind", s.cfg.ActivityRewindDuration), 152 + slog.Int64("cursor_us", rewound), 153 + ) 154 + return rewound, nil 155 + } 156 + 157 + // buildJetstreamURL constructs the JetStream subscribe URL with wanted collections 158 + // and cursor. The wildcard character in collection names is not percent-encoded. 159 + func buildJetstreamURL(base, collections string, cursorUS int64) string { 160 + var parts []string 161 + for _, c := range strings.Split(collections, ",") { 162 + c = strings.TrimSpace(c) 163 + if c != "" { 164 + parts = append(parts, "wantedCollections="+c) 165 + } 166 + } 167 + if cursorUS > 0 { 168 + parts = append(parts, "cursor="+strconv.FormatInt(cursorUS, 10)) 169 + } 170 + if len(parts) == 0 { 171 + return base 172 + } 173 + return base + "?" + strings.Join(parts, "&") 174 + }
+47
packages/api/internal/api/jetstream_test.go
··· 1 + package api 2 + 3 + import ( 4 + "strings" 5 + "testing" 6 + ) 7 + 8 + func TestBuildJetstreamURL(t *testing.T) { 9 + base := "wss://jetstream2.us-east.bsky.network/subscribe" 10 + 11 + t.Run("single collection with cursor", func(t *testing.T) { 12 + u := buildJetstreamURL(base, "sh.tangled.*", 1_000_000) 13 + if !strings.Contains(u, "wantedCollections=sh.tangled.*") { 14 + t.Errorf("missing wantedCollections in %q", u) 15 + } 16 + if !strings.Contains(u, "cursor=1000000") { 17 + t.Errorf("missing cursor in %q", u) 18 + } 19 + }) 20 + 21 + t.Run("multiple collections", func(t *testing.T) { 22 + u := buildJetstreamURL(base, "sh.tangled.repo, sh.tangled.repo.issue", 0) 23 + if !strings.Contains(u, "wantedCollections=sh.tangled.repo") { 24 + t.Errorf("missing first collection in %q", u) 25 + } 26 + if !strings.Contains(u, "wantedCollections=sh.tangled.repo.issue") { 27 + t.Errorf("missing second collection in %q", u) 28 + } 29 + if strings.Contains(u, "cursor=") { 30 + t.Errorf("unexpected cursor when cursorUS=0 in %q", u) 31 + } 32 + }) 33 + 34 + t.Run("no collections no cursor", func(t *testing.T) { 35 + u := buildJetstreamURL(base, "", 0) 36 + if u != base { 37 + t.Errorf("expected bare base URL, got %q", u) 38 + } 39 + }) 40 + 41 + t.Run("wildcard is not percent-encoded", func(t *testing.T) { 42 + u := buildJetstreamURL(base, "sh.tangled.*", 42) 43 + if strings.Contains(u, "%2A") { 44 + t.Errorf("wildcard was percent-encoded in %q", u) 45 + } 46 + }) 47 + }
-29
packages/api/internal/api/readthrough.go
··· 291 291 s.enqueueXRPCRecord(ctx, e.URI, e.CID, e.Value) 292 292 } 293 293 } 294 - 295 - func retryDelay(attempt int) time.Duration { 296 - if attempt < 1 { 297 - attempt = 1 298 - } 299 - base := time.Second * time.Duration(1<<minInt(attempt-1, 8)) 300 - if base > 5*time.Minute { 301 - return 5 * time.Minute 302 - } 303 - return base 304 - } 305 - 306 - func truncateErr(err error) string { 307 - if err == nil { 308 - return "" 309 - } 310 - msg := err.Error() 311 - if len(msg) > 500 { 312 - return msg[:500] 313 - } 314 - return msg 315 - } 316 - 317 - func minInt(a, b int) int { 318 - if a < b { 319 - return a 320 - } 321 - return b 322 - }
+12 -4
packages/api/internal/config/config.go
··· 42 42 ConstellationUserAgent string 43 43 ConstellationTimeout time.Duration 44 44 ConstellationCacheTTL time.Duration 45 - OAuthClientID string 46 - OAuthRedirectURIs []string 45 + OAuthClientID string 46 + OAuthRedirectURIs []string 47 + JetstreamURL string 48 + JetstreamWantedCollections string 49 + ActivityMaxEvents int 50 + ActivityRewindDuration time.Duration 47 51 } 48 52 49 53 type LoadOptions struct { ··· 85 89 ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.org/desertthunder.dev/twisted; Owais <desertthunder.dev@gmail.com>)"), 86 90 ConstellationTimeout: envDuration("CONSTELLATION_TIMEOUT", 10*time.Second), 87 91 ConstellationCacheTTL: envDuration("CONSTELLATION_CACHE_TTL", 5*time.Minute), 88 - OAuthClientID: os.Getenv("OAUTH_CLIENT_ID"), 89 - OAuthRedirectURIs: envSlice("OAUTH_REDIRECT_URIS", nil), 92 + OAuthClientID: os.Getenv("OAUTH_CLIENT_ID"), 93 + OAuthRedirectURIs: envSlice("OAUTH_REDIRECT_URIS", nil), 94 + JetstreamURL: envOrDefault("JETSTREAM_URL", "wss://jetstream2.us-east.bsky.network/subscribe"), 95 + JetstreamWantedCollections: envOrDefault("JETSTREAM_WANTED_COLLECTIONS", "sh.tangled.*"), 96 + ActivityMaxEvents: envInt("ACTIVITY_MAX_EVENTS", 500), 97 + ActivityRewindDuration: envDuration("ACTIVITY_REWIND_DURATION", 5*time.Minute), 90 98 } 91 99 92 100 if opts.Local {
+8
packages/api/internal/ingest/ingest_test.go
··· 147 147 return 0, nil 148 148 } 149 149 150 + func (f *fakeStore) InsertJetstreamEvent(_ context.Context, _ *store.JetstreamEvent, _ int) error { 151 + return nil 152 + } 153 + 154 + func (f *fakeStore) ListJetstreamEvents(_ context.Context, _ store.JetstreamEventFilter) ([]*store.JetstreamEvent, error) { 155 + return nil, nil 156 + } 157 + 150 158 func (f *fakeStore) Ping(_ context.Context) error { 151 159 return nil 152 160 }
+14
packages/api/internal/store/migrations/006_jetstream_events.sql
··· 1 + CREATE TABLE IF NOT EXISTS jetstream_events ( 2 + id INTEGER PRIMARY KEY AUTOINCREMENT, 3 + time_us INTEGER NOT NULL, 4 + did TEXT NOT NULL, 5 + kind TEXT NOT NULL, 6 + collection TEXT, 7 + rkey TEXT, 8 + operation TEXT, 9 + payload TEXT NOT NULL, 10 + received_at TEXT NOT NULL 11 + ); 12 + 13 + CREATE INDEX IF NOT EXISTS idx_jetstream_events_time_us 14 + ON jetstream_events(time_us DESC);
+94
packages/api/internal/store/sql_store.go
··· 469 469 return n, nil 470 470 } 471 471 472 + func (s *SQLStore) InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error { 473 + tx, err := s.db.BeginTx(ctx, nil) 474 + if err != nil { 475 + return fmt.Errorf("begin insert jetstream event tx: %w", err) 476 + } 477 + defer tx.Rollback() 478 + 479 + _, err = tx.ExecContext(ctx, ` 480 + INSERT INTO jetstream_events (time_us, did, kind, collection, rkey, operation, payload, received_at) 481 + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, 482 + event.TimeUS, event.DID, event.Kind, 483 + nullableStr(event.Collection), nullableStr(event.RKey), nullableStr(event.Operation), 484 + event.Payload, event.ReceivedAt, 485 + ) 486 + if err != nil { 487 + return fmt.Errorf("insert jetstream event: %w", err) 488 + } 489 + 490 + if maxEvents > 0 { 491 + _, err = tx.ExecContext(ctx, ` 492 + DELETE FROM jetstream_events 493 + WHERE id NOT IN ( 494 + SELECT id FROM jetstream_events ORDER BY time_us DESC LIMIT ? 495 + )`, maxEvents) 496 + if err != nil { 497 + return fmt.Errorf("trim jetstream events: %w", err) 498 + } 499 + } 500 + 501 + if err := tx.Commit(); err != nil { 502 + return fmt.Errorf("commit insert jetstream event tx: %w", err) 503 + } 504 + return nil 505 + } 506 + 507 + func (s *SQLStore) ListJetstreamEvents(ctx context.Context, filter JetstreamEventFilter) ([]*JetstreamEvent, error) { 508 + query := ` 509 + SELECT id, time_us, did, kind, 510 + COALESCE(collection, ''), COALESCE(rkey, ''), COALESCE(operation, ''), 511 + payload, received_at 512 + FROM jetstream_events WHERE 1=1` 513 + args := []any{} 514 + 515 + if filter.Collection != "" { 516 + query += " AND collection = ?" 517 + args = append(args, filter.Collection) 518 + } 519 + if filter.DID != "" { 520 + query += " AND did = ?" 521 + args = append(args, filter.DID) 522 + } 523 + if filter.Operation != "" { 524 + query += " AND operation = ?" 525 + args = append(args, filter.Operation) 526 + } 527 + 528 + query += " ORDER BY time_us DESC" 529 + 530 + limit := filter.Limit 531 + if limit <= 0 { 532 + limit = 50 533 + } 534 + query += " LIMIT ?" 535 + args = append(args, limit) 536 + 537 + if filter.Offset > 0 { 538 + query += " OFFSET ?" 539 + args = append(args, filter.Offset) 540 + } 541 + 542 + rows, err := s.db.QueryContext(ctx, query, args...) 543 + if err != nil { 544 + return nil, fmt.Errorf("list jetstream events: %w", err) 545 + } 546 + defer rows.Close() 547 + 548 + var events []*JetstreamEvent 549 + for rows.Next() { 550 + e := &JetstreamEvent{} 551 + if err := rows.Scan( 552 + &e.ID, &e.TimeUS, &e.DID, &e.Kind, 553 + &e.Collection, &e.RKey, &e.Operation, 554 + &e.Payload, &e.ReceivedAt, 555 + ); err != nil { 556 + return nil, fmt.Errorf("scan jetstream event: %w", err) 557 + } 558 + events = append(events, e) 559 + } 560 + if err := rows.Err(); err != nil { 561 + return nil, fmt.Errorf("iterate jetstream events: %w", err) 562 + } 563 + return events, nil 564 + } 565 + 472 566 func (s *SQLStore) Ping(ctx context.Context) error { 473 567 return s.db.PingContext(ctx) 474 568 }
+24
packages/api/internal/store/store.go
··· 56 56 UpdatedAt string 57 57 } 58 58 59 + // JetstreamEvent is a cached JetStream activity event. 60 + type JetstreamEvent struct { 61 + ID int64 62 + TimeUS int64 63 + DID string 64 + Kind string 65 + Collection string 66 + RKey string 67 + Operation string 68 + Payload string 69 + ReceivedAt string 70 + } 71 + 72 + // JetstreamEventFilter scopes a ListJetstreamEvents query. 73 + type JetstreamEventFilter struct { 74 + Collection string 75 + DID string 76 + Operation string 77 + Limit int 78 + Offset int 79 + } 80 + 59 81 // IndexingJobInput is the payload used to enqueue or refresh an indexing job. 60 82 type IndexingJobInput struct { 61 83 DocumentID string ··· 97 119 GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 98 120 CountDocuments(ctx context.Context) (int64, error) 99 121 CountPendingIndexingJobs(ctx context.Context) (int64, error) 122 + InsertJetstreamEvent(ctx context.Context, event *JetstreamEvent, maxEvents int) error 123 + ListJetstreamEvents(ctx context.Context, filter JetstreamEventFilter) ([]*JetstreamEvent, error) 100 124 Ping(ctx context.Context) error 101 125 }
+103
packages/api/internal/store/store_test.go
··· 316 316 } 317 317 }) 318 318 319 + t.Run("jetstream events insert and list", func(t *testing.T) { 320 + evt := &store.JetstreamEvent{ 321 + TimeUS: 1_000_000, 322 + DID: "did:plc:evt1", 323 + Kind: "commit", 324 + Collection: "sh.tangled.repo", 325 + RKey: "rkey1", 326 + Operation: "create", 327 + Payload: `{"did":"did:plc:evt1","time_us":1000000,"kind":"commit"}`, 328 + ReceivedAt: "2026-01-01T00:00:00Z", 329 + } 330 + if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil { 331 + t.Fatalf("insert jetstream event: %v", err) 332 + } 333 + 334 + events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{Limit: 10}) 335 + if err != nil { 336 + t.Fatalf("list jetstream events: %v", err) 337 + } 338 + if len(events) < 1 { 339 + t.Fatal("expected at least one jetstream event") 340 + } 341 + found := false 342 + for _, e := range events { 343 + if e.DID == "did:plc:evt1" && e.Collection == "sh.tangled.repo" { 344 + found = true 345 + if e.Operation != "create" { 346 + t.Errorf("operation: got %q, want create", e.Operation) 347 + } 348 + if e.TimeUS != 1_000_000 { 349 + t.Errorf("time_us: got %d, want 1000000", e.TimeUS) 350 + } 351 + } 352 + } 353 + if !found { 354 + t.Error("inserted event not found in list") 355 + } 356 + }) 357 + 358 + t.Run("jetstream events filter by collection", func(t *testing.T) { 359 + evt := &store.JetstreamEvent{ 360 + TimeUS: 2_000_000, 361 + DID: "did:plc:evt2", 362 + Kind: "commit", 363 + Collection: "sh.tangled.repo.issue", 364 + RKey: "rkey2", 365 + Operation: "create", 366 + Payload: `{"did":"did:plc:evt2","time_us":2000000,"kind":"commit"}`, 367 + ReceivedAt: "2026-01-01T00:00:01Z", 368 + } 369 + if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil { 370 + t.Fatalf("insert jetstream event: %v", err) 371 + } 372 + 373 + events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{ 374 + Collection: "sh.tangled.repo.issue", 375 + Limit: 10, 376 + }) 377 + if err != nil { 378 + t.Fatalf("list with collection filter: %v", err) 379 + } 380 + for _, e := range events { 381 + if e.Collection != "sh.tangled.repo.issue" { 382 + t.Errorf("filter leaked event with collection %q", e.Collection) 383 + } 384 + } 385 + if len(events) == 0 { 386 + t.Error("expected at least one event matching collection filter") 387 + } 388 + }) 389 + 390 + t.Run("jetstream events bounded by maxEvents", func(t *testing.T) { 391 + // Insert 5 events with max=3; only the 3 most recent should survive. 392 + for i := int64(1); i <= 5; i++ { 393 + e := &store.JetstreamEvent{ 394 + TimeUS: 100 + i, 395 + DID: "did:plc:bound", 396 + Kind: "commit", 397 + Collection: "sh.tangled.repo", 398 + RKey: "rkey-bound", 399 + Operation: "create", 400 + Payload: `{}`, 401 + ReceivedAt: "2026-01-01T00:00:00Z", 402 + } 403 + if err := st.InsertJetstreamEvent(ctx, e, 3); err != nil { 404 + t.Fatalf("insert bounded event %d: %v", i, err) 405 + } 406 + } 407 + 408 + all, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{ 409 + DID: "did:plc:bound", 410 + Limit: 100, 411 + }) 412 + if err != nil { 413 + t.Fatalf("list bounded events: %v", err) 414 + } 415 + // After inserting 5 events total (including the one from the previous subtests), 416 + // maxEvents=3 means at most 3 rows survive across all events. 417 + if len(all) > 3 { 418 + t.Errorf("expected at most 3 events after bounding, got %d", len(all)) 419 + } 420 + }) 421 + 319 422 t.Run("indexing jobs enqueue claim retry complete", func(t *testing.T) { 320 423 job := store.IndexingJobInput{ 321 424 DocumentID: "did:plc:owner|sh.tangled.repo|repo1",