An atproto PDS written in Go
1package server 2 3import ( 4 "bytes" 5 "context" 6 "crypto/ecdsa" 7 "embed" 8 "errors" 9 "fmt" 10 "io" 11 "log/slog" 12 "net/http" 13 "net/smtp" 14 "os" 15 "path/filepath" 16 "sync" 17 "text/template" 18 "time" 19 20 "github.com/aws/aws-sdk-go/aws" 21 "github.com/aws/aws-sdk-go/aws/credentials" 22 "github.com/aws/aws-sdk-go/aws/session" 23 "github.com/aws/aws-sdk-go/service/s3" 24 "github.com/bluesky-social/indigo/api/atproto" 25 "github.com/bluesky-social/indigo/atproto/syntax" 26 "github.com/bluesky-social/indigo/events" 27 "github.com/bluesky-social/indigo/util" 28 "github.com/bluesky-social/indigo/xrpc" 29 "github.com/domodwyer/mailyak/v3" 30 "github.com/go-playground/validator" 31 "github.com/gorilla/sessions" 32 "github.com/haileyok/cocoon/identity" 33 "github.com/haileyok/cocoon/internal/db" 34 "github.com/haileyok/cocoon/internal/helpers" 35 "github.com/haileyok/cocoon/models" 36 "github.com/haileyok/cocoon/oauth/client" 37 "github.com/haileyok/cocoon/oauth/constants" 38 "github.com/haileyok/cocoon/oauth/dpop" 39 "github.com/haileyok/cocoon/oauth/provider" 40 "github.com/haileyok/cocoon/plc" 41 "github.com/ipfs/go-cid" 42 "github.com/labstack/echo-contrib/echoprometheus" 43 echo_session "github.com/labstack/echo-contrib/session" 44 "github.com/labstack/echo/v4" 45 "github.com/labstack/echo/v4/middleware" 46 slogecho "github.com/samber/slog-echo" 47 "gorm.io/driver/postgres" 48 "gorm.io/driver/sqlite" 49 "gorm.io/gorm" 50) 51 52const ( 53 AccountSessionMaxAge = 30 * 24 * time.Hour // one week 54) 55 56type S3Config struct { 57 BackupsEnabled bool 58 BlobstoreEnabled bool 59 Endpoint string 60 Region string 61 Bucket string 62 AccessKey string 63 SecretKey string 64 CDNUrl string 65} 66 67type Server struct { 68 http *http.Client 69 httpd *http.Server 70 mail *mailyak.MailYak 71 mailLk *sync.Mutex 72 echo *echo.Echo 73 db *db.DB 74 plcClient *plc.Client 75 logger *slog.Logger 76 config *config 77 privateKey *ecdsa.PrivateKey 78 repoman *RepoMan 79 oauthProvider *provider.Provider 80 evtman *events.EventManager 81 passport *identity.Passport 82 fallbackProxy string 83 84 lastRequestCrawl time.Time 85 requestCrawlMu sync.Mutex 86 87 dbName string 88 dbType string 89 s3Config *S3Config 90} 91 92type Args struct { 93 Logger *slog.Logger 94 95 Addr string 96 DbName string 97 DbType string 98 DatabaseURL string 99 Version string 100 Did string 101 Hostname string 102 RotationKeyPath string 103 JwkPath string 104 ContactEmail string 105 Relays []string 106 AdminPassword string 107 RequireInvite bool 108 109 SmtpUser string 110 SmtpPass string 111 SmtpHost string 112 SmtpPort string 113 SmtpEmail string 114 SmtpName string 115 116 S3Config *S3Config 117 118 SessionSecret string 119 120 BlockstoreVariant BlockstoreVariant 121 FallbackProxy string 122} 123 124type config struct { 125 Version string 126 Did string 127 Hostname string 128 ContactEmail string 129 EnforcePeering bool 130 Relays []string 131 AdminPassword string 132 RequireInvite bool 133 SmtpEmail string 134 SmtpName string 135 BlockstoreVariant BlockstoreVariant 136 FallbackProxy string 137} 138 139type CustomValidator struct { 140 validator *validator.Validate 141} 142 143type ValidationError struct { 144 error 145 Field string 146 Tag string 147} 148 149func (cv *CustomValidator) Validate(i any) error { 150 if err := cv.validator.Struct(i); err != nil { 151 var validateErrors validator.ValidationErrors 152 if errors.As(err, &validateErrors) && len(validateErrors) > 0 { 153 first := validateErrors[0] 154 return ValidationError{ 155 error: err, 156 Field: first.Field(), 157 Tag: first.Tag(), 158 } 159 } 160 161 return err 162 } 163 164 return nil 165} 166 167//go:embed templates/* 168var templateFS embed.FS 169 170//go:embed static/* 171var staticFS embed.FS 172 173type TemplateRenderer struct { 174 templates *template.Template 175 isDev bool 176 templatePath string 177} 178 179func (s *Server) loadTemplates() { 180 absPath, _ := filepath.Abs("server/templates/*.html") 181 if s.config.Version == "dev" { 182 tmpl := template.Must(template.ParseGlob(absPath)) 183 s.echo.Renderer = &TemplateRenderer{ 184 templates: tmpl, 185 isDev: true, 186 templatePath: absPath, 187 } 188 } else { 189 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html")) 190 s.echo.Renderer = &TemplateRenderer{ 191 templates: tmpl, 192 isDev: false, 193 } 194 } 195} 196 197func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error { 198 if t.isDev { 199 tmpl, err := template.ParseGlob(t.templatePath) 200 if err != nil { 201 return err 202 } 203 t.templates = tmpl 204 } 205 206 if viewContext, isMap := data.(map[string]any); isMap { 207 viewContext["reverse"] = c.Echo().Reverse 208 } 209 210 return t.templates.ExecuteTemplate(w, name, data) 211} 212 213func New(args *Args) (*Server, error) { 214 if args.Logger == nil { 215 args.Logger = slog.Default() 216 } 217 218 logger := args.Logger.With("name", "New") 219 220 if args.Addr == "" { 221 return nil, fmt.Errorf("addr must be set") 222 } 223 224 if args.DbName == "" { 225 return nil, fmt.Errorf("db name must be set") 226 } 227 228 if args.Did == "" { 229 return nil, fmt.Errorf("cocoon did must be set") 230 } 231 232 if args.ContactEmail == "" { 233 return nil, fmt.Errorf("cocoon contact email is required") 234 } 235 236 if _, err := syntax.ParseDID(args.Did); err != nil { 237 return nil, fmt.Errorf("error parsing cocoon did: %w", err) 238 } 239 240 if args.Hostname == "" { 241 return nil, fmt.Errorf("cocoon hostname must be set") 242 } 243 244 if args.AdminPassword == "" { 245 return nil, fmt.Errorf("admin password must be set") 246 } 247 248 if args.SessionSecret == "" { 249 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ") 250 } 251 252 e := echo.New() 253 254 e.Pre(middleware.RemoveTrailingSlash()) 255 e.Pre(slogecho.New(args.Logger.With("component", "slogecho"))) 256 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret)))) 257 e.Use(echoprometheus.NewMiddleware("cocoon")) 258 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 259 AllowOrigins: []string{"*"}, 260 AllowHeaders: []string{"*"}, 261 AllowMethods: []string{"*"}, 262 AllowCredentials: true, 263 MaxAge: 100_000_000, 264 })) 265 266 vdtor := validator.New() 267 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool { 268 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil { 269 return false 270 } 271 return true 272 }) 273 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool { 274 if _, err := syntax.ParseDID(fl.Field().String()); err != nil { 275 return false 276 } 277 return true 278 }) 279 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool { 280 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil { 281 return false 282 } 283 return true 284 }) 285 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool { 286 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil { 287 return false 288 } 289 return true 290 }) 291 292 e.Validator = &CustomValidator{validator: vdtor} 293 294 httpd := &http.Server{ 295 Addr: args.Addr, 296 Handler: e, 297 // shitty defaults but okay for now, needed for import repo 298 ReadTimeout: 5 * time.Minute, 299 WriteTimeout: 5 * time.Minute, 300 IdleTimeout: 5 * time.Minute, 301 } 302 303 dbType := args.DbType 304 if dbType == "" { 305 dbType = "sqlite" 306 } 307 308 var gdb *gorm.DB 309 var err error 310 switch dbType { 311 case "postgres": 312 if args.DatabaseURL == "" { 313 return nil, fmt.Errorf("database-url must be set when using postgres") 314 } 315 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{}) 316 if err != nil { 317 return nil, fmt.Errorf("failed to connect to postgres: %w", err) 318 } 319 logger.Info("connected to PostgreSQL database") 320 default: 321 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{}) 322 if err != nil { 323 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 324 } 325 gdb.Exec("PRAGMA journal_mode=WAL") 326 gdb.Exec("PRAGMA synchronous=NORMAL") 327 328 logger.Info("connected to SQLite database", "path", args.DbName) 329 } 330 dbw := db.NewDB(gdb) 331 332 rkbytes, err := os.ReadFile(args.RotationKeyPath) 333 if err != nil { 334 return nil, err 335 } 336 337 h := util.RobustHTTPClient() 338 339 plcClient, err := plc.NewClient(&plc.ClientArgs{ 340 H: h, 341 Service: "https://plc.directory", 342 PdsHostname: args.Hostname, 343 RotationKey: rkbytes, 344 }) 345 if err != nil { 346 return nil, err 347 } 348 349 jwkbytes, err := os.ReadFile(args.JwkPath) 350 if err != nil { 351 return nil, err 352 } 353 354 key, err := helpers.ParseJWKFromBytes(jwkbytes) 355 if err != nil { 356 return nil, err 357 } 358 359 var pkey ecdsa.PrivateKey 360 if err := key.Raw(&pkey); err != nil { 361 return nil, err 362 } 363 364 oauthCli := &http.Client{ 365 Timeout: 10 * time.Second, 366 } 367 368 var nonceSecret []byte 369 maybeSecret, err := os.ReadFile("nonce.secret") 370 if err != nil && !os.IsNotExist(err) { 371 logger.Error("error attempting to read nonce secret", "error", err) 372 } else { 373 nonceSecret = maybeSecret 374 } 375 376 s := &Server{ 377 http: h, 378 httpd: httpd, 379 echo: e, 380 logger: args.Logger, 381 db: dbw, 382 plcClient: plcClient, 383 privateKey: &pkey, 384 config: &config{ 385 Version: args.Version, 386 Did: args.Did, 387 Hostname: args.Hostname, 388 ContactEmail: args.ContactEmail, 389 EnforcePeering: false, 390 Relays: args.Relays, 391 AdminPassword: args.AdminPassword, 392 RequireInvite: args.RequireInvite, 393 SmtpName: args.SmtpName, 394 SmtpEmail: args.SmtpEmail, 395 BlockstoreVariant: args.BlockstoreVariant, 396 FallbackProxy: args.FallbackProxy, 397 }, 398 evtman: events.NewEventManager(events.NewMemPersister()), 399 passport: identity.NewPassport(h, identity.NewMemCache(10_000)), 400 401 dbName: args.DbName, 402 dbType: dbType, 403 s3Config: args.S3Config, 404 405 oauthProvider: provider.NewProvider(provider.Args{ 406 Hostname: args.Hostname, 407 ClientManagerArgs: client.ManagerArgs{ 408 Cli: oauthCli, 409 Logger: args.Logger.With("component", "oauth-client-manager"), 410 }, 411 DpopManagerArgs: dpop.ManagerArgs{ 412 NonceSecret: nonceSecret, 413 NonceRotationInterval: constants.NonceMaxRotationInterval / 3, 414 OnNonceSecretCreated: func(newNonce []byte) { 415 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil { 416 logger.Error("error writing new nonce secret", "error", err) 417 } 418 }, 419 Logger: args.Logger.With("component", "dpop-manager"), 420 Hostname: args.Hostname, 421 }, 422 }), 423 } 424 425 s.loadTemplates() 426 427 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it 428 429 // TODO: should validate these args 430 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" { 431 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.") 432 } else { 433 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost)) 434 mail.From(s.config.SmtpEmail) 435 mail.FromName(s.config.SmtpName) 436 437 s.mail = mail 438 s.mailLk = &sync.Mutex{} 439 } 440 441 return s, nil 442} 443 444func (s *Server) addRoutes() { 445 // static 446 if s.config.Version == "dev" { 447 s.echo.Static("/static", "server/static") 448 } else { 449 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS)))) 450 } 451 452 // random stuff 453 s.echo.GET("/", s.handleRoot) 454 s.echo.GET("/xrpc/_health", s.handleHealth) 455 s.echo.GET("/.well-known/did.json", s.handleWellKnown) 456 s.echo.GET("/.well-known/atproto-did", s.handleAtprotoDid) 457 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource) 458 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer) 459 s.echo.GET("/robots.txt", s.handleRobots) 460 461 // public 462 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle) 463 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount) 464 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession) 465 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer) 466 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey) 467 468 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo) 469 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos) 470 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords) 471 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord) 472 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord) 473 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks) 474 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit) 475 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus) 476 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo) 477 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos) 478 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs) 479 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob) 480 481 // labels 482 s.echo.GET("/xrpc/com.atproto.label.queryLabels", s.handleLabelQueryLabels) 483 484 // account 485 s.echo.GET("/account", s.handleAccount) 486 s.echo.POST("/account/revoke", s.handleAccountRevoke) 487 s.echo.GET("/account/signin", s.handleAccountSigninGet) 488 s.echo.POST("/account/signin", s.handleAccountSigninPost) 489 s.echo.GET("/account/signout", s.handleAccountSignout) 490 491 // oauth account 492 s.echo.GET("/oauth/jwks", s.handleOauthJwks) 493 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet) 494 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost) 495 496 // oauth authorization 497 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware) 498 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware) 499 500 // authed 501 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 502 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 503 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 504 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 505 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 506 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 507 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 508 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 509 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 510 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 511 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE 512 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 513 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 514 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 515 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 516 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 517 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 518 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 519 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 520 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount) 521 522 // repo 523 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 524 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 525 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 526 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 527 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 528 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 529 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 530 531 // stupid silly endpoints 532 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 533 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 534 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 535 536 // admin routes 537 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware) 538 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware) 539 540 // are there any routes that we should be allowing without auth? i dont think so but idk 541 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 542 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware) 543} 544 545func (s *Server) Serve(ctx context.Context) error { 546 logger := s.logger.With("name", "Serve") 547 548 s.addRoutes() 549 550 logger.Info("migrating...") 551 552 s.db.AutoMigrate( 553 &models.Actor{}, 554 &models.Repo{}, 555 &models.InviteCode{}, 556 &models.Token{}, 557 &models.RefreshToken{}, 558 &models.Block{}, 559 &models.Record{}, 560 &models.Blob{}, 561 &models.BlobPart{}, 562 &models.ReservedKey{}, 563 &provider.OauthToken{}, 564 &provider.OauthAuthorizationRequest{}, 565 ) 566 567 logger.Info("starting cocoon") 568 569 go func() { 570 if err := s.httpd.ListenAndServe(); err != nil { 571 panic(err) 572 } 573 }() 574 575 go s.backupRoutine() 576 577 go func() { 578 if err := s.requestCrawl(ctx); err != nil { 579 logger.Error("error requesting crawls", "err", err) 580 } 581 }() 582 583 <-ctx.Done() 584 585 fmt.Println("shut down") 586 587 return nil 588} 589 590func (s *Server) requestCrawl(ctx context.Context) error { 591 logger := s.logger.With("component", "request-crawl") 592 s.requestCrawlMu.Lock() 593 defer s.requestCrawlMu.Unlock() 594 595 logger.Info("requesting crawl with configured relays") 596 597 if time.Since(s.lastRequestCrawl) <= 1*time.Minute { 598 return fmt.Errorf("a crawl request has already been made within the last minute") 599 } 600 601 for _, relay := range s.config.Relays { 602 logger := logger.With("relay", relay) 603 logger.Info("requesting crawl from relay") 604 cli := xrpc.Client{Host: relay} 605 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{ 606 Hostname: s.config.Hostname, 607 }); err != nil { 608 logger.Error("error requesting crawl", "err", err) 609 } else { 610 logger.Info("crawl requested successfully") 611 } 612 } 613 614 s.lastRequestCrawl = time.Now() 615 616 return nil 617} 618 619func (s *Server) doBackup() { 620 logger := s.logger.With("name", "doBackup") 621 622 if s.dbType == "postgres" { 623 logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)") 624 return 625 } 626 627 start := time.Now() 628 629 logger.Info("beginning backup to s3...") 630 631 tmpFile := fmt.Sprintf("/tmp/cocoon-backup-%s.db", time.Now().Format(time.RFC3339Nano)) 632 defer os.Remove(tmpFile) 633 634 if err := s.db.Client().Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpFile)).Error; err != nil { 635 logger.Error("error creating tmp backup file", "err", err) 636 return 637 } 638 639 backupData, err := os.ReadFile(tmpFile) 640 if err != nil { 641 logger.Error("error reading tmp backup file", "err", err) 642 return 643 } 644 645 logger.Info("sending to s3...") 646 647 currTime := time.Now().Format("2006-01-02_15-04-05") 648 key := "cocoon-backup-" + currTime + ".db" 649 650 config := &aws.Config{ 651 Region: aws.String(s.s3Config.Region), 652 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 653 } 654 655 if s.s3Config.Endpoint != "" { 656 config.Endpoint = aws.String(s.s3Config.Endpoint) 657 config.S3ForcePathStyle = aws.Bool(true) 658 } 659 660 sess, err := session.NewSession(config) 661 if err != nil { 662 logger.Error("error creating s3 session", "err", err) 663 return 664 } 665 666 svc := s3.New(sess) 667 668 if _, err := svc.PutObject(&s3.PutObjectInput{ 669 Bucket: aws.String(s.s3Config.Bucket), 670 Key: aws.String(key), 671 Body: bytes.NewReader(backupData), 672 }); err != nil { 673 logger.Error("error uploading file to s3", "err", err) 674 return 675 } 676 677 logger.Info("finished uploading backup to s3", "key", key, "duration", time.Since(start).Seconds()) 678 679 os.WriteFile("last-backup.txt", []byte(time.Now().Format(time.RFC3339Nano)), 0644) 680} 681 682func (s *Server) backupRoutine() { 683 logger := s.logger.With("name", "backupRoutine") 684 685 if s.s3Config == nil || !s.s3Config.BackupsEnabled { 686 return 687 } 688 689 if s.s3Config.Region == "" { 690 logger.Warn("no s3 region configured but backups are enabled. backups will not run.") 691 return 692 } 693 694 if s.s3Config.Bucket == "" { 695 logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.") 696 return 697 } 698 699 if s.s3Config.AccessKey == "" { 700 logger.Warn("no s3 access key configured but backups are enabled. backups will not run.") 701 return 702 } 703 704 if s.s3Config.SecretKey == "" { 705 logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.") 706 return 707 } 708 709 shouldBackupNow := false 710 lastBackupStr, err := os.ReadFile("last-backup.txt") 711 if err != nil { 712 shouldBackupNow = true 713 } else { 714 lastBackup, err := time.Parse(time.RFC3339Nano, string(lastBackupStr)) 715 if err != nil { 716 shouldBackupNow = true 717 } else if time.Since(lastBackup).Seconds() > 3600 { 718 shouldBackupNow = true 719 } 720 } 721 722 if shouldBackupNow { 723 go s.doBackup() 724 } 725 726 ticker := time.NewTicker(time.Hour) 727 for range ticker.C { 728 go s.doBackup() 729 } 730} 731 732func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error { 733 if err := s.db.Exec(ctx, "UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil { 734 return err 735 } 736 737 return nil 738}