Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

media: clean up MKVIngest

Eli Mallon c754ac5c 5065eeb2

+34 -11
+19 -6
pkg/api/api_internal.go
··· 56 56 func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 57 57 router := httprouter.New() 58 58 broker := misttriggers.NewTriggerBroker() 59 - broker.OnPushOutStart(func(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) { 60 - return payload.URL, nil 61 - }) 59 + 62 60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 63 61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 62 + // Extract the last part of the URL path 63 + urlPath := payload.URL.Path 64 + parts := strings.Split(urlPath, "/") 65 + lastPart := "" 66 + if len(parts) > 0 { 67 + lastPart = parts[len(parts)-1] 68 + } 69 + mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 70 + if err != nil { 71 + return "", err 72 + } 64 73 65 74 ms := time.Now().UnixMilli() 66 - out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, payload.StreamName, ms) 75 + out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, mediaSigner.Streamer(), ms) 76 + a.SignerCacheMu.Lock() 77 + a.SignerCache[mediaSigner.Streamer()] = mediaSigner 78 + a.SignerCacheMu.Unlock() 79 + log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 67 80 68 81 return out, nil 69 82 }) ··· 276 289 } 277 290 278 291 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 279 - router.POST("/stream/:key", handleIncomingStream) 280 - router.PUT("/stream/:key", handleIncomingStream) 292 + router.POST("/live/:key", handleIncomingStream) 293 + router.PUT("/live/:key", handleIncomingStream) 281 294 282 295 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 283 296 id := p.ByName("id")
+15 -5
pkg/media/mkv_ingest.go
··· 8 8 9 9 "github.com/go-gst/go-gst/gst" 10 10 "github.com/go-gst/go-gst/gst/app" 11 + "stream.place/streamplace/pkg/log" 11 12 ) 12 13 13 14 // ingest a H264+AAC MKV stream (prolly from an RTMP server) ··· 17 18 pipelineSlice := []string{ 18 19 "appsrc name=streamsrc ! matroskademux name=demux", 19 20 "demux. ! queue ! h264parse name=parse", 20 - "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioparse", 21 + "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 21 22 } 22 23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 23 24 if err != nil { ··· 51 52 if err != nil { 52 53 return err 53 54 } 54 - audioparse, err := pipeline.GetElementByName("audioparse") 55 + audioenc, err := pipeline.GetElementByName("audioenc") 55 56 if err != nil { 56 57 return err 57 58 } 58 - err = audioparse.Link(signer) 59 + err = audioenc.Link(signer) 59 60 if err != nil { 60 61 return err 61 62 } 62 63 64 + busErr := make(chan error) 63 65 go func() { 64 - HandleBusMessages(ctx, pipeline) 66 + err := HandleBusMessages(ctx, pipeline) 65 67 cancel() 68 + busErr <- err 66 69 }() 67 70 68 71 err = pipeline.SetState(gst.StatePlaying) ··· 70 73 return err 71 74 } 72 75 73 - <-ctx.Done() 76 + defer func() { 77 + err := pipeline.SetState(gst.StateNull) 78 + if err != nil { 79 + log.Error(ctx, "error setting pipeline to null state", "error", err) 80 + } 81 + }() 82 + 83 + <-busErr 74 84 75 85 return nil 76 86 }