Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/rtmps 250 lines 7.6 kB view raw
1package media 2 3import ( 4 "context" 5 "crypto" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "sync" 11 12 "github.com/google/uuid" 13 "github.com/pion/interceptor" 14 "github.com/pion/interceptor/pkg/intervalpli" 15 "github.com/pion/webrtc/v4" 16 "go.opentelemetry.io/otel" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/gstinit" 22 "stream.place/streamplace/pkg/media/segchanman" 23 "stream.place/streamplace/pkg/model" 24 25 "stream.place/streamplace/pkg/replication" 26 27 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore" 28 "github.com/piprate/json-gold/ld" 29) 30 31const CERT_FILE = "cert.pem" 32const SEGMENTS_DIR = "segments" 33 34var STREAMPLACE_METADATA = "place.stream.metadata" 35 36type MediaManager struct { 37 cli *config.CLI 38 segChanMan *segchanman.SegChanMan 39 replicator replication.Replicator 40 hlsRunning map[string]*M3U8 41 hlsRunningMut sync.Mutex 42 httpPipes map[string]io.Writer 43 httpPipesMutex sync.Mutex 44 newSegmentSubs []chan *NewSegmentNotification 45 newSegmentSubsMutex sync.RWMutex 46 model model.Model 47 bus *bus.Bus 48 atsync *atproto.ATProtoSynchronizer 49 webrtcAPI *webrtc.API 50 webrtcConfig webrtc.Configuration 51} 52 53type NewSegmentNotification struct { 54 Segment *model.Segment 55 Data []byte 56 Metadata *SegmentMetadata 57} 58 59func RunSelfTest(ctx context.Context) error { 60 gstinit.InitGST() 61 return SelfTest(ctx) 62} 63 64func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 65 gstinit.InitGST() 66 err := SelfTest(ctx) 67 if err != nil { 68 return nil, fmt.Errorf("error in gstreamer self-test: %w", err) 69 } 70 71 m := &webrtc.MediaEngine{} 72 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. 73 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` 74 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry 75 // for each PeerConnection. 76 i := &interceptor.Registry{} 77 78 // Register a intervalpli factory 79 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. 80 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates 81 // A real world application should process incoming RTCP packets from viewers and forward them to senders 82 intervalPliFactory, err := intervalpli.NewReceiverInterceptor() 83 if err != nil { 84 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err) 85 } 86 i.Add(intervalPliFactory) 87 88 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 89 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 90 PayloadType: 102, 91 }, webrtc.RTPCodecTypeVideo); err != nil { 92 return nil, err 93 } 94 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 95 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 96 PayloadType: 111, 97 }, webrtc.RTPCodecTypeAudio); err != nil { 98 return nil, err 99 } 100 101 // Use the default set of Interceptors 102 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { 103 return nil, fmt.Errorf("failed to register default interceptors: %w", err) 104 } 105 106 // Create the API object with the MediaEngine 107 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) 108 109 // Prepare the configuration 110 config := webrtc.Configuration{ 111 ICEServers: []webrtc.ICEServer{ 112 { 113 URLs: []string{"stun:stun.l.google.com:19302"}, 114 }, 115 }, 116 } 117 return &MediaManager{ 118 cli: cli, 119 segChanMan: segchanman.MakeSegChanMan(), 120 replicator: rep, 121 hlsRunning: map[string]*M3U8{}, 122 httpPipes: map[string]io.Writer{}, 123 model: mod, 124 bus: bus, 125 atsync: atsync, 126 webrtcAPI: api, 127 webrtcConfig: config, 128 }, nil 129} 130 131// replacement for os.Pipe that works on windows 132func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) { 133 uu, err := uuid.NewV7() 134 if err != nil { 135 return "", nil, nil, err 136 } 137 mm.httpPipesMutex.Lock() 138 defer mm.httpPipesMutex.Unlock() 139 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String()) 140 done := func() { 141 mm.httpPipesMutex.Lock() 142 defer mm.httpPipesMutex.Unlock() 143 delete(mm.httpPipes, uu.String()) 144 } 145 r, w := io.Pipe() 146 mm.httpPipes[uu.String()] = w 147 return u, r, done, nil 148} 149 150func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer { 151 mm.httpPipesMutex.Lock() 152 defer mm.httpPipesMutex.Unlock() 153 return mm.httpPipes[uu] 154} 155 156// register a handler for all new segments that come in 157func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification { 158 ch := make(chan *NewSegmentNotification) 159 mm.newSegmentSubsMutex.Lock() 160 defer mm.newSegmentSubsMutex.Unlock() 161 mm.newSegmentSubs = append(mm.newSegmentSubs, ch) 162 return ch 163} 164 165// subscribe to the latest segments from a given user for livestreaming purposes 166func (mm *MediaManager) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg { 167 return mm.segChanMan.SubscribeSegment(ctx, user, rendition) 168} 169 170func (mm *MediaManager) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) { 171 mm.segChanMan.UnsubscribeSegment(ctx, user, rendition, ch) 172} 173 174// subscribe to the latest segments from a given user for livestreaming purposes 175func (mm *MediaManager) PublishSegment(ctx context.Context, user, rendition string, seg *segchanman.Seg) { 176 mm.segChanMan.PublishSegment(ctx, user, rendition, seg) 177} 178 179type obj map[string]any 180 181type StringVal struct { 182 Value string `json:"@value"` 183} 184 185type ExpandedSchemaOrg []struct { 186 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"` 187 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"` 188 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"` 189} 190 191type SegmentMetadata struct { 192 StartTime aqtime.AQTime 193 Title string 194 Creator string 195} 196 197var ErrInvalidMetadata = errors.New("invalid segment metadata") 198 199func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) { 200 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions") 201 defer span.End() 202 var ass *manifeststore.ManifestAssertion 203 for _, a := range mani.Assertions { 204 if a.Label == STREAMPLACE_METADATA { 205 ass = &a 206 break 207 } 208 } 209 if ass == nil { 210 return nil, fmt.Errorf("couldn't find %s assertions", STREAMPLACE_METADATA) 211 } 212 proc := ld.NewJsonLdProcessor() 213 options := ld.NewJsonLdOptions("") 214 flat, err := proc.Expand(ass.Data, options) 215 if err != nil { 216 return nil, err 217 } 218 bs, err := json.Marshal(flat) 219 if err != nil { 220 return nil, err 221 } 222 var metas ExpandedSchemaOrg 223 err = json.Unmarshal(bs, &metas) 224 if err != nil { 225 return nil, err 226 } 227 if len(metas) != 1 { 228 return nil, ErrInvalidMetadata 229 } 230 meta := metas[0] 231 if len(meta.Creator) == 0 { 232 return nil, ErrInvalidMetadata 233 } 234 if len(meta.Title) != 1 { 235 return nil, ErrInvalidMetadata 236 } 237 if len(meta.Date) != 1 { 238 return nil, ErrInvalidMetadata 239 } 240 start, err := aqtime.FromString(meta.Date[0].Value) 241 if err != nil { 242 return nil, err 243 } 244 out := SegmentMetadata{ 245 StartTime: start, 246 Title: meta.Title[0].Value, 247 Creator: meta.Creator[0].Value, 248 } 249 return &out, nil 250}