Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/changesets-2 152 lines 4.0 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strconv" 10 11 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/carstore" 13 "github.com/bluesky-social/indigo/events" 14 "github.com/bluesky-social/indigo/repo" 15 "github.com/bluesky-social/indigo/util" 16 "github.com/gorilla/websocket" 17 "github.com/ipfs/go-cid" 18 cbor "github.com/ipfs/go-ipld-cbor" 19 "github.com/ipld/go-car" 20 "github.com/labstack/echo/v4" 21 "stream.place/streamplace/pkg/atproto" 22 "stream.place/streamplace/pkg/log" 23) 24 25func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { 26 active := true 27 return &comatprototypes.SyncListRepos_Output{ 28 Repos: []*comatprototypes.SyncListRepos_Repo{ 29 { 30 Did: atproto.LexiconRepo.RepoDid(), 31 Head: atproto.LexiconRepo.SignedCommit().Data.String(), 32 Rev: atproto.LexiconRepo.SignedCommit().Rev, 33 Active: &active, 34 }, 35 }, 36 }, nil 37} 38 39func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { 40 _, robs, err := atproto.OpenLexiconRepo(ctx) 41 if err != nil { 42 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) 43 } 44 45 bs := util.NewLoggingBstore(robs) 46 47 root, err := atproto.CarStore.GetUserRepoHead(ctx, atproto.RepoUser) 48 if err != nil { 49 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get user repo head: %w", err) 50 } 51 52 r, err := repo.OpenRepo(ctx, bs, root) 53 if err != nil { 54 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to open repo: %w", err) 55 } 56 57 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey) 58 if err != nil { 59 return nil, fmt.Errorf("handleComAtprotoRepoGetRecord: failed to get record bytes: %w", err) 60 } 61 62 blocks := bs.GetLoggedBlocks() 63 64 buf := new(bytes.Buffer) 65 hb, err := cbor.DumpObject(&car.CarHeader{ 66 Roots: []cid.Cid{root}, 67 Version: 1, 68 }) 69 if err != nil { 70 return nil, fmt.Errorf("failed to dump car header: %w", err) 71 } 72 if _, err := carstore.LdWrite(buf, hb); err != nil { 73 return nil, err 74 } 75 76 for _, blk := range blocks { 77 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 78 return nil, err 79 } 80 } 81 82 return bytes.NewReader(buf.Bytes()), nil 83} 84 85var upgrader = websocket.Upgrader{ 86 ReadBufferSize: 1024, 87 WriteBufferSize: 1024, 88 CheckOrigin: func(r *http.Request) bool { 89 return true 90 }, 91} 92 93func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error { 94 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent()) 95 cursor := c.QueryParam("cursor") 96 97 if cursor == "" { 98 cursor = "0" 99 } 100 101 seq, err := strconv.Atoi(cursor) 102 if err != nil { 103 return err 104 } 105 106 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 107 if err != nil { 108 return err 109 } 110 111 evts, err := s.model.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq)) 112 if err != nil { 113 return err 114 } 115 116 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts)) 117 118 header := events.EventHeader{Op: events.EvtKindMessage} 119 for _, evt := range evts { 120 commit, err := evt.ToCommitEvent() 121 if err != nil { 122 return err 123 } 124 125 wc, err := conn.NextWriter(websocket.BinaryMessage) 126 if err != nil { 127 return err 128 } 129 header.MsgType = "#commit" 130 131 if err := header.MarshalCBOR(wc); err != nil { 132 return fmt.Errorf("failed to write header: %w", err) 133 } 134 135 if err := commit.MarshalCBOR(wc); err != nil { 136 return fmt.Errorf("failed to write event: %w", err) 137 } 138 139 if err := wc.Close(); err != nil { 140 return fmt.Errorf("failed to flush-close our event write: %w", err) 141 } 142 } 143 144 // We don't have anything else to do but we'll keep the socket open until the client disconnects 145 for { 146 _, _, err = conn.ReadMessage() 147 if err != nil { 148 log.Log(c.Request().Context(), "client disconnected", "error", err) 149 return nil 150 } 151 } 152}