Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/go-gst/go-gst/gst"
9 "stream.place/streamplace/pkg/log"
10)
11
12func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error {
13 return HandleBusMessagesCustom(ctx, pipeline, nil)
14}
15
16func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error {
17 for {
18 if ctx.Err() != nil {
19 return ctx.Err()
20 }
21 msg := pipeline.GetPipelineBus().PopMessage(gst.ClockTime(time.Second * 1))
22 if msg == nil {
23 continue
24 }
25 if handler != nil {
26 handler(msg)
27 }
28 switch msg.Type() {
29 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
30 log.Debug(ctx, "got gst.MessageEOS, exiting")
31 return nil
32 case gst.MessageError: // Error messages are always fatal
33 err := msg.ParseError()
34 if err.Error() == fmt.Sprintf("%s: %s", ErrConcatDone.Error(), ErrConcatDone.Error()) {
35 log.Debug(ctx, "got ErrConcatDone, exiting")
36 return nil
37 }
38 log.Error(ctx, "gstreamer error", "error", err.Error())
39 if debug := err.DebugString(); debug != "" {
40 log.Debug(ctx, "gstreamer debug", "message", debug)
41 }
42 return fmt.Errorf("gstreamer error: %w", err)
43 case gst.MessageElement:
44 // this one is noisy and not useful
45 default:
46 log.Debug(ctx, msg.String())
47 }
48 }
49}
50
51// func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error {
52// return HandleBusMessagesCustom(ctx, pipeline, nil)
53// }
54
55// func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error {
56// msgCh := make(chan *gst.Message, 1024)
57// bus := pipeline.GetPipelineBus()
58// bus.SetSyncHandler(func(msg *gst.Message) gst.BusSyncReply {
59// if ctx.Err() != nil {
60// log.Error(ctx, "context cancelled, dropping message", "message", msg.String())
61// msg.Unref()
62// return gst.BusDrop
63// }
64// log.Error(ctx, "got message", "message", msg.String())
65// msgCh <- msg
66// return gst.BusDrop
67// })
68// for {
69// if ctx.Err() != nil {
70// return ctx.Err()
71// }
72// select {
73// case <-ctx.Done():
74// return nil
75// case msg := <-msgCh:
76// if handler != nil {
77// handler(msg)
78// }
79// switch msg.Type() {
80// case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
81// log.Debug(ctx, "got gst.MessageEOS, exiting")
82// return nil
83// case gst.MessageError: // Error messages are always fatal
84// err := msg.ParseError()
85// log.Error(ctx, "gstreamer error", "error", err.Error())
86// if debug := err.DebugString(); debug != "" {
87// log.Debug(ctx, "gstreamer debug", "message", debug)
88// }
89// return fmt.Errorf("gstreamer error: %w", err)
90// default:
91// log.Debug(ctx, msg.String())
92// msg.Unref()
93// }
94// }
95// }
96// }