Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto"
8 "fmt"
9 "io"
10 "net/http"
11 "strconv"
12 "strings"
13 "time"
14
15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
16 "github.com/decred/dcrd/dcrec/secp256k1"
17 "github.com/julienschmidt/httprouter"
18 "github.com/mr-tron/base58"
19 "github.com/pion/webrtc/v4"
20 "golang.org/x/sync/errgroup"
21 "stream.place/streamplace/pkg/aqtime"
22 "stream.place/streamplace/pkg/atproto"
23 "stream.place/streamplace/pkg/constants"
24 "stream.place/streamplace/pkg/errors"
25 apierrors "stream.place/streamplace/pkg/errors"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/spmetrics"
29)
30
31func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
32 alias, ok := a.Aliases[user]
33 if ok {
34 user = alias
35 }
36 // did:key, pass through unaltered
37 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
38 return user, nil
39 }
40 // only other allowed case is a bluesky handle
41 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
42 if err != nil {
43 return "", err
44 }
45 return repo.DID, nil
46}
47
48func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle {
49 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
50 user := p.ByName("user")
51 if user == "" {
52 errors.WriteHTTPBadRequest(w, "user required", nil)
53 return
54 }
55 rendition := getRendition(r)
56 user, err := a.NormalizeUser(ctx, user)
57 if err != nil {
58 errors.WriteHTTPBadRequest(w, "invalid user", err)
59 return
60 }
61 var delayMS int64 = 3000
62 userDelay := r.URL.Query().Get("delayms")
63 if userDelay != "" {
64 var err error
65 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
66 if err != nil {
67 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
68 return
69 }
70 if delayMS > 10000 {
71 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
72 return
73 }
74 }
75 spmetrics.ViewerInc(user)
76 defer spmetrics.ViewerDec(user)
77 w.Header().Set("Content-Type", "video/mp4")
78 w.WriteHeader(200)
79 g, ctx := errgroup.WithContext(ctx)
80 pr, pw := io.Pipe()
81 bufw := bufio.NewWriter(pw)
82 g.Go(func() error {
83 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
84 })
85 g.Go(func() error {
86 <-ctx.Done()
87 pr.Close()
88 pw.Close()
89 return nil
90 })
91 g.Go(func() error {
92 time.Sleep(time.Duration(delayMS) * time.Millisecond)
93 _, err := io.Copy(w, pr)
94 return err
95 })
96 g.Wait()
97 }
98}
99
100func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle {
101 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
102 user := p.ByName("user")
103 if user == "" {
104 errors.WriteHTTPBadRequest(w, "user required", nil)
105 return
106 }
107 rendition := getRendition(r)
108 user, err := a.NormalizeUser(ctx, user)
109 if err != nil {
110 errors.WriteHTTPBadRequest(w, "invalid user", err)
111 return
112 }
113 var delayMS int64 = 1000
114 userDelay := r.URL.Query().Get("delayms")
115 if userDelay != "" {
116 var err error
117 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
118 if err != nil {
119 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
120 return
121 }
122 if delayMS > 10000 {
123 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
124 return
125 }
126 }
127 spmetrics.ViewerInc(user)
128 defer spmetrics.ViewerDec(user)
129 w.Header().Set("Content-Type", "video/webm")
130 w.WriteHeader(200)
131 g, ctx := errgroup.WithContext(ctx)
132 pr, pw := io.Pipe()
133 bufw := bufio.NewWriter(pw)
134 g.Go(func() error {
135 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw)
136 })
137 g.Go(func() error {
138 <-ctx.Done()
139 pr.Close()
140 pw.Close()
141 return nil
142 })
143 g.Go(func() error {
144 time.Sleep(time.Duration(delayMS) * time.Millisecond)
145 _, err := io.Copy(w, pr)
146 return err
147 })
148 g.Wait()
149 }
150}
151
152func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
153 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
154 user := p.ByName("user")
155 if user == "" {
156 errors.WriteHTTPBadRequest(w, "user required", nil)
157 return
158 }
159 rendition := getRendition(r)
160 user, err := a.NormalizeUser(ctx, user)
161 if err != nil {
162 errors.WriteHTTPBadRequest(w, "invalid user", err)
163 return
164 }
165 body, err := io.ReadAll(r.Body)
166 if err != nil {
167 errors.WriteHTTPBadRequest(w, "error reading body", err)
168 return
169 }
170 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
171 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
172 if err != nil {
173 errors.WriteHTTPInternalServerError(w, "error playing back", err)
174 return
175 }
176 w.WriteHeader(201)
177 w.Header().Add("Location", r.URL.Path)
178 w.Write([]byte(answer.SDP))
179 }
180}
181
const (
	// BEARER_PREFIX is the scheme prefix HandleWebRTCIngest requires at the
	// start of the Authorization header, in front of the encoded key.
	BEARER_PREFIX = "Bearer "

	// KEY_PREFIX is the literal "0x".
	// NOTE(review): not referenced in the visible part of this file;
	// presumably a prefix for hex-encoded keys. Confirm against its users.
	KEY_PREFIX = "0x"
)
184
185func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
186 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
187 ct := r.Header.Get("Content-Type")
188 if ct != "application/sdp" {
189 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
190 return
191 }
192 var encoded string
193 urlKey := p.ByName("key")
194 if urlKey != "" {
195 encoded = urlKey
196 } else {
197 auth := r.Header.Get("Authorization")
198 if auth == "" {
199 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
200 return
201 }
202 if !strings.HasPrefix(auth, BEARER_PREFIX) {
203 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
204 return
205 }
206 encoded = auth[len(BEARER_PREFIX):]
207 // it's easy to copy-paste a trailing or leading space, so clear those out
208 encoded = strings.TrimSpace(encoded)
209 }
210
211 if len(encoded) < 2 || encoded[0] != 'z' {
212 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil)
213 return
214 }
215
216 var addrBytes []byte
217 var didBytes []byte
218 priv, err := atcrypto.ParsePrivateMultibase(encoded)
219 if err == nil {
220 addrBytes = priv.Bytes()
221 } else {
222 decoded, err := base58.Decode(encoded[1:])
223 if err != nil {
224 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil)
225 return
226 }
227 addrBytes = decoded[:32]
228 didBytes = decoded[32:]
229 priv, err = atcrypto.ParsePrivateBytesK256(addrBytes)
230 if err != nil {
231 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid atcrypto)", err)
232 return
233 }
234 }
235
236 key, _ := secp256k1.PrivKeyFromBytes(addrBytes)
237 if key == nil {
238 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil)
239 return
240 }
241 var signer crypto.Signer = key.ToECDSA()
242 pub, err := priv.PublicKey()
243 if err != nil {
244 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (could not parse as atcrypto)", err)
245 return
246 }
247
248 did := string(didBytes)
249
250 if did != "" {
251 repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model)
252 if err != nil {
253 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
254 return
255 }
256 err = a.CLI.StreamIsAllowed(repo.DID)
257 if err != nil {
258 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err)
259 return
260 }
261 signingKey, err := a.Model.GetSigningKey(ctx, pub.DIDKey(), repo.DID)
262 if err != nil {
263 apierrors.WriteHTTPUnauthorized(w, "signing key not found", err)
264 return
265 }
266 if signingKey == nil {
267 apierrors.WriteHTTPUnauthorized(w, "signing key not found", nil)
268 return
269 }
270 } else {
271 atkey, err := atproto.ParsePubKey(signer.Public())
272 if err != nil {
273 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err)
274 return
275 }
276 did = atkey.DIDKey()
277 err = a.CLI.StreamIsAllowed(did)
278 if err != nil {
279 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err)
280 return
281 }
282 }
283
284 ctx = log.WithLogValues(ctx, "did", did)
285
286 var mediaSigner media.MediaSigner
287 if a.CLI.ExternalSigning {
288 mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes)
289 } else {
290 mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer)
291 }
292 if err != nil {
293 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err)
294 return
295 }
296
297 body, err := io.ReadAll(r.Body)
298 if err != nil {
299 errors.WriteHTTPBadRequest(w, "error reading body", err)
300 return
301 }
302 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
303 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner)
304 if err != nil {
305 errors.WriteHTTPInternalServerError(w, "error playing back", err)
306 return
307 }
308 host := r.Host
309 if host == "" {
310 host = r.URL.Host
311 }
312 scheme := "http"
313 if r.TLS != nil {
314 scheme = "https"
315 }
316 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
317 log.Log(ctx, "location", "location", location)
318 w.Header().Set("Location", location)
319 w.WriteHeader(201)
320 w.Write([]byte(answer.SDP))
321 }
322}
323
// epoch is the Unix epoch rendered as an HTTP-date. It is used as the Expires
// value in noCacheHeaders so the response counts as already expired.
//
// HTTP dates must be expressed in GMT, so the time is converted to UTC and
// formatted with http.TimeFormat; formatting the local time with time.RFC1123
// would emit the server's own zone abbreviation instead.
var epoch = time.Unix(0, 0).UTC().Format(http.TimeFormat)

