Live video on the AT Protocol
1package media
2
3// func (mm *MediaManager) MP4Playback(ctx context.Context, user string, rendition string, w io.Writer) error {
4// uu, err := uuid.NewV7()
5// if err != nil {
6// return err
7// }
8// ctx = log.WithLogValues(ctx, "playbackID", uu.String())
9// ctx, cancel := context.WithCancel(ctx)
10
11// ctx = log.WithLogValues(ctx, "mediafunc", "MP4Playback")
12
13// pipelineSlice := []string{
14// "mp4mux name=muxer fragment-mode=first-moov-then-finalise fragment-duration=1000 streamable=true ! appsink name=mp4sink",
15// "h264parse name=videoparse ! muxer.",
16// "opusparse name=audioparse ! muxer.",
17// }
18
19// pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
20// if err != nil {
21// return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
22// }
23
24// go func() {
25// HandleBusMessages(ctx, pipeline)
26// cancel()
27// }()
28
29// outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm)
30// if err != nil {
31// return fmt.Errorf("failed to get output queue: %w", err)
32// }
33// go func() {
34// select {
35// case <-ctx.Done():
36// return
37// case <-done:
38// cancel()
39// }
40// }()
41
42// videoParse, err := pipeline.GetElementByName("videoparse")
43// if err != nil {
44// return fmt.Errorf("failed to get video parse element from pipeline: %w", err)
45// }
46// err = outputQueue.Link(videoParse)
47// if err != nil {
48// return fmt.Errorf("failed to link output queue to video parse: %w", err)
49// }
50
51// audioParse, err := pipeline.GetElementByName("audioparse")
52// if err != nil {
53// return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
54// }
55// err = outputQueue.Link(audioParse)
56// if err != nil {
57// return fmt.Errorf("failed to link output queue to audio parse: %w", err)
58// }
59
60// go func() {
61// ticker := time.NewTicker(time.Second * 1)
62// for {
63// select {
64// case <-ctx.Done():
65// return
66// case <-ticker.C:
67// state := pipeline.GetCurrentState()
68// log.Debug(ctx, "pipeline state", "state", state)
69// }
70// }
71// }()
72
73// mp4sinkele, err := pipeline.GetElementByName("mp4sink")
74// if err != nil {
75// return fmt.Errorf("failed to get mp4sink element from pipeline: %w", err)
76// }
77// mp4sink := app.SinkFromElement(mp4sinkele)
78// mp4sink.SetCallbacks(&app.SinkCallbacks{
79// NewSampleFunc: WriterNewSample(ctx, w),
80// EOSFunc: func(sink *app.Sink) {
81// log.Warn(ctx, "mp4sink EOSFunc")
82// cancel()
83// },
84// })
85
86// pipeline.SetState(gst.StatePlaying)
87
88// <-ctx.Done()
89
90// pipeline.BlockSetState(gst.StateNull)
91
92// return nil
93// }
94
95// func (mm *MediaManager) MKVPlayback(ctx context.Context, user string, rendition string, w io.Writer) error {
96// uu, err := uuid.NewV7()
97// if err != nil {
98// return err
99// }
100// ctx = log.WithLogValues(ctx, "playbackID", uu.String())
101// ctx, cancel := context.WithCancel(ctx)
102
103// ctx = log.WithLogValues(ctx, "mediafunc", "MKVPlayback")
104
105// pipelineSlice := []string{
106// "matroskamux name=muxer streamable=true ! appsink name=mkvsink",
107// "h264parse name=videoparse ! muxer.",
108// "opusparse name=audioparse ! muxer.",
109// }
110
111// pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
112// if err != nil {
113// return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
114// }
115
116// go func() {
117// HandleBusMessages(ctx, pipeline)
118// cancel()
119// }()
120
121// outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm)
122// if err != nil {
123// return fmt.Errorf("failed to get output queue: %w", err)
124// }
125// go func() {
126// select {
127// case <-ctx.Done():
128// return
129// case <-done:
130// cancel()
131// }
132// }()
133
134// videoParse, err := pipeline.GetElementByName("videoparse")
135// if err != nil {
136// return fmt.Errorf("failed to get video parse element from pipeline: %w", err)
137// }
138// err = outputQueue.Link(videoParse)
139// if err != nil {
140// return fmt.Errorf("failed to link output queue to video parse: %w", err)
141// }
142
143// audioParse, err := pipeline.GetElementByName("audioparse")
144// if err != nil {
145// return fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
146// }
147// err = outputQueue.Link(audioParse)
148// if err != nil {
149// return fmt.Errorf("failed to link output queue to audio parse: %w", err)
150// }
151
152// go func() {
153// ticker := time.NewTicker(time.Second * 1)
154// for {
155// select {
156// case <-ctx.Done():
157// return
158// case <-ticker.C:
159// state := pipeline.GetCurrentState()
160// log.Debug(ctx, "pipeline state", "state", state)
161// }
162// }
163// }()
164
165// mkvsinkele, err := pipeline.GetElementByName("mkvsink")
166// if err != nil {
167// return fmt.Errorf("failed to get mkvsink element from pipeline: %w", err)
168// }
169// mkvsink := app.SinkFromElement(mkvsinkele)
170// mkvsink.SetCallbacks(&app.SinkCallbacks{
171// NewSampleFunc: WriterNewSample(ctx, w),
172// EOSFunc: func(sink *app.Sink) {
173// log.Warn(ctx, "mkvsink EOSFunc")
174// cancel()
175// },
176// })
177
178// pipeline.SetState(gst.StatePlaying)
179
180// <-ctx.Done()
181
182// pipeline.BlockSetState(gst.StateNull)
183
184// return nil
185// }