Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "io"
9 "net/http"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/julienschmidt/httprouter"
15 "github.com/pion/webrtc/v4"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/constants"
19 "stream.place/streamplace/pkg/errors"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/spmetrics"
22)
23
24func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
25 alias, ok := a.Aliases[user]
26 if ok {
27 user = alias
28 }
29 // did:key, pass through unaltered
30 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
31 return user, nil
32 }
33 // only other allowed case is a bluesky handle
34 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
35 if err != nil {
36 return "", err
37 }
38 return repo.DID, nil
39}
40
41func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle {
42 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
43 user := p.ByName("user")
44 if user == "" {
45 errors.WriteHTTPBadRequest(w, "user required", nil)
46 return
47 }
48 rendition := getRendition(r)
49 user, err := a.NormalizeUser(ctx, user)
50 if err != nil {
51 errors.WriteHTTPBadRequest(w, "invalid user", err)
52 return
53 }
54 var delayMS int64 = 3000
55 userDelay := r.URL.Query().Get("delayms")
56 if userDelay != "" {
57 var err error
58 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
59 if err != nil {
60 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
61 return
62 }
63 if delayMS > 10000 {
64 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
65 return
66 }
67 }
68 spmetrics.ViewerInc(user)
69 defer spmetrics.ViewerDec(user)
70 w.Header().Set("Content-Type", "video/mp4")
71 w.WriteHeader(200)
72 g, ctx := errgroup.WithContext(ctx)
73 pr, pw := io.Pipe()
74 bufw := bufio.NewWriter(pw)
75 g.Go(func() error {
76 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
77 })
78 g.Go(func() error {
79 <-ctx.Done()
80 pr.Close()
81 pw.Close()
82 return nil
83 })
84 g.Go(func() error {
85 time.Sleep(time.Duration(delayMS) * time.Millisecond)
86 _, err := io.Copy(w, pr)
87 return err
88 })
89 if err := g.Wait(); err != nil {
90 errors.WriteHTTPBadRequest(w, "request failed", err)
91 }
92 }
93}
94
95func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle {
96 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
97 user := p.ByName("user")
98 if user == "" {
99 errors.WriteHTTPBadRequest(w, "user required", nil)
100 return
101 }
102 rendition := getRendition(r)
103 user, err := a.NormalizeUser(ctx, user)
104 if err != nil {
105 errors.WriteHTTPBadRequest(w, "invalid user", err)
106 return
107 }
108 var delayMS int64 = 1000
109 userDelay := r.URL.Query().Get("delayms")
110 if userDelay != "" {
111 var err error
112 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
113 if err != nil {
114 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
115 return
116 }
117 if delayMS > 10000 {
118 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
119 return
120 }
121 }
122 spmetrics.ViewerInc(user)
123 defer spmetrics.ViewerDec(user)
124 w.Header().Set("Content-Type", "video/webm")
125 w.WriteHeader(200)
126 g, ctx := errgroup.WithContext(ctx)
127 pr, pw := io.Pipe()
128 bufw := bufio.NewWriter(pw)
129 g.Go(func() error {
130 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw)
131 })
132 g.Go(func() error {
133 <-ctx.Done()
134 pr.Close()
135 pw.Close()
136 return nil
137 })
138 g.Go(func() error {
139 time.Sleep(time.Duration(delayMS) * time.Millisecond)
140 _, err := io.Copy(w, pr)
141 return err
142 })
143 if err := g.Wait(); err != nil {
144 errors.WriteHTTPBadRequest(w, "request failed", err)
145 }
146 }
147}
148
149func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
150 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
151 user := p.ByName("user")
152 if user == "" {
153 errors.WriteHTTPBadRequest(w, "user required", nil)
154 return
155 }
156 rendition := getRendition(r)
157 user, err := a.NormalizeUser(ctx, user)
158 if err != nil {
159 errors.WriteHTTPBadRequest(w, "invalid user", err)
160 return
161 }
162 body, err := io.ReadAll(r.Body)
163 if err != nil {
164 errors.WriteHTTPBadRequest(w, "error reading body", err)
165 return
166 }
167 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
168 var answer *webrtc.SessionDescription
169 if a.CLI.NewWebRTCPlayback {
170 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer)
171 } else {
172 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
173 }
174 if err != nil {
175 errors.WriteHTTPInternalServerError(w, "error playing back", err)
176 return
177 }
178 w.WriteHeader(201)
179 w.Header().Add("Location", r.URL.Path)
180 if _, err := w.Write([]byte(answer.SDP)); err != nil {
181 log.Error(ctx, "error writing response", "error", err)
182 }
183 }
184}
185
186const BearerPrefix = "Bearer "
187const KeyPrefix = "0x"
188
189func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
190 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
191 ct := r.Header.Get("Content-Type")
192 if ct != "application/sdp" {
193 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
194 return
195 }
196 var encoded string
197 urlKey := p.ByName("key")
198 if urlKey != "" {
199 encoded = urlKey
200 } else {
201 auth := r.Header.Get("Authorization")
202 if auth == "" {
203 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
204 return
205 }
206 if !strings.HasPrefix(auth, BearerPrefix) {
207 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
208 return
209 }
210 encoded = auth[len(BearerPrefix):]
211 // it's easy to copy-paste a trailing or leading space, so clear those out
212 encoded = strings.TrimSpace(encoded)
213 }
214
215 mediaSigner, err := a.MakeMediaSigner(ctx, encoded)
216 if err != nil {
217 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
218 return
219 }
220
221 body, err := io.ReadAll(r.Body)
222 if err != nil {
223 errors.WriteHTTPBadRequest(w, "error reading body", err)
224 return
225 }
226 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
227 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer())
228 if err != nil {
229 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err)
230 return
231 }
232 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan struct{}))
233 if err != nil {
234 errors.WriteHTTPInternalServerError(w, "error playing back", err)
235 return
236 }
237 host := r.Host
238 if host == "" {
239 host = r.URL.Host
240 }
241 scheme := "http"
242 if r.TLS != nil {
243 scheme = "https"
244 }
245 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
246 log.Log(ctx, "location", "location", location)
247 w.Header().Set("Location", location)
248 w.WriteHeader(201)
249 if _, err := w.Write([]byte(answer.SDP)); err != nil {
250 log.Error(ctx, "error writing response", "error", err)
251 }
252 }
253}
254
255var epoch = time.Unix(0, 0).Format(time.RFC1123)
256
257var noCacheHeaders = map[string]string{
258 "Expires": epoch,
259 "Cache-Control": "no-cache, private, max-age=0",
260 "Pragma": "no-cache",
261 "X-Accel-Expires": "0",
262}
263
264var etagHeaders = []string{
265 "ETag",
266 "If-Modified-Since",
267 "If-Match",
268 "If-None-Match",
269 "If-Range",
270 "If-Unmodified-Since",
271}
272
273func NoCache(h httprouter.Handle) httprouter.Handle {
274 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
275 // Delete any ETag headers that may have been set
276 for _, v := range etagHeaders {
277 if r.Header.Get(v) != "" {
278 r.Header.Del(v)
279 }
280 }
281
282 // Set our NoCache headers
283 for k, v := range noCacheHeaders {
284 w.Header().Set(k, v)
285 }
286
287 h(w, r, p)
288 }
289}
290
291func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
292 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
293 user := p.ByName("user")
294 if user == "" {
295 errors.WriteHTTPBadRequest(w, "user required", nil)
296 return
297 }
298 user, err := a.NormalizeUser(ctx, user)
299 if err != nil {
300 errors.WriteHTTPBadRequest(w, "invalid user", err)
301 return
302 }
303 file := p.ByName("file")
304 if file == "" {
305 errors.WriteHTTPBadRequest(w, "file required", nil)
306 return
307 }
308 m3u8, err := a.Director.GetM3U8(ctx, user)
309 if err != nil {
310 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
311 return
312 }
313 session := r.URL.Query().Get("session")
314 rendition := r.URL.Query().Get("rendition")
315 buf, err := m3u8.GetFile(file, session, rendition)
316 if err != nil {
317 errors.WriteHTTPNotFound(w, "segment not found", err)
318 return
319 }
320
321 if strings.HasSuffix(file, ".m3u8") {
322 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
323 } else {
324 if session != "" {
325 spmetrics.SessionSeen(user, session)
326 }
327 w.Header().Set("Content-Type", "video/mp2t")
328 }
329
330 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
331 })
332}
333
334func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
335 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
336 user := p.ByName("user")
337 if user == "" {
338 errors.WriteHTTPBadRequest(w, "user required", nil)
339 return
340 }
341 user, err := a.NormalizeUser(ctx, user)
342 if err != nil {
343 errors.WriteHTTPNotFound(w, "user not found", err)
344 return
345 }
346 thumb, err := a.Model.LatestThumbnailForUser(user)
347 if err != nil {
348 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
349 return
350 }
351 if thumb == nil {
352 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
353 return
354 }
355 aqt := aqtime.FromTime(thumb.Segment.StartTime)
356 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
357 if err != nil {
358 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
359 return
360 }
361 http.ServeFile(w, r, fpath)
362 }
363}