Live video on the AT Protocol
1package api 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strings" 10 "time" 11 12 "github.com/julienschmidt/httprouter" 13 "github.com/pion/webrtc/v4" 14 "stream.place/streamplace/pkg/aqtime" 15 "stream.place/streamplace/pkg/constants" 16 "stream.place/streamplace/pkg/errors" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/spmetrics" 19) 20 21func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 22 alias, ok := a.Aliases[user] 23 if ok { 24 user = alias 25 } 26 // did:key, pass through unaltered 27 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 28 return user, nil 29 } 30 // only other allowed case is a bluesky handle 31 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 32 if err != nil { 33 return "", err 34 } 35 return repo.DID, nil 36} 37 38func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 39 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 40 user := p.ByName("user") 41 if user == "" { 42 errors.WriteHTTPBadRequest(w, "user required", nil) 43 return 44 } 45 rendition := getRendition(r) 46 user, err := a.NormalizeUser(ctx, user) 47 if err != nil { 48 errors.WriteHTTPBadRequest(w, "invalid user", err) 49 return 50 } 51 body, err := io.ReadAll(r.Body) 52 if err != nil { 53 errors.WriteHTTPBadRequest(w, "error reading body", err) 54 return 55 } 56 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 57 var answer *webrtc.SessionDescription 58 if a.CLI.NewWebRTCPlayback { 59 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer) 60 } else { 61 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 62 } 63 if err != nil { 64 errors.WriteHTTPInternalServerError(w, "error playing back", err) 65 return 66 } 67 w.WriteHeader(201) 68 w.Header().Add("Location", r.URL.Path) 69 if _, err := w.Write([]byte(answer.SDP)); err != nil { 70 log.Error(ctx, "error writing response", "error", err) 71 } 72 } 73} 74 75const BearerPrefix = "Bearer " 76const KeyPrefix = "0x" 77 78func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 79 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 80 ct := r.Header.Get("Content-Type") 81 if ct != "application/sdp" { 82 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 83 return 84 } 85 var encoded string 86 urlKey := p.ByName("key") 87 if urlKey != "" { 88 encoded = urlKey 89 } else { 90 auth := r.Header.Get("Authorization") 91 if auth == "" { 92 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 93 return 94 } 95 if !strings.HasPrefix(auth, BearerPrefix) { 96 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 97 return 98 } 99 encoded = auth[len(BearerPrefix):] 100 // it's easy to copy-paste a trailing or leading space, so clear those out 101 encoded = strings.TrimSpace(encoded) 102 } 103 104 mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 105 if err != nil { 106 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 107 return 108 } 109 110 body, err := io.ReadAll(r.Body) 111 if err != nil { 112 errors.WriteHTTPBadRequest(w, "error reading body", err) 113 return 114 } 115 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 116 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer()) 117 if err != nil { 118 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err) 119 return 120 } 121 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan struct{})) 122 if err != nil { 123 errors.WriteHTTPInternalServerError(w, "error playing back", err) 124 return 125 } 126 host := r.Host 127 if host == "" { 128 host = r.URL.Host 129 } 130 scheme := "http" 131 if r.TLS != nil { 132 scheme = "https" 133 } 134 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 135 log.Log(ctx, "location", "location", location) 136 w.Header().Set("Location", location) 137 w.WriteHeader(201) 138 if _, err := w.Write([]byte(answer.SDP)); err != nil { 139 log.Error(ctx, "error writing response", "error", err) 140 } 141 } 142} 143 144var epoch = time.Unix(0, 0).Format(time.RFC1123) 145 146var noCacheHeaders = map[string]string{ 147 "Expires": epoch, 148 "Cache-Control": "no-cache, private, max-age=0", 149 "Pragma": "no-cache", 150 "X-Accel-Expires": "0", 151} 152 153var etagHeaders = []string{ 154 "ETag", 155 "If-Modified-Since", 156 "If-Match", 157 "If-None-Match", 158 "If-Range", 159 "If-Unmodified-Since", 160} 161 162func NoCache(h httprouter.Handle) httprouter.Handle { 163 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 164 // Delete any ETag headers that may have been set 165 for _, v := range etagHeaders { 166 if r.Header.Get(v) != "" { 167 r.Header.Del(v) 168 } 169 } 170 171 // Set our NoCache headers 172 for k, v := range noCacheHeaders { 173 w.Header().Set(k, v) 174 } 175 176 h(w, r, p) 177 } 178} 179 180func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 181 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 182 user := p.ByName("user") 183 if user == "" { 184 errors.WriteHTTPBadRequest(w, "user required", nil) 185 return 186 } 187 user, err := a.NormalizeUser(ctx, user) 188 if err != nil { 189 errors.WriteHTTPBadRequest(w, "invalid user", err) 190 return 191 } 192 file := p.ByName("file") 193 if file == "" { 194 errors.WriteHTTPBadRequest(w, "file required", nil) 195 return 196 } 197 m3u8, err := a.Director.GetM3U8(ctx, user) 198 if err != nil { 199 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 200 return 201 } 202 session := r.URL.Query().Get("session") 203 rendition := r.URL.Query().Get("rendition") 204 buf, err := m3u8.GetFile(file, session, rendition) 205 if err != nil { 206 errors.WriteHTTPNotFound(w, "segment not found", err) 207 return 208 } 209 210 if strings.HasSuffix(file, ".m3u8") { 211 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 212 } else { 213 if session != "" { 214 spmetrics.SessionSeen(user, session) 215 } 216 w.Header().Set("Content-Type", "video/mp2t") 217 } 218 219 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 220 }) 221} 222 223func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 224 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 225 user := p.ByName("user") 226 if user == "" { 227 errors.WriteHTTPBadRequest(w, "user required", nil) 228 return 229 } 230 user, err := a.NormalizeUser(ctx, user) 231 if err != nil { 232 errors.WriteHTTPNotFound(w, "user not found", err) 233 return 234 } 235 thumb, err := a.Model.LatestThumbnailForUser(user) 236 if err != nil { 237 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 238 return 239 } 240 if thumb == nil { 241 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 242 return 243 } 244 aqt := aqtime.FromTime(thumb.Segment.StartTime) 245 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 246 if err != nil { 247 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 248 return 249 } 250 http.ServeFile(w, r, fpath) 251 } 252}