Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "io"
9 "net/http"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/julienschmidt/httprouter"
15 "github.com/pion/webrtc/v4"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/constants"
19 "stream.place/streamplace/pkg/errors"
20 "stream.place/streamplace/pkg/log"
21 "stream.place/streamplace/pkg/spmetrics"
22)
23
24func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
25 alias, ok := a.Aliases[user]
26 if ok {
27 user = alias
28 }
29 // did:key, pass through unaltered
30 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
31 return user, nil
32 }
33 // only other allowed case is a bluesky handle
34 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
35 if err != nil {
36 return "", err
37 }
38 return repo.DID, nil
39}
40
41func (a *StreamplaceAPI) HandleMP4Playback(ctx context.Context) httprouter.Handle {
42 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
43 user := p.ByName("user")
44 if user == "" {
45 errors.WriteHTTPBadRequest(w, "user required", nil)
46 return
47 }
48 rendition := getRendition(r)
49 user, err := a.NormalizeUser(ctx, user)
50 if err != nil {
51 errors.WriteHTTPBadRequest(w, "invalid user", err)
52 return
53 }
54 var delayMS int64 = 3000
55 userDelay := r.URL.Query().Get("delayms")
56 if userDelay != "" {
57 var err error
58 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
59 if err != nil {
60 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
61 return
62 }
63 if delayMS > 10000 {
64 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
65 return
66 }
67 }
68 spmetrics.ViewerInc(user)
69 defer spmetrics.ViewerDec(user)
70 w.Header().Set("Content-Type", "video/mp4")
71 w.WriteHeader(200)
72 g, ctx := errgroup.WithContext(ctx)
73 pr, pw := io.Pipe()
74 bufw := bufio.NewWriter(pw)
75 g.Go(func() error {
76 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
77 })
78 g.Go(func() error {
79 <-ctx.Done()
80 pr.Close()
81 pw.Close()
82 return nil
83 })
84 g.Go(func() error {
85 time.Sleep(time.Duration(delayMS) * time.Millisecond)
86 _, err := io.Copy(w, pr)
87 return err
88 })
89 g.Wait()
90 }
91}
92
93func (a *StreamplaceAPI) HandleMKVPlayback(ctx context.Context) httprouter.Handle {
94 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
95 user := p.ByName("user")
96 if user == "" {
97 errors.WriteHTTPBadRequest(w, "user required", nil)
98 return
99 }
100 rendition := getRendition(r)
101 user, err := a.NormalizeUser(ctx, user)
102 if err != nil {
103 errors.WriteHTTPBadRequest(w, "invalid user", err)
104 return
105 }
106 var delayMS int64 = 1000
107 userDelay := r.URL.Query().Get("delayms")
108 if userDelay != "" {
109 var err error
110 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
111 if err != nil {
112 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
113 return
114 }
115 if delayMS > 10000 {
116 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
117 return
118 }
119 }
120 spmetrics.ViewerInc(user)
121 defer spmetrics.ViewerDec(user)
122 w.Header().Set("Content-Type", "video/webm")
123 w.WriteHeader(200)
124 g, ctx := errgroup.WithContext(ctx)
125 pr, pw := io.Pipe()
126 bufw := bufio.NewWriter(pw)
127 g.Go(func() error {
128 return a.MediaManager.SegmentToMKV(ctx, user, rendition, bufw)
129 })
130 g.Go(func() error {
131 <-ctx.Done()
132 pr.Close()
133 pw.Close()
134 return nil
135 })
136 g.Go(func() error {
137 time.Sleep(time.Duration(delayMS) * time.Millisecond)
138 _, err := io.Copy(w, pr)
139 return err
140 })
141 g.Wait()
142 }
143}
144
145func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
146 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
147 user := p.ByName("user")
148 if user == "" {
149 errors.WriteHTTPBadRequest(w, "user required", nil)
150 return
151 }
152 rendition := getRendition(r)
153 user, err := a.NormalizeUser(ctx, user)
154 if err != nil {
155 errors.WriteHTTPBadRequest(w, "invalid user", err)
156 return
157 }
158 body, err := io.ReadAll(r.Body)
159 if err != nil {
160 errors.WriteHTTPBadRequest(w, "error reading body", err)
161 return
162 }
163 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
164 answer, err := a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
165 if err != nil {
166 errors.WriteHTTPInternalServerError(w, "error playing back", err)
167 return
168 }
169 w.WriteHeader(201)
170 w.Header().Add("Location", r.URL.Path)
171 w.Write([]byte(answer.SDP))
172 }
173}
174
175const BEARER_PREFIX = "Bearer "
176const KEY_PREFIX = "0x"
177
178func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
179 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
180 ct := r.Header.Get("Content-Type")
181 if ct != "application/sdp" {
182 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
183 return
184 }
185 var encoded string
186 urlKey := p.ByName("key")
187 if urlKey != "" {
188 encoded = urlKey
189 } else {
190 auth := r.Header.Get("Authorization")
191 if auth == "" {
192 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
193 return
194 }
195 if !strings.HasPrefix(auth, BEARER_PREFIX) {
196 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
197 return
198 }
199 encoded = auth[len(BEARER_PREFIX):]
200 // it's easy to copy-paste a trailing or leading space, so clear those out
201 encoded = strings.TrimSpace(encoded)
202 }
203
204 mediaSigner, err := a.MakeMediaSigner(ctx, encoded)
205 if err != nil {
206 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
207 return
208 }
209
210 body, err := io.ReadAll(r.Body)
211 if err != nil {
212 errors.WriteHTTPBadRequest(w, "error reading body", err)
213 return
214 }
215 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
216 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner)
217 if err != nil {
218 errors.WriteHTTPInternalServerError(w, "error playing back", err)
219 return
220 }
221 host := r.Host
222 if host == "" {
223 host = r.URL.Host
224 }
225 scheme := "http"
226 if r.TLS != nil {
227 scheme = "https"
228 }
229 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
230 log.Log(ctx, "location", "location", location)
231 w.Header().Set("Location", location)
232 w.WriteHeader(201)
233 w.Write([]byte(answer.SDP))
234 }
235}
236
237var epoch = time.Unix(0, 0).Format(time.RFC1123)
238
239var noCacheHeaders = map[string]string{
240 "Expires": epoch,
241 "Cache-Control": "no-cache, private, max-age=0",
242 "Pragma": "no-cache",
243 "X-Accel-Expires": "0",
244}
245
246var etagHeaders = []string{
247 "ETag",
248 "If-Modified-Since",
249 "If-Match",
250 "If-None-Match",
251 "If-Range",
252 "If-Unmodified-Since",
253}
254
255func NoCache(h httprouter.Handle) httprouter.Handle {
256 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
257 // Delete any ETag headers that may have been set
258 for _, v := range etagHeaders {
259 if r.Header.Get(v) != "" {
260 r.Header.Del(v)
261 }
262 }
263
264 // Set our NoCache headers
265 for k, v := range noCacheHeaders {
266 w.Header().Set(k, v)
267 }
268
269 h(w, r, p)
270 }
271}
272
273func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
274 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
275 user := p.ByName("user")
276 if user == "" {
277 errors.WriteHTTPBadRequest(w, "user required", nil)
278 return
279 }
280 user, err := a.NormalizeUser(ctx, user)
281 if err != nil {
282 errors.WriteHTTPBadRequest(w, "invalid user", err)
283 return
284 }
285 file := p.ByName("file")
286 if file == "" {
287 errors.WriteHTTPBadRequest(w, "file required", nil)
288 return
289 }
290 m3u8, err := a.Director.GetM3U8(ctx, user)
291 if err != nil {
292 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
293 return
294 }
295 session := r.URL.Query().Get("session")
296 rendition := r.URL.Query().Get("rendition")
297 buf, err := m3u8.GetFile(file, session, rendition)
298 if err != nil {
299 errors.WriteHTTPNotFound(w, "segment not found", err)
300 return
301 }
302
303 if strings.HasSuffix(file, ".m3u8") {
304 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
305 } else {
306 if session != "" {
307 spmetrics.SessionSeen(user, session)
308 }
309 w.Header().Set("Content-Type", "video/mp2t")
310 }
311
312 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
313 })
314}
315
316func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
317 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
318 user := p.ByName("user")
319 if user == "" {
320 errors.WriteHTTPBadRequest(w, "user required", nil)
321 return
322 }
323 user, err := a.NormalizeUser(ctx, user)
324 if err != nil {
325 errors.WriteHTTPNotFound(w, "user not found", err)
326 return
327 }
328 thumb, err := a.Model.LatestThumbnailForUser(user)
329 if err != nil {
330 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
331 return
332 }
333 if thumb == nil {
334 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
335 return
336 }
337 aqt := aqtime.FromTime(thumb.Segment.StartTime)
338 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
339 if err != nil {
340 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
341 return
342 }
343 http.ServeFile(w, r, fpath)
344 }
345}