Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/lexicon-dev 82 lines 2.7 kB view raw
1package spxrpc 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 12 "github.com/labstack/echo/v4" 13 "github.com/pion/webrtc/v4" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "stream.place/streamplace/pkg/constants" 16 placestream "stream.place/streamplace/pkg/streamplace" 17) 18 19func (s *Server) handlePlaceStreamPlaybackGetPlaybackServer(ctx context.Context, stream string) (*placestream.PlaybackGetPlaybackServer_Output, error) { 20 addr := strings.TrimPrefix(s.cli.ServerDID(), "did:web:") 21 return &placestream.PlaybackGetPlaybackServer_Output{ 22 Servers: []string{fmt.Sprintf("https://%s", addr)}, 23 }, nil 24} 25 26func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context, rendition string, streamer string, r io.Reader, _contentType string) (io.Reader, error) { 27 if alias, ok := s.aliases[streamer]; ok { 28 streamer = alias 29 } 30 31 if streamer == "" { 32 return nil, echo.NewHTTPError(http.StatusBadRequest, "streamer is required") 33 } 34 if rendition == "" { 35 return nil, echo.NewHTTPError(http.StatusBadRequest, "rendition is required") 36 } 37 viewer := "" 38 if !s.cli.WideOpen && !strings.HasPrefix(streamer, constants.DID_KEY_PREFIX) { 39 repo, err := s.ATSync.SyncBlueskyRepoCached(ctx, streamer) 40 if err != nil { 41 return nil, err 42 } 43 streamer = repo.DID 44 } 45 session, _ := oatproxy.GetOAuthSession(ctx) 46 if session != nil { 47 viewer = session.DID 48 } else { 49 svc := GetServiceAuth(ctx) 50 if svc != nil { 51 // this is a signed request from a peer node, allow them to see unpublished streams 52 viewer = streamer 53 } 54 } 55 body, err := io.ReadAll(r) 56 if err != nil { 57 return nil, echo.NewHTTPError(http.StatusBadRequest, "error reading body", err) 58 } 59 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 60 if streamer == viewer { 61 // this user gets sent right to the origin in case we're unpublished 62 origin, err := s.statefulDB.GetLatestBroadcastOriginForStreamer(streamer) 63 if err != nil { 64 return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting broadcast origin", err) 65 } 66 myDID := s.cli.ServerDID() 67 if origin != nil && origin.ServerDID != myDID { 68 data, err := s.ProxyServiceRequest(ctx, origin.ServerDID, "POST", "place.stream.playback.whep", 69 url.Values{"rendition": {rendition}, "streamer": {streamer}}, 70 bytes.NewReader([]byte(offer.SDP)), _contentType) 71 if err != nil { 72 return nil, err 73 } 74 return bytes.NewReader(data), nil 75 } 76 } 77 answer, err := s.mm.WebRTCPlayback2(ctx, streamer, rendition, &offer, viewer) 78 if err != nil { 79 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("error playing back: %s", err.Error()), err) 80 } 81 return bytes.NewReader([]byte(answer.SDP)), nil 82}