Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.28 96 lines 2.8 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/go-gst/go-gst/gst" 9 "stream.place/streamplace/pkg/log" 10) 11 12func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error { 13 return HandleBusMessagesCustom(ctx, pipeline, nil) 14} 15 16func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error { 17 for { 18 if ctx.Err() != nil { 19 return ctx.Err() 20 } 21 msg := pipeline.GetPipelineBus().PopMessage(gst.ClockTime(time.Second * 1)) 22 if msg == nil { 23 continue 24 } 25 if handler != nil { 26 handler(msg) 27 } 28 switch msg.Type() { 29 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop 30 log.Debug(ctx, "got gst.MessageEOS, exiting") 31 return nil 32 case gst.MessageError: // Error messages are always fatal 33 err := msg.ParseError() 34 if err.Error() == fmt.Sprintf("%s: %s", ErrConcatDone.Error(), ErrConcatDone.Error()) { 35 log.Debug(ctx, "got ErrConcatDone, exiting") 36 return nil 37 } 38 log.Error(ctx, "gstreamer error", "error", err.Error()) 39 if debug := err.DebugString(); debug != "" { 40 log.Debug(ctx, "gstreamer debug", "message", debug) 41 } 42 return fmt.Errorf("gstreamer error: %w", err) 43 case gst.MessageElement: 44 // this one is noisy and not useful 45 default: 46 log.Debug(ctx, msg.String()) 47 } 48 } 49} 50 51// func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error { 52// return HandleBusMessagesCustom(ctx, pipeline, nil) 53// } 54 55// func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error { 56// msgCh := make(chan *gst.Message, 1024) 57// bus := pipeline.GetPipelineBus() 58// bus.SetSyncHandler(func(msg *gst.Message) gst.BusSyncReply { 59// if ctx.Err() != nil { 60// log.Error(ctx, "context cancelled, dropping message", "message", msg.String()) 61// msg.Unref() 62// return gst.BusDrop 63// } 64// log.Error(ctx, "got message", "message", msg.String()) 65// msgCh <- msg 66// return gst.BusDrop 67// }) 68// for { 69// if ctx.Err() != nil { 70// return ctx.Err() 71// } 72// select { 73// case <-ctx.Done(): 74// return nil 75// case msg := <-msgCh: 76// if handler != nil { 77// handler(msg) 78// } 79// switch msg.Type() { 80// case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop 81// log.Debug(ctx, "got gst.MessageEOS, exiting") 82// return nil 83// case gst.MessageError: // Error messages are always fatal 84// err := msg.ParseError() 85// log.Error(ctx, "gstreamer error", "error", err.Error()) 86// if debug := err.DebugString(); debug != "" { 87// log.Debug(ctx, "gstreamer debug", "message", debug) 88// } 89// return fmt.Errorf("gstreamer error: %w", err) 90// default: 91// log.Debug(ctx, msg.String()) 92// msg.Unref() 93// } 94// } 95// } 96// }