Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "errors"
6 "io"
7
8 "github.com/go-gst/go-gst/gst"
9 "github.com/go-gst/go-gst/gst/app"
10 "stream.place/streamplace/pkg/log"
11)
12
13// ReaderNeedData is a function that reads from an io.Reader and pushes the data to a gstreamer source.
14func ReaderNeedData(ctx context.Context, input io.Reader) func(self *app.Source, length uint) {
15 bsCopy, err := io.ReadAll(input)
16 if err != nil {
17 log.Error(ctx, "error reading from input", "error", err)
18 }
19 return func(self *app.Source, length uint) {
20 if ctx.Err() != nil {
21 self.EndStream()
22 return
23 }
24 buffer := gst.NewBufferWithSize(int64(len(bsCopy)))
25 buffer.Map(gst.MapWrite).WriteData(bsCopy)
26 defer buffer.Unmap()
27 ret := self.PushBuffer(buffer)
28 if ret != gst.FlowOK {
29 log.Error(ctx, "failed to push buffer", "error", ret.String())
30 } else {
31 log.Debug(ctx, "pushed buffer", "length", len(bsCopy))
32 }
33 }
34}
35
36// Different from ReaderNeedData in that it reads the data in chunks and pushes them to the source.
37func ReaderNeedDataIncremental(ctx context.Context, input io.Reader) func(self *app.Source, length uint) {
38 return func(self *app.Source, length uint) {
39 if ctx.Err() != nil {
40 self.EndStream()
41 return
42 }
43 bs := make([]byte, length)
44 read, err := input.Read(bs)
45 if err != nil && !errors.Is(err, io.EOF) {
46 log.Error(ctx, "error reading from input", "error", err)
47 self.Error("error reading from input", err)
48 return
49 }
50 if read > 0 {
51 toPush := bs
52 if uint(read) < length {
53 toPush = bs[:read]
54 }
55 buffer := gst.NewBufferWithSize(int64(len(toPush)))
56 buffer.Map(gst.MapWrite).WriteData(toPush)
57 defer buffer.Unmap()
58 self.PushBuffer(buffer)
59 }
60 if err != nil && errors.Is(err, io.EOF) {
61 log.Debug(ctx, "EOF, ending stream", "length", read)
62 self.EndStream()
63 return
64 }
65 }
66}
67
68// WriterNewSample is a function that reads from a gstreamer sink and writes the data to an io.Writer.
69func WriterNewSample(ctx context.Context, output io.Writer) func(sink *app.Sink) gst.FlowReturn {
70 return func(sink *app.Sink) gst.FlowReturn {
71 sample := sink.PullSample()
72 if sample == nil {
73 return gst.FlowOK
74 }
75
76 // Retrieve the buffer from the sample.
77 buffer := sample.GetBuffer()
78 bs := buffer.Map(gst.MapRead).Bytes()
79 defer buffer.Unmap()
80
81 _, err := output.Write(bs)
82
83 if err != nil {
84 log.Error(ctx, "error writing to output", "error", err)
85 return gst.FlowError
86 }
87
88 return gst.FlowOK
89 }
90}