Live video on the AT Protocol
at natb/urfave 346 lines 8.1 kB view raw
1// Package main contains an example. 2package api 3 4import ( 5 "context" 6 "crypto/tls" 7 "fmt" 8 "net" 9 "strings" 10 "time" 11 12 "github.com/bluenviron/gortmplib" 13 "github.com/bluenviron/gortsplib/v5/pkg/format" 14 "golang.org/x/sync/errgroup" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/media" 18) 19 20// This example shows how to: 21// 1. create a RTMP server 22// 2. accept a stream from a reader. 23// 3. broadcast the stream to readers. 24 25var RTMPTimeout = 10 * time.Second 26 27const RTMPPrefix = "/live/" 28 29func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error { 30 err := sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 31 if err != nil { 32 return err 33 } 34 35 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 36 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) 37 } 38 streamKey := strings.TrimPrefix(sc.URL.Path, RTMPPrefix) 39 mediaSigner, err := a.MakeMediaSigner(ctx, streamKey) 40 if err != nil { 41 return fmt.Errorf("failed to make media signer: %w", err) 42 } 43 44 streamer := mediaSigner.Streamer() 45 ctx = log.WithLogValues(ctx, "streamer", streamer) 46 session := &media.RTMPSession{ 47 EventChan: make(chan any, 1024), 48 MediaSigner: mediaSigner, 49 } 50 a.rtmpSessionsLock.Lock() 51 a.rtmpSessions[streamer] = session 52 a.rtmpSessionsLock.Unlock() 53 54 defer func() { 55 a.rtmpSessionsLock.Lock() 56 delete(a.rtmpSessions, streamer) 57 a.rtmpSessionsLock.Unlock() 58 close(session.EventChan) 59 }() 60 61 r := &gortmplib.Reader{ 62 Conn: sc, 63 } 64 err = r.Initialize() 65 if err != nil { 66 return err 67 } 68 69 for _, track := range r.Tracks() { 70 log.Log(ctx, "get track", "track", track) 71 72 switch track := track.(type) { 73 case *format.H264: 74 session.VideoTrack = track 75 r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) { 76 // log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts) 77 session.EventChan <- &media.RTMPH264Data{ 78 AU: au, 79 PTS: pts, 80 DTS: dts, 81 } 82 }) 83 84 case *format.MPEG4Audio: 85 session.AudioTrack = track 86 r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) { 87 // log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts) 88 session.EventChan <- &media.RTMPAACData{ 89 AU: au, 90 PTS: pts, 91 } 92 }) 93 94 default: 95 return fmt.Errorf("unsupported track type: %T", track) 96 } 97 } 98 99 g, ctx := errgroup.WithContext(ctx) 100 g.Go(func() error { 101 for { 102 if ctx.Err() != nil { 103 return ctx.Err() 104 } 105 err = sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 106 if err != nil { 107 return err 108 } 109 err = r.Read() 110 if err != nil { 111 return err 112 } 113 } 114 }) 115 116 g.Go(func() error { 117 return a.MediaManager.RTMPIngest(ctx, fmt.Sprintf("rtmp://%s/live/%s", a.rtmpInternalPlaybackAddr, streamer), mediaSigner) 118 }) 119 120 return g.Wait() 121} 122 123func (a *StreamplaceAPI) HandleRTMPPlayback(ctx context.Context, sc *gortmplib.ServerConn) error { 124 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 125 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) 126 } 127 streamer := strings.TrimPrefix(sc.URL.Path, RTMPPrefix) 128 a.rtmpSessionsLock.Lock() 129 session, ok := a.rtmpSessions[streamer] 130 a.rtmpSessionsLock.Unlock() 131 if !ok { 132 return fmt.Errorf("RTMP session not found for streamer %s", streamer) 133 } 134 135 w := &gortmplib.Writer{ 136 Conn: sc, 137 Tracks: []format.Format{session.VideoTrack, session.AudioTrack}, 138 } 139 err := w.Initialize() 140 if err != nil { 141 return err 142 } 143 for { 144 select { 145 case <-ctx.Done(): 146 return ctx.Err() 147 case event := <-session.EventChan: 148 if event == nil { 149 return fmt.Errorf("RTMP session closed") 150 } 151 switch event := event.(type) { 152 case *media.RTMPH264Data: 153 err := w.WriteH264(session.VideoTrack, event.PTS, event.DTS, event.AU) 154 if err != nil { 155 return fmt.Errorf("error writing H264: %w", err) 156 } 157 case *media.RTMPAACData: 158 err := w.WriteMPEG4Audio(session.AudioTrack, event.PTS, event.AU) 159 if err != nil { 160 return fmt.Errorf("error writing MPEG4Audio: %w", err) 161 } 162 default: 163 return fmt.Errorf("unsupported event type: %T", event) 164 } 165 } 166 } 167} 168 169func (a *StreamplaceAPI) HandleRTMPPublishConn(ctx context.Context, conn net.Conn) error { 170 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 171 if err != nil { 172 return err 173 } 174 175 sc := &gortmplib.ServerConn{ 176 RW: conn, 177 } 178 err = sc.Initialize() 179 if err != nil { 180 return err 181 } 182 183 err = sc.Accept() 184 if err != nil { 185 return err 186 } 187 188 if sc.Publish { 189 return a.HandleRTMPPublisher(ctx, sc) 190 } 191 return fmt.Errorf("RTMP playback is not allowed") 192} 193 194func (a *StreamplaceAPI) HandleRTMPPlaybackConn(ctx context.Context, conn net.Conn) error { 195 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 196 if err != nil { 197 return err 198 } 199 200 sc := &gortmplib.ServerConn{ 201 RW: conn, 202 } 203 err = sc.Initialize() 204 if err != nil { 205 return err 206 } 207 208 err = sc.Accept() 209 if err != nil { 210 return err 211 } 212 213 if !sc.Publish { 214 return a.HandleRTMPPlayback(ctx, sc) 215 } 216 return fmt.Errorf("RTMP playback is not allowed") 217} 218 219func (a *StreamplaceAPI) ServeRTMP(ctx context.Context) error { 220 ln, err := net.Listen("tcp", a.CLI.RTMPAddr) 221 if err != nil { 222 return fmt.Errorf("failed to listen: %w", err) 223 } 224 defer ln.Close() 225 226 go func() { 227 <-ctx.Done() 228 ln.Close() 229 }() 230 231 log.Log(ctx, "rtmp server starting", "addr", a.CLI.RTMPAddr) 232 233 g, ctx := errgroup.WithContext(ctx) 234 g.Go(func() error { 235 return a.ServeRTMPInternalPlayback(ctx) 236 }) 237 g.Go(func() error { 238 for { 239 if ctx.Err() != nil { 240 return ctx.Err() 241 } 242 conn, err := ln.Accept() 243 if err != nil { 244 return fmt.Errorf("error accepting RTMP connection: %w", err) 245 } 246 go func() { 247 err := a.HandleRTMPPublishConn(ctx, conn) 248 if err != nil { 249 log.Error(ctx, "error handling RTMP publish connection", "error", err) 250 } 251 }() 252 } 253 }) 254 255 return g.Wait() 256} 257 258// Serve RTMP internal playback server for gstreamer to pull from 259func (a *StreamplaceAPI) ServeRTMPInternalPlayback(ctx context.Context) error { 260 ln, err := net.Listen("tcp", "127.0.0.1:0") 261 if err != nil { 262 return fmt.Errorf("failed to listen: %w", err) 263 } 264 addr := ln.Addr().String() 265 defer ln.Close() 266 267 _, port, err := net.SplitHostPort(addr) 268 if err != nil { 269 return fmt.Errorf("failed to split host and port: %w", err) 270 } 271 272 go func() { 273 <-ctx.Done() 274 ln.Close() 275 }() 276 277 a.rtmpInternalPlaybackAddr = fmt.Sprintf("127.0.0.1:%s", port) 278 279 log.Log(ctx, "rtmp internal playback server starting", "addr", a.rtmpInternalPlaybackAddr) 280 281 // Accept loop in a goroutine so we can select on context.Done 282 for { 283 if ctx.Err() != nil { 284 return ctx.Err() 285 } 286 conn, err := ln.Accept() 287 if err != nil { 288 return fmt.Errorf("error accepting RTMP connection: %w", err) 289 } 290 291 go func() { 292 err := a.HandleRTMPPlaybackConn(ctx, conn) 293 if err != nil { 294 log.Error(ctx, "error handling RTMP internal playback connection", "error", err) 295 } 296 }() 297 } 298} 299 300func (a *StreamplaceAPI) ServeRTMPS(ctx context.Context, cli *config.CLI) error { 301 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath) 302 if err != nil { 303 return fmt.Errorf("failed to load TLS certificate: %w", err) 304 } 305 306 tlsConfig := &tls.Config{ 307 Certificates: []tls.Certificate{cert}, 308 MinVersion: tls.VersionTLS12, 309 } 310 311 ln, err := tls.Listen("tcp", cli.RTMPSAddr, tlsConfig) 312 if err != nil { 313 return fmt.Errorf("failed to create RTMPS listener: %w", err) 314 } 315 316 log.Log(ctx, "rtmps server starting", "addr", cli.RTMPAddr) 317 318 go func() { 319 <-ctx.Done() 320 ln.Close() 321 }() 322 323 g, ctx := errgroup.WithContext(ctx) 324 g.Go(func() error { 325 return a.ServeRTMPInternalPlayback(ctx) 326 }) 327 g.Go(func() error { 328 for { 329 if ctx.Err() != nil { 330 return ctx.Err() 331 } 332 conn, err := ln.Accept() 333 if err != nil { 334 return fmt.Errorf("error accepting RTMP connection: %w", err) 335 } 336 go func() { 337 err := a.HandleRTMPPublishConn(ctx, conn) 338 if err != nil { 339 log.Error(ctx, "error handling RTMP publish connection", "error", err) 340 } 341 }() 342 } 343 }) 344 345 return g.Wait() 346}