Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "io"
8 "net"
9 "net/url"
10 "strings"
11 "time"
12
13 "github.com/go-gst/go-gst/gst"
14 "stream.place/streamplace/pkg/bus"
15 "stream.place/streamplace/pkg/log"
16 "stream.place/streamplace/pkg/streamplace"
17)
18
19func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetView *streamplace.MultistreamDefs_TargetView) error {
20 ctx, cancel := context.WithCancel(ctx)
21 defer cancel()
22 ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush")
23 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
24 if !ok {
25 return fmt.Errorf("failed to convert target view to multistream target")
26 }
27 targetURL := rec.Url
28
29 pipelineSlice := []string{
30 "flvmux name=muxer ! rtmp2sink name=rtmp2sink",
31 "h264parse name=videoparse ! muxer.video",
32 "opusparse name=audioparse ! opusdec ! audioresample ! fdkaacenc ! muxer.audio",
33 }
34
35 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
36 if err != nil {
37 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
38 }
39
40 rtmp2sink, err := pipeline.GetElementByName("rtmp2sink")
41 if err != nil {
42 return fmt.Errorf("failed to get rtmp2sink element from pipeline: %w", err)
43 }
44
45 u, err := url.Parse(targetURL)
46 if err != nil {
47 return fmt.Errorf("failed to parse target URL: %w", err)
48 }
49 if u.Scheme == "rtmps" {
50 localAddr, err := mm.RunTLSFForwarder(ctx, targetURL)
51 if err != nil {
52 return fmt.Errorf("failed to run TLS forwarder: %w", err)
53 }
54 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path)
55 log.Debug(ctx, "running TLS forwarder", "localAddr", local, "destination", targetURL)
56 err = rtmp2sink.SetProperty("location", local)
57 if err != nil {
58 return fmt.Errorf("failed to set rtmp2sink location: %w", err)
59 }
60 } else if u.Scheme == "rtmp" {
61 localAddr, err := mm.RunTCPForwarder(ctx, targetURL)
62 if err != nil {
63 return fmt.Errorf("failed to run TCP forwarder: %w", err)
64 }
65 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path)
66 log.Debug(ctx, "running TCP forwarder", "localAddr", local, "destination", targetURL)
67 err = rtmp2sink.SetProperty("location", local)
68 if err != nil {
69 return fmt.Errorf("failed to set rtmp2sink location: %w", err)
70 }
71 } else {
72 return fmt.Errorf("invalid target URL scheme: %s", u.Scheme)
73 }
74
75 go func() {
76 pollFreq := time.Second * 1
77 for {
78 select {
79 case <-ctx.Done():
80 return
81 case <-time.After(pollFreq):
82 prop, err := rtmp2sink.GetProperty("stats")
83 if err != nil {
84 log.Error(ctx, "error getting rtmp2sink peak-kbps", "error", err)
85 continue
86 }
87 if prop == nil {
88 log.Error(ctx, "failed to get rtmp2sink peak-kbps", "prop", prop)
89 continue
90 }
91 propVal, ok := prop.(*gst.Structure)
92 if !ok {
93 log.Error(ctx, "failed to convert rtmp2sink peak-kbps", "prop", prop)
94 continue
95 }
96 outBytesAcked, err := propVal.GetValue("out-bytes-acked")
97 if err != nil {
98 log.Error(ctx, "failed to get rtmp2sink out-bytes-acked", "error", err)
99 continue
100 }
101 outBytesAckedVal, ok := outBytesAcked.(uint64)
102 if !ok {
103 log.Error(ctx, "failed to convert rtmp2sink out-bytes-acked", "prop", prop)
104 continue
105 }
106 if outBytesAckedVal > 0 {
107 err = mm.atsync.StatefulDB.CreateMultistreamEvent(targetView.Uri, fmt.Sprintf("wrote %d bytes", outBytesAckedVal), "active")
108 if err != nil {
109 log.Error(ctx, "failed to create multistream event", "error", err)
110 }
111 // increase pollFreq, once it's working we don't need to spam the database
112 pollFreq = time.Second * 15
113 }
114 log.Debug(ctx, "rtmp2sink out-bytes-acked", "outBytesAckedVal", outBytesAckedVal)
115 }
116
117 }
118 }()
119
120 segBuffer := make(chan *bus.Seg, 1024)
121 go func() {
122 segChan := mm.bus.SubscribeSegment(ctx, user, rendition)
123 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
124 for {
125 select {
126 case <-ctx.Done():
127 log.Debug(ctx, "exiting segment reader")
128 return
129 case file := <-segChan.C:
130 log.Debug(ctx, "got segment", "file", file.Filepath)
131 segBuffer <- file
132 }
133 }
134 }()
135
136 segCh := make(chan *bus.Seg)
137 go func() {
138 for {
139 select {
140 case <-ctx.Done():
141 log.Debug(ctx, "exiting segment reader")
142 return
143 case seg := <-segBuffer:
144 select {
145 case <-ctx.Done():
146 return
147 case segCh <- seg:
148 }
149 }
150 }
151 }()
152
153 concatBin, err := ConcatBin(ctx, segCh, true)
154 if err != nil {
155 return fmt.Errorf("failed to create concat bin: %w", err)
156 }
157
158 err = pipeline.Add(concatBin.Element)
159 if err != nil {
160 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
161 }
162
163 videoPad := concatBin.GetStaticPad("video_0")
164 if videoPad == nil {
165 return fmt.Errorf("video pad not found")
166 }
167
168 audioPad := concatBin.GetStaticPad("audio_0")
169 if audioPad == nil {
170 return fmt.Errorf("audio pad not found")
171 }
172
173 videoParse, err := pipeline.GetElementByName("videoparse")
174 if err != nil {
175 return fmt.Errorf("failed to get video sink element from pipeline: %w", err)
176 }
177 videoParsePad := videoParse.GetStaticPad("sink")
178 if videoParsePad == nil {
179 return fmt.Errorf("video parse pad not found")
180 }
181 linked := videoPad.Link(videoParsePad)
182 if linked != gst.PadLinkOK {
183 return fmt.Errorf("failed to link video pad to video parse pad: %v", linked)
184 }
185
186 audioParse, err := pipeline.GetElementByName("audioparse")
187 if err != nil {
188 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
189 }
190 audioParsePad := audioParse.GetStaticPad("sink")
191 if audioParsePad == nil {
192 return fmt.Errorf("audio parse pad not found")
193 }
194 linked = audioPad.Link(audioParsePad)
195 if linked != gst.PadLinkOK {
196 return fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked)
197 }
198
199 errCh := make(chan error)
200 go func() {
201 err := HandleBusMessages(ctx, pipeline)
202 log.Log(ctx, "RTMP push pipeline error", "error", err)
203 errCh <- err
204 }()
205
206 err = pipeline.SetState(gst.StatePlaying)
207 if err != nil {
208 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
209 }
210
211 defer func() {
212 log.Log(ctx, "shutting down RTMP push pipeline")
213 err = pipeline.SetState(gst.StateNull)
214 if err != nil {
215 log.Error(ctx, "failed to set pipeline state to null", "error", err)
216 }
217 }()
218
219 return <-errCh
220}
221func (mm *MediaManager) RunTLSFForwarder(ctx context.Context, dest string) (string, error) {
222 destURL, err := url.Parse(dest)
223 if err != nil {
224 return "", fmt.Errorf("failed to parse destination URL: %w", err)
225 }
226 return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) {
227 return tls.Dial("tcp", destHost, &tls.Config{
228 ServerName: destURL.Hostname(),
229 })
230 })
231}
232
233func (mm *MediaManager) RunTCPForwarder(ctx context.Context, dest string) (string, error) {
234 return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) {
235 return net.Dial("tcp", destHost)
236 })
237}
238
239func (mm *MediaManager) runForwarder(ctx context.Context, dest string, dial func(destHost string) (net.Conn, error)) (string, error) {
240 ctx = log.WithLogValues(ctx, "mediafunc", "runForwarder")
241 // Parse the destination URL to extract host and port
242 destURL, err := url.Parse(dest)
243 if err != nil {
244 return "", fmt.Errorf("failed to parse destination URL: %w", err)
245 }
246
247 // Default to port 1935 if not specified
248 destHost := destURL.Host
249 if !strings.Contains(destHost, ":") {
250 destHost = destHost + ":1935"
251 }
252
253 // Listen on a random port
254 listener, err := net.Listen("tcp", "127.0.0.1:0")
255 if err != nil {
256 return "", fmt.Errorf("failed to listen on random port: %w", err)
257 }
258
259 log.Debug(ctx, "RTMP forwarder listening", "localAddr", listener.Addr().String(), "destination", dest)
260
261 go func() {
262 <-ctx.Done()
263 listener.Close()
264 }()
265
266 go func() {
267 defer listener.Close()
268 if ctx.Err() != nil {
269 return
270 }
271 // Accept incoming RTMP connection
272 clientConn, err := listener.Accept()
273 if err != nil {
274 log.Error(ctx, "failed to accept connection", "error", err)
275 return
276 }
277
278 closed := false
279 go func() {
280 <-ctx.Done()
281 if !closed {
282 closed = true
283 clientConn.Close()
284 }
285 }()
286
287 defer func() {
288 if !closed {
289 closed = true
290 clientConn.Close()
291 }
292 }()
293
294 // Establish connection to destination
295 serverConn, err := dial(destHost)
296 if err != nil {
297 log.Error(ctx, "failed to establish connection to destination", "error", err)
298 return
299 }
300 defer serverConn.Close()
301
302 // Proxy data bidirectionally
303 done := make(chan error, 2)
304
305 // Copy from client to server
306 go func() {
307 _, err := io.Copy(serverConn, clientConn)
308 done <- err
309 }()
310
311 // Copy from server to client
312 go func() {
313 _, err := io.Copy(clientConn, serverConn)
314 done <- err
315 }()
316
317 // Wait for either direction to complete or error
318 err = <-done
319 if err != nil {
320 log.Error(ctx, "proxy connection error", "error", err)
321 }
322 }()
323
324 return listener.Addr().String(), nil
325}