spindle: improve the logging situation #655

closed
opened by oppi.li targeting master from push-nsovyllpxlwk
Changed files
+45 -11
cmd
spindle
spindle
+1 -2
cmd/spindle/main.go
··· 7 8 tlog "tangled.org/core/log" 9 "tangled.org/core/spindle" 10 - _ "tangled.org/core/tid" 11 ) 12 13 func main() { 14 - logger := tlog.New("spindl3") 15 slog.SetDefault(logger) 16 17 ctx := context.Background()
··· 7 8 tlog "tangled.org/core/log" 9 "tangled.org/core/spindle" 10 ) 11 12 func main() { 13 + logger := tlog.New("spindle") 14 slog.SetDefault(logger) 15 16 ctx := context.Background()
+35
spindle/middleware.go
···
··· 1 + package spindle 2 + 3 + import ( 4 + "log/slog" 5 + "net/http" 6 + "time" 7 + ) 8 + 9 + func (s *Spindle) RequestLogger(next http.Handler) http.Handler { 10 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 11 + start := time.Now() 12 + 13 + next.ServeHTTP(w, r) 14 + 15 + // Build query params as slog.Attrs for the group 16 + queryParams := r.URL.Query() 17 + queryAttrs := make([]any, 0, len(queryParams)) 18 + for key, values := range queryParams { 19 + if len(values) == 1 { 20 + queryAttrs = append(queryAttrs, slog.String(key, values[0])) 21 + } else { 22 + queryAttrs = append(queryAttrs, slog.Any(key, values)) 23 + } 24 + } 25 + 26 + s.l.LogAttrs(r.Context(), slog.LevelInfo, "", 27 + slog.Group("request", 28 + slog.String("method", r.Method), 29 + slog.String("path", r.URL.Path), 30 + slog.Group("query", queryAttrs...), 31 + slog.Duration("duration", time.Since(start)), 32 + ), 33 + ) 34 + }) 35 + }
+6 -6
spindle/server.go
··· 108 tangled.RepoNSID, 109 tangled.RepoCollaboratorNSID, 110 } 111 - jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true) 112 if err != nil { 113 return fmt.Errorf("failed to setup jetstream client: %w", err) 114 } ··· 171 // spindle.processPipeline, which in turn enqueues the pipeline 172 // job in the above registered queue. 173 ccfg := eventconsumer.NewConsumerConfig() 174 - ccfg.Logger = logger 175 ccfg.Dev = cfg.Server.Dev 176 ccfg.ProcessFunc = spindle.processPipeline 177 ccfg.CursorStore = cursorStore ··· 210 } 211 212 func (s *Spindle) XrpcRouter() http.Handler { 213 - logger := s.l.With("route", "xrpc") 214 - 215 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 216 217 x := xrpc.Xrpc{ 218 - Logger: logger, 219 Db: s.db, 220 Enforcer: s.e, 221 Engines: s.engs, ··· 305 306 ok := s.jq.Enqueue(queue.Job{ 307 Run: func() error { 308 - engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 309 RepoOwner: tpl.TriggerMetadata.Repo.Did, 310 RepoName: tpl.TriggerMetadata.Repo.Repo, 311 Workflows: workflows,
··· 108 tangled.RepoNSID, 109 tangled.RepoCollaboratorNSID, 110 } 111 + jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 112 if err != nil { 113 return fmt.Errorf("failed to setup jetstream client: %w", err) 114 } ··· 171 // spindle.processPipeline, which in turn enqueues the pipeline 172 // job in the above registered queue. 173 ccfg := eventconsumer.NewConsumerConfig() 174 + ccfg.Logger = log.SubLogger(logger, "eventconsumer") 175 ccfg.Dev = cfg.Server.Dev 176 ccfg.ProcessFunc = spindle.processPipeline 177 ccfg.CursorStore = cursorStore ··· 210 } 211 212 func (s *Spindle) XrpcRouter() http.Handler { 213 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 214 215 + l := log.SubLogger(s.l, "xrpc") 216 + 217 x := xrpc.Xrpc{ 218 + Logger: l, 219 Db: s.db, 220 Enforcer: s.e, 221 Engines: s.engs, ··· 305 306 ok := s.jq.Enqueue(queue.Job{ 307 Run: func() error { 308 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 309 RepoOwner: tpl.TriggerMetadata.Repo.Did, 310 RepoName: tpl.TriggerMetadata.Repo.Repo, 311 Workflows: workflows,
+3 -3
spindle/stream.go
··· 10 "strconv" 11 "time" 12 13 "tangled.org/core/spindle/models" 14 15 "github.com/go-chi/chi/v5" ··· 23 } 24 25 func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 26 - l := s.l.With("handler", "Events") 27 l.Debug("received new connection") 28 29 conn, err := upgrader.Upgrade(w, r, nil) ··· 82 } 83 case <-time.After(30 * time.Second): 84 // send a keep-alive 85 - l.Debug("sent keepalive") 86 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 87 l.Error("failed to write control", "err", err) 88 } ··· 222 s.l.Debug("err", "err", err) 223 return err 224 } 225 - s.l.Debug("ops", "ops", events) 226 227 for _, event := range events { 228 // first extract the inner json into a map
··· 10 "strconv" 11 "time" 12 13 + "tangled.org/core/log" 14 "tangled.org/core/spindle/models" 15 16 "github.com/go-chi/chi/v5" ··· 24 } 25 26 func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) { 27 + l := log.SubLogger(s.l, "eventstream") 28 + 29 l.Debug("received new connection") 30 31 conn, err := upgrader.Upgrade(w, r, nil) ··· 84 } 85 case <-time.After(30 * time.Second): 86 // send a keep-alive 87 if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil { 88 l.Error("failed to write control", "err", err) 89 } ··· 223 s.l.Debug("err", "err", err) 224 return err 225 } 226 227 for _, event := range events { 228 // first extract the inner json into a map