Live video on the AT Protocol
at eli/rtmprec-2 350 lines 8.2 kB view raw
1// Package main contains an example. 2package api 3 4import ( 5 "context" 6 "crypto/tls" 7 "fmt" 8 "net" 9 "strings" 10 "time" 11 12 "github.com/bluenviron/gortmplib" 13 "github.com/bluenviron/gortsplib/v5/pkg/format" 14 "golang.org/x/sync/errgroup" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/media" 18 "stream.place/streamplace/pkg/rtmprec" 19) 20 21// This example shows how to: 22// 1. create a RTMP server 23// 2. accept a stream from a reader. 24// 3. broadcast the stream to readers. 25 26var RTMPTimeout = 10 * time.Second 27 28const RTMPPrefix = "/live/" 29 30func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error { 31 err := sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 32 if err != nil { 33 return err 34 } 35 36 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 37 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) 38 } 39 streamKey := strings.TrimPrefix(sc.URL.Path, RTMPPrefix) 40 mediaSigner, err := a.MakeMediaSigner(ctx, streamKey) 41 if err != nil { 42 return fmt.Errorf("failed to make media signer: %w", err) 43 } 44 45 streamer := mediaSigner.Streamer() 46 ctx = log.WithLogValues(ctx, "streamer", streamer) 47 session := &media.RTMPSession{ 48 EventChan: make(chan any, 1024), 49 MediaSigner: mediaSigner, 50 } 51 a.rtmpSessionsLock.Lock() 52 a.rtmpSessions[streamer] = session 53 a.rtmpSessionsLock.Unlock() 54 55 defer func() { 56 a.rtmpSessionsLock.Lock() 57 delete(a.rtmpSessions, streamer) 58 a.rtmpSessionsLock.Unlock() 59 close(session.EventChan) 60 }() 61 62 r := &gortmplib.Reader{ 63 Conn: sc, 64 } 65 err = r.Initialize() 66 if err != nil { 67 return err 68 } 69 70 for _, track := range r.Tracks() { 71 log.Log(ctx, "get track", "track", track) 72 73 switch track := track.(type) { 74 case *format.H264: 75 session.VideoTrack = track 76 r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) { 77 // log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts) 78 session.EventChan <- &media.RTMPH264Data{ 79 AU: au, 80 PTS: pts, 81 DTS: dts, 82 } 83 }) 84 85 case *format.MPEG4Audio: 86 session.AudioTrack = track 87 r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) { 88 // log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts) 89 session.EventChan <- &media.RTMPAACData{ 90 AU: au, 91 PTS: pts, 92 } 93 }) 94 95 default: 96 return fmt.Errorf("unsupported track type: %T", track) 97 } 98 } 99 100 g, ctx := errgroup.WithContext(ctx) 101 g.Go(func() error { 102 for { 103 if ctx.Err() != nil { 104 return ctx.Err() 105 } 106 err = sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 107 if err != nil { 108 return err 109 } 110 err = r.Read() 111 if err != nil { 112 return err 113 } 114 } 115 }) 116 117 g.Go(func() error { 118 return a.MediaManager.RTMPIngest(ctx, fmt.Sprintf("rtmp://%s/live/%s", a.rtmpInternalPlaybackAddr, streamer), mediaSigner) 119 }) 120 121 return g.Wait() 122} 123 124func (a *StreamplaceAPI) HandleRTMPPlayback(ctx context.Context, sc *gortmplib.ServerConn) error { 125 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 126 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) 127 } 128 streamer := strings.TrimPrefix(sc.URL.Path, RTMPPrefix) 129 a.rtmpSessionsLock.Lock() 130 session, ok := a.rtmpSessions[streamer] 131 a.rtmpSessionsLock.Unlock() 132 if !ok { 133 return fmt.Errorf("RTMP session not found for streamer %s", streamer) 134 } 135 136 w := &gortmplib.Writer{ 137 Conn: sc, 138 Tracks: []format.Format{session.VideoTrack, session.AudioTrack}, 139 } 140 err := w.Initialize() 141 if err != nil { 142 return err 143 } 144 for { 145 select { 146 case <-ctx.Done(): 147 return ctx.Err() 148 case event := <-session.EventChan: 149 if event == nil { 150 return fmt.Errorf("RTMP session closed") 151 } 152 switch event := event.(type) { 153 case *media.RTMPH264Data: 154 err := w.WriteH264(session.VideoTrack, event.PTS, event.DTS, event.AU) 155 if err != nil { 156 return fmt.Errorf("error writing H264: %w", err) 157 } 158 case *media.RTMPAACData: 159 err := w.WriteMPEG4Audio(session.AudioTrack, event.PTS, event.AU) 160 if err != nil { 161 return fmt.Errorf("error writing MPEG4Audio: %w", err) 162 } 163 default: 164 return fmt.Errorf("unsupported event type: %T", event) 165 } 166 } 167 } 168} 169 170func (a *StreamplaceAPI) HandleRTMPPublishConn(ctx context.Context, conn net.Conn) error { 171 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 172 if err != nil { 173 return err 174 } 175 176 sc := &gortmplib.ServerConn{ 177 RW: conn, 178 } 179 err = sc.Initialize() 180 if err != nil { 181 return err 182 } 183 184 err = sc.Accept() 185 if err != nil { 186 return err 187 } 188 189 if sc.Publish { 190 return a.HandleRTMPPublisher(ctx, sc) 191 } 192 return fmt.Errorf("RTMP playback is not allowed") 193} 194 195func (a *StreamplaceAPI) HandleRTMPPlaybackConn(ctx context.Context, conn net.Conn) error { 196 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 197 if err != nil { 198 return err 199 } 200 201 sc := &gortmplib.ServerConn{ 202 RW: conn, 203 } 204 err = sc.Initialize() 205 if err != nil { 206 return err 207 } 208 209 err = sc.Accept() 210 if err != nil { 211 return err 212 } 213 214 if !sc.Publish { 215 return a.HandleRTMPPlayback(ctx, sc) 216 } 217 return fmt.Errorf("RTMP playback is not allowed") 218} 219 220func (a *StreamplaceAPI) ServeRTMP(ctx context.Context) error { 221 ln, err := net.Listen("tcp", a.CLI.RTMPAddr) 222 if err != nil { 223 return fmt.Errorf("failed to listen: %w", err) 224 } 225 defer ln.Close() 226 227 go func() { 228 <-ctx.Done() 229 ln.Close() 230 }() 231 232 log.Log(ctx, "rtmp server starting", "addr", a.CLI.RTMPAddr) 233 234 g, ctx := errgroup.WithContext(ctx) 235 g.Go(func() error { 236 return a.ServeRTMPInternalPlayback(ctx) 237 }) 238 g.Go(func() error { 239 for { 240 if ctx.Err() != nil { 241 return ctx.Err() 242 } 243 conn, err := ln.Accept() 244 if err != nil { 245 return fmt.Errorf("error accepting RTMP connection: %w", err) 246 } 247 go func() { 248 err := a.HandleRTMPPublishConn(ctx, conn) 249 if err != nil { 250 log.Error(ctx, "error handling RTMP publish connection", "error", err) 251 } 252 }() 253 } 254 }) 255 256 return g.Wait() 257} 258 259// Serve RTMP internal playback server for gstreamer to pull from 260func (a *StreamplaceAPI) ServeRTMPInternalPlayback(ctx context.Context) error { 261 ln, err := net.Listen("tcp", "127.0.0.1:0") 262 if err != nil { 263 return fmt.Errorf("failed to listen: %w", err) 264 } 265 addr := ln.Addr().String() 266 defer ln.Close() 267 268 _, port, err := net.SplitHostPort(addr) 269 if err != nil { 270 return fmt.Errorf("failed to split host and port: %w", err) 271 } 272 273 go func() { 274 <-ctx.Done() 275 ln.Close() 276 }() 277 278 a.rtmpInternalPlaybackAddr = fmt.Sprintf("127.0.0.1:%s", port) 279 280 log.Log(ctx, "rtmp internal playback server starting", "addr", a.rtmpInternalPlaybackAddr) 281 282 // Accept loop in a goroutine so we can select on context.Done 283 for { 284 if ctx.Err() != nil { 285 return ctx.Err() 286 } 287 conn, err := ln.Accept() 288 if err != nil { 289 return fmt.Errorf("error accepting RTMP connection: %w", err) 290 } 291 292 recordingConn := rtmprec.NewRecordingConn(conn) 293 294 go func() { 295 err := a.HandleRTMPPlaybackConn(ctx, recordingConn) 296 if err != nil { 297 log.Error(ctx, "error handling RTMP internal playback connection", "error", err) 298 } 299 }() 300 } 301} 302 303func (a *StreamplaceAPI) ServeRTMPS(ctx context.Context, cli *config.CLI) error { 304 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath) 305 if err != nil { 306 return fmt.Errorf("failed to load TLS certificate: %w", err) 307 } 308 309 tlsConfig := &tls.Config{ 310 Certificates: []tls.Certificate{cert}, 311 MinVersion: tls.VersionTLS12, 312 } 313 314 ln, err := tls.Listen("tcp", cli.RTMPSAddr, tlsConfig) 315 if err != nil { 316 return fmt.Errorf("failed to create RTMPS listener: %w", err) 317 } 318 319 log.Log(ctx, "rtmps server starting", "addr", cli.RTMPAddr) 320 321 go func() { 322 <-ctx.Done() 323 ln.Close() 324 }() 325 326 g, ctx := errgroup.WithContext(ctx) 327 g.Go(func() error { 328 return a.ServeRTMPInternalPlayback(ctx) 329 }) 330 g.Go(func() error { 331 for { 332 if ctx.Err() != nil { 333 return ctx.Err() 334 } 335 conn, err := ln.Accept() 336 if err != nil { 337 return fmt.Errorf("error accepting RTMP connection: %w", err) 338 } 339 recordingConn := rtmprec.NewRecordingConn(conn) 340 go func() { 341 err := a.HandleRTMPPublishConn(ctx, recordingConn) 342 if err != nil { 343 log.Error(ctx, "error handling RTMP publish connection", "error", err) 344 } 345 }() 346 } 347 }) 348 349 return g.Wait() 350}