Live video on the AT Protocol
at next 99 lines 2.4 kB view raw
1package rtmps 2 3import ( 4 "context" 5 "crypto/tls" 6 "errors" 7 "fmt" 8 "io" 9 "net" 10 "sync" 11 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/log" 14) 15 16// passthrough RTMPS TLS terminator to external RTMP server 17func ServeRTMPSAddon(ctx context.Context, cli *config.CLI) error { 18 if cli.RTMPServerAddon == "" { 19 return fmt.Errorf("RTMP server address not configured") 20 } 21 22 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath) 23 if err != nil { 24 return fmt.Errorf("failed to load TLS certificate: %w", err) 25 } 26 27 tlsConfig := &tls.Config{ 28 Certificates: []tls.Certificate{cert}, 29 MinVersion: tls.VersionTLS12, 30 } 31 32 listener, err := tls.Listen("tcp", cli.RTMPSAddonAddr, tlsConfig) 33 if err != nil { 34 return fmt.Errorf("failed to create RTMPS listener: %w", err) 35 } 36 37 log.Log(ctx, "rtmps server starting", 38 "addr", cli.RTMPSAddonAddr, 39 "forwarding_to", cli.RTMPServerAddon) 40 41 go func() { 42 <-ctx.Done() 43 listener.Close() 44 }() 45 46 for { 47 conn, err := listener.Accept() 48 if err != nil { 49 // Check if the context was canceled, which means we're shutting down 50 select { 51 case <-ctx.Done(): 52 return nil 53 default: 54 log.Error(ctx, "error accepting RTMPS connection", "error", err) 55 continue 56 } 57 } 58 59 go func(clientConn net.Conn) { 60 defer clientConn.Close() 61 62 rtmpConn, err := net.Dial("tcp", cli.RTMPServerAddon) 63 if err != nil { 64 log.Error(ctx, "failed to connect to RTMP server", "error", err) 65 return 66 } 67 defer rtmpConn.Close() 68 69 // Create a wait group to wait for both copy operations to complete 70 var wg sync.WaitGroup 71 wg.Add(2) 72 73 // Copy from client to RTMP server 74 go func() { 75 defer wg.Done() 76 _, err := io.Copy(rtmpConn, clientConn) 77 if err != nil && !errors.Is(err, io.EOF) { 78 log.Error(ctx, "error copying from client to RTMP server", "error", err) 79 } 80 // Signal the other goroutine to stop by closing the connection 81 rtmpConn.Close() 82 }() 83 84 // Copy from RTMP server to client 85 go func() { 86 defer wg.Done() 87 _, err := io.Copy(clientConn, rtmpConn) 88 if err != nil && !errors.Is(err, io.EOF) { 89 log.Error(ctx, "error copying from RTMP server to client", "error", err) 90 } 91 // Signal the other goroutine to stop by closing the connection 92 clientConn.Close() 93 }() 94 95 // Wait for both copy operations to complete 96 wg.Wait() 97 }(conn) 98 } 99}