Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

multistreaming: proxy RTMP in addition to RTMPS

+47 -12
+47 -12
pkg/media/rtmp_push.go
··· 58 58 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 59 59 } 60 60 } else if u.Scheme == "rtmp" { 61 - err = rtmp2sink.SetProperty("location", targetURL) 61 + localAddr, err := mm.RunTCPForwarder(ctx, targetURL) 62 + if err != nil { 63 + return fmt.Errorf("failed to run TCP forwarder: %w", err) 64 + } 65 + local := fmt.Sprintf("rtmp://%s%s", localAddr, u.Path) 66 + log.Debug(ctx, "running TCP forwarder", "localAddr", local, "destination", targetURL) 67 + err = rtmp2sink.SetProperty("location", local) 62 68 if err != nil { 63 69 return fmt.Errorf("failed to set rtmp2sink location: %w", err) 64 70 } ··· 213 219 return <-errCh 214 220 } 215 221 func (mm *MediaManager) RunTLSFForwarder(ctx context.Context, dest string) (string, error) { 222 + destURL, err := url.Parse(dest) 223 + if err != nil { 224 + return "", fmt.Errorf("failed to parse destination URL: %w", err) 225 + } 226 + return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) { 227 + return tls.Dial("tcp", destHost, &tls.Config{ 228 + ServerName: destURL.Hostname(), 229 + }) 230 + }) 231 + } 232 + 233 + func (mm *MediaManager) RunTCPForwarder(ctx context.Context, dest string) (string, error) { 234 + return mm.runForwarder(ctx, dest, func(destHost string) (net.Conn, error) { 235 + return net.Dial("tcp", destHost) 236 + }) 237 + } 238 + 239 + func (mm *MediaManager) runForwarder(ctx context.Context, dest string, dial func(destHost string) (net.Conn, error)) (string, error) { 216 240 // Parse the destination URL to extract host and port 217 241 destURL, err := url.Parse(dest) 218 242 if err != nil { ··· 231 255 return "", fmt.Errorf("failed to listen on random port: %w", err) 232 256 } 233 257 234 - log.Debug(ctx, "RTMP to RTMPS forwarder listening", "localAddr", listener.Addr().String(), "destination", dest) 258 + log.Debug(ctx, "RTMP forwarder listening", "localAddr", listener.Addr().String(), "destination", dest) 235 259 236 260 go func() { 237 261 <-ctx.Done() ··· 250 274 return 251 275 } 252 276 253 - // Only one connection so we don't need a goroutine 254 - defer clientConn.Close() 277 + closed := false 278 + go func() { 279 + <-ctx.Done() 280 + if !closed { 281 + closed = true 282 + clientConn.Close() 283 + } 284 + }() 285 + 286 + defer func() { 287 + if !closed { 288 + closed = true 289 + clientConn.Close() 290 + } 291 + }() 255 292 256 - // Establish TLS connection to destination 257 - tlsConn, err := tls.Dial("tcp", destHost, &tls.Config{ 258 - ServerName: destURL.Hostname(), 259 - }) 293 + // Establish connection to destination 294 + serverConn, err := dial(destHost) 260 295 if err != nil { 261 - log.Error(ctx, "failed to establish TLS connection to destination", "error", err) 296 + log.Error(ctx, "failed to establish connection to destination", "error", err) 262 297 return 263 298 } 264 - defer tlsConn.Close() 299 + defer serverConn.Close() 265 300 266 301 // Proxy data bidirectionally 267 302 done := make(chan error, 2) 268 303 269 304 // Copy from client to server 270 305 go func() { 271 - _, err := io.Copy(tlsConn, clientConn) 306 + _, err := io.Copy(serverConn, clientConn) 272 307 done <- err 273 308 }() 274 309 275 310 // Copy from server to client 276 311 go func() { 277 - _, err := io.Copy(clientConn, tlsConn) 312 + _, err := io.Copy(clientConn, serverConn) 278 313 done <- err 279 314 }() 280 315