a love letter to tangled (android, iOS, and a search API)

feat: constellation integration

+342 -42
+5 -5
docs/roadmap.md
··· 7 7 8 8 Add a Constellation client to the Go API for enriching search results with social signals. 9 9 10 - - [ ] Constellation XRPC client (`internal/constellation/`) with `getBacklinksCount` and `getBacklinks` 11 - - [ ] User-agent header with project name and contact 12 - - [ ] Enrich search results with star counts from Constellation 13 - - [ ] Profile summary endpoint (`GET /profiles/{did}/summary`) with follower/following counts from Constellation 14 - - [ ] Cache Constellation responses with short TTL (star/follower counts change infrequently) 10 + - [x] Constellation XRPC client (`internal/constellation/`) with `getBacklinksCount` and `getBacklinks` 11 + - [x] User-agent header with project name and contact 12 + - [x] Enrich search results with star counts from Constellation 13 + - [x] Profile summary endpoint (`GET /profiles/{did}/summary`) with follower/following counts from Constellation 14 + - [x] Cache Constellation responses with short TTL (star/follower counts change infrequently) 15 15 16 16 ## API: Semantic Search Pipeline 17 17
+78 -9
packages/api/internal/api/api.go
··· 8 8 "net/http" 9 9 "strconv" 10 10 "strings" 11 + "sync" 11 12 "time" 12 13 13 14 "tangled.org/desertthunder.dev/twister/internal/config" 15 + "tangled.org/desertthunder.dev/twister/internal/constellation" 14 16 "tangled.org/desertthunder.dev/twister/internal/reindex" 15 17 "tangled.org/desertthunder.dev/twister/internal/search" 16 18 "tangled.org/desertthunder.dev/twister/internal/store" ··· 19 21 20 22 // Server is the HTTP search API server. 21 23 type Server struct { 22 - search *search.Repository 23 - store store.Store 24 - cfg *config.Config 25 - log *slog.Logger 24 + search *search.Repository 25 + store store.Store 26 + cfg *config.Config 27 + log *slog.Logger 28 + constellation *constellation.Client 26 29 } 27 30 28 31 // New creates a new API server. 29 - func New(searchRepo *search.Repository, st store.Store, cfg *config.Config, log *slog.Logger) *Server { 32 + func New(searchRepo *search.Repository, st store.Store, cfg *config.Config, log *slog.Logger, constellation *constellation.Client) *Server { 30 33 return &Server{ 31 - search: searchRepo, 32 - store: st, 33 - cfg: cfg, 34 - log: log, 34 + search: searchRepo, 35 + store: st, 36 + cfg: cfg, 37 + log: log, 38 + constellation: constellation, 35 39 } 36 40 } 37 41 ··· 47 51 mux.HandleFunc("GET /search/hybrid", s.handleNotImplemented) 48 52 49 53 mux.HandleFunc("GET /documents/{id}", s.handleGetDocument) 54 + mux.HandleFunc("GET /profiles/{did}/summary", s.handleProfileSummary) 50 55 51 56 if s.cfg.EnableAdminEndpoints { 52 57 mux.HandleFunc("POST /admin/reindex", s.handleAdminReindex) ··· 213 218 return 214 219 } 215 220 221 + if s.constellation != nil { 222 + s.enrichStarCounts(r.Context(), resp.Results) 223 + } 224 + 216 225 writeJSON(w, http.StatusOK, resp) 217 226 } 218 227 ··· 280 289 "updated": result.Updated, 281 290 "errors": result.Errors, 282 291 }) 292 + } 293 + 294 + // enrichStarCounts fetches star counts from Constellation for repo results in parallel. 
295 + // It is best-effort: failures are logged and results are returned without star counts. 296 + // 297 + // Uses a short deadline so enrichment doesn't stall the response. 298 + func (s *Server) enrichStarCounts(ctx context.Context, results []search.Result) { 299 + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) 300 + defer cancel() 301 + 302 + var wg sync.WaitGroup 303 + for i := range results { 304 + if results[i].RecordType != "repo" || results[i].ATURI == "" { 305 + continue 306 + } 307 + wg.Add(1) 308 + go func(i int) { 309 + defer wg.Done() 310 + n, err := s.constellation.GetBacklinksCount(ctx, constellation.BacklinksParams{ 311 + Subject: results[i].ATURI, 312 + Source: constellation.SourceStarURI, 313 + }) 314 + if err != nil { 315 + s.log.Debug("constellation star count failed", slog.String("at_uri", results[i].ATURI), slog.String("error", err.Error())) 316 + return 317 + } 318 + results[i].StarCount = &n 319 + }(i) 320 + } 321 + wg.Wait() 322 + } 323 + 324 + // handleProfileSummary returns follower count and other social signals for a DID. 
325 + func (s *Server) handleProfileSummary(w http.ResponseWriter, r *http.Request) { 326 + did := r.PathValue("did") 327 + if did == "" { 328 + writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "did is required")) 329 + return 330 + } 331 + 332 + type summaryResponse struct { 333 + DID string `json:"did"` 334 + FollowerCount int `json:"follower_count"` 335 + } 336 + 337 + summary := summaryResponse{DID: did} 338 + 339 + if s.constellation != nil { 340 + n, err := s.constellation.GetBacklinksCount(r.Context(), constellation.BacklinksParams{ 341 + Subject: did, 342 + Source: constellation.SourceFollowDID, 343 + }) 344 + if err != nil { 345 + s.log.Debug("constellation follower count failed", slog.String("did", did), slog.String("error", err.Error())) 346 + } else { 347 + summary.FollowerCount = n 348 + } 349 + } 350 + 351 + writeJSON(w, http.StatusOK, summary) 283 352 } 284 353 285 354 func (s *Server) handleNotImplemented(w http.ResponseWriter, _ *http.Request) {
+8
packages/api/internal/config/config.go
··· 38 38 PLCDirectoryURL string 39 39 IdentityServiceURL string 40 40 XRPCTimeout time.Duration 41 + ConstellationURL string 42 + ConstellationUserAgent string 43 + ConstellationTimeout time.Duration 44 + ConstellationCacheTTL time.Duration 41 45 } 42 46 43 47 type LoadOptions struct { ··· 75 79 PLCDirectoryURL: envOrDefault("PLC_DIRECTORY_URL", "https://plc.directory"), 76 80 IdentityServiceURL: envOrDefault("IDENTITY_SERVICE_URL", "https://public.api.bsky.app"), 77 81 XRPCTimeout: envDuration("XRPC_TIMEOUT", 15*time.Second), 82 + ConstellationURL: envOrDefault("CONSTELLATION_URL", "https://constellation.microcosm.blue"), 83 + ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.sh; Owais <desertthunder.dev@gmail.com>)"), 84 + ConstellationTimeout: envDuration("CONSTELLATION_TIMEOUT", 10*time.Second), 85 + ConstellationCacheTTL: envDuration("CONSTELLATION_CACHE_TTL", 5*time.Minute), 78 86 } 79 87 80 88 if opts.Local {
+185
packages/api/internal/constellation/client.go
··· 1 + // Package constellation provides a client for the Constellation backlink API. 2 + // Constellation is a public AT Protocol backlink index at https://constellation.microcosm.blue. 3 + // It answers "who linked to this?" across the network, providing social signal counts 4 + // (stars, followers, reactions) without requiring us to maintain our own counters. 5 + package constellation 6 + 7 + import ( 8 + "context" 9 + "encoding/json" 10 + "fmt" 11 + "io" 12 + "net/http" 13 + "net/url" 14 + "time" 15 + ) 16 + 17 + const ( 18 + DefaultBaseURL string = "https://constellation.microcosm.blue" 19 + defaultTimeout time.Duration = 10 * time.Second 20 + defaultCacheTTL time.Duration = 5 * time.Minute 21 + ) 22 + 23 + // Source constants for common Tangled collections. 24 + const ( 25 + SourceStarURI string = "sh.tangled.feed.star:subject.uri" 26 + SourceFollowDID string = "sh.tangled.graph.follow:subject" 27 + SourceReactionURI string = "sh.tangled.feed.reaction:subject.uri" 28 + ) 29 + 30 + // Client is an HTTP client for the Constellation backlink API. 31 + type Client struct { 32 + http *http.Client 33 + baseURL string 34 + userAgent string 35 + countCache *ttlCache[int] 36 + } 37 + 38 + // Option configures a Client. 39 + type Option func(*Client) 40 + 41 + // WithBaseURL overrides the Constellation base URL. 42 + func WithBaseURL(u string) Option { 43 + return func(c *Client) { c.baseURL = u } 44 + } 45 + 46 + // WithUserAgent sets the User-Agent header sent with every request. 47 + // Constellation requires a user-agent identifying the project and a contact. 48 + func WithUserAgent(ua string) Option { 49 + return func(c *Client) { c.userAgent = ua } 50 + } 51 + 52 + // WithTimeout sets the HTTP request timeout. 53 + func WithTimeout(d time.Duration) Option { 54 + return func(c *Client) { c.http.Timeout = d } 55 + } 56 + 57 + // WithCacheTTL sets the TTL for cached count responses. 
58 + func WithCacheTTL(d time.Duration) Option { 59 + return func(c *Client) { c.countCache = newTTLCache[int](d) } 60 + } 61 + 62 + // NewClient creates a Constellation client with sensible defaults. 63 + func NewClient(opts ...Option) *Client { 64 + c := &Client{ 65 + http: &http.Client{Timeout: defaultTimeout}, 66 + baseURL: DefaultBaseURL, 67 + userAgent: "twister/1.0", 68 + countCache: newTTLCache[int](defaultCacheTTL), 69 + } 70 + for _, o := range opts { 71 + o(c) 72 + } 73 + return c 74 + } 75 + 76 + // BacklinksParams holds query parameters for backlink endpoints. 77 + type BacklinksParams struct { 78 + // Subject is the target AT-URI or DID being linked to (required). 79 + Subject string 80 + // Source is the collection path, e.g. "sh.tangled.feed.star:subject.uri" (required). 81 + Source string 82 + // DID optionally filters results to a specific actor (repeatable in the raw API; here single-value). 83 + DID string 84 + // Limit is the max results to return (default 16, max 100). 85 + Limit int 86 + // Reverse reverses the ordering. 87 + Reverse bool 88 + } 89 + 90 + // BacklinkRecord is one entry returned by GetBacklinks. 91 + type BacklinkRecord struct { 92 + URI string `json:"uri"` 93 + CID string `json:"cid"` 94 + ActorDID string `json:"actorDid"` 95 + CreatedAt string `json:"createdAt"` 96 + } 97 + 98 + // BacklinksResponse is returned by GetBacklinks. 99 + type BacklinksResponse struct { 100 + Records []BacklinkRecord `json:"records"` 101 + Cursor string `json:"cursor,omitempty"` 102 + } 103 + 104 + // GetBacklinksCount returns the count of records linking to the given subject. 105 + // Results are cached with the configured TTL. Errors are returned without caching. 
106 + func (c *Client) GetBacklinksCount(ctx context.Context, p BacklinksParams) (int, error) { 107 + cacheKey := "count\x00" + p.Subject + "\x00" + p.Source 108 + if n, ok := c.countCache.Get(cacheKey); ok { 109 + return n, nil 110 + } 111 + 112 + params := url.Values{} 113 + params.Set("subject", p.Subject) 114 + params.Set("source", p.Source) 115 + 116 + var resp struct { 117 + Count int `json:"count"` 118 + } 119 + if err := c.get(ctx, "blue.microcosm.links.getBacklinksCount", params, &resp); err != nil { 120 + return 0, err 121 + } 122 + 123 + c.countCache.Set(cacheKey, resp.Count) 124 + return resp.Count, nil 125 + } 126 + 127 + // GetBacklinks returns records linking to the given subject. 128 + // Results are not cached because paginated lists change frequently. 129 + func (c *Client) GetBacklinks(ctx context.Context, p BacklinksParams) (*BacklinksResponse, error) { 130 + params := url.Values{} 131 + params.Set("subject", p.Subject) 132 + params.Set("source", p.Source) 133 + if p.DID != "" { 134 + params.Set("did", p.DID) 135 + } 136 + if p.Limit > 0 { 137 + params.Set("limit", fmt.Sprintf("%d", p.Limit)) 138 + } 139 + if p.Reverse { 140 + params.Set("reverse", "true") 141 + } 142 + 143 + var resp BacklinksResponse 144 + if err := c.get(ctx, "blue.microcosm.links.getBacklinks", params, &resp); err != nil { 145 + return nil, err 146 + } 147 + return &resp, nil 148 + } 149 + 150 + func (c *Client) get(ctx context.Context, method string, params url.Values, out any) error { 151 + u := c.baseURL + "/xrpc/" + method 152 + if len(params) > 0 { 153 + u += "?" 
+ params.Encode() 154 + } 155 + 156 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) 157 + if err != nil { 158 + return fmt.Errorf("constellation: build request: %w", err) 159 + } 160 + if c.userAgent != "" { 161 + req.Header.Set("User-Agent", c.userAgent) 162 + } 163 + 164 + resp, err := c.http.Do(req) 165 + if err != nil { 166 + return fmt.Errorf("constellation: request %s: %w", method, err) 167 + } 168 + defer resp.Body.Close() 169 + 170 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 171 + body, _ := io.ReadAll(resp.Body) 172 + var errResp struct { 173 + Message string `json:"message"` 174 + } 175 + if json.Unmarshal(body, &errResp) == nil && errResp.Message != "" { 176 + return fmt.Errorf("constellation: %s: status %d: %s", method, resp.StatusCode, errResp.Message) 177 + } 178 + return fmt.Errorf("constellation: %s: status %d", method, resp.StatusCode) 179 + } 180 + 181 + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { 182 + return fmt.Errorf("constellation: %s: decode response: %w", method, err) 183 + } 184 + return nil 185 + }
+51
packages/api/internal/constellation/ttl.go
// Generic TTL cache for package constellation (uses "sync" and "time").

