// Live video on the AT Protocol
1package rtmps
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "sync"
11
12 "stream.place/streamplace/pkg/config"
13 "stream.place/streamplace/pkg/log"
14)
15
16// passthrough RTMPS TLS terminator to external RTMP server
17func ServeRTMPSAddon(ctx context.Context, cli *config.CLI) error {
18 if cli.RTMPServerAddon == "" {
19 return fmt.Errorf("RTMP server address not configured")
20 }
21
22 cert, err := tls.LoadX509KeyPair(cli.TLSCertPath, cli.TLSKeyPath)
23 if err != nil {
24 return fmt.Errorf("failed to load TLS certificate: %w", err)
25 }
26
27 tlsConfig := &tls.Config{
28 Certificates: []tls.Certificate{cert},
29 MinVersion: tls.VersionTLS12,
30 }
31
32 listener, err := tls.Listen("tcp", cli.RTMPSAddonAddr, tlsConfig)
33 if err != nil {
34 return fmt.Errorf("failed to create RTMPS listener: %w", err)
35 }
36
37 log.Log(ctx, "rtmps server starting",
38 "addr", cli.RTMPSAddonAddr,
39 "forwarding_to", cli.RTMPServerAddon)
40
41 go func() {
42 <-ctx.Done()
43 listener.Close()
44 }()
45
46 for {
47 conn, err := listener.Accept()
48 if err != nil {
49 // Check if the context was canceled, which means we're shutting down
50 select {
51 case <-ctx.Done():
52 return nil
53 default:
54 log.Error(ctx, "error accepting RTMPS connection", "error", err)
55 continue
56 }
57 }
58
59 go func(clientConn net.Conn) {
60 defer clientConn.Close()
61
62 rtmpConn, err := net.Dial("tcp", cli.RTMPServerAddon)
63 if err != nil {
64 log.Error(ctx, "failed to connect to RTMP server", "error", err)
65 return
66 }
67 defer rtmpConn.Close()
68
69 // Create a wait group to wait for both copy operations to complete
70 var wg sync.WaitGroup
71 wg.Add(2)
72
73 // Copy from client to RTMP server
74 go func() {
75 defer wg.Done()
76 _, err := io.Copy(rtmpConn, clientConn)
77 if err != nil && !errors.Is(err, io.EOF) {
78 log.Error(ctx, "error copying from client to RTMP server", "error", err)
79 }
80 // Signal the other goroutine to stop by closing the connection
81 rtmpConn.Close()
82 }()
83
84 // Copy from RTMP server to client
85 go func() {
86 defer wg.Done()
87 _, err := io.Copy(clientConn, rtmpConn)
88 if err != nil && !errors.Is(err, io.EOF) {
89 log.Error(ctx, "error copying from RTMP server to client", "error", err)
90 }
91 // Signal the other goroutine to stop by closing the connection
92 clientConn.Close()
93 }()
94
95 // Wait for both copy operations to complete
96 wg.Wait()
97 }(conn)
98 }
99}