Live video on the AT Protocol
1package spxrpc
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "github.com/labstack/echo/v4"
13 "github.com/pion/webrtc/v4"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "stream.place/streamplace/pkg/constants"
16 placestream "stream.place/streamplace/pkg/streamplace"
17)
18
19func (s *Server) handlePlaceStreamPlaybackGetPlaybackServer(ctx context.Context, stream string) (*placestream.PlaybackGetPlaybackServer_Output, error) {
20 addr := strings.TrimPrefix(s.cli.ServerDID(), "did:web:")
21 return &placestream.PlaybackGetPlaybackServer_Output{
22 Servers: []string{fmt.Sprintf("https://%s", addr)},
23 }, nil
24}
25
26func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context, rendition string, streamer string, r io.Reader, _contentType string) (io.Reader, error) {
27 if alias, ok := s.aliases[streamer]; ok {
28 streamer = alias
29 }
30
31 if streamer == "" {
32 return nil, echo.NewHTTPError(http.StatusBadRequest, "streamer is required")
33 }
34 if rendition == "" {
35 return nil, echo.NewHTTPError(http.StatusBadRequest, "rendition is required")
36 }
37 viewer := ""
38 if !s.cli.WideOpen && !strings.HasPrefix(streamer, constants.DID_KEY_PREFIX) {
39 repo, err := s.ATSync.SyncBlueskyRepoCached(ctx, streamer)
40 if err != nil {
41 return nil, err
42 }
43 streamer = repo.DID
44 }
45 session, _ := oatproxy.GetOAuthSession(ctx)
46 if session != nil {
47 viewer = session.DID
48 } else {
49 svc := GetServiceAuth(ctx)
50 if svc != nil {
51 // this is a signed request from a peer node, allow them to see unpublished streams
52 viewer = streamer
53 }
54 }
55 body, err := io.ReadAll(r)
56 if err != nil {
57 return nil, echo.NewHTTPError(http.StatusBadRequest, "error reading body", err)
58 }
59 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
60 if streamer == viewer {
61 // this user gets sent right to the origin in case we're unpublished
62 origin, err := s.statefulDB.GetLatestBroadcastOriginForStreamer(streamer)
63 if err != nil {
64 return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting broadcast origin", err)
65 }
66 myDID := s.cli.ServerDID()
67 if origin != nil && origin.ServerDID != myDID {
68 data, err := s.ProxyServiceRequest(ctx, origin.ServerDID, "POST", "place.stream.playback.whep",
69 url.Values{"rendition": {rendition}, "streamer": {streamer}},
70 bytes.NewReader([]byte(offer.SDP)), _contentType)
71 if err != nil {
72 return nil, err
73 }
74 return bytes.NewReader(data), nil
75 }
76 }
77 answer, err := s.mm.WebRTCPlayback2(ctx, streamer, rendition, &offer, viewer)
78 if err != nil {
79 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("error playing back: %s", err.Error()), err)
80 }
81 return bytes.NewReader([]byte(answer.SDP)), nil
82}