Live video on the AT Protocol

media: add SignMP4 tracing, fix channel leak (#126)

* tracing: add a lot more instrumentation around SignMP4 tracing

* tracing: track signing duration for external signing too

* segchanman: time out segment send

* concat: okay yeah i see why this was a leak now

* webrtc_playback: actually unsubscribe here too

authored by Eli Mallon and committed by GitHub c8fb7f3c a74a4842

+60 -24
+1 -6
go.mod
··· 49 49 gitlab.com/gitlab-org/release-cli v0.18.0 50 50 go.opentelemetry.io/otel v1.35.0 51 51 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 52 - go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.11.0 53 - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 54 - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 55 - go.opentelemetry.io/otel/log v0.11.0 56 52 go.opentelemetry.io/otel/sdk v1.35.0 57 - go.opentelemetry.io/otel/sdk/log v0.11.0 58 - go.opentelemetry.io/otel/sdk/metric v1.35.0 59 53 go.uber.org/goleak v1.3.0 60 54 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 61 55 golang.org/x/image v0.22.0 ··· 244 238 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect 245 239 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect 246 240 go.opentelemetry.io/otel/metric v1.35.0 // indirect 241 + go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect 247 242 go.opentelemetry.io/otel/trace v1.35.0 // indirect 248 243 go.opentelemetry.io/proto/otlp v1.5.0 // indirect 249 244 go.uber.org/atomic v1.11.0 // indirect
-10
go.sum
··· 657 657 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= 658 658 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0 h1:m639+BofXTvcY1q8CGs4ItwQarYtJPOWmVobfM1HpVI= 659 659 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0/go.mod h1:LjReUci/F4BUyv+y4dwnq3h/26iNOeC3wAIqgvTIZVo= 660 - go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.11.0 h1:k6KdfZk72tVW/QVZf60xlDziDvYAePj5QHwoQvrB2m8= 661 - go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.11.0/go.mod h1:5Y3ZJLqzi/x/kYtrSrPSx7TFI/SGsL7q2kME027tH6I= 662 - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 h1:PB3Zrjs1sG1GBX51SXyTSoOTqcDglmsk7nT6tkKPb/k= 663 - go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0/go.mod h1:U2R3XyVPzn0WX7wOIypPuptulsMcPDPs/oiSVOMVnHY= 664 - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 h1:T0Ec2E+3YZf5bgTNQVet8iTDW7oIk03tXHq+wkwIDnE= 665 - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0/go.mod h1:30v2gqH+vYGJsesLWFov8u47EpYTcIQcBjKpI6pJThg= 666 - go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y= 667 - go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc= 668 660 go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= 669 661 go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= 670 662 go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= 671 663 go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= 672 - go.opentelemetry.io/otel/sdk/log v0.11.0 h1:7bAOpjpGglWhdEzP8z0VXc4jObOiDEwr3IYbhBnjk2c= 673 - go.opentelemetry.io/otel/sdk/log v0.11.0/go.mod h1:dndLTxZbwBstZoqsJB3kGsRPkpAgaJrWfQg3lhlHFFY= 674 664 go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= 675 665 go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= 676 666 go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
+2 -2
pkg/media/concat.go
··· 126 126 // them in a pipe so that we don't miss any in between iterations of the output 127 127 allFiles := make(chan []byte, 1024) 128 128 go func() { 129 + ch := streamer.SubscribeSegment(ctx, user, rendition) 130 + defer streamer.UnsubscribeSegment(ctx, user, rendition, ch) 129 131 for { 130 - ch := streamer.SubscribeSegment(ctx, user, rendition) 131 132 select { 132 133 case <-ctx.Done(): 133 134 log.Debug(ctx, "exiting segment reader") 134 - streamer.UnsubscribeSegment(ctx, user, rendition, ch) 135 135 return 136 136 case file := <-ch: 137 137 log.Debug(ctx, "got segment", "file", file.Filepath)
+22
pkg/media/media_signer.go
··· 9 9 "fmt" 10 10 "io" 11 11 "path/filepath" 12 + "time" 12 13 13 14 "git.stream.place/streamplace/c2pa-go/pkg/c2pa" 14 15 "go.opentelemetry.io/otel" ··· 18 19 "stream.place/streamplace/pkg/crypto/aqpub" 19 20 "stream.place/streamplace/pkg/crypto/signers" 20 21 "stream.place/streamplace/pkg/log" 22 + "stream.place/streamplace/pkg/spmetrics" 21 23 ) 22 24 23 25 type MediaSigner interface { 24 26 SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) 25 27 Pub() aqpub.Pub 28 + Streamer() string 26 29 } 27 30 28 31 type MediaSignerLocal struct { ··· 80 83 }, nil 81 84 } 82 85 86 + func (ms *MediaSignerLocal) Streamer() string { 87 + return ms.StreamerName 88 + } 89 + 83 90 func (ms *MediaSignerLocal) SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) { 91 + startTime := time.Now() 84 92 ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4") 85 93 defer span.End() 86 94 title := "livestream" ··· 109 117 }, 110 118 }, 111 119 } 120 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_MarshalManifest") 112 121 manifestBs, err := json.Marshal(mani) 113 122 if err != nil { 114 123 return nil, fmt.Errorf("failed to marshal manifest: %w", err) ··· 118 127 if err != nil { 119 128 return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) 120 129 } 130 + span.End() 131 + 132 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_GetSigningAlgorithm") 121 133 alg, err := c2pa.GetSigningAlgorithm(string(c2pa.ES256K)) 122 134 if err != nil { 123 135 return nil, fmt.Errorf("failed to get signing algorithm: %w", err) 124 136 } 137 + span.End() 138 + 139 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_NewBuilder") 125 140 b, err := c2pa.NewBuilder(&manifest, &c2pa.BuilderParams{ 126 141 Cert: ms.Cert, 127 142 Signer: ms.Signer, ··· 131 146 if err != nil { 132 147 return nil, fmt.Errorf("failed to create C2PA builder: %w", err) 133 148 } 149 + span.End() 134 150 151 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_Sign") 135 152 output := &aqio.ReadWriteSeeker{} 136 153 err = b.Sign(input, output, "video/mp4") 137 154 if err != nil { 138 155 return nil, fmt.Errorf("failed to sign MP4: %w", err) 139 156 } 157 + span.End() 158 + 159 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_OutputBytes") 140 160 bs, err := output.Bytes() 141 161 if err != nil { 142 162 return nil, fmt.Errorf("failed to get output bytes: %w", err) 143 163 } 164 + span.End() 165 + spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds())) 144 166 return bs, nil 145 167 } 146 168
+11 -1
pkg/media/media_signer_ext.go
··· 9 9 "io" 10 10 "os" 11 11 "os/exec" 12 + "time" 12 13 13 14 "github.com/decred/dcrd/dcrec/secp256k1" 14 15 "github.com/mr-tron/base58" 16 + "go.opentelemetry.io/otel" 15 17 "stream.place/streamplace/pkg/config" 16 18 "stream.place/streamplace/pkg/crypto/aqpub" 19 + "stream.place/streamplace/pkg/spmetrics" 17 20 ) 18 21 19 22 type MediaSignerExt struct { ··· 52 55 } 53 56 54 57 func (ms *MediaSignerExt) SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) { 58 + startTime := time.Now() 59 + ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4_Ext") 60 + defer span.End() 55 61 // Get the path to the current executable 56 62 execPath, err := os.Executable() 57 63 if err != nil { ··· 98 104 if err := cmd.Wait(); err != nil { 99 105 return nil, fmt.Errorf("command failed: %w, stderr: %s", err, stderr.String()) 100 106 } 101 - 107 + spmetrics.SigningDuration.WithLabelValues(ms.streamer).Observe(float64(time.Since(startTime).Milliseconds())) 102 108 return stdout.Bytes(), nil 103 109 } 104 110 105 111 func (ms *MediaSignerExt) Pub() aqpub.Pub { 106 112 return ms.pub 107 113 } 114 + 115 + func (ms *MediaSignerExt) Streamer() string { 116 + return ms.streamer 117 + }
+11 -2
pkg/media/segchanman/segchanman.go
··· 4 4 "context" 5 5 "fmt" 6 6 "sync" 7 + "time" 7 8 8 9 "go.opentelemetry.io/otel" 10 + "stream.place/streamplace/pkg/log" 9 11 ) 10 12 11 13 // it's a segment channel manager, you see ··· 39 41 chs = []chan *Seg{} 40 42 s.segChans[key] = chs 41 43 } 42 - ch := make(chan *Seg, 1024) 44 + ch := make(chan *Seg) 43 45 chs = append(chs, ch) 44 46 s.segChans[key] = chs 45 47 return ch ··· 74 76 } 75 77 for _, ch := range chs { 76 78 go func(ch chan *Seg) { 77 - ch <- seg 79 + select { 80 + case ch <- seg: 81 + case <-ctx.Done(): 82 + return 83 + case <-time.After(1 * time.Minute): 84 + log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition) 85 + } 86 + 78 87 }(ch) 79 88 } 80 89 }
+5 -1
pkg/media/segmenter.go
··· 9 9 "github.com/go-gst/go-gst/gst" 10 10 "github.com/go-gst/go-gst/gst/app" 11 11 "go.opentelemetry.io/otel" 12 + "go.opentelemetry.io/otel/attribute" 13 + "go.opentelemetry.io/otel/trace" 12 14 "stream.place/streamplace/pkg/log" 13 15 ) 14 16 ··· 62 64 appsink.SetCallbacks(&app.SinkCallbacks{ 63 65 NewSampleFunc: WriterNewSample(ctx, buf), 64 66 EOSFunc: func(sink *app.Sink) { 65 - ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem") 67 + ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 68 + attribute.String("streamer", ms.Streamer()), 69 + )) 66 70 defer span.End() 67 71 resetTimer <- struct{}{} 68 72 now := time.Now().UnixMilli()
+2 -2
pkg/media/webrtc_playback.go
··· 49 49 50 50 segBuffer := make(chan *segchanman.Seg, 1024) 51 51 go func() { 52 + ch := mm.SubscribeSegment(ctx, user, rendition) 53 + defer mm.UnsubscribeSegment(ctx, user, rendition, ch) 52 54 for { 53 - ch := mm.SubscribeSegment(ctx, user, rendition) 54 55 select { 55 56 case <-ctx.Done(): 56 57 log.Debug(ctx, "exiting segment reader") 57 - mm.UnsubscribeSegment(ctx, user, rendition, ch) 58 58 return 59 59 case file := <-ch: 60 60 log.Debug(ctx, "got segment", "file", file.Filepath)
+6
pkg/spmetrics/spmetrics.go
··· 48 48 Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000}, 49 49 }, []string{"streamer"}) 50 50 51 + var SigningDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 52 + Name: "streamplace_signing_duration_ms", 53 + Help: "duration of transcode in ms", 54 + Buckets: []float64{0, 250, 500, 750, 1000, 1250, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 10000, 20000, 30000, 60000}, 55 + }, []string{"streamer"}) 56 + 51 57 var QueuedTranscodeDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{ 52 58 Name: "streamplace_queued_transcode_duration_ms", 53 59 Help: "duration of transcode in ms, including time spent waiting",