Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strconv"
8 "strings"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "go.opentelemetry.io/otel"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/model"
15)
16
17func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) {
18 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData")
19 defer span.End()
20 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData")
21 ctx, cancel := context.WithCancel(ctx)
22 defer cancel()
23 pipelineSlice := []string{
24 "appsrc name=appsrc ! qtdemux name=demux ! fakesink sync=false",
25 }
26
27 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
28 if err != nil {
29 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
30 }
31
32 var videoMetadata *model.SegmentMediadataVideo
33 var audioMetadata *model.SegmentMediadataAudio
34
35 appsrc, err := pipeline.GetElementByName("appsrc")
36 if err != nil {
37 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
38 }
39
40 src := app.SrcFromElement(appsrc)
41 src.SetCallbacks(&app.SourceCallbacks{
42 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)),
43 })
44
45 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
46 caps := pad.GetCurrentCaps()
47 if caps == nil {
48 log.Warn(ctx, "Unable to get pad caps")
49 cancel()
50 return
51 }
52
53 structure := caps.GetStructureAt(0)
54 if structure == nil {
55 log.Warn(ctx, "Unable to get structure from caps")
56 cancel()
57 return
58 }
59
60 name := structure.Name()
61
62 if name[:5] == "video" {
63 videoMetadata = &model.SegmentMediadataVideo{}
64 // Get some common video properties
65 widthVal, _ := structure.GetValue("width")
66 heightVal, _ := structure.GetValue("height")
67
68 width, ok := widthVal.(int)
69 if ok {
70 videoMetadata.Width = width
71 }
72 height, ok := heightVal.(int)
73 if ok {
74 videoMetadata.Height = height
75 }
76 framerateVal, _ := structure.GetValue("framerate")
77 framerateStr := fmt.Sprintf("%v", framerateVal)
78 parts := strings.Split(framerateStr, "/")
79 num := 0
80 den := 0
81 if len(parts) == 2 {
82 num, _ = strconv.Atoi(parts[0])
83 den, _ = strconv.Atoi(parts[1])
84 }
85 if num != 0 && den != 0 {
86 videoMetadata.FPSNum = num
87 videoMetadata.FPSDen = den
88 }
89 }
90
91 if name[:5] == "audio" {
92 audioMetadata = &model.SegmentMediadataAudio{}
93 // Get some common audio properties
94 rateVal, _ := structure.GetValue("rate")
95 channelsVal, _ := structure.GetValue("channels")
96
97 rate, ok := rateVal.(int)
98 if ok {
99 audioMetadata.Rate = rate
100 }
101 channels, ok := channelsVal.(int)
102 if ok {
103 audioMetadata.Channels = channels
104 }
105 }
106
107 // if videoMetadata != nil && audioMetadata != nil {
108 // cancel()
109 // }
110 }
111
112 demux, err := pipeline.GetElementByName("demux")
113 if err != nil {
114 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
115 }
116 _, err = demux.Connect("pad-added", onPadAdded)
117 if err != nil {
118 return nil, fmt.Errorf("error connecting pad-add: %w", err)
119 }
120
121 go func() {
122 if err := HandleBusMessages(ctx, pipeline); err != nil {
123 log.Log(ctx, "pipeline error", "error", err)
124 }
125 cancel()
126 }()
127
128 // Start the pipeline
129 if err := pipeline.SetState(gst.StatePlaying); err != nil {
130 return nil, err
131 }
132
133 <-ctx.Done()
134
135 meta := &model.SegmentMediaData{
136 Video: []*model.SegmentMediadataVideo{videoMetadata},
137 Audio: []*model.SegmentMediadataAudio{audioMetadata},
138 }
139
140 ok, dur := pipeline.QueryDuration(gst.FormatTime)
141 if !ok {
142 return nil, fmt.Errorf("error getting duration")
143 } else {
144 meta.Duration = dur
145 }
146
147 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
148 return nil, err
149 }
150
151 return meta, nil
152}