An implementation of the ATProto statusphere example app but in Go
at main 3.0 kB view raw
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log" 8 "log/slog" 9 "net/http" 10 "os" 11 "os/signal" 12 "path" 13 "syscall" 14 "time" 15 16 "github.com/avast/retry-go/v4" 17 "github.com/bluesky-social/indigo/atproto/auth/oauth" 18 "github.com/joho/godotenv" 19 "github.com/willdot/statusphere-go" 20 "github.com/willdot/statusphere-go/database" 21) 22 23const ( 24 defaultServerAddr = "wss://jetstream.atproto.tools/subscribe" 25 httpClientTimeoutDuration = time.Second * 5 26 transportIdleConnTimeoutDuration = time.Second * 90 27 defaultPort = "8080" 28) 29 30func main() { 31 envLocation := os.Getenv("ENV_LOCATION") 32 if envLocation == "" { 33 envLocation = ".env" 34 } 35 36 err := godotenv.Load(envLocation) 37 if err != nil { 38 if !os.IsNotExist(err) { 39 log.Fatal("Error loading .env file") 40 } 41 } 42 43 host := os.Getenv("HOST") 44 if host == "" { 45 slog.Error("missing HOST env variable") 46 return 47 } 48 49 dbMountPath := os.Getenv("DATABASE_MOUNT_PATH") 50 if dbMountPath == "" { 51 slog.Error("DATABASE_MOUNT_PATH env not set") 52 return 53 } 54 55 dbFilename := path.Join(dbMountPath, "database.db") 56 db, err := database.New(dbFilename) 57 if err != nil { 58 slog.Error("create new database", "error", err) 59 return 60 } 61 defer db.Close() 62 63 httpClient := &http.Client{ 64 Timeout: httpClientTimeoutDuration, 65 Transport: &http.Transport{ 66 IdleConnTimeout: transportIdleConnTimeoutDuration, 67 }, 68 } 69 70 var config oauth.ClientConfig 71 port := os.Getenv("PORT") 72 if port == "" { 73 port = defaultPort 74 } 75 scopes := []string{"atproto", "transition:generic"} 76 if host == "" { 77 host = fmt.Sprintf("http://127.0.0.1:%s", port) 78 config = oauth.NewLocalhostConfig( 79 fmt.Sprintf("%s/oauth-callback", host), 80 scopes, 81 ) 82 slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL) 83 } else { 84 config = oauth.NewPublicConfig( 85 fmt.Sprintf("%s/oauth-client-metadata.json", host), 86 fmt.Sprintf("%s/oauth-callback", host), 87 scopes, 88 ) 89 } 90 oauthClient := oauth.NewClientApp(&config, db) 91 92 server, err := statusphere.NewServer(host, port, db, oauthClient, httpClient) 93 if err != nil { 94 slog.Error("create new server", "error", err) 95 return 96 } 97 98 signals := make(chan os.Signal, 1) 99 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 100 101 ctx, cancel := context.WithCancel(context.Background()) 102 defer cancel() 103 104 go func() { 105 <-signals 106 cancel() 107 _ = server.Stop(context.Background()) 108 }() 109 110 go consumeLoop(ctx, db) 111 112 server.Run() 113} 114 115func consumeLoop(ctx context.Context, db *database.DB) { 116 jsServerAddr := os.Getenv("JS_SERVER_ADDR") 117 if jsServerAddr == "" { 118 jsServerAddr = defaultServerAddr 119 } 120 121 consumer := statusphere.NewConsumer(jsServerAddr, slog.Default(), db) 122 123 err := retry.Do(func() error { 124 err := consumer.Consume(ctx) 125 if err != nil { 126 if errors.Is(err, context.Canceled) { 127 return nil 128 } 129 slog.Error("consume loop", "error", err) 130 return err 131 } 132 return nil 133 }, retry.UntilSucceeded()) // retry indefinitly until context canceled 134 slog.Error(err.Error()) 135 slog.Warn("exiting consume loop") 136}