An implementation of the ATProto statusphere example app but in Go
at test 2.4 kB view raw
1package statusphere 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type consumer struct { 17 cfg *client.ClientConfig 18 handler handler 19 logger *slog.Logger 20} 21 22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore) *consumer { 23 cfg := client.DefaultClientConfig() 24 if jsAddr != "" { 25 cfg.WebsocketURL = jsAddr 26 } 27 cfg.WantedCollections = []string{ 28 "xyz.statusphere.status", 29 } 30 cfg.WantedDids = []string{} 31 32 return &consumer{ 33 cfg: cfg, 34 logger: logger, 35 handler: handler{ 36 store: store, 37 }, 38 } 39} 40 41func (c *consumer) Consume(ctx context.Context) error { 42 scheduler := sequential.NewScheduler("jetstream_localdev", c.logger, c.handler.HandleEvent) 43 defer scheduler.Shutdown() 44 45 client, err := client.NewClient(c.cfg, c.logger, scheduler) 46 if err != nil { 47 return fmt.Errorf("failed to create client: %w", err) 48 } 49 50 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 51 52 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 53 return fmt.Errorf("connect and read: %w", err) 54 } 55 56 slog.Info("stopping consume") 57 return nil 58} 59 60type HandlerStore interface { 61 CreateStatus(status Status) error 62} 63 64type handler struct { 65 store HandlerStore 66} 67 68func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 69 if event.Commit == nil { 70 return nil 71 } 72 73 switch event.Commit.Operation { 74 case models.CommitOperationCreate: 75 return h.handleCreateEvent(ctx, event) 76 default: 77 return nil 78 } 79} 80 81type StatusRecord struct { 82 Status string `json:"status"` 83 CreatedAt time.Time `json:"createdAt"` 84} 85 86func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error { 87 var statusRecord StatusRecord 88 if err := json.Unmarshal(event.Commit.Record, &statusRecord); err != nil { 89 slog.Error("unmarshal record", "error", err) 90 return nil 91 } 92 93 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey) 94 95 status := Status{ 96 URI: uri, 97 Did: event.Did, 98 Status: statusRecord.Status, 99 CreatedAt: statusRecord.CreatedAt.UnixMilli(), 100 IndexedAt: time.Now().UnixMilli(), 101 } 102 err := h.store.CreateStatus(status) 103 if err != nil { 104 slog.Error("failed to store status", "error", err) 105 } 106 107 return nil 108}