Live video on the AT Protocol
at eli/sync-tangled 413 lines 11 kB view raw
1package api 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "crypto" 8 "fmt" 9 "io" 10 "net/http" 11 "strconv" 12 "strings" 13 "time" 14 15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 16 "github.com/decred/dcrd/dcrec/secp256k1" 17 "github.com/julienschmidt/httprouter" 18 "github.com/mr-tron/base58" 19 "github.com/pion/webrtc/v4" 20 "golang.org/x/sync/errgroup" 21 "stream.place/streamplace/pkg/aqtime" 22 "stream.place/streamplace/pkg/atproto" 23 "stream.place/streamplace/pkg/constants" 24 "stream.place/streamplace/pkg/errors" 25 apierrors "stream.place/streamplace/pkg/errors" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/spmetrics" 29) 30 31func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 32 alias, ok := a.Aliases[user] 33 if ok { 34 user = alias 35 } 36 // did:key, pass through unaltered 37 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 38 return user, nil 39 } 40 // only other allowed case is a bluesky handle 41 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 42 if err != nil { 43 return "", err 44 } 45 return repo.DID, nil 46} 47 48func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle { 49 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 50 user := p.ByName("user") 51 if user == "" { 52 errors.WriteHTTPBadRequest(w, "user required", nil) 53 return 54 } 55 rendition := getRendition(r) 56 user, err := a.NormalizeUser(ctx, user) 57 if err != nil { 58 errors.WriteHTTPBadRequest(w, "invalid user", err) 59 return 60 } 61 var delayMS int64 = 3000 62 userDelay := r.URL.Query().Get("delayms") 63 if userDelay != "" { 64 var err error 65 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 66 if err != nil { 67 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 68 return 69 } 70 if delayMS > 10000 { 71 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 72 return 73 } 74 } 75 spmetrics.ViewerInc(user) 76 defer spmetrics.ViewerDec(user) 77 w.Header().Set("Content-Type", "video/mp4") 78 w.WriteHeader(200) 79 g, ctx := errgroup.WithContext(ctx) 80 pr, pw := io.Pipe() 81 bufw := bufio.NewWriter(pw) 82 g.Go(func() error { 83 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 84 }) 85 g.Go(func() error { 86 <-ctx.Done() 87 pr.Close() 88 pw.Close() 89 return nil 90 }) 91 g.Go(func() error { 92 time.Sleep(time.Duration(delayMS) * time.Millisecond) 93 _, err := io.Copy(w, pr) 94 return err 95 }) 96 g.Wait() 97 } 98} 99 100func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle { 101 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 102 user := p.ByName("user") 103 if user == "" { 104 errors.WriteHTTPBadRequest(w, "user required", nil) 105 return 106 } 107 rendition := getRendition(r) 108 user, err := a.NormalizeUser(ctx, user) 109 if err != nil { 110 errors.WriteHTTPBadRequest(w, "invalid user", err) 111 return 112 } 113 var delayMS int64 = 1000 114 userDelay := r.URL.Query().Get("delayms") 115 if userDelay != "" { 116 var err error 117 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 118 if err != nil { 119 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 120 return 121 } 122 if delayMS > 10000 { 123 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 124 return 125 } 126 } 127 spmetrics.ViewerInc(user) 128 defer spmetrics.ViewerDec(user) 129 w.Header().Set("Content-Type", "video/webm") 130 w.WriteHeader(200) 131 g, ctx := errgroup.WithContext(ctx) 132 pr, pw := io.Pipe() 133 bufw := bufio.NewWriter(pw) 134 g.Go(func() error { 135 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw) 136 }) 137 g.Go(func() error { 138 <-ctx.Done() 139 pr.Close() 140 pw.Close() 141 return nil 142 }) 143 g.Go(func() error { 144 time.Sleep(time.Duration(delayMS) * time.Millisecond) 145 _, err := io.Copy(w, pr) 146 return err 147 }) 148 g.Wait() 149 } 150} 151 152func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 153 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 154 user := p.ByName("user") 155 if user == "" { 156 errors.WriteHTTPBadRequest(w, "user required", nil) 157 return 158 } 159 rendition := getRendition(r) 160 user, err := a.NormalizeUser(ctx, user) 161 if err != nil { 162 errors.WriteHTTPBadRequest(w, "invalid user", err) 163 return 164 } 165 body, err := io.ReadAll(r.Body) 166 if err != nil { 167 errors.WriteHTTPBadRequest(w, "error reading body", err) 168 return 169 } 170 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 171 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 172 if err != nil { 173 errors.WriteHTTPInternalServerError(w, "error playing back", err) 174 return 175 } 176 w.WriteHeader(201) 177 w.Header().Add("Location", r.URL.Path) 178 w.Write([]byte(answer.SDP)) 179 } 180} 181 182const BEARER_PREFIX = "Bearer " 183const KEY_PREFIX = "0x" 184 185func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 186 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 187 ct := r.Header.Get("Content-Type") 188 if ct != "application/sdp" { 189 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 190 return 191 } 192 var encoded string 193 urlKey := p.ByName("key") 194 if urlKey != "" { 195 encoded = urlKey 196 } else { 197 auth := r.Header.Get("Authorization") 198 if auth == "" { 199 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 200 return 201 } 202 if !strings.HasPrefix(auth, BEARER_PREFIX) { 203 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 204 return 205 } 206 encoded = auth[len(BEARER_PREFIX):] 207 // it's easy to copy-paste a trailing or leading space, so clear those out 208 encoded = strings.TrimSpace(encoded) 209 } 210 211 if len(encoded) < 2 || encoded[0] != 'z' { 212 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil) 213 return 214 } 215 216 var addrBytes []byte 217 var didBytes []byte 218 priv, err := atcrypto.ParsePrivateMultibase(encoded) 219 if err == nil { 220 addrBytes = priv.Bytes() 221 } else { 222 decoded, err := base58.Decode(encoded[1:]) 223 if err != nil { 224 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil) 225 return 226 } 227 addrBytes = decoded[:32] 228 didBytes = decoded[32:] 229 } 230 231 key, _ := secp256k1.PrivKeyFromBytes(addrBytes) 232 if key == nil { 233 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil) 234 return 235 } 236 var signer crypto.Signer = key.ToECDSA() 237 238 did := string(didBytes) 239 240 if did != "" { 241 repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model) 242 if err != nil { 243 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 244 return 245 } 246 err = a.CLI.StreamIsAllowed(repo.DID) 247 if err != nil { 248 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 249 return 250 } 251 } else { 252 atkey, err := atproto.ParsePubKey(signer.Public()) 253 if err != nil { 254 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 255 return 256 } 257 did = atkey.DIDKey() 258 err = a.CLI.StreamIsAllowed(did) 259 if err != nil { 260 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 261 return 262 } 263 } 264 265 ctx = log.WithLogValues(ctx, "did", did) 266 267 var mediaSigner media.MediaSigner 268 if a.CLI.ExternalSigning { 269 mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 270 } else { 271 mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer) 272 } 273 if err != nil { 274 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 275 return 276 } 277 278 body, err := io.ReadAll(r.Body) 279 if err != nil { 280 errors.WriteHTTPBadRequest(w, "error reading body", err) 281 return 282 } 283 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 284 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner) 285 if err != nil { 286 errors.WriteHTTPInternalServerError(w, "error playing back", err) 287 return 288 } 289 host := r.Host 290 if host == "" { 291 host = r.URL.Host 292 } 293 scheme := "http" 294 if r.TLS != nil { 295 scheme = "https" 296 } 297 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 298 log.Log(ctx, "location", "location", location) 299 w.Header().Set("Location", location) 300 w.WriteHeader(201) 301 w.Write([]byte(answer.SDP)) 302 } 303} 304 305var epoch = time.Unix(0, 0).Format(time.RFC1123) 306 307var noCacheHeaders = map[string]string{ 308 "Expires": epoch, 309 "Cache-Control": "no-cache, private, max-age=0", 310 "Pragma": "no-cache", 311 "X-Accel-Expires": "0", 312} 313 314var etagHeaders = []string{ 315 "ETag", 316 "If-Modified-Since", 317 "If-Match", 318 "If-None-Match", 319 "If-Range", 320 "If-Unmodified-Since", 321} 322 323func NoCache(h httprouter.Handle) httprouter.Handle { 324 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 325 // Delete any ETag headers that may have been set 326 for _, v := range etagHeaders { 327 if r.Header.Get(v) != "" { 328 r.Header.Del(v) 329 } 330 } 331 332 // Set our NoCache headers 333 for k, v := range noCacheHeaders { 334 w.Header().Set(k, v) 335 } 336 337 h(w, r, p) 338 } 339} 340 341func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 342 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 343 user := p.ByName("user") 344 if user == "" { 345 errors.WriteHTTPBadRequest(w, "user required", nil) 346 return 347 } 348 user, err := a.NormalizeUser(ctx, user) 349 if err != nil { 350 errors.WriteHTTPBadRequest(w, "invalid user", err) 351 return 352 } 353 file := p.ByName("file") 354 if file == "" { 355 errors.WriteHTTPBadRequest(w, "file required", nil) 356 return 357 } 358 m3u8, err := a.Director.GetM3U8(ctx, user) 359 if err != nil { 360 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 361 return 362 } 363 session := r.URL.Query().Get("session") 364 rendition := r.URL.Query().Get("rendition") 365 buf, err := m3u8.GetFile(file, session, rendition) 366 if err != nil { 367 errors.WriteHTTPNotFound(w, "segment not found", err) 368 return 369 } 370 371 if strings.HasSuffix(file, ".m3u8") { 372 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 373 } else { 374 if session != "" { 375 spmetrics.SessionSeen(user, session) 376 } 377 w.Header().Set("Content-Type", "video/mp2t") 378 } 379 380 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 381 }) 382} 383 384func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 385 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 386 user := p.ByName("user") 387 if user == "" { 388 errors.WriteHTTPBadRequest(w, "user required", nil) 389 return 390 } 391 user, err := a.NormalizeUser(ctx, user) 392 if err != nil { 393 errors.WriteHTTPNotFound(w, "user not found", err) 394 return 395 } 396 thumb, err := a.Model.LatestThumbnailForUser(user) 397 if err != nil { 398 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 399 return 400 } 401 if thumb == nil { 402 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 403 return 404 } 405 aqt := aqtime.FromTime(thumb.Segment.StartTime) 406 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 407 if err != nil { 408 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 409 return 410 } 411 http.ServeFile(w, r, fpath) 412 } 413}