fork of indigo with slightly nicer lexgen
at main 15 kB view raw
1package cliutil 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "io" 8 "io/fs" 9 "log/slog" 10 "net/http" 11 "os" 12 "path/filepath" 13 "regexp" 14 "sort" 15 "strconv" 16 "strings" 17 "time" 18 19 "github.com/bluesky-social/indigo/did" 20 "github.com/bluesky-social/indigo/plc" 21 "github.com/bluesky-social/indigo/xrpc" 22 slogGorm "github.com/orandin/slog-gorm" 23 "github.com/urfave/cli/v2" 24 "gorm.io/driver/postgres" 25 "gorm.io/driver/sqlite" 26 "gorm.io/gorm" 27) 28 29func GetDidResolver(cctx *cli.Context) did.Resolver { 30 mr := did.NewMultiResolver() 31 mr.AddHandler("plc", &plc.PLCServer{ 32 Host: cctx.String("plc"), 33 }) 34 mr.AddHandler("web", &did.WebResolver{}) 35 36 return mr 37} 38 39func GetPLCClient(cctx *cli.Context) *plc.PLCServer { 40 return &plc.PLCServer{ 41 Host: cctx.String("plc"), 42 } 43} 44 45func NewHttpClient() *http.Client { 46 return &http.Client{ 47 Transport: &http.Transport{ 48 Proxy: http.ProxyFromEnvironment, 49 ForceAttemptHTTP2: true, 50 MaxIdleConns: 100, 51 IdleConnTimeout: 90 * time.Second, 52 TLSHandshakeTimeout: 10 * time.Second, 53 ExpectContinueTimeout: 1 * time.Second, 54 }, 55 } 56} 57 58func GetXrpcClient(cctx *cli.Context, authreq bool) (*xrpc.Client, error) { 59 h := "http://localhost:4989" 60 if pdsurl := cctx.String("pds-host"); pdsurl != "" { 61 h = pdsurl 62 } 63 64 auth, err := loadAuthFromEnv(cctx, authreq) 65 if err != nil { 66 return nil, fmt.Errorf("loading auth: %w", err) 67 } 68 69 return &xrpc.Client{ 70 Client: NewHttpClient(), 71 Host: h, 72 Auth: auth, 73 }, nil 74} 75 76func loadAuthFromEnv(cctx *cli.Context, req bool) (*xrpc.AuthInfo, error) { 77 if a := cctx.String("auth"); a != "" { 78 if ai, err := ReadAuth(a); err != nil && req { 79 return nil, err 80 } else { 81 return ai, nil 82 } 83 } 84 85 val := os.Getenv("ATP_AUTH_FILE") 86 if val == "" { 87 if req { 88 return nil, fmt.Errorf("no auth env present, ATP_AUTH_FILE not set") 89 } 90 91 return nil, nil 92 } 93 94 var auth xrpc.AuthInfo 95 if err := json.Unmarshal([]byte(val), &auth); err != nil { 96 return nil, err 97 } 98 99 return &auth, nil 100} 101 102func ReadAuth(fname string) (*xrpc.AuthInfo, error) { 103 b, err := os.ReadFile(fname) 104 if err != nil { 105 return nil, err 106 } 107 var auth xrpc.AuthInfo 108 if err := json.Unmarshal(b, &auth); err != nil { 109 return nil, err 110 } 111 112 return &auth, nil 113} 114 115// Supports both previous "dbtype=" prefixed DSNs, and URI-style database config strings, for both sqlite and postgresql. 116// 117// Examples: 118// - "sqlite=dir/file.sqlite" 119// - "sqlite://file.sqlite" 120// - "postgres=host=localhost user=postgres password=password dbname=pdsdb port=5432 sslmode=disable" 121// - "postgresql://postgres:password@localhost:5432/pdsdb?sslmode=disable" 122func SetupDatabase(dburl string, maxConnections int) (*gorm.DB, error) { 123 var dial gorm.Dialector 124 // NOTE(bnewbold): might also handle file:// as sqlite, but let's keep it 125 // explicit for now 126 127 isSqlite := false 128 openConns := maxConnections 129 if strings.HasPrefix(dburl, "sqlite://") { 130 sqliteSuffix := dburl[len("sqlite://"):] 131 // if this isn't ":memory:", ensure that directory exists (eg, if db 132 // file is being initialized) 133 if !strings.Contains(sqliteSuffix, ":?") { 134 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm) 135 } 136 dial = sqlite.Open(sqliteSuffix) 137 openConns = 1 138 isSqlite = true 139 } else if strings.HasPrefix(dburl, "sqlite=") { 140 sqliteSuffix := dburl[len("sqlite="):] 141 // if this isn't ":memory:", ensure that directory exists (eg, if db 142 // file is being initialized) 143 if !strings.Contains(sqliteSuffix, ":?") { 144 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm) 145 } 146 dial = sqlite.Open(sqliteSuffix) 147 openConns = 1 148 isSqlite = true 149 } else if strings.HasPrefix(dburl, "postgresql://") || strings.HasPrefix(dburl, "postgres://") { 150 // can pass entire URL, with prefix, to gorm driver 151 dial = postgres.Open(dburl) 152 } else if strings.HasPrefix(dburl, "postgres=") { 153 dsn := dburl[len("postgres="):] 154 dial = postgres.Open(dsn) 155 } else { 156 // TODO(bnewbold): this might print password? 157 return nil, fmt.Errorf("unsupported or unrecognized DATABASE_URL value: %s", dburl) 158 } 159 160 gormLogger := slogGorm.New() 161 162 db, err := gorm.Open(dial, &gorm.Config{ 163 SkipDefaultTransaction: true, 164 TranslateError: true, 165 Logger: gormLogger, 166 }) 167 if err != nil { 168 return nil, err 169 } 170 171 sqldb, err := db.DB() 172 if err != nil { 173 return nil, err 174 } 175 176 sqldb.SetMaxIdleConns(80) 177 sqldb.SetMaxOpenConns(openConns) 178 sqldb.SetConnMaxIdleTime(time.Hour) 179 180 if isSqlite { 181 // Set pragmas for sqlite 182 if err := db.Exec("PRAGMA journal_mode=WAL;").Error; err != nil { 183 return nil, err 184 } 185 if err := db.Exec("PRAGMA synchronous=normal;").Error; err != nil { 186 return nil, err 187 } 188 } 189 190 return db, nil 191} 192 193type LogOptions struct { 194 // e.g. 1_000_000_000 195 LogRotateBytes int64 196 197 // path to write to, if rotating, %T gets UnixMilli at file open time 198 // NOTE: substitution is simple replace("%T", "") 199 LogPath string 200 201 // text|json 202 LogFormat string 203 204 // info|debug|warn|error 205 LogLevel string 206 207 // Keep N old logs (not including current); <0 disables removal, 0==remove all old log files immediately 208 KeepOld int 209} 210 211func firstenv(env_var_names ...string) string { 212 for _, env_var_name := range env_var_names { 213 val := os.Getenv(env_var_name) 214 if val != "" { 215 return val 216 } 217 } 218 return "" 219} 220 221// SetupSlog integrates passed in options and env vars. 222// 223// passing default cliutil.LogOptions{} is ok. 224// 225// BSKYLOG_LOG_LEVEL=info|debug|warn|error 226// 227// BSKYLOG_LOG_FMT=text|json 228// 229// BSKYLOG_FILE=path (or "-" or "" for stdout), %T gets UnixMilli; if a path with '/', {prefix}/current becomes a link to active log file 230// 231// BSKYLOG_ROTATE_BYTES=int maximum size of log chunk before rotating 232// 233// BSKYLOG_ROTATE_KEEP=int keep N olg logs (not including current) 234// 235// The env vars were derived from ipfs logging library, and also respond to some GOLOG_ vars from that library, 236// but BSKYLOG_ variables are preferred because imported code still using the ipfs log library may misbehave 237// if some GOLOG values are set, especially GOLOG_FILE. 238func SetupSlog(options LogOptions) (*slog.Logger, io.Writer, error) { 239 fmt.Fprintf(os.Stderr, "SetupSlog\n") 240 var hopts slog.HandlerOptions 241 hopts.Level = slog.LevelInfo 242 hopts.AddSource = true 243 if options.LogLevel == "" { 244 options.LogLevel = firstenv("BSKYLOG_LOG_LEVEL", "GOLOG_LOG_LEVEL") 245 } 246 if options.LogLevel == "" { 247 hopts.Level = slog.LevelInfo 248 options.LogLevel = "info" 249 } else { 250 level := strings.ToLower(options.LogLevel) 251 switch level { 252 case "debug": 253 hopts.Level = slog.LevelDebug 254 case "info": 255 hopts.Level = slog.LevelInfo 256 case "warn": 257 hopts.Level = slog.LevelWarn 258 case "error": 259 hopts.Level = slog.LevelError 260 default: 261 return nil, nil, fmt.Errorf("unknown log level: %#v", options.LogLevel) 262 } 263 } 264 if options.LogFormat == "" { 265 options.LogFormat = firstenv("BSKYLOG_LOG_FMT", "GOLOG_LOG_FMT") 266 } 267 if options.LogFormat == "" { 268 options.LogFormat = "text" 269 } else { 270 format := strings.ToLower(options.LogFormat) 271 if format == "json" || format == "text" { 272 // ok 273 } else { 274 return nil, nil, fmt.Errorf("invalid log format: %#v", options.LogFormat) 275 } 276 options.LogFormat = format 277 } 278 279 if options.LogPath == "" { 280 options.LogPath = firstenv("BSKYLOG_FILE", "GOLOG_FILE") 281 } 282 if options.LogRotateBytes == 0 { 283 rotateBytesStr := os.Getenv("BSKYLOG_ROTATE_BYTES") // no GOLOG equivalent 284 if rotateBytesStr != "" { 285 rotateBytes, err := strconv.ParseInt(rotateBytesStr, 10, 64) 286 if err != nil { 287 return nil, nil, fmt.Errorf("invalid BSKYLOG_ROTATE_BYTES value: %w", err) 288 } 289 options.LogRotateBytes = rotateBytes 290 } 291 } 292 if options.KeepOld == 0 { 293 keepOldUnset := true 294 keepOldStr := os.Getenv("BSKYLOG_ROTATE_KEEP") // no GOLOG equivalent 295 if keepOldStr != "" { 296 keepOld, err := strconv.ParseInt(keepOldStr, 10, 64) 297 if err != nil { 298 return nil, nil, fmt.Errorf("invalid BSKYLOG_ROTATE_KEEP value: %w", err) 299 } 300 keepOldUnset = false 301 options.KeepOld = int(keepOld) 302 } 303 if keepOldUnset { 304 options.KeepOld = 2 305 } 306 } 307 logaround := make(chan string, 100) 308 go logbouncer(logaround) 309 var out io.Writer 310 if (options.LogPath == "") || (options.LogPath == "-") { 311 out = os.Stdout 312 } else if options.LogRotateBytes != 0 { 313 out = &logRotateWriter{ 314 rotateBytes: options.LogRotateBytes, 315 outPathTemplate: options.LogPath, 316 keep: options.KeepOld, 317 logaround: logaround, 318 } 319 } else { 320 var err error 321 out, err = os.Create(options.LogPath) 322 if err != nil { 323 return nil, nil, fmt.Errorf("%s: %w", options.LogPath, err) 324 } 325 fmt.Fprintf(os.Stderr, "SetupSlog create %#v\n", options.LogPath) 326 } 327 var handler slog.Handler 328 switch options.LogFormat { 329 case "text": 330 handler = slog.NewTextHandler(out, &hopts) 331 case "json": 332 handler = slog.NewJSONHandler(out, &hopts) 333 default: 334 return nil, nil, fmt.Errorf("unknown log format: %#v", options.LogFormat) 335 } 336 logger := slog.New(handler) 337 slog.SetDefault(logger) 338 templateDirPart, _ := filepath.Split(options.LogPath) 339 ents, _ := os.ReadDir(templateDirPart) 340 for _, ent := range ents { 341 fmt.Fprintf(os.Stdout, "%s\n", filepath.Join(templateDirPart, ent.Name())) 342 } 343 SetIpfsWriter(out, options.LogFormat, options.LogLevel) 344 return logger, out, nil 345} 346 347type logRotateWriter struct { 348 currentWriter io.WriteCloser 349 350 // how much has been written to current log file 351 currentBytes int64 352 353 // e.g. path/to/logs/foo%T 354 currentPath string 355 356 // e.g. path/to/logs/current 357 currentPathCurrent string 358 359 rotateBytes int64 360 361 outPathTemplate string 362 363 // keep the most recent N log files (not including current) 364 keep int 365 366 // write strings to this from inside the log system, a task outside the log system hands them to slog.Info() 367 logaround chan<- string 368} 369 370func logbouncer(out <-chan string) { 371 var logger *slog.Logger 372 for line := range out { 373 fmt.Fprintf(os.Stderr, "ll %s\n", line) 374 if logger == nil { 375 // lazy to make sure it crops up after slog Default has been set 376 logger = slog.Default().With("system", "logging") 377 } 378 logger.Info(line) 379 } 380} 381 382var currentMatcher = regexp.MustCompile("current_\\d+") 383 384func (w *logRotateWriter) cleanOldLogs() { 385 if w.keep < 0 { 386 // old log removal is disabled 387 return 388 } 389 // w.currentPath was recently set as the new log 390 dirpart, _ := filepath.Split(w.currentPath) 391 // find old logs 392 templateDirPart, templateNamePart := filepath.Split(w.outPathTemplate) 393 if dirpart != templateDirPart { 394 w.logaround <- fmt.Sprintf("current dir part %#v != template dir part %#v\n", w.currentPath, w.outPathTemplate) 395 return 396 } 397 // build a regexp that is string literal parts with \d+ replacing the UnixMilli part 398 templateNameParts := strings.Split(templateNamePart, "%T") 399 var sb strings.Builder 400 first := true 401 for _, part := range templateNameParts { 402 if first { 403 first = false 404 } else { 405 sb.WriteString("\\d+") 406 } 407 sb.WriteString(regexp.QuoteMeta(part)) 408 } 409 tmre, err := regexp.Compile(sb.String()) 410 if err != nil { 411 w.logaround <- fmt.Sprintf("failed to compile old log template regexp: %#v\n", err) 412 return 413 } 414 dir, err := os.ReadDir(dirpart) 415 if err != nil { 416 w.logaround <- fmt.Sprintf("failed to read old log template dir: %#v\n", err) 417 return 418 } 419 var found []fs.FileInfo 420 for _, ent := range dir { 421 name := ent.Name() 422 if tmre.MatchString(name) || currentMatcher.MatchString(name) { 423 fi, err := ent.Info() 424 if err != nil { 425 continue 426 } 427 found = append(found, fi) 428 } 429 } 430 if len(found) <= w.keep { 431 // not too many, nothing to do 432 return 433 } 434 foundMtimeLess := func(i, j int) bool { 435 return found[i].ModTime().Before(found[j].ModTime()) 436 } 437 sort.Slice(found, foundMtimeLess) 438 drops := found[:len(found)-w.keep] 439 for _, fi := range drops { 440 fullpath := filepath.Join(dirpart, fi.Name()) 441 err = os.Remove(fullpath) 442 if err != nil { 443 w.logaround <- fmt.Sprintf("failed to rm old log: %#v\n", err) 444 // but keep going 445 } 446 // maybe it would be safe to debug-log old log removal from within the logging infrastructure? 447 } 448} 449 450func (w *logRotateWriter) closeOldLog() []error { 451 if w.currentWriter == nil { 452 return nil 453 } 454 var earlyWeakErrors []error 455 err := w.currentWriter.Close() 456 if err != nil { 457 earlyWeakErrors = append(earlyWeakErrors, err) 458 } 459 w.currentWriter = nil 460 w.currentBytes = 0 461 w.currentPath = "" 462 if w.currentPathCurrent != "" { 463 err = os.Remove(w.currentPathCurrent) // not really an error until something else goes wrong 464 if err != nil { 465 earlyWeakErrors = append(earlyWeakErrors, err) 466 } 467 w.currentPathCurrent = "" 468 } 469 return earlyWeakErrors 470} 471 472func (w *logRotateWriter) openNewLog(earlyWeakErrors []error) (badErr error, weakErrors []error) { 473 nowMillis := time.Now().UnixMilli() 474 nows := strconv.FormatInt(nowMillis, 10) 475 w.currentPath = strings.Replace(w.outPathTemplate, "%T", nows, -1) 476 var err error 477 w.currentWriter, err = os.Create(w.currentPath) 478 if err != nil { 479 earlyWeakErrors = append(earlyWeakErrors, err) 480 return errors.Join(earlyWeakErrors...), nil 481 } 482 w.logaround <- fmt.Sprintf("new log file %#v", w.currentPath) 483 w.cleanOldLogs() 484 dirpart, _ := filepath.Split(w.currentPath) 485 if dirpart != "" { 486 w.currentPathCurrent = filepath.Join(dirpart, "current") 487 fi, err := os.Stat(w.currentPathCurrent) 488 if err == nil && fi.Mode().IsRegular() { 489 // move aside unknown "current" from a previous run 490 // see also currentMatcher regexp current_\d+ 491 err = os.Rename(w.currentPathCurrent, w.currentPathCurrent+"_"+nows) 492 if err != nil { 493 // not crucial if we can't move aside "current" 494 // TODO: log warning ... but not from inside log writer? 495 earlyWeakErrors = append(earlyWeakErrors, err) 496 } 497 } 498 err = os.Link(w.currentPath, w.currentPathCurrent) 499 if err != nil { 500 // not crucial if we can't make "current" link 501 // TODO: log warning ... but not from inside log writer? 502 earlyWeakErrors = append(earlyWeakErrors, err) 503 } 504 } 505 return nil, earlyWeakErrors 506} 507 508func (w *logRotateWriter) Write(p []byte) (n int, err error) { 509 var earlyWeakErrors []error 510 if int64(len(p))+w.currentBytes > w.rotateBytes { 511 // next write would be over the limit 512 earlyWeakErrors = w.closeOldLog() 513 } 514 if w.currentWriter == nil { 515 // start new log file 516 var err error 517 err, earlyWeakErrors = w.openNewLog(earlyWeakErrors) 518 if err != nil { 519 return 0, err 520 } 521 } 522 var wrote int 523 wrote, err = w.currentWriter.Write(p) 524 w.currentBytes += int64(wrote) 525 if err != nil { 526 earlyWeakErrors = append(earlyWeakErrors, err) 527 return wrote, errors.Join(earlyWeakErrors...) 528 } 529 if earlyWeakErrors != nil { 530 w.logaround <- fmt.Sprintf("ok, but: %s", errors.Join(earlyWeakErrors...).Error()) 531 } 532 return wrote, nil 533}