A URL shortener service that uses ATProto to allow self hosting and ensuring the user owns their data
at main 3.5 kB view raw
1package atshorter 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type consumer struct { 17 cfg *client.ClientConfig 18 handler handler 19 logger *slog.Logger 20} 21 22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore, did string) *consumer { 23 cfg := client.DefaultClientConfig() 24 if jsAddr != "" { 25 cfg.WebsocketURL = jsAddr 26 } 27 cfg.WantedCollections = []string{ 28 "com.atshorter.shorturl", 29 } 30 cfg.WantedDids = []string{did} 31 32 return &consumer{ 33 cfg: cfg, 34 logger: logger, 35 handler: handler{ 36 store: store, 37 did: did, 38 }, 39 } 40} 41 42func (c *consumer) Consume(ctx context.Context) error { 43 scheduler := sequential.NewScheduler("jetstream_at_shorter_url", c.logger, c.handler.HandleEvent) 44 defer scheduler.Shutdown() 45 46 client, err := client.NewClient(c.cfg, c.logger, scheduler) 47 if err != nil { 48 return fmt.Errorf("failed to create client: %w", err) 49 } 50 51 cursor, err := c.handler.store.GetCursor(ctx, c.handler.did) 52 // if error or not found set to be around the time this app was create so that it starts from the begining 53 // of when the type of records were first created 54 if err != nil || cursor == 0 { 55 cursor = time.Date(2025, time.October, 5, 12, 0, 0, 0, time.UTC).UnixMicro() 56 } 57 58 slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor) 59 60 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 61 return fmt.Errorf("connect and read: %w", err) 62 } 63 64 slog.Info("stopping consume") 65 return nil 66} 67 68type HandlerStore interface { 69 CreateURL(id, url, did, originHost string, createdAt int64) error 70 DeleteURL(id, did string) error 71 SaveCursor(ctx context.Context, did string, cursor int64) error 72 GetCursor(ctx context.Context, did string) (int64, error) 73} 74 75type handler struct { 76 store HandlerStore 77 did string 78} 79 80func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 81 if event == nil { 82 return nil 83 } 84 85 defer func() { 86 err := h.store.SaveCursor(ctx, h.did, event.TimeUS) 87 if err != nil { 88 slog.Error("failed to save cursor", "error", err) 89 } 90 }() 91 if event.Commit == nil { 92 return nil 93 } 94 95 slog.Info("handle event") 96 97 switch event.Commit.Operation { 98 case models.CommitOperationCreate: 99 return h.handleCreateEvent(ctx, event) 100 case models.CommitOperationDelete: 101 return h.handleDeleteEvent(ctx, event) 102 default: 103 return nil 104 } 105} 106 107type ShortURLRecord struct { 108 URL string `json:"url"` 109 CreatedAt time.Time `json:"createdAt"` 110 Origin string `json:"origin"` 111} 112 113func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error { 114 var record ShortURLRecord 115 if err := json.Unmarshal(event.Commit.Record, &record); err != nil { 116 slog.Error("unmarshal record", "error", err) 117 return nil 118 } 119 120 err := h.store.CreateURL(event.Commit.RKey, record.URL, event.Did, record.Origin, record.CreatedAt.UnixMilli()) 121 if err != nil { 122 // TODO: proper error handling in case this fails, we want to try again 123 slog.Error("failed to store short URL", "error", err) 124 } 125 126 return nil 127} 128 129func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error { 130 err := h.store.DeleteURL(event.Commit.RKey, event.Did) 131 if err != nil { 132 // TODO: proper error handling in case this fails, we want to try again 133 slog.Error("failed to delete short URL from store", "error", err) 134 } 135 136 return nil 137}