fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strconv"
10
11 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/carstore"
13 "github.com/bluesky-social/indigo/events"
14 "github.com/bluesky-social/indigo/repo"
15 "github.com/bluesky-social/indigo/util"
16 "github.com/gorilla/websocket"
17 "github.com/ipfs/go-cid"
18 cbor "github.com/ipfs/go-ipld-cbor"
19 "github.com/ipld/go-car"
20 "github.com/labstack/echo/v4"
21 "stream.place/streamplace/pkg/atproto"
22 "stream.place/streamplace/pkg/log"
23)
24
25func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) {
26 active := true
27 return &comatprototypes.SyncListRepos_Output{
28 Repos: []*comatprototypes.SyncListRepos_Repo{
29 {
30 Did: atproto.LexiconRepo.RepoDid(),
31 Head: atproto.LexiconRepo.SignedCommit().Data.String(),
32 Rev: atproto.LexiconRepo.SignedCommit().Rev,
33 Active: &active,
34 },
35 },
36 }, nil
37}
38
39func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
40 _, robs, err := atproto.OpenLexiconRepo(ctx)
41 if err != nil {
42 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
43 }
44
45 bs := util.NewLoggingBstore(robs)
46
47 root, err := atproto.CarStore.GetUserRepoHead(ctx, atproto.RepoUser)
48 if err != nil {
49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err)
50 }
51
52 log.Warn(ctx, "got root", "root", root.String())
53
54 r, err := repo.OpenRepo(ctx, bs, root)
55 if err != nil {
56 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err)
57 }
58
59 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
60 if err != nil {
61 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record bytes: %w", err)
62 }
63
64 blocks := bs.GetLoggedBlocks()
65
66 buf := new(bytes.Buffer)
67 hb, err := cbor.DumpObject(&car.CarHeader{
68 Roots: []cid.Cid{root},
69 Version: 1,
70 })
71 if err != nil {
72 return nil, fmt.Errorf("failed to dump car header: %w", err)
73 }
74 if _, err := carstore.LdWrite(buf, hb); err != nil {
75 return nil, err
76 }
77
78 for _, blk := range blocks {
79 log.Warn(ctx, "writing block", "cid", blk.Cid().String(), "version", blk.Cid().Version())
80 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
81 return nil, err
82 }
83 }
84
85 return bytes.NewReader(buf.Bytes()), nil
86}
87
88var upgrader = websocket.Upgrader{
89 ReadBufferSize: 1024,
90 WriteBufferSize: 1024,
91 CheckOrigin: func(r *http.Request) bool {
92 return true
93 },
94}
95
96func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error {
97 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent())
98 cursor := c.QueryParam("cursor")
99
100 if cursor == "" {
101 cursor = "0"
102 }
103
104 seq, err := strconv.Atoi(cursor)
105 if err != nil {
106 return err
107 }
108
109 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
110 if err != nil {
111 return err
112 }
113
114 evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq))
115 if err != nil {
116 return err
117 }
118
119 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts))
120
121 header := events.EventHeader{Op: events.EvtKindMessage}
122 for _, evt := range evts {
123 commit, err := evt.ToCommitEvent()
124 if err != nil {
125 return err
126 }
127
128 wc, err := conn.NextWriter(websocket.BinaryMessage)
129 if err != nil {
130 return err
131 }
132 header.MsgType = "#commit"
133
134 if err := header.MarshalCBOR(wc); err != nil {
135 return fmt.Errorf("failed to write header: %w", err)
136 }
137
138 if err := commit.MarshalCBOR(wc); err != nil {
139 return fmt.Errorf("failed to write event: %w", err)
140 }
141
142 if err := wc.Close(); err != nil {
143 return fmt.Errorf("failed to flush-close our event write: %w", err)
144 }
145 }
146
147 // We don't have anything else to do but we'll keep the socket open until the client disconnects
148 for {
149 _, _, err = conn.ReadMessage()
150 if err != nil {
151 log.Log(c.Request().Context(), "client disconnected", "error", err)
152 return nil
153 }
154 }
155}