fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package media
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/go-gst/go-gst/gst"
9 "stream.place/streamplace/pkg/log"
10)
11
12func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error {
13 return HandleBusMessagesCustom(ctx, pipeline, nil)
14}
15
16func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error {
17 for {
18 if ctx.Err() != nil {
19 return ctx.Err()
20 }
21 msg := pipeline.GetPipelineBus().PopMessage(gst.ClockTime(time.Second * 1))
22 if msg == nil {
23 continue
24 }
25 if handler != nil {
26 handler(msg)
27 }
28 switch msg.Type() {
29 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
30 log.Debug(ctx, "got gst.MessageEOS, exiting")
31 return nil
32 case gst.MessageError: // Error messages are always fatal
33 err := msg.ParseError()
34 if err.Error() == fmt.Sprintf("%s: %s", ErrConcatDone.Error(), ErrConcatDone.Error()) {
35 log.Debug(ctx, "got ErrConcatDone, exiting")
36 return nil
37 }
38 log.Error(ctx, "gstreamer error", "error", err.Error())
39 if debug := err.DebugString(); debug != "" {
40 log.Debug(ctx, "gstreamer debug", "message", debug)
41 }
42 return fmt.Errorf("gstreamer error: %w", err)
43 case gst.MessageElement:
44 // this one is noisy and not useful
45 default:
46 log.Debug(ctx, msg.String())
47 }
48 }
49}
50
51// func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error {
52// return HandleBusMessagesCustom(ctx, pipeline, nil)
53// }
54
55// func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error {
56// msgCh := make(chan *gst.Message, 1024)
57// bus := pipeline.GetPipelineBus()
58// bus.SetSyncHandler(func(msg *gst.Message) gst.BusSyncReply {
59// if ctx.Err() != nil {
60// log.Error(ctx, "context cancelled, dropping message", "message", msg.String())
61// msg.Unref()
62// return gst.BusDrop
63// }
64// log.Error(ctx, "got message", "message", msg.String())
65// msgCh <- msg
66// return gst.BusDrop
67// })
68// for {
69// if ctx.Err() != nil {
70// return ctx.Err()
71// }
72// select {
73// case <-ctx.Done():
74// return nil
75// case msg := <-msgCh:
76// if handler != nil {
77// handler(msg)
78// }
79// switch msg.Type() {
80// case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
81// log.Debug(ctx, "got gst.MessageEOS, exiting")
82// return nil
83// case gst.MessageError: // Error messages are always fatal
84// err := msg.ParseError()
85// log.Error(ctx, "gstreamer error", "error", err.Error())
86// if debug := err.DebugString(); debug != "" {
87// log.Debug(ctx, "gstreamer debug", "message", debug)
88// }
89// return fmt.Errorf("gstreamer error: %w", err)
90// default:
91// log.Debug(ctx, msg.String())
92// msg.Unref()
93// }
94// }
95// }
96// }