// A love letter to tangled (Android, iOS, and a search API).
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "github.com/spf13/cobra"
14 "tangled.org/desertthunder.dev/twister/internal/api"
15 "tangled.org/desertthunder.dev/twister/internal/backfill"
16 "tangled.org/desertthunder.dev/twister/internal/config"
17 "tangled.org/desertthunder.dev/twister/internal/constellation"
18 "tangled.org/desertthunder.dev/twister/internal/enrich"
19 "tangled.org/desertthunder.dev/twister/internal/ingest"
20 "tangled.org/desertthunder.dev/twister/internal/normalize"
21 "tangled.org/desertthunder.dev/twister/internal/observability"
22 "tangled.org/desertthunder.dev/twister/internal/reindex"
23 "tangled.org/desertthunder.dev/twister/internal/search"
24 "tangled.org/desertthunder.dev/twister/internal/store"
25 "tangled.org/desertthunder.dev/twister/internal/tapclient"
26 "tangled.org/desertthunder.dev/twister/internal/xrpc"
27)
28
// Build metadata surfaced by the root command's --version output.
// These defaults apply to plain `go build`/`go run` invocations;
// release builds presumably override them at link time via
// -ldflags "-X main.version=... -X main.commit=..." — TODO confirm
// against the build scripts.
var (
	version = "dev"
	commit  = "none"
)
33
34func main() {
35 var local bool
36
37 root := &cobra.Command{
38 Use: "twister",
39 Short: "Tangled search service",
40 Version: fmt.Sprintf("%s (%s)", version, commit),
41 SilenceUsage: true,
42 SilenceErrors: true,
43 }
44
45 root.PersistentFlags().BoolVar(&local, "local", false, "Deprecated: use the legacy local SQLite fallback instead of the default Postgres database URL")
46
47 root.AddCommand(
48 newAPICmd(&local),
49 newIndexerCmd(&local),
50 newMigrateCmd(&local),
51 newBackfillCmd(&local),
52 newReindexCmd(&local),
53 newEnrichCmd(&local),
54 newHealthcheckCmd(&local),
55 )
56
57 if err := root.Execute(); err != nil {
58 _, _ = fmt.Fprintln(os.Stderr, "Error:", err)
59 os.Exit(1)
60 }
61}
62
63func baseContext() (context.Context, context.CancelFunc) {
64 ctx, cancel := context.WithCancel(context.Background())
65 go func() {
66 ch := make(chan os.Signal, 1)
67 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
68 <-ch
69 cancel()
70 }()
71 return ctx, cancel
72}
73
74func newAPICmd(local *bool) *cobra.Command {
75 return &cobra.Command{
76 Use: "api",
77 Aliases: []string{"serve"},
78 Short: "Start the HTTP search API",
79 RunE: func(cmd *cobra.Command, args []string) error {
80 cfg, err := config.Load(config.LoadOptions{Local: *local})
81 if err != nil {
82 return fmt.Errorf("config: %w", err)
83 }
84 log := observability.NewLogger(cfg)
85 log.Info("starting api", slog.String("service", "api"), slog.String("version", version), slog.String("addr", cfg.HTTPBindAddr))
86
87 db, err := store.Open(cfg.DatabaseURL)
88 if err != nil {
89 return fmt.Errorf("open database: %w", err)
90 }
91 defer db.Close()
92
93 st := store.New(cfg.DatabaseURL, db)
94 searchRepo := search.NewRepository(cfg.DatabaseURL, db)
95
96 constellationClient := constellation.NewClient(
97 constellation.WithBaseURL(cfg.ConstellationURL),
98 constellation.WithUserAgent(cfg.ConstellationUserAgent),
99 constellation.WithTimeout(cfg.ConstellationTimeout),
100 constellation.WithCacheTTL(cfg.ConstellationCacheTTL),
101 )
102 log.Info("constellation client configured", slog.String("url", cfg.ConstellationURL))
103
104 xrpcClient := xrpc.NewClient(
105 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL),
106 xrpc.WithIdentityService(cfg.IdentityServiceURL),
107 xrpc.WithTimeout(cfg.XRPCTimeout),
108 )
109
110 srv := api.New(searchRepo, st, cfg, log, constellationClient, xrpcClient)
111
112 ctx, cancel := baseContext()
113 defer cancel()
114
115 if err := srv.Run(ctx); err != nil {
116 return fmt.Errorf("run api: %w", err)
117 }
118
119 log.Info("shutting down api")
120 return nil
121 },
122 }
123}
124
125func newIndexerCmd(local *bool) *cobra.Command {
126 return &cobra.Command{
127 Use: "indexer",
128 Short: "Start the Tap consumer and indexer",
129 RunE: func(cmd *cobra.Command, args []string) error {
130 cfg, err := config.Load(config.LoadOptions{Local: *local})
131 if err != nil {
132 return fmt.Errorf("config: %w", err)
133 }
134 log := observability.NewLogger(cfg)
135 log.Info("starting indexer", slog.String("service", "indexer"), slog.String("version", version))
136
137 if cfg.TapURL == "" {
138 return fmt.Errorf("TAP_URL is required for indexer")
139 }
140
141 db, err := store.Open(cfg.DatabaseURL)
142 if err != nil {
143 return fmt.Errorf("open database: %w", err)
144 }
145 defer db.Close()
146
147 st := store.New(cfg.DatabaseURL, db)
148 registry := normalize.NewRegistry()
149 tap := tapclient.New(cfg.TapURL, cfg.TapAuthPassword, log)
150 runner := ingest.NewRunner(st, registry, tap, cfg.IndexedCollections, log)
151
152 if cfg.EnableIngestEnrichment {
153 xrpcClient := xrpc.NewClient(
154 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL),
155 xrpc.WithIdentityService(cfg.IdentityServiceURL),
156 xrpc.WithTimeout(cfg.XRPCTimeout),
157 )
158 runner.SetXRPCClient(xrpcClient)
159 log.Info("ingest enrichment enabled")
160 }
161
162 ctx, cancel := baseContext()
163 defer cancel()
164
165 healthMux := http.NewServeMux()
166 healthMux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) {
167 if err := st.Ping(r.Context()); err != nil {
168 w.Header().Set("Content-Type", "application/json")
169 w.WriteHeader(http.StatusServiceUnavailable)
170 fmt.Fprintf(w, `{"status":"unhealthy","error":"db_unreachable"}`)
171 return
172 }
173 w.Header().Set("Content-Type", "application/json")
174 fmt.Fprintf(w, `{"status":"ok"}`)
175 })
176 healthSrv := &http.Server{
177 Addr: cfg.IndexerHealthAddr,
178 Handler: healthMux,
179 ReadHeaderTimeout: 5 * time.Second,
180 }
181 go func() {
182 log.Info("indexer health server listening", slog.String("addr", cfg.IndexerHealthAddr))
183 if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
184 log.Error("indexer health server failed", slog.String("error", err.Error()))
185 }
186 }()
187 defer func() {
188 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
189 defer shutdownCancel()
190 _ = healthSrv.Shutdown(shutdownCtx)
191 }()
192
193 if err := runner.Run(ctx); err != nil {
194 return fmt.Errorf("run indexer: %w", err)
195 }
196
197 log.Info("shutting down indexer")
198 return nil
199 },
200 }
201}
202
203func newMigrateCmd(local *bool) *cobra.Command {
204 return &cobra.Command{
205 Use: "migrate",
206 Short: "Apply database schema migrations",
207 RunE: func(cmd *cobra.Command, args []string) error {
208 cfg, err := config.Load(config.LoadOptions{Local: *local})
209 if err != nil {
210 return fmt.Errorf("config: %w", err)
211 }
212 log := observability.NewLogger(cfg)
213 log.Info("starting migrate", slog.String("service", "migrate"), slog.String("version", version))
214
215 db, err := store.Open(cfg.DatabaseURL)
216 if err != nil {
217 return fmt.Errorf("open database: %w", err)
218 }
219 defer db.Close()
220
221 if err := store.Migrate(db, cfg.DatabaseURL); err != nil {
222 return fmt.Errorf("migrate database: %w", err)
223 }
224
225 log.Info("migrate finished")
226 return nil
227 },
228 }
229}
230
231func newBackfillCmd(local *bool) *cobra.Command {
232 var opts backfill.Options
233
234 cmd := &cobra.Command{
235 Use: "backfill",
236 Short: "Discover repos and register them with Tap backfill",
237 RunE: func(cmd *cobra.Command, args []string) error {
238 cfg, err := config.Load(config.LoadOptions{Local: *local})
239 if err != nil {
240 return fmt.Errorf("config: %w", err)
241 }
242 log := observability.NewLogger(cfg)
243 log.Info("starting backfill", slog.String("service", "backfill"), slog.String("version", version))
244
245 if cfg.TapURL == "" {
246 return fmt.Errorf("TAP_URL is required for backfill")
247 }
248
249 db, err := store.Open(cfg.DatabaseURL)
250 if err != nil {
251 return fmt.Errorf("open database: %w", err)
252 }
253 defer db.Close()
254
255 tapAdmin, err := backfill.NewHTTPTapAdmin(cfg.TapURL, cfg.TapAuthPassword)
256 if err != nil {
257 return fmt.Errorf("tap admin client: %w", err)
258 }
259
260 xrpcClient := xrpc.NewClient(
261 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL),
262 xrpc.WithIdentityService(cfg.IdentityServiceURL),
263 xrpc.WithTimeout(cfg.XRPCTimeout),
264 )
265
266 runner := backfill.NewRunner(
267 store.New(cfg.DatabaseURL, db),
268 tapAdmin,
269 xrpcClient,
270 log,
271 )
272
273 ctx, cancel := baseContext()
274 defer cancel()
275
276 if err := runner.Run(ctx, opts); err != nil {
277 return fmt.Errorf("run backfill: %w", err)
278 }
279
280 log.Info("shutting down backfill")
281 return nil
282 },
283 }
284
285 cmd.Flags().StringVar(&opts.Source, "source", backfill.SourceLightrail, "Discovery source: lightrail or graph")
286 cmd.Flags().StringVar(&opts.LightrailURL, "lightrail-url", backfill.DefaultLightrailURL, "Base URL for listReposByCollection discovery")
287 cmd.Flags().StringArrayVar(&opts.Collections, "collection", nil, "Collection to discover via Lightrail (repeatable)")
288 cmd.Flags().IntVar(&opts.PageLimit, "page-limit", backfill.DefaultPageLimit, "Max DIDs to request per Lightrail page")
289 cmd.Flags().StringVar(&opts.SeedsPath, "seeds", "", "Seed source for --source graph: file path or comma-separated DIDs/handles")
290 cmd.Flags().IntVar(&opts.MaxHops, "max-hops", 2, "Max fan-out depth from graph seeds")
291 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Print discovery plan without mutating Tap")
292 cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel discovery workers")
293 cmd.Flags().IntVar(&opts.BatchSize, "batch-size", 10, "DIDs per /repos/add request")
294 cmd.Flags().DurationVar(&opts.BatchDelay, "batch-delay", time.Second, "Delay between Tap /repos/add batches")
295
296 return cmd
297}
298
299func newReindexCmd(local *bool) *cobra.Command {
300 var opts reindex.Options
301
302 cmd := &cobra.Command{
303 Use: "reindex",
304 Short: "Re-normalize and upsert all documents into the FTS index",
305 RunE: func(cmd *cobra.Command, args []string) error {
306 cfg, err := config.Load(config.LoadOptions{Local: *local})
307 if err != nil {
308 return fmt.Errorf("config: %w", err)
309 }
310 log := observability.NewLogger(cfg)
311 log.Info("starting reindex", slog.String("service", "reindex"), slog.String("version", version))
312
313 db, err := store.Open(cfg.DatabaseURL)
314 if err != nil {
315 return fmt.Errorf("open database: %w", err)
316 }
317 defer db.Close()
318
319 ctx, cancel := baseContext()
320 defer cancel()
321
322 runner := reindex.New(store.New(cfg.DatabaseURL, db), log)
323 result, err := runner.Run(ctx, opts)
324 if result != nil {
325 log.Info("reindex finished",
326 slog.Int("total", result.Total),
327 slog.Int("updated", result.Updated),
328 slog.Int("errors", result.Errors),
329 )
330 }
331 return err
332 },
333 }
334
335 cmd.Flags().StringVar(&opts.Collection, "collection", "", "Reindex only documents in this collection")
336 cmd.Flags().StringVar(&opts.DID, "did", "", "Reindex only documents authored by this DID")
337 cmd.Flags().StringVar(&opts.DocumentID, "document", "", "Reindex a single document by stable ID")
338 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show intended work without writing")
339
340 return cmd
341}
342
343func newEnrichCmd(local *bool) *cobra.Command {
344 var opts enrich.Options
345
346 cmd := &cobra.Command{
347 Use: "enrich",
348 Short: "Backfill RepoName, AuthorHandle, and WebURL on existing documents",
349 RunE: func(cmd *cobra.Command, args []string) error {
350 cfg, err := config.Load(config.LoadOptions{Local: *local})
351 if err != nil {
352 return fmt.Errorf("config: %w", err)
353 }
354 log := observability.NewLogger(cfg)
355 log.Info("starting enrich", slog.String("service", "enrich"), slog.String("version", version))
356
357 db, err := store.Open(cfg.DatabaseURL)
358 if err != nil {
359 return fmt.Errorf("open database: %w", err)
360 }
361 defer db.Close()
362
363 xrpcClient := xrpc.NewClient(
364 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL),
365 xrpc.WithIdentityService(cfg.IdentityServiceURL),
366 xrpc.WithTimeout(cfg.XRPCTimeout),
367 )
368
369 ctx, cancel := baseContext()
370 defer cancel()
371
372 runner := enrich.New(store.New(cfg.DatabaseURL, db), xrpcClient, log)
373 result, err := runner.Run(ctx, opts)
374 if result != nil {
375 log.Info("enrich finished",
376 slog.Int("total", result.Total),
377 slog.Int("updated", result.Updated),
378 slog.Int("skipped", result.Skipped),
379 slog.Int("errors", result.Errors),
380 )
381 }
382 return err
383 },
384 }
385
386 cmd.Flags().StringVar(&opts.Collection, "collection", "", "Enrich only documents in this collection")
387 cmd.Flags().StringVar(&opts.DID, "did", "", "Enrich only documents authored by this DID")
388 cmd.Flags().StringVar(&opts.DocumentID, "document", "", "Enrich a single document by stable ID")
389 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show intended work without writing")
390 cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel enrichment workers")
391
392 return cmd
393}
394
395func newHealthcheckCmd(local *bool) *cobra.Command {
396 return &cobra.Command{
397 Use: "healthcheck",
398 Short: "One-shot health probe",
399 RunE: func(cmd *cobra.Command, args []string) error {
400 cfg, err := config.Load(config.LoadOptions{Local: *local})
401 if err != nil {
402 return fmt.Errorf("config: %w", err)
403 }
404 log := observability.NewLogger(cfg)
405 log.Info("healthcheck: ok")
406 return nil
407 },
408 }
409}