An implementation of the ATProto statusphere example app but in Go

Compare changes

Choose any two refs to compare.

+4
.dockerignore
···
··· 1 + statuspherego 2 + database.db 3 + .env 4 + example.env
+19
Dockerfile
···
··· 1 + FROM golang:alpine AS builder 2 + 3 + WORKDIR /app 4 + 5 + COPY . . 6 + RUN go mod download 7 + 8 + COPY . . 9 + 10 + RUN CGO_ENABLED=0 go build -o statusphere-go ./cmd/main.go 11 + 12 + FROM alpine:latest 13 + 14 + RUN apk --no-cache add ca-certificates 15 + 16 + WORKDIR /app/ 17 + COPY --from=builder /app/statusphere-go . 18 + 19 + ENTRYPOINT ["./statusphere-go"]
+14 -4
cmd/main.go
··· 24 defaultServerAddr = "wss://jetstream.atproto.tools/subscribe" 25 httpClientTimeoutDuration = time.Second * 5 26 transportIdleConnTimeoutDuration = time.Second * 90 27 ) 28 29 func main() { 30 - err := godotenv.Load(".env") 31 if err != nil { 32 if !os.IsNotExist(err) { 33 log.Fatal("Error loading .env file") ··· 62 } 63 64 var config oauth.ClientConfig 65 - bind := ":8080" 66 scopes := []string{"atproto", "transition:generic"} 67 if host == "" { 68 config = oauth.NewLocalhostConfig( 69 - fmt.Sprintf("http://127.0.0.1%s/oauth-callback", bind), 70 scopes, 71 ) 72 slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL) ··· 79 } 80 oauthClient := oauth.NewClientApp(&config, db) 81 82 - server, err := statusphere.NewServer(host, 8080, db, oauthClient, httpClient) 83 if err != nil { 84 slog.Error("create new server", "error", err) 85 return
··· 24 defaultServerAddr = "wss://jetstream.atproto.tools/subscribe" 25 httpClientTimeoutDuration = time.Second * 5 26 transportIdleConnTimeoutDuration = time.Second * 90 27 + defaultPort = "8080" 28 ) 29 30 func main() { 31 + envLocation := os.Getenv("ENV_LOCATION") 32 + if envLocation == "" { 33 + envLocation = ".env" 34 + } 35 + 36 + err := godotenv.Load(envLocation) 37 if err != nil { 38 if !os.IsNotExist(err) { 39 log.Fatal("Error loading .env file") ··· 68 } 69 70 var config oauth.ClientConfig 71 + port := os.Getenv("PORT") 72 + if port == "" { 73 + port = defaultPort 74 + } 75 scopes := []string{"atproto", "transition:generic"} 76 if host == "" { 77 + host = fmt.Sprintf("http://127.0.0.1:%s", port) 78 config = oauth.NewLocalhostConfig( 79 + fmt.Sprintf("%s/oauth-callback", host), 80 scopes, 81 ) 82 slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL) ··· 89 } 90 oauthClient := oauth.NewClientApp(&config, db) 91 92 + server, err := statusphere.NewServer(host, port, db, oauthClient, httpClient) 93 if err != nil { 94 slog.Error("create new server", "error", err) 95 return
+16 -1
consumer.go
··· 47 return fmt.Errorf("failed to create client: %w", err) 48 } 49 50 - cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 51 52 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 53 return fmt.Errorf("connect and read: %w", err) ··· 59 60 type HandlerStore interface { 61 CreateStatus(status Status) error 62 } 63 64 type handler struct { ··· 69 if event.Commit == nil { 70 return nil 71 } 72 73 switch event.Commit.Operation { 74 case models.CommitOperationCreate:
··· 47 return fmt.Errorf("failed to create client: %w", err) 48 } 49 50 + cursor, err := c.handler.store.GetCursor(ctx) 51 + // if cursor can't be fetched, just start from a couple days ago. 52 + if err != nil || cursor == 0 { 53 + cursor = time.Now().Add(-time.Hour * 48).UnixMicro() 54 + } 55 + 56 + slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor) 57 58 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 59 return fmt.Errorf("connect and read: %w", err) ··· 65 66 type HandlerStore interface { 67 CreateStatus(status Status) error 68 + SaveCursor(ctx context.Context, cursor int64) error 69 + GetCursor(ctx context.Context) (int64, error) 70 } 71 72 type handler struct { ··· 77 if event.Commit == nil { 78 return nil 79 } 80 + 81 + defer func() { 82 + err := h.store.SaveCursor(ctx, event.TimeUS) 83 + if err != nil { 84 + slog.Error("failed to save cursor", "error", err) 85 + } 86 + }() 87 88 switch event.Commit.Operation { 89 case models.CommitOperationCreate:
+5
database/database.go
··· 52 return nil, fmt.Errorf("creating profile table: %w", err) 53 } 54 55 return &DB{db: db}, nil 56 } 57
··· 52 return nil, fmt.Errorf("creating profile table: %w", err) 53 } 54 55 + err = createJetstreamTable(db) 56 + if err != nil { 57 + return nil, fmt.Errorf("creating jetstream table: %w", err) 58 + } 59 + 60 return &DB{db: db}, nil 61 } 62
+58
database/jetstream_cursor.go
···
··· 1 + package database 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + ) 9 + 10 + func createJetstreamTable(db *sql.DB) error { 11 + createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream ( 12 + "id" integer NOT NULL PRIMARY KEY, 13 + "cursor" INTEGER, 14 + UNIQUE(id) 15 + );` 16 + 17 + slog.Info("Create jetstream table...") 18 + statement, err := db.Prepare(createJetstreamTableSQL) 19 + if err != nil { 20 + return fmt.Errorf("prepare DB statement to create jetstream table: %w", err) 21 + } 22 + _, err = statement.Exec() 23 + if err != nil { 24 + return fmt.Errorf("exec sql statement to create jetstream table: %w", err) 25 + } 26 + slog.Info("jetstream table created") 27 + 28 + return nil 29 + } 30 + 31 + func (d *DB) SaveCursor(ctx context.Context, cursor int64) error { 32 + sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;` 33 + _, err := d.db.Exec(sql, cursor, cursor) 34 + if err != nil { 35 + return fmt.Errorf("exec insert or update cursor: %w", err) 36 + } 37 + 38 + return nil 39 + } 40 + 41 + func (d *DB) GetCursor(ctx context.Context) (int64, error) { 42 + sql := "SELECT cursor FROM jetstream where id = 1;" 43 + rows, err := d.db.Query(sql) 44 + if err != nil { 45 + return 0, fmt.Errorf("run query to get cursor: %w", err) 46 + } 47 + defer rows.Close() 48 + 49 + cursor := 0 50 + for rows.Next() { 51 + if err := rows.Scan(&cursor); err != nil { 52 + return 0, fmt.Errorf("scan row: %w", err) 53 + } 54 + 55 + return int64(cursor), nil 56 + } 57 + return 0, fmt.Errorf("not found") 58 + }
-2
database/oauth_sessions.go
··· 48 return fmt.Errorf("marshalling scopes: %w", err) 49 } 50 51 - slog.Info("session to save", "did", sess.AccountDID.String(), "session id", sess.SessionID) 52 - 53 sql := `INSERT INTO oauthsessions (accountDID, sessionID, hostURL, authServerURL, authServerTokenEndpoint, scopes, accessToken, refreshToken, dpopAuthServerNonce, dpopHostNonce, dpopPrivateKeyMultibase) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(accountDID) DO NOTHING;` // TODO: update on conflict 54 _, err = d.db.Exec(sql, sess.AccountDID.String(), sess.SessionID, sess.HostURL, sess.AuthServerURL, sess.AuthServerTokenEndpoint, string(scopes), sess.AccessToken, sess.RefreshToken, sess.DPoPAuthServerNonce, sess.DPoPHostNonce, sess.DPoPPrivateKeyMultibase) 55 if err != nil {
··· 48 return fmt.Errorf("marshalling scopes: %w", err) 49 } 50 51 sql := `INSERT INTO oauthsessions (accountDID, sessionID, hostURL, authServerURL, authServerTokenEndpoint, scopes, accessToken, refreshToken, dpopAuthServerNonce, dpopHostNonce, dpopPrivateKeyMultibase) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(accountDID) DO NOTHING;` // TODO: update on conflict 52 _, err = d.db.Exec(sql, sess.AccountDID.String(), sess.SessionID, sess.HostURL, sess.AuthServerURL, sess.AuthServerTokenEndpoint, string(scopes), sess.AccessToken, sess.RefreshToken, sess.DPoPAuthServerNonce, sess.DPoPHostNonce, sess.DPoPPrivateKeyMultibase) 53 if err != nil {
+12
docker-compose.yaml
···
··· 1 + services: 2 + statusphere-go: 3 + platform: linux/amd64 4 + container_name: statusphere-go 5 + image: willdot/statusphere-go:latest 6 + environment: 7 + ENV_LOCATION: "/app/data/statusphere-go.env" 8 + volumes: 9 + - ./data:/app/data 10 + ports: 11 + - "3005:3005" 12 + restart: always
-2
home_handler.go
··· 114 return 115 } 116 117 - slog.Info("session", "did", did.String(), "session id", sessionID) 118 - 119 oauthSess, err := s.oauthClient.ResumeSession(r.Context(), *did, sessionID) 120 if err != nil { 121 http.Error(w, "not authenticated", http.StatusUnauthorized)
··· 114 return 115 } 116 117 oauthSess, err := s.oauthClient.ResumeSession(r.Context(), *did, sessionID) 118 if err != nil { 119 http.Error(w, "not authenticated", http.StatusUnauthorized)
-2
readme.md
··· 1 ## Statusphere Go 2 3 - 4 - 5 This is an implementation of the example [ATProto application Statusphere](https://atproto.com/guides/applications) but in Go. 6 7 ### What is the Statusphere app?
··· 1 ## Statusphere Go 2 3 This is an implementation of the example [ATProto application Statusphere](https://atproto.com/guides/applications) but in Go. 4 5 ### What is the Statusphere app?
+11 -7
server.go
··· 2 3 import ( 4 "context" 5 - _ "embed" 6 "encoding/json" 7 "errors" 8 "fmt" ··· 44 httpClient *http.Client 45 } 46 47 - func NewServer(host string, port int, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) { 48 sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY"))) 49 50 - homeTemplate, err := template.ParseFiles("./html/home.html") 51 if err != nil { 52 - return nil, fmt.Errorf("parsing home template: %w", err) 53 } 54 - loginTemplate, err := template.ParseFiles("./html/login.html") 55 if err != nil { 56 return nil, fmt.Errorf("parsing login template: %w", err) 57 } ··· 83 mux.HandleFunc("/oauth-client-metadata.json", srv.serveClientMetadata) 84 mux.HandleFunc("/oauth-callback", srv.handleOauthCallback) 85 86 - addr := fmt.Sprintf("0.0.0.0:%d", port) 87 srv.httpserver = &http.Server{ 88 Addr: addr, 89 Handler: mux, ··· 140 metadata.ClientName = &clientName 141 metadata.ClientURI = &s.host 142 if s.oauthClient.Config.IsConfidential() { 143 - jwksURI := fmt.Sprintf("%s/jwks.json", r.Host) 144 metadata.JWKSURI = &jwksURI 145 } 146
··· 2 3 import ( 4 "context" 5 + "embed" 6 "encoding/json" 7 "errors" 8 "fmt" ··· 44 httpClient *http.Client 45 } 46 47 + //go:embed html 48 + var htmlFolder embed.FS 49 + 50 + func NewServer(host string, port string, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) { 51 sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY"))) 52 53 + homeTemplate, err := template.ParseFS(htmlFolder, "html/home.html") 54 if err != nil { 55 + return nil, fmt.Errorf("error parsing templates: %w", err) 56 } 57 + 58 + loginTemplate, err := template.ParseFS(htmlFolder, "html/login.html") 59 if err != nil { 60 return nil, fmt.Errorf("parsing login template: %w", err) 61 } ··· 87 mux.HandleFunc("/oauth-client-metadata.json", srv.serveClientMetadata) 88 mux.HandleFunc("/oauth-callback", srv.handleOauthCallback) 89 90 + addr := fmt.Sprintf("0.0.0.0:%s", port) 91 srv.httpserver = &http.Server{ 92 Addr: addr, 93 Handler: mux, ··· 144 metadata.ClientName = &clientName 145 metadata.ClientURI = &s.host 146 if s.oauthClient.Config.IsConfidential() { 147 + jwksURI := fmt.Sprintf("%s/jwks.json", s.host) 148 metadata.JWKSURI = &jwksURI 149 } 150