Live video on the AT Protocol
at natb/i18next-cli 292 lines 8.0 kB view raw
1package api 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "net/http" 9 "strings" 10 "time" 11 12 "github.com/julienschmidt/httprouter" 13 "github.com/pion/webrtc/v4" 14 "stream.place/streamplace/pkg/aqtime" 15 "stream.place/streamplace/pkg/constants" 16 "stream.place/streamplace/pkg/errors" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/spmetrics" 19) 20 21func (a *StreamplaceAPI) NormalizeUser(ctx context.Context, user string) (string, error) { 22 alias, ok := a.Aliases[user] 23 if ok { 24 user = alias 25 } 26 // did:key, pass through unaltered 27 if strings.HasPrefix(user, constants.DID_KEY_PREFIX) { 28 return user, nil 29 } 30 // only other allowed case is a bluesky handle 31 repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model) 32 if err != nil { 33 return "", err 34 } 35 return repo.DID, nil 36} 37 38func (a *StreamplaceAPI) HandleWebRTCPlayback(ctx context.Context) httprouter.Handle { 39 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 40 user := p.ByName("user") 41 if user == "" { 42 errors.WriteHTTPBadRequest(w, "user required", nil) 43 return 44 } 45 rendition := getRendition(r) 46 user, err := a.NormalizeUser(ctx, user) 47 if err != nil { 48 errors.WriteHTTPBadRequest(w, "invalid user", err) 49 return 50 } 51 body, err := io.ReadAll(r.Body) 52 if err != nil { 53 errors.WriteHTTPBadRequest(w, "error reading body", err) 54 return 55 } 56 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 57 var answer *webrtc.SessionDescription 58 if a.CLI.NewWebRTCPlayback { 59 answer, err = a.MediaManager.WebRTCPlayback2(ctx, user, rendition, &offer) 60 } else { 61 answer, err = a.MediaManager.WebRTCPlayback(ctx, user, rendition, &offer) 62 } 63 if err != nil { 64 errors.WriteHTTPInternalServerError(w, "error playing back", err) 65 return 66 } 67 w.WriteHeader(201) 68 w.Header().Add("Location", r.URL.Path) 69 if _, err := w.Write([]byte(answer.SDP)); err != nil { 70 log.Error(ctx, "error writing response", "error", err) 71 } 72 } 73} 74 75const BearerPrefix = "Bearer " 76const KeyPrefix = "0x" 77 78func (a *StreamplaceAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 79 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 80 ct := r.Header.Get("Content-Type") 81 if ct != "application/sdp" { 82 errors.WriteHTTPBadRequest(w, "invalid content type", nil) 83 return 84 } 85 var encoded string 86 urlKey := p.ByName("key") 87 if urlKey != "" { 88 encoded = urlKey 89 } else { 90 auth := r.Header.Get("Authorization") 91 if auth == "" { 92 errors.WriteHTTPUnauthorized(w, "authorization header required", nil) 93 return 94 } 95 if !strings.HasPrefix(auth, BearerPrefix) { 96 errors.WriteHTTPUnauthorized(w, "invalid authorization header (needs Bearer prefix)", nil) 97 return 98 } 99 encoded = auth[len(BearerPrefix):] 100 // it's easy to copy-paste a trailing or leading space, so clear those out 101 encoded = strings.TrimSpace(encoded) 102 } 103 104 mediaSigner, err := a.MakeMediaSigner(ctx, encoded) 105 if err != nil { 106 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 107 return 108 } 109 110 body, err := io.ReadAll(r.Body) 111 if err != nil { 112 errors.WriteHTTPBadRequest(w, "error reading body", err) 113 return 114 } 115 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 116 pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer()) 117 if err != nil { 118 errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err) 119 return 120 } 121 answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc, make(chan error, 1)) 122 if err != nil { 123 errors.WriteHTTPInternalServerError(w, "error playing back", err) 124 return 125 } 126 host := r.Host 127 if host == "" { 128 host = r.URL.Host 129 } 130 scheme := "http" 131 if r.TLS != nil { 132 scheme = "https" 133 } 134 location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 135 log.Log(ctx, "location", "location", location) 136 w.Header().Set("Location", location) 137 w.WriteHeader(201) 138 if _, err := w.Write([]byte(answer.SDP)); err != nil { 139 log.Error(ctx, "error writing response", "error", err) 140 } 141 } 142} 143 144var epoch = time.Unix(0, 0).Format(time.RFC1123) 145 146var noCacheHeaders = map[string]string{ 147 "Expires": epoch, 148 "Cache-Control": "no-cache, private, max-age=0", 149 "Pragma": "no-cache", 150 "X-Accel-Expires": "0", 151} 152 153var etagHeaders = []string{ 154 "ETag", 155 "If-Modified-Since", 156 "If-Match", 157 "If-None-Match", 158 "If-Range", 159 "If-Unmodified-Since", 160} 161 162func NoCache(h httprouter.Handle) httprouter.Handle { 163 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 164 // Delete any ETag headers that may have been set 165 for _, v := range etagHeaders { 166 if r.Header.Get(v) != "" { 167 r.Header.Del(v) 168 } 169 } 170 171 // Set our NoCache headers 172 for k, v := range noCacheHeaders { 173 w.Header().Set(k, v) 174 } 175 176 h(w, r, p) 177 } 178} 179 180const SessionExpireTime = 30 * time.Second 181 182func (a *StreamplaceAPI) SessionSeen(ctx context.Context, user string, session string) { 183 now := time.Now() 184 go func() { 185 a.sessionsLock.Lock() 186 defer a.sessionsLock.Unlock() 187 if _, ok := a.sessions[user]; !ok { 188 a.sessions[user] = map[string]time.Time{} 189 } 190 if _, ok := a.sessions[user][session]; !ok { 191 log.Warn(ctx, "ViewerInc", "user", user, "session", session) 192 spmetrics.ViewerInc(user, "hls") 193 a.Bus.IncrementViewerCount(user, "local") 194 } 195 a.sessions[user][session] = now 196 }() 197} 198 199func (a *StreamplaceAPI) ExpireSessions(ctx context.Context) error { 200 for { 201 select { 202 case <-ctx.Done(): 203 return nil 204 case <-time.After(5 * time.Second): 205 a.sessionsLock.Lock() 206 for user, sessions := range a.sessions { 207 for session, seen := range sessions { 208 if time.Since(seen) > SessionExpireTime { 209 delete(sessions, session) 210 spmetrics.ViewerDec(user, "hls") 211 a.Bus.DecrementViewerCount(user, "local") 212 } 213 } 214 } 215 a.sessionsLock.Unlock() 216 } 217 } 218} 219 220func (a *StreamplaceAPI) HandleHLSPlayback(ctx context.Context) httprouter.Handle { 221 return NoCache(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 222 user := p.ByName("user") 223 if user == "" { 224 errors.WriteHTTPBadRequest(w, "user required", nil) 225 return 226 } 227 user, err := a.NormalizeUser(ctx, user) 228 if err != nil { 229 errors.WriteHTTPBadRequest(w, "invalid user", err) 230 return 231 } 232 file := p.ByName("file") 233 if file == "" { 234 errors.WriteHTTPBadRequest(w, "file required", nil) 235 return 236 } 237 m3u8, err := a.Director.GetM3U8(ctx, user) 238 if err != nil { 239 errors.WriteHTTPNotFound(w, "could not get m3u8", err) 240 return 241 } 242 session := r.URL.Query().Get("session") 243 rendition := r.URL.Query().Get("rendition") 244 buf, err := m3u8.GetFile(file, session, rendition) 245 if err != nil { 246 errors.WriteHTTPNotFound(w, "segment not found", err) 247 return 248 } 249 250 if strings.HasSuffix(file, ".m3u8") { 251 w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") 252 } else { 253 if session != "" { 254 a.SessionSeen(ctx, user, session) 255 } 256 w.Header().Set("Content-Type", "video/mp2t") 257 } 258 259 http.ServeContent(w, r, file, time.Now(), bytes.NewReader(buf)) 260 }) 261} 262 263func (a *StreamplaceAPI) HandleThumbnailPlayback(ctx context.Context) httprouter.Handle { 264 return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 265 user := p.ByName("user") 266 if user == "" { 267 errors.WriteHTTPBadRequest(w, "user required", nil) 268 return 269 } 270 user, err := a.NormalizeUser(ctx, user) 271 if err != nil { 272 errors.WriteHTTPNotFound(w, "user not found", err) 273 return 274 } 275 thumb, err := a.Model.LatestThumbnailForUser(user) 276 if err != nil { 277 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 278 return 279 } 280 if thumb == nil { 281 errors.WriteHTTPNotFound(w, "thumbnail not found", err) 282 return 283 } 284 aqt := aqtime.FromTime(thumb.Segment.StartTime) 285 fpath, err := a.CLI.SegmentFilePath(user, fmt.Sprintf("%s.%s", aqt.String(), thumb.Format)) 286 if err != nil { 287 errors.WriteHTTPInternalServerError(w, "could not get segment file path", err) 288 return 289 } 290 http.ServeFile(w, r, fpath) 291 } 292}