Live video on the AT Protocol
at natb/rtmp-preferred 345 lines 9.2 kB view raw
1package api 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "fmt" 8 "io" 9 "net/http" 10 "strconv" 11 "strings" 12 "time" 13 14 "github.com/julienschmidt/httprouter" 15 "github.com/pion/webrtc/v4" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/errors" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/spmetrics" 22) 23 24func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 25 alias, ok := a.Aliases[user] 26 if ok { 27 user = alias 28 } 29 // did:key, pass through unaltered 30 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 31 return user, nil 32 } 33 // only other allowed case is a bluesky handle 34 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 35 if err != nil { 36 return "", err 37 } 38 return repo.DID, nil 39} 40 41func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle { 42 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 43 user := p.ByName("user") 44 if user == "" { 45 errors.WriteHTTPBadRequest(w, "user required", nil) 46 return 47 } 48 rendition := getRendition(r) 49 user, err := a.NormalizeUser(ctx, user) 50 if err != nil { 51 errors.WriteHTTPBadRequest(w, "invalid user", err) 52 return 53 } 54 var delayMS int64 = 3000 55 userDelay := r.URL.Query().Get("delayms") 56 if userDelay != "" { 57 var err error 58 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 59 if err != nil { 60 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 61 return 62 } 63 if delayMS > 10000 { 64 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 65 return 66 } 67 } 68 spmetrics.ViewerInc(user) 69 defer spmetrics.ViewerDec(user) 70 w.Header().Set("Content-Type", "video/mp4") 71 w.WriteHeader(200) 72 g, ctx := errgroup.WithContext(ctx) 73 pr, pw := io.Pipe() 74 bufw := bufio.NewWriter(pw) 75 g.Go(func() error { 76 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 77 }) 78 g.Go(func() error { 79 <-ctx.Done() 80 pr.Close() 81 pw.Close() 82 return nil 83 }) 84 g.Go(func() error { 85 time.Sleep(time.Duration(delayMS) * time.Millisecond) 86 _, err := io.Copy(w, pr) 87 return err 88 }) 89 g.Wait() 90 } 91} 92 93func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle { 94 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 95 user := p.ByName("user") 96 if user == "" { 97 errors.WriteHTTPBadRequest(w, "user required", nil) 98 return 99 } 100 rendition := getRendition(r) 101 user, err := a.NormalizeUser(ctx, user) 102 if err != nil { 103 errors.WriteHTTPBadRequest(w, "invalid user", err) 104 return 105 } 106 var delayMS int64 = 1000 107 userDelay := r.URL.Query().Get("delayms") 108 if userDelay != "" { 109 var err error 110 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 111 if err != nil { 112 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 113 return 114 } 115 if delayMS > 10000 { 116 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 117 return 118 } 119 } 120 spmetrics.ViewerInc(user) 121 defer spmetrics.ViewerDec(user) 122 w.Header().Set("Content-Type", "video/webm") 123 w.WriteHeader(200) 124 g, ctx := errgroup.WithContext(ctx) 125 pr, pw := io.Pipe() 126 bufw := bufio.NewWriter(pw) 127 g.Go(func() error { 128 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw) 129 }) 130 g.Go(func() error { 131 <-ctx.Done() 132 pr.Close() 133 pw.Close() 134 return nil 135 }) 136 g.Go(func() error { 137 time.Sleep(time.Duration(delayMS) * time.Millisecond) 138 _, err := io.Copy(w, pr) 139 return err 140 }) 141 g.Wait() 142 } 143} 144 145func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 146 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 147 user := p.ByName("user") 148 if user == "" { 149 errors.WriteHTTPBadRequest(w, "user required", nil) 150 return 151 } 152 rendition := getRendition(r) 153 user, err := a.NormalizeUser(ctx, user) 154 if err != nil { 155 errors.WriteHTTPBadRequest(w, "invalid user", err) 156 return 157 } 158 body, err := io.ReadAll(r.Body) 159 if err != nil { 160 errors.WriteHTTPBadRequest(w, "error reading body", err) 161 return 162 } 163 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 164 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 165 if err != nil { 166 errors.WriteHTTPInternalServerError(w, "error playing back", err) 167 return 168 } 169 w.WriteHeader(201) 170 w.Header().Add("Location", r.URL.Path) 171 w.Write([]byte(answer.SDP)) 172 } 173} 174 175const BEARER_PREFIX = "Bearer " 176const KEY_PREFIX = "0x" 177 178func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 179 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 180 ct := r.Header.Get("Content-Type") 181 if ct != "application/sdp" { 182 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 183 return 184 } 185 var encoded string 186 urlKey := p.ByName("key") 187 if urlKey != "" { 188 encoded = urlKey 189 } else { 190 auth := r.Header.Get("Authorization") 191 if auth == "" { 192 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 193 return 194 } 195 if !strings.HasPrefix(auth, BEARER_PREFIX) { 196 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 197 return 198 } 199 encoded = auth[len(BEARER_PREFIX):] 200 // it's easy to copy-paste a trailing or leading space, so clear those out 201 encoded = strings.TrimSpace(encoded) 202 } 203 204 mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 205 if err != nil { 206 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 207 return 208 } 209 210 body, err := io.ReadAll(r.Body) 211 if err != nil { 212 errors.WriteHTTPBadRequest(w, "error reading body", err) 213 return 214 } 215 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 216 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner) 217 if err != nil { 218 errors.WriteHTTPInternalServerError(w, "error playing back", err) 219 return 220 } 221 host := r.Host 222 if host == "" { 223 host = r.URL.Host 224 } 225 scheme := "http" 226 if r.TLS != nil { 227 scheme = "https" 228 } 229 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 230 log.Log(ctx, "location", "location", location) 231 w.Header().Set("Location", location) 232 w.WriteHeader(201) 233 w.Write([]byte(answer.SDP)) 234 } 235} 236 237var epoch = time.Unix(0, 0).Format(time.RFC1123) 238 239var noCacheHeaders = map[string]string{ 240 "Expires": epoch, 241 "Cache-Control": "no-cache, private, max-age=0", 242 "Pragma": "no-cache", 243 "X-Accel-Expires": "0", 244} 245 246var etagHeaders = []string{ 247 "ETag", 248 "If-Modified-Since", 249 "If-Match", 250 "If-None-Match", 251 "If-Range", 252 "If-Unmodified-Since", 253} 254 255func NoCache(h httprouter.Handle) httprouter.Handle { 256 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 257 // Delete any ETag headers that may have been set 258 for _, v := range etagHeaders { 259 if r.Header.Get(v) != "" { 260 r.Header.Del(v) 261 } 262 } 263 264 // Set our NoCache headers 265 for k, v := range noCacheHeaders { 266 w.Header().Set(k, v) 267 } 268 269 h(w, r, p) 270 } 271} 272 273func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 274 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 275 user := p.ByName("user") 276 if user == "" { 277 errors.WriteHTTPBadRequest(w, "user required", nil) 278 return 279 } 280 user, err := a.NormalizeUser(ctx, user) 281 if err != nil { 282 errors.WriteHTTPBadRequest(w, "invalid user", err) 283 return 284 } 285 file := p.ByName("file") 286 if file == "" { 287 errors.WriteHTTPBadRequest(w, "file required", nil) 288 return 289 } 290 m3u8, err := a.Director.GetM3U8(ctx, user) 291 if err != nil { 292 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 293 return 294 } 295 session := r.URL.Query().Get("session") 296 rendition := r.URL.Query().Get("rendition") 297 buf, err := m3u8.GetFile(file, session, rendition) 298 if err != nil { 299 errors.WriteHTTPNotFound(w, "segment not found", err) 300 return 301 } 302 303 if strings.HasSuffix(file, ".m3u8") { 304 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 305 } else { 306 if session != "" { 307 spmetrics.SessionSeen(user, session) 308 } 309 w.Header().Set("Content-Type", "video/mp2t") 310 } 311 312 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 313 }) 314} 315 316func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 317 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 318 user := p.ByName("user") 319 if user == "" { 320 errors.WriteHTTPBadRequest(w, "user required", nil) 321 return 322 } 323 user, err := a.NormalizeUser(ctx, user) 324 if err != nil { 325 errors.WriteHTTPNotFound(w, "user not found", err) 326 return 327 } 328 thumb, err := a.Model.LatestThumbnailForUser(user) 329 if err != nil { 330 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 331 return 332 } 333 if thumb == nil { 334 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 335 return 336 } 337 aqt := aqtime.FromTime(thumb.Segment.StartTime) 338 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 339 if err != nil { 340 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 341 return 342 } 343 http.ServeFile(w, r, fpath) 344 } 345}