Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rtmprec 122 lines 2.9 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "stream.place/streamplace/pkg/aqtime" 13 "stream.place/streamplace/pkg/log" 14) 15 16// ingest a H264+AAC MKV stream (prolly from an RTMP server) 17func (mm *MediaManager) MKVIngest(ctx context.Context, input io.Reader, ms MediaSigner) error { 18 shouldRecord, err := mm.shouldRecord(ctx, ms.Streamer()) 19 if err != nil { 20 return err 21 } 22 if shouldRecord { 23 log.Log(ctx, "recording RTMP stream to file", "streamer", ms.Streamer()) 24 pr, pw := io.Pipe() 25 input = io.TeeReader(input, pw) 26 go func() { 27 err := mm.dumpToFile(ctx, pr, ms.Streamer(), ".rtmp.mkv") 28 if err != nil { 29 log.Error(ctx, "error dumping to file", "error", err) 30 } 31 }() 32 } else { 33 log.Log(ctx, "not recording RTMP stream to file", "streamer", ms.Streamer()) 34 } 35 ctx, cancel := context.WithCancel(ctx) 36 defer cancel() 37 pipelineSlice := []string{ 38 "appsrc name=streamsrc ! matroskademux name=demux", 39 "demux. ! queue ! h264parse name=parse", 40 "demux. ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 41 } 42 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 43 if err != nil { 44 return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 45 } 46 47 srcele, err := pipeline.GetElementByName("streamsrc") 48 if err != nil { 49 return err 50 } 51 // defer runtime.KeepAlive(srcele) 52 src := app.SrcFromElement(srcele) 53 src.SetCallbacks(&app.SourceCallbacks{ 54 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 55 }) 56 parseEle, err := pipeline.GetElementByName("parse") 57 if err != nil { 58 return err 59 } 60 61 signer, err := mm.SegmentAndSignElem(ctx, ms) 62 if err != nil { 63 return err 64 } 65 66 err = pipeline.Add(signer) 67 if err != nil { 68 return err 69 } 70 err = parseEle.Link(signer) 71 if err != nil { 72 return err 73 } 74 audioenc, err := pipeline.GetElementByName("audioenc") 75 if err != nil { 76 return err 77 } 78 err = audioenc.Link(signer) 79 if err != nil { 80 return err 81 } 82 83 busErr := make(chan error) 84 go func() { 85 err := HandleBusMessages(ctx, pipeline) 86 cancel() 87 busErr <- err 88 }() 89 90 go mm.HandleKeyRevocation(ctx, ms, pipeline) 91 92 err = pipeline.SetState(gst.StatePlaying) 93 if err != nil { 94 return err 95 } 96 97 defer func() { 98 err := pipeline.SetState(gst.StateNull) 99 if err != nil { 100 log.Error(ctx, "error setting pipeline to null state", "error", err) 101 } 102 }() 103 104 <-busErr 105 106 return nil 107} 108 109func (mm *MediaManager) dumpToFile(ctx context.Context, r io.Reader, user string, filesuffix string) error { 110 now := aqtime.FromTime(time.Now()) 111 filename := fmt.Sprintf("%s%s", now.FileSafeString(), filesuffix) 112 f, err := mm.cli.DataFileCreate([]string{"debug-recordings", user, filename}, false) 113 if err != nil { 114 return fmt.Errorf("failed to create data file: %w", err) 115 } 116 defer f.Close() 117 _, err = io.Copy(f, r) 118 if err != nil { 119 return fmt.Errorf("failed to copy to file: %w", err) 120 } 121 return nil 122}