Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #420 from streamplace/eli/problem-detection

implement basic stream problem detection

authored by

Eli Mallon and committed by
GitHub
39451569 bfc3129a

+398 -26
hack/upload-fixture.sh
+107
js/app/components/live-dashboard/problems.tsx
··· 1 + import { useLivestreamStore } from "@streamplace/components"; 2 + import { LivestreamProblem } from "@streamplace/components/src/livestream-store/livestream-state"; 3 + import { ExternalLink } from "@tamagui/lucide-icons"; 4 + import { useState } from "react"; 5 + import { Linking, Pressable } from "react-native"; 6 + import { Button, H3, Text, View } from "tamagui"; 7 + 8 + const Problems = ({ 9 + probs, 10 + onIgnore, 11 + }: { 12 + probs: LivestreamProblem[]; 13 + onIgnore: () => void; 14 + }) => { 15 + return ( 16 + <View gap={"$3"}> 17 + <View> 18 + <H3>Optimize Your Stream</H3> 19 + <Text> 20 + We’ve found a few things that could improve your stream’s reliability. 21 + </Text> 22 + </View> 23 + {probs.map((p) => ( 24 + <View> 25 + <View gap="$2" key={p.message} flexDirection="row" ai="flex-start"> 26 + <Text 27 + borderRadius="$2" 28 + px="$2" 29 + width={82} 30 + textAlign="center" 31 + bg={ 32 + p.severity === "error" 33 + ? "$red8Dark" 34 + : p.severity === "warning" 35 + ? "$yellow8Dark" 36 + : "$blue8Dark" 37 + } 38 + > 39 + {p.severity} 40 + </Text> 41 + <View flex={1} gap="$1"> 42 + <Text>{p.code}</Text> 43 + <Text color="$gray11Dark" fontSize="$6"> 44 + {p.message} 45 + </Text> 46 + {p.link && ( 47 + <Pressable onPress={() => p.link && Linking.openURL(p.link)}> 48 + <View flexDirection="row" ai="center" gap="$2"> 49 + <Text color="$blue10" fontSize="$6"> 50 + Learn More 51 + </Text> 52 + <ExternalLink size="$1" /> 53 + </View> 54 + </Pressable> 55 + )} 56 + </View> 57 + </View> 58 + </View> 59 + ))} 60 + 61 + <Button onPress={onIgnore}> 62 + <Text>Ignore</Text> 63 + </Button> 64 + </View> 65 + ); 66 + }; 67 + 68 + export const ProblemsWrapper = ({ 69 + children, 70 + }: { 71 + children: React.ReactElement; 72 + }) => { 73 + const problems = useLivestreamStore((x) => x.problems); 74 + const [dismiss, setDismiss] = useState(false); 75 + 76 + return ( 77 + <View position="relative" f={1} ai="center" jc="center" fb={0}> 78 + {children} 79 + {problems.length > 0 && !dismiss && ( 80 + <View 81 + position="absolute" 82 + top={0} 83 + left={0} 84 + right={0} 85 + bottom={0} 86 + backgroundColor="rgba(0, 0, 0, 0.8)" 87 + ai="center" 88 + jc="flex-start" 89 + zIndex={100} 90 + padding="$8" 91 + > 92 + <View 93 + backgroundColor="$gray1" 94 + borderColor="$gray5" 95 + borderWidth={1} 96 + borderRadius="$4" 97 + padding="$4" 98 + maxWidth={700} 99 + width="100%" 100 + > 101 + <Problems probs={problems} onIgnore={() => setDismiss(true)} /> 102 + </View> 103 + </View> 104 + )} 105 + </View> 106 + ); 107 + };
+19 -13
js/app/src/screens/live-dashboard.tsx
··· 22 22 import { useAppDispatch, useAppSelector } from "store/hooks"; 23 23 import { Button, H3, H6, isWeb, Text, View } from "tamagui"; 24 24 25 + import { ProblemsWrapper } from "components/live-dashboard/problems"; 26 + 25 27 enum StreamSource { 26 28 Start, 27 29 Camera, ··· 120 122 {closeButton} 121 123 </View> 122 124 <View f={1} ai="center" jc="center" fb={0}> 123 - <ButtonSelector 124 - values={[ 125 - { label: "Create", value: "create" }, 126 - { label: "Update", value: "update" }, 127 - ]} 128 - disabledValues={isLive ? [] : ["update"]} 129 - selectedValue={page} 130 - setSelectedValue={setPage} 131 - maxWidth={250} 132 - width="100%" 133 - /> 134 - {page === "update" && isLive ? <UpdateLivestream /> : null} 135 - {page === "create" ? <CreateLivestream /> : null} 125 + <ProblemsWrapper> 126 + <> 127 + <ButtonSelector 128 + values={[ 129 + { label: "Create", value: "create" }, 130 + { label: "Update", value: "update" }, 131 + ]} 132 + disabledValues={isLive ? [] : ["update"]} 133 + selectedValue={page} 134 + setSelectedValue={setPage} 135 + maxWidth={250} 136 + width="100%" 137 + /> 138 + {page === "update" && isLive ? <UpdateLivestream /> : null} 139 + {page === "create" ? <CreateLivestream /> : null} 140 + </> 141 + </ProblemsWrapper> 136 142 </View> 137 143 {madeChoiceAboutDebugRecording ? null : <DebugRecordingPopup />} 138 144 </View>
+9
js/components/src/livestream-store/livestream-state.tsx
··· 15 15 viewers: number | null; 16 16 pendingHides: string[]; 17 17 segment: PlaceStreamSegment.Record | null; 18 + recentSegments: PlaceStreamSegment.Record[]; 19 + problems: LivestreamProblem[]; 18 20 renditions: PlaceStreamDefs.Rendition[]; 19 21 replyToMessage: ChatMessageViewHydrated | null; 20 22 streamKey: string | null; 21 23 setStreamKey: (key: string | null) => void; 22 24 } 25 + 26 + export interface LivestreamProblem { 27 + code: string; 28 + message: string; 29 + severity: "error" | "warning" | "info"; 30 + link?: string; 31 + }
+2
js/components/src/livestream-store/livestream-store.tsx
··· 20 20 streamKey: null, 21 21 setStreamKey: (sk) => set({ streamKey: sk }), 22 22 authors: {}, 23 + recentSegments: [], 24 + problems: [], 23 25 })); 24 26 }; 25 27
+96
js/components/src/livestream-store/problems.tsx
··· 1 + import { PlaceStreamSegment } from "streamplace"; 2 + import { LivestreamProblem } from "./livestream-state"; 3 + 4 + const VARIANCE_THRESHOLD = 0.5; 5 + const DURATION_THRESHOLD = 5000000000; // 5s in ns 6 + 7 + const detectVariableSegmentLength = ( 8 + segments: PlaceStreamSegment.Record[], 9 + ): { variable: boolean; duration: boolean } => { 10 + if (segments.length < 3) { 11 + // Need at least 3 segments to detect variability 12 + return { variable: false, duration: false }; 13 + } 14 + 15 + const durations = segments 16 + .map((segment) => segment.duration) 17 + .filter( 18 + (duration): duration is number => duration !== undefined && duration > 0, 19 + ); 20 + 21 + if (durations.length < 3) { 22 + return { variable: false, duration: false }; 23 + } 24 + 25 + // Calculate mean 26 + const mean = 27 + durations.reduce((sum: number, duration: number) => sum + duration, 0) / 28 + durations.length; 29 + 30 + // Calculate standard deviation 31 + const variance = 32 + durations.reduce((sum: number, duration: number) => { 33 + const diff = duration - mean; 34 + return sum + diff * diff; 35 + }, 0) / durations.length; 36 + const stdDev = Math.sqrt(variance); 37 + 38 + // Calculate coefficient of variation (CV) 39 + const cv = stdDev / mean; 40 + 41 + // CV > 0.5 indicates high variability 42 + // This threshold can be adjusted based on testing 43 + return { 44 + variable: cv > VARIANCE_THRESHOLD, 45 + duration: mean > DURATION_THRESHOLD, 46 + }; 47 + }; 48 + 49 + export const findProblems = ( 50 + segments: PlaceStreamSegment.Record[], 51 + ): LivestreamProblem[] => { 52 + const problems: LivestreamProblem[] = []; 53 + let hasBFrames = false; 54 + for (const segment of segments) { 55 + const video = segment.video?.[0]; 56 + if (!video) { 57 + // i mean yes this is a problem but it can't happen yet 58 + continue; 59 + } 60 + if (video.bframes === true) { 61 + hasBFrames = true; 62 + break; 63 + } 64 + } 65 + if (hasBFrames) { 66 + problems.push({ 67 + code: "bframes", 68 + message: 69 + "Your stream contains B-Frames, which are not supported in Streamplace. Your stream will stutter.", 70 + severity: "error", 71 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 72 + }); 73 + } 74 + 75 + const { variable, duration } = detectVariableSegmentLength(segments); 76 + if (variable) { 77 + problems.push({ 78 + code: "variable_segment_length", 79 + message: 80 + "Your stream contains variable segment lengths, which may cause playback issues.", 81 + severity: "warning", 82 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 83 + }); 84 + } 85 + if (duration) { 86 + problems.push({ 87 + code: "long_segments", 88 + message: 89 + "Your stream contains long segments (>5s). This will work fine, but increases the delay of the livestream.", 90 + severity: "warning", 91 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 92 + }); 93 + } 94 + 95 + return problems; 96 + };
+10
js/components/src/livestream-store/websocket-consumer.tsx
··· 12 12 import { SystemMessages } from "../lib/system-messages"; 13 13 import { reduceChat } from "./chat"; 14 14 import { LivestreamState } from "./livestream-state"; 15 + import { findProblems } from "./problems"; 16 + 17 + const MAX_RECENT_SEGMENTS = 10; 15 18 16 19 export const handleWebSocketMessages = ( 17 20 state: LivestreamState, ··· 56 59 }; 57 60 state = reduceChat(state, [hydrated], [], []); 58 61 } else if (PlaceStreamSegment.isRecord(message)) { 62 + const newRecentSegments = [...state.recentSegments]; 63 + newRecentSegments.unshift(message); 64 + if (newRecentSegments.length > MAX_RECENT_SEGMENTS) { 65 + newRecentSegments.pop(); 66 + } 59 67 state = { 60 68 ...state, 61 69 segment: message as PlaceStreamSegment.Record, 70 + recentSegments: newRecentSegments, 71 + problems: findProblems(newRecentSegments), 62 72 }; 63 73 } else if (PlaceStreamDefs.isBlockView(message)) { 64 74 const block = message as PlaceStreamDefs.BlockView;
+1 -1
js/docs/src/content/docs/guides/start-streaming/obs.md
··· 26 26 6. Click "Generate Stream Key" 27 27 - The stream key will automatically be copied to your clipboard 28 28 29 - ### 2. Configure OBS Studio 29 + ### 2. Configure OBS Studio <a name="obs-configuration"></a> 30 30 31 31 #### 2a. Initial OBS Configuration 32 32
+4
js/docs/src/content/docs/lex-reference/place-stream-segment.md
··· 61 61 | `width` | `integer` | ✅ | | | 62 62 | `height` | `integer` | ✅ | | | 63 63 | `framerate` | [`#framerate`](#framerate) | ❌ | | | 64 + | `bframes` | `boolean` | ❌ | | | 64 65 65 66 --- 66 67 ··· 180 181 "framerate": { 181 182 "type": "ref", 182 183 "ref": "#framerate" 184 + }, 185 + "bframes": { 186 + "type": "boolean" 183 187 } 184 188 } 185 189 },
+2 -1
lexicons/place/stream/segment.json
··· 67 67 "framerate": { 68 68 "type": "ref", 69 69 "ref": "#framerate" 70 - } 70 + }, 71 + "bframes": { "type": "boolean" } 71 72 } 72 73 }, 73 74 "framerate": {
+5 -5
pkg/media/audio_smear.go
··· 137 137 NeedDataFunc: ReaderNeedData(ctx, input), 138 138 }) 139 139 140 + seg := SegmentData{ 141 + Audio: []SegmentBuffer{}, 142 + Video: []SegmentBuffer{}, 143 + } 144 + 140 145 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 141 146 if err != nil { 142 147 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) ··· 144 149 audioSink := app.SinkFromElement(audioSinkElem) 145 150 if audioSink == nil { 146 151 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 147 - } 148 - 149 - seg := SegmentData{ 150 - Audio: []SegmentBuffer{}, 151 - Video: []SegmentBuffer{}, 152 152 } 153 153 154 154 audioSink.SetCallbacks(&app.SinkCallbacks{
+54 -1
pkg/media/media_data_parser.go
··· 21 21 ctx, cancel := context.WithCancel(ctx) 22 22 defer cancel() 23 23 pipelineSlice := []string{ 24 - "appsrc name=appsrc ! qtdemux name=demux ! fakesink sync=false", 24 + "appsrc name=appsrc ! qtdemux name=demux", 25 + "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h264timestamper ! appsink sync=false name=videoappsink", 26 + "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 25 27 } 26 28 27 29 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) ··· 118 120 return nil, fmt.Errorf("error connecting pad-add: %w", err) 119 121 } 120 122 123 + audioSinkElem, err := pipeline.GetElementByName("audioappsink") 124 + if err != nil { 125 + return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 126 + } 127 + audioSink := app.SinkFromElement(audioSinkElem) 128 + if audioSink == nil { 129 + return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 130 + } 131 + 132 + audioSink.SetCallbacks(&app.SinkCallbacks{ 133 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 134 + sample := sink.PullSample() 135 + if sample == nil { 136 + return gst.FlowOK 137 + } 138 + 139 + return gst.FlowOK 140 + }, 141 + }) 142 + 143 + videoSinkElem, err := pipeline.GetElementByName("videoappsink") 144 + if err != nil { 145 + return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 146 + } 147 + videoSink := app.SinkFromElement(videoSinkElem) 148 + if videoSink == nil { 149 + return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 150 + } 151 + 152 + hasBFrames := false 153 + videoSink.SetCallbacks(&app.SinkCallbacks{ 154 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 155 + sample := sink.PullSample() 156 + if sample == nil { 157 + return gst.FlowOK 158 + } 159 + 160 + buf := sample.GetBuffer() 161 + pts := buf.PresentationTimestamp().String() 162 + dts := buf.DecodingTimestamp().String() 163 + 164 + if pts != dts { 165 + hasBFrames = true 166 + } 167 + 168 + return gst.FlowOK 169 + }, 170 + }) 171 + 121 172 go func() { 122 173 if err := HandleBusMessages(ctx, pipeline); err != nil { 123 174 log.Log(ctx, "pipeline error", "error", err) ··· 144 195 if audioMetadata == nil { 145 196 return nil, fmt.Errorf("no audio metadata") 146 197 } 198 + 199 + videoMetadata.BFrames = hasBFrames 147 200 148 201 meta := &model.SegmentMediaData{ 149 202 Video: []*model.SegmentMediadataVideo{videoMetadata},
+19
pkg/media/media_data_parser_test.go
··· 8 8 9 9 "github.com/stretchr/testify/require" 10 10 "stream.place/streamplace/pkg/log" 11 + "stream.place/streamplace/test/remote" 11 12 ) 12 13 13 14 func TestMediaDataParser(t *testing.T) { ··· 23 24 mediaData, err := ParseSegmentMediaData(ctx, bs) 24 25 require.NoError(t, err) 25 26 require.NotNil(t, mediaData) 27 + require.False(t, mediaData.Video[0].BFrames, "Video should not have BFrames") 28 + require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 29 + }) 30 + } 31 + 32 + func TestMediaDataParserBFrames(t *testing.T) { 33 + withNoGSTLeaks(t, func() { 34 + inputFile, err := os.Open(remote.RemoteFixture("5ea6c4491bade0cdcad3770aa0b63b2cd7a580e233ee320d5bc2282503b26491/segment-with-bframes.mp4")) 35 + require.NoError(t, err) 36 + defer inputFile.Close() 37 + bs, err := io.ReadAll(inputFile) 38 + require.NoError(t, err) 39 + 40 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"GStreamerFunc": {"ParseSegmentMediaData": 9}}) 41 + mediaData, err := ParseSegmentMediaData(ctx, bs) 42 + require.NoError(t, err) 43 + require.NotNil(t, mediaData) 44 + require.True(t, mediaData.Video[0].BFrames, "Video should have BFrames") 26 45 require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 27 46 }) 28 47 }
+6 -4
pkg/model/segment.go
··· 15 15 ) 16 16 17 17 type SegmentMediadataVideo struct { 18 - Width int `json:"width"` 19 - Height int `json:"height"` 20 - FPSNum int `json:"fpsNum"` 21 - FPSDen int `json:"fpsDen"` 18 + Width int `json:"width"` 19 + Height int `json:"height"` 20 + FPSNum int `json:"fpsNum"` 21 + FPSDen int `json:"fpsDen"` 22 + BFrames bool `json:"bframes"` 22 23 } 23 24 24 25 type SegmentMediadataAudio struct { ··· 89 90 Num: int64(s.MediaData.Video[0].FPSNum), 90 91 Den: int64(s.MediaData.Video[0].FPSDen), 91 92 }, 93 + Bframes: &s.MediaData.Video[0].BFrames, 92 94 }, 93 95 }, 94 96 Audio: []*streamplace.Segment_Audio{
+63 -1
pkg/streamplace/cbor_gen.go
··· 1224 1224 } 1225 1225 1226 1226 cw := cbg.NewCborWriter(w) 1227 - fieldCount := 4 1227 + fieldCount := 5 1228 + 1229 + if t.Bframes == nil { 1230 + fieldCount-- 1231 + } 1228 1232 1229 1233 if t.Framerate == nil { 1230 1234 fieldCount-- ··· 1298 1302 } else { 1299 1303 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Height-1)); err != nil { 1300 1304 return err 1305 + } 1306 + } 1307 + 1308 + // t.Bframes (bool) (bool) 1309 + if t.Bframes != nil { 1310 + 1311 + if len("bframes") > 1000000 { 1312 + return xerrors.Errorf("Value in field \"bframes\" was too long") 1313 + } 1314 + 1315 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("bframes"))); err != nil { 1316 + return err 1317 + } 1318 + if _, err := cw.WriteString(string("bframes")); err != nil { 1319 + return err 1320 + } 1321 + 1322 + if t.Bframes == nil { 1323 + if _, err := cw.Write(cbg.CborNull); err != nil { 1324 + return err 1325 + } 1326 + } else { 1327 + if err := cbg.WriteBool(w, *t.Bframes); err != nil { 1328 + return err 1329 + } 1301 1330 } 1302 1331 } 1303 1332 ··· 1425 1454 } 1426 1455 1427 1456 t.Height = int64(extraI) 1457 + } 1458 + // t.Bframes (bool) (bool) 1459 + case "bframes": 1460 + 1461 + { 1462 + b, err := cr.ReadByte() 1463 + if err != nil { 1464 + return err 1465 + } 1466 + if b != cbg.CborNull[0] { 1467 + if err := cr.UnreadByte(); err != nil { 1468 + return err 1469 + } 1470 + 1471 + maj, extra, err = cr.ReadHeader() 1472 + if err != nil { 1473 + return err 1474 + } 1475 + if maj != cbg.MajOther { 1476 + return fmt.Errorf("booleans must be major type 7") 1477 + } 1478 + 1479 + var val bool 1480 + switch extra { 1481 + case 20: 1482 + val = false 1483 + case 21: 1484 + val = true 1485 + default: 1486 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 1487 + } 1488 + t.Bframes = &val 1489 + } 1428 1490 } 1429 1491 // t.Framerate (streamplace.Segment_Framerate) (struct) 1430 1492 case "framerate":
+1
pkg/streamplace/streamsegment.go
··· 48 48 49 49 // Segment_Video is a "video" in the place.stream.segment schema. 50 50 type Segment_Video struct { 51 + Bframes *bool `json:"bframes,omitempty" cborgen:"bframes,omitempty"` 51 52 Codec string `json:"codec" cborgen:"codec"` 52 53 Framerate *Segment_Framerate `json:"framerate,omitempty" cborgen:"framerate,omitempty"` 53 54 Height int64 `json:"height" cborgen:"height"`