Live video on the AT Protocol
at natb/command-errors 325 lines 8.8 kB view raw
1package media 2 3import ( 4 "context" 5 "crypto/tls" 6 "fmt" 7 "io" 8 "net" 9 "net/url" 10 "strings" 11 "time" 12 13 "github.com/go-gst/go-gst/gst" 14 "stream.place/streamplace/pkg/bus" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/streamplace" 17) 18 19func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetView *streamplace.MultistreamDefs_TargetView) error { 20 ctx, cancel := context.WithCancel(ctx) 21 defer cancel() 22 ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush") 23 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget) 24 if !ok { 25 return fmt.Errorf("failed to convert target view to multistream target") 26 } 27 targetURL := rec.Url 28 29 pipelineSlice := []string{ 30 "flvmux name=muxer ! rtmp2sink name=rtmp2sink", 31 "h264parse name=videoparse ! muxer.video", 32 "opusparse name=audioparse ! opusdec ! audioresample ! fdkaacenc ! muxer.audio", 33 } 34 35 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 36 if err != nil { 37 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 38 } 39 40 rtmp2sink, err := pipeline.GetElementByName("rtmp2sink") 41 if err != nil { 42 return fmt.Errorf("failed to get rtmp2sink element from pipeline: %w", err) 43 } 44 45 u, err := url.Parse(targetURL) 46 if err != nil { 47 return fmt.Errorf("failed to parse target URL: %w", err) 48 } 49 if u.Scheme == "rtmps" { 50 localAddr, err := mm.RunTLSFForwarder(ctx, targetURL) 51 if err != nil { 52 return fmt.Errorf("failed to run TLS forwarder: %w", err) 53 } 54 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path) 55 log.Debug(ctx, "running TLS forwarder", "localAddr", local, "destination", targetURL) 56 err = rtmp2sink.SetProperty("location", local) 57 if err != nil { 58 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 59 } 60 } else if u.Scheme == "rtmp" { 61 localAddr, err := mm.RunTCPForwarder(ctx, targetURL) 62 if err != nil { 63 return fmt.Errorf("failed to run TCP forwarder: %w", err) 64 } 65 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path) 66 log.Debug(ctx, "running TCP forwarder", "localAddr", local, "destination", targetURL) 67 err = rtmp2sink.SetProperty("location", local) 68 if err != nil { 69 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 70 } 71 } else { 72 return fmt.Errorf("invalid target URL scheme: %s", u.Scheme) 73 } 74 75 go func() { 76 pollFreq := time.Second * 1 77 for { 78 select { 79 case <-ctx.Done(): 80 return 81 case <-time.After(pollFreq): 82 prop, err := rtmp2sink.GetProperty("stats") 83 if err != nil { 84 log.Error(ctx, "error getting rtmp2sink peak-kbps", "error", err) 85 continue 86 } 87 if prop == nil { 88 log.Error(ctx, "failed to get rtmp2sink peak-kbps", "prop", prop) 89 continue 90 } 91 propVal, ok := prop.(*gst.Structure) 92 if !ok { 93 log.Error(ctx, "failed to convert rtmp2sink peak-kbps", "prop", prop) 94 continue 95 } 96 outBytesAcked, err := propVal.GetValue("out-bytes-acked") 97 if err != nil { 98 log.Error(ctx, "failed to get rtmp2sink out-bytes-acked", "error", err) 99 continue 100 } 101 outBytesAckedVal, ok := outBytesAcked.(uint64) 102 if !ok { 103 log.Error(ctx, "failed to convert rtmp2sink out-bytes-acked", "prop", prop) 104 continue 105 } 106 if outBytesAckedVal > 0 { 107 err = mm.atsync.StatefulDB.CreateMultistreamEvent(targetView.Uri, fmt.Sprintf("wrote %d bytes", outBytesAckedVal), "active") 108 if err != nil { 109 log.Error(ctx, "failed to create multistream event", "error", err) 110 } 111 // increase pollFreq, once it's working we don't need to spam the database 112 pollFreq = time.Second * 15 113 } 114 log.Debug(ctx, "rtmp2sink out-bytes-acked", "outBytesAckedVal", outBytesAckedVal) 115 } 116 117 } 118 }() 119 120 segBuffer := make(chan *bus.Seg, 1024) 121 go func() { 122 segChan := mm.bus.SubscribeSegment(ctx, user, rendition) 123 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 124 for { 125 select { 126 case <-ctx.Done(): 127 log.Debug(ctx, "exiting segment reader") 128 return 129 case file := <-segChan.C: 130 log.Debug(ctx, "got segment", "file", file.Filepath) 131 segBuffer <- file 132 } 133 } 134 }() 135 136 segCh := make(chan *bus.Seg) 137 go func() { 138 for { 139 select { 140 case <-ctx.Done(): 141 log.Debug(ctx, "exiting segment reader") 142 return 143 case seg := <-segBuffer: 144 select { 145 case <-ctx.Done(): 146 return 147 case segCh <- seg: 148 } 149 } 150 } 151 }() 152 153 concatBin, err := ConcatBin(ctx, segCh, true) 154 if err != nil { 155 return fmt.Errorf("failed to create concat bin: %w", err) 156 } 157 158 err = pipeline.Add(concatBin.Element) 159 if err != nil { 160 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 161 } 162 163 videoPad := concatBin.GetStaticPad("video_0") 164 if videoPad == nil { 165 return fmt.Errorf("video pad not found") 166 } 167 168 audioPad := concatBin.GetStaticPad("audio_0") 169 if audioPad == nil { 170 return fmt.Errorf("audio pad not found") 171 } 172 173 videoParse, err := pipeline.GetElementByName("videoparse") 174 if err != nil { 175 return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 176 } 177 videoParsePad := videoParse.GetStaticPad("sink") 178 if videoParsePad == nil { 179 return fmt.Errorf("video parse pad not found") 180 } 181 linked := videoPad.Link(videoParsePad) 182 if linked != gst.PadLinkOK { 183 return fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 184 } 185 186 audioParse, err := pipeline.GetElementByName("audioparse") 187 if err != nil { 188 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 189 } 190 audioParsePad := audioParse.GetStaticPad("sink") 191 if audioParsePad == nil { 192 return fmt.Errorf("audio parse pad not found") 193 } 194 linked = audioPad.Link(audioParsePad) 195 if linked != gst.PadLinkOK { 196 return fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 197 } 198 199 errCh := make(chan error) 200 go func() { 201 err := HandleBusMessages(ctx, pipeline) 202 log.Log(ctx, "RTMP push pipeline error", "error", err) 203 errCh <- err 204 }() 205 206 err = pipeline.SetState(gst.StatePlaying) 207 if err != nil { 208 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 209 } 210 211 defer func() { 212 log.Log(ctx, "shutting down RTMP push pipeline") 213 err = pipeline.SetState(gst.StateNull) 214 if err != nil { 215 log.Error(ctx, "failed to set pipeline state to null", "error", err) 216 } 217 }() 218 219 return <-errCh 220} 221func (mm *MediaManager) RunTLSFForwarder(ctx context.Context, dest string) (string, error) { 222 destURL, err := url.Parse(dest) 223 if err != nil { 224 return "", fmt.Errorf("failed to parse destination URL: %w", err) 225 } 226 return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) { 227 return tls.Dial("tcp", destHost, &tls.Config{ 228 ServerName: destURL.Hostname(), 229 }) 230 }) 231} 232 233func (mm *MediaManager) RunTCPForwarder(ctx context.Context, dest string) (string, error) { 234 return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) { 235 return net.Dial("tcp", destHost) 236 }) 237} 238 239func (mm *MediaManager) runForwarder(ctx context.Context, dest string, dial func(destHost string) (net.Conn, error)) (string, error) { 240 ctx = log.WithLogValues(ctx, "mediafunc", "runForwarder") 241 // Parse the destination URL to extract host and port 242 destURL, err := url.Parse(dest) 243 if err != nil { 244 return "", fmt.Errorf("failed to parse destination URL: %w", err) 245 } 246 247 // Default to port 1935 if not specified 248 destHost := destURL.Host 249 if !strings.Contains(destHost, ":") { 250 destHost = destHost + ":1935" 251 } 252 253 // Listen on a random port 254 listener, err := net.Listen("tcp", "127.0.0.1:0") 255 if err != nil { 256 return "", fmt.Errorf("failed to listen on random port: %w", err) 257 } 258 259 log.Debug(ctx, "RTMP forwarder listening", "localAddr", listener.Addr().String(), "destination", dest) 260 261 go func() { 262 <-ctx.Done() 263 listener.Close() 264 }() 265 266 go func() { 267 defer listener.Close() 268 if ctx.Err() != nil { 269 return 270 } 271 // Accept incoming RTMP connection 272 clientConn, err := listener.Accept() 273 if err != nil { 274 log.Error(ctx, "failed to accept connection", "error", err) 275 return 276 } 277 278 closed := false 279 go func() { 280 <-ctx.Done() 281 if !closed { 282 closed = true 283 clientConn.Close() 284 } 285 }() 286 287 defer func() { 288 if !closed { 289 closed = true 290 clientConn.Close() 291 } 292 }() 293 294 // Establish connection to destination 295 serverConn, err := dial(destHost) 296 if err != nil { 297 log.Error(ctx, "failed to establish connection to destination", "error", err) 298 return 299 } 300 defer serverConn.Close() 301 302 // Proxy data bidirectionally 303 done := make(chan error, 2) 304 305 // Copy from client to server 306 go func() { 307 _, err := io.Copy(serverConn, clientConn) 308 done <- err 309 }() 310 311 // Copy from server to client 312 go func() { 313 _, err := io.Copy(clientConn, serverConn) 314 done <- err 315 }() 316 317 // Wait for either direction to complete or error 318 err = <-done 319 if err != nil { 320 log.Error(ctx, "proxy connection error", "error", err) 321 } 322 }() 323 324 return listener.Addr().String(), nil 325}