An implementation of the ATProto statusphere example app but in Go
1package statusphere
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
16type consumer struct {
17 cfg *client.ClientConfig
18 handler handler
19 logger *slog.Logger
20}
21
22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore) *consumer {
23 cfg := client.DefaultClientConfig()
24 if jsAddr != "" {
25 cfg.WebsocketURL = jsAddr
26 }
27 cfg.WantedCollections = []string{
28 "xyz.statusphere.status",
29 }
30 cfg.WantedDids = []string{}
31
32 return &consumer{
33 cfg: cfg,
34 logger: logger,
35 handler: handler{
36 store: store,
37 },
38 }
39}
40
41func (c *consumer) Consume(ctx context.Context) error {
42 scheduler := sequential.NewScheduler("jetstream_localdev", c.logger, c.handler.HandleEvent)
43 defer scheduler.Shutdown()
44
45 client, err := client.NewClient(c.cfg, c.logger, scheduler)
46 if err != nil {
47 return fmt.Errorf("failed to create client: %w", err)
48 }
49
50 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
51
52 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53 return fmt.Errorf("connect and read: %w", err)
54 }
55
56 slog.Info("stopping consume")
57 return nil
58}
59
60type HandlerStore interface {
61 CreateStatus(status Status) error
62}
63
64type handler struct {
65 store HandlerStore
66}
67
68func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
69 if event.Commit == nil {
70 return nil
71 }
72
73 switch event.Commit.Operation {
74 case models.CommitOperationCreate:
75 return h.handleCreateEvent(ctx, event)
76 default:
77 return nil
78 }
79}
80
81type StatusRecord struct {
82 Status string `json:"status"`
83 CreatedAt time.Time `json:"createdAt"`
84}
85
86func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error {
87 var statusRecord StatusRecord
88 if err := json.Unmarshal(event.Commit.Record, &statusRecord); err != nil {
89 slog.Error("unmarshal record", "error", err)
90 return nil
91 }
92
93 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey)
94
95 status := Status{
96 URI: uri,
97 Did: event.Did,
98 Status: statusRecord.Status,
99 CreatedAt: statusRecord.CreatedAt.UnixMilli(),
100 IndexedAt: time.Now().UnixMilli(),
101 }
102 err := h.store.CreateStatus(status)
103 if err != nil {
104 slog.Error("failed to store status", "error", err)
105 }
106
107 return nil
108}