Live video on the AT Protocol
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/multitesting 155 lines 4.2 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strconv" 10 11 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/carstore" 13 "github.com/bluesky-social/indigo/events" 14 "github.com/bluesky-social/indigo/repo" 15 "github.com/bluesky-social/indigo/util" 16 "github.com/gorilla/websocket" 17 "github.com/ipfs/go-cid" 18 cbor "github.com/ipfs/go-ipld-cbor" 19 "github.com/ipld/go-car" 20 "github.com/labstack/echo/v4" 21 "stream.place/streamplace/pkg/atproto" 22 "stream.place/streamplace/pkg/log" 23) 24 25func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { 26 active := true 27 return &comatprototypes.SyncListRepos_Output{ 28 Repos: []*comatprototypes.SyncListRepos_Repo{ 29 { 30 Did: atproto.LexiconRepo.RepoDid(), 31 Head: atproto.LexiconRepo.SignedCommit().Data.String(), 32 Rev: atproto.LexiconRepo.SignedCommit().Rev, 33 Active: &active, 34 }, 35 }, 36 }, nil 37} 38 39func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { 40 _, robs, err := atproto.OpenLexiconRepo(ctx) 41 if err != nil { 42 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) 43 } 44 45 bs := util.NewLoggingBstore(robs) 46 47 root, err := atproto.CarStore.GetUserRepoHead(ctx, atproto.RepoUser) 48 if err != nil { 49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err) 50 } 51 52 log.Warn(ctx, "got root", "root", root.String()) 53 54 r, err := repo.OpenRepo(ctx, bs, root) 55 if err != nil { 56 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) 57 } 58 59 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey) 60 if err != nil { 61 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record bytes: %w", err) 62 } 63 64 blocks := bs.GetLoggedBlocks() 65 66 buf := new(bytes.Buffer) 67 hb, err := cbor.DumpObject(&car.CarHeader{ 68 Roots: []cid.Cid{root}, 69 Version: 1, 70 }) 71 if err != nil { 72 return nil, fmt.Errorf("failed to dump car header: %w", err) 73 } 74 if _, err := carstore.LdWrite(buf, hb); err != nil { 75 return nil, err 76 } 77 78 for _, blk := range blocks { 79 log.Warn(ctx, "writing block", "cid", blk.Cid().String(), "version", blk.Cid().Version()) 80 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 81 return nil, err 82 } 83 } 84 85 return bytes.NewReader(buf.Bytes()), nil 86} 87 88var upgrader = websocket.Upgrader{ 89 ReadBufferSize: 1024, 90 WriteBufferSize: 1024, 91 CheckOrigin: func(r *http.Request) bool { 92 return true 93 }, 94} 95 96func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error { 97 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent()) 98 cursor := c.QueryParam("cursor") 99 100 if cursor == "" { 101 cursor = "0" 102 } 103 104 seq, err := strconv.Atoi(cursor) 105 if err != nil { 106 return err 107 } 108 109 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 110 if err != nil { 111 return err 112 } 113 114 evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq)) 115 if err != nil { 116 return err 117 } 118 119 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts)) 120 121 header := events.EventHeader{Op: events.EvtKindMessage} 122 for _, evt := range evts { 123 commit, err := evt.ToCommitEvent() 124 if err != nil { 125 return err 126 } 127 128 wc, err := conn.NextWriter(websocket.BinaryMessage) 129 if err != nil { 130 return err 131 } 132 header.MsgType = "#commit" 133 134 if err := header.MarshalCBOR(wc); err != nil { 135 return fmt.Errorf("failed to write header: %w", err) 136 } 137 138 if err := commit.MarshalCBOR(wc); err != nil { 139 return fmt.Errorf("failed to write event: %w", err) 140 } 141 142 if err := wc.Close(); err != nil { 143 return fmt.Errorf("failed to flush-close our event write: %w", err) 144 } 145 } 146 147 // We don't have anything else to do but we'll keep the socket open until the client disconnects 148 for { 149 _, _, err = conn.ReadMessage() 150 if err != nil { 151 log.Log(c.Request().Context(), "client disconnected", "error", err) 152 return nil 153 } 154 } 155}