Live video on the AT Protocol
at eli/routing-cleanup 318 lines 8.8 kB view raw
1package api 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strings" 10 "time" 11 12 "github.com/julienschmidt/httprouter" 13 "github.com/pion/webrtc/v4" 14 "stream.place/streamplace/pkg/aqtime" 15 "stream.place/streamplace/pkg/constants" 16 "stream.place/streamplace/pkg/errors" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/spmetrics" 19 "stream.place/streamplace/pkg/streamplace" 20) 21 22func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 23 alias, ok := a.Aliases[user] 24 if ok { 25 user = alias 26 } 27 // did:key, pass through unaltered 28 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 29 return user, nil 30 } 31 // only other allowed case is a bluesky handle 32 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user) 33 if err != nil { 34 return "", err 35 } 36 return repo.DID, nil 37} 38 39func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 40 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 41 user := p.ByName("user") 42 if user == "" { 43 errors.WriteHTTPBadRequest(w, "user required", nil) 44 return 45 } 46 rendition := getRendition(r) 47 user, err := a.NormalizeUser(ctx, user) 48 if err != nil { 49 errors.WriteHTTPBadRequest(w, "invalid user", err) 50 return 51 } 52 body, err := io.ReadAll(r.Body) 53 if err != nil { 54 errors.WriteHTTPBadRequest(w, "error reading body", err) 55 return 56 } 57 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 58 var answer *webrtc.SessionDescription 59 if a.CLI.NewWebRTCPlayback { 60 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer, "") 61 } else { 62 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 63 } 64 if err != nil { 65 errors.WriteHTTPInternalServerError(w, "error playing back", err) 66 return 67 } 68 w.WriteHeader(201) 69 w.Header().Add("Location", r.URL.Path) 70 if _, err := w.Write([]byte(answer.SDP)); err != nil { 71 log.Error(ctx, "error writing response", "error", err) 72 } 73 } 74} 75 76const BearerPrefix = "Bearer " 77const KeyPrefix = "0x" 78 79func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 80 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 81 ct := r.Header.Get("Content-Type") 82 if ct != "application/sdp" { 83 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 84 return 85 } 86 var encoded string 87 urlKey := p.ByName("key") 88 if urlKey != "" { 89 encoded = urlKey 90 } else { 91 auth := r.Header.Get("Authorization") 92 if auth == "" { 93 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 94 return 95 } 96 if !strings.HasPrefix(auth, BearerPrefix) { 97 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 98 return 99 } 100 encoded = auth[len(BearerPrefix):] 101 // it's easy to copy-paste a trailing or leading space, so clear those out 102 encoded = strings.TrimSpace(encoded) 103 } 104 105 mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 106 if err != nil { 107 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 108 return 109 } 110 111 body, err := io.ReadAll(r.Body) 112 if err != nil { 113 errors.WriteHTTPBadRequest(w, "error reading body", err) 114 return 115 } 116 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 117 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer()) 118 if err != nil { 119 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err) 120 return 121 } 122 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan error, 1)) 123 if err != nil { 124 errors.WriteHTTPInternalServerError(w, "error playing back", err) 125 return 126 } 127 host := r.Host 128 if host == "" { 129 host = r.URL.Host 130 } 131 scheme := "http" 132 if r.TLS != nil { 133 scheme = "https" 134 } 135 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 136 log.Log(ctx, "location", "location", location) 137 w.Header().Set("Location", location) 138 w.WriteHeader(201) 139 if _, err := w.Write([]byte(answer.SDP)); err != nil { 140 log.Error(ctx, "error writing response", "error", err) 141 } 142 } 143} 144 145var epoch = time.Unix(0, 0).Format(time.RFC1123) 146 147var noCacheHeaders = map[string]string{ 148 "Expires": epoch, 149 "Cache-Control": "no-cache, private, max-age=0", 150 "Pragma": "no-cache", 151 "X-Accel-Expires": "0", 152} 153 154var etagHeaders = []string{ 155 "ETag", 156 "If-Modified-Since", 157 "If-Match", 158 "If-None-Match", 159 "If-Range", 160 "If-Unmodified-Since", 161} 162 163func NoCache(h httprouter.Handle) httprouter.Handle { 164 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 165 // Delete any ETag headers that may have been set 166 for _, v := range etagHeaders { 167 if r.Header.Get(v) != "" { 168 r.Header.Del(v) 169 } 170 } 171 172 // Set our NoCache headers 173 for k, v := range noCacheHeaders { 174 w.Header().Set(k, v) 175 } 176 177 h(w, r, p) 178 } 179} 180 181const SessionExpireTime = 30 * time.Second 182 183func (a *StreamplaceAPI) SessionSeen(ctx context.Context, user string, session string) { 184 now := time.Now() 185 go func() { 186 a.sessionsLock.Lock() 187 defer a.sessionsLock.Unlock() 188 if _, ok := a.sessions[user]; !ok { 189 a.sessions[user] = map[string]time.Time{} 190 } 191 if _, ok := a.sessions[user][session]; !ok { 192 log.Warn(ctx, "ViewerInc", "user", user, "session", session) 193 spmetrics.ViewerInc(user, "hls") 194 a.Bus.IncrementViewerCount(user, "local") 195 } 196 a.sessions[user][session] = now 197 }() 198} 199 200func (a *StreamplaceAPI) ExpireSessions(ctx context.Context) error { 201 for { 202 select { 203 case <-ctx.Done(): 204 return nil 205 case <-time.After(5 * time.Second): 206 a.sessionsLock.Lock() 207 for user, sessions := range a.sessions { 208 for session, seen := range sessions { 209 if time.Since(seen) > SessionExpireTime { 210 delete(sessions, session) 211 spmetrics.ViewerDec(user, "hls") 212 a.Bus.DecrementViewerCount(user, "local") 213 } 214 } 215 } 216 a.sessionsLock.Unlock() 217 } 218 } 219} 220 221func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 222 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 223 user := p.ByName("user") 224 if user == "" { 225 errors.WriteHTTPBadRequest(w, "user required", nil) 226 return 227 } 228 user, err := a.NormalizeUser(ctx, user) 229 if err != nil { 230 errors.WriteHTTPBadRequest(w, "invalid user", err) 231 return 232 } 233 file := p.ByName("file") 234 if file == "" { 235 errors.WriteHTTPBadRequest(w, "file required", nil) 236 return 237 } 238 m3u8, err := a.Director.GetM3U8(ctx, user) 239 if err != nil { 240 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 241 return 242 } 243 session := r.URL.Query().Get("session") 244 rendition := r.URL.Query().Get("rendition") 245 buf, err := m3u8.GetFile(file, session, rendition) 246 if err != nil { 247 errors.WriteHTTPNotFound(w, "segment not found", err) 248 return 249 } 250 251 if strings.HasSuffix(file, ".m3u8") { 252 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 253 } else { 254 if session != "" { 255 a.SessionSeen(ctx, user, session) 256 } 257 w.Header().Set("Content-Type", "video/mp2t") 258 } 259 260 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 261 }) 262} 263 264func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 265 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 266 user := p.ByName("user") 267 if user == "" { 268 errors.WriteHTTPBadRequest(w, "user required", nil) 269 return 270 } 271 user, err := a.NormalizeUser(ctx, user) 272 if err != nil { 273 errors.WriteHTTPNotFound(w, "user not found", err) 274 return 275 } 276 if !a.CLI.WideOpen { 277 ls, err := a.Model.GetLatestLivestreamForRepo(user) 278 if err != nil { 279 errors.WriteHTTPInternalServerError(w, "could not get livestream", err) 280 return 281 } 282 if ls == nil { 283 errors.WriteHTTPNotFound(w, "livestream not found", err) 284 return 285 } 286 lsrv, err := ls.ToLivestreamView() 287 if err != nil { 288 errors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 289 return 290 } 291 lsr, ok := lsrv.Record.Val.(*streamplace.Livestream) 292 if !ok { 293 errors.WriteHTTPInternalServerError(w, "livestream is not a streamplace livestream", nil) 294 return 295 } 296 if lsr.EndedAt != nil { 297 errors.WriteHTTPNotFound(w, "livestream has ended", nil) 298 return 299 } 300 } 301 thumb, err := a.LocalDB.LatestThumbnailForUser(user) 302 if err != nil { 303 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 304 return 305 } 306 if thumb == nil { 307 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 308 return 309 } 310 aqt := aqtime.FromTime(thumb.Segment.StartTime) 311 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 312 if err != nil { 313 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 314 return 315 } 316 http.ServeFile(w, r, fpath) 317 } 318}