Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rtmprec-2 108 lines 2.7 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strconv" 10 11 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/events" 13 "github.com/gorilla/websocket" 14 "github.com/labstack/echo/v4" 15 "stream.place/streamplace/pkg/atproto" 16 "stream.place/streamplace/pkg/log" 17) 18 19func (s *Server) handleComAtprotoSyncListRepos(ctx context.Context, cursor string, limit int) (*comatprototypes.SyncListRepos_Output, error) { 20 active := true 21 return &comatprototypes.SyncListRepos_Output{ 22 Repos: []*comatprototypes.SyncListRepos_Repo{ 23 { 24 Did: atproto.LexiconRepo.RepoDid(), 25 Head: atproto.LexiconRepo.SignedCommit().Data.String(), 26 Rev: atproto.LexiconRepo.SignedCommit().Rev, 27 Active: &active, 28 }, 29 }, 30 }, nil 31} 32 33func (s *Server) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { 34 bs, err := atproto.LexiconRepoMerkleProof(ctx, collection, rkey) 35 if err != nil { 36 return nil, err 37 } 38 return bytes.NewReader(bs), nil 39} 40 41var upgrader = websocket.Upgrader{ 42 ReadBufferSize: 1024, 43 WriteBufferSize: 1024, 44 CheckOrigin: func(r *http.Request) bool { 45 return true 46 }, 47} 48 49func (s *Server) handleComAtprotoSyncSubscribeRepos(c echo.Context) error { 50 ctx := log.WithLogValues(c.Request().Context(), "client_ip", c.RealIP(), "user_agent", c.Request().UserAgent()) 51 cursor := c.QueryParam("cursor") 52 53 if cursor == "" { 54 cursor = "0" 55 } 56 57 seq, err := strconv.Atoi(cursor) 58 if err != nil { 59 return err 60 } 61 62 conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 63 if err != nil { 64 return err 65 } 66 67 evts, err := s.statefulDB.GetCommitEventsSinceSeq(atproto.LexiconRepo.RepoDid(), int64(seq)) 68 if err != nil { 69 return err 70 } 71 72 log.Log(ctx, "got com.atproto.sync.subscribeRepos", "cursor", c.QueryParam("cursor"), "eventCount", len(evts)) 73 74 header := events.EventHeader{Op: events.EvtKindMessage} 75 for _, evt := range evts { 76 commit, err := evt.ToCommitEvent() 77 if err != nil { 78 return err 79 } 80 81 wc, err := conn.NextWriter(websocket.BinaryMessage) 82 if err != nil { 83 return err 84 } 85 header.MsgType = "#commit" 86 87 if err := header.MarshalCBOR(wc); err != nil { 88 return fmt.Errorf("failed to write header: %w", err) 89 } 90 91 if err := commit.MarshalCBOR(wc); err != nil { 92 return fmt.Errorf("failed to write event: %w", err) 93 } 94 95 if err := wc.Close(); err != nil { 96 return fmt.Errorf("failed to flush-close our event write: %w", err) 97 } 98 } 99 100 // We don't have anything else to do but we'll keep the socket open until the client disconnects 101 for { 102 _, _, err = conn.ReadMessage() 103 if err != nil { 104 log.Log(c.Request().Context(), "client disconnected", "error", err) 105 return nil 106 } 107 } 108}