1package cliutil
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "io"
8 "io/fs"
9 "log/slog"
10 "net/http"
11 "os"
12 "path/filepath"
13 "regexp"
14 "sort"
15 "strconv"
16 "strings"
17 "time"
18
19 "github.com/bluesky-social/indigo/did"
20 "github.com/bluesky-social/indigo/plc"
21 "github.com/bluesky-social/indigo/xrpc"
22 slogGorm "github.com/orandin/slog-gorm"
23 "github.com/urfave/cli/v2"
24 "gorm.io/driver/postgres"
25 "gorm.io/driver/sqlite"
26 "gorm.io/gorm"
27)
28
29func GetDidResolver(cctx *cli.Context) did.Resolver {
30 mr := did.NewMultiResolver()
31 mr.AddHandler("plc", &plc.PLCServer{
32 Host: cctx.String("plc"),
33 })
34 mr.AddHandler("web", &did.WebResolver{})
35
36 return mr
37}
38
39func GetPLCClient(cctx *cli.Context) *plc.PLCServer {
40 return &plc.PLCServer{
41 Host: cctx.String("plc"),
42 }
43}
44
// NewHttpClient returns a fresh *http.Client with its own transport.
//
// The transport honours the proxy environment variables, attempts HTTP/2,
// keeps up to 100 idle connections for at most 90 seconds each, and bounds
// the TLS handshake (10s) and the wait for a "100 Continue" response (1s).
// No overall request timeout is set on the client itself.
func NewHttpClient() *http.Client {
	transport := new(http.Transport)
	transport.Proxy = http.ProxyFromEnvironment
	transport.ForceAttemptHTTP2 = true
	transport.MaxIdleConns = 100
	transport.IdleConnTimeout = 90 * time.Second
	transport.TLSHandshakeTimeout = 10 * time.Second
	transport.ExpectContinueTimeout = 1 * time.Second

	return &http.Client{Transport: transport}
}
57
58func GetXrpcClient(cctx *cli.Context, authreq bool) (*xrpc.Client, error) {
59 h := "http://localhost:4989"
60 if pdsurl := cctx.String("pds-host"); pdsurl != "" {
61 h = pdsurl
62 }
63
64 auth, err := loadAuthFromEnv(cctx, authreq)
65 if err != nil {
66 return nil, fmt.Errorf("loading auth: %w", err)
67 }
68
69 return &xrpc.Client{
70 Client: NewHttpClient(),
71 Host: h,
72 Auth: auth,
73 }, nil
74}
75
76func loadAuthFromEnv(cctx *cli.Context, req bool) (*xrpc.AuthInfo, error) {
77 if a := cctx.String("auth"); a != "" {
78 if ai, err := ReadAuth(a); err != nil && req {
79 return nil, err
80 } else {
81 return ai, nil
82 }
83 }
84
85 val := os.Getenv("ATP_AUTH_FILE")
86 if val == "" {
87 if req {
88 return nil, fmt.Errorf("no auth env present, ATP_AUTH_FILE not set")
89 }
90
91 return nil, nil
92 }
93
94 var auth xrpc.AuthInfo
95 if err := json.Unmarshal([]byte(val), &auth); err != nil {
96 return nil, err
97 }
98
99 return &auth, nil
100}
101
102func ReadAuth(fname string) (*xrpc.AuthInfo, error) {
103 b, err := os.ReadFile(fname)
104 if err != nil {
105 return nil, err
106 }
107 var auth xrpc.AuthInfo
108 if err := json.Unmarshal(b, &auth); err != nil {
109 return nil, err
110 }
111
112 return &auth, nil
113}
114
115// Supports both previous "dbtype=" prefixed DSNs, and URI-style database config strings, for both sqlite and postgresql.
116//
117// Examples:
118// - "sqlite=dir/file.sqlite"
119// - "sqlite://file.sqlite"
120// - "postgres=host=localhost user=postgres password=password dbname=pdsdb port=5432 sslmode=disable"
121// - "postgresql://postgres:password@localhost:5432/pdsdb?sslmode=disable"
122func SetupDatabase(dburl string, maxConnections int) (*gorm.DB, error) {
123 var dial gorm.Dialector
124 // NOTE(bnewbold): might also handle file:// as sqlite, but let's keep it
125 // explicit for now
126
127 isSqlite := false
128 openConns := maxConnections
129 if strings.HasPrefix(dburl, "sqlite://") {
130 sqliteSuffix := dburl[len("sqlite://"):]
131 // if this isn't ":memory:", ensure that directory exists (eg, if db
132 // file is being initialized)
133 if !strings.Contains(sqliteSuffix, ":?") {
134 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
135 }
136 dial = sqlite.Open(sqliteSuffix)
137 openConns = 1
138 isSqlite = true
139 } else if strings.HasPrefix(dburl, "sqlite=") {
140 sqliteSuffix := dburl[len("sqlite="):]
141 // if this isn't ":memory:", ensure that directory exists (eg, if db
142 // file is being initialized)
143 if !strings.Contains(sqliteSuffix, ":?") {
144 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
145 }
146 dial = sqlite.Open(sqliteSuffix)
147 openConns = 1
148 isSqlite = true
149 } else if strings.HasPrefix(dburl, "postgresql://") || strings.HasPrefix(dburl, "postgres://") {
150 // can pass entire URL, with prefix, to gorm driver
151 dial = postgres.Open(dburl)
152 } else if strings.HasPrefix(dburl, "postgres=") {
153 dsn := dburl[len("postgres="):]
154 dial = postgres.Open(dsn)
155 } else {
156 // TODO(bnewbold): this might print password?
157 return nil, fmt.Errorf("unsupported or unrecognized DATABASE_URL value: %s", dburl)
158 }
159
160 gormLogger := slogGorm.New()
161
162 db, err := gorm.Open(dial, &gorm.Config{
163 SkipDefaultTransaction: true,
164 TranslateError: true,
165 Logger: gormLogger,
166 })
167 if err != nil {
168 return nil, err
169 }
170
171 sqldb, err := db.DB()
172 if err != nil {
173 return nil, err
174 }
175
176 sqldb.SetMaxIdleConns(80)
177 sqldb.SetMaxOpenConns(openConns)
178 sqldb.SetConnMaxIdleTime(time.Hour)
179
180 if isSqlite {
181 // Set pragmas for sqlite
182 if err := db.Exec("PRAGMA journal_mode=WAL;").Error; err != nil {
183 return nil, err
184 }
185 if err := db.Exec("PRAGMA synchronous=normal;").Error; err != nil {
186 return nil, err
187 }
188 }
189
190 return db, nil
191}
192
// LogOptions configures SetupSlog. Fields left at their zero value are
// filled in by SetupSlog from environment variables and then from built-in
// defaults.
type LogOptions struct {
	// Maximum size in bytes of one log file before rotating, e.g.
	// 1_000_000_000. When 0, SetupSlog reads BSKYLOG_ROTATE_BYTES; if that
	// is also unset no rotation happens.
	LogRotateBytes int64

	// Path to write to; "" or "-" means stdout. If rotating, every "%T" in
	// the path is replaced with the UnixMilli timestamp taken when the file
	// is opened.
	// NOTE: substitution is a simple string replace of "%T".
	LogPath string

	// text|json
	LogFormat string

	// info|debug|warn|error
	LogLevel string

	// Keep N old logs (not including current); <0 disables removal.
	// NOTE: SetupSlog treats 0 as "unset": it then uses BSKYLOG_ROTATE_KEEP
	// if set, otherwise 2. A keep value of 0 (remove all old log files
	// immediately) can therefore only be requested via the env var.
	KeepOld int
}
210
// firstenv returns the value of the first environment variable in
// env_var_names that is set to a non-empty string, or "" if none is.
func firstenv(env_var_names ...string) string {
	for i := range env_var_names {
		if v, ok := os.LookupEnv(env_var_names[i]); ok && v != "" {
			return v
		}
	}
	return ""
}
220
221// SetupSlog integrates passed in options and env vars.
222//
223// passing default cliutil.LogOptions{} is ok.
224//
225// BSKYLOG_LOG_LEVEL=info|debug|warn|error
226//
227// BSKYLOG_LOG_FMT=text|json
228//
229// BSKYLOG_FILE=path (or "-" or "" for stdout), %T gets UnixMilli; if a path with '/', {prefix}/current becomes a link to active log file
230//
231// BSKYLOG_ROTATE_BYTES=int maximum size of log chunk before rotating
232//
233// BSKYLOG_ROTATE_KEEP=int keep N olg logs (not including current)
234//
235// The env vars were derived from ipfs logging library, and also respond to some GOLOG_ vars from that library,
236// but BSKYLOG_ variables are preferred because imported code still using the ipfs log library may misbehave
237// if some GOLOG values are set, especially GOLOG_FILE.
238func SetupSlog(options LogOptions) (*slog.Logger, io.Writer, error) {
239 fmt.Fprintf(os.Stderr, "SetupSlog\n")
240 var hopts slog.HandlerOptions
241 hopts.Level = slog.LevelInfo
242 hopts.AddSource = true
243 if options.LogLevel == "" {
244 options.LogLevel = firstenv("BSKYLOG_LOG_LEVEL", "GOLOG_LOG_LEVEL")
245 }
246 if options.LogLevel == "" {
247 hopts.Level = slog.LevelInfo
248 options.LogLevel = "info"
249 } else {
250 level := strings.ToLower(options.LogLevel)
251 switch level {
252 case "debug":
253 hopts.Level = slog.LevelDebug
254 case "info":
255 hopts.Level = slog.LevelInfo
256 case "warn":
257 hopts.Level = slog.LevelWarn
258 case "error":
259 hopts.Level = slog.LevelError
260 default:
261 return nil, nil, fmt.Errorf("unknown log level: %#v", options.LogLevel)
262 }
263 }
264 if options.LogFormat == "" {
265 options.LogFormat = firstenv("BSKYLOG_LOG_FMT", "GOLOG_LOG_FMT")
266 }
267 if options.LogFormat == "" {
268 options.LogFormat = "text"
269 } else {
270 format := strings.ToLower(options.LogFormat)
271 if format == "json" || format == "text" {
272 // ok
273 } else {
274 return nil, nil, fmt.Errorf("invalid log format: %#v", options.LogFormat)
275 }
276 options.LogFormat = format
277 }
278
279 if options.LogPath == "" {
280 options.LogPath = firstenv("BSKYLOG_FILE", "GOLOG_FILE")
281 }
282 if options.LogRotateBytes == 0 {
283 rotateBytesStr := os.Getenv("BSKYLOG_ROTATE_BYTES") // no GOLOG equivalent
284 if rotateBytesStr != "" {
285 rotateBytes, err := strconv.ParseInt(rotateBytesStr, 10, 64)
286 if err != nil {
287 return nil, nil, fmt.Errorf("invalid BSKYLOG_ROTATE_BYTES value: %w", err)
288 }
289 options.LogRotateBytes = rotateBytes
290 }
291 }
292 if options.KeepOld == 0 {
293 keepOldUnset := true
294 keepOldStr := os.Getenv("BSKYLOG_ROTATE_KEEP") // no GOLOG equivalent
295 if keepOldStr != "" {
296 keepOld, err := strconv.ParseInt(keepOldStr, 10, 64)
297 if err != nil {
298 return nil, nil, fmt.Errorf("invalid BSKYLOG_ROTATE_KEEP value: %w", err)
299 }
300 keepOldUnset = false
301 options.KeepOld = int(keepOld)
302 }
303 if keepOldUnset {
304 options.KeepOld = 2
305 }
306 }
307 logaround := make(chan string, 100)
308 go logbouncer(logaround)
309 var out io.Writer
310 if (options.LogPath == "") || (options.LogPath == "-") {
311 out = os.Stdout
312 } else if options.LogRotateBytes != 0 {
313 out = &logRotateWriter{
314 rotateBytes: options.LogRotateBytes,
315 outPathTemplate: options.LogPath,
316 keep: options.KeepOld,
317 logaround: logaround,
318 }
319 } else {
320 var err error
321 out, err = os.Create(options.LogPath)
322 if err != nil {
323 return nil, nil, fmt.Errorf("%s: %w", options.LogPath, err)
324 }
325 fmt.Fprintf(os.Stderr, "SetupSlog create %#v\n", options.LogPath)
326 }
327 var handler slog.Handler
328 switch options.LogFormat {
329 case "text":
330 handler = slog.NewTextHandler(out, &hopts)
331 case "json":
332 handler = slog.NewJSONHandler(out, &hopts)
333 default:
334 return nil, nil, fmt.Errorf("unknown log format: %#v", options.LogFormat)
335 }
336 logger := slog.New(handler)
337 slog.SetDefault(logger)
338 templateDirPart, _ := filepath.Split(options.LogPath)
339 ents, _ := os.ReadDir(templateDirPart)
340 for _, ent := range ents {
341 fmt.Fprintf(os.Stdout, "%s\n", filepath.Join(templateDirPart, ent.Name()))
342 }
343 SetIpfsWriter(out, options.LogFormat, options.LogLevel)
344 return logger, out, nil
345}
346
// logRotateWriter is an io.Writer that writes to a log file and starts a new
// file once the next write would push the current one past rotateBytes.
type logRotateWriter struct {
	// currently open log file; nil until the first write and between
	// closing one file and opening the next
	currentWriter io.WriteCloser

	// how much has been written to current log file
	currentBytes int64

	// path of the current log file: outPathTemplate with every "%T"
	// replaced by the UnixMilli open time, e.g. path/to/logs/foo1700000000000
	currentPath string

	// path of the hard link to the current log file,
	// e.g. path/to/logs/current; empty when the log path has no directory part
	currentPathCurrent string

	// size threshold in bytes that triggers rotation
	rotateBytes int64

	// e.g. path/to/logs/foo%T
	outPathTemplate string

	// keep the most recent N log files (not including current); a negative
	// value disables removal of old logs
	keep int

	// write strings to this from inside the log system, a task outside the log system hands them to slog.Info()
	logaround chan<- string
}
369
// logbouncer relays messages produced inside the logging machinery back
// into slog. Each line received on out is echoed to stderr (prefixed with
// "ll ") and then logged at Info level with system=logging. It returns when
// out is closed.
func logbouncer(out <-chan string) {
	var logger *slog.Logger
	for {
		line, open := <-out
		if !open {
			return
		}
		fmt.Fprintf(os.Stderr, "ll %s\n", line)
		// Resolve the logger lazily so that it is derived from the slog
		// default in effect once messages actually arrive.
		if logger == nil {
			logger = slog.Default().With("system", "logging")
		}
		logger.Info(line)
	}
}
381
382var currentMatcher = regexp.MustCompile("current_\\d+")
383
384func (w *logRotateWriter) cleanOldLogs() {
385 if w.keep < 0 {
386 // old log removal is disabled
387 return
388 }
389 // w.currentPath was recently set as the new log
390 dirpart, _ := filepath.Split(w.currentPath)
391 // find old logs
392 templateDirPart, templateNamePart := filepath.Split(w.outPathTemplate)
393 if dirpart != templateDirPart {
394 w.logaround <- fmt.Sprintf("current dir part %#v != template dir part %#v\n", w.currentPath, w.outPathTemplate)
395 return
396 }
397 // build a regexp that is string literal parts with \d+ replacing the UnixMilli part
398 templateNameParts := strings.Split(templateNamePart, "%T")
399 var sb strings.Builder
400 first := true
401 for _, part := range templateNameParts {
402 if first {
403 first = false
404 } else {
405 sb.WriteString("\\d+")
406 }
407 sb.WriteString(regexp.QuoteMeta(part))
408 }
409 tmre, err := regexp.Compile(sb.String())
410 if err != nil {
411 w.logaround <- fmt.Sprintf("failed to compile old log template regexp: %#v\n", err)
412 return
413 }
414 dir, err := os.ReadDir(dirpart)
415 if err != nil {
416 w.logaround <- fmt.Sprintf("failed to read old log template dir: %#v\n", err)
417 return
418 }
419 var found []fs.FileInfo
420 for _, ent := range dir {
421 name := ent.Name()
422 if tmre.MatchString(name) || currentMatcher.MatchString(name) {
423 fi, err := ent.Info()
424 if err != nil {
425 continue
426 }
427 found = append(found, fi)
428 }
429 }
430 if len(found) <= w.keep {
431 // not too many, nothing to do
432 return
433 }
434 foundMtimeLess := func(i, j int) bool {
435 return found[i].ModTime().Before(found[j].ModTime())
436 }
437 sort.Slice(found, foundMtimeLess)
438 drops := found[:len(found)-w.keep]
439 for _, fi := range drops {
440 fullpath := filepath.Join(dirpart, fi.Name())
441 err = os.Remove(fullpath)
442 if err != nil {
443 w.logaround <- fmt.Sprintf("failed to rm old log: %#v\n", err)
444 // but keep going
445 }
446 // maybe it would be safe to debug-log old log removal from within the logging infrastructure?
447 }
448}
449
450func (w *logRotateWriter) closeOldLog() []error {
451 if w.currentWriter == nil {
452 return nil
453 }
454 var earlyWeakErrors []error
455 err := w.currentWriter.Close()
456 if err != nil {
457 earlyWeakErrors = append(earlyWeakErrors, err)
458 }
459 w.currentWriter = nil
460 w.currentBytes = 0
461 w.currentPath = ""
462 if w.currentPathCurrent != "" {
463 err = os.Remove(w.currentPathCurrent) // not really an error until something else goes wrong
464 if err != nil {
465 earlyWeakErrors = append(earlyWeakErrors, err)
466 }
467 w.currentPathCurrent = ""
468 }
469 return earlyWeakErrors
470}
471
472func (w *logRotateWriter) openNewLog(earlyWeakErrors []error) (badErr error, weakErrors []error) {
473 nowMillis := time.Now().UnixMilli()
474 nows := strconv.FormatInt(nowMillis, 10)
475 w.currentPath = strings.Replace(w.outPathTemplate, "%T", nows, -1)
476 var err error
477 w.currentWriter, err = os.Create(w.currentPath)
478 if err != nil {
479 earlyWeakErrors = append(earlyWeakErrors, err)
480 return errors.Join(earlyWeakErrors...), nil
481 }
482 w.logaround <- fmt.Sprintf("new log file %#v", w.currentPath)
483 w.cleanOldLogs()
484 dirpart, _ := filepath.Split(w.currentPath)
485 if dirpart != "" {
486 w.currentPathCurrent = filepath.Join(dirpart, "current")
487 fi, err := os.Stat(w.currentPathCurrent)
488 if err == nil && fi.Mode().IsRegular() {
489 // move aside unknown "current" from a previous run
490 // see also currentMatcher regexp current_\d+
491 err = os.Rename(w.currentPathCurrent, w.currentPathCurrent+"_"+nows)
492 if err != nil {
493 // not crucial if we can't move aside "current"
494 // TODO: log warning ... but not from inside log writer?
495 earlyWeakErrors = append(earlyWeakErrors, err)
496 }
497 }
498 err = os.Link(w.currentPath, w.currentPathCurrent)
499 if err != nil {
500 // not crucial if we can't make "current" link
501 // TODO: log warning ... but not from inside log writer?
502 earlyWeakErrors = append(earlyWeakErrors, err)
503 }
504 }
505 return nil, earlyWeakErrors
506}
507
508func (w *logRotateWriter) Write(p []byte) (n int, err error) {
509 var earlyWeakErrors []error
510 if int64(len(p))+w.currentBytes > w.rotateBytes {
511 // next write would be over the limit
512 earlyWeakErrors = w.closeOldLog()
513 }
514 if w.currentWriter == nil {
515 // start new log file
516 var err error
517 err, earlyWeakErrors = w.openNewLog(earlyWeakErrors)
518 if err != nil {
519 return 0, err
520 }
521 }
522 var wrote int
523 wrote, err = w.currentWriter.Write(p)
524 w.currentBytes += int64(wrote)
525 if err != nil {
526 earlyWeakErrors = append(earlyWeakErrors, err)
527 return wrote, errors.Join(earlyWeakErrors...)
528 }
529 if earlyWeakErrors != nil {
530 w.logaround <- fmt.Sprintf("ok, but: %s", errors.Join(earlyWeakErrors...).Error())
531 }
532 return wrote, nil
533}