Live video on the AT Protocol
1package misttriggers
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8
9 "github.com/golang/glog"
10 "stream.place/streamplace/pkg/errors"
11)
12
13// This trigger is run whenever the live buffer state of a stream changes. It is
14// not ran for VoD streams. This trigger is stream-specific and non-blocking.
15//
16// The payload for this trigger is multiple lines, each separated by a single
17// newline character (without an ending newline), containing data as such:
18//
19// stream name
20// stream state (one of: FULL, EMPTY, DRY, RECOVER)
21// {JSON object with stream details, only when state is not EMPTY}
22//
23// Read the Mist documentation for more details on each of the stream states.
24func (d *MistCallbackHandlersCollection) TriggerStreamBuffer(ctx context.Context, w http.ResponseWriter, req *http.Request, payload MistTriggerBody) {
25 sessionID := req.Header.Get("X-UUID")
26
27 body, err := ParseStreamBufferPayload(payload)
28 if err != nil {
29 glog.Infof("Error parsing STREAM_BUFFER payload error=%q payload=%q", err, string(payload))
30 errors.WriteHTTPBadRequest(w, "Error parsing STREAM_BUFFER payload", err)
31 return
32 }
33
34 rawBody, _ := json.Marshal(body)
35 go d.broker.TriggerStreamBuffer(ctx, body)
36 glog.Infof("Got STREAM_BUFFER trigger sessionId=%q payload=%s", sessionID, rawBody)
37}
38
39type StreamHealthPayload struct {
40 StreamName string `json:"stream_name"`
41 SessionID string `json:"session_id"`
42 IsActive bool `json:"is_active"`
43
44 IsHealthy bool `json:"is_healthy"`
45 Issues string `json:"issues,omitempty"`
46 HumanIssues []string `json:"human_issues,omitempty"`
47
48 Tracks map[string]TrackDetails `json:"tracks,omitempty"`
49 Extra map[string]any `json:"extra,omitempty"`
50}
51
52type StreamBufferPayload struct {
53 StreamName string
54 State string
55 Details *MistStreamDetails
56}
57
58func (s *StreamBufferPayload) IsEmpty() bool {
59 return s.State == "EMPTY"
60}
61
62func (s *StreamBufferPayload) IsFull() bool {
63 return s.State == "FULL"
64}
65
66func (s *StreamBufferPayload) IsRecover() bool {
67 return s.State == "RECOVER"
68}
69
70type TrackDetails struct {
71 Codec string `json:"codec"`
72 Kbits int `json:"kbits"`
73 Keys map[string]any `json:"keys"`
74 Fpks int `json:"fpks,omitempty"`
75 Height int `json:"height,omitempty"`
76 Width int `json:"width,omitempty"`
77}
78
79func ParseStreamBufferPayload(payload MistTriggerBody) (*StreamBufferPayload, error) {
80 lines := payload.Lines()
81 if len(lines) < 2 || len(lines) > 3 {
82 return nil, fmt.Errorf("invalid payload: expected 2 or 3 lines but got %d", len(lines))
83 }
84
85 streamName := lines[0]
86 streamState := lines[1]
87 var streamDetailsStr string
88 if len(lines) == 3 {
89 streamDetailsStr = lines[2]
90 }
91
92 streamDetails, err := ParseMistStreamDetails(streamState, []byte(streamDetailsStr))
93 if err != nil {
94 return nil, fmt.Errorf("error parsing stream details JSON: %w", err)
95 }
96
97 return &StreamBufferPayload{
98 StreamName: streamName,
99 State: streamState,
100 Details: streamDetails,
101 }, nil
102}
103
104type MistStreamDetails struct {
105 Tracks map[string]TrackDetails
106 Issues string
107 HumanIssues []string
108 Extra map[string]any
109}
110
111// Mists sends the track detail objects in the same JSON object as other
112// non-object fields (string and array issues and numeric metrics). So we need
113// to parse them separately and do a couple of JSON juggling here.
114// e.g. {track-id-1: {...}, issues: "a string", human_issues: ["a", "b"], "jitter": 32}
115func ParseMistStreamDetails(streamState string, data []byte) (*MistStreamDetails, error) {
116 if streamState == "EMPTY" {
117 return nil, nil
118 }
119
120 var issues struct {
121 Issues string `json:"issues"`
122 HumanIssues []string `json:"human_issues"`
123 }
124 err := json.Unmarshal(data, &issues)
125 if err != nil {
126 return nil, fmt.Errorf("error unmarshalling issues JSON: %w", err)
127 }
128
129 var tracksAndIssues map[string]any
130 err = json.Unmarshal(data, &tracksAndIssues)
131 if err != nil {
132 return nil, fmt.Errorf("error unmarshalling JSON: %w", err)
133 }
134 delete(tracksAndIssues, "issues")
135 delete(tracksAndIssues, "human_issues")
136
137 extra := map[string]any{}
138 for key, val := range tracksAndIssues {
139 if _, isObj := val.(map[string]any); isObj {
140 // this is a track, it will be parsed from the serialized obj below
141 continue
142 } else {
143 extra[key] = val
144 delete(tracksAndIssues, key)
145 }
146 }
147
148 tracksJSON, err := json.Marshal(tracksAndIssues) // only tracks now
149 if err != nil {
150 return nil, fmt.Errorf("error marshalling stream details tracks: %w", err)
151 }
152
153 var tracks map[string]TrackDetails
154 if err = json.Unmarshal(tracksJSON, &tracks); err != nil {
155 return nil, fmt.Errorf("error parsing stream details tracks: %w", err)
156 }
157
158 return &MistStreamDetails{tracks, issues.Issues, issues.HumanIssues, extra}, nil
159}