Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "io"
8 "net"
9 "net/url"
10 "strings"
11 "time"
12
13 "github.com/go-gst/go-gst/gst"
14 "github.com/google/uuid"
15 "stream.place/streamplace/pkg/bus"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/streamplace"
18)
19
20func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetView *streamplace.MultistreamDefs_TargetView) error {
21 uu, err := uuid.NewV7()
22 if err != nil {
23 return err
24 }
25 ctx, cancel := context.WithCancel(ctx)
26 defer cancel()
27 ctx = log.WithLogValues(ctx, "pushID", uu.String())
28 ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush")
29 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
30 if !ok {
31 return fmt.Errorf("failed to convert target view to multistream target")
32 }
33 targetURL := rec.Url
34
35 pipelineSlice := []string{
36 "flvmux name=muxer ! rtmp2sink name=rtmp2sink",
37 "h264parse name=videoparse ! muxer.video",
38 "opusparse name=audioparse ! opusdec ! fdkaacenc ! muxer.audio",
39 }
40
41 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
42 if err != nil {
43 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
44 }
45
46 rtmp2sink, err := pipeline.GetElementByName("rtmp2sink")
47 if err != nil {
48 return fmt.Errorf("failed to get rtmp2sink element from pipeline: %w", err)
49 }
50
51 u, err := url.Parse(targetURL)
52 if err != nil {
53 return fmt.Errorf("failed to parse target URL: %w", err)
54 }
55 if u.Scheme == "rtmps" {
56 localAddr, err := mm.RunTLSFForwarder(ctx, targetURL)
57 if err != nil {
58 return fmt.Errorf("failed to run TLS forwarder: %w", err)
59 }
60 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path)
61 log.Debug(ctx, "running TLS forwarder", "localAddr", local, "destination", targetURL)
62 err = rtmp2sink.SetProperty("location", local)
63 if err != nil {
64 return fmt.Errorf("failed to set rtmp2sink location: %w", err)
65 }
66 } else if u.Scheme == "rtmp" {
67 err = rtmp2sink.SetProperty("location", targetURL)
68 if err != nil {
69 return fmt.Errorf("failed to set rtmp2sink location: %w", err)
70 }
71 } else {
72 return fmt.Errorf("invalid target URL scheme: %s", u.Scheme)
73 }
74
75 go func() {
76 pollFreq := time.Second * 1
77 for {
78 select {
79 case <-ctx.Done():
80 return
81 case <-time.After(pollFreq):
82 prop, err := rtmp2sink.GetProperty("stats")
83 if err != nil {
84 log.Error(ctx, "error getting rtmp2sink peak-kbps", "error", err)
85 continue
86 }
87 if prop == nil {
88 log.Error(ctx, "failed to get rtmp2sink peak-kbps", "prop", prop)
89 continue
90 }
91 propVal, ok := prop.(*gst.Structure)
92 if !ok {
93 log.Error(ctx, "failed to convert rtmp2sink peak-kbps", "prop", prop)
94 continue
95 }
96 outBytesAcked, err := propVal.GetValue("out-bytes-acked")
97 if err != nil {
98 log.Error(ctx, "failed to get rtmp2sink out-bytes-acked", "error", err)
99 continue
100 }
101 outBytesAckedVal, ok := outBytesAcked.(uint64)
102 if !ok {
103 log.Error(ctx, "failed to convert rtmp2sink out-bytes-acked", "prop", prop)
104 continue
105 }
106 if outBytesAckedVal > 0 {
107 err = mm.atsync.StatefulDB.CreateMultistreamEvent(targetView.Uri, fmt.Sprintf("wrote %d bytes", outBytesAckedVal), "active")
108 if err != nil {
109 log.Error(ctx, "failed to create multistream event", "error", err)
110 }
111 // increase pollFreq, once it's working we don't need to spam the database
112 pollFreq = time.Second * 15
113 }
114 log.Debug(ctx, "rtmp2sink out-bytes-acked", "outBytesAckedVal", outBytesAckedVal)
115 }
116
117 }
118 }()
119
120 segBuffer := make(chan *bus.Seg, 1024)
121 go func() {
122 segChan := mm.bus.SubscribeSegment(ctx, user, rendition)
123 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
124 for {
125 select {
126 case <-ctx.Done():
127 log.Debug(ctx, "exiting segment reader")
128 return
129 case file := <-segChan.C:
130 log.Debug(ctx, "got segment", "file", file.Filepath)
131 segBuffer <- file
132 }
133 }
134 }()
135
136 segCh := make(chan *bus.Seg)
137 go func() {
138 for {
139 select {
140 case <-ctx.Done():
141 log.Debug(ctx, "exiting segment reader")
142 return
143 case seg := <-segBuffer:
144 select {
145 case <-ctx.Done():
146 return
147 case segCh <- seg:
148 }
149 }
150 }
151 }()
152
153 concatBin, err := ConcatBin(ctx, segCh, true)
154 if err != nil {
155 return fmt.Errorf("failed to create concat bin: %w", err)
156 }
157
158 err = pipeline.Add(concatBin.Element)
159 if err != nil {
160 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
161 }
162
163 videoPad := concatBin.GetStaticPad("video_0")
164 if videoPad == nil {
165 return fmt.Errorf("video pad not found")
166 }
167
168 audioPad := concatBin.GetStaticPad("audio_0")
169 if audioPad == nil {
170 return fmt.Errorf("audio pad not found")
171 }
172
173 videoParse, err := pipeline.GetElementByName("videoparse")
174 if err != nil {
175 return fmt.Errorf("failed to get video sink element from pipeline: %w", err)
176 }
177 videoParsePad := videoParse.GetStaticPad("sink")
178 if videoParsePad == nil {
179 return fmt.Errorf("video parse pad not found")
180 }
181 linked := videoPad.Link(videoParsePad)
182 if linked != gst.PadLinkOK {
183 return fmt.Errorf("failed to link video pad to video parse pad: %v", linked)
184 }
185
186 audioParse, err := pipeline.GetElementByName("audioparse")
187 if err != nil {
188 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
189 }
190 audioParsePad := audioParse.GetStaticPad("sink")
191 if audioParsePad == nil {
192 return fmt.Errorf("audio parse pad not found")
193 }
194 linked = audioPad.Link(audioParsePad)
195 if linked != gst.PadLinkOK {
196 return fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked)
197 }
198
199 errCh := make(chan error)
200 go func() {
201 err := HandleBusMessages(ctx, pipeline)
202 errCh <- err
203 }()
204
205 err = pipeline.SetState(gst.StatePlaying)
206 if err != nil {
207 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
208 }
209
210 defer func() {
211 err = pipeline.SetState(gst.StateNull)
212 if err != nil {
213 log.Error(ctx, "failed to set pipeline state to null", "error", err)
214 }
215 }()
216
217 return <-errCh
218}
219func (mm *MediaManager) RunTLSFForwarder(ctx context.Context, dest string) (string, error) {
220 // Parse the destination URL to extract host and port
221 destURL, err := url.Parse(dest)
222 if err != nil {
223 return "", fmt.Errorf("failed to parse destination URL: %w", err)
224 }
225
226 // Default to port 1935 if not specified
227 destHost := destURL.Host
228 if !strings.Contains(destHost, ":") {
229 destHost = destHost + ":1935"
230 }
231
232 // Listen on a random port
233 listener, err := net.Listen("tcp", "127.0.0.1:0")
234 if err != nil {
235 return "", fmt.Errorf("failed to listen on random port: %w", err)
236 }
237
238 log.Debug(ctx, "RTMP to RTMPS forwarder listening", "localAddr", listener.Addr().String(), "destination", dest)
239
240 go func() {
241 <-ctx.Done()
242 listener.Close()
243 }()
244
245 go func() {
246 defer listener.Close()
247 if ctx.Err() != nil {
248 return
249 }
250 // Accept incoming RTMP connection
251 clientConn, err := listener.Accept()
252 if err != nil {
253 log.Error(ctx, "failed to accept connection", "error", err)
254 return
255 }
256
257 // Only one connection so we don't need a goroutine
258 defer clientConn.Close()
259
260 // Establish TLS connection to destination
261 tlsConn, err := tls.Dial("tcp", destHost, &tls.Config{
262 ServerName: destURL.Hostname(),
263 })
264 if err != nil {
265 log.Error(ctx, "failed to establish TLS connection to destination", "error", err)
266 return
267 }
268 defer tlsConn.Close()
269
270 // Proxy data bidirectionally
271 done := make(chan error, 2)
272
273 // Copy from client to server
274 go func() {
275 _, err := io.Copy(tlsConn, clientConn)
276 done <- err
277 }()
278
279 // Copy from server to client
280 go func() {
281 _, err := io.Copy(clientConn, tlsConn)
282 done <- err
283 }()
284
285 // Wait for either direction to complete or error
286 err = <-done
287 if err != nil {
288 log.Error(ctx, "proxy connection error", "error", err)
289 }
290 }()
291
292 return listener.Addr().String(), nil
293}