Live video on the AT Protocol
1// Package main contains an example.
2package api
3
4import (
5 "context"
6 "crypto/tls"
7 "fmt"
8 "net"
9 "strings"
10 "time"
11
12 "github.com/bluenviron/gortmplib"
13 "github.com/bluenviron/gortsplib/v5/pkg/format"
14 "golang.org/x/sync/errgroup"
15 "stream.place/streamplace/pkg/config"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/media"
18 "stream.place/streamplace/pkg/rtmprec"
19)
20
21// This example shows how to:
22// 1. create a RTMP server
23// 2. accept a stream from a reader.
24// 3. broadcast the stream to readers.
25
26var RTMPTimeout = 10 * time.Second
27
28const RTMPPrefix = "/live/"
29
30func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error {
31 err := sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout))
32 if err != nil {
33 return err
34 }
35
36 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) {
37 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix)
38 }
39 streamKey := strings.TrimPrefix(sc.URL.Path, RTMPPrefix)
40 mediaSigner, err := a.MakeMediaSigner(ctx, streamKey)
41 if err != nil {
42 return fmt.Errorf("failed to make media signer: %w", err)
43 }
44
45 streamer := mediaSigner.Streamer()
46 ctx = log.WithLogValues(ctx, "streamer", streamer)
47 session := &media.RTMPSession{
48 EventChan: make(chan any, 1024),
49 MediaSigner: mediaSigner,
50 }
51 a.rtmpSessionsLock.Lock()
52 a.rtmpSessions[streamer] = session
53 a.rtmpSessionsLock.Unlock()
54
55 defer func() {
56 a.rtmpSessionsLock.Lock()
57 delete(a.rtmpSessions, streamer)
58 a.rtmpSessionsLock.Unlock()
59 close(session.EventChan)
60 }()
61
62 r := &gortmplib.Reader{
63 Conn: sc,
64 }
65 err = r.Initialize()
66 if err != nil {
67 return err
68 }
69
70 for _, track := range r.Tracks() {
71 log.Log(ctx, "get track", "track", track)
72
73 switch track := track.(type) {
74 case *format.H264:
75 session.VideoTrack = track
76 r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
77 // log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts)
78 session.EventChan <- &media.RTMPH264Data{
79 AU: au,
80 PTS: pts,
81 DTS: dts,
82 }
83 })
84
85 case *format.MPEG4Audio:
86 session.AudioTrack = track
87 r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) {
88 // log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts)
89 session.EventChan <- &media.RTMPAACData{
90 AU: au,
91 PTS: pts,
92 }
93 })
94
95 default:
96 return fmt.Errorf("unsupported track type: %T", track)
97 }
98 }
99
100 g, ctx := errgroup.WithContext(ctx)
101 g.Go(func() error {
102 for {
103 if ctx.Err() != nil {
104 return ctx.Err()
105 }
106 err = sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout))
107 if err != nil {
108 return err
109 }
110 err = r.Read()
111 if err != nil {
112 return err
113 }
114 }
115 })
116
117 g.Go(func() error {
118 return a.MediaManager.RTMPIngest(ctx, fmt.Sprintf("rtmp://%s/live/%s", a.rtmpInternalPlaybackAddr, streamer), mediaSigner)
119 })
120
121 return g.Wait()
122}
123
124func (a *StreamplaceAPI) HandleRTMPPlayback(ctx context.Context, sc *gortmplib.ServerConn) error {
125 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) {
126 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix)
127 }
128 streamer := strings.TrimPrefix(sc.URL.Path, RTMPPrefix)
129 a.rtmpSessionsLock.Lock()
130 session, ok := a.rtmpSessions[streamer]
131 a.rtmpSessionsLock.Unlock()
132 if !ok {
133 return fmt.Errorf("RTMP session not found for streamer %s", streamer)
134 }
135
136 w := &gortmplib.Writer{
137 Conn: sc,
138 Tracks: []format.Format{session.VideoTrack, session.AudioTrack},
139 }
140 err := w.Initialize()
141 if err != nil {
142 return err
143 }
144 for {
145 select {
146 case <-ctx.Done():
147 return ctx.Err()
148 case event := <-session.EventChan:
149 if event == nil {
150 return fmt.Errorf("RTMP session closed")
151 }
152 switch event := event.(type) {
153 case *media.RTMPH264Data:
154 err := w.WriteH264(session.VideoTrack, event.PTS, event.DTS, event.AU)
155 if err != nil {
156 return fmt.Errorf("error writing H264: %w", err)
157 }
158 case *media.RTMPAACData:
159 err := w.WriteMPEG4Audio(session.AudioTrack, event.PTS, event.AU)
160 if err != nil {
161 return fmt.Errorf("error writing MPEG4Audio: %w", err)
162 }
163 default:
164 return fmt.Errorf("unsupported event type: %T", event)
165 }
166 }
167 }
168}
169
170func (a *StreamplaceAPI) HandleRTMPPublishConn(ctx context.Context, conn net.Conn) error {
171 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout))
172 if err != nil {
173 return err
174 }
175
176 sc := &gortmplib.ServerConn{
177 RW: conn,
178 }
179 err = sc.Initialize()
180 if err != nil {
181 return err
182 }
183
184 err = sc.Accept()
185 if err != nil {
186 return err
187 }
188
189 if sc.Publish {
190 return a.HandleRTMPPublisher(ctx, sc)
191 }
192 return fmt.Errorf("RTMP playback is not allowed")
193}
194
195func (a *StreamplaceAPI) HandleRTMPPlaybackConn(ctx context.Context, conn net.Conn) error {
196 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout))
197 if err != nil {
198 return err
199 }
200
201 sc := &gortmplib.ServerConn{
202 RW: conn,
203 }
204 err = sc.Initialize()
205 if err != nil {
206 return err
207 }
208
209 err = sc.Accept()
210 if err != nil {
211 return err
212 }
213
214 if !sc.Publish {
215 return a.HandleRTMPPlayback(ctx, sc)
216 }
217 return fmt.Errorf("RTMP playback is not allowed")
218}
219
220func (a *StreamplaceAPI) ServeRTMP(ctx context.Context) error {
221 ln, err := net.Listen("tcp", a.CLI.RTMPAddr)
222 if err != nil {
223 return fmt.Errorf("failed to listen: %w", err)
224 }
225 defer ln.Close()
226
227 go func() {
228 <-ctx.Done()
229 ln.Close()
230 }()
231
232 log.Log(ctx, "rtmp server starting", "addr", a.CLI.RTMPAddr)
233
234 g, ctx := errgroup.WithContext(ctx)
235 g.Go(func() error {
236 return a.ServeRTMPInternalPlayback(ctx)
237 })
238 g.Go(func() error {
239 for {
240 if ctx.Err() != nil {
241 return ctx.Err()
242 }
243 conn, err := ln.Accept()
244 if err != nil {
245 return fmt.Errorf("error accepting RTMP connection: %w", err)
246 }
247 go func() {
248 err := a.HandleRTMPPublishConn(ctx, conn)
249 if err != nil {
250 log.Error(ctx, "error handling RTMP publish connection", "error", err)
251 }
252 }()
253 }
254 })
255
256 return g.Wait()
257}
258
259// Serve RTMP internal playback server for gstreamer to pull from
260func (a *StreamplaceAPI) ServeRTMPInternalPlayback(ctx context.Context) error {
261 ln, err := net.Listen("tcp", "127.0.0.1:0")
262 if err != nil {
263 return fmt.Errorf("failed to listen: %w", err)
264 }
265 addr := ln.Addr().String()
266 defer ln.Close()
267
268 _, port, err := net.SplitHostPort(addr)
269 if err != nil {
270 return fmt.Errorf("failed to split host and port: %w", err)
271 }
272
273 go func() {
274 <-ctx.Done()
275 ln.Close()
276 }()
277
278 a.rtmpInternalPlaybackAddr = fmt.Sprintf("127.0.0.1:%s", port)
279
280 log.Log(ctx, "rtmp internal playback server starting", "addr", a.rtmpInternalPlaybackAddr)
281
282 // Accept loop in a goroutine so we can select on context.Done
283 for {
284 if ctx.Err() != nil {
285 return ctx.Err()
286 }
287 conn, err := ln.Accept()
288 if err != nil {
289 return fmt.Errorf("error accepting RTMP connection: %w", err)
290 }
291
292 recordingConn := rtmprec.NewRecordingConn(conn)
293
294 go func() {
295 err := a.HandleRTMPPlaybackConn(ctx, recordingConn)
296 if err != nil {
297 log.Error(ctx, "error handling RTMP internal playback connection", "error", err)
298 }
299 }()
300 }
301}
302
303func (a *StreamplaceAPI) ServeRTMPS(ctx context.Context, cli *config.CLI) error {
304 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath)
305 if err != nil {
306 return fmt.Errorf("failed to load TLS certificate: %w", err)
307 }
308
309 tlsConfig := &tls.Config{
310 Certificates: []tls.Certificate{cert},
311 MinVersion: tls.VersionTLS12,
312 }
313
314 ln, err := tls.Listen("tcp", cli.RTMPSAddr, tlsConfig)
315 if err != nil {
316 return fmt.Errorf("failed to create RTMPS listener: %w", err)
317 }
318
319 log.Log(ctx, "rtmps server starting", "addr", cli.RTMPAddr)
320
321 go func() {
322 <-ctx.Done()
323 ln.Close()
324 }()
325
326 g, ctx := errgroup.WithContext(ctx)
327 g.Go(func() error {
328 return a.ServeRTMPInternalPlayback(ctx)
329 })
330 g.Go(func() error {
331 for {
332 if ctx.Err() != nil {
333 return ctx.Err()
334 }
335 conn, err := ln.Accept()
336 if err != nil {
337 return fmt.Errorf("error accepting RTMP connection: %w", err)
338 }
339 recordingConn := rtmprec.NewRecordingConn(conn)
340 go func() {
341 err := a.HandleRTMPPublishConn(ctx, recordingConn)
342 if err != nil {
343 log.Error(ctx, "error handling RTMP publish connection", "error", err)
344 }
345 }()
346 }
347 })
348
349 return g.Wait()
350}