Live video on the AT Protocol
at next 96 lines 2.8 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/go-gst/go-gst/gst" 9 "stream.place/streamplace/pkg/log" 10) 11 12func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error { 13 return HandleBusMessagesCustom(ctx, pipeline, nil) 14} 15 16func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error { 17 for { 18 if ctx.Err() != nil { 19 return ctx.Err() 20 } 21 msg := pipeline.GetPipelineBus().PopMessage(gst.ClockTime(time.Second * 1)) 22 if msg == nil { 23 continue 24 } 25 if handler != nil { 26 handler(msg) 27 } 28 switch msg.Type() { 29 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop 30 log.Debug(ctx, "got gst.MessageEOS, exiting") 31 return nil 32 case gst.MessageError: // Error messages are always fatal 33 err := msg.ParseError() 34 if err.Error() == fmt.Sprintf("%s: %s", ErrConcatDone.Error(), ErrConcatDone.Error()) { 35 log.Debug(ctx, "got ErrConcatDone, exiting") 36 return nil 37 } 38 log.Error(ctx, "gstreamer error", "error", err.Error()) 39 if debug := err.DebugString(); debug != "" { 40 log.Debug(ctx, "gstreamer debug", "message", debug) 41 } 42 return fmt.Errorf("gstreamer error: %w", err) 43 case gst.MessageElement: 44 // this one is noisy and not useful 45 default: 46 log.Debug(ctx, msg.String()) 47 } 48 } 49} 50 51// func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error { 52// return HandleBusMessagesCustom(ctx, pipeline, nil) 53// } 54 55// func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error { 56// msgCh := make(chan *gst.Message, 1024) 57// bus := pipeline.GetPipelineBus() 58// bus.SetSyncHandler(func(msg *gst.Message) gst.BusSyncReply { 59// if ctx.Err() != nil { 60// log.Error(ctx, "context cancelled, dropping message", "message", msg.String()) 61// msg.Unref() 62// return gst.BusDrop 63// } 64// log.Error(ctx, "got message", "message", msg.String()) 65// msgCh <- msg 66// return gst.BusDrop 67// }) 68// for { 69// if ctx.Err() != nil { 70// return ctx.Err() 71// } 72// select { 73// case <-ctx.Done(): 74// return nil 75// case msg := <-msgCh: 76// if handler != nil { 77// handler(msg) 78// } 79// switch msg.Type() { 80// case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop 81// log.Debug(ctx, "got gst.MessageEOS, exiting") 82// return nil 83// case gst.MessageError: // Error messages are always fatal 84// err := msg.ParseError() 85// log.Error(ctx, "gstreamer error", "error", err.Error()) 86// if debug := err.DebugString(); debug != "" { 87// log.Debug(ctx, "gstreamer debug", "message", debug) 88// } 89// return fmt.Errorf("gstreamer error: %w", err) 90// default: 91// log.Debug(ctx, msg.String()) 92// msg.Unref() 93// } 94// } 95// } 96// }