Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10
11 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/events"
13 "github.com/gorilla/websocket"
14 "github.com/labstack/echo/v4"
15 "stream.place/streamplace/pkg/atproto"
16 "stream.place/streamplace/pkg/log"
17)
18
19func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
20 active := true
21 return &comatprototypes.SyncListRepos_Output{
22 Repos: []*comatprototypes.SyncListRepos_Repo{
23 {
24 Did: atproto.LexiconRepo.RepoDid(),
25 Head: atproto.LexiconRepo.SignedCommit().Data.String(),
26 Rev: atproto.LexiconRepo.SignedCommit().Rev,
27 Active: &active,
28 },
29 },
30 }, nil
31}
32
33func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
34 bs, err := atproto.LexiconRepoMerkleProof(ctx, collection, rkey)
35 if err != nil {
36 return nil, err
37 }
38 return bytes.NewReader(bs), nil
39}
40
41var upgrader = websocket.Upgrader{
42 ReadBufferSize: 1024,
43 WriteBufferSize: 1024,
44 CheckOrigin: func(r *http.Request) bool {
45 return true
46 },
47}
48
49func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error {
50 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent())
51 cursor := c.QueryParam("cursor")
52
53 if cursor == "" {
54 cursor = "0"
55 }
56
57 seq, err := strconv.Atoi(cursor)
58 if err != nil {
59 return err
60 }
61
62 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
63 if err != nil {
64 return err
65 }
66
67 evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq))
68 if err != nil {
69 return err
70 }
71
72 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts))
73
74 header := events.EventHeader{Op: events.EvtKindMessage}
75 for _, evt := range evts {
76 commit, err := evt.ToCommitEvent()
77 if err != nil {
78 return err
79 }
80
81 wc, err := conn.NextWriter(websocket.BinaryMessage)
82 if err != nil {
83 return err
84 }
85 header.MsgType = "#commit"
86
87 if err := header.MarshalCBOR(wc); err != nil {
88 return fmt.Errorf("failed to write header: %w", err)
89 }
90
91 if err := commit.MarshalCBOR(wc); err != nil {
92 return fmt.Errorf("failed to write event: %w", err)
93 }
94
95 if err := wc.Close(); err != nil {
96 return fmt.Errorf("failed to flush-close our event write: %w", err)
97 }
98 }
99
100 // We don't have anything else to do but we'll keep the socket open until the client disconnects
101 for {
102 _, _, err = conn.ReadMessage()
103 if err != nil {
104 log.Log(c.Request().Context(), "client disconnected", "error", err)
105 return nil
106 }
107 }
108}