Live video on the AT Protocol
at natb/loading-overlay 363 lines 9.9 kB view raw
1package api 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "fmt" 8 "io" 9 "net/http" 10 "strconv" 11 "strings" 12 "time" 13 14 "github.com/julienschmidt/httprouter" 15 "github.com/pion/webrtc/v4" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/errors" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/spmetrics" 22) 23 24func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 25 alias, ok := a.Aliases[user] 26 if ok { 27 user = alias 28 } 29 // did:key, pass through unaltered 30 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 31 return user, nil 32 } 33 // only other allowed case is a bluesky handle 34 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 35 if err != nil { 36 return "", err 37 } 38 return repo.DID, nil 39} 40 41func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle { 42 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 43 user := p.ByName("user") 44 if user == "" { 45 errors.WriteHTTPBadRequest(w, "user required", nil) 46 return 47 } 48 rendition := getRendition(r) 49 user, err := a.NormalizeUser(ctx, user) 50 if err != nil { 51 errors.WriteHTTPBadRequest(w, "invalid user", err) 52 return 53 } 54 var delayMS int64 = 3000 55 userDelay := r.URL.Query().Get("delayms") 56 if userDelay != "" { 57 var err error 58 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 59 if err != nil { 60 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 61 return 62 } 63 if delayMS > 10000 { 64 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 65 return 66 } 67 } 68 spmetrics.ViewerInc(user) 69 defer spmetrics.ViewerDec(user) 70 w.Header().Set("Content-Type", "video/mp4") 71 w.WriteHeader(200) 72 g, ctx := errgroup.WithContext(ctx) 73 pr, pw := io.Pipe() 74 bufw := bufio.NewWriter(pw) 75 g.Go(func() error { 76 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 77 }) 78 g.Go(func() error { 79 <-ctx.Done() 80 pr.Close() 81 pw.Close() 82 return nil 83 }) 84 g.Go(func() error { 85 time.Sleep(time.Duration(delayMS) * time.Millisecond) 86 _, err := io.Copy(w, pr) 87 return err 88 }) 89 if err := g.Wait(); err != nil { 90 errors.WriteHTTPBadRequest(w, "request failed", err) 91 } 92 } 93} 94 95func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle { 96 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 97 user := p.ByName("user") 98 if user == "" { 99 errors.WriteHTTPBadRequest(w, "user required", nil) 100 return 101 } 102 rendition := getRendition(r) 103 user, err := a.NormalizeUser(ctx, user) 104 if err != nil { 105 errors.WriteHTTPBadRequest(w, "invalid user", err) 106 return 107 } 108 var delayMS int64 = 1000 109 userDelay := r.URL.Query().Get("delayms") 110 if userDelay != "" { 111 var err error 112 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 113 if err != nil { 114 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 115 return 116 } 117 if delayMS > 10000 { 118 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 119 return 120 } 121 } 122 spmetrics.ViewerInc(user) 123 defer spmetrics.ViewerDec(user) 124 w.Header().Set("Content-Type", "video/webm") 125 w.WriteHeader(200) 126 g, ctx := errgroup.WithContext(ctx) 127 pr, pw := io.Pipe() 128 bufw := bufio.NewWriter(pw) 129 g.Go(func() error { 130 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw) 131 }) 132 g.Go(func() error { 133 <-ctx.Done() 134 pr.Close() 135 pw.Close() 136 return nil 137 }) 138 g.Go(func() error { 139 time.Sleep(time.Duration(delayMS) * time.Millisecond) 140 _, err := io.Copy(w, pr) 141 return err 142 }) 143 if err := g.Wait(); err != nil { 144 errors.WriteHTTPBadRequest(w, "request failed", err) 145 } 146 } 147} 148 149func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 150 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 151 user := p.ByName("user") 152 if user == "" { 153 errors.WriteHTTPBadRequest(w, "user required", nil) 154 return 155 } 156 rendition := getRendition(r) 157 user, err := a.NormalizeUser(ctx, user) 158 if err != nil { 159 errors.WriteHTTPBadRequest(w, "invalid user", err) 160 return 161 } 162 body, err := io.ReadAll(r.Body) 163 if err != nil { 164 errors.WriteHTTPBadRequest(w, "error reading body", err) 165 return 166 } 167 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 168 var answer *webrtc.SessionDescription 169 if a.CLI.NewWebRTCPlayback { 170 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer) 171 } else { 172 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 173 } 174 if err != nil { 175 errors.WriteHTTPInternalServerError(w, "error playing back", err) 176 return 177 } 178 w.WriteHeader(201) 179 w.Header().Add("Location", r.URL.Path) 180 if _, err := w.Write([]byte(answer.SDP)); err != nil { 181 log.Error(ctx, "error writing response", "error", err) 182 } 183 } 184} 185 186const BearerPrefix = "Bearer " 187const KeyPrefix = "0x" 188 189func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 190 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 191 ct := r.Header.Get("Content-Type") 192 if ct != "application/sdp" { 193 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 194 return 195 } 196 var encoded string 197 urlKey := p.ByName("key") 198 if urlKey != "" { 199 encoded = urlKey 200 } else { 201 auth := r.Header.Get("Authorization") 202 if auth == "" { 203 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 204 return 205 } 206 if !strings.HasPrefix(auth, BearerPrefix) { 207 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 208 return 209 } 210 encoded = auth[len(BearerPrefix):] 211 // it's easy to copy-paste a trailing or leading space, so clear those out 212 encoded = strings.TrimSpace(encoded) 213 } 214 215 mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 216 if err != nil { 217 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 218 return 219 } 220 221 body, err := io.ReadAll(r.Body) 222 if err != nil { 223 errors.WriteHTTPBadRequest(w, "error reading body", err) 224 return 225 } 226 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 227 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer()) 228 if err != nil { 229 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err) 230 return 231 } 232 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan struct{})) 233 if err != nil { 234 errors.WriteHTTPInternalServerError(w, "error playing back", err) 235 return 236 } 237 host := r.Host 238 if host == "" { 239 host = r.URL.Host 240 } 241 scheme := "http" 242 if r.TLS != nil { 243 scheme = "https" 244 } 245 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 246 log.Log(ctx, "location", "location", location) 247 w.Header().Set("Location", location) 248 w.WriteHeader(201) 249 if _, err := w.Write([]byte(answer.SDP)); err != nil { 250 log.Error(ctx, "error writing response", "error", err) 251 } 252 } 253} 254 255var epoch = time.Unix(0, 0).Format(time.RFC1123) 256 257var noCacheHeaders = map[string]string{ 258 "Expires": epoch, 259 "Cache-Control": "no-cache, private, max-age=0", 260 "Pragma": "no-cache", 261 "X-Accel-Expires": "0", 262} 263 264var etagHeaders = []string{ 265 "ETag", 266 "If-Modified-Since", 267 "If-Match", 268 "If-None-Match", 269 "If-Range", 270 "If-Unmodified-Since", 271} 272 273func NoCache(h httprouter.Handle) httprouter.Handle { 274 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 275 // Delete any ETag headers that may have been set 276 for _, v := range etagHeaders { 277 if r.Header.Get(v) != "" { 278 r.Header.Del(v) 279 } 280 } 281 282 // Set our NoCache headers 283 for k, v := range noCacheHeaders { 284 w.Header().Set(k, v) 285 } 286 287 h(w, r, p) 288 } 289} 290 291func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 292 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 293 user := p.ByName("user") 294 if user == "" { 295 errors.WriteHTTPBadRequest(w, "user required", nil) 296 return 297 } 298 user, err := a.NormalizeUser(ctx, user) 299 if err != nil { 300 errors.WriteHTTPBadRequest(w, "invalid user", err) 301 return 302 } 303 file := p.ByName("file") 304 if file == "" { 305 errors.WriteHTTPBadRequest(w, "file required", nil) 306 return 307 } 308 m3u8, err := a.Director.GetM3U8(ctx, user) 309 if err != nil { 310 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 311 return 312 } 313 session := r.URL.Query().Get("session") 314 rendition := r.URL.Query().Get("rendition") 315 buf, err := m3u8.GetFile(file, session, rendition) 316 if err != nil { 317 errors.WriteHTTPNotFound(w, "segment not found", err) 318 return 319 } 320 321 if strings.HasSuffix(file, ".m3u8") { 322 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 323 } else { 324 if session != "" { 325 spmetrics.SessionSeen(user, session) 326 } 327 w.Header().Set("Content-Type", "video/mp2t") 328 } 329 330 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 331 }) 332} 333 334func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 335 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 336 user := p.ByName("user") 337 if user == "" { 338 errors.WriteHTTPBadRequest(w, "user required", nil) 339 return 340 } 341 user, err := a.NormalizeUser(ctx, user) 342 if err != nil { 343 errors.WriteHTTPNotFound(w, "user not found", err) 344 return 345 } 346 thumb, err := a.Model.LatestThumbnailForUser(user) 347 if err != nil { 348 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 349 return 350 } 351 if thumb == nil { 352 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 353 return 354 } 355 aqt := aqtime.FromTime(thumb.Segment.StartTime) 356 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 357 if err != nil { 358 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 359 return 360 } 361 http.ServeFile(w, r, fpath) 362 } 363}