fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package media
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "strings"
10 "sync"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/media/segchanman"
16)
17
18type ConcatStreamer interface {
19 SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg
20 UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg)
21}
22
23// This function remains in scope for the duration of a single users' playback
24func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
25 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
26 ctx, cancel := context.WithCancel(ctx)
27 defer cancel()
28
29 // make 1000000000000 elements!
30
31 // input multiqueue
32 inputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{})
33 if err != nil {
34 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
35 }
36 err = pipeline.Add(inputQueue)
37 if err != nil {
38 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
39 }
40 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
41 if inputQueuePadVideoSink == nil {
42 return nil, nil, fmt.Errorf("failed to get input queue video sink pad")
43 }
44 inputQueuePadAudioSink := inputQueue.GetRequestPad("sink_%u")
45 if inputQueuePadAudioSink == nil {
46 return nil, nil, fmt.Errorf("failed to get input queue audio sink pad")
47 }
48 inputQueuePadVideoSrc := inputQueue.GetStaticPad("src_0")
49 if inputQueuePadVideoSrc == nil {
50 return nil, nil, fmt.Errorf("failed to get input queue video src pad")
51 }
52 inputQueuePadAudioSrc := inputQueue.GetStaticPad("src_1")
53 if inputQueuePadAudioSrc == nil {
54 return nil, nil, fmt.Errorf("failed to get input queue audio src pad")
55 }
56 // streamsynchronizer
57 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{})
58 if err != nil {
59 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err)
60 }
61
62 err = pipeline.Add(streamsynchronizer)
63 if err != nil {
64 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err)
65 }
66 syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u")
67 if syncPadVideoSink == nil {
68 return nil, nil, fmt.Errorf("failed to get sync video sink pad")
69 }
70 syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u")
71 if syncPadAudioSink == nil {
72 return nil, nil, fmt.Errorf("failed to get sync audio sink pad")
73 }
74 syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0")
75 if syncPadVideoSrc == nil {
76 return nil, nil, fmt.Errorf("failed to get sync video src pad")
77 }
78 syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1")
79 if syncPadAudioSrc == nil {
80 return nil, nil, fmt.Errorf("failed to get sync audio src pad")
81 }
82
83 // output multiqueue
84 outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{
85 "name": "concat-output-queue",
86 })
87 if err != nil {
88 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err)
89 }
90 err = pipeline.Add(outputQueue)
91 if err != nil {
92 return nil, nil, fmt.Errorf("failed to add output multiqueue to pipeline: %w", err)
93 }
94 outputQueuePadVideoSink := outputQueue.GetRequestPad("sink_%u")
95 if outputQueuePadVideoSink == nil {
96 return nil, nil, fmt.Errorf("failed to get output queue video sink pad")
97 }
98 outputQueuePadAudioSink := outputQueue.GetRequestPad("sink_%u")
99 if outputQueuePadAudioSink == nil {
100 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad")
101 }
102 // linking
103
104 // input queue to streamsynchronizer
105 ret := inputQueuePadVideoSrc.Link(syncPadVideoSink)
106 if ret != gst.PadLinkOK {
107 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
108 }
109 ret = inputQueuePadAudioSrc.Link(syncPadAudioSink)
110 if ret != gst.PadLinkOK {
111 return nil, nil, fmt.Errorf("failed to link multiqueue to streamsynchronizer: %v", ret)
112 }
113
114 // streamsynchronizer to output queue
115 ret = syncPadVideoSrc.Link(outputQueuePadVideoSink)
116 if ret != gst.PadLinkOK {
117 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
118 }
119 ret = syncPadAudioSrc.Link(outputQueuePadAudioSink)
120 if ret != gst.PadLinkOK {
121 return nil, nil, fmt.Errorf("failed to link streamsynchronizer to output queue: %v", ret)
122 }
123
124 // ok now we can start looping over input files
125
126 // this goroutine will read all the files from the segment queue and buffer
127 // them in a pipe so that we don't miss any in between iterations of the output
128 allFiles := make(chan []byte, 1024)
129 go func() {
130 ch := streamer.SubscribeSegment(ctx, user, rendition)
131 defer streamer.UnsubscribeSegment(ctx, user, rendition, ch)
132 for {
133 select {
134 case <-ctx.Done():
135 log.Debug(ctx, "exiting segment reader")
136 return
137 case file := <-ch:
138 log.Debug(ctx, "got segment", "file", file.Filepath)
139 allFiles <- file.Data
140 if len(file.Data) == 0 {
141 log.Warn(ctx, "no more segments, stopping segment reader")
142 return
143 }
144 }
145 }
146 }()
147
148 segCount := 0
149
150 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it,
151 // and pushes into the pipeline
152 var nextFile func()
153 nextFile = func() {
154 mySegCount := segCount
155 segCount += 1
156 segDone := make(chan struct{})
157 log.Debug(ctx, "moving to next file", "segCount", mySegCount)
158 pr, pw := io.Pipe()
159 go func() {
160 select {
161 case <-ctx.Done():
162 pr.Close()
163 pw.Close()
164 return
165 case bs := <-allFiles:
166 if len(bs) == 0 {
167 log.Warn(ctx, "no more segments, ending stream")
168 pr.Close()
169 pw.Close()
170 cancel()
171 return
172 }
173 _, err = io.Copy(pw, bytes.NewReader(bs))
174 if err != nil {
175 log.Error(ctx, "failed to copy segment file", "error", err)
176 cancel()
177 return
178 }
179 return
180 }
181 }()
182
183 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
184 "name": fmt.Sprintf("concat-demux-%d", mySegCount),
185 })
186 if err != nil {
187 log.Error(ctx, "failed to create demux element", "error", err)
188 cancel()
189 return
190 }
191
192 err = pipeline.Add(demux)
193 if err != nil {
194 log.Error(ctx, "failed to add demux to pipeline", "error", err)
195 cancel()
196 return
197 }
198
199 demuxSinkPad := demux.GetStaticPad("sink")
200 if demuxSinkPad == nil {
201 log.Error(ctx, "failed to get demux sink pad")
202 cancel()
203 return
204 }
205
206 mu := sync.Mutex{}
207 count := 0
208 _, err = demux.Connect("pad-added", func(self *gst.Element, pad *gst.Pad) {
209 mu.Lock()
210 count += 1
211 mu.Unlock()
212 log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection())
213 var downstreamPad *gst.Pad
214 if strings.HasPrefix(pad.GetName(), "video_") {
215 downstreamPad = inputQueuePadVideoSink
216 } else if strings.HasPrefix(pad.GetName(), "audio_") {
217 downstreamPad = inputQueuePadAudioSink
218 } else {
219 log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection())
220 cancel()
221 return
222 }
223 ret := pad.Link(downstreamPad)
224 if ret != gst.PadLinkOK {
225 log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret)
226 cancel()
227 return
228 }
229 if pad.GetDirection() == gst.PadDirectionSource {
230 pad.AddProbe(gst.PadProbeTypeEventBoth, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
231 if info.GetEvent().Type() != gst.EventTypeEOS {
232 return gst.PadProbeOK
233 }
234 log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection())
235 pad.Unlink(downstreamPad)
236 mu.Lock()
237 defer mu.Unlock()
238 count -= 1
239
240 if count == 0 {
241 // don't keep going if our context is done
242 if ctx.Err() == nil {
243 go nextFile()
244 segDone <- struct{}{}
245 }
246 } else {
247 log.Debug(ctx, "demux has more pads, waiting for them to close")
248 }
249 return gst.PadProbeRemove
250 })
251 }
252 })
253 if err != nil {
254 log.Error(ctx, "failed to connect demux pad-added", "error", err)
255 cancel()
256 return
257 }
258
259 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{
260 "name": fmt.Sprintf("concat-appsrc-%d", mySegCount),
261 "is-live": true,
262 })
263 if err != nil {
264 log.Error(ctx, "failed to get appsrc element from pipeline", "error", err)
265 cancel()
266 return
267 }
268
269 src := app.SrcFromElement(appsrc)
270
271 appSrcPad := appsrc.GetStaticPad("src")
272 if appSrcPad == nil {
273 log.Error(ctx, "failed to get appsrc pad")
274 cancel()
275 return
276 }
277
278 done := func() {
279 // appsrc.Unlink(demux)
280 pads, err := src.GetPads()
281 if err != nil {
282 log.Error(ctx, "failed to get pads", "error", err)
283 cancel()
284 return
285 }
286 for _, pad := range pads {
287 log.Debug(ctx, "setting pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
288
289 pad.AddProbe(gst.PadProbeTypeIdle, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
290 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
291 src.EndStream()
292 return gst.PadProbeRemove
293 })
294 }
295 }
296
297 src.SetAutomaticEOS(false)
298 src.SetCallbacks(&app.SourceCallbacks{
299 NeedDataFunc: func(self *app.Source, length uint) {
300 bs := make([]byte, length)
301 read, err := pr.Read(bs)
302 if err != nil {
303 if errors.Is(err, io.EOF) {
304 if read > 0 {
305 log.Debug(ctx, "got data on eof???")
306 cancel()
307 return
308 }
309 log.Debug(ctx, "EOF, ending segment", "length", read)
310 done()
311 return
312 } else {
313 log.Debug(ctx, "failed to read data, ending stream", "error", err)
314 cancel()
315 return
316 }
317 }
318 toPush := bs
319 if uint(read) < length {
320 toPush = bs[:read]
321 }
322 buffer := gst.NewBufferWithSize(int64(len(toPush)))
323 buffer.Map(gst.MapWrite).WriteData(toPush)
324 defer buffer.Unmap()
325 self.PushBuffer(buffer)
326
327 if uint(read) < length {
328 log.Debug(ctx, "short write, ending segment", "length", read)
329 done()
330 }
331 },
332 })
333 err = pipeline.Add(appsrc)
334 if err != nil {
335 log.Error(ctx, "failed to add appsrc to pipeline", "error", err)
336 cancel()
337 return
338 }
339
340 ret := appSrcPad.Link(demuxSinkPad)
341 if ret != gst.PadLinkOK {
342 log.Error(ctx, "failed to link appsrc to demux", "error", ret)
343 cancel()
344 return
345 }
346
347 err = demux.SetState(gst.StatePlaying)
348 if err != nil {
349 log.Error(ctx, "failed to set demux state", "error", err)
350 cancel()
351 return
352 }
353 err = appsrc.SetState(gst.StatePlaying)
354 if err != nil {
355 log.Error(ctx, "failed to set appsrc state", "error", err)
356 cancel()
357 return
358 }
359
360 select {
361 case <-ctx.Done():
362 return
363 case <-segDone:
364 }
365
366 log.Debug(ctx, "ending segment")
367 if err := demux.SetState(gst.StateNull); err != nil {
368 log.Error(ctx, "failed to set demux state", "error", err)
369 return
370 }
371 src.SetCallbacks(&app.SourceCallbacks{})
372 if err := appsrc.SetState(gst.StateNull); err != nil {
373 log.Error(ctx, "failed to set appsrc state", "error", err)
374 return
375 }
376 if err := pipeline.Remove(demux); err != nil {
377 log.Error(ctx, "failed to remove demux from pipleine", "error", err)
378 return
379 }
380 if err := pipeline.Remove(appsrc); err != nil {
381 log.Error(ctx, "failed to remove appsrc from pipleine", "error", err)
382 return
383 }
384 pr.Close()
385 pw.Close()
386 }
387
388 // fire it up!
389 go nextFile()
390
391 return outputQueue, ctx.Done(), nil
392}