Live video on the AT Protocol
at next 253 lines 7.9 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "slices" 10 "strings" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/atcrypto" 14 "go.opentelemetry.io/otel" 15 "stream.place/streamplace/pkg/aqio" 16 "stream.place/streamplace/pkg/aqtime" 17 c2patypes "stream.place/streamplace/pkg/c2patypes" 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/crypto/signers" 20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 "stream.place/streamplace/pkg/localdb" 22 "stream.place/streamplace/pkg/log" 23) 24 25type ManifestAndCert struct { 26 Manifest c2patypes.Manifest `json:"manifest"` 27 Cert string `json:"cert"` 28 ValidationResults c2patypes.ValidationResults `json:"validation_results"` 29} 30 31func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 32 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 33 defer span.End() 34 buf, err := io.ReadAll(input) 35 if err != nil { 36 return fmt.Errorf("failed to read input: %w", err) 37 } 38 39 valid, err := ValidateMP4Media(ctx, buf) 40 if err != nil { 41 return fmt.Errorf("failed to validate MP4 media: %w", err) 42 } 43 meta := valid.Meta 44 pub := valid.Pub 45 mediaData := valid.MediaData 46 manifest := valid.Manifest 47 48 label := manifest.Label 49 if label != nil && mm.model != nil { 50 oldSeg, err := mm.localDB.GetSegment(*label) 51 if err != nil { 52 return fmt.Errorf("failed to get old segment: %w", err) 53 } 54 if oldSeg != nil { 55 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label) 56 return nil 57 } 58 } 59 60 if meta.MetadataConfiguration != nil { 61 if meta.MetadataConfiguration.DistributionPolicy != nil { 62 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters 63 if allowedBroadcasters != nil { 64 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) { 65 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters) 66 } 67 } 68 } 69 } 70 71 var repoDID string 72 var signingKeyDID string 73 // special case for test signers that are only signed with a key 74 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 75 signingKeyDID = meta.Creator 76 repoDID = meta.Creator 77 } else { 78 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 79 if err != nil { 80 return err 81 } 82 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 83 if err != nil { 84 return err 85 } 86 if signingKey == nil { 87 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 88 } 89 repoDID = repo.DID 90 signingKeyDID = signingKey.DID 91 } 92 93 err = mm.cli.StreamIsAllowed(repoDID) 94 if err != nil { 95 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 96 } 97 98 // Apply content filtering after metadata is parsed 99 if mm.cli.ContentFilters != nil { 100 if err := mm.applyContentFilters(ctx, meta); err != nil { 101 return err 102 } 103 } 104 105 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 106 if err != nil { 107 return err 108 } 109 defer fd.Close() 110 111 r := bytes.NewReader(buf) 112 if _, err := io.Copy(fd, r); err != nil { 113 return err 114 } 115 var deleteAfter *time.Time 116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.DeleteAfterSeconds != nil { 117 expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 118 deleteAfter = &expiryTime 119 } 120 seg := &localdb.Segment{ 121 ID: *label, 122 SigningKeyDID: signingKeyDID, 123 RepoDID: repoDID, 124 StartTime: meta.StartTime.Time(), 125 Title: meta.Title, 126 Size: len(buf), 127 MediaData: mediaData, 128 ContentWarnings: localdb.ContentWarningsSlice(meta.ContentWarnings), 129 ContentRights: meta.ContentRights, 130 DistributionPolicy: meta.DistributionPolicy, 131 DeleteAfter: deleteAfter, 132 } 133 mm.newSegmentSubsMutex.RLock() 134 defer mm.newSegmentSubsMutex.RUnlock() 135 not := &NewSegmentNotification{ 136 Segment: seg, 137 Data: buf, 138 Metadata: meta, 139 Local: local, 140 } 141 for _, ch := range mm.newSegmentSubs { 142 go func() { 143 select { 144 case ch <- not: 145 case <-ctx.Done(): 146 return 147 case <-time.After(1 * time.Minute): 148 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label) 149 return 150 } 151 }() 152 } 153 aqt := aqtime.FromTime(meta.StartTime.Time()) 154 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label) 155 return nil 156} 157 158// applyContentFilters applies content filtering based on configured rules 159func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error { 160 // Check content warnings (if enabled) 161 if mm.cli.ContentFilters.ContentWarnings.Enabled { 162 for _, warning := range meta.ContentWarnings { 163 if mm.isWarningBlocked(warning) { 164 reason := fmt.Sprintf("content warning blocked: %s", warning) 165 log.Log(ctx, "content filtered", 166 "reason", reason, 167 "filter_type", "content_warning", 168 "creator", meta.Creator, 169 "warning", warning) 170 return fmt.Errorf("content filtered: %s", reason) 171 } 172 } 173 } 174 175 // Check distribution policy (if enabled) 176 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil { 177 if meta.DistributionPolicy.DeleteAfterSeconds != nil { 178 expiresAt := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 179 if time.Now().After(expiresAt) { 180 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", expiresAt) 181 log.Log(ctx, "content filtered", 182 "reason", reason, 183 "filter_type", "distribution_policy", 184 "creator", meta.Creator, 185 "start_time", meta.StartTime, 186 "expires_at", expiresAt) 187 return fmt.Errorf("content filtered: %s", reason) 188 } 189 } 190 } 191 192 return nil 193} 194 195// isWarningBlocked checks if a content warning is in the blocked list 196func (mm *MediaManager) isWarningBlocked(warning string) bool { 197 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings { 198 if warning == blocked { 199 return true 200 } 201 } 202 return false 203} 204 205type ValidationResult struct { 206 Pub *atcrypto.PublicKeyK256 207 Meta *SegmentMetadata 208 MediaData *localdb.SegmentMediaData 209 Manifest *c2patypes.Manifest 210 Cert string 211} 212 213// validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot 214func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) { 215 var maniCert ManifestAndCert 216 maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf))) 217 if err != nil { 218 return nil, err 219 } 220 err = json.Unmarshal([]byte(maniStr), &maniCert) 221 if err != nil { 222 return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err) 223 } 224 activeManifest := maniCert.ValidationResults.ActiveManifest 225 if activeManifest != nil { 226 if activeManifest.Failure == nil { 227 return nil, fmt.Errorf("active manifest failure array not found?!") 228 } 229 if len(activeManifest.Failure) > 0 { 230 bs, _ := json.Marshal(activeManifest.Failure) 231 return nil, fmt.Errorf("active manifest has failures: %s", string(bs)) 232 } 233 } 234 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 235 if err != nil { 236 return nil, err 237 } 238 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 239 if err != nil { 240 return nil, err 241 } 242 mediaData, err := ParseSegmentMediaData(ctx, buf) 243 if err != nil { 244 return nil, err 245 } 246 return &ValidationResult{ 247 Pub: pub, 248 Meta: meta, 249 MediaData: mediaData, 250 Manifest: &maniCert.Manifest, 251 Cert: maniCert.Cert, 252 }, nil 253}