Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/temp-error-reporting 90 lines 2.4 kB view raw
1package media 2 3import ( 4 "context" 5 "errors" 6 "io" 7 8 "github.com/go-gst/go-gst/gst" 9 "github.com/go-gst/go-gst/gst/app" 10 "stream.place/streamplace/pkg/log" 11) 12 13// ReaderNeedData is a function that reads from an io.Reader and pushes the data to a gstreamer source. 14func ReaderNeedData(ctx context.Context, input io.Reader) func(self *app.Source, length uint) { 15 bsCopy, err := io.ReadAll(input) 16 if err != nil { 17 log.Error(ctx, "error reading from input", "error", err) 18 } 19 return func(self *app.Source, length uint) { 20 if ctx.Err() != nil { 21 self.EndStream() 22 return 23 } 24 buffer := gst.NewBufferWithSize(int64(len(bsCopy))) 25 buffer.Map(gst.MapWrite).WriteData(bsCopy) 26 defer buffer.Unmap() 27 ret := self.PushBuffer(buffer) 28 if ret != gst.FlowOK { 29 log.Error(ctx, "failed to push buffer", "error", ret.String()) 30 } else { 31 log.Debug(ctx, "pushed buffer", "length", len(bsCopy)) 32 } 33 } 34} 35 36// Different from ReaderNeedData in that it reads the data in chunks and pushes them to the source. 37func ReaderNeedDataIncremental(ctx context.Context, input io.Reader) func(self *app.Source, length uint) { 38 return func(self *app.Source, length uint) { 39 if ctx.Err() != nil { 40 self.EndStream() 41 return 42 } 43 bs := make([]byte, length) 44 read, err := input.Read(bs) 45 if err != nil && !errors.Is(err, io.EOF) { 46 log.Error(ctx, "error reading from input", "error", err) 47 self.Error("error reading from input", err) 48 return 49 } 50 if read > 0 { 51 toPush := bs 52 if uint(read) < length { 53 toPush = bs[:read] 54 } 55 buffer := gst.NewBufferWithSize(int64(len(toPush))) 56 buffer.Map(gst.MapWrite).WriteData(toPush) 57 defer buffer.Unmap() 58 self.PushBuffer(buffer) 59 } 60 if err != nil && errors.Is(err, io.EOF) { 61 log.Debug(ctx, "EOF, ending stream", "length", read) 62 self.EndStream() 63 return 64 } 65 } 66} 67 68// WriterNewSample is a function that reads from a gstreamer sink and writes the data to an io.Writer. 69func WriterNewSample(ctx context.Context, output io.Writer) func(sink *app.Sink) gst.FlowReturn { 70 return func(sink *app.Sink) gst.FlowReturn { 71 sample := sink.PullSample() 72 if sample == nil { 73 return gst.FlowOK 74 } 75 76 // Retrieve the buffer from the sample. 77 buffer := sample.GetBuffer() 78 bs := buffer.Map(gst.MapRead).Bytes() 79 defer buffer.Unmap() 80 81 _, err := output.Write(bs) 82 83 if err != nil { 84 log.Error(ctx, "error writing to output", "error", err) 85 return gst.FlowError 86 } 87 88 return gst.FlowOK 89 } 90}