Live video on the AT Protocol
1package api
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strings"
10 "time"
11
12 "github.com/julienschmidt/httprouter"
13 "github.com/pion/webrtc/v4"
14 "stream.place/streamplace/pkg/aqtime"
15 "stream.place/streamplace/pkg/constants"
16 "stream.place/streamplace/pkg/errors"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/spmetrics"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
23 alias, ok := a.Aliases[user]
24 if ok {
25 user = alias
26 }
27 // did:key, pass through unaltered
28 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
29 return user, nil
30 }
31 // only other allowed case is a bluesky handle
32 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user)
33 if err != nil {
34 return "", err
35 }
36 return repo.DID, nil
37}
38
39func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
40 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
41 user := p.ByName("user")
42 if user == "" {
43 errors.WriteHTTPBadRequest(w, "user required", nil)
44 return
45 }
46 rendition := getRendition(r)
47 user, err := a.NormalizeUser(ctx, user)
48 if err != nil {
49 errors.WriteHTTPBadRequest(w, "invalid user", err)
50 return
51 }
52 body, err := io.ReadAll(r.Body)
53 if err != nil {
54 errors.WriteHTTPBadRequest(w, "error reading body", err)
55 return
56 }
57 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
58 var answer *webrtc.SessionDescription
59 if a.CLI.NewWebRTCPlayback {
60 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer, "")
61 } else {
62 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
63 }
64 if err != nil {
65 errors.WriteHTTPInternalServerError(w, "error playing back", err)
66 return
67 }
68 w.WriteHeader(201)
69 w.Header().Add("Location", r.URL.Path)
70 if _, err := w.Write([]byte(answer.SDP)); err != nil {
71 log.Error(ctx, "error writing response", "error", err)
72 }
73 }
74}
75
// String prefixes used when handling credentials.
const (
	// BearerPrefix is the scheme prefix expected at the start of an
	// Authorization header value.
	BearerPrefix = "Bearer "

	// KeyPrefix is the literal "0x".
	// NOTE(review): presumably the hex prefix of an encoded key; it is not
	// referenced in this part of the file, so confirm against its callers.
	KeyPrefix = "0x"
)
78
79func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
80 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
81 ct := r.Header.Get("Content-Type")
82 if ct != "application/sdp" {
83 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
84 return
85 }
86 var encoded string
87 urlKey := p.ByName("key")
88 if urlKey != "" {
89 encoded = urlKey
90 } else {
91 auth := r.Header.Get("Authorization")
92 if auth == "" {
93 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
94 return
95 }
96 if !strings.HasPrefix(auth, BearerPrefix) {
97 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
98 return
99 }
100 encoded = auth[len(BearerPrefix):]
101 // it's easy to copy-paste a trailing or leading space, so clear those out
102 encoded = strings.TrimSpace(encoded)
103 }
104
105 mediaSigner, err := a.MakeMediaSigner(ctx, encoded)
106 if err != nil {
107 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
108 return
109 }
110
111 body, err := io.ReadAll(r.Body)
112 if err != nil {
113 errors.WriteHTTPBadRequest(w, "error reading body", err)
114 return
115 }
116 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
117 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer())
118 if err != nil {
119 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err)
120 return
121 }
122 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan error, 1))
123 if err != nil {
124 errors.WriteHTTPInternalServerError(w, "error playing back", err)
125 return
126 }
127 host := r.Host
128 if host == "" {
129 host = r.URL.Host
130 }
131 scheme := "http"
132 if r.TLS != nil {
133 scheme = "https"
134 }
135 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
136 log.Log(ctx, "location", "location", location)
137 w.Header().Set("Location", location)
138 w.WriteHeader(201)
139 if _, err := w.Write([]byte(answer.SDP)); err != nil {
140 log.Error(ctx, "error writing response", "error", err)
141 }
142 }
143}
144
// epoch is the Unix epoch rendered as an HTTP-date, used as an
// already-expired value for the Expires header.
//
// HTTP dates must be expressed in GMT, so the time is converted to UTC and
// formatted with http.TimeFormat. Formatting time.Unix(0, 0) directly would
// use the server's local zone and its zone abbreviation.
var epoch = time.Unix(0, 0).UTC().Format(http.TimeFormat)

// noCacheHeaders are the response headers NoCache sets on every response.
var noCacheHeaders = map[string]string{
	"Expires":         epoch,
	"Cache-Control":   "no-cache, private, max-age=0",
	"Pragma":          "no-cache",
	"X-Accel-Expires": "0",
}

