Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-context-recursion 236 lines 6.9 kB view raw
1package media 2 3import ( 4 "context" 5 "crypto" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "sync" 11 12 "github.com/google/uuid" 13 "github.com/pion/interceptor" 14 "github.com/pion/interceptor/pkg/intervalpli" 15 "github.com/pion/webrtc/v4" 16 "go.opentelemetry.io/otel" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/gstinit" 22 "stream.place/streamplace/pkg/model" 23 24 "stream.place/streamplace/pkg/replication" 25 26 "git.stream.place/streamplace/c2pa-go/pkg/c2pa/generated/manifeststore" 27 "github.com/piprate/json-gold/ld" 28) 29 30// #cgo pkg-config: streamplacedeps-uninstalled 31import "C" 32 33const CertFile = "cert.pem" 34const SegmentsDir = "segments" 35 36var StreamplaceMetadata = "place.stream.metadata" 37 38type MediaManager struct { 39 cli *config.CLI 40 replicator replication.Replicator 41 hlsRunning map[string]*M3U8 42 hlsRunningMut sync.Mutex 43 httpPipes map[string]io.Writer 44 httpPipesMutex sync.Mutex 45 newSegmentSubs []chan *NewSegmentNotification 46 newSegmentSubsMutex sync.RWMutex 47 model model.Model 48 bus *bus.Bus 49 atsync *atproto.ATProtoSynchronizer 50 webrtcAPI *webrtc.API 51 webrtcConfig webrtc.Configuration 52} 53 54type NewSegmentNotification struct { 55 Segment *model.Segment 56 Data []byte 57 Metadata *SegmentMetadata 58} 59 60func RunSelfTest(ctx context.Context) error { 61 gstinit.InitGST() 62 return SelfTest(ctx) 63} 64 65func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 66 gstinit.InitGST() 67 err := SelfTest(ctx) 68 if err != nil { 69 return nil, fmt.Errorf("error in gstreamer self-test: %w", err) 70 } 71 72 m := &webrtc.MediaEngine{} 73 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. 74 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` 75 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry 76 // for each PeerConnection. 77 i := &interceptor.Registry{} 78 79 // Register a intervalpli factory 80 // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. 81 // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates 82 // A real world application should process incoming RTCP packets from viewers and forward them to senders 83 intervalPliFactory, err := intervalpli.NewReceiverInterceptor() 84 if err != nil { 85 return nil, fmt.Errorf("failed to create intervalpli factory: %w", err) 86 } 87 i.Add(intervalPliFactory) 88 89 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 90 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 91 PayloadType: 102, 92 }, webrtc.RTPCodecTypeVideo); err != nil { 93 return nil, err 94 } 95 if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 96 RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 97 PayloadType: 111, 98 }, webrtc.RTPCodecTypeAudio); err != nil { 99 return nil, err 100 } 101 102 // Use the default set of Interceptors 103 if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { 104 return nil, fmt.Errorf("failed to register default interceptors: %w", err) 105 } 106 107 // Create the API object with the MediaEngine 108 api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) 109 110 // Prepare the configuration 111 config := webrtc.Configuration{ 112 ICEServers: []webrtc.ICEServer{ 113 { 114 URLs: []string{"stun:stun.l.google.com:19302"}, 115 }, 116 }, 117 } 118 return &MediaManager{ 119 cli: cli, 120 replicator: rep, 121 hlsRunning: map[string]*M3U8{}, 122 httpPipes: map[string]io.Writer{}, 123 model: mod, 124 bus: bus, 125 atsync: atsync, 126 webrtcAPI: api, 127 webrtcConfig: config, 128 }, nil 129} 130 131// replacement for os.Pipe that works on windows 132func (mm *MediaManager) HTTPPipe() (string, io.ReadCloser, func(), error) { 133 uu, err := uuid.NewV7() 134 if err != nil { 135 return "", nil, nil, err 136 } 137 mm.httpPipesMutex.Lock() 138 defer mm.httpPipesMutex.Unlock() 139 u := fmt.Sprintf("%s/http-pipe/%s", mm.cli.OwnInternalURL(), uu.String()) 140 done := func() { 141 mm.httpPipesMutex.Lock() 142 defer mm.httpPipesMutex.Unlock() 143 delete(mm.httpPipes, uu.String()) 144 } 145 r, w := io.Pipe() 146 mm.httpPipes[uu.String()] = w 147 return u, r, done, nil 148} 149 150func (mm *MediaManager) GetHTTPPipeWriter(uu string) io.Writer { 151 mm.httpPipesMutex.Lock() 152 defer mm.httpPipesMutex.Unlock() 153 return mm.httpPipes[uu] 154} 155 156// register a handler for all new segments that come in 157func (mm *MediaManager) NewSegment() <-chan *NewSegmentNotification { 158 ch := make(chan *NewSegmentNotification) 159 mm.newSegmentSubsMutex.Lock() 160 defer mm.newSegmentSubsMutex.Unlock() 161 mm.newSegmentSubs = append(mm.newSegmentSubs, ch) 162 return ch 163} 164 165type obj map[string]any 166 167type StringVal struct { 168 Value string `json:"@value"` 169} 170 171type ExpandedSchemaOrg []struct { 172 Creator []StringVal `json:"http://purl.org/dc/elements/1.1/creator"` 173 Date []StringVal `json:"http://purl.org/dc/elements/1.1/date"` 174 Title []StringVal `json:"http://purl.org/dc/elements/1.1/title"` 175} 176 177type SegmentMetadata struct { 178 StartTime aqtime.AQTime 179 Title string 180 Creator string 181} 182 183var ErrInvalidMetadata = errors.New("invalid segment metadata") 184 185func ParseSegmentAssertions(ctx context.Context, mani *manifeststore.Manifest) (*SegmentMetadata, error) { 186 _, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions") 187 defer span.End() 188 var ass *manifeststore.ManifestAssertion 189 for _, a := range mani.Assertions { 190 if a.Label == StreamplaceMetadata { 191 ass = &a 192 break 193 } 194 } 195 if ass == nil { 196 return nil, fmt.Errorf("couldn't find %s assertions", StreamplaceMetadata) 197 } 198 proc := ld.NewJsonLdProcessor() 199 options := ld.NewJsonLdOptions("") 200 flat, err := proc.Expand(ass.Data, options) 201 if err != nil { 202 return nil, err 203 } 204 bs, err := json.Marshal(flat) 205 if err != nil { 206 return nil, err 207 } 208 var metas ExpandedSchemaOrg 209 err = json.Unmarshal(bs, &metas) 210 if err != nil { 211 return nil, err 212 } 213 if len(metas) != 1 { 214 return nil, ErrInvalidMetadata 215 } 216 meta := metas[0] 217 if len(meta.Creator) == 0 { 218 return nil, ErrInvalidMetadata 219 } 220 if len(meta.Title) != 1 { 221 return nil, ErrInvalidMetadata 222 } 223 if len(meta.Date) != 1 { 224 return nil, ErrInvalidMetadata 225 } 226 start, err := aqtime.FromString(meta.Date[0].Value) 227 if err != nil { 228 return nil, err 229 } 230 out := SegmentMetadata{ 231 StartTime: start, 232 Title: meta.Title[0].Value, 233 Creator: meta.Creator[0].Value, 234 } 235 return &out, nil 236}