Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

rename clip to segment_combine

+137 -139
-124
pkg/media/clip.go
··· 1 - package media 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "io" 7 - "strings" 8 - 9 - "github.com/go-gst/go-gst/gst" 10 - "github.com/go-gst/go-gst/gst/app" 11 - "stream.place/streamplace/pkg/bus" 12 - "stream.place/streamplace/pkg/log" 13 - ) 14 - 15 - func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer) error { 16 - ctx = log.WithLogValues(ctx, "mediafunc", "CombineSegmentsUnsigned") 17 - ctx, cancel := context.WithCancel(ctx) 18 - defer cancel() 19 - 20 - pipelineSlice := []string{ 21 - fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d movie-timescale=60000 trak-timescale=60000 ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime), 22 - "capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.", 23 - "capsfilter caps=audio/x-opus,framed=true name=audioparse ! queue ! muxer.", 24 - } 25 - 26 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 27 - if err != nil { 28 - return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 29 - } 30 - 31 - segCh := make(chan *bus.Seg) 32 - go func() { 33 - for _, source := range sources { 34 - bs, err := io.ReadAll(source) 35 - if err != nil { 36 - err = fmt.Errorf("failed to read file: %w", err) 37 - pipeline.Error(err.Error(), err) 38 - return 39 - } 40 - segCh <- &bus.Seg{ 41 - Filepath: "ignored", 42 - Data: bs, 43 - } 44 - } 45 - close(segCh) 46 - }() 47 - 48 - concatBin, err := ConcatBin(ctx, segCh) 49 - if err != nil { 50 - return fmt.Errorf("failed to create concat bin: %w", err) 51 - } 52 - 53 - err = pipeline.Add(concatBin.Element) 54 - if err != nil { 55 - return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 56 - } 57 - 58 - videoPad := concatBin.GetStaticPad("video_0") 59 - if videoPad == nil { 60 - return fmt.Errorf("video pad not found") 61 - } 62 - 63 - audioPad := concatBin.GetStaticPad("audio_0") 64 - if audioPad == nil { 65 - return fmt.Errorf("audio pad not found") 66 - } 67 - 68 - // Get the videoparse and audioparse elements from the pipeline 69 - videoQueue, err := pipeline.GetElementByName("videoqueue") 70 - if err != nil { 71 - return fmt.Errorf("failed to get video parse element: %w", err) 72 - } 73 - 74 - audioParse, err := pipeline.GetElementByName("audioparse") 75 - if err != nil { 76 - return fmt.Errorf("failed to get audio parse element: %w", err) 77 - } 78 - 79 - // Link the concat bin pads to the parse element sink pads 80 - linked := videoPad.Link(videoQueue.GetStaticPad("sink")) 81 - if linked != gst.PadLinkOK { 82 - return fmt.Errorf("failed to link video pad to video parse element: %v", linked) 83 - } 84 - 85 - linked = audioPad.Link(audioParse.GetStaticPad("sink")) 86 - if linked != gst.PadLinkOK { 87 - return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked) 88 - } 89 - 90 - // Get the mp4sink element and set up its callback 91 - mp4Sink, err := pipeline.GetElementByName("mp4sink") 92 - if err != nil { 93 - return fmt.Errorf("failed to get mp4sink element: %w", err) 94 - } 95 - 96 - appSink := app.SinkFromElement(mp4Sink) 97 - appSink.SetCallbacks(&app.SinkCallbacks{ 98 - NewSampleFunc: WriterNewSample(ctx, w), 99 - }) 100 - 101 - // Start the pipeline 102 - err = pipeline.SetState(gst.StatePlaying) 103 - if err != nil { 104 - return fmt.Errorf("failed to set pipeline state to playing: %w", err) 105 - } 106 - defer func() { 107 - err := pipeline.BlockSetState(gst.StateNull) 108 - if err != nil { 109 - log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 - } 111 - }() 112 - 113 - // Handle bus messages 114 - err = HandleBusMessages(ctx, pipeline) 115 - if err != nil { 116 - return fmt.Errorf("failed to handle bus messages: %w", err) 117 - } 118 - 119 - if err != nil { 120 - return fmt.Errorf("pipeline error: %w", err) 121 - } 122 - 123 - return nil 124 - }
pkg/media/clip_test.go pkg/media/segment_combine_test.go
+20 -15
pkg/media/rtcrec_test.go
··· 16 16 ) 17 17 18 18 var RTCRecTestCases = []struct { 19 + name string 19 20 fatalErrors bool 20 21 fixture string 21 22 }{ 22 23 { 24 + name: "IntermittentTracks", 23 25 fatalErrors: false, 24 26 fixture: getFixture("intermittent-tracks.cbor"), 25 27 }, 26 28 { 29 + name: "SegmentConvergenceIssues", 27 30 fatalErrors: true, 28 31 fixture: remote.RemoteFixture("6a1fb84e3c23405fc53161f59d5b837839c4889fc1a96533c82fb44fafc51d27/2025-11-14T22-41-20-399Z.cbor"), 29 32 }, ··· 55 58 // ctx := context.Background() 56 59 // mm, ms := getStaticTestMediaManager(t) 57 60 for _, testCase := range RTCRecTestCases { 58 - FatalSegmentationErrors = testCase.fatalErrors 59 - fd, err := os.Open(testCase.fixture) 60 - require.NoError(t, err) 61 - defer fd.Close() 62 - pc, err := rtcrec.NewReplayPeerConnection(ctx, fd) 63 - require.NoError(t, err) 64 - done := make(chan error) 65 - _, err = mm.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, done) 66 - require.NoError(t, err) 67 - // fmt.Println(answer.SDP) 68 - pipelineError := <-done 69 - require.NoError(t, pipelineError) 70 - for _, err := range globalerror.GlobalErrors { 71 - fmt.Printf("got error, non-fatal for now: %v\n", err) 72 - } 61 + t.Run(testCase.name, func(t *testing.T) { 62 + FatalSegmentationErrors = testCase.fatalErrors 63 + fd, err := os.Open(testCase.fixture) 64 + require.NoError(t, err) 65 + defer fd.Close() 66 + pc, err := rtcrec.NewReplayPeerConnection(ctx, fd) 67 + require.NoError(t, err) 68 + done := make(chan error) 69 + _, err = mm.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, done) 70 + require.NoError(t, err) 71 + // fmt.Println(answer.SDP) 72 + pipelineError := <-done 73 + require.NoError(t, pipelineError) 74 + for _, err := range globalerror.GlobalErrors { 75 + fmt.Printf("got error, non-fatal for now: %v\n", err) 76 + } 77 + }) 73 78 } 74 79 }) 75 80 }
+117
pkg/media/segment_combine.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "fmt" 6 7 "io" 8 + "strings" 7 9 10 + "github.com/go-gst/go-gst/gst" 11 + "github.com/go-gst/go-gst/gst/app" 8 12 "stream.place/streamplace/pkg/aqio" 13 + "stream.place/streamplace/pkg/bus" 14 + "stream.place/streamplace/pkg/log" 9 15 ) 10 16 11 17 // CombineSegments combines a list of segments into a single segment that maintains all of the manifests ··· 32 38 } 33 39 return nil 34 40 } 41 + 42 + func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer) error { 43 + ctx = log.WithLogValues(ctx, "mediafunc", "CombineSegmentsUnsigned") 44 + ctx, cancel := context.WithCancel(ctx) 45 + defer cancel() 46 + 47 + pipelineSlice := []string{ 48 + fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d movie-timescale=60000 trak-timescale=60000 ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime), 49 + "capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.", 50 + "capsfilter caps=audio/x-opus,framed=true name=audioparse ! queue ! muxer.", 51 + } 52 + 53 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 54 + if err != nil { 55 + return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 56 + } 57 + 58 + segCh := make(chan *bus.Seg) 59 + go func() { 60 + for _, source := range sources { 61 + bs, err := io.ReadAll(source) 62 + if err != nil { 63 + err = fmt.Errorf("failed to read file: %w", err) 64 + pipeline.Error(err.Error(), err) 65 + return 66 + } 67 + segCh <- &bus.Seg{ 68 + Filepath: "ignored", 69 + Data: bs, 70 + } 71 + } 72 + close(segCh) 73 + }() 74 + 75 + concatBin, err := ConcatBin(ctx, segCh) 76 + if err != nil { 77 + return fmt.Errorf("failed to create concat bin: %w", err) 78 + } 79 + 80 + err = pipeline.Add(concatBin.Element) 81 + if err != nil { 82 + return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 83 + } 84 + 85 + videoPad := concatBin.GetStaticPad("video_0") 86 + if videoPad == nil { 87 + return fmt.Errorf("video pad not found") 88 + } 89 + 90 + audioPad := concatBin.GetStaticPad("audio_0") 91 + if audioPad == nil { 92 + return fmt.Errorf("audio pad not found") 93 + } 94 + 95 + // Get the videoparse and audioparse elements from the pipeline 96 + videoQueue, err := pipeline.GetElementByName("videoqueue") 97 + if err != nil { 98 + return fmt.Errorf("failed to get video parse element: %w", err) 99 + } 100 + 101 + audioParse, err := pipeline.GetElementByName("audioparse") 102 + if err != nil { 103 + return fmt.Errorf("failed to get audio parse element: %w", err) 104 + } 105 + 106 + // Link the concat bin pads to the parse element sink pads 107 + linked := videoPad.Link(videoQueue.GetStaticPad("sink")) 108 + if linked != gst.PadLinkOK { 109 + return fmt.Errorf("failed to link video pad to video parse element: %v", linked) 110 + } 111 + 112 + linked = audioPad.Link(audioParse.GetStaticPad("sink")) 113 + if linked != gst.PadLinkOK { 114 + return fmt.Errorf("failed to link audio pad to audio parse element: %v", linked) 115 + } 116 + 117 + // Get the mp4sink element and set up its callback 118 + mp4Sink, err := pipeline.GetElementByName("mp4sink") 119 + if err != nil { 120 + return fmt.Errorf("failed to get mp4sink element: %w", err) 121 + } 122 + 123 + appSink := app.SinkFromElement(mp4Sink) 124 + appSink.SetCallbacks(&app.SinkCallbacks{ 125 + NewSampleFunc: WriterNewSample(ctx, w), 126 + }) 127 + 128 + // Start the pipeline 129 + err = pipeline.SetState(gst.StatePlaying) 130 + if err != nil { 131 + return fmt.Errorf("failed to set pipeline state to playing: %w", err) 132 + } 133 + defer func() { 134 + err := pipeline.BlockSetState(gst.StateNull) 135 + if err != nil { 136 + log.Error(ctx, "failed to set pipeline state to null", "error", err) 137 + } 138 + }() 139 + 140 + // Handle bus messages 141 + err = HandleBusMessages(ctx, pipeline) 142 + if err != nil { 143 + return fmt.Errorf("failed to handle bus messages: %w", err) 144 + } 145 + 146 + if err != nil { 147 + return fmt.Errorf("pipeline error: %w", err) 148 + } 149 + 150 + return nil 151 + }