A Go implementation of the ATProto Statusphere example app.
1package statusphere
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
16type consumer struct {
17 cfg *client.ClientConfig
18 handler handler
19 logger *slog.Logger
20}
21
22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore) *consumer {
23 cfg := client.DefaultClientConfig()
24 if jsAddr != "" {
25 cfg.WebsocketURL = jsAddr
26 }
27 cfg.WantedCollections = []string{
28 "xyz.statusphere.status",
29 }
30 cfg.WantedDids = []string{}
31
32 return &consumer{
33 cfg: cfg,
34 logger: logger,
35 handler: handler{
36 store: store,
37 },
38 }
39}
40
41func (c *consumer) Consume(ctx context.Context) error {
42 scheduler := sequential.NewScheduler("jetstream_localdev", c.logger, c.handler.HandleEvent)
43 defer scheduler.Shutdown()
44
45 client, err := client.NewClient(c.cfg, c.logger, scheduler)
46 if err != nil {
47 return fmt.Errorf("failed to create client: %w", err)
48 }
49
50 cursor, err := c.handler.store.GetCursor(ctx)
51 // if cursor can't be fetched, just start from a couple days ago.
52 if err != nil || cursor == 0 {
53 cursor = time.Now().Add(-time.Hour * 48).UnixMicro()
54 }
55
56 slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
57
58 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
59 return fmt.Errorf("connect and read: %w", err)
60 }
61
62 slog.Info("stopping consume")
63 return nil
64}
65
66type HandlerStore interface {
67 CreateStatus(status Status) error
68 SaveCursor(ctx context.Context, cursor int64) error
69 GetCursor(ctx context.Context) (int64, error)
70}
71
72type handler struct {
73 store HandlerStore
74}
75
76func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
77 if event.Commit == nil {
78 return nil
79 }
80
81 defer func() {
82 err := h.store.SaveCursor(ctx, event.TimeUS)
83 if err != nil {
84 slog.Error("failed to save cursor", "error", err)
85 }
86 }()
87
88 switch event.Commit.Operation {
89 case models.CommitOperationCreate:
90 return h.handleCreateEvent(ctx, event)
91 default:
92 return nil
93 }
94}
95
// StatusRecord is the JSON shape decoded from a commit's record payload.
type StatusRecord struct {
	// Status is read from the "status" JSON key.
	Status string `json:"status"`
	// CreatedAt is read from the "createdAt" JSON key.
	CreatedAt time.Time `json:"createdAt"`
}
100
101func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error {
102 var statusRecord StatusRecord
103 if err := json.Unmarshal(event.Commit.Record, &statusRecord); err != nil {
104 slog.Error("unmarshal record", "error", err)
105 return nil
106 }
107
108 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey)
109
110 status := Status{
111 URI: uri,
112 Did: event.Did,
113 Status: statusRecord.Status,
114 CreatedAt: statusRecord.CreatedAt.UnixMilli(),
115 IndexedAt: time.Now().UnixMilli(),
116 }
117 err := h.store.CreateStatus(status)
118 if err != nil {
119 slog.Error("failed to store status", "error", err)
120 }
121
122 return nil
123}