Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10
11 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/carstore"
13 "github.com/bluesky-social/indigo/events"
14 "github.com/bluesky-social/indigo/repo"
15 "github.com/bluesky-social/indigo/util"
16 "github.com/gorilla/websocket"
17 "github.com/ipfs/go-cid"
18 cbor "github.com/ipfs/go-ipld-cbor"
19 "github.com/ipld/go-car"
20 "github.com/labstack/echo/v4"
21 "stream.place/streamplace/pkg/atproto"
22 "stream.place/streamplace/pkg/log"
23)
24
25func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
26 active := true
27 return &comatprototypes.SyncListRepos_Output{
28 Repos: []*comatprototypes.SyncListRepos_Repo{
29 {
30 Did: atproto.LexiconRepo.RepoDid(),
31 Head: atproto.LexiconRepo.SignedCommit().Data.String(),
32 Rev: atproto.LexiconRepo.SignedCommit().Rev,
33 Active: &active,
34 },
35 },
36 }, nil
37}
38
39func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
40 _, robs, err := atproto.OpenLexiconRepo(ctx)
41 if err != nil {
42 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
43 }
44
45 bs := util.NewLoggingBstore(robs)
46
47 root, err := atproto.CarStore.GetUserRepoHead(ctx, atproto.RepoUser)
48 if err != nil {
49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err)
50 }
51
52 r, err := repo.OpenRepo(ctx, bs, root)
53 if err != nil {
54 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
55 }
56
57 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
58 if err != nil {
59 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record bytes: %w", err)
60 }
61
62 blocks := bs.GetLoggedBlocks()
63
64 buf := new(bytes.Buffer)
65 hb, err := cbor.DumpObject(&car.CarHeader{
66 Roots: []cid.Cid{root},
67 Version: 1,
68 })
69 if err != nil {
70 return nil, fmt.Errorf("failed to dump car header: %w", err)
71 }
72 if _, err := carstore.LdWrite(buf, hb); err != nil {
73 return nil, err
74 }
75
76 for _, blk := range blocks {
77 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
78 return nil, err
79 }
80 }
81
82 return bytes.NewReader(buf.Bytes()), nil
83}
84
85var upgrader = websocket.Upgrader{
86 ReadBufferSize: 1024,
87 WriteBufferSize: 1024,
88 CheckOrigin: func(r *http.Request) bool {
89 return true
90 },
91}
92
93func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error {
94 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent())
95 cursor := c.QueryParam("cursor")
96
97 if cursor == "" {
98 cursor = "0"
99 }
100
101 seq, err := strconv.Atoi(cursor)
102 if err != nil {
103 return err
104 }
105
106 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
107 if err != nil {
108 return err
109 }
110
111 evts, err := s.model.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq))
112 if err != nil {
113 return err
114 }
115
116 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts))
117
118 header := events.EventHeader{Op: events.EvtKindMessage}
119 for _, evt := range evts {
120 commit, err := evt.ToCommitEvent()
121 if err != nil {
122 return err
123 }
124
125 wc, err := conn.NextWriter(websocket.BinaryMessage)
126 if err != nil {
127 return err
128 }
129 header.MsgType = "#commit"
130
131 if err := header.MarshalCBOR(wc); err != nil {
132 return fmt.Errorf("failed to write header: %w", err)
133 }
134
135 if err := commit.MarshalCBOR(wc); err != nil {
136 return fmt.Errorf("failed to write event: %w", err)
137 }
138
139 if err := wc.Close(); err != nil {
140 return fmt.Errorf("failed to flush-close our event write: %w", err)
141 }
142 }
143
144 // We don't have anything else to do but we'll keep the socket open until the client disconnects
145 for {
146 _, _, err = conn.ReadMessage()
147 if err != nil {
148 log.Log(c.Request().Context(), "client disconnected", "error", err)
149 return nil
150 }
151 }
152}