Live video on the AT Protocol
at eli/github-skip-darwin 159 lines 4.8 kB view raw
1package misttriggers 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "net/http" 8 9 "github.com/golang/glog" 10 "stream.place/streamplace/pkg/errors" 11) 12 13// This trigger is run whenever the live buffer state of a stream changes. It is 14// not ran for VoD streams. This trigger is stream-specific and non-blocking. 15// 16// The payload for this trigger is multiple lines, each separated by a single 17// newline character (without an ending newline), containing data as such: 18// 19// stream name 20// stream state (one of: FULL, EMPTY, DRY, RECOVER) 21// {JSON object with stream details, only when state is not EMPTY} 22// 23// Read the Mist documentation for more details on each of the stream states. 24func (d *MistCallbackHandlersCollection) TriggerStreamBuffer(ctx context.Context, w http.ResponseWriter, req *http.Request, payload MistTriggerBody) { 25 sessionID := req.Header.Get("X-UUID") 26 27 body, err := ParseStreamBufferPayload(payload) 28 if err != nil { 29 glog.Infof("Error parsing STREAM_BUFFER payload error=%q payload=%q", err, string(payload)) 30 errors.WriteHTTPBadRequest(w, "Error parsing STREAM_BUFFER payload", err) 31 return 32 } 33 34 rawBody, _ := json.Marshal(body) 35 go d.broker.TriggerStreamBuffer(ctx, body) 36 glog.Infof("Got STREAM_BUFFER trigger sessionId=%q payload=%s", sessionID, rawBody) 37} 38 39type StreamHealthPayload struct { 40 StreamName string `json:"stream_name"` 41 SessionID string `json:"session_id"` 42 IsActive bool `json:"is_active"` 43 44 IsHealthy bool `json:"is_healthy"` 45 Issues string `json:"issues,omitempty"` 46 HumanIssues []string `json:"human_issues,omitempty"` 47 48 Tracks map[string]TrackDetails `json:"tracks,omitempty"` 49 Extra map[string]any `json:"extra,omitempty"` 50} 51 52type StreamBufferPayload struct { 53 StreamName string 54 State string 55 Details *MistStreamDetails 56} 57 58func (s *StreamBufferPayload) IsEmpty() bool { 59 return s.State == "EMPTY" 60} 61 62func (s *StreamBufferPayload) IsFull() bool { 63 return s.State == "FULL" 64} 65 66func (s *StreamBufferPayload) IsRecover() bool { 67 return s.State == "RECOVER" 68} 69 70type TrackDetails struct { 71 Codec string `json:"codec"` 72 Kbits int `json:"kbits"` 73 Keys map[string]any `json:"keys"` 74 Fpks int `json:"fpks,omitempty"` 75 Height int `json:"height,omitempty"` 76 Width int `json:"width,omitempty"` 77} 78 79func ParseStreamBufferPayload(payload MistTriggerBody) (*StreamBufferPayload, error) { 80 lines := payload.Lines() 81 if len(lines) < 2 || len(lines) > 3 { 82 return nil, fmt.Errorf("invalid payload: expected 2 or 3 lines but got %d", len(lines)) 83 } 84 85 streamName := lines[0] 86 streamState := lines[1] 87 var streamDetailsStr string 88 if len(lines) == 3 { 89 streamDetailsStr = lines[2] 90 } 91 92 streamDetails, err := ParseMistStreamDetails(streamState, []byte(streamDetailsStr)) 93 if err != nil { 94 return nil, fmt.Errorf("error parsing stream details JSON: %w", err) 95 } 96 97 return &StreamBufferPayload{ 98 StreamName: streamName, 99 State: streamState, 100 Details: streamDetails, 101 }, nil 102} 103 104type MistStreamDetails struct { 105 Tracks map[string]TrackDetails 106 Issues string 107 HumanIssues []string 108 Extra map[string]any 109} 110 111// Mists sends the track detail objects in the same JSON object as other 112// non-object fields (string and array issues and numeric metrics). So we need 113// to parse them separately and do a couple of JSON juggling here. 114// e.g. {track-id-1: {...}, issues: "a string", human_issues: ["a", "b"], "jitter": 32} 115func ParseMistStreamDetails(streamState string, data []byte) (*MistStreamDetails, error) { 116 if streamState == "EMPTY" { 117 return nil, nil 118 } 119 120 var issues struct { 121 Issues string `json:"issues"` 122 HumanIssues []string `json:"human_issues"` 123 } 124 err := json.Unmarshal(data, &issues) 125 if err != nil { 126 return nil, fmt.Errorf("error unmarshalling issues JSON: %w", err) 127 } 128 129 var tracksAndIssues map[string]any 130 err = json.Unmarshal(data, &tracksAndIssues) 131 if err != nil { 132 return nil, fmt.Errorf("error unmarshalling JSON: %w", err) 133 } 134 delete(tracksAndIssues, "issues") 135 delete(tracksAndIssues, "human_issues") 136 137 extra := map[string]any{} 138 for key, val := range tracksAndIssues { 139 if _, isObj := val.(map[string]any); isObj { 140 // this is a track, it will be parsed from the serialized obj below 141 continue 142 } else { 143 extra[key] = val 144 delete(tracksAndIssues, key) 145 } 146 } 147 148 tracksJSON, err := json.Marshal(tracksAndIssues) // only tracks now 149 if err != nil { 150 return nil, fmt.Errorf("error marshalling stream details tracks: %w", err) 151 } 152 153 var tracks map[string]TrackDetails 154 if err = json.Unmarshal(tracksJSON, &tracks); err != nil { 155 return nil, fmt.Errorf("error parsing stream details tracks: %w", err) 156 } 157 158 return &MistStreamDetails{tracks, issues.Issues, issues.HumanIssues, extra}, nil 159}