Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "crypto"
8 "fmt"
9 "io"
10 "net/http"
11 "strconv"
12 "strings"
13 "time"
14
15 atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
16 "github.com/decred/dcrd/dcrec/secp256k1"
17 "github.com/julienschmidt/httprouter"
18 "github.com/mr-tron/base58"
19 "github.com/pion/webrtc/v4"
20 "golang.org/x/sync/errgroup"
21 "stream.place/streamplace/pkg/aqtime"
22 "stream.place/streamplace/pkg/atproto"
23 "stream.place/streamplace/pkg/constants"
24 "stream.place/streamplace/pkg/errors"
25 apierrors "stream.place/streamplace/pkg/errors"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/spmetrics"
29)
30
31func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
32 alias, ok := a.Aliases[user]
33 if ok {
34 user = alias
35 }
36 // did:key, pass through unaltered
37 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
38 return user, nil
39 }
40 // only other allowed case is a bluesky handle
41 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
42 if err != nil {
43 return "", err
44 }
45 return repo.DID, nil
46}
47
48func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle {
49 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
50 user := p.ByName("user")
51 if user == "" {
52 errors.WriteHTTPBadRequest(w, "user required", nil)
53 return
54 }
55 rendition := getRendition(r)
56 user, err := a.NormalizeUser(ctx, user)
57 if err != nil {
58 errors.WriteHTTPBadRequest(w, "invalid user", err)
59 return
60 }
61 var delayMS int64 = 3000
62 userDelay := r.URL.Query().Get("delayms")
63 if userDelay != "" {
64 var err error
65 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
66 if err != nil {
67 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
68 return
69 }
70 if delayMS > 10000 {
71 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
72 return
73 }
74 }
75 spmetrics.ViewerInc(user)
76 defer spmetrics.ViewerDec(user)
77 w.Header().Set("Content-Type", "video/mp4")
78 w.WriteHeader(200)
79 g, ctx := errgroup.WithContext(ctx)
80 pr, pw := io.Pipe()
81 bufw := bufio.NewWriter(pw)
82 g.Go(func() error {
83 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
84 })
85 g.Go(func() error {
86 <-ctx.Done()
87 pr.Close()
88 pw.Close()
89 return nil
90 })
91 g.Go(func() error {
92 time.Sleep(time.Duration(delayMS) * time.Millisecond)
93 _, err := io.Copy(w, pr)
94 return err
95 })
96 g.Wait()
97 }
98}
99
100func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle {
101 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
102 user := p.ByName("user")
103 if user == "" {
104 errors.WriteHTTPBadRequest(w, "user required", nil)
105 return
106 }
107 rendition := getRendition(r)
108 user, err := a.NormalizeUser(ctx, user)
109 if err != nil {
110 errors.WriteHTTPBadRequest(w, "invalid user", err)
111 return
112 }
113 var delayMS int64 = 1000
114 userDelay := r.URL.Query().Get("delayms")
115 if userDelay != "" {
116 var err error
117 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
118 if err != nil {
119 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
120 return
121 }
122 if delayMS > 10000 {
123 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
124 return
125 }
126 }
127 spmetrics.ViewerInc(user)
128 defer spmetrics.ViewerDec(user)
129 w.Header().Set("Content-Type", "video/webm")
130 w.WriteHeader(200)
131 g, ctx := errgroup.WithContext(ctx)
132 pr, pw := io.Pipe()
133 bufw := bufio.NewWriter(pw)
134 g.Go(func() error {
135 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw)
136 })
137 g.Go(func() error {
138 <-ctx.Done()
139 pr.Close()
140 pw.Close()
141 return nil
142 })
143 g.Go(func() error {
144 time.Sleep(time.Duration(delayMS) * time.Millisecond)
145 _, err := io.Copy(w, pr)
146 return err
147 })
148 g.Wait()
149 }
150}
151
152func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
153 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
154 user := p.ByName("user")
155 if user == "" {
156 errors.WriteHTTPBadRequest(w, "user required", nil)
157 return
158 }
159 rendition := getRendition(r)
160 user, err := a.NormalizeUser(ctx, user)
161 if err != nil {
162 errors.WriteHTTPBadRequest(w, "invalid user", err)
163 return
164 }
165 body, err := io.ReadAll(r.Body)
166 if err != nil {
167 errors.WriteHTTPBadRequest(w, "error reading body", err)
168 return
169 }
170 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
171 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
172 if err != nil {
173 errors.WriteHTTPInternalServerError(w, "error playing back", err)
174 return
175 }
176 w.WriteHeader(201)
177 w.Header().Add("Location", r.URL.Path)
178 w.Write([]byte(answer.SDP))
179 }
180}
181
182const BEARER_PREFIX = "Bearer "
183const KEY_PREFIX = "0x"
184
185func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
186 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
187 ct := r.Header.Get("Content-Type")
188 if ct != "application/sdp" {
189 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
190 return
191 }
192 var encoded string
193 urlKey := p.ByName("key")
194 if urlKey != "" {
195 encoded = urlKey
196 } else {
197 auth := r.Header.Get("Authorization")
198 if auth == "" {
199 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
200 return
201 }
202 if !strings.HasPrefix(auth, BEARER_PREFIX) {
203 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
204 return
205 }
206 encoded = auth[len(BEARER_PREFIX):]
207 // it's easy to copy-paste a trailing or leading space, so clear those out
208 encoded = strings.TrimSpace(encoded)
209 }
210
211 if len(encoded) < 2 || encoded[0] != 'z' {
212 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil)
213 return
214 }
215
216 var addrBytes []byte
217 var didBytes []byte
218 priv, err := atcrypto.ParsePrivateMultibase(encoded)
219 if err == nil {
220 addrBytes = priv.Bytes()
221 } else {
222 decoded, err := base58.Decode(encoded[1:])
223 if err != nil {
224 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil)
225 return
226 }
227 addrBytes = decoded[:32]
228 didBytes = decoded[32:]
229 }
230
231 key, _ := secp256k1.PrivKeyFromBytes(addrBytes)
232 if key == nil {
233 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil)
234 return
235 }
236 var signer crypto.Signer = key.ToECDSA()
237
238 did := string(didBytes)
239
240 if did != "" {
241 repo, err := a.ATSync.SyncBlueskyRepo(ctx, did, a.Model)
242 if err != nil {
243 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
244 return
245 }
246 err = a.CLI.StreamIsAllowed(repo.DID)
247 if err != nil {
248 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err)
249 return
250 }
251 } else {
252 atkey, err := atproto.ParsePubKey(signer.Public())
253 if err != nil {
254 apierrors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err)
255 return
256 }
257 did = atkey.DIDKey()
258 err = a.CLI.StreamIsAllowed(did)
259 if err != nil {
260 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err)
261 return
262 }
263 }
264
265 ctx = log.WithLogValues(ctx, "did", did)
266
267 var mediaSigner media.MediaSigner
268 if a.CLI.ExternalSigning {
269 mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes)
270 } else {
271 mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer)
272 }
273 if err != nil {
274 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err)
275 return
276 }
277
278 body, err := io.ReadAll(r.Body)
279 if err != nil {
280 errors.WriteHTTPBadRequest(w, "error reading body", err)
281 return
282 }
283 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
284 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner)
285 if err != nil {
286 errors.WriteHTTPInternalServerError(w, "error playing back", err)
287 return
288 }
289 host := r.Host
290 if host == "" {
291 host = r.URL.Host
292 }
293 scheme := "http"
294 if r.TLS != nil {
295 scheme = "https"
296 }
297 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
298 log.Log(ctx, "location", "location", location)
299 w.Header().Set("Location", location)
300 w.WriteHeader(201)
301 w.Write([]byte(answer.SDP))
302 }
303}
304
305var epoch = time.Unix(0, 0).Format(time.RFC1123)
306
307var noCacheHeaders = map[string]string{
308 "Expires": epoch,
309 "Cache-Control": "no-cache, private, max-age=0",
310 "Pragma": "no-cache",
311 "X-Accel-Expires": "0",
312}
313
314var etagHeaders = []string{
315 "ETag",
316 "If-Modified-Since",
317 "If-Match",
318 "If-None-Match",
319 "If-Range",
320 "If-Unmodified-Since",
321}
322
323func NoCache(h httprouter.Handle) httprouter.Handle {
324 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
325 // Delete any ETag headers that may have been set
326 for _, v := range etagHeaders {
327 if r.Header.Get(v) != "" {
328 r.Header.Del(v)
329 }
330 }
331
332 // Set our NoCache headers
333 for k, v := range noCacheHeaders {
334 w.Header().Set(k, v)
335 }
336
337 h(w, r, p)
338 }
339}
340
341func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
342 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
343 user := p.ByName("user")
344 if user == "" {
345 errors.WriteHTTPBadRequest(w, "user required", nil)
346 return
347 }
348 user, err := a.NormalizeUser(ctx, user)
349 if err != nil {
350 errors.WriteHTTPBadRequest(w, "invalid user", err)
351 return
352 }
353 file := p.ByName("file")
354 if file == "" {
355 errors.WriteHTTPBadRequest(w, "file required", nil)
356 return
357 }
358 m3u8, err := a.Director.GetM3U8(ctx, user)
359 if err != nil {
360 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
361 return
362 }
363 session := r.URL.Query().Get("session")
364 rendition := r.URL.Query().Get("rendition")
365 buf, err := m3u8.GetFile(file, session, rendition)
366 if err != nil {
367 errors.WriteHTTPNotFound(w, "segment not found", err)
368 return
369 }
370
371 if strings.HasSuffix(file, ".m3u8") {
372 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
373 } else {
374 if session != "" {
375 spmetrics.SessionSeen(user, session)
376 }
377 w.Header().Set("Content-Type", "video/mp2t")
378 }
379
380 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
381 })
382}
383
384func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
385 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
386 user := p.ByName("user")
387 if user == "" {
388 errors.WriteHTTPBadRequest(w, "user required", nil)
389 return
390 }
391 user, err := a.NormalizeUser(ctx, user)
392 if err != nil {
393 errors.WriteHTTPNotFound(w, "user not found", err)
394 return
395 }
396 thumb, err := a.Model.LatestThumbnailForUser(user)
397 if err != nil {
398 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
399 return
400 }
401 if thumb == nil {
402 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
403 return
404 }
405 aqt := aqtime.FromTime(thumb.Segment.StartTime)
406 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
407 if err != nil {
408 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
409 return
410 }
411 http.ServeFile(w, r, fpath)
412 }
413}