Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/routing-cleanup 121 lines 2.9 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14) 15 16// ingest a H264+AAC MKV stream (prolly from an RTMP server) 17func (mm *MediaManager) MKVIngest(ctx context.Context, input io.Reader, ms MediaSigner) error { 18 shouldRecord, err := mm.shouldRecord(ctx, ms.Streamer()) 19 if err != nil { 20 return err 21 } 22 if shouldRecord { 23 log.Log(ctx, "recording RTMP stream to file", "streamer", ms.Streamer()) 24 pr, pw := io.Pipe() 25 input = io.TeeReader(input, pw) 26 go func() { 27 err := mm.dumpToFile(ctx, pr, ms.Streamer(), ".rtmp.mkv") 28 if err != nil { 29 log.Error(ctx, "error dumping to file", "error", err) 30 } 31 }() 32 } else { 33 log.Log(ctx, "not recording RTMP stream to file", "streamer", ms.Streamer()) 34 } 35 ctx, cancel := context.WithCancel(ctx) 36 defer cancel() 37 pipelineSlice := []string{ 38 "appsrc name=streamsrc ! matroskademux name=demux", 39 "demux. ! queue ! h264parse name=parse", 40 "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 41 } 42 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 43 if err != nil { 44 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 45 } 46 47 srcele, err := pipeline.GetElementByName("streamsrc") 48 if err != nil { 49 return err 50 } 51 // defer runtime.KeepAlive(srcele) 52 src := app.SrcFromElement(srcele) 53 src.SetCallbacks(&app.SourceCallbacks{ 54 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 55 }) 56 parseEle, err := pipeline.GetElementByName("parse") 57 if err != nil { 58 return err 59 } 60 61 signer, err := mm.SegmentAndSignElem(ctx, ms) 62 if err != nil { 63 return err 64 } 65 66 err = pipeline.Add(signer) 67 if err != nil { 68 return err 69 } 70 err = parseEle.Link(signer) 71 if err != nil { 72 return err 73 } 74 audioenc, err := pipeline.GetElementByName("audioenc") 75 if err != nil { 76 return err 77 } 78 err = audioenc.Link(signer) 79 if err != nil { 80 return err 81 } 82 83 busErr := make(chan error) 84 go func() { 85 err := HandleBusMessages(ctx, pipeline) 86 busErr <- err 87 }() 88 89 go mm.HandleKeyRevocation(ctx, ms, pipeline) 90 91 err = pipeline.SetState(gst.StatePlaying) 92 if err != nil { 93 return err 94 } 95 96 defer func() { 97 err := pipeline.SetState(gst.StateNull) 98 if err != nil { 99 log.Error(ctx, "error setting pipeline to null state", "error", err) 100 } 101 }() 102 103 err = <-busErr 104 105 return err 106} 107 108func (mm *MediaManager) dumpToFile(ctx context.Context, r io.Reader, user string, filesuffix string) error { 109 now := aqtime.FromTime(time.Now()) 110 filename := fmt.Sprintf("%s%s", now.FileSafeString(), filesuffix) 111 f, err := mm.cli.DataFileCreate([]string{"debug-recordings", user, filename}, false) 112 if err != nil { 113 return fmt.Errorf("failed to create data file: %w", err) 114 } 115 defer f.Close() 116 _, err = io.Copy(f, r) 117 if err != nil { 118 return fmt.Errorf("failed to copy to file: %w", err) 119 } 120 return nil 121}