Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8 "time"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "stream.place/streamplace/pkg/aqtime"
13 "stream.place/streamplace/pkg/log"
14)
15
16// ingest a H264+AAC MKV stream (prolly from an RTMP server)
17func (mm *MediaManager) MKVIngest(ctx context.Context, input io.Reader, ms MediaSigner) error {
18 shouldRecord, err := mm.shouldRecord(ctx, ms.Streamer())
19 if err != nil {
20 return err
21 }
22 if shouldRecord {
23 log.Log(ctx, "recording RTMP stream to file", "streamer", ms.Streamer())
24 pr, pw := io.Pipe()
25 input = io.TeeReader(input, pw)
26 go func() {
27 err := mm.dumpToFile(ctx, pr, ms.Streamer(), ".rtmp.mkv")
28 if err != nil {
29 log.Error(ctx, "error dumping to file", "error", err)
30 }
31 }()
32 } else {
33 log.Log(ctx, "not recording RTMP stream to file", "streamer", ms.Streamer())
34 }
35 ctx, cancel := context.WithCancel(ctx)
36 defer cancel()
37 pipelineSlice := []string{
38 "appsrc name=streamsrc ! matroskademux name=demux",
39 "demux. ! queue ! h264parse name=parse",
40 "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc",
41 }
42 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
43 if err != nil {
44 return fmt.Errorf("error creating MKVIngest pipeline: %w", err)
45 }
46
47 srcele, err := pipeline.GetElementByName("streamsrc")
48 if err != nil {
49 return err
50 }
51 // defer runtime.KeepAlive(srcele)
52 src := app.SrcFromElement(srcele)
53 src.SetCallbacks(&app.SourceCallbacks{
54 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
55 })
56 parseEle, err := pipeline.GetElementByName("parse")
57 if err != nil {
58 return err
59 }
60
61 signer, err := mm.SegmentAndSignElem(ctx, ms)
62 if err != nil {
63 return err
64 }
65
66 err = pipeline.Add(signer)
67 if err != nil {
68 return err
69 }
70 err = parseEle.Link(signer)
71 if err != nil {
72 return err
73 }
74 audioenc, err := pipeline.GetElementByName("audioenc")
75 if err != nil {
76 return err
77 }
78 err = audioenc.Link(signer)
79 if err != nil {
80 return err
81 }
82
83 busErr := make(chan error)
84 go func() {
85 err := HandleBusMessages(ctx, pipeline)
86 busErr <- err
87 }()
88
89 go mm.HandleKeyRevocation(ctx, ms, pipeline)
90
91 err = pipeline.SetState(gst.StatePlaying)
92 if err != nil {
93 return err
94 }
95
96 defer func() {
97 err := pipeline.SetState(gst.StateNull)
98 if err != nil {
99 log.Error(ctx, "error setting pipeline to null state", "error", err)
100 }
101 }()
102
103 err = <-busErr
104
105 return err
106}
107
108func (mm *MediaManager) dumpToFile(ctx context.Context, r io.Reader, user string, filesuffix string) error {
109 now := aqtime.FromTime(time.Now())
110 filename := fmt.Sprintf("%s%s", now.FileSafeString(), filesuffix)
111 f, err := mm.cli.DataFileCreate([]string{"debug-recordings", user, filename}, false)
112 if err != nil {
113 return fmt.Errorf("failed to create data file: %w", err)
114 }
115 defer f.Close()
116 _, err = io.Copy(f, r)
117 if err != nil {
118 return fmt.Errorf("failed to copy to file: %w", err)
119 }
120 return nil
121}