+19
Dockerfile
+19
Dockerfile
···
···
1
+
FROM golang:alpine AS builder
2
+
3
+
WORKDIR /app
4
+
5
+
COPY . .
6
+
RUN go mod download
7
+
8
+
COPY . .
9
+
10
+
RUN CGO_ENABLED=0 go build -o statusphere-go ./cmd/main.go
11
+
12
+
FROM alpine:latest
13
+
14
+
RUN apk --no-cache add ca-certificates
15
+
16
+
WORKDIR /app/
17
+
COPY --from=builder /app/statusphere-go .
18
+
19
+
ENTRYPOINT ["./statusphere-go"]
+14
-4
cmd/main.go
+14
-4
cmd/main.go
···
24
defaultServerAddr = "wss://jetstream.atproto.tools/subscribe"
25
httpClientTimeoutDuration = time.Second * 5
26
transportIdleConnTimeoutDuration = time.Second * 90
27
)
28
29
func main() {
30
-
err := godotenv.Load(".env")
31
if err != nil {
32
if !os.IsNotExist(err) {
33
log.Fatal("Error loading .env file")
···
62
}
63
64
var config oauth.ClientConfig
65
-
bind := ":8080"
66
scopes := []string{"atproto", "transition:generic"}
67
if host == "" {
68
config = oauth.NewLocalhostConfig(
69
-
fmt.Sprintf("http://127.0.0.1%s/oauth-callback", bind),
70
scopes,
71
)
72
slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL)
···
79
}
80
oauthClient := oauth.NewClientApp(&config, db)
81
82
-
server, err := statusphere.NewServer(host, 8080, db, oauthClient, httpClient)
83
if err != nil {
84
slog.Error("create new server", "error", err)
85
return
···
24
defaultServerAddr = "wss://jetstream.atproto.tools/subscribe"
25
httpClientTimeoutDuration = time.Second * 5
26
transportIdleConnTimeoutDuration = time.Second * 90
27
+
defaultPort = "8080"
28
)
29
30
func main() {
31
+
envLocation := os.Getenv("ENV_LOCATION")
32
+
if envLocation == "" {
33
+
envLocation = ".env"
34
+
}
35
+
36
+
err := godotenv.Load(envLocation)
37
if err != nil {
38
if !os.IsNotExist(err) {
39
log.Fatal("Error loading .env file")
···
68
}
69
70
var config oauth.ClientConfig
71
+
port := os.Getenv("PORT")
72
+
if port == "" {
73
+
port = defaultPort
74
+
}
75
scopes := []string{"atproto", "transition:generic"}
76
if host == "" {
77
+
host = fmt.Sprintf("http://127.0.0.1:%s", port)
78
config = oauth.NewLocalhostConfig(
79
+
fmt.Sprintf("%s/oauth-callback", host),
80
scopes,
81
)
82
slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL)
···
89
}
90
oauthClient := oauth.NewClientApp(&config, db)
91
92
+
server, err := statusphere.NewServer(host, port, db, oauthClient, httpClient)
93
if err != nil {
94
slog.Error("create new server", "error", err)
95
return
+16
-1
consumer.go
+16
-1
consumer.go
···
47
return fmt.Errorf("failed to create client: %w", err)
48
}
49
50
-
cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
51
52
if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53
return fmt.Errorf("connect and read: %w", err)
···
59
60
type HandlerStore interface {
61
CreateStatus(status Status) error
62
}
63
64
type handler struct {
···
69
if event.Commit == nil {
70
return nil
71
}
72
73
switch event.Commit.Operation {
74
case models.CommitOperationCreate:
···
47
return fmt.Errorf("failed to create client: %w", err)
48
}
49
50
+
cursor, err := c.handler.store.GetCursor(ctx)
51
+
// if cursor can't be fetched, just start from a couple days ago.
52
+
if err != nil || cursor == 0 {
53
+
cursor = time.Now().Add(-time.Hour * 48).UnixMicro()
54
+
}
55
+
56
+
slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
57
58
if err := client.ConnectAndRead(ctx, &cursor); err != nil {
59
return fmt.Errorf("connect and read: %w", err)
···
65
66
type HandlerStore interface {
67
CreateStatus(status Status) error
68
+
SaveCursor(ctx context.Context, cursor int64) error
69
+
GetCursor(ctx context.Context) (int64, error)
70
}
71
72
type handler struct {
···
77
if event.Commit == nil {
78
return nil
79
}
80
+
81
+
defer func() {
82
+
err := h.store.SaveCursor(ctx, event.TimeUS)
83
+
if err != nil {
84
+
slog.Error("failed to save cursor", "error", err)
85
+
}
86
+
}()
87
88
switch event.Commit.Operation {
89
case models.CommitOperationCreate:
+5
database/database.go
+5
database/database.go
+58
database/jetstream_cursor.go
+58
database/jetstream_cursor.go
···
···
1
+
package database
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
)
9
+
10
+
func createJetstreamTable(db *sql.DB) error {
11
+
createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream (
12
+
"id" integer NOT NULL PRIMARY KEY,
13
+
"cursor" INTEGER,
14
+
UNIQUE(id)
15
+
);`
16
+
17
+
slog.Info("Create jetstream table...")
18
+
statement, err := db.Prepare(createJetstreamTableSQL)
19
+
if err != nil {
20
+
return fmt.Errorf("prepare DB statement to create jetstream table: %w", err)
21
+
}
22
+
_, err = statement.Exec()
23
+
if err != nil {
24
+
return fmt.Errorf("exec sql statement to create jetstream table: %w", err)
25
+
}
26
+
slog.Info("jetstream table created")
27
+
28
+
return nil
29
+
}
30
+
31
+
func (d *DB) SaveCursor(ctx context.Context, cursor int64) error {
32
+
sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;`
33
+
_, err := d.db.Exec(sql, cursor, cursor)
34
+
if err != nil {
35
+
return fmt.Errorf("exec insert or update cursor: %w", err)
36
+
}
37
+
38
+
return nil
39
+
}
40
+
41
+
func (d *DB) GetCursor(ctx context.Context) (int64, error) {
42
+
sql := "SELECT cursor FROM jetstream where id = 1;"
43
+
rows, err := d.db.Query(sql)
44
+
if err != nil {
45
+
return 0, fmt.Errorf("run query to get cursor: %w", err)
46
+
}
47
+
defer rows.Close()
48
+
49
+
cursor := 0
50
+
for rows.Next() {
51
+
if err := rows.Scan(&cursor); err != nil {
52
+
return 0, fmt.Errorf("scan row: %w", err)
53
+
}
54
+
55
+
return int64(cursor), nil
56
+
}
57
+
return 0, fmt.Errorf("not found")
58
+
}
-2
database/oauth_sessions.go
-2
database/oauth_sessions.go
···
48
return fmt.Errorf("marshalling scopes: %w", err)
49
}
50
51
-
slog.Info("session to save", "did", sess.AccountDID.String(), "session id", sess.SessionID)
52
-
53
sql := `INSERT INTO oauthsessions (accountDID, sessionID, hostURL, authServerURL, authServerTokenEndpoint, scopes, accessToken, refreshToken, dpopAuthServerNonce, dpopHostNonce, dpopPrivateKeyMultibase) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(accountDID) DO NOTHING;` // TODO: update on conflict
54
_, err = d.db.Exec(sql, sess.AccountDID.String(), sess.SessionID, sess.HostURL, sess.AuthServerURL, sess.AuthServerTokenEndpoint, string(scopes), sess.AccessToken, sess.RefreshToken, sess.DPoPAuthServerNonce, sess.DPoPHostNonce, sess.DPoPPrivateKeyMultibase)
55
if err != nil {
···
48
return fmt.Errorf("marshalling scopes: %w", err)
49
}
50
51
sql := `INSERT INTO oauthsessions (accountDID, sessionID, hostURL, authServerURL, authServerTokenEndpoint, scopes, accessToken, refreshToken, dpopAuthServerNonce, dpopHostNonce, dpopPrivateKeyMultibase) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(accountDID) DO NOTHING;` // TODO: update on conflict
52
_, err = d.db.Exec(sql, sess.AccountDID.String(), sess.SessionID, sess.HostURL, sess.AuthServerURL, sess.AuthServerTokenEndpoint, string(scopes), sess.AccessToken, sess.RefreshToken, sess.DPoPAuthServerNonce, sess.DPoPHostNonce, sess.DPoPPrivateKeyMultibase)
53
if err != nil {
+12
docker-compose.yaml
+12
docker-compose.yaml
-2
home_handler.go
-2
home_handler.go
-2
readme.md
-2
readme.md
+11
-7
server.go
+11
-7
server.go
···
2
3
import (
4
"context"
5
-
_ "embed"
6
"encoding/json"
7
"errors"
8
"fmt"
···
44
httpClient *http.Client
45
}
46
47
-
func NewServer(host string, port int, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) {
48
sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY")))
49
50
-
homeTemplate, err := template.ParseFiles("./html/home.html")
51
if err != nil {
52
-
return nil, fmt.Errorf("parsing home template: %w", err)
53
}
54
-
loginTemplate, err := template.ParseFiles("./html/login.html")
55
if err != nil {
56
return nil, fmt.Errorf("parsing login template: %w", err)
57
}
···
83
mux.HandleFunc("/oauth-client-metadata.json", srv.serveClientMetadata)
84
mux.HandleFunc("/oauth-callback", srv.handleOauthCallback)
85
86
-
addr := fmt.Sprintf("0.0.0.0:%d", port)
87
srv.httpserver = &http.Server{
88
Addr: addr,
89
Handler: mux,
···
140
metadata.ClientName = &clientName
141
metadata.ClientURI = &s.host
142
if s.oauthClient.Config.IsConfidential() {
143
-
jwksURI := fmt.Sprintf("%s/jwks.json", r.Host)
144
metadata.JWKSURI = &jwksURI
145
}
146
···
2
3
import (
4
"context"
5
+
"embed"
6
"encoding/json"
7
"errors"
8
"fmt"
···
44
httpClient *http.Client
45
}
46
47
+
//go:embed html
48
+
var htmlFolder embed.FS
49
+
50
+
func NewServer(host string, port string, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) {
51
sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY")))
52
53
+
homeTemplate, err := template.ParseFS(htmlFolder, "html/home.html")
54
if err != nil {
55
+
return nil, fmt.Errorf("error parsing templates: %w", err)
56
}
57
+
58
+
loginTemplate, err := template.ParseFS(htmlFolder, "html/login.html")
59
if err != nil {
60
return nil, fmt.Errorf("parsing login template: %w", err)
61
}
···
87
mux.HandleFunc("/oauth-client-metadata.json", srv.serveClientMetadata)
88
mux.HandleFunc("/oauth-callback", srv.handleOauthCallback)
89
90
+
addr := fmt.Sprintf("0.0.0.0:%s", port)
91
srv.httpserver = &http.Server{
92
Addr: addr,
93
Handler: mux,
···
144
metadata.ClientName = &clientName
145
metadata.ClientURI = &s.host
146
if s.oauthClient.Config.IsConfidential() {
147
+
jwksURI := fmt.Sprintf("%s/jwks.json", s.host)
148
metadata.JWKSURI = &jwksURI
149
}
150