Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.26 249 lines 7.2 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "crypto" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "io" 11 "sync" 12 13 "github.com/google/uuid" 14 "github.com/pion/interceptor" 15 "github.com/pion/interceptor/pkg/intervalpli" 16 "github.com/pion/webrtc/v4" 17 "go.opentelemetry.io/otel" 18 "stream.place/streamplace/pkg/aqtime" 19 "stream.place/streamplace/pkg/atproto" 20 "stream.place/streamplace/pkg/bus" 21 "stream.place/streamplace/pkg/config" 22 "stream.place/streamplace/pkg/gstinit" 23 "stream.place/streamplace/pkg/model" 24 25 "stream.place/streamplace/pkg/log" 26 "stream.place/streamplace/pkg/replication" 27 28 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore" 29 "github.com/piprate/json-gold/ld" 30 31 irohStreamplace "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 33 _ "stream.place/streamplace/pkg/streamplacedeps" 34) 35 36const CertFile = "cert.pem" 37const SegmentsDir = "segments" 38 39var StreamplaceMetadata = "place.stream.metadata" 40 41type MediaManager struct { 42 cli *config.CLI 43 replicator replication.Replicator 44 hlsRunning map[string]*M3U8 45 hlsRunningMut sync.Mutex 46 httpPipes map[string]io.Writer 47 httpPipesMutex sync.Mutex 48 newSegmentSubs []chan *NewSegmentNotification 49 newSegmentSubsMutex sync.RWMutex 50 model model.Model 51 bus *bus.Bus 52 atsync *atproto.ATProtoSynchronizer 53 webrtcAPI *webrtc.API 54 webrtcConfig webrtc.Configuration 55} 56 57type NewSegmentNotification struct { 58 Segment *model.Segment 59 Data []byte 60 Metadata *SegmentMetadata 61} 62 63func RunSelfTest(ctx context.Context) error { 64 gstinit.InitGST() 65 return SelfTest(ctx) 66} 67 68func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 69 gstinit.InitGST() 70 err := SelfTest(ctx) 71 if err != nil { 72 return nil, fmt.Errorf("error in gstreamer self-test: %w", err) 73 } 74 75 m := &webrtc.MediaEngine{} 76 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. 77 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` 78 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry 79 // for each PeerConnection. 80 i := &interceptor.Registry{} 81 82 // Register a intervalpli factory 83 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. 84 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates 85 // A real world application should process incoming RTCP packets from viewers and forward them to senders 86 intervalPliFactory, err := intervalpli.NewReceiverInterceptor() 87 if err != nil { 88 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err) 89 } 90 i.Add(intervalPliFactory) 91 92 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 93 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 94 PayloadType: 102, 95 }, webrtc.RTPCodecTypeVideo); err != nil { 96 return nil, err 97 } 98 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 99 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 100 PayloadType: 111, 101 }, webrtc.RTPCodecTypeAudio); err != nil { 102 return nil, err 103 } 104 105 // Use the default set of Interceptors 106 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { 107 return nil, fmt.Errorf("failed to register default interceptors: %w", err) 108 } 109 110 // Create the API object with the MediaEngine 111 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) 112 113 // Prepare the configuration 114 config := webrtc.Configuration{ 115 ICEServers: []webrtc.ICEServer{ 116 { 117 URLs: []string{"stun:stun.l.google.com:19302"}, 118 }, 119 }, 120 } 121 122 return &MediaManager{ 123 cli: cli, 124 replicator: rep, 125 hlsRunning: map[string]*M3U8{}, 126 httpPipes: map[string]io.Writer{}, 127 model: mod, 128 bus: bus, 129 atsync: atsync, 130 webrtcAPI: api, 131 webrtcConfig: config, 132 }, nil 133} 134 135func (mm *MediaManager) HandleData(node *irohStreamplace.PublicKey, data []byte) { 136 r := bytes.NewReader(data) 137 ctx := context.Background() 138 err := mm.ValidateMP4(ctx, r) 139 if err != nil { 140 log.Log(ctx, "invalid incoming segment", "error", err) 141 } 142} 143 144// replacement for os.Pipe that works on windows 145func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) { 146 uu, err := uuid.NewV7() 147 if err != nil { 148 return "", nil, nil, err 149 } 150 mm.httpPipesMutex.Lock() 151 defer mm.httpPipesMutex.Unlock() 152 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String()) 153 done := func() { 154 mm.httpPipesMutex.Lock() 155 defer mm.httpPipesMutex.Unlock() 156 delete(mm.httpPipes, uu.String()) 157 } 158 r, w := io.Pipe() 159 mm.httpPipes[uu.String()] = w 160 return u, r, done, nil 161} 162 163func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer { 164 mm.httpPipesMutex.Lock() 165 defer mm.httpPipesMutex.Unlock() 166 return mm.httpPipes[uu] 167} 168 169// register a handler for all new segments that come in 170func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification { 171 ch := make(chan *NewSegmentNotification) 172 mm.newSegmentSubsMutex.Lock() 173 defer mm.newSegmentSubsMutex.Unlock() 174 mm.newSegmentSubs = append(mm.newSegmentSubs, ch) 175 return ch 176} 177 178type obj map[string]any 179 180type StringVal struct { 181 Value string `json:"@value"` 182} 183 184type ExpandedSchemaOrg []struct { 185 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"` 186 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"` 187 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"` 188} 189 190type SegmentMetadata struct { 191 StartTime aqtime.AQTime 192 Title string 193 Creator string 194} 195 196var ErrInvalidMetadata = errors.New("invalid segment metadata") 197 198func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) { 199 _, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions") 200 defer span.End() 201 var ass *manifeststore.ManifestAssertion 202 for _, a := range mani.Assertions { 203 if a.Label == StreamplaceMetadata { 204 ass = &a 205 break 206 } 207 } 208 if ass == nil { 209 return nil, fmt.Errorf("couldn't find %s assertions", StreamplaceMetadata) 210 } 211 proc := ld.NewJsonLdProcessor() 212 options := ld.NewJsonLdOptions("") 213 flat, err := proc.Expand(ass.Data, options) 214 if err != nil { 215 return nil, err 216 } 217 bs, err := json.Marshal(flat) 218 if err != nil { 219 return nil, err 220 } 221 var metas ExpandedSchemaOrg 222 err = json.Unmarshal(bs, &metas) 223 if err != nil { 224 return nil, err 225 } 226 if len(metas) != 1 { 227 return nil, ErrInvalidMetadata 228 } 229 meta := metas[0] 230 if len(meta.Creator) == 0 { 231 return nil, ErrInvalidMetadata 232 } 233 if len(meta.Title) != 1 { 234 return nil, ErrInvalidMetadata 235 } 236 if len(meta.Date) != 1 { 237 return nil, ErrInvalidMetadata 238 } 239 start, err := aqtime.FromString(meta.Date[0].Value) 240 if err != nil { 241 return nil, err 242 } 243 out := SegmentMetadata{ 244 StartTime: start, 245 Title: meta.Title[0].Value, 246 Creator: meta.Creator[0].Value, 247 } 248 return &out, nil 249}