1package server
2
3import (
4 "bytes"
5 "context"
6 "crypto/ecdsa"
7 "embed"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "net/http"
13 "net/smtp"
14 "os"
15 "path/filepath"
16 "sync"
17 "text/template"
18 "time"
19
20 "github.com/aws/aws-sdk-go/aws"
21 "github.com/aws/aws-sdk-go/aws/credentials"
22 "github.com/aws/aws-sdk-go/aws/session"
23 "github.com/aws/aws-sdk-go/service/s3"
24 "github.com/bluesky-social/indigo/api/atproto"
25 "github.com/bluesky-social/indigo/atproto/syntax"
26 "github.com/bluesky-social/indigo/events"
27 "github.com/bluesky-social/indigo/util"
28 "github.com/bluesky-social/indigo/xrpc"
29 "github.com/domodwyer/mailyak/v3"
30 "github.com/go-playground/validator"
31 "github.com/gorilla/sessions"
32 "github.com/haileyok/cocoon/identity"
33 "github.com/haileyok/cocoon/internal/db"
34 "github.com/haileyok/cocoon/internal/helpers"
35 "github.com/haileyok/cocoon/models"
36 "github.com/haileyok/cocoon/oauth/client"
37 "github.com/haileyok/cocoon/oauth/constants"
38 "github.com/haileyok/cocoon/oauth/dpop"
39 "github.com/haileyok/cocoon/oauth/provider"
40 "github.com/haileyok/cocoon/plc"
41 "github.com/ipfs/go-cid"
42 "github.com/labstack/echo-contrib/echoprometheus"
43 echo_session "github.com/labstack/echo-contrib/session"
44 "github.com/labstack/echo/v4"
45 "github.com/labstack/echo/v4/middleware"
46 slogecho "github.com/samber/slog-echo"
47 "gorm.io/driver/postgres"
48 "gorm.io/driver/sqlite"
49 "gorm.io/gorm"
50)
51
const (
	// AccountSessionMaxAge is a 30-day duration (30 * 24h). Presumably it is
	// the lifetime of account web sessions — confirm against its users, which
	// are outside this file.
	AccountSessionMaxAge = 30 * 24 * time.Hour // 30 days
)
55
// S3Config holds the S3 (or S3-compatible) settings handed to the server via
// Args.S3Config.
type S3Config struct {
	// BackupsEnabled turns on the periodic database backup loop started by
	// Serve (see backupRoutine).
	BackupsEnabled bool
	// BlobstoreEnabled is not read in this file; presumably it switches blob
	// storage to S3 — confirm against the blob handlers.
	BlobstoreEnabled bool
	// Endpoint, when non-empty, replaces the default AWS endpoint and forces
	// path-style addressing for backup uploads.
	Endpoint string
	// Region, Bucket, AccessKey and SecretKey must all be non-empty for
	// backups to run.
	Region    string
	Bucket    string
	AccessKey string
	SecretKey string
	// CDNUrl is not read in this file; presumably a public base URL for
	// serving blobs — confirm against its users.
	CDNUrl string
}
66
// Server bundles everything the PDS process needs at runtime: the HTTP stack,
// the database handle, identity/PLC clients, the signing key, the OAuth
// provider and the optional mailer. Instances are built by New and run with
// Serve.
type Server struct {
	// http is the shared outbound HTTP client; New also hands it to the PLC
	// client and the identity passport.
	http *http.Client
	// httpd is the listener-facing HTTP server that wraps echo.
	httpd *http.Server
	// mail and mailLk are only set by New when all SMTP arguments are
	// provided; both are nil otherwise.
	mail   *mailyak.MailYak
	mailLk *sync.Mutex
	echo   *echo.Echo
	db     *db.DB
	// plcClient is created by New against https://plc.directory.
	plcClient *plc.Client
	logger    *slog.Logger
	config    *config
	// privateKey is the ECDSA key parsed from the JWK file at Args.JwkPath.
	privateKey    *ecdsa.PrivateKey
	repoman       *RepoMan
	oauthProvider *provider.Provider
	evtman        *events.EventManager
	passport      *identity.Passport
	// fallbackProxy is not assigned by New in this file (the value from Args
	// is stored in config.FallbackProxy instead) — verify whether this field
	// is still used elsewhere.
	fallbackProxy string

	// lastRequestCrawl is the time of the last completed requestCrawl call;
	// requestCrawlMu guards it and serializes crawl requests.
	lastRequestCrawl time.Time
	requestCrawlMu   sync.Mutex

	// dbName and dbType mirror Args.DbName and the effective database type
	// ("sqlite" when Args.DbType is empty).
	dbName string
	dbType string
	// s3Config may be nil; backupRoutine checks for that before using it.
	s3Config *S3Config
}
91
// Args is the set of options accepted by New.
//
// New rejects the arguments when Addr, DbName, Did, ContactEmail, Hostname,
// AdminPassword or SessionSecret is empty, or when Did does not parse as a
// DID.
type Args struct {
	// Logger is optional; New falls back to slog.Default() when it is nil.
	Logger *slog.Logger

	// Addr is the listen address for the HTTP server.
	Addr string
	// DbName is the SQLite database path. New requires it even when DbType
	// is "postgres".
	DbName string
	// DbType selects the database driver: "postgres" uses DatabaseURL, any
	// other value (including empty) opens SQLite at DbName.
	DbType string
	// DatabaseURL is the PostgreSQL DSN; required when DbType is "postgres".
	DatabaseURL string
	// Version is stored in the server config; the value "dev" makes the
	// server load templates and static files from disk instead of the
	// embedded copies.
	Version  string
	Did      string
	Hostname string
	// RotationKeyPath and JwkPath are files read by New: the former is passed
	// to the PLC client as the rotation key, the latter is parsed as a JWK
	// holding the server's ECDSA private key.
	RotationKeyPath string
	JwkPath         string
	ContactEmail    string
	// Relays lists the relay hosts asked to crawl this server (see
	// requestCrawl).
	Relays        []string
	AdminPassword string
	RequireInvite bool

	// SMTP settings. Mailing is only configured when every one of these is
	// non-empty; otherwise New logs a warning and leaves the mailer unset.
	SmtpUser  string
	SmtpPass  string
	SmtpHost  string
	SmtpPort  string
	SmtpEmail string
	SmtpName  string

	// S3Config is optional; a nil value disables backups.
	S3Config *S3Config

	// SessionSecret keys the cookie store used for session middleware.
	SessionSecret string

	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
123
// config is the subset of Args that the server keeps for use after
// construction. New copies every field from the matching Args field, except
// EnforcePeering, which New always sets to false.
type config struct {
	// Version equal to "dev" switches templates and static files to be read
	// from disk (see loadTemplates and addRoutes).
	Version        string
	Did            string
	Hostname       string
	ContactEmail   string
	EnforcePeering bool
	// Relays are the hosts contacted by requestCrawl.
	Relays        []string
	AdminPassword string
	RequireInvite bool
	// SmtpEmail and SmtpName are used as the From address and display name of
	// the mailer when SMTP is configured.
	SmtpEmail         string
	SmtpName          string
	BlockstoreVariant BlockstoreVariant
	FallbackProxy     string
}
138
// CustomValidator wraps a validator.Validate instance so it can be installed
// as the echo validator (New assigns it to echo's Validator field).
type CustomValidator struct {
	validator *validator.Validate
}
142
// ValidationError is returned by CustomValidator.Validate when struct
// validation fails. It embeds the original validation error and exposes the
// field name and tag of the first failing rule.
type ValidationError struct {
	error
	// Field is the name of the first field that failed validation.
	Field string
	// Tag is the validation tag that the field failed.
	Tag string
}
148
149func (cv *CustomValidator) Validate(i any) error {
150 if err := cv.validator.Struct(i); err != nil {
151 var validateErrors validator.ValidationErrors
152 if errors.As(err, &validateErrors) && len(validateErrors) > 0 {
153 first := validateErrors[0]
154 return ValidationError{
155 error: err,
156 Field: first.Field(),
157 Tag: first.Tag(),
158 }
159 }
160
161 return err
162 }
163
164 return nil
165}
166
// templateFS holds the contents of the templates directory compiled into the
// binary; loadTemplates parses it when the server version is not "dev".
//
//go:embed templates/*
var templateFS embed.FS

// staticFS holds the contents of the static directory compiled into the
// binary; addRoutes serves it under /static when the server version is not
// "dev".
//
//go:embed static/*
var staticFS embed.FS
172
// TemplateRenderer is the echo renderer installed by loadTemplates.
type TemplateRenderer struct {
	// templates is the parsed template set executed by Render.
	templates *template.Template
	// isDev makes Render re-parse the templates from templatePath on every
	// call.
	isDev bool
	// templatePath is the glob pattern re-parsed in dev mode; it is left
	// empty otherwise.
	templatePath string
}
178
179func (s *Server) loadTemplates() {
180 absPath, _ := filepath.Abs("server/templates/*.html")
181 if s.config.Version == "dev" {
182 tmpl := template.Must(template.ParseGlob(absPath))
183 s.echo.Renderer = &TemplateRenderer{
184 templates: tmpl,
185 isDev: true,
186 templatePath: absPath,
187 }
188 } else {
189 tmpl := template.Must(template.ParseFS(templateFS, "templates/*.html"))
190 s.echo.Renderer = &TemplateRenderer{
191 templates: tmpl,
192 isDev: false,
193 }
194 }
195}
196
197func (t *TemplateRenderer) Render(w io.Writer, name string, data any, c echo.Context) error {
198 if t.isDev {
199 tmpl, err := template.ParseGlob(t.templatePath)
200 if err != nil {
201 return err
202 }
203 t.templates = tmpl
204 }
205
206 if viewContext, isMap := data.(map[string]any); isMap {
207 viewContext["reverse"] = c.Echo().Reverse
208 }
209
210 return t.templates.ExecuteTemplate(w, name, data)
211}
212
213func New(args *Args) (*Server, error) {
214 if args.Logger == nil {
215 args.Logger = slog.Default()
216 }
217
218 logger := args.Logger.With("name", "New")
219
220 if args.Addr == "" {
221 return nil, fmt.Errorf("addr must be set")
222 }
223
224 if args.DbName == "" {
225 return nil, fmt.Errorf("db name must be set")
226 }
227
228 if args.Did == "" {
229 return nil, fmt.Errorf("cocoon did must be set")
230 }
231
232 if args.ContactEmail == "" {
233 return nil, fmt.Errorf("cocoon contact email is required")
234 }
235
236 if _, err := syntax.ParseDID(args.Did); err != nil {
237 return nil, fmt.Errorf("error parsing cocoon did: %w", err)
238 }
239
240 if args.Hostname == "" {
241 return nil, fmt.Errorf("cocoon hostname must be set")
242 }
243
244 if args.AdminPassword == "" {
245 return nil, fmt.Errorf("admin password must be set")
246 }
247
248 if args.SessionSecret == "" {
249 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
250 }
251
252 e := echo.New()
253
254 e.Pre(middleware.RemoveTrailingSlash())
255 e.Pre(slogecho.New(args.Logger.With("component", "slogecho")))
256 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
257 e.Use(echoprometheus.NewMiddleware("cocoon"))
258 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
259 AllowOrigins: []string{"*"},
260 AllowHeaders: []string{"*"},
261 AllowMethods: []string{"*"},
262 AllowCredentials: true,
263 MaxAge: 100_000_000,
264 }))
265
266 vdtor := validator.New()
267 vdtor.RegisterValidation("atproto-handle", func(fl validator.FieldLevel) bool {
268 if _, err := syntax.ParseHandle(fl.Field().String()); err != nil {
269 return false
270 }
271 return true
272 })
273 vdtor.RegisterValidation("atproto-did", func(fl validator.FieldLevel) bool {
274 if _, err := syntax.ParseDID(fl.Field().String()); err != nil {
275 return false
276 }
277 return true
278 })
279 vdtor.RegisterValidation("atproto-rkey", func(fl validator.FieldLevel) bool {
280 if _, err := syntax.ParseRecordKey(fl.Field().String()); err != nil {
281 return false
282 }
283 return true
284 })
285 vdtor.RegisterValidation("atproto-nsid", func(fl validator.FieldLevel) bool {
286 if _, err := syntax.ParseNSID(fl.Field().String()); err != nil {
287 return false
288 }
289 return true
290 })
291
292 e.Validator = &CustomValidator{validator: vdtor}
293
294 httpd := &http.Server{
295 Addr: args.Addr,
296 Handler: e,
297 // shitty defaults but okay for now, needed for import repo
298 ReadTimeout: 5 * time.Minute,
299 WriteTimeout: 5 * time.Minute,
300 IdleTimeout: 5 * time.Minute,
301 }
302
303 dbType := args.DbType
304 if dbType == "" {
305 dbType = "sqlite"
306 }
307
308 var gdb *gorm.DB
309 var err error
310 switch dbType {
311 case "postgres":
312 if args.DatabaseURL == "" {
313 return nil, fmt.Errorf("database-url must be set when using postgres")
314 }
315 gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{})
316 if err != nil {
317 return nil, fmt.Errorf("failed to connect to postgres: %w", err)
318 }
319 logger.Info("connected to PostgreSQL database")
320 default:
321 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
322 if err != nil {
323 return nil, fmt.Errorf("failed to open sqlite database: %w", err)
324 }
325 gdb.Exec("PRAGMA journal_mode=WAL")
326 gdb.Exec("PRAGMA synchronous=NORMAL")
327
328 logger.Info("connected to SQLite database", "path", args.DbName)
329 }
330 dbw := db.NewDB(gdb)
331
332 rkbytes, err := os.ReadFile(args.RotationKeyPath)
333 if err != nil {
334 return nil, err
335 }
336
337 h := util.RobustHTTPClient()
338
339 plcClient, err := plc.NewClient(&plc.ClientArgs{
340 H: h,
341 Service: "https://plc.directory",
342 PdsHostname: args.Hostname,
343 RotationKey: rkbytes,
344 })
345 if err != nil {
346 return nil, err
347 }
348
349 jwkbytes, err := os.ReadFile(args.JwkPath)
350 if err != nil {
351 return nil, err
352 }
353
354 key, err := helpers.ParseJWKFromBytes(jwkbytes)
355 if err != nil {
356 return nil, err
357 }
358
359 var pkey ecdsa.PrivateKey
360 if err := key.Raw(&pkey); err != nil {
361 return nil, err
362 }
363
364 oauthCli := &http.Client{
365 Timeout: 10 * time.Second,
366 }
367
368 var nonceSecret []byte
369 maybeSecret, err := os.ReadFile("nonce.secret")
370 if err != nil && !os.IsNotExist(err) {
371 logger.Error("error attempting to read nonce secret", "error", err)
372 } else {
373 nonceSecret = maybeSecret
374 }
375
376 s := &Server{
377 http: h,
378 httpd: httpd,
379 echo: e,
380 logger: args.Logger,
381 db: dbw,
382 plcClient: plcClient,
383 privateKey: &pkey,
384 config: &config{
385 Version: args.Version,
386 Did: args.Did,
387 Hostname: args.Hostname,
388 ContactEmail: args.ContactEmail,
389 EnforcePeering: false,
390 Relays: args.Relays,
391 AdminPassword: args.AdminPassword,
392 RequireInvite: args.RequireInvite,
393 SmtpName: args.SmtpName,
394 SmtpEmail: args.SmtpEmail,
395 BlockstoreVariant: args.BlockstoreVariant,
396 FallbackProxy: args.FallbackProxy,
397 },
398 evtman: events.NewEventManager(events.NewMemPersister()),
399 passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
400
401 dbName: args.DbName,
402 dbType: dbType,
403 s3Config: args.S3Config,
404
405 oauthProvider: provider.NewProvider(provider.Args{
406 Hostname: args.Hostname,
407 ClientManagerArgs: client.ManagerArgs{
408 Cli: oauthCli,
409 Logger: args.Logger.With("component", "oauth-client-manager"),
410 },
411 DpopManagerArgs: dpop.ManagerArgs{
412 NonceSecret: nonceSecret,
413 NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
414 OnNonceSecretCreated: func(newNonce []byte) {
415 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
416 logger.Error("error writing new nonce secret", "error", err)
417 }
418 },
419 Logger: args.Logger.With("component", "dpop-manager"),
420 Hostname: args.Hostname,
421 },
422 }),
423 }
424
425 s.loadTemplates()
426
427 s.repoman = NewRepoMan(s) // TODO: this is way too lazy, stop it
428
429 // TODO: should validate these args
430 if args.SmtpUser == "" || args.SmtpPass == "" || args.SmtpHost == "" || args.SmtpPort == "" || args.SmtpEmail == "" || args.SmtpName == "" {
431 args.Logger.Warn("not enough smtp args were provided. mailing will not work for your server.")
432 } else {
433 mail := mailyak.New(args.SmtpHost+":"+args.SmtpPort, smtp.PlainAuth("", args.SmtpUser, args.SmtpPass, args.SmtpHost))
434 mail.From(s.config.SmtpEmail)
435 mail.FromName(s.config.SmtpName)
436
437 s.mail = mail
438 s.mailLk = &sync.Mutex{}
439 }
440
441 return s, nil
442}
443
444func (s *Server) addRoutes() {
445 // static
446 if s.config.Version == "dev" {
447 s.echo.Static("/static", "server/static")
448 } else {
449 s.echo.GET("/static/*", echo.WrapHandler(http.FileServer(http.FS(staticFS))))
450 }
451
452 // random stuff
453 s.echo.GET("/", s.handleRoot)
454 s.echo.GET("/xrpc/_health", s.handleHealth)
455 s.echo.GET("/.well-known/did.json", s.handleWellKnown)
456 s.echo.GET("/.well-known/atproto-did", s.handleAtprotoDid)
457 s.echo.GET("/.well-known/oauth-protected-resource", s.handleOauthProtectedResource)
458 s.echo.GET("/.well-known/oauth-authorization-server", s.handleOauthAuthorizationServer)
459 s.echo.GET("/robots.txt", s.handleRobots)
460
461 // public
462 s.echo.GET("/xrpc/com.atproto.identity.resolveHandle", s.handleResolveHandle)
463 s.echo.POST("/xrpc/com.atproto.server.createAccount", s.handleCreateAccount)
464 s.echo.POST("/xrpc/com.atproto.server.createSession", s.handleCreateSession)
465 s.echo.GET("/xrpc/com.atproto.server.describeServer", s.handleDescribeServer)
466 s.echo.POST("/xrpc/com.atproto.server.reserveSigningKey", s.handleServerReserveSigningKey)
467
468 s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo)
469 s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos)
470 s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords)
471 s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord)
472 s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord)
473 s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks)
474 s.echo.GET("/xrpc/com.atproto.sync.getLatestCommit", s.handleSyncGetLatestCommit)
475 s.echo.GET("/xrpc/com.atproto.sync.getRepoStatus", s.handleSyncGetRepoStatus)
476 s.echo.GET("/xrpc/com.atproto.sync.getRepo", s.handleSyncGetRepo)
477 s.echo.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleSyncSubscribeRepos)
478 s.echo.GET("/xrpc/com.atproto.sync.listBlobs", s.handleSyncListBlobs)
479 s.echo.GET("/xrpc/com.atproto.sync.getBlob", s.handleSyncGetBlob)
480
481 // labels
482 s.echo.GET("/xrpc/com.atproto.label.queryLabels", s.handleLabelQueryLabels)
483
484 // account
485 s.echo.GET("/account", s.handleAccount)
486 s.echo.POST("/account/revoke", s.handleAccountRevoke)
487 s.echo.GET("/account/signin", s.handleAccountSigninGet)
488 s.echo.POST("/account/signin", s.handleAccountSigninPost)
489 s.echo.GET("/account/signout", s.handleAccountSignout)
490
491 // oauth account
492 s.echo.GET("/oauth/jwks", s.handleOauthJwks)
493 s.echo.GET("/oauth/authorize", s.handleOauthAuthorizeGet)
494 s.echo.POST("/oauth/authorize", s.handleOauthAuthorizePost)
495
496 // oauth authorization
497 s.echo.POST("/oauth/par", s.handleOauthPar, s.oauthProvider.BaseMiddleware)
498 s.echo.POST("/oauth/token", s.handleOauthToken, s.oauthProvider.BaseMiddleware)
499
500 // authed
501 s.echo.GET("/xrpc/com.atproto.server.getSession", s.handleGetSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
502 s.echo.POST("/xrpc/com.atproto.server.refreshSession", s.handleRefreshSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
503 s.echo.POST("/xrpc/com.atproto.server.deleteSession", s.handleDeleteSession, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
504 s.echo.GET("/xrpc/com.atproto.identity.getRecommendedDidCredentials", s.handleGetRecommendedDidCredentials, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
505 s.echo.POST("/xrpc/com.atproto.identity.updateHandle", s.handleIdentityUpdateHandle, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
506 s.echo.POST("/xrpc/com.atproto.identity.requestPlcOperationSignature", s.handleIdentityRequestPlcOperationSignature, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
507 s.echo.POST("/xrpc/com.atproto.identity.signPlcOperation", s.handleSignPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
508 s.echo.POST("/xrpc/com.atproto.identity.submitPlcOperation", s.handleSubmitPlcOperation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
509 s.echo.POST("/xrpc/com.atproto.server.confirmEmail", s.handleServerConfirmEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
510 s.echo.POST("/xrpc/com.atproto.server.requestEmailConfirmation", s.handleServerRequestEmailConfirmation, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
511 s.echo.POST("/xrpc/com.atproto.server.requestPasswordReset", s.handleServerRequestPasswordReset) // AUTH NOT REQUIRED FOR THIS ONE
512 s.echo.POST("/xrpc/com.atproto.server.requestEmailUpdate", s.handleServerRequestEmailUpdate, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
513 s.echo.POST("/xrpc/com.atproto.server.resetPassword", s.handleServerResetPassword, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
514 s.echo.POST("/xrpc/com.atproto.server.updateEmail", s.handleServerUpdateEmail, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
515 s.echo.GET("/xrpc/com.atproto.server.getServiceAuth", s.handleServerGetServiceAuth, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
516 s.echo.GET("/xrpc/com.atproto.server.checkAccountStatus", s.handleServerCheckAccountStatus, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
517 s.echo.POST("/xrpc/com.atproto.server.deactivateAccount", s.handleServerDeactivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
518 s.echo.POST("/xrpc/com.atproto.server.activateAccount", s.handleServerActivateAccount, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
519 s.echo.POST("/xrpc/com.atproto.server.requestAccountDelete", s.handleServerRequestAccountDelete, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
520 s.echo.POST("/xrpc/com.atproto.server.deleteAccount", s.handleServerDeleteAccount)
521
522 // repo
523 s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
524 s.echo.POST("/xrpc/com.atproto.repo.createRecord", s.handleCreateRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
525 s.echo.POST("/xrpc/com.atproto.repo.putRecord", s.handlePutRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
526 s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
527 s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
528 s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
529 s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
530
531 // stupid silly endpoints
532 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
533 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
534 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
535
536 // admin routes
537 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
538 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)
539
540 // are there any routes that we should be allowing without auth? i dont think so but idk
541 s.echo.GET("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
542 s.echo.POST("/xrpc/*", s.handleProxy, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
543}
544
545func (s *Server) Serve(ctx context.Context) error {
546 logger := s.logger.With("name", "Serve")
547
548 s.addRoutes()
549
550 logger.Info("migrating...")
551
552 s.db.AutoMigrate(
553 &models.Actor{},
554 &models.Repo{},
555 &models.InviteCode{},
556 &models.Token{},
557 &models.RefreshToken{},
558 &models.Block{},
559 &models.Record{},
560 &models.Blob{},
561 &models.BlobPart{},
562 &models.ReservedKey{},
563 &provider.OauthToken{},
564 &provider.OauthAuthorizationRequest{},
565 )
566
567 logger.Info("starting cocoon")
568
569 go func() {
570 if err := s.httpd.ListenAndServe(); err != nil {
571 panic(err)
572 }
573 }()
574
575 go s.backupRoutine()
576
577 go func() {
578 if err := s.requestCrawl(ctx); err != nil {
579 logger.Error("error requesting crawls", "err", err)
580 }
581 }()
582
583 <-ctx.Done()
584
585 fmt.Println("shut down")
586
587 return nil
588}
589
590func (s *Server) requestCrawl(ctx context.Context) error {
591 logger := s.logger.With("component", "request-crawl")
592 s.requestCrawlMu.Lock()
593 defer s.requestCrawlMu.Unlock()
594
595 logger.Info("requesting crawl with configured relays")
596
597 if time.Since(s.lastRequestCrawl) <= 1*time.Minute {
598 return fmt.Errorf("a crawl request has already been made within the last minute")
599 }
600
601 for _, relay := range s.config.Relays {
602 logger := logger.With("relay", relay)
603 logger.Info("requesting crawl from relay")
604 cli := xrpc.Client{Host: relay}
605 if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
606 Hostname: s.config.Hostname,
607 }); err != nil {
608 logger.Error("error requesting crawl", "err", err)
609 } else {
610 logger.Info("crawl requested successfully")
611 }
612 }
613
614 s.lastRequestCrawl = time.Now()
615
616 return nil
617}
618
619func (s *Server) doBackup() {
620 logger := s.logger.With("name", "doBackup")
621
622 if s.dbType == "postgres" {
623 logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
624 return
625 }
626
627 start := time.Now()
628
629 logger.Info("beginning backup to s3...")
630
631 tmpFile := fmt.Sprintf("/tmp/cocoon-backup-%s.db", time.Now().Format(time.RFC3339Nano))
632 defer os.Remove(tmpFile)
633
634 if err := s.db.Client().Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpFile)).Error; err != nil {
635 logger.Error("error creating tmp backup file", "err", err)
636 return
637 }
638
639 backupData, err := os.ReadFile(tmpFile)
640 if err != nil {
641 logger.Error("error reading tmp backup file", "err", err)
642 return
643 }
644
645 logger.Info("sending to s3...")
646
647 currTime := time.Now().Format("2006-01-02_15-04-05")
648 key := "cocoon-backup-" + currTime + ".db"
649
650 config := &aws.Config{
651 Region: aws.String(s.s3Config.Region),
652 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
653 }
654
655 if s.s3Config.Endpoint != "" {
656 config.Endpoint = aws.String(s.s3Config.Endpoint)
657 config.S3ForcePathStyle = aws.Bool(true)
658 }
659
660 sess, err := session.NewSession(config)
661 if err != nil {
662 logger.Error("error creating s3 session", "err", err)
663 return
664 }
665
666 svc := s3.New(sess)
667
668 if _, err := svc.PutObject(&s3.PutObjectInput{
669 Bucket: aws.String(s.s3Config.Bucket),
670 Key: aws.String(key),
671 Body: bytes.NewReader(backupData),
672 }); err != nil {
673 logger.Error("error uploading file to s3", "err", err)
674 return
675 }
676
677 logger.Info("finished uploading backup to s3", "key", key, "duration", time.Since(start).Seconds())
678
679 os.WriteFile("last-backup.txt", []byte(time.Now().Format(time.RFC3339Nano)), 0644)
680}
681
682func (s *Server) backupRoutine() {
683 logger := s.logger.With("name", "backupRoutine")
684
685 if s.s3Config == nil || !s.s3Config.BackupsEnabled {
686 return
687 }
688
689 if s.s3Config.Region == "" {
690 logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
691 return
692 }
693
694 if s.s3Config.Bucket == "" {
695 logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
696 return
697 }
698
699 if s.s3Config.AccessKey == "" {
700 logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
701 return
702 }
703
704 if s.s3Config.SecretKey == "" {
705 logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
706 return
707 }
708
709 shouldBackupNow := false
710 lastBackupStr, err := os.ReadFile("last-backup.txt")
711 if err != nil {
712 shouldBackupNow = true
713 } else {
714 lastBackup, err := time.Parse(time.RFC3339Nano, string(lastBackupStr))
715 if err != nil {
716 shouldBackupNow = true
717 } else if time.Since(lastBackup).Seconds() > 3600 {
718 shouldBackupNow = true
719 }
720 }
721
722 if shouldBackupNow {
723 go s.doBackup()
724 }
725
726 ticker := time.NewTicker(time.Hour)
727 for range ticker.C {
728 go s.doBackup()
729 }
730}
731
732func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
733 if err := s.db.Exec(ctx, "UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
734 return err
735 }
736
737 return nil
738}