// etagHeaders are the header names NoCache removes from the incoming request.
var etagHeaders = []string{
	"ETag",
	"If-Modified-Since",
	"If-Match",
	"If-None-Match",
	"If-Range",
	"If-Unmodified-Since",
}
162
163func NoCache(h httprouter.Handle) httprouter.Handle {
164 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
165 // Delete any ETag headers that may have been set
166 for _, v := range etagHeaders {
167 if r.Header.Get(v) != "" {
168 r.Header.Del(v)
169 }
170 }
171
172 // Set our NoCache headers
173 for k, v := range noCacheHeaders {
174 w.Header().Set(k, v)
175 }
176
177 h(w, r, p)
178 }
179}
180
// SessionExpireTime is how long a session may go without being seen before
// ExpireSessions removes it.
const SessionExpireTime time.Duration = 30 * time.Second
182
183func (a *StreamplaceAPI) SessionSeen(ctx context.Context, user string, session string) {
184 now := time.Now()
185 go func() {
186 a.sessionsLock.Lock()
187 defer a.sessionsLock.Unlock()
188 if _, ok := a.sessions[user]; !ok {
189 a.sessions[user] = map[string]time.Time{}
190 }
191 if _, ok := a.sessions[user][session]; !ok {
192 log.Warn(ctx, "ViewerInc", "user", user, "session", session)
193 spmetrics.ViewerInc(user, "hls")
194 a.Bus.IncrementViewerCount(user, "local")
195 }
196 a.sessions[user][session] = now
197 }()
198}
199
200func (a *StreamplaceAPI) ExpireSessions(ctx context.Context) error {
201 for {
202 select {
203 case <-ctx.Done():
204 return nil
205 case <-time.After(5 * time.Second):
206 a.sessionsLock.Lock()
207 for user, sessions := range a.sessions {
208 for session, seen := range sessions {
209 if time.Since(seen) > SessionExpireTime {
210 delete(sessions, session)
211 spmetrics.ViewerDec(user, "hls")
212 a.Bus.DecrementViewerCount(user, "local")
213 }
214 }
215 }
216 a.sessionsLock.Unlock()
217 }
218 }
219}
220
221func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
222 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
223 user := p.ByName("user")
224 if user == "" {
225 errors.WriteHTTPBadRequest(w, "user required", nil)
226 return
227 }
228 user, err := a.NormalizeUser(ctx, user)
229 if err != nil {
230 errors.WriteHTTPBadRequest(w, "invalid user", err)
231 return
232 }
233 file := p.ByName("file")
234 if file == "" {
235 errors.WriteHTTPBadRequest(w, "file required", nil)
236 return
237 }
238 m3u8, err := a.Director.GetM3U8(ctx, user)
239 if err != nil {
240 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
241 return
242 }
243 session := r.URL.Query().Get("session")
244 rendition := r.URL.Query().Get("rendition")
245 buf, err := m3u8.GetFile(file, session, rendition)
246 if err != nil {
247 errors.WriteHTTPNotFound(w, "segment not found", err)
248 return
249 }
250
251 if strings.HasSuffix(file, ".m3u8") {
252 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
253 } else {
254 if session != "" {
255 a.SessionSeen(ctx, user, session)
256 }
257 w.Header().Set("Content-Type", "video/mp2t")
258 }
259
260 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
261 })
262}
263
264func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
265 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
266 user := p.ByName("user")
267 if user == "" {
268 errors.WriteHTTPBadRequest(w, "user required", nil)
269 return
270 }
271 user, err := a.NormalizeUser(ctx, user)
272 if err != nil {
273 errors.WriteHTTPNotFound(w, "user not found", err)
274 return
275 }
276 if !a.CLI.WideOpen {
277 ls, err := a.Model.GetLatestLivestreamForRepo(user)
278 if err != nil {
279 errors.WriteHTTPInternalServerError(w, "could not get livestream", err)
280 return
281 }
282 if ls == nil {
283 errors.WriteHTTPNotFound(w, "livestream not found", err)
284 return
285 }
286 lsrv, err := ls.ToLivestreamView()
287 if err != nil {
288 errors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
289 return
290 }
291 lsr, ok := lsrv.Record.Val.(*streamplace.Livestream)
292 if !ok {
293 errors.WriteHTTPInternalServerError(w, "livestream is not a streamplace livestream", nil)
294 return
295 }
296 if lsr.EndedAt != nil {
297 errors.WriteHTTPNotFound(w, "livestream has ended", nil)
298 return
299 }
300 }
301 thumb, err := a.LocalDB.LatestThumbnailForUser(user)
302 if err != nil {
303 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
304 return
305 }
306 if thumb == nil {
307 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
308 return
309 }
310 aqt := aqtime.FromTime(thumb.Segment.StartTime)
311 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
312 if err != nil {
313 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
314 return
315 }
316 http.ServeFile(w, r, fpath)
317 }
318}