Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10
11 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/carstore"
13 "github.com/bluesky-social/indigo/events"
14 "github.com/bluesky-social/indigo/repo"
15 "github.com/bluesky-social/indigo/util"
16 "github.com/gorilla/websocket"
17 "github.com/ipfs/go-cid"
18 cbor "github.com/ipfs/go-ipld-cbor"
19 "github.com/ipld/go-car"
20 "github.com/labstack/echo/v4"
21 "stream.place/streamplace/pkg/atproto"
22 "stream.place/streamplace/pkg/log"
23)
24
25func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
26 active := true
27 return &comatprototypes.SyncListRepos_Output{
28 Repos: []*comatprototypes.SyncListRepos_Repo{
29 {
30 Did: atproto.LexiconRepo.RepoDid(),
31 Head: atproto.LexiconRepo.SignedCommit().Data.String(),
32 Rev: atproto.LexiconRepo.SignedCommit().Rev,
33 Active: &active,
34 },
35 },
36 }, nil
37}
38
39func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
40 _, robs, err := atproto.OpenLexiconRepo(ctx)
41 if err != nil {
42 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
43 }
44
45 bs := util.NewLoggingBstore(robs)
46
47 root, err := atproto.CarStore.GetUserRepoHead(ctx, atproto.RepoUser)
48 if err != nil {
49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err)
50 }
51
52 log.Warn(ctx, "got root", "root", root.String())
53
54 r, err := repo.OpenRepo(ctx, bs, root)
55 if err != nil {
56 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
57 }
58
59 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
60 if err != nil {
61 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record bytes: %w", err)
62 }
63
64 blocks := bs.GetLoggedBlocks()
65
66 buf := new(bytes.Buffer)
67 hb, err := cbor.DumpObject(&car.CarHeader{
68 Roots: []cid.Cid{root},
69 Version: 1,
70 })
71 if err != nil {
72 return nil, fmt.Errorf("failed to dump car header: %w", err)
73 }
74 if _, err := carstore.LdWrite(buf, hb); err != nil {
75 return nil, err
76 }
77
78 for _, blk := range blocks {
79 log.Warn(ctx, "writing block", "cid", blk.Cid().String(), "version", blk.Cid().Version())
80 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
81 return nil, err
82 }
83 }
84
85 return bytes.NewReader(buf.Bytes()), nil
86}
87
88var upgrader = websocket.Upgrader{
89 ReadBufferSize: 1024,
90 WriteBufferSize: 1024,
91 CheckOrigin: func(r *http.Request) bool {
92 return true
93 },
94}
95
96func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error {
97 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent())
98 cursor := c.QueryParam("cursor")
99
100 if cursor == "" {
101 cursor = "0"
102 }
103
104 seq, err := strconv.Atoi(cursor)
105 if err != nil {
106 return err
107 }
108
109 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
110 if err != nil {
111 return err
112 }
113
114 evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq))
115 if err != nil {
116 return err
117 }
118
119 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts))
120
121 header := events.EventHeader{Op: events.EvtKindMessage}
122 for _, evt := range evts {
123 commit, err := evt.ToCommitEvent()
124 if err != nil {
125 return err
126 }
127
128 wc, err := conn.NextWriter(websocket.BinaryMessage)
129 if err != nil {
130 return err
131 }
132 header.MsgType = "#commit"
133
134 if err := header.MarshalCBOR(wc); err != nil {
135 return fmt.Errorf("failed to write header: %w", err)
136 }
137
138 if err := commit.MarshalCBOR(wc); err != nil {
139 return fmt.Errorf("failed to write event: %w", err)
140 }
141
142 if err := wc.Close(); err != nil {
143 return fmt.Errorf("failed to flush-close our event write: %w", err)
144 }
145 }
146
147 // We don't have anything else to do but we'll keep the socket open until the client disconnects
148 for {
149 _, _, err = conn.ReadMessage()
150 if err != nil {
151 log.Log(c.Request().Context(), "client disconnected", "error", err)
152 return nil
153 }
154 }
155}