// cacheEntry pairs a cached value with the instant after which it is stale.
type cacheEntry[T any] struct {
	value     T
	expiresAt time.Time
}

// ttlCache is a string-keyed cache, safe for concurrent use, whose entries
// expire a fixed duration after they are written.
type ttlCache[T any] struct {
	mu      sync.RWMutex
	entries map[string]cacheEntry[T]
	ttl     time.Duration
	// lastSweep is when Set last purged expired entries; guarded by mu.
	lastSweep time.Time
}

// newTTLCache returns an empty cache whose entries live for ttl.
// A non-positive ttl yields entries that are already expired on the next Get.
func newTTLCache[T any](ttl time.Duration) *ttlCache[T] {
	return &ttlCache[T]{
		entries:   make(map[string]cacheEntry[T]),
		ttl:       ttl,
		lastSweep: time.Now(),
	}
}

// Get returns the value stored under key and true, or the zero value and
// false when the key is absent or its entry has expired. Expired entries are
// removed on access.
func (c *ttlCache[T]) Get(key string) (T, bool) {
	c.mu.RLock()
	entry, ok := c.entries[key]
	c.mu.RUnlock()

	var zero T
	if !ok {
		return zero, false
	}
	if !time.Now().After(entry.expiresAt) {
		return entry.value, true
	}

	// The entry looked expired under the read lock. Re-check under the write
	// lock before deleting: a concurrent Set may have stored a fresh value in
	// between, and that value must not be thrown away.
	c.mu.Lock()
	defer c.mu.Unlock()
	current, ok := c.entries[key]
	if !ok {
		return zero, false
	}
	if !time.Now().After(current.expiresAt) {
		return current.value, true
	}
	delete(c.entries, key)
	return zero, false
}

