+19
Dockerfile
+19
Dockerfile
···
1
+
FROM golang:alpine AS builder
2
+
3
+
WORKDIR /app
4
+
5
+
COPY . .
6
+
RUN go mod download
7
+
8
+
COPY . .
9
+
10
+
RUN CGO_ENABLED=0 go build -o statusphere-go ./cmd/main.go
11
+
12
+
FROM alpine:latest
13
+
14
+
RUN apk --no-cache add ca-certificates
15
+
16
+
WORKDIR /app/
17
+
COPY --from=builder /app/statusphere-go .
18
+
19
+
ENTRYPOINT ["./statusphere-go"]
+14
-4
cmd/main.go
+14
-4
cmd/main.go
···
24
24
defaultServerAddr = "wss://jetstream.atproto.tools/subscribe"
25
25
httpClientTimeoutDuration = time.Second * 5
26
26
transportIdleConnTimeoutDuration = time.Second * 90
27
+
defaultPort = "8080"
27
28
)
28
29
29
30
func main() {
30
-
err := godotenv.Load(".env")
31
+
envLocation := os.Getenv("ENV_LOCATION")
32
+
if envLocation == "" {
33
+
envLocation = ".env"
34
+
}
35
+
36
+
err := godotenv.Load(envLocation)
31
37
if err != nil {
32
38
if !os.IsNotExist(err) {
33
39
log.Fatal("Error loading .env file")
···
62
68
}
63
69
64
70
var config oauth.ClientConfig
65
-
bind := ":8080"
71
+
port := os.Getenv("PORT")
72
+
if port == "" {
73
+
port = defaultPort
74
+
}
66
75
scopes := []string{"atproto", "transition:generic"}
67
76
if host == "" {
77
+
host = fmt.Sprintf("http://127.0.0.1:%s", port)
68
78
config = oauth.NewLocalhostConfig(
69
-
fmt.Sprintf("http://127.0.0.1%s/oauth-callback", bind),
79
+
fmt.Sprintf("%s/oauth-callback", host),
70
80
scopes,
71
81
)
72
82
slog.Info("configuring localhost OAuth client", "CallbackURL", config.CallbackURL)
···
79
89
}
80
90
oauthClient := oauth.NewClientApp(&config, db)
81
91
82
-
server, err := statusphere.NewServer(host, 8080, db, oauthClient, httpClient)
92
+
server, err := statusphere.NewServer(host, port, db, oauthClient, httpClient)
83
93
if err != nil {
84
94
slog.Error("create new server", "error", err)
85
95
return
+16
-1
consumer.go
+16
-1
consumer.go
···
47
47
return fmt.Errorf("failed to create client: %w", err)
48
48
}
49
49
50
-
cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
50
+
cursor, err := c.handler.store.GetCursor(ctx)
51
+
// if cursor can't be fetched, just start from a couple days ago.
52
+
if err != nil || cursor == 0 {
53
+
cursor = time.Now().Add(-time.Hour * 48).UnixMicro()
54
+
}
55
+
56
+
slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
51
57
52
58
if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53
59
return fmt.Errorf("connect and read: %w", err)
···
59
65
60
66
type HandlerStore interface {
61
67
CreateStatus(status Status) error
68
+
SaveCursor(ctx context.Context, cursor int64) error
69
+
GetCursor(ctx context.Context) (int64, error)
62
70
}
63
71
64
72
type handler struct {
···
69
77
if event.Commit == nil {
70
78
return nil
71
79
}
80
+
81
+
defer func() {
82
+
err := h.store.SaveCursor(ctx, event.TimeUS)
83
+
if err != nil {
84
+
slog.Error("failed to save cursor", "error", err)
85
+
}
86
+
}()
72
87
73
88
switch event.Commit.Operation {
74
89
case models.CommitOperationCreate:
+5
database/database.go
+5
database/database.go
+58
database/jetstream_cursor.go
+58
database/jetstream_cursor.go
···
1
+
package database
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
)
9
+
10
+
func createJetstreamTable(db *sql.DB) error {
11
+
createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream (
12
+
"id" integer NOT NULL PRIMARY KEY,
13
+
"cursor" INTEGER,
14
+
UNIQUE(id)
15
+
);`
16
+
17
+
slog.Info("Create jetstream table...")
18
+
statement, err := db.Prepare(createJetstreamTableSQL)
19
+
if err != nil {
20
+
return fmt.Errorf("prepare DB statement to create jetstream table: %w", err)
21
+
}
22
+
_, err = statement.Exec()
23
+
if err != nil {
24
+
return fmt.Errorf("exec sql statement to create jetstream table: %w", err)
25
+
}
26
+
slog.Info("jetstream table created")
27
+
28
+
return nil
29
+
}
30
+
31
+
func (d *DB) SaveCursor(ctx context.Context, cursor int64) error {
32
+
sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;`
33
+
_, err := d.db.Exec(sql, cursor, cursor)
34
+
if err != nil {
35
+
return fmt.Errorf("exec insert or update cursor: %w", err)
36
+
}
37
+
38
+
return nil
39
+
}
40
+
41
+
func (d *DB) GetCursor(ctx context.Context) (int64, error) {
42
+
sql := "SELECT cursor FROM jetstream where id = 1;"
43
+
rows, err := d.db.Query(sql)
44
+
if err != nil {
45
+
return 0, fmt.Errorf("run query to get cursor: %w", err)
46
+
}
47
+
defer rows.Close()
48
+
49
+
cursor := 0
50
+
for rows.Next() {
51
+
if err := rows.Scan(&cursor); err != nil {
52
+
return 0, fmt.Errorf("scan row: %w", err)
53
+
}
54
+
55
+
return int64(cursor), nil
56
+
}
57
+
return 0, fmt.Errorf("not found")
58
+
}
-2
database/oauth_sessions.go
-2
database/oauth_sessions.go
···
48
48
return fmt.Errorf("marshalling scopes: %w", err)
49
49
}
50
50
51
-
slog.Info("session to save", "did", sess.AccountDID.String(), "session id", sess.SessionID)
52
-
53
51
sql := `INSERT INTO oauthsessions (accountDID, sessionID, hostURL, authServerURL, authServerTokenEndpoint, scopes, accessToken, refreshToken, dpopAuthServerNonce, dpopHostNonce, dpopPrivateKeyMultibase) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(accountDID) DO NOTHING;` // TODO: update on conflict
54
52
_, err = d.db.Exec(sql, sess.AccountDID.String(), sess.SessionID, sess.HostURL, sess.AuthServerURL, sess.AuthServerTokenEndpoint, string(scopes), sess.AccessToken, sess.RefreshToken, sess.DPoPAuthServerNonce, sess.DPoPHostNonce, sess.DPoPPrivateKeyMultibase)
55
53
if err != nil {
+12
docker-compose.yaml
+12
docker-compose.yaml
-2
home_handler.go
-2
home_handler.go
-2
readme.md
-2
readme.md
+11
-7
server.go
+11
-7
server.go
···
2
2
3
3
import (
4
4
"context"
5
-
_ "embed"
5
+
"embed"
6
6
"encoding/json"
7
7
"errors"
8
8
"fmt"
···
44
44
httpClient *http.Client
45
45
}
46
46
47
-
func NewServer(host string, port int, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) {
47
+
//go:embed html
48
+
var htmlFolder embed.FS
49
+
50
+
func NewServer(host string, port string, store Store, oauthClient *oauth.ClientApp, httpClient *http.Client) (*Server, error) {
48
51
sessionStore := sessions.NewCookieStore([]byte(os.Getenv("SESSION_KEY")))
49
52
50
-
homeTemplate, err := template.ParseFiles("./html/home.html")
53
+
homeTemplate, err := template.ParseFS(htmlFolder, "html/home.html")
51
54
if err != nil {
52
-
return nil, fmt.Errorf("parsing home template: %w", err)
55
+
return nil, fmt.Errorf("error parsing templates: %w", err)
53
56
}
54
-
loginTemplate, err := template.ParseFiles("./html/login.html")
57
+
58
+
loginTemplate, err := template.ParseFS(htmlFolder, "html/login.html")
55
59
if err != nil {
56
60
return nil, fmt.Errorf("parsing login template: %w", err)
57
61
}
···
83
87
mux.HandleFunc("/oauth-client-metadata.json", srv.serveClientMetadata)
84
88
mux.HandleFunc("/oauth-callback", srv.handleOauthCallback)
85
89
86
-
addr := fmt.Sprintf("0.0.0.0:%d", port)
90
+
addr := fmt.Sprintf("0.0.0.0:%s", port)
87
91
srv.httpserver = &http.Server{
88
92
Addr: addr,
89
93
Handler: mux,
···
140
144
metadata.ClientName = &clientName
141
145
metadata.ClientURI = &s.host
142
146
if s.oauthClient.Config.IsConfidential() {
143
-
jwksURI := fmt.Sprintf("%s/jwks.json", r.Host)
147
+
jwksURI := fmt.Sprintf("%s/jwks.json", s.host)
144
148
metadata.JWKSURI = &jwksURI
145
149
}
146
150