a love letter to tangled (android, iOS, and a search API)
at main 409 lines 13 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 "github.com/spf13/cobra" 14 "tangled.org/desertthunder.dev/twister/internal/api" 15 "tangled.org/desertthunder.dev/twister/internal/backfill" 16 "tangled.org/desertthunder.dev/twister/internal/config" 17 "tangled.org/desertthunder.dev/twister/internal/constellation" 18 "tangled.org/desertthunder.dev/twister/internal/enrich" 19 "tangled.org/desertthunder.dev/twister/internal/ingest" 20 "tangled.org/desertthunder.dev/twister/internal/normalize" 21 "tangled.org/desertthunder.dev/twister/internal/observability" 22 "tangled.org/desertthunder.dev/twister/internal/reindex" 23 "tangled.org/desertthunder.dev/twister/internal/search" 24 "tangled.org/desertthunder.dev/twister/internal/store" 25 "tangled.org/desertthunder.dev/twister/internal/tapclient" 26 "tangled.org/desertthunder.dev/twister/internal/xrpc" 27) 28 29var ( 30 version = "dev" 31 commit = "none" 32) 33 34func main() { 35 var local bool 36 37 root := &cobra.Command{ 38 Use: "twister", 39 Short: "Tangled search service", 40 Version: fmt.Sprintf("%s (%s)", version, commit), 41 SilenceUsage: true, 42 SilenceErrors: true, 43 } 44 45 root.PersistentFlags().BoolVar(&local, "local", false, "Deprecated: use the legacy local SQLite fallback instead of the default Postgres database URL") 46 47 root.AddCommand( 48 newAPICmd(&local), 49 newIndexerCmd(&local), 50 newMigrateCmd(&local), 51 newBackfillCmd(&local), 52 newReindexCmd(&local), 53 newEnrichCmd(&local), 54 newHealthcheckCmd(&local), 55 ) 56 57 if err := root.Execute(); err != nil { 58 _, _ = fmt.Fprintln(os.Stderr, "Error:", err) 59 os.Exit(1) 60 } 61} 62 63func baseContext() (context.Context, context.CancelFunc) { 64 ctx, cancel := context.WithCancel(context.Background()) 65 go func() { 66 ch := make(chan os.Signal, 1) 67 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) 68 <-ch 69 cancel() 70 }() 71 return ctx, cancel 72} 73 74func newAPICmd(local *bool) *cobra.Command { 75 return &cobra.Command{ 76 Use: "api", 77 Aliases: []string{"serve"}, 78 Short: "Start the HTTP search API", 79 RunE: func(cmd *cobra.Command, args []string) error { 80 cfg, err := config.Load(config.LoadOptions{Local: *local}) 81 if err != nil { 82 return fmt.Errorf("config: %w", err) 83 } 84 log := observability.NewLogger(cfg) 85 log.Info("starting api", slog.String("service", "api"), slog.String("version", version), slog.String("addr", cfg.HTTPBindAddr)) 86 87 db, err := store.Open(cfg.DatabaseURL) 88 if err != nil { 89 return fmt.Errorf("open database: %w", err) 90 } 91 defer db.Close() 92 93 st := store.New(cfg.DatabaseURL, db) 94 searchRepo := search.NewRepository(cfg.DatabaseURL, db) 95 96 constellationClient := constellation.NewClient( 97 constellation.WithBaseURL(cfg.ConstellationURL), 98 constellation.WithUserAgent(cfg.ConstellationUserAgent), 99 constellation.WithTimeout(cfg.ConstellationTimeout), 100 constellation.WithCacheTTL(cfg.ConstellationCacheTTL), 101 ) 102 log.Info("constellation client configured", slog.String("url", cfg.ConstellationURL)) 103 104 xrpcClient := xrpc.NewClient( 105 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL), 106 xrpc.WithIdentityService(cfg.IdentityServiceURL), 107 xrpc.WithTimeout(cfg.XRPCTimeout), 108 ) 109 110 srv := api.New(searchRepo, st, cfg, log, constellationClient, xrpcClient) 111 112 ctx, cancel := baseContext() 113 defer cancel() 114 115 if err := srv.Run(ctx); err != nil { 116 return fmt.Errorf("run api: %w", err) 117 } 118 119 log.Info("shutting down api") 120 return nil 121 }, 122 } 123} 124 125func newIndexerCmd(local *bool) *cobra.Command { 126 return &cobra.Command{ 127 Use: "indexer", 128 Short: "Start the Tap consumer and indexer", 129 RunE: func(cmd *cobra.Command, args []string) error { 130 cfg, err := config.Load(config.LoadOptions{Local: *local}) 131 if err != nil { 132 return fmt.Errorf("config: %w", err) 133 } 134 log := observability.NewLogger(cfg) 135 log.Info("starting indexer", slog.String("service", "indexer"), slog.String("version", version)) 136 137 if cfg.TapURL == "" { 138 return fmt.Errorf("TAP_URL is required for indexer") 139 } 140 141 db, err := store.Open(cfg.DatabaseURL) 142 if err != nil { 143 return fmt.Errorf("open database: %w", err) 144 } 145 defer db.Close() 146 147 st := store.New(cfg.DatabaseURL, db) 148 registry := normalize.NewRegistry() 149 tap := tapclient.New(cfg.TapURL, cfg.TapAuthPassword, log) 150 runner := ingest.NewRunner(st, registry, tap, cfg.IndexedCollections, log) 151 152 if cfg.EnableIngestEnrichment { 153 xrpcClient := xrpc.NewClient( 154 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL), 155 xrpc.WithIdentityService(cfg.IdentityServiceURL), 156 xrpc.WithTimeout(cfg.XRPCTimeout), 157 ) 158 runner.SetXRPCClient(xrpcClient) 159 log.Info("ingest enrichment enabled") 160 } 161 162 ctx, cancel := baseContext() 163 defer cancel() 164 165 healthMux := http.NewServeMux() 166 healthMux.HandleFunc("GET /health", func(w http.ResponseWriter, r *http.Request) { 167 if err := st.Ping(r.Context()); err != nil { 168 w.Header().Set("Content-Type", "application/json") 169 w.WriteHeader(http.StatusServiceUnavailable) 170 fmt.Fprintf(w, `{"status":"unhealthy","error":"db_unreachable"}`) 171 return 172 } 173 w.Header().Set("Content-Type", "application/json") 174 fmt.Fprintf(w, `{"status":"ok"}`) 175 }) 176 healthSrv := &http.Server{ 177 Addr: cfg.IndexerHealthAddr, 178 Handler: healthMux, 179 ReadHeaderTimeout: 5 * time.Second, 180 } 181 go func() { 182 log.Info("indexer health server listening", slog.String("addr", cfg.IndexerHealthAddr)) 183 if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { 184 log.Error("indexer health server failed", slog.String("error", err.Error())) 185 } 186 }() 187 defer func() { 188 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) 189 defer shutdownCancel() 190 _ = healthSrv.Shutdown(shutdownCtx) 191 }() 192 193 if err := runner.Run(ctx); err != nil { 194 return fmt.Errorf("run indexer: %w", err) 195 } 196 197 log.Info("shutting down indexer") 198 return nil 199 }, 200 } 201} 202 203func newMigrateCmd(local *bool) *cobra.Command { 204 return &cobra.Command{ 205 Use: "migrate", 206 Short: "Apply database schema migrations", 207 RunE: func(cmd *cobra.Command, args []string) error { 208 cfg, err := config.Load(config.LoadOptions{Local: *local}) 209 if err != nil { 210 return fmt.Errorf("config: %w", err) 211 } 212 log := observability.NewLogger(cfg) 213 log.Info("starting migrate", slog.String("service", "migrate"), slog.String("version", version)) 214 215 db, err := store.Open(cfg.DatabaseURL) 216 if err != nil { 217 return fmt.Errorf("open database: %w", err) 218 } 219 defer db.Close() 220 221 if err := store.Migrate(db, cfg.DatabaseURL); err != nil { 222 return fmt.Errorf("migrate database: %w", err) 223 } 224 225 log.Info("migrate finished") 226 return nil 227 }, 228 } 229} 230 231func newBackfillCmd(local *bool) *cobra.Command { 232 var opts backfill.Options 233 234 cmd := &cobra.Command{ 235 Use: "backfill", 236 Short: "Discover repos and register them with Tap backfill", 237 RunE: func(cmd *cobra.Command, args []string) error { 238 cfg, err := config.Load(config.LoadOptions{Local: *local}) 239 if err != nil { 240 return fmt.Errorf("config: %w", err) 241 } 242 log := observability.NewLogger(cfg) 243 log.Info("starting backfill", slog.String("service", "backfill"), slog.String("version", version)) 244 245 if cfg.TapURL == "" { 246 return fmt.Errorf("TAP_URL is required for backfill") 247 } 248 249 db, err := store.Open(cfg.DatabaseURL) 250 if err != nil { 251 return fmt.Errorf("open database: %w", err) 252 } 253 defer db.Close() 254 255 tapAdmin, err := backfill.NewHTTPTapAdmin(cfg.TapURL, cfg.TapAuthPassword) 256 if err != nil { 257 return fmt.Errorf("tap admin client: %w", err) 258 } 259 260 xrpcClient := xrpc.NewClient( 261 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL), 262 xrpc.WithIdentityService(cfg.IdentityServiceURL), 263 xrpc.WithTimeout(cfg.XRPCTimeout), 264 ) 265 266 runner := backfill.NewRunner( 267 store.New(cfg.DatabaseURL, db), 268 tapAdmin, 269 xrpcClient, 270 log, 271 ) 272 273 ctx, cancel := baseContext() 274 defer cancel() 275 276 if err := runner.Run(ctx, opts); err != nil { 277 return fmt.Errorf("run backfill: %w", err) 278 } 279 280 log.Info("shutting down backfill") 281 return nil 282 }, 283 } 284 285 cmd.Flags().StringVar(&opts.Source, "source", backfill.SourceLightrail, "Discovery source: lightrail or graph") 286 cmd.Flags().StringVar(&opts.LightrailURL, "lightrail-url", backfill.DefaultLightrailURL, "Base URL for listReposByCollection discovery") 287 cmd.Flags().StringArrayVar(&opts.Collections, "collection", nil, "Collection to discover via Lightrail (repeatable)") 288 cmd.Flags().IntVar(&opts.PageLimit, "page-limit", backfill.DefaultPageLimit, "Max DIDs to request per Lightrail page") 289 cmd.Flags().StringVar(&opts.SeedsPath, "seeds", "", "Seed source for --source graph: file path or comma-separated DIDs/handles") 290 cmd.Flags().IntVar(&opts.MaxHops, "max-hops", 2, "Max fan-out depth from graph seeds") 291 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Print discovery plan without mutating Tap") 292 cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel discovery workers") 293 cmd.Flags().IntVar(&opts.BatchSize, "batch-size", 10, "DIDs per /repos/add request") 294 cmd.Flags().DurationVar(&opts.BatchDelay, "batch-delay", time.Second, "Delay between Tap /repos/add batches") 295 296 return cmd 297} 298 299func newReindexCmd(local *bool) *cobra.Command { 300 var opts reindex.Options 301 302 cmd := &cobra.Command{ 303 Use: "reindex", 304 Short: "Re-normalize and upsert all documents into the FTS index", 305 RunE: func(cmd *cobra.Command, args []string) error { 306 cfg, err := config.Load(config.LoadOptions{Local: *local}) 307 if err != nil { 308 return fmt.Errorf("config: %w", err) 309 } 310 log := observability.NewLogger(cfg) 311 log.Info("starting reindex", slog.String("service", "reindex"), slog.String("version", version)) 312 313 db, err := store.Open(cfg.DatabaseURL) 314 if err != nil { 315 return fmt.Errorf("open database: %w", err) 316 } 317 defer db.Close() 318 319 ctx, cancel := baseContext() 320 defer cancel() 321 322 runner := reindex.New(store.New(cfg.DatabaseURL, db), log) 323 result, err := runner.Run(ctx, opts) 324 if result != nil { 325 log.Info("reindex finished", 326 slog.Int("total", result.Total), 327 slog.Int("updated", result.Updated), 328 slog.Int("errors", result.Errors), 329 ) 330 } 331 return err 332 }, 333 } 334 335 cmd.Flags().StringVar(&opts.Collection, "collection", "", "Reindex only documents in this collection") 336 cmd.Flags().StringVar(&opts.DID, "did", "", "Reindex only documents authored by this DID") 337 cmd.Flags().StringVar(&opts.DocumentID, "document", "", "Reindex a single document by stable ID") 338 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show intended work without writing") 339 340 return cmd 341} 342 343func newEnrichCmd(local *bool) *cobra.Command { 344 var opts enrich.Options 345 346 cmd := &cobra.Command{ 347 Use: "enrich", 348 Short: "Backfill RepoName, AuthorHandle, and WebURL on existing documents", 349 RunE: func(cmd *cobra.Command, args []string) error { 350 cfg, err := config.Load(config.LoadOptions{Local: *local}) 351 if err != nil { 352 return fmt.Errorf("config: %w", err) 353 } 354 log := observability.NewLogger(cfg) 355 log.Info("starting enrich", slog.String("service", "enrich"), slog.String("version", version)) 356 357 db, err := store.Open(cfg.DatabaseURL) 358 if err != nil { 359 return fmt.Errorf("open database: %w", err) 360 } 361 defer db.Close() 362 363 xrpcClient := xrpc.NewClient( 364 xrpc.WithPLCDirectory(cfg.PLCDirectoryURL), 365 xrpc.WithIdentityService(cfg.IdentityServiceURL), 366 xrpc.WithTimeout(cfg.XRPCTimeout), 367 ) 368 369 ctx, cancel := baseContext() 370 defer cancel() 371 372 runner := enrich.New(store.New(cfg.DatabaseURL, db), xrpcClient, log) 373 result, err := runner.Run(ctx, opts) 374 if result != nil { 375 log.Info("enrich finished", 376 slog.Int("total", result.Total), 377 slog.Int("updated", result.Updated), 378 slog.Int("skipped", result.Skipped), 379 slog.Int("errors", result.Errors), 380 ) 381 } 382 return err 383 }, 384 } 385 386 cmd.Flags().StringVar(&opts.Collection, "collection", "", "Enrich only documents in this collection") 387 cmd.Flags().StringVar(&opts.DID, "did", "", "Enrich only documents authored by this DID") 388 cmd.Flags().StringVar(&opts.DocumentID, "document", "", "Enrich a single document by stable ID") 389 cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "Show intended work without writing") 390 cmd.Flags().IntVar(&opts.Concurrency, "concurrency", 5, "Parallel enrichment workers") 391 392 return cmd 393} 394 395func newHealthcheckCmd(local *bool) *cobra.Command { 396 return &cobra.Command{ 397 Use: "healthcheck", 398 Short: "One-shot health probe", 399 RunE: func(cmd *cobra.Command, args []string) error { 400 cfg, err := config.Load(config.LoadOptions{Local: *local}) 401 if err != nil { 402 return fmt.Errorf("config: %w", err) 403 } 404 log := observability.NewLogger(cfg) 405 log.Info("healthcheck: ok") 406 return nil 407 }, 408 } 409}