Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

rtmp: working pull-based embedded rtmp server

+370 -118
+3 -1
Makefile
··· 170 170 -D "gst-plugins-good:multifile=enabled" \ 171 171 -D "gst-plugins-good:rtp=enabled" \ 172 172 -D "gst-plugins-bad:fdkaac=enabled" \ 173 + -D "gst-plugins-bad:rtmp2=enabled" \ 173 174 -D "gst-plugins-good:audioparsers=enabled" \ 174 175 -D "gst-plugins-good:isomp4=enabled" \ 175 176 -D "gst-plugins-good:png=enabled" \ 176 177 -D "gst-plugins-good:videobox=enabled" \ 177 178 -D "gst-plugins-good:jpeg=enabled" \ 178 179 -D "gst-plugins-good:audioparsers=enabled" \ 180 + -D "gst-plugins-good:flv=enabled" \ 179 181 -D "gst-plugins-bad:videoparsers=enabled" \ 180 182 -D "gst-plugins-bad:mpegtsmux=enabled" \ 181 183 -D "gst-plugins-bad:mpegtsdemux=enabled" \ ··· 185 187 -D "gst-plugins-ugly:gpl=enabled" \ 186 188 -D "x264:asm=enabled" \ 187 189 -D "gstreamer-full:gst-full=enabled" \ 188 - -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a;libgstcoretracers.a;libgstcodec2json.a" \ 190 + -D "gstreamer-full:gst-full-plugins=libgstflv.a;libgstrtmp2.a;libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a;libgstcoretracers.a;libgstcodec2json.a" \ 189 191 -D "gstreamer-full:gst-full-libraries=gstreamer-controller-1.0,gstreamer-plugins-base-1.0,gstreamer-pbutils-1.0" \ 190 192 -D "gstreamer-full:gst-full-elements=coreelements:concat,filesrc,filesink,queue,queue2,multiqueue,typefind,tee,capsfilter,fakesink,identity" \ 191 193 -D "gstreamer-full:bad=enabled" \
+1 -1
go.mod
··· 19 19 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d 20 20 github.com/bluenviron/gortmplib v0.1.2 21 21 github.com/bluenviron/gortsplib/v5 v5.2.1 22 + github.com/bluenviron/mediacommon/v2 v2.5.2 22 23 github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 23 24 github.com/cenkalti/backoff v2.2.1+incompatible 24 25 github.com/cenkalti/backoff/v5 v5.0.2 ··· 162 163 github.com/bkielbasa/cyclop v1.2.3 // indirect 163 164 github.com/blizzy78/varnamelen v0.8.0 // indirect 164 165 github.com/bluenviron/gortsplib/v4 v4.12.3 // indirect 165 - github.com/bluenviron/mediacommon/v2 v2.5.2 // indirect 166 166 github.com/bombsimon/wsl/v4 v4.7.0 // indirect 167 167 github.com/breml/bidichk v0.3.3 // indirect 168 168 github.com/breml/errchkjson v0.4.1 // indirect
+6
pkg/api/api.go
··· 79 79 HTTPRedirectTLSPort *int 80 80 sessions map[string]map[string]time.Time 81 81 sessionsLock sync.RWMutex 82 + 83 + rtmpSessions map[string]*media.RTMPSession 84 + rtmpSessionsLock sync.Mutex 85 + rtmpInternalPlaybackAddr string 82 86 } 83 87 84 88 type WebsocketTracker struct { ··· 109 113 op: op, 110 114 sessions: make(map[string]map[string]time.Time), 111 115 sessionsLock: sync.RWMutex{}, 116 + rtmpSessions: make(map[string]*media.RTMPSession), 117 + rtmpSessionsLock: sync.Mutex{}, 112 118 } 113 119 a.Mimes, err = updater.GetMimes() 114 120 if err != nil {
+179 -25
pkg/api/rtmp_server.go
··· 27 27 // readers []*gortmplib.Writer 28 28 // ) 29 29 30 + var RTMPTimeout = 10 * time.Second 31 + 30 32 const RTMPPrefix = "/live/" 31 33 32 34 func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error { 33 - sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second)) 35 + err := sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 36 + if err != nil { 37 + return err 38 + } 34 39 35 40 if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 36 41 return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) ··· 41 46 return fmt.Errorf("failed to make media signer: %w", err) 42 47 } 43 48 44 - ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer()) 49 + streamer := mediaSigner.Streamer() 50 + ctx = log.WithLogValues(ctx, "streamer", streamer) 51 + session := &media.RTMPSession{ 52 + EventChan: make(chan any, 1024), 53 + MediaSigner: mediaSigner, 54 + } 55 + a.rtmpSessionsLock.Lock() 56 + a.rtmpSessions[streamer] = session 57 + a.rtmpSessionsLock.Unlock() 45 58 46 - videoInput := make(chan *media.RTMPH264Data, 1024) 47 - defer close(videoInput) 48 - audioInput := make(chan *media.RTMPAACData, 1024) 49 - defer close(audioInput) 59 + defer func() { 60 + a.rtmpSessionsLock.Lock() 61 + delete(a.rtmpSessions, streamer) 62 + a.rtmpSessionsLock.Unlock() 63 + close(session.EventChan) 64 + }() 65 + 66 + // videoInput := make(chan *media.RTMPH264Data, 1024) 67 + // defer close(videoInput) 68 + // audioInput := make(chan *media.RTMPAACData, 1024) 69 + // defer close(audioInput) 50 70 51 71 r := &gortmplib.Reader{ 52 72 Conn: sc, ··· 61 81 62 82 switch track := track.(type) { 63 83 case *format.H264: 84 + session.VideoTrack = track 64 85 r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) { 65 - log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts) 66 - videoInput <- &media.RTMPH264Data{ 86 + // log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts) 87 + session.EventChan <- &media.RTMPH264Data{ 67 88 AU: au, 68 89 PTS: pts, 90 + DTS: dts, 69 91 } 70 92 }) 71 93 72 94 case *format.MPEG4Audio: 95 + session.AudioTrack = track 73 96 r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) { 74 - log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts) 75 - audioInput <- &media.RTMPAACData{ 97 + // log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts) 98 + session.EventChan <- &media.RTMPAACData{ 76 99 AU: au, 77 100 PTS: pts, 78 101 } ··· 89 112 if ctx.Err() != nil { 90 113 return ctx.Err() 91 114 } 92 - sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second)) 115 + err = sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(RTMPTimeout)) 116 + if err != nil { 117 + return err 118 + } 93 119 err = r.Read() 94 120 if err != nil { 95 121 return err ··· 98 124 }) 99 125 100 126 g.Go(func() error { 101 - return a.MediaManager.RTMPIngest(ctx, videoInput, audioInput, mediaSigner) 127 + return a.MediaManager.RTMPIngest(ctx, fmt.Sprintf("rtmp://%s/live/%s", a.rtmpInternalPlaybackAddr, streamer), mediaSigner) 102 128 }) 103 129 104 130 return g.Wait() 105 131 } 106 132 107 - func (a *StreamplaceAPI) HandleRTMPConnInner(ctx context.Context, conn net.Conn) error { 108 - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) 133 + func (a *StreamplaceAPI) HandleRTMPPlayback(ctx context.Context, sc *gortmplib.ServerConn) error { 134 + if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) { 135 + return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix) 136 + } 137 + streamer := strings.TrimPrefix(sc.URL.Path, RTMPPrefix) 138 + a.rtmpSessionsLock.Lock() 139 + session, ok := a.rtmpSessions[streamer] 140 + a.rtmpSessionsLock.Unlock() 141 + if !ok { 142 + return fmt.Errorf("RTMP session not found for streamer %s", streamer) 143 + } 144 + 145 + w := &gortmplib.Writer{ 146 + Conn: sc, 147 + Tracks: []format.Format{session.VideoTrack, session.AudioTrack}, 148 + } 149 + err := w.Initialize() 150 + if err != nil { 151 + return err 152 + } 153 + for { 154 + select { 155 + case <-ctx.Done(): 156 + return ctx.Err() 157 + case event := <-session.EventChan: 158 + if event == nil { 159 + return fmt.Errorf("RTMP session closed") 160 + } 161 + switch event := event.(type) { 162 + case *media.RTMPH264Data: 163 + err := w.WriteH264(session.VideoTrack, event.PTS, event.DTS, event.AU) 164 + if err != nil { 165 + return fmt.Errorf("error writing H264: %w", err) 166 + } 167 + case *media.RTMPAACData: 168 + err := w.WriteMPEG4Audio(session.AudioTrack, event.PTS, event.AU) 169 + if err != nil { 170 + return fmt.Errorf("error writing MPEG4Audio: %w", err) 171 + } 172 + default: 173 + return fmt.Errorf("unsupported event type: %T", event) 174 + } 175 + } 176 + } 177 + } 178 + 179 + func (a *StreamplaceAPI) HandleRTMPPublishConn(ctx context.Context, conn net.Conn) error { 180 + err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 181 + if err != nil { 182 + return err 183 + } 109 184 110 185 sc := &gortmplib.ServerConn{ 111 186 RW: conn, 112 187 } 113 - err := sc.Initialize() 188 + err = sc.Initialize() 114 189 if err != nil { 115 190 return err 116 191 } ··· 123 198 if sc.Publish { 124 199 return a.HandleRTMPPublisher(ctx, sc) 125 200 } 126 - return fmt.Errorf("RTMP playback is not supported") 201 + return fmt.Errorf("RTMP playback is not allowed") 127 202 } 128 203 129 - func (a *StreamplaceAPI) HandleRTMPConn(ctx context.Context, conn net.Conn) { 130 - defer conn.Close() 204 + func (a *StreamplaceAPI) HandleRTMPPlaybackConn(ctx context.Context, conn net.Conn) error { 205 + err := conn.SetReadDeadline(time.Now().Add(RTMPTimeout)) 206 + if err != nil { 207 + return err 208 + } 131 209 132 - log.Log(ctx, "connection opened", "remoteAddr", conn.RemoteAddr()) 133 - err := a.HandleRTMPConnInner(ctx, conn) 134 - log.Log(ctx, "connection closed", "remoteAddr", conn.RemoteAddr(), "error", err) 210 + sc := &gortmplib.ServerConn{ 211 + RW: conn, 212 + } 213 + err = sc.Initialize() 214 + if err != nil { 215 + return err 216 + } 217 + 218 + err = sc.Accept() 219 + if err != nil { 220 + return err 221 + } 222 + 223 + if !sc.Publish { 224 + return a.HandleRTMPPlayback(ctx, sc) 225 + } 226 + return fmt.Errorf("RTMP playback is not allowed") 135 227 } 136 228 137 - func (a *StreamplaceAPI) StartRTMPServer(ctx context.Context) error { 138 - ln, err := net.Listen("tcp", ":1935") 229 + func (a *StreamplaceAPI) ServeRTMP(ctx context.Context) error { 230 + ln, err := net.Listen("tcp", a.CLI.RTMPAddr) 139 231 if err != nil { 140 232 return fmt.Errorf("failed to listen: %w", err) 141 233 } 142 234 defer ln.Close() 143 235 144 - log.Log(ctx, "listening on :1935") 236 + log.Log(ctx, "rtmp server starting", "addr", a.CLI.RTMPAddr) 237 + 238 + g, ctx := errgroup.WithContext(ctx) 239 + g.Go(func() error { 240 + return a.ServeRTMPInternalPlayback(ctx) 241 + }) 242 + g.Go(func() error { 243 + for { 244 + if ctx.Err() != nil { 245 + return ctx.Err() 246 + } 247 + conn, err := ln.Accept() 248 + if err != nil { 249 + return fmt.Errorf("error accepting RTMP connection: %w", err) 250 + } 251 + go func() { 252 + err := a.HandleRTMPPublishConn(ctx, conn) 253 + if err != nil { 254 + log.Error(ctx, "error handling RTMP publish connection", "error", err) 255 + } 256 + }() 257 + } 258 + }) 259 + 260 + <-ctx.Done() 261 + 262 + err = ln.Close() 263 + if err != nil { 264 + return fmt.Errorf("failed to close RTMP listener: %w", err) 265 + } 266 + 267 + return g.Wait() 268 + } 269 + 270 + // Serve RTMP internal playback server for gstreamer to pull from 271 + func (a *StreamplaceAPI) ServeRTMPInternalPlayback(ctx context.Context) error { 272 + ln, err := net.Listen("tcp", "127.0.0.1:0") 273 + if err != nil { 274 + return fmt.Errorf("failed to listen: %w", err) 275 + } 276 + addr := ln.Addr().String() 277 + defer ln.Close() 278 + 279 + _, port, err := net.SplitHostPort(addr) 280 + if err != nil { 281 + return fmt.Errorf("failed to split host and port: %w", err) 282 + } 283 + 284 + a.rtmpInternalPlaybackAddr = fmt.Sprintf("127.0.0.1:%s", port) 285 + 286 + log.Log(ctx, "rtmp internal playback server starting", "addr", a.rtmpInternalPlaybackAddr) 145 287 146 288 // Accept loop in a goroutine so we can select on context.Done 147 289 go func() { 148 290 for { 291 + if ctx.Err() != nil { 292 + return 293 + } 149 294 conn, err := ln.Accept() 150 295 if err != nil { 296 + if ctx.Err() != nil { 297 + return 298 + } 151 299 log.Error(ctx, "error accepting RTMP connection", "error", err) 300 + continue 152 301 } 153 302 154 - go a.HandleRTMPConn(ctx, conn) 303 + go func() { 304 + err := a.HandleRTMPPlaybackConn(ctx, conn) 305 + if err != nil { 306 + log.Error(ctx, "error handling RTMP internal playback connection", "error", err) 307 + } 308 + }() 155 309 } 156 310 }() 157 311
+4 -5
pkg/cmd/streamplace.go
··· 411 411 }) 412 412 if cli.RTMPServerAddon != "" { 413 413 group.Go(func() error { 414 - return rtmps.ServeRTMPS(ctx, &cli) 414 + return rtmps.ServeRTMPSAddon(ctx, &cli) 415 415 }) 416 416 } 417 417 } else { 418 418 group.Go(func() error { 419 419 return a.ServeHTTP(ctx) 420 + }) 421 + group.Go(func() error { 422 + return a.ServeRTMP(ctx) 420 423 }) 421 424 } 422 425 ··· 445 448 446 449 group.Go(func() error { 447 450 return mod.StartSegmentCleaner(ctx) 448 - }) 449 - 450 - group.Go(func() error { 451 - return a.StartRTMPServer(ctx) 452 451 }) 453 452 454 453 group.Go(func() error {
+4 -2
pkg/config/config.go
··· 65 65 HTTPAddr string 66 66 HTTPInternalAddr string 67 67 HTTPSAddr string 68 - RtmpsAddr string 68 + RTMPAddr string 69 + RTMPSAddr string 69 70 Secure bool 70 71 NoMist bool 71 72 MistAdminPort int ··· 208 209 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") 209 210 fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip") 210 211 fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to") 211 - fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections") 212 + fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)") 213 + fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)") 212 214 cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to") 213 215 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 214 216 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
+168 -80
pkg/media/rtmp_ingest.go
··· 6 6 "strings" 7 7 "time" 8 8 9 - "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" 9 + "github.com/bluenviron/gortsplib/v5/pkg/format" 10 10 "github.com/go-gst/go-gst/gst" 11 - "github.com/go-gst/go-gst/gst/app" 12 11 "stream.place/streamplace/pkg/log" 13 12 ) 14 13 15 14 type RTMPH264Data struct { 16 15 AU [][]byte 17 16 PTS time.Duration 17 + DTS time.Duration 18 18 } 19 19 20 20 type RTMPAACData struct { ··· 22 22 PTS time.Duration 23 23 } 24 24 25 - // ingest a H264+AAC RTMP stream 26 - func (mm *MediaManager) RTMPIngest(ctx context.Context, videoInput chan *RTMPH264Data, audioInput chan *RTMPAACData, ms MediaSigner) error { 25 + type RTMPSession struct { 26 + EventChan chan any 27 + VideoTrack *format.H264 28 + AudioTrack *format.MPEG4Audio 29 + MediaSigner MediaSigner 30 + } 31 + 32 + func (mm *MediaManager) RTMPIngest(ctx context.Context, rtmpURL string, ms MediaSigner) error { 27 33 ctx, cancel := context.WithCancel(ctx) 28 34 defer cancel() 29 35 pipelineSlice := []string{ 30 - "appsrc name=videosrc ! queue ! h264parse name=parse", 31 - "appsrc name=audiosrc ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 36 + fmt.Sprintf("rtmp2src location=%s ! flvdemux name=demux", rtmpURL), 37 + "demux.audio ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 38 + "demux.video ! queue ! h264parse name=parse", 32 39 } 33 40 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 34 41 if err != nil { 35 42 return fmt.Errorf("error creating RTMPIngest pipeline: %w", err) 36 43 } 37 44 38 - videosrcEle, err := pipeline.GetElementByName("videosrc") 39 - if err != nil { 40 - return err 41 - } 42 - // defer runtime.KeepAlive(srcele) 43 - videosrc := app.SrcFromElement(videosrcEle) 44 - videosrc.SetCaps(gst.NewCapsFromString("video/x-h264,stream-format=byte-stream")) 45 - videosrc.SetCallbacks(&app.SourceCallbacks{ 46 - NeedDataFunc: func(self *app.Source, length uint) { 47 - if ctx.Err() != nil { 48 - self.EndStream() 49 - return 50 - } 51 - 52 - packet := <-videoInput 53 - if packet == nil { 54 - log.Debug(ctx, "video input closed, ending stream") 55 - self.EndStream() 56 - return 57 - } 58 - 59 - // allBytes := bytes.Buffer{} 60 - // for _, au := range packet.AU { 61 - // allBytes.Write(au) 62 - // } 63 - 64 - avcc, err := h264.AnnexB(packet.AU).Marshal() 65 - if err != nil { 66 - log.Error(ctx, "failed to marshal AVCC", "error", err) 67 - self.Error("failed to marshal AVCC", fmt.Errorf("failed to marshal AVCC: %w", err)) 68 - return 69 - } 70 - 71 - buf := gst.NewBufferFromBytes(avcc) 72 - buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds()))) 73 - ret := self.PushBuffer(buf) 74 - if ret != gst.FlowOK { 75 - log.Error(ctx, "failed to push video buffer", "error", ret.String()) 76 - self.Error("failed to push video buffer", fmt.Errorf("failed to push video buffer: %s", ret.String())) 77 - return 78 - } 79 - }, 80 - }) 81 - 82 - audiosrcEle, err := pipeline.GetElementByName("videosrc") 45 + signer, err := mm.SegmentAndSignElem(ctx, ms) 83 46 if err != nil { 84 47 return err 85 48 } 86 - // defer runtime.KeepAlive(srcele) 87 - audiosrc := app.SrcFromElement(audiosrcEle) 88 - audiosrc.SetCallbacks(&app.SourceCallbacks{ 89 - NeedDataFunc: func(self *app.Source, length uint) { 90 - if ctx.Err() != nil { 91 - self.EndStream() 92 - return 93 - } 94 - packet := <-audioInput 95 - if packet == nil { 96 - log.Debug(ctx, "audio input closed, ending stream") 97 - self.EndStream() 98 - return 99 - } 100 - buf := gst.NewBufferFromBytes(packet.AU) 101 - buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds()))) 102 - ret := self.PushBuffer(buf) 103 - if ret != gst.FlowOK { 104 - log.Error(ctx, "failed to push audio buffer", "error", ret.String()) 105 - self.Error("failed to push audio buffer", fmt.Errorf("failed to push audio buffer: %s", ret.String())) 106 - return 107 - } 108 - }, 109 - }) 110 49 111 50 parseEle, err := pipeline.GetElementByName("parse") 112 - if err != nil { 113 - return err 114 - } 115 - 116 - signer, err := mm.SegmentAndSignElem(ctx, ms) 117 51 if err != nil { 118 52 return err 119 53 } ··· 159 93 160 94 return err 161 95 } 96 + 97 + // // ingest a H264+AAC RTMP stream 98 + // func (mm *MediaManager) RTMPIngest(ctx context.Context, videoInput chan *RTMPH264Data, audioInput chan *RTMPAACData, ms MediaSigner) error { 99 + // ctx, cancel := context.WithCancel(ctx) 100 + // defer cancel() 101 + // pipelineSlice := []string{ 102 + // "appsrc name=videosrc ! queue ! h264parse name=parse", 103 + // "appsrc name=audiosrc ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc", 104 + // } 105 + // pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 106 + // if err != nil { 107 + // return fmt.Errorf("error creating RTMPIngest pipeline: %w", err) 108 + // } 109 + 110 + // videosrcEle, err := pipeline.GetElementByName("videosrc") 111 + // if err != nil { 112 + // return err 113 + // } 114 + // first := true 115 + // // defer runtime.KeepAlive(srcele) 116 + // videosrc := app.SrcFromElement(videosrcEle) 117 + // videosrc.SetCaps(gst.NewCapsFromString("video/x-h264,stream-format=avc3")) 118 + // videosrc.SetCallbacks(&app.SourceCallbacks{ 119 + // NeedDataFunc: func(self *app.Source, length uint) { 120 + // if ctx.Err() != nil { 121 + // self.EndStream() 122 + // return 123 + // } 124 + 125 + // packet := <-videoInput 126 + // if packet == nil { 127 + // log.Debug(ctx, "video input closed, ending stream") 128 + // self.EndStream() 129 + // return 130 + // } 131 + 132 + // // allBytes := bytes.Buffer{} 133 + // // for _, au := range packet.AU { 134 + // // allBytes.Write(au) 135 + // // } 136 + 137 + // var avc []byte 138 + // if first { 139 + // c := h264conf.Conf{ 140 + // SPS: packet.AU[0], 141 + // PPS: packet.AU[1], 142 + // } 143 + // avc, err = c.Marshal() 144 + // if err != nil { 145 + // log.Error(ctx, "failed to marshal H264 config", "error", err) 146 + // self.Error("failed to marshal H264 config", fmt.Errorf("failed to marshal H264 config: %w", err)) 147 + // return 148 + // } 149 + // first = false 150 + // } else { 151 + // avc, err = h264.AVCC(packet.AU).Marshal() 152 + // if err != nil { 153 + // log.Error(ctx, "failed to marshal AnnexB", "error", err) 154 + // self.Error("failed to marshal AnnexB", fmt.Errorf("failed to marshal AnnexB: %w", err)) 155 + // return 156 + // } 157 + // } 158 + 159 + // buf := gst.NewBufferFromBytes(avc) 160 + // buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds()))) 161 + // ret := self.PushBuffer(buf) 162 + // if ret != gst.FlowOK { 163 + // log.Error(ctx, "failed to push video buffer", "error", ret.String()) 164 + // self.Error("failed to push video buffer", fmt.Errorf("failed to push video buffer: %s", ret.String())) 165 + // return 166 + // } 167 + // }, 168 + // }) 169 + 170 + // audiosrcEle, err := pipeline.GetElementByName("videosrc") 171 + // if err != nil { 172 + // return err 173 + // } 174 + // // defer runtime.KeepAlive(srcele) 175 + // audiosrc := app.SrcFromElement(audiosrcEle) 176 + // audiosrc.SetCallbacks(&app.SourceCallbacks{ 177 + // NeedDataFunc: func(self *app.Source, length uint) { 178 + // if ctx.Err() != nil { 179 + // self.EndStream() 180 + // return 181 + // } 182 + // packet := <-audioInput 183 + // if packet == nil { 184 + // log.Debug(ctx, "audio input closed, ending stream") 185 + // self.EndStream() 186 + // return 187 + // } 188 + // buf := gst.NewBufferFromBytes(packet.AU) 189 + // buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds()))) 190 + // ret := self.PushBuffer(buf) 191 + // if ret != gst.FlowOK { 192 + // log.Error(ctx, "failed to push audio buffer", "error", ret.String()) 193 + // self.Error("failed to push audio buffer", fmt.Errorf("failed to push audio buffer: %s", ret.String())) 194 + // return 195 + // } 196 + // }, 197 + // }) 198 + 199 + // parseEle, err := pipeline.GetElementByName("parse") 200 + // if err != nil { 201 + // return err 202 + // } 203 + 204 + // signer, err := mm.SegmentAndSignElem(ctx, ms) 205 + // if err != nil { 206 + // return err 207 + // } 208 + 209 + // err = pipeline.Add(signer) 210 + // if err != nil { 211 + // return err 212 + // } 213 + // err = parseEle.Link(signer) 214 + // if err != nil { 215 + // return err 216 + // } 217 + // audioenc, err := pipeline.GetElementByName("audioenc") 218 + // if err != nil { 219 + // return err 220 + // } 221 + // err = audioenc.Link(signer) 222 + // if err != nil { 223 + // return err 224 + // } 225 + 226 + // busErr := make(chan error) 227 + // go func() { 228 + // err := HandleBusMessages(ctx, pipeline) 229 + // busErr <- err 230 + // }() 231 + 232 + // go mm.HandleKeyRevocation(ctx, ms, pipeline) 233 + 234 + // err = pipeline.SetState(gst.StatePlaying) 235 + // if err != nil { 236 + // return err 237 + // } 238 + 239 + // defer func() { 240 + // err := pipeline.SetState(gst.StateNull) 241 + // if err != nil { 242 + // log.Error(ctx, "error setting pipeline to null state", "error", err) 243 + // } 244 + // }() 245 + 246 + // err = <-busErr 247 + 248 + // return err 249 + // }
+2 -1
pkg/notifications/firebase.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 8 + "context" 9 + 8 10 firebase "firebase.google.com/go/v4" 9 11 "firebase.google.com/go/v4/messaging" 10 - "golang.org/x/net/context" 11 12 "google.golang.org/api/option" 12 13 "stream.place/streamplace/pkg/log" 13 14 )
+3 -3
pkg/rtmps/rtmps.go
··· 14 14 ) 15 15 16 16 // passthrough RTMPS TLS terminator to external RTMP server 17 - func ServeRTMPS(ctx context.Context, cli *config.CLI) error { 17 + func ServeRTMPSAddon(ctx context.Context, cli *config.CLI) error { 18 18 if cli.RTMPServerAddon == "" { 19 19 return fmt.Errorf("RTMP server address not configured") 20 20 } ··· 29 29 MinVersion: tls.VersionTLS12, 30 30 } 31 31 32 - listener, err := tls.Listen("tcp", cli.RtmpsAddr, tlsConfig) 32 + listener, err := tls.Listen("tcp", cli.RTMPAddr, tlsConfig) 33 33 if err != nil { 34 34 return fmt.Errorf("failed to create RTMPS listener: %w", err) 35 35 } 36 36 37 37 log.Log(ctx, "rtmps server starting", 38 - "addr", cli.RtmpsAddr, 38 + "addr", cli.RTMPAddr, 39 39 "forwarding_to", cli.RTMPServerAddon) 40 40 41 41 go func() {