fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "os"
8 "strings"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "github.com/google/uuid"
13 "stream.place/streamplace/pkg/bus"
14 "stream.place/streamplace/pkg/log"
15)
16
17func readFile(ctx context.Context, source string) (*bus.Seg, error) {
18 fd, err := os.Open(source)
19 if err != nil {
20 return nil, fmt.Errorf("failed to open source file: %w", err)
21 }
22 defer fd.Close()
23 bs, err := io.ReadAll(fd)
24 if err != nil {
25 return nil, fmt.Errorf("failed to read source file: %w", err)
26 }
27 seg := &bus.Seg{
28 Filepath: source,
29 Data: bs,
30 }
31 return seg, nil
32}
33
34// This function remains in scope for the duration of a single users' playback
35func Clip(ctx context.Context, sources []string, w io.Writer) error {
36 uu, err := uuid.NewV7()
37 if err != nil {
38 return err
39 }
40 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
41 ctx = log.WithLogValues(ctx, "mediafunc", "Clip")
42 ctx, cancel := context.WithCancel(ctx)
43 defer cancel()
44
45 pipelineSlice := []string{
46 "mp4mux faststart=true name=muxer ! appsink sync=false name=mp4sink",
47 "h264parse name=videoparse ! h264timestamper ! muxer.video_0",
48 "opusparse name=audioparse ! muxer.audio_0",
49 }
50
51 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
52 if err != nil {
53 return fmt.Errorf("failed to create GStreamer pipeline: %w", err)
54 }
55
56 segCh := make(chan *bus.Seg)
57 go func() {
58 for _, source := range sources {
59 log.Log(ctx, "reading file", "source", source)
60 seg, err := readFile(ctx, source)
61 if err != nil {
62 err = fmt.Errorf("failed to read file: %w", err)
63 pipeline.Error(err.Error(), err)
64 return
65 }
66
67 segCh <- seg
68 }
69 close(segCh)
70 }()
71
72 concatBin, err := ConcatBin(ctx, segCh)
73 if err != nil {
74 return fmt.Errorf("failed to create concat bin: %w", err)
75 }
76
77 err = pipeline.Add(concatBin.Element)
78 if err != nil {
79 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
80 }
81
82 videoPad := concatBin.GetStaticPad("video_0")
83 if videoPad == nil {
84 return fmt.Errorf("video pad not found")
85 }
86
87 audioPad := concatBin.GetStaticPad("audio_0")
88 if audioPad == nil {
89 return fmt.Errorf("audio pad not found")
90 }
91
92 // Get the videoparse and audioparse elements from the pipeline
93 videoParse, err := pipeline.GetElementByName("videoparse")
94 if err != nil {
95 return fmt.Errorf("failed to get video parse element: %w", err)
96 }
97
98 audioParse, err := pipeline.GetElementByName("audioparse")
99 if err != nil {
100 return fmt.Errorf("failed to get audio parse element: %w", err)
101 }
102
103 // Link the concat bin pads to the parse element sink pads
104 linked := videoPad.Link(videoParse.GetStaticPad("sink"))
105 if linked != gst.PadLinkOK {
106 return fmt.Errorf("failed to link video pad to video parse element: %v", linked)
107 }
108
109 linked = audioPad.Link(audioParse.GetStaticPad("sink"))
110 if linked != gst.PadLinkOK {
111 return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked)
112 }
113
114 // Get the mp4sink element and set up its callback
115 mp4Sink, err := pipeline.GetElementByName("mp4sink")
116 if err != nil {
117 return fmt.Errorf("failed to get mp4sink element: %w", err)
118 }
119
120 eos := make(chan struct{})
121
122 appSink := app.SinkFromElement(mp4Sink)
123 appSink.SetCallbacks(&app.SinkCallbacks{
124 NewSampleFunc: WriterNewSample(ctx, w),
125 EOSFunc: func(sink *app.Sink) {
126 close(eos)
127 },
128 })
129
130 // Start the pipeline
131 err = pipeline.SetState(gst.StatePlaying)
132 if err != nil {
133 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
134 }
135 defer func() {
136 err := pipeline.BlockSetState(gst.StateNull)
137 if err != nil {
138 log.Error(ctx, "failed to set pipeline state to null", "error", err)
139 }
140 }()
141
142 // Handle bus messages
143 err = HandleBusMessages(ctx, pipeline)
144
145 <-eos
146
147 if err != nil {
148 return fmt.Errorf("pipeline error: %w", err)
149 }
150
151 return nil
152}