// Set stores value under key with a fresh expiry of now + ttl.
//
// At most once per ttl it also purges every expired entry, so keys that are
// written once and never read again do not accumulate forever.
func (c *ttlCache[T]) Set(key string, value T) {
	now := time.Now()

	c.mu.Lock()
	defer c.mu.Unlock()

	if now.Sub(c.lastSweep) >= c.ttl {
		for k, e := range c.entries {
			if now.After(e.expiresAt) {
				delete(c.entries, k)
			}
		}
		c.lastSweep = now
	}

	c.entries[key] = cacheEntry[T]{
		value:     value,
		expiresAt: now.Add(c.ttl),
	}
}
+3 -9
packages/api/internal/enrich/enrich.go
··· 29 29 30 30 // Runner performs the enrichment operation. 31 31 type Runner struct { 32 - store store.Store 33 - xrpc *xrpc.Client 34 - log *slog.Logger 32 + store store.Store 33 + xrpc *xrpc.Client 34 + log *slog.Logger 35 35 } 36 36 37 37 // New creates a Runner. ··· 143 143 func (r *Runner) enrichDoc(ctx context.Context, doc *store.Document) bool { 144 144 changed := false 145 145 146 - // Resolve author handle 147 146 if doc.AuthorHandle == "" && doc.DID != "" { 148 147 handle, err := r.store.GetIdentityHandle(ctx, doc.DID) 149 148 if err == nil && handle != "" { ··· 163 162 } 164 163 } 165 164 166 - // Resolve repo name for repo-scoped records 167 165 if doc.RepoDID != "" && doc.RepoName == "" { 168 - // Try to find the repo name from an existing repo document in the store 169 166 repoName := r.findRepoNameFromStore(ctx, doc.RepoDID) 170 167 if repoName != "" { 171 168 doc.RepoName = repoName ··· 173 170 } 174 171 } 175 172 176 - // Resolve repo owner handle for WebURL 177 173 ownerHandle := doc.AuthorHandle 178 174 if doc.RepoDID != "" && doc.RepoDID != doc.DID { 179 175 repoOwnerHandle, err := r.store.GetIdentityHandle(ctx, doc.RepoDID) ··· 187 183 } 188 184 } 189 185 190 - // Build WebURL 191 186 if doc.WebURL == "" { 192 187 webURL := xrpc.BuildWebURL(ownerHandle, doc.RepoName, doc.RecordType, doc.RKey) 193 188 if webURL != "" { ··· 209 204 if err != nil || len(docs) == 0 { 210 205 return "" 211 206 } 212 - // Return the first repo's title (which is the repo name) 213 207 for _, d := range docs { 214 208 if d.Title != "" { 215 209 return d.Title
-12
packages/api/internal/ingest/ingest.go
··· 403 403 return 404 404 } 405 405 406 - // Resolve repo name if RepoDID is set but RepoName is empty 407 406 if doc.RepoDID != "" && doc.RepoName == "" { 408 - // Extract the repo rkey from the RepoDID — the repo AT-URI typically encodes 409 - // the rkey as the last segment. We try to look up the repo record. 410 - // The RepoDID in the document refers to the repo owner's DID. The repo rkey 411 - // can be extracted from the document's AT-URI for repo-scoped records. 412 407 repoRKey := extractRepoRKey(doc.ATURI, doc.Collection) 413 408 if repoRKey != "" { 414 409 name, err := r.xrpcClient.ResolveRepoName(ctx, doc.RepoDID, repoRKey) ··· 424 419 } 425 420 } 426 421 427 - // Resolve author handle if empty 428 422 if doc.AuthorHandle == "" && doc.DID != "" { 429 423 info, err := r.xrpcClient.ResolveIdentity(ctx, doc.DID) 430 424 if err != nil { ··· 438 432 } 439 433 } 440 434 441 - // Build WebURL if we have enough data 442 435 if doc.WebURL == "" { 443 436 ownerHandle := doc.AuthorHandle 444 437 if doc.RepoDID != "" && doc.RepoDID != doc.DID { 445 - // Repo owner may differ from author — try to resolve repo owner handle 446 438 repoOwnerHandle, err := r.store.GetIdentityHandle(ctx, doc.RepoDID) 447 439 if err == nil && repoOwnerHandle != "" { 448 440 ownerHandle = repoOwnerHandle ··· 462 454 // at://did/collection/rkey but the repo is identified by RepoDID. We look 463 455 // for a stored repo document, or try common rkey patterns. 464 456 func extractRepoRKey(atURI, collection string) string { 465 - // For repo records themselves, the rkey IS the repo rkey 466 457 if collection == "sh.tangled.repo" { 467 458 parts := strings.SplitN(atURI, "/", 5) 468 459 if len(parts) >= 5 { 469 460 return parts[4] 470 461 } 471 462 } 472 - // For sub-collections like sh.tangled.repo.issue, the AT-URI contains the 473 - // issue rkey, not the repo rkey. We can't derive the repo rkey from the URI. 474 - // This will be resolved by the enrich command for existing documents. 
475 463 return "" 476 464 } 477 465
+1
packages/api/internal/search/search.go
··· 38 38 WebURL string `json:"web_url,omitempty"` 39 39 Score float64 `json:"score"` 40 40 MatchedBy []string `json:"matched_by"` 41 + StarCount *int `json:"star_count,omitempty"` 41 42 CreatedAt string `json:"created_at,omitempty"` 42 43 UpdatedAt string `json:"updated_at,omitempty"` 43 44 }
-6
packages/api/internal/xrpc/did_test.go
··· 55 55 })) 56 56 defer srv.Close() 57 57 58 - // For did:web, we need to override how the URL is constructed. 59 - // We'll use a did:plc: test instead since did:web requires DNS resolution. 60 - // The PLC test above validates the parsing logic. Let's test ResolveIdentity instead. 61 58 t.Skip("did:web requires DNS; tested via PLC path") 62 59 } 63 60 ··· 113 110 114 111 c := NewClient(WithPLCDirectory(srv.URL)) 115 112 116 - // First call: cache miss 117 113 _, err := c.ResolveDIDDoc(context.Background(), "did:plc:cached") 118 114 if err != nil { 119 115 t.Fatal(err) ··· 122 118 t.Fatalf("expected 1 call, got %d", calls) 123 119 } 124 120 125 - // Second call: cache hit 126 121 _, err = c.ResolveDIDDoc(context.Background(), "did:plc:cached") 127 122 if err != nil { 128 123 t.Fatal(err) ··· 131 126 t.Fatalf("expected 1 call (cached), got %d", calls) 132 127 } 133 128 134 - // Invalidate 135 129 c.didCache.Invalidate("did:plc:cached") 136 130 _, err = c.ResolveDIDDoc(context.Background(), "did:plc:cached") 137 131 if err != nil {
+11 -1
packages/api/main.go
··· 14 14 "tangled.org/desertthunder.dev/twister/internal/api" 15 15 "tangled.org/desertthunder.dev/twister/internal/backfill" 16 16 "tangled.org/desertthunder.dev/twister/internal/config" 17 + "tangled.org/desertthunder.dev/twister/internal/constellation" 17 18 "tangled.org/desertthunder.dev/twister/internal/enrich" 18 19 "tangled.org/desertthunder.dev/twister/internal/ingest" 19 20 "tangled.org/desertthunder.dev/twister/internal/normalize" ··· 96 97 97 98 st := store.New(db) 98 99 searchRepo := search.NewRepository(db) 99 - srv := api.New(searchRepo, st, cfg, log) 100 + 101 + constellationClient := constellation.NewClient( 102 + constellation.WithBaseURL(cfg.ConstellationURL), 103 + constellation.WithUserAgent(cfg.ConstellationUserAgent), 104 + constellation.WithTimeout(cfg.ConstellationTimeout), 105 + constellation.WithCacheTTL(cfg.ConstellationCacheTTL), 106 + ) 107 + log.Info("constellation client configured", slog.String("url", cfg.ConstellationURL)) 108 + 109 + srv := api.New(searchRepo, st, cfg, log, constellationClient) 100 110 101 111 ctx, cancel := baseContext() 102 112 defer cancel()