A URL shortener service that uses ATProto to allow self hosting and ensuring the user owns their data
1package atshorter
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
16type consumer struct {
17 cfg *client.ClientConfig
18 handler handler
19 logger *slog.Logger
20}
21
22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore, did string) *consumer {
23 cfg := client.DefaultClientConfig()
24 if jsAddr != "" {
25 cfg.WebsocketURL = jsAddr
26 }
27 cfg.WantedCollections = []string{
28 "com.atshorter.shorturl",
29 }
30 cfg.WantedDids = []string{did}
31
32 return &consumer{
33 cfg: cfg,
34 logger: logger,
35 handler: handler{
36 store: store,
37 did: did,
38 },
39 }
40}
41
42func (c *consumer) Consume(ctx context.Context) error {
43 scheduler := sequential.NewScheduler("jetstream_at_shorter_url", c.logger, c.handler.HandleEvent)
44 defer scheduler.Shutdown()
45
46 client, err := client.NewClient(c.cfg, c.logger, scheduler)
47 if err != nil {
48 return fmt.Errorf("failed to create client: %w", err)
49 }
50
51 cursor, err := c.handler.store.GetCursor(ctx, c.handler.did)
52 // if error or not found set to be around the time this app was create so that it starts from the begining
53 // of when the type of records were first created
54 if err != nil || cursor == 0 {
55 cursor = time.Date(2025, time.October, 5, 12, 0, 0, 0, time.UTC).UnixMicro()
56 }
57
58 slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
59
60 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
61 return fmt.Errorf("connect and read: %w", err)
62 }
63
64 slog.Info("stopping consume")
65 return nil
66}
67
68type HandlerStore interface {
69 CreateURL(id, url, did, originHost string, createdAt int64) error
70 DeleteURL(id, did string) error
71 SaveCursor(ctx context.Context, did string, cursor int64) error
72 GetCursor(ctx context.Context, did string) (int64, error)
73}
74
75type handler struct {
76 store HandlerStore
77 did string
78}
79
80func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
81 if event == nil {
82 return nil
83 }
84
85 defer func() {
86 err := h.store.SaveCursor(ctx, h.did, event.TimeUS)
87 if err != nil {
88 slog.Error("failed to save cursor", "error", err)
89 }
90 }()
91 if event.Commit == nil {
92 return nil
93 }
94
95 slog.Info("handle event")
96
97 switch event.Commit.Operation {
98 case models.CommitOperationCreate:
99 return h.handleCreateEvent(ctx, event)
100 case models.CommitOperationDelete:
101 return h.handleDeleteEvent(ctx, event)
102 default:
103 return nil
104 }
105}
106
107type ShortURLRecord struct {
108 URL string `json:"url"`
109 CreatedAt time.Time `json:"createdAt"`
110 Origin string `json:"origin"`
111}
112
113func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error {
114 var record ShortURLRecord
115 if err := json.Unmarshal(event.Commit.Record, &record); err != nil {
116 slog.Error("unmarshal record", "error", err)
117 return nil
118 }
119
120 err := h.store.CreateURL(event.Commit.RKey, record.URL, event.Did, record.Origin, record.CreatedAt.UnixMilli())
121 if err != nil {
122 // TODO: proper error handling in case this fails, we want to try again
123 slog.Error("failed to store short URL", "error", err)
124 }
125
126 return nil
127}
128
129func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error {
130 err := h.store.DeleteURL(event.Commit.RKey, event.Did)
131 if err != nil {
132 // TODO: proper error handling in case this fails, we want to try again
133 slog.Error("failed to delete short URL from store", "error", err)
134 }
135
136 return nil
137}