+1
-2
cmd/spindle/main.go
+1
-2
cmd/spindle/main.go
+35
spindle/middleware.go
+35
spindle/middleware.go
···
1
+
package spindle
2
+
3
+
import (
4
+
"log/slog"
5
+
"net/http"
6
+
"time"
7
+
)
8
+
9
+
func (s *Spindle) RequestLogger(next http.Handler) http.Handler {
10
+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
11
+
start := time.Now()
12
+
13
+
next.ServeHTTP(w, r)
14
+
15
+
// Build query params as slog.Attrs for the group
16
+
queryParams := r.URL.Query()
17
+
queryAttrs := make([]any, 0, len(queryParams))
18
+
for key, values := range queryParams {
19
+
if len(values) == 1 {
20
+
queryAttrs = append(queryAttrs, slog.String(key, values[0]))
21
+
} else {
22
+
queryAttrs = append(queryAttrs, slog.Any(key, values))
23
+
}
24
+
}
25
+
26
+
s.l.LogAttrs(r.Context(), slog.LevelInfo, "",
27
+
slog.Group("request",
28
+
slog.String("method", r.Method),
29
+
slog.String("path", r.URL.Path),
30
+
slog.Group("query", queryAttrs...),
31
+
slog.Duration("duration", time.Since(start)),
32
+
),
33
+
)
34
+
})
35
+
}
+6
-6
spindle/server.go
+6
-6
spindle/server.go
···
108
108
tangled.RepoNSID,
109
109
tangled.RepoCollaboratorNSID,
110
110
}
111
-
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
111
+
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
112
112
if err != nil {
113
113
return fmt.Errorf("failed to setup jetstream client: %w", err)
114
114
}
···
171
171
// spindle.processPipeline, which in turn enqueues the pipeline
172
172
// job in the above registered queue.
173
173
ccfg := eventconsumer.NewConsumerConfig()
174
-
ccfg.Logger = logger
174
+
ccfg.Logger = log.SubLogger(logger, "eventconsumer")
175
175
ccfg.Dev = cfg.Server.Dev
176
176
ccfg.ProcessFunc = spindle.processPipeline
177
177
ccfg.CursorStore = cursorStore
···
210
210
}
211
211
212
212
func (s *Spindle) XrpcRouter() http.Handler {
213
-
logger := s.l.With("route", "xrpc")
214
-
215
213
serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
216
214
215
+
l := log.SubLogger(s.l, "xrpc")
216
+
217
217
x := xrpc.Xrpc{
218
-
Logger: logger,
218
+
Logger: l,
219
219
Db: s.db,
220
220
Enforcer: s.e,
221
221
Engines: s.engs,
···
305
305
306
306
ok := s.jq.Enqueue(queue.Job{
307
307
Run: func() error {
308
-
engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
308
+
engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
309
309
RepoOwner: tpl.TriggerMetadata.Repo.Did,
310
310
RepoName: tpl.TriggerMetadata.Repo.Repo,
311
311
Workflows: workflows,
+3
-3
spindle/stream.go
+3
-3
spindle/stream.go
···
10
10
"strconv"
11
11
"time"
12
12
13
+
"tangled.org/core/log"
13
14
"tangled.org/core/spindle/models"
14
15
15
16
"github.com/go-chi/chi/v5"
···
23
24
}
24
25
25
26
func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
26
-
l := s.l.With("handler", "Events")
27
+
l := log.SubLogger(s.l, "eventstream")
28
+
27
29
l.Debug("received new connection")
28
30
29
31
conn, err := upgrader.Upgrade(w, r, nil)
···
82
84
}
83
85
case <-time.After(30 * time.Second):
84
86
// send a keep-alive
85
-
l.Debug("sent keepalive")
86
87
if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
87
88
l.Error("failed to write control", "err", err)
88
89
}
···
222
223
s.l.Debug("err", "err", err)
223
224
return err
224
225
}
225
-
s.l.Debug("ops", "ops", events)
226
226
227
227
for _, event := range events {
228
228
// first extract the inner json into a map