Live video on the AT Protocol
at eli/rtmp-push 293 lines 7.8 kB view raw
1package media 2 3import ( 4 "context" 5 "crypto/tls" 6 "fmt" 7 "io" 8 "net" 9 "net/url" 10 "strings" 11 "time" 12 13 "github.com/go-gst/go-gst/gst" 14 "github.com/google/uuid" 15 "stream.place/streamplace/pkg/bus" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/streamplace" 18) 19 20func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetView *streamplace.MultistreamDefs_TargetView) error { 21 uu, err := uuid.NewV7() 22 if err != nil { 23 return err 24 } 25 ctx, cancel := context.WithCancel(ctx) 26 defer cancel() 27 ctx = log.WithLogValues(ctx, "pushID", uu.String()) 28 ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush") 29 rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget) 30 if !ok { 31 return fmt.Errorf("failed to convert target view to multistream target") 32 } 33 targetURL := rec.Url 34 35 pipelineSlice := []string{ 36 "flvmux name=muxer ! rtmp2sink name=rtmp2sink", 37 "h264parse name=videoparse ! muxer.video", 38 "opusparse name=audioparse ! opusdec ! fdkaacenc ! muxer.audio", 39 } 40 41 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 42 if err != nil { 43 return fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 44 } 45 46 rtmp2sink, err := pipeline.GetElementByName("rtmp2sink") 47 if err != nil { 48 return fmt.Errorf("failed to get rtmp2sink element from pipeline: %w", err) 49 } 50 51 u, err := url.Parse(targetURL) 52 if err != nil { 53 return fmt.Errorf("failed to parse target URL: %w", err) 54 } 55 if u.Scheme == "rtmps" { 56 localAddr, err := mm.RunTLSFForwarder(ctx, targetURL) 57 if err != nil { 58 return fmt.Errorf("failed to run TLS forwarder: %w", err) 59 } 60 local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path) 61 log.Debug(ctx, "running TLS forwarder", "localAddr", local, "destination", targetURL) 62 err = rtmp2sink.SetProperty("location", local) 63 if err != nil { 64 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 65 } 66 } else if u.Scheme == "rtmp" { 67 err = rtmp2sink.SetProperty("location", targetURL) 68 if err != nil { 69 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 70 } 71 } else { 72 return fmt.Errorf("invalid target URL scheme: %s", u.Scheme) 73 } 74 75 go func() { 76 pollFreq := time.Second * 1 77 for { 78 select { 79 case <-ctx.Done(): 80 return 81 case <-time.After(pollFreq): 82 prop, err := rtmp2sink.GetProperty("stats") 83 if err != nil { 84 log.Error(ctx, "error getting rtmp2sink peak-kbps", "error", err) 85 continue 86 } 87 if prop == nil { 88 log.Error(ctx, "failed to get rtmp2sink peak-kbps", "prop", prop) 89 continue 90 } 91 propVal, ok := prop.(*gst.Structure) 92 if !ok { 93 log.Error(ctx, "failed to convert rtmp2sink peak-kbps", "prop", prop) 94 continue 95 } 96 outBytesAcked, err := propVal.GetValue("out-bytes-acked") 97 if err != nil { 98 log.Error(ctx, "failed to get rtmp2sink out-bytes-acked", "error", err) 99 continue 100 } 101 outBytesAckedVal, ok := outBytesAcked.(uint64) 102 if !ok { 103 log.Error(ctx, "failed to convert rtmp2sink out-bytes-acked", "prop", prop) 104 continue 105 } 106 if outBytesAckedVal > 0 { 107 err = mm.atsync.StatefulDB.CreateMultistreamEvent(targetView.Uri, fmt.Sprintf("wrote %d bytes", outBytesAckedVal), "active") 108 if err != nil { 109 log.Error(ctx, "failed to create multistream event", "error", err) 110 } 111 // increase pollFreq, once it's working we don't need to spam the database 112 pollFreq = time.Second * 15 113 } 114 log.Debug(ctx, "rtmp2sink out-bytes-acked", "outBytesAckedVal", outBytesAckedVal) 115 } 116 117 } 118 }() 119 120 segBuffer := make(chan *bus.Seg, 1024) 121 go func() { 122 segChan := mm.bus.SubscribeSegment(ctx, user, rendition) 123 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 124 for { 125 select { 126 case <-ctx.Done(): 127 log.Debug(ctx, "exiting segment reader") 128 return 129 case file := <-segChan.C: 130 log.Debug(ctx, "got segment", "file", file.Filepath) 131 segBuffer <- file 132 } 133 } 134 }() 135 136 segCh := make(chan *bus.Seg) 137 go func() { 138 for { 139 select { 140 case <-ctx.Done(): 141 log.Debug(ctx, "exiting segment reader") 142 return 143 case seg := <-segBuffer: 144 select { 145 case <-ctx.Done(): 146 return 147 case segCh <- seg: 148 } 149 } 150 } 151 }() 152 153 concatBin, err := ConcatBin(ctx, segCh, true) 154 if err != nil { 155 return fmt.Errorf("failed to create concat bin: %w", err) 156 } 157 158 err = pipeline.Add(concatBin.Element) 159 if err != nil { 160 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 161 } 162 163 videoPad := concatBin.GetStaticPad("video_0") 164 if videoPad == nil { 165 return fmt.Errorf("video pad not found") 166 } 167 168 audioPad := concatBin.GetStaticPad("audio_0") 169 if audioPad == nil { 170 return fmt.Errorf("audio pad not found") 171 } 172 173 videoParse, err := pipeline.GetElementByName("videoparse") 174 if err != nil { 175 return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 176 } 177 videoParsePad := videoParse.GetStaticPad("sink") 178 if videoParsePad == nil { 179 return fmt.Errorf("video parse pad not found") 180 } 181 linked := videoPad.Link(videoParsePad) 182 if linked != gst.PadLinkOK { 183 return fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 184 } 185 186 audioParse, err := pipeline.GetElementByName("audioparse") 187 if err != nil { 188 return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 189 } 190 audioParsePad := audioParse.GetStaticPad("sink") 191 if audioParsePad == nil { 192 return fmt.Errorf("audio parse pad not found") 193 } 194 linked = audioPad.Link(audioParsePad) 195 if linked != gst.PadLinkOK { 196 return fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 197 } 198 199 errCh := make(chan error) 200 go func() { 201 err := HandleBusMessages(ctx, pipeline) 202 errCh <- err 203 }() 204 205 err = pipeline.SetState(gst.StatePlaying) 206 if err != nil { 207 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 208 } 209 210 defer func() { 211 err = pipeline.SetState(gst.StateNull) 212 if err != nil { 213 log.Error(ctx, "failed to set pipeline state to null", "error", err) 214 } 215 }() 216 217 return <-errCh 218} 219func (mm *MediaManager) RunTLSFForwarder(ctx context.Context, dest string) (string, error) { 220 // Parse the destination URL to extract host and port 221 destURL, err := url.Parse(dest) 222 if err != nil { 223 return "", fmt.Errorf("failed to parse destination URL: %w", err) 224 } 225 226 // Default to port 1935 if not specified 227 destHost := destURL.Host 228 if !strings.Contains(destHost, ":") { 229 destHost = destHost + ":1935" 230 } 231 232 // Listen on a random port 233 listener, err := net.Listen("tcp", "127.0.0.1:0") 234 if err != nil { 235 return "", fmt.Errorf("failed to listen on random port: %w", err) 236 } 237 238 log.Debug(ctx, "RTMP to RTMPS forwarder listening", "localAddr", listener.Addr().String(), "destination", dest) 239 240 go func() { 241 <-ctx.Done() 242 listener.Close() 243 }() 244 245 go func() { 246 defer listener.Close() 247 if ctx.Err() != nil { 248 return 249 } 250 // Accept incoming RTMP connection 251 clientConn, err := listener.Accept() 252 if err != nil { 253 log.Error(ctx, "failed to accept connection", "error", err) 254 return 255 } 256 257 // Only one connection so we don't need a goroutine 258 defer clientConn.Close() 259 260 // Establish TLS connection to destination 261 tlsConn, err := tls.Dial("tcp", destHost, &tls.Config{ 262 ServerName: destURL.Hostname(), 263 }) 264 if err != nil { 265 log.Error(ctx, "failed to establish TLS connection to destination", "error", err) 266 return 267 } 268 defer tlsConn.Close() 269 270 // Proxy data bidirectionally 271 done := make(chan error, 2) 272 273 // Copy from client to server 274 go func() { 275 _, err := io.Copy(tlsConn, clientConn) 276 done <- err 277 }() 278 279 // Copy from server to client 280 go func() { 281 _, err := io.Copy(clientConn, tlsConn) 282 done <- err 283 }() 284 285 // Wait for either direction to complete or error 286 err = <-done 287 if err != nil { 288 log.Error(ctx, "proxy connection error", "error", err) 289 } 290 }() 291 292 return listener.Addr().String(), nil 293}