A Go implementation of the ATProto Statusphere example app
at main 2.9 kB view raw
1package statusphere 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type consumer struct { 17 cfg *client.ClientConfig 18 handler handler 19 logger *slog.Logger 20} 21 22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore) *consumer { 23 cfg := client.DefaultClientConfig() 24 if jsAddr != "" { 25 cfg.WebsocketURL = jsAddr 26 } 27 cfg.WantedCollections = []string{ 28 "xyz.statusphere.status", 29 } 30 cfg.WantedDids = []string{} 31 32 return &consumer{ 33 cfg: cfg, 34 logger: logger, 35 handler: handler{ 36 store: store, 37 }, 38 } 39} 40 41func (c *consumer) Consume(ctx context.Context) error { 42 scheduler := sequential.NewScheduler("jetstream_localdev", c.logger, c.handler.HandleEvent) 43 defer scheduler.Shutdown() 44 45 client, err := client.NewClient(c.cfg, c.logger, scheduler) 46 if err != nil { 47 return fmt.Errorf("failed to create client: %w", err) 48 } 49 50 cursor, err := c.handler.store.GetCursor(ctx) 51 // if cursor can't be fetched, just start from a couple days ago. 
52 if err != nil || cursor == 0 { 53 cursor = time.Now().Add(-time.Hour * 48).UnixMicro() 54 } 55 56 slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor) 57 58 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 59 return fmt.Errorf("connect and read: %w", err) 60 } 61 62 slog.Info("stopping consume") 63 return nil 64} 65 66type HandlerStore interface { 67 CreateStatus(status Status) error 68 SaveCursor(ctx context.Context, cursor int64) error 69 GetCursor(ctx context.Context) (int64, error) 70} 71 72type handler struct { 73 store HandlerStore 74} 75 76func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 77 if event.Commit == nil { 78 return nil 79 } 80 81 defer func() { 82 err := h.store.SaveCursor(ctx, event.TimeUS) 83 if err != nil { 84 slog.Error("failed to save cursor", "error", err) 85 } 86 }() 87 88 switch event.Commit.Operation { 89 case models.CommitOperationCreate: 90 return h.handleCreateEvent(ctx, event) 91 default: 92 return nil 93 } 94} 95 96type StatusRecord struct { 97 Status string `json:"status"` 98 CreatedAt time.Time `json:"createdAt"` 99} 100 101func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error { 102 var statusRecord StatusRecord 103 if err := json.Unmarshal(event.Commit.Record, &statusRecord); err != nil { 104 slog.Error("unmarshal record", "error", err) 105 return nil 106 } 107 108 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey) 109 110 status := Status{ 111 URI: uri, 112 Did: event.Did, 113 Status: statusRecord.Status, 114 CreatedAt: statusRecord.CreatedAt.UnixMilli(), 115 IndexedAt: time.Now().UnixMilli(), 116 } 117 err := h.store.CreateStatus(status) 118 if err != nil { 119 slog.Error("failed to store status", "error", err) 120 } 121 122 return nil 123}