Live video on the AT Protocol
1package api
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "net/http"
9 "strings"
10 "time"
11
12 "github.com/julienschmidt/httprouter"
13 "github.com/pion/webrtc/v4"
14 "stream.place/streamplace/pkg/aqtime"
15 "stream.place/streamplace/pkg/constants"
16 "stream.place/streamplace/pkg/errors"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/spmetrics"
19)
20
21func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) {
22 alias, ok := a.Aliases[user]
23 if ok {
24 user = alias
25 }
26 // did:key, pass through unaltered
27 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) {
28 return user, nil
29 }
30 // only other allowed case is a bluesky handle
31 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
32 if err != nil {
33 return "", err
34 }
35 return repo.DID, nil
36}
37
38func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle {
39 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
40 user := p.ByName("user")
41 if user == "" {
42 errors.WriteHTTPBadRequest(w, "user required", nil)
43 return
44 }
45 rendition := getRendition(r)
46 user, err := a.NormalizeUser(ctx, user)
47 if err != nil {
48 errors.WriteHTTPBadRequest(w, "invalid user", err)
49 return
50 }
51 body, err := io.ReadAll(r.Body)
52 if err != nil {
53 errors.WriteHTTPBadRequest(w, "error reading body", err)
54 return
55 }
56 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
57 var answer *webrtc.SessionDescription
58 if a.CLI.NewWebRTCPlayback {
59 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer)
60 } else {
61 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer)
62 }
63 if err != nil {
64 errors.WriteHTTPInternalServerError(w, "error playing back", err)
65 return
66 }
67 w.WriteHeader(201)
68 w.Header().Add("Location", r.URL.Path)
69 if _, err := w.Write([]byte(answer.SDP)); err != nil {
70 log.Error(ctx, "error writing response", "error", err)
71 }
72 }
73}
74
const (
	// BearerPrefix is the prefix an Authorization header value must start
	// with; the text after it is taken as the key.
	BearerPrefix = "Bearer "

	// KeyPrefix is the literal "0x".
	// NOTE(review): presumably the marker for hex-encoded keys; its users are
	// outside this part of the file, so confirm there.
	KeyPrefix = "0x"
)
77
78func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle {
79 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
80 ct := r.Header.Get("Content-Type")
81 if ct != "application/sdp" {
82 errors.WriteHTTPBadRequest(w, "invalid content type", nil)
83 return
84 }
85 var encoded string
86 urlKey := p.ByName("key")
87 if urlKey != "" {
88 encoded = urlKey
89 } else {
90 auth := r.Header.Get("Authorization")
91 if auth == "" {
92 errors.WriteHTTPUnauthorized(w, "authorization header required", nil)
93 return
94 }
95 if !strings.HasPrefix(auth, BearerPrefix) {
96 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil)
97 return
98 }
99 encoded = auth[len(BearerPrefix):]
100 // it's easy to copy-paste a trailing or leading space, so clear those out
101 encoded = strings.TrimSpace(encoded)
102 }
103
104 mediaSigner, err := a.MakeMediaSigner(ctx, encoded)
105 if err != nil {
106 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
107 return
108 }
109
110 body, err := io.ReadAll(r.Body)
111 if err != nil {
112 errors.WriteHTTPBadRequest(w, "error reading body", err)
113 return
114 }
115 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
116 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer())
117 if err != nil {
118 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err)
119 return
120 }
121 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan struct{}))
122 if err != nil {
123 errors.WriteHTTPInternalServerError(w, "error playing back", err)
124 return
125 }
126 host := r.Host
127 if host == "" {
128 host = r.URL.Host
129 }
130 scheme := "http"
131 if r.TLS != nil {
132 scheme = "https"
133 }
134 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host)
135 log.Log(ctx, "location", "location", location)
136 w.Header().Set("Location", location)
137 w.WriteHeader(201)
138 if _, err := w.Write([]byte(answer.SDP)); err != nil {
139 log.Error(ctx, "error writing response", "error", err)
140 }
141 }
142}
143
var (
	// epoch is the Unix epoch rendered as an HTTP-date
	// ("Thu, 01 Jan 1970 00:00:00 GMT"). http.TimeFormat hard-codes the GMT
	// suffix, so the time is converted to UTC first; formatting the local time
	// with time.RFC1123 would emit the host's zone abbreviation instead, which
	// is not a valid HTTP-date.
	epoch = time.Unix(0, 0).UTC().Format(http.TimeFormat)

	// noCacheHeaders are the response headers NoCache sets on every response
	// to tell clients and intermediaries not to cache it.
	noCacheHeaders = map[string]string{
		"Expires":         epoch,
		"Cache-Control":   "no-cache, private, max-age=0",
		"Pragma":          "no-cache",
		"X-Accel-Expires": "0",
	}

	// etagHeaders are the header names NoCache removes from the incoming
	// request before calling the wrapped handler.
	etagHeaders = []string{
		"ETag",
		"If-Modified-Since",
		"If-Match",
		"If-None-Match",
		"If-Range",
		"If-Unmodified-Since",
	}
)
161
162func NoCache(h httprouter.Handle) httprouter.Handle {
163 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
164 // Delete any ETag headers that may have been set
165 for _, v := range etagHeaders {
166 if r.Header.Get(v) != "" {
167 r.Header.Del(v)
168 }
169 }
170
171 // Set our NoCache headers
172 for k, v := range noCacheHeaders {
173 w.Header().Set(k, v)
174 }
175
176 h(w, r, p)
177 }
178}
179
180func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle {
181 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
182 user := p.ByName("user")
183 if user == "" {
184 errors.WriteHTTPBadRequest(w, "user required", nil)
185 return
186 }
187 user, err := a.NormalizeUser(ctx, user)
188 if err != nil {
189 errors.WriteHTTPBadRequest(w, "invalid user", err)
190 return
191 }
192 file := p.ByName("file")
193 if file == "" {
194 errors.WriteHTTPBadRequest(w, "file required", nil)
195 return
196 }
197 m3u8, err := a.Director.GetM3U8(ctx, user)
198 if err != nil {
199 errors.WriteHTTPNotFound(w, "could not get m3u8", err)
200 return
201 }
202 session := r.URL.Query().Get("session")
203 rendition := r.URL.Query().Get("rendition")
204 buf, err := m3u8.GetFile(file, session, rendition)
205 if err != nil {
206 errors.WriteHTTPNotFound(w, "segment not found", err)
207 return
208 }
209
210 if strings.HasSuffix(file, ".m3u8") {
211 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
212 } else {
213 if session != "" {
214 spmetrics.SessionSeen(user, session)
215 }
216 w.Header().Set("Content-Type", "video/mp2t")
217 }
218
219 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf))
220 })
221}
222
223func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle {
224 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
225 user := p.ByName("user")
226 if user == "" {
227 errors.WriteHTTPBadRequest(w, "user required", nil)
228 return
229 }
230 user, err := a.NormalizeUser(ctx, user)
231 if err != nil {
232 errors.WriteHTTPNotFound(w, "user not found", err)
233 return
234 }
235 thumb, err := a.Model.LatestThumbnailForUser(user)
236 if err != nil {
237 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err)
238 return
239 }
240 if thumb == nil {
241 errors.WriteHTTPNotFound(w, "thumbnail not found", err)
242 return
243 }
244 aqt := aqtime.FromTime(thumb.Segment.StartTime)
245 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format))
246 if err != nil {
247 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err)
248 return
249 }
250 http.ServeFile(w, r, fpath)
251 }
252}