Live video on the AT Protocol
at eli/node-22 432 lines 12 kB view raw
1package api 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "crypto" 8 "fmt" 9 "io" 10 "net/http" 11 "strconv" 12 "strings" 13 "time" 14 15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 16 "github.com/decred/dcrd/dcrec/secp256k1" 17 "github.com/julienschmidt/httprouter" 18 "github.com/mr-tron/base58" 19 "github.com/pion/webrtc/v4" 20 "golang.org/x/sync/errgroup" 21 "stream.place/streamplace/pkg/aqtime" 22 "stream.place/streamplace/pkg/atproto" 23 "stream.place/streamplace/pkg/constants" 24 "stream.place/streamplace/pkg/errors" 25 apierrors "stream.place/streamplace/pkg/errors" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/spmetrics" 29) 30 31func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 32 alias, ok := a.Aliases[user] 33 if ok { 34 user = alias 35 } 36 // did:key, pass through unaltered 37 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 38 return user, nil 39 } 40 // only other allowed case is a bluesky handle 41 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 42 if err != nil { 43 return "", err 44 } 45 return repo.DID, nil 46} 47 48func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle { 49 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 50 user := p.ByName("user") 51 if user == "" { 52 errors.WriteHTTPBadRequest(w, "user required", nil) 53 return 54 } 55 rendition := getRendition(r) 56 user, err := a.NormalizeUser(ctx, user) 57 if err != nil { 58 errors.WriteHTTPBadRequest(w, "invalid user", err) 59 return 60 } 61 var delayMS int64 = 3000 62 userDelay := r.URL.Query().Get("delayms") 63 if userDelay != "" { 64 var err error 65 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 66 if err != nil { 67 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 68 return 69 } 70 if delayMS > 10000 { 71 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 72 return 73 } 74 } 75 spmetrics.ViewerInc(user) 76 defer spmetrics.ViewerDec(user) 77 w.Header().Set("Content-Type", "video/mp4") 78 w.WriteHeader(200) 79 g, ctx := errgroup.WithContext(ctx) 80 pr, pw := io.Pipe() 81 bufw := bufio.NewWriter(pw) 82 g.Go(func() error { 83 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 84 }) 85 g.Go(func() error { 86 <-ctx.Done() 87 pr.Close() 88 pw.Close() 89 return nil 90 }) 91 g.Go(func() error { 92 time.Sleep(time.Duration(delayMS) * time.Millisecond) 93 _, err := io.Copy(w, pr) 94 return err 95 }) 96 g.Wait() 97 } 98} 99 100func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle { 101 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 102 user := p.ByName("user") 103 if user == "" { 104 errors.WriteHTTPBadRequest(w, "user required", nil) 105 return 106 } 107 rendition := getRendition(r) 108 user, err := a.NormalizeUser(ctx, user) 109 if err != nil { 110 errors.WriteHTTPBadRequest(w, "invalid user", err) 111 return 112 } 113 var delayMS int64 = 1000 114 userDelay := r.URL.Query().Get("delayms") 115 if userDelay != "" { 116 var err error 117 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 118 if err != nil { 119 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 120 return 121 } 122 if delayMS > 10000 { 123 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 124 return 125 } 126 } 127 spmetrics.ViewerInc(user) 128 defer spmetrics.ViewerDec(user) 129 w.Header().Set("Content-Type", "video/webm") 130 w.WriteHeader(200) 131 g, ctx := errgroup.WithContext(ctx) 132 pr, pw := io.Pipe() 133 bufw := bufio.NewWriter(pw) 134 g.Go(func() error { 135 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw) 136 }) 137 g.Go(func() error { 138 <-ctx.Done() 139 pr.Close() 140 pw.Close() 141 return nil 142 }) 143 g.Go(func() error { 144 time.Sleep(time.Duration(delayMS) * time.Millisecond) 145 _, err := io.Copy(w, pr) 146 return err 147 }) 148 g.Wait() 149 } 150} 151 152func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 153 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 154 user := p.ByName("user") 155 if user == "" { 156 errors.WriteHTTPBadRequest(w, "user required", nil) 157 return 158 } 159 rendition := getRendition(r) 160 user, err := a.NormalizeUser(ctx, user) 161 if err != nil { 162 errors.WriteHTTPBadRequest(w, "invalid user", err) 163 return 164 } 165 body, err := io.ReadAll(r.Body) 166 if err != nil { 167 errors.WriteHTTPBadRequest(w, "error reading body", err) 168 return 169 } 170 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 171 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 172 if err != nil { 173 errors.WriteHTTPInternalServerError(w, "error playing back", err) 174 return 175 } 176 w.WriteHeader(201) 177 w.Header().Add("Location", r.URL.Path) 178 w.Write([]byte(answer.SDP)) 179 } 180} 181 182const BEARER_PREFIX = "Bearer " 183const KEY_PREFIX = "0x" 184 185func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 186 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 187 ct := r.Header.Get("Content-Type") 188 if ct != "application/sdp" { 189 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 190 return 191 } 192 var encoded string 193 urlKey := p.ByName("key") 194 if urlKey != "" { 195 encoded = urlKey 196 } else { 197 auth := r.Header.Get("Authorization") 198 if auth == "" { 199 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 200 return 201 } 202 if !strings.HasPrefix(auth, BEARER_PREFIX) { 203 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 204 return 205 } 206 encoded = auth[len(BEARER_PREFIX):] 207 // it's easy to copy-paste a trailing or leading space, so clear those out 208 encoded = strings.TrimSpace(encoded) 209 } 210 211 if len(encoded) < 2 || encoded[0] != 'z' { 212 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil) 213 return 214 } 215 216 var addrBytes []byte 217 var didBytes []byte 218 priv, err := atcrypto.ParsePrivateMultibase(encoded) 219 if err == nil { 220 addrBytes = priv.Bytes() 221 } else { 222 decoded, err := base58.Decode(encoded[1:]) 223 if err != nil { 224 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil) 225 return 226 } 227 addrBytes = decoded[:32] 228 didBytes = decoded[32:] 229 priv, err = atcrypto.ParsePrivateBytesK256(addrBytes) 230 if err != nil { 231 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid atcrypto)", err) 232 return 233 } 234 } 235 236 key, _ := secp256k1.PrivKeyFromBytes(addrBytes) 237 if key == nil { 238 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil) 239 return 240 } 241 var signer crypto.Signer = key.ToECDSA() 242 pub, err := priv.PublicKey() 243 if err != nil { 244 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (could not parse as atcrypto)", err) 245 return 246 } 247 248 did := string(didBytes) 249 250 if did != "" { 251 repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model) 252 if err != nil { 253 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 254 return 255 } 256 err = a.CLI.StreamIsAllowed(repo.DID) 257 if err != nil { 258 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 259 return 260 } 261 signingKey, err := a.Model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 262 if err != nil { 263 apierrors.WriteHTTPUnauthorized(w, "signing key not found", err) 264 return 265 } 266 if signingKey == nil { 267 apierrors.WriteHTTPUnauthorized(w, "signing key not found", nil) 268 return 269 } 270 } else { 271 atkey, err := atproto.ParsePubKey(signer.Public()) 272 if err != nil { 273 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 274 return 275 } 276 did = atkey.DIDKey() 277 err = a.CLI.StreamIsAllowed(did) 278 if err != nil { 279 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 280 return 281 } 282 } 283 284 ctx = log.WithLogValues(ctx, "did", did) 285 286 var mediaSigner media.MediaSigner 287 if a.CLI.ExternalSigning { 288 mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 289 } else { 290 mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer) 291 } 292 if err != nil { 293 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 294 return 295 } 296 297 body, err := io.ReadAll(r.Body) 298 if err != nil { 299 errors.WriteHTTPBadRequest(w, "error reading body", err) 300 return 301 } 302 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 303 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner) 304 if err != nil { 305 errors.WriteHTTPInternalServerError(w, "error playing back", err) 306 return 307 } 308 host := r.Host 309 if host == "" { 310 host = r.URL.Host 311 } 312 scheme := "http" 313 if r.TLS != nil { 314 scheme = "https" 315 } 316 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 317 log.Log(ctx, "location", "location", location) 318 w.Header().Set("Location", location) 319 w.WriteHeader(201) 320 w.Write([]byte(answer.SDP)) 321 } 322} 323 324var epoch = time.Unix(0, 0).Format(time.RFC1123) 325 326var noCacheHeaders = map[string]string{ 327 "Expires": epoch, 328 "Cache-Control": "no-cache, private, max-age=0", 329 "Pragma": "no-cache", 330 "X-Accel-Expires": "0", 331} 332 333var etagHeaders = []string{ 334 "ETag", 335 "If-Modified-Since", 336 "If-Match", 337 "If-None-Match", 338 "If-Range", 339 "If-Unmodified-Since", 340} 341 342func NoCache(h httprouter.Handle) httprouter.Handle { 343 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 344 // Delete any ETag headers that may have been set 345 for _, v := range etagHeaders { 346 if r.Header.Get(v) != "" { 347 r.Header.Del(v) 348 } 349 } 350 351 // Set our NoCache headers 352 for k, v := range noCacheHeaders { 353 w.Header().Set(k, v) 354 } 355 356 h(w, r, p) 357 } 358} 359 360func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 361 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 362 user := p.ByName("user") 363 if user == "" { 364 errors.WriteHTTPBadRequest(w, "user required", nil) 365 return 366 } 367 user, err := a.NormalizeUser(ctx, user) 368 if err != nil { 369 errors.WriteHTTPBadRequest(w, "invalid user", err) 370 return 371 } 372 file := p.ByName("file") 373 if file == "" { 374 errors.WriteHTTPBadRequest(w, "file required", nil) 375 return 376 } 377 m3u8, err := a.Director.GetM3U8(ctx, user) 378 if err != nil { 379 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 380 return 381 } 382 session := r.URL.Query().Get("session") 383 rendition := r.URL.Query().Get("rendition") 384 buf, err := m3u8.GetFile(file, session, rendition) 385 if err != nil { 386 errors.WriteHTTPNotFound(w, "segment not found", err) 387 return 388 } 389 390 if strings.HasSuffix(file, ".m3u8") { 391 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 392 } else { 393 if session != "" { 394 spmetrics.SessionSeen(user, session) 395 } 396 w.Header().Set("Content-Type", "video/mp2t") 397 } 398 399 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 400 }) 401} 402 403func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 404 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 405 user := p.ByName("user") 406 if user == "" { 407 errors.WriteHTTPBadRequest(w, "user required", nil) 408 return 409 } 410 user, err := a.NormalizeUser(ctx, user) 411 if err != nil { 412 errors.WriteHTTPNotFound(w, "user not found", err) 413 return 414 } 415 thumb, err := a.Model.LatestThumbnailForUser(user) 416 if err != nil { 417 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 418 return 419 } 420 if thumb == nil { 421 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 422 return 423 } 424 aqt := aqtime.FromTime(thumb.Segment.StartTime) 425 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 426 if err != nil { 427 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 428 return 429 } 430 http.ServeFile(w, r, fpath) 431 } 432}