Live video on the AT Protocol
at eli/postgres 185 lines 5.1 kB view raw
1package media 2 3// func (mm *MediaManager) MP4Playback(ctx context.Context, user string, rendition string, w io.Writer) error { 4// uu, err := uuid.NewV7() 5// if err != nil { 6// return err 7// } 8// ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 9// ctx, cancel := context.WithCancel(ctx) 10 11// ctx = log.WithLogValues(ctx, "mediafunc", "MP4Playback") 12 13// pipelineSlice := []string{ 14// "mp4mux name=muxer fragment-mode=first-moov-then-finalise fragment-duration=1000 streamable=true ! appsink name=mp4sink", 15// "h264parse name=videoparse ! muxer.", 16// "opusparse name=audioparse ! muxer.", 17// } 18 19// pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 20// if err != nil { 21// return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 22// } 23 24// go func() { 25// HandleBusMessages(ctx, pipeline) 26// cancel() 27// }() 28 29// outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 30// if err != nil { 31// return fmt.Errorf("failed to get output queue: %w", err) 32// } 33// go func() { 34// select { 35// case <-ctx.Done(): 36// return 37// case <-done: 38// cancel() 39// } 40// }() 41 42// videoParse, err := pipeline.GetElementByName("videoparse") 43// if err != nil { 44// return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 45// } 46// err = outputQueue.Link(videoParse) 47// if err != nil { 48// return fmt.Errorf("failed to link output queue to video parse: %w", err) 49// } 50 51// audioParse, err := pipeline.GetElementByName("audioparse") 52// if err != nil { 53// return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 54// } 55// err = outputQueue.Link(audioParse) 56// if err != nil { 57// return fmt.Errorf("failed to link output queue to audio parse: %w", err) 58// } 59 60// go func() { 61// ticker := time.NewTicker(time.Second * 1) 62// for { 63// select { 64// case <-ctx.Done(): 65// return 66// case <-ticker.C: 67// state := pipeline.GetCurrentState() 68// log.Debug(ctx, "pipeline state", "state", state) 69// } 70// } 71// }() 72 73// mp4sinkele, err := pipeline.GetElementByName("mp4sink") 74// if err != nil { 75// return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 76// } 77// mp4sink := app.SinkFromElement(mp4sinkele) 78// mp4sink.SetCallbacks(&app.SinkCallbacks{ 79// NewSampleFunc: WriterNewSample(ctx, w), 80// EOSFunc: func(sink *app.Sink) { 81// log.Warn(ctx, "mp4sink EOSFunc") 82// cancel() 83// }, 84// }) 85 86// pipeline.SetState(gst.StatePlaying) 87 88// <-ctx.Done() 89 90// pipeline.BlockSetState(gst.StateNull) 91 92// return nil 93// } 94 95// func (mm *MediaManager) MKVPlayback(ctx context.Context, user string, rendition string, w io.Writer) error { 96// uu, err := uuid.NewV7() 97// if err != nil { 98// return err 99// } 100// ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 101// ctx, cancel := context.WithCancel(ctx) 102 103// ctx = log.WithLogValues(ctx, "mediafunc", "MKVPlayback") 104 105// pipelineSlice := []string{ 106// "matroskamux name=muxer streamable=true ! appsink name=mkvsink", 107// "h264parse name=videoparse ! muxer.", 108// "opusparse name=audioparse ! muxer.", 109// } 110 111// pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 112// if err != nil { 113// return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 114// } 115 116// go func() { 117// HandleBusMessages(ctx, pipeline) 118// cancel() 119// }() 120 121// outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 122// if err != nil { 123// return fmt.Errorf("failed to get output queue: %w", err) 124// } 125// go func() { 126// select { 127// case <-ctx.Done(): 128// return 129// case <-done: 130// cancel() 131// } 132// }() 133 134// videoParse, err := pipeline.GetElementByName("videoparse") 135// if err != nil { 136// return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 137// } 138// err = outputQueue.Link(videoParse) 139// if err != nil { 140// return fmt.Errorf("failed to link output queue to video parse: %w", err) 141// } 142 143// audioParse, err := pipeline.GetElementByName("audioparse") 144// if err != nil { 145// return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 146// } 147// err = outputQueue.Link(audioParse) 148// if err != nil { 149// return fmt.Errorf("failed to link output queue to audio parse: %w", err) 150// } 151 152// go func() { 153// ticker := time.NewTicker(time.Second * 1) 154// for { 155// select { 156// case <-ctx.Done(): 157// return 158// case <-ticker.C: 159// state := pipeline.GetCurrentState() 160// log.Debug(ctx, "pipeline state", "state", state) 161// } 162// } 163// }() 164 165// mkvsinkele, err := pipeline.GetElementByName("mkvsink") 166// if err != nil { 167// return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 168// } 169// mkvsink := app.SinkFromElement(mkvsinkele) 170// mkvsink.SetCallbacks(&app.SinkCallbacks{ 171// NewSampleFunc: WriterNewSample(ctx, w), 172// EOSFunc: func(sink *app.Sink) { 173// log.Warn(ctx, "mp4sink EOSFunc") 174// cancel() 175// }, 176// }) 177 178// pipeline.SetState(gst.StatePlaying) 179 180// <-ctx.Done() 181 182// pipeline.BlockSetState(gst.StateNull) 183 184// return nil 185// }