// noCacheHeaders are the response headers NoCache sets to keep clients and
// intermediaries from caching the response.
var noCacheHeaders = map[string]string{
	"Expires":         epoch,
	"Cache-Control":   "no-cache, private, max-age=0",
	"Pragma":          "no-cache",
	"X-Accel-Expires": "0",
}

// etagHeaders are the validator and conditional-request headers NoCache
// removes from the incoming request before calling the wrapped handler.
var etagHeaders = []string{
	"ETag",
	"If-Modified-Since",
	"If-Match",
	"If-None-Match",
	"If-Range",
	"If-Unmodified-Since",
}
341
342func NoCache(h httprouter.Handle) httprouter.Handle {
343 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
344 // Delete any ETag headers that may have been set
345 for _, v := range etagHeaders {
346 if r.Header.Get(v) != "" {
347 r.Header.Del(v)
348 }
349 }
350
351 // Set our NoCache headers
352 for k, v := range noCacheHeaders {
353 w.Header().Set(k, v)
354 }
355
356 h(w, r, p)
357 }
358}
359
360func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
361 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
362 user := p.ByName("user")
363 if user == "" {
364 errors.WriteHTTPBadRequest(w, "user required", nil)
365 return
366 }
367 user, err := a.NormalizeUser(ctx, user)
368 if err != nil {
369 errors.WriteHTTPBadRequest(w, "invalid user", err)
370 return
371 }
372 file := p.ByName("file")
373 if file == "" {
374 errors.WriteHTTPBadRequest(w, "file required", nil)
375 return
376 }
377 m3u8, err := a.Director.GetM3U8(ctx, user)
378 if err != nil {
379 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
380 return
381 }
382 session := r.URL.Query().Get("session")
383 rendition := r.URL.Query().Get("rendition")
384 buf, err := m3u8.GetFile(file, session, rendition)
385 if err != nil {
386 errors.WriteHTTPNotFound(w, "segment not found", err)
387 return
388 }
389
390 if strings.HasSuffix(file, ".m3u8") {
391 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
392 } else {
393 if session != "" {
394 spmetrics.SessionSeen(user, session)
395 }
396 w.Header().Set("Content-Type", "video/mp2t")
397 }
398
399 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
400 })
401}
402
403func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
404 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
405 user := p.ByName("user")
406 if user == "" {
407 errors.WriteHTTPBadRequest(w, "user required", nil)
408 return
409 }
410 user, err := a.NormalizeUser(ctx, user)
411 if err != nil {
412 errors.WriteHTTPNotFound(w, "user not found", err)
413 return
414 }
415 thumb, err := a.Model.LatestThumbnailForUser(user)
416 if err != nil {
417 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
418 return
419 }
420 if thumb == nil {
421 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
422 return
423 }
424 aqt := aqtime.FromTime(thumb.Segment.StartTime)
425 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
426 if err != nil {
427 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
428 return
429 }
430 http.ServeFile(w, r, fpath)
431 }
432}