Live video on the AT Protocol
at eli/database-resync 152 lines 3.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strconv" 8 "strings" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "go.opentelemetry.io/otel" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/model" 15) 16 17func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 18 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 19 defer span.End() 20 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") 21 ctx, cancel := context.WithCancel(ctx) 22 defer cancel() 23 pipelineSlice := []string{ 24 "appsrc name=appsrc ! qtdemux name=demux ! fakesink sync=false", 25 } 26 27 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 28 if err != nil { 29 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 30 } 31 32 var videoMetadata *model.SegmentMediadataVideo 33 var audioMetadata *model.SegmentMediadataAudio 34 35 appsrc, err := pipeline.GetElementByName("appsrc") 36 if err != nil { 37 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 38 } 39 40 src := app.SrcFromElement(appsrc) 41 src.SetCallbacks(&app.SourceCallbacks{ 42 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)), 43 }) 44 45 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 46 caps := pad.GetCurrentCaps() 47 if caps == nil { 48 log.Warn(ctx, "Unable to get pad caps") 49 cancel() 50 return 51 } 52 53 structure := caps.GetStructureAt(0) 54 if structure == nil { 55 log.Warn(ctx, "Unable to get structure from caps") 56 cancel() 57 return 58 } 59 60 name := structure.Name() 61 62 if name[:5] == "video" { 63 videoMetadata = &model.SegmentMediadataVideo{} 64 // Get some common video properties 65 widthVal, _ := structure.GetValue("width") 66 heightVal, _ := structure.GetValue("height") 67 68 width, ok := widthVal.(int) 69 if ok { 70 videoMetadata.Width = width 71 } 72 height, ok := heightVal.(int) 73 if ok { 74 videoMetadata.Height = height 75 } 76 framerateVal, _ := structure.GetValue("framerate") 77 framerateStr := fmt.Sprintf("%v", framerateVal) 78 parts := strings.Split(framerateStr, "/") 79 num := 0 80 den := 0 81 if len(parts) == 2 { 82 num, _ = strconv.Atoi(parts[0]) 83 den, _ = strconv.Atoi(parts[1]) 84 } 85 if num != 0 && den != 0 { 86 videoMetadata.FPSNum = num 87 videoMetadata.FPSDen = den 88 } 89 } 90 91 if name[:5] == "audio" { 92 audioMetadata = &model.SegmentMediadataAudio{} 93 // Get some common audio properties 94 rateVal, _ := structure.GetValue("rate") 95 channelsVal, _ := structure.GetValue("channels") 96 97 rate, ok := rateVal.(int) 98 if ok { 99 audioMetadata.Rate = rate 100 } 101 channels, ok := channelsVal.(int) 102 if ok { 103 audioMetadata.Channels = channels 104 } 105 } 106 107 // if videoMetadata != nil && audioMetadata != nil { 108 // cancel() 109 // } 110 } 111 112 demux, err := pipeline.GetElementByName("demux") 113 if err != nil { 114 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 115 } 116 _, err = demux.Connect("pad-added", onPadAdded) 117 if err != nil { 118 return nil, fmt.Errorf("error connecting pad-add: %w", err) 119 } 120 121 go func() { 122 if err := HandleBusMessages(ctx, pipeline); err != nil { 123 log.Log(ctx, "pipeline error", "error", err) 124 } 125 cancel() 126 }() 127 128 // Start the pipeline 129 if err := pipeline.SetState(gst.StatePlaying); err != nil { 130 return nil, err 131 } 132 133 <-ctx.Done() 134 135 meta := &model.SegmentMediaData{ 136 Video: []*model.SegmentMediadataVideo{videoMetadata}, 137 Audio: []*model.SegmentMediadataAudio{audioMetadata}, 138 } 139 140 ok, dur := pipeline.QueryDuration(gst.FormatTime) 141 if !ok { 142 return nil, fmt.Errorf("error getting duration") 143 } else { 144 meta.Duration = dur 145 } 146 147 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 148 return nil, err 149 } 150 151 return meta, nil 152}