Live video on the AT Protocol
// Package api contains StreamplaceAPI; this file holds its RTMP/RTMPS ingest and internal RTMP playback handlers.
2package api
3
4import (
5 "context"
6 "crypto/tls"
7 "fmt"
8 "net"
9 "strings"
10 "time"
11
12 "github.com/bluenviron/gortmplib"
13 "github.com/bluenviron/gortsplib/v5/pkg/format"
14 "golang.org/x/sync/errgroup"
15 "stream.place/streamplace/pkg/config"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/media"
18)
19
// This file shows how StreamplaceAPI:
// 1. runs an RTMP (and RTMPS) server,
// 2. accepts a stream from a publisher,
// 3. relays that stream to readers of the internal playback server.
24
25var RTMPTimeout = 10 * time.Second
26
27const RTMPPrefix = "/live/"
28
29func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error {
30 err := sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout))
31 if err != nil {
32 return err
33 }
34
35 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) {
36 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix)
37 }
38 streamKey := strings.TrimPrefix(sc.URL.Path, RTMPPrefix)
39 mediaSigner, err := a.MakeMediaSigner(ctx, streamKey)
40 if err != nil {
41 return fmt.Errorf("failed to make media signer: %w", err)
42 }
43
44 streamer := mediaSigner.Streamer()
45 ctx = log.WithLogValues(ctx, "streamer", streamer)
46 session := &media.RTMPSession{
47 EventChan: make(chan any, 1024),
48 MediaSigner: mediaSigner,
49 }
50 a.rtmpSessionsLock.Lock()
51 a.rtmpSessions[streamer] = session
52 a.rtmpSessionsLock.Unlock()
53
54 defer func() {
55 a.rtmpSessionsLock.Lock()
56 delete(a.rtmpSessions, streamer)
57 a.rtmpSessionsLock.Unlock()
58 close(session.EventChan)
59 }()
60
61 r := &gortmplib.Reader{
62 Conn: sc,
63 }
64 err = r.Initialize()
65 if err != nil {
66 return err
67 }
68
69 for _, track := range r.Tracks() {
70 log.Log(ctx, "get track", "track", track)
71
72 switch track := track.(type) {
73 case *format.H264:
74 session.VideoTrack = track
75 r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
76 // log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts)
77 session.EventChan <- &media.RTMPH264Data{
78 AU: au,
79 PTS: pts,
80 DTS: dts,
81 }
82 })
83
84 case *format.MPEG4Audio:
85 session.AudioTrack = track
86 r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) {
87 // log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts)
88 session.EventChan <- &media.RTMPAACData{
89 AU: au,
90 PTS: pts,
91 }
92 })
93
94 default:
95 return fmt.Errorf("unsupported track type: %T", track)
96 }
97 }
98
99 g, ctx := errgroup.WithContext(ctx)
100 g.Go(func() error {
101 for {
102 if ctx.Err() != nil {
103 return ctx.Err()
104 }
105 err = sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout))
106 if err != nil {
107 return err
108 }
109 err = r.Read()
110 if err != nil {
111 return err
112 }
113 }
114 })
115
116 g.Go(func() error {
117 return a.MediaManager.RTMPIngest(ctx, fmt.Sprintf("rtmp://%s/live/%s", a.rtmpInternalPlaybackAddr, streamer), mediaSigner)
118 })
119
120 return g.Wait()
121}
122
123func (a *StreamplaceAPI) HandleRTMPPlayback(ctx context.Context, sc *gortmplib.ServerConn) error {
124 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) {
125 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix)
126 }
127 streamer := strings.TrimPrefix(sc.URL.Path, RTMPPrefix)
128 a.rtmpSessionsLock.Lock()
129 session, ok := a.rtmpSessions[streamer]
130 a.rtmpSessionsLock.Unlock()
131 if !ok {
132 return fmt.Errorf("RTMP session not found for streamer %s", streamer)
133 }
134
135 w := &gortmplib.Writer{
136 Conn: sc,
137 Tracks: []format.Format{session.VideoTrack, session.AudioTrack},
138 }
139 err := w.Initialize()
140 if err != nil {
141 return err
142 }
143 for {
144 select {
145 case <-ctx.Done():
146 return ctx.Err()
147 case event := <-session.EventChan:
148 if event == nil {
149 return fmt.Errorf("RTMP session closed")
150 }
151 switch event := event.(type) {
152 case *media.RTMPH264Data:
153 err := w.WriteH264(session.VideoTrack, event.PTS, event.DTS, event.AU)
154 if err != nil {
155 return fmt.Errorf("error writing H264: %w", err)
156 }
157 case *media.RTMPAACData:
158 err := w.WriteMPEG4Audio(session.AudioTrack, event.PTS, event.AU)
159 if err != nil {
160 return fmt.Errorf("error writing MPEG4Audio: %w", err)
161 }
162 default:
163 return fmt.Errorf("unsupported event type: %T", event)
164 }
165 }
166 }
167}
168
169func (a *StreamplaceAPI) HandleRTMPPublishConn(ctx context.Context, conn net.Conn) error {
170 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout))
171 if err != nil {
172 return err
173 }
174
175 sc := &gortmplib.ServerConn{
176 RW: conn,
177 }
178 err = sc.Initialize()
179 if err != nil {
180 return err
181 }
182
183 err = sc.Accept()
184 if err != nil {
185 return err
186 }
187
188 if sc.Publish {
189 return a.HandleRTMPPublisher(ctx, sc)
190 }
191 return fmt.Errorf("RTMP playback is not allowed")
192}
193
194func (a *StreamplaceAPI) HandleRTMPPlaybackConn(ctx context.Context, conn net.Conn) error {
195 err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout))
196 if err != nil {
197 return err
198 }
199
200 sc := &gortmplib.ServerConn{
201 RW: conn,
202 }
203 err = sc.Initialize()
204 if err != nil {
205 return err
206 }
207
208 err = sc.Accept()
209 if err != nil {
210 return err
211 }
212
213 if !sc.Publish {
214 return a.HandleRTMPPlayback(ctx, sc)
215 }
216 return fmt.Errorf("RTMP playback is not allowed")
217}
218
219func (a *StreamplaceAPI) ServeRTMP(ctx context.Context) error {
220 ln, err := net.Listen("tcp", a.CLI.RTMPAddr)
221 if err != nil {
222 return fmt.Errorf("failed to listen: %w", err)
223 }
224 defer ln.Close()
225
226 go func() {
227 <-ctx.Done()
228 ln.Close()
229 }()
230
231 log.Log(ctx, "rtmp server starting", "addr", a.CLI.RTMPAddr)
232
233 g, ctx := errgroup.WithContext(ctx)
234 g.Go(func() error {
235 return a.ServeRTMPInternalPlayback(ctx)
236 })
237 g.Go(func() error {
238 for {
239 if ctx.Err() != nil {
240 return ctx.Err()
241 }
242 conn, err := ln.Accept()
243 if err != nil {
244 return fmt.Errorf("error accepting RTMP connection: %w", err)
245 }
246 go func() {
247 err := a.HandleRTMPPublishConn(ctx, conn)
248 if err != nil {
249 log.Error(ctx, "error handling RTMP publish connection", "error", err)
250 }
251 }()
252 }
253 })
254
255 return g.Wait()
256}
257
258// Serve RTMP internal playback server for gstreamer to pull from
259func (a *StreamplaceAPI) ServeRTMPInternalPlayback(ctx context.Context) error {
260 ln, err := net.Listen("tcp", "127.0.0.1:0")
261 if err != nil {
262 return fmt.Errorf("failed to listen: %w", err)
263 }
264 addr := ln.Addr().String()
265 defer ln.Close()
266
267 _, port, err := net.SplitHostPort(addr)
268 if err != nil {
269 return fmt.Errorf("failed to split host and port: %w", err)
270 }
271
272 go func() {
273 <-ctx.Done()
274 ln.Close()
275 }()
276
277 a.rtmpInternalPlaybackAddr = fmt.Sprintf("127.0.0.1:%s", port)
278
279 log.Log(ctx, "rtmp internal playback server starting", "addr", a.rtmpInternalPlaybackAddr)
280
281 // Accept loop in a goroutine so we can select on context.Done
282 for {
283 if ctx.Err() != nil {
284 return ctx.Err()
285 }
286 conn, err := ln.Accept()
287 if err != nil {
288 return fmt.Errorf("error accepting RTMP connection: %w", err)
289 }
290
291 go func() {
292 err := a.HandleRTMPPlaybackConn(ctx, conn)
293 if err != nil {
294 log.Error(ctx, "error handling RTMP internal playback connection", "error", err)
295 }
296 }()
297 }
298}
299
300func (a *StreamplaceAPI) ServeRTMPS(ctx context.Context, cli *config.CLI) error {
301 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath)
302 if err != nil {
303 return fmt.Errorf("failed to load TLS certificate: %w", err)
304 }
305
306 tlsConfig := &tls.Config{
307 Certificates: []tls.Certificate{cert},
308 MinVersion: tls.VersionTLS12,
309 }
310
311 ln, err := tls.Listen("tcp", cli.RTMPSAddr, tlsConfig)
312 if err != nil {
313 return fmt.Errorf("failed to create RTMPS listener: %w", err)
314 }
315
316 log.Log(ctx, "rtmps server starting", "addr", cli.RTMPAddr)
317
318 go func() {
319 <-ctx.Done()
320 ln.Close()
321 }()
322
323 g, ctx := errgroup.WithContext(ctx)
324 g.Go(func() error {
325 return a.ServeRTMPInternalPlayback(ctx)
326 })
327 g.Go(func() error {
328 for {
329 if ctx.Err() != nil {
330 return ctx.Err()
331 }
332 conn, err := ln.Accept()
333 if err != nil {
334 return fmt.Errorf("error accepting RTMP connection: %w", err)
335 }
336 go func() {
337 err := a.HandleRTMPPublishConn(ctx, conn)
338 if err != nil {
339 log.Error(ctx, "error handling RTMP publish connection", "error", err)
340 }
341 }()
342 }
343 })
344
345 return g.Wait()
346}