An implementation of the ATProto statusphere example app but in Go
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "log/slog"
9 "net/http"
10 "os"
11 "os/signal"
12 "path"
13 "syscall"
14 "time"
15
16 "github.com/avast/retry-go/v4"
17 "github.com/bluesky-social/indigo/atproto/auth/oauth"
18 "github.com/joho/godotenv"
19 "github.com/willdot/statusphere-go"
20 "github.com/willdot/statusphere-go/database"
21)
22
23const (
24 defaultServerAddr = "wss://jetstream.atproto.tools/subscribe"
25 httpClientTimeoutDuration = time.Second * 5
26 transportIdleConnTimeoutDuration = time.Second * 90
27 defaultPort = "8080"
28)
29
30func main() {
31 envLocation := os.Getenv("ENV_LOCATION")
32 if envLocation == "" {
33 envLocation = ".env"
34 }
35
36 err := godotenv.Load(envLocation)
37 if err != nil {
38 if !os.IsNotExist(err) {
39 log.Fatal("Error loading .env file")
40 }
41 }
42
43 host := os.Getenv("HOST")
44 if host == "" {
45 slog.Error("missing HOST env variable")
46 return
47 }
48
49 dbMountPath := os.Getenv("DATABASE_MOUNT_PATH")
50 if dbMountPath == "" {
51 slog.Error("DATABASE_MOUNT_PATH env not set")
52 return
53 }
54
55 dbFilename := path.Join(dbMountPath, "database.db")
56 db, err := database.New(dbFilename)
57 if err != nil {
58 slog.Error("create new database", "error", err)
59 return
60 }
61 defer db.Close()
62
63 httpClient := &http.Client{
64 Timeout: httpClientTimeoutDuration,
65 Transport: &http.Transport{
66 IdleConnTimeout: transportIdleConnTimeoutDuration,
67 },
68 }
69
70 var config oauth.ClientConfig
71 port := os.Getenv("PORT")
72 if port == "" {
73 port = defaultPort
74 }
75 scopes := []string{"atproto", "transition:generic"}
76 if host == "" {
77 host = fmt.Sprintf("http://127.0.0.1:%s", port)
78 config = oauth.NewLocalhostConfig(
79 fmt.Sprintf("%s/oauth-callback", host),
80 scopes,
81 )
82 slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL)
83 } else {
84 config = oauth.NewPublicConfig(
85 fmt.Sprintf("%s/oauth-client-metadata.json", host),
86 fmt.Sprintf("%s/oauth-callback", host),
87 scopes,
88 )
89 }
90 oauthClient := oauth.NewClientApp(&config, db)
91
92 server, err := statusphere.NewServer(host, port, db, oauthClient, httpClient)
93 if err != nil {
94 slog.Error("create new server", "error", err)
95 return
96 }
97
98 signals := make(chan os.Signal, 1)
99 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
100
101 ctx, cancel := context.WithCancel(context.Background())
102 defer cancel()
103
104 go func() {
105 <-signals
106 cancel()
107 _ = server.Stop(context.Background())
108 }()
109
110 go consumeLoop(ctx, db)
111
112 server.Run()
113}
114
115func consumeLoop(ctx context.Context, db *database.DB) {
116 jsServerAddr := os.Getenv("JS_SERVER_ADDR")
117 if jsServerAddr == "" {
118 jsServerAddr = defaultServerAddr
119 }
120
121 consumer := statusphere.NewConsumer(jsServerAddr, slog.Default(), db)
122
123 err := retry.Do(func() error {
124 err := consumer.Consume(ctx)
125 if err != nil {
126 if errors.Is(err, context.Canceled) {
127 return nil
128 }
129 slog.Error("consume loop", "error", err)
130 return err
131 }
132 return nil
133 }, retry.UntilSucceeded()) // retry indefinitly until context canceled
134 slog.Error(err.Error())
135 slog.Warn("exiting consume loop")
136}