Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.3 253 lines 7.8 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "slices" 10 "strings" 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/atcrypto" 14 "go.opentelemetry.io/otel" 15 "stream.place/streamplace/pkg/aqio" 16 "stream.place/streamplace/pkg/aqtime" 17 c2patypes "stream.place/streamplace/pkg/c2patypes" 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/crypto/signers" 20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/model" 23) 24 25type ManifestAndCert struct { 26 Manifest c2patypes.Manifest `json:"manifest"` 27 Cert string `json:"cert"` 28 ValidationResults c2patypes.ValidationResults `json:"validation_results"` 29} 30 31func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 32 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 33 defer span.End() 34 buf, err := io.ReadAll(input) 35 if err != nil { 36 return fmt.Errorf("failed to read input: %w", err) 37 } 38 39 valid, err := ValidateMP4Media(ctx, buf) 40 if err != nil { 41 return fmt.Errorf("failed to validate MP4 media: %w", err) 42 } 43 meta := valid.Meta 44 pub := valid.Pub 45 mediaData := valid.MediaData 46 manifest := valid.Manifest 47 48 label := manifest.Label 49 if label != nil && mm.model != nil { 50 oldSeg, err := mm.model.GetSegment(*label) 51 if err != nil { 52 return fmt.Errorf("failed to get old segment: %w", err) 53 } 54 if oldSeg != nil { 55 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label) 56 return nil 57 } 58 } 59 60 if meta.MetadataConfiguration != nil { 61 if meta.MetadataConfiguration.DistributionPolicy != nil { 62 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters 63 if allowedBroadcasters != nil { 64 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) { 65 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters) 66 } 67 } 68 } 69 } 70 71 var repoDID string 72 var signingKeyDID string 73 // special case for test signers that are only signed with a key 74 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 75 signingKeyDID = meta.Creator 76 repoDID = meta.Creator 77 } else { 78 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 79 if err != nil { 80 return err 81 } 82 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 83 if err != nil { 84 return err 85 } 86 if signingKey == nil { 87 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 88 } 89 repoDID = repo.DID 90 signingKeyDID = signingKey.DID 91 } 92 93 err = mm.cli.StreamIsAllowed(repoDID) 94 if err != nil { 95 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 96 } 97 98 // Apply content filtering after metadata is parsed 99 if mm.cli.ContentFilters != nil { 100 if err := mm.applyContentFilters(ctx, meta); err != nil { 101 return err 102 } 103 } 104 105 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 106 if err != nil { 107 return err 108 } 109 defer fd.Close() 110 111 r := bytes.NewReader(buf) 112 if _, err := io.Copy(fd, r); err != nil { 113 return err 114 } 115 var deleteAfter *time.Time 116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.DeleteAfterSeconds != nil { 117 expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 118 deleteAfter = &expiryTime 119 } 120 seg := &model.Segment{ 121 ID: *label, 122 SigningKeyDID: signingKeyDID, 123 RepoDID: repoDID, 124 StartTime: meta.StartTime.Time(), 125 Title: meta.Title, 126 Size: len(buf), 127 MediaData: mediaData, 128 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings), 129 ContentRights: meta.ContentRights, 130 DistributionPolicy: meta.DistributionPolicy, 131 DeleteAfter: deleteAfter, 132 } 133 mm.newSegmentSubsMutex.RLock() 134 defer mm.newSegmentSubsMutex.RUnlock() 135 not := &NewSegmentNotification{ 136 Segment: seg, 137 Data: buf, 138 Metadata: meta, 139 Local: local, 140 } 141 for _, ch := range mm.newSegmentSubs { 142 go func() { 143 select { 144 case ch <- not: 145 case <-ctx.Done(): 146 return 147 case <-time.After(1 * time.Minute): 148 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label) 149 return 150 } 151 }() 152 } 153 aqt := aqtime.FromTime(meta.StartTime.Time()) 154 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label) 155 return nil 156} 157 158// applyContentFilters applies content filtering based on configured rules 159func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error { 160 // Check content warnings (if enabled) 161 if mm.cli.ContentFilters.ContentWarnings.Enabled { 162 for _, warning := range meta.ContentWarnings { 163 if mm.isWarningBlocked(warning) { 164 reason := fmt.Sprintf("content warning blocked: %s", warning) 165 log.Log(ctx, "content filtered", 166 "reason", reason, 167 "filter_type", "content_warning", 168 "creator", meta.Creator, 169 "warning", warning) 170 return fmt.Errorf("content filtered: %s", reason) 171 } 172 } 173 } 174 175 // Check distribution policy (if enabled) 176 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil { 177 if meta.DistributionPolicy.DeleteAfterSeconds != nil { 178 expiresAt := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 179 if time.Now().After(expiresAt) { 180 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", expiresAt) 181 log.Log(ctx, "content filtered", 182 "reason", reason, 183 "filter_type", "distribution_policy", 184 "creator", meta.Creator, 185 "start_time", meta.StartTime, 186 "expires_at", expiresAt) 187 return fmt.Errorf("content filtered: %s", reason) 188 } 189 } 190 } 191 192 return nil 193} 194 195// isWarningBlocked checks if a content warning is in the blocked list 196func (mm *MediaManager) isWarningBlocked(warning string) bool { 197 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings { 198 if warning == blocked { 199 return true 200 } 201 } 202 return false 203} 204 205type ValidationResult struct { 206 Pub *atcrypto.PublicKeyK256 207 Meta *SegmentMetadata 208 MediaData *model.SegmentMediaData 209 Manifest *c2patypes.Manifest 210 Cert string 211} 212 213// validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot 214func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) { 215 var maniCert ManifestAndCert 216 maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf))) 217 if err != nil { 218 return nil, err 219 } 220 err = json.Unmarshal([]byte(maniStr), &maniCert) 221 if err != nil { 222 return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err) 223 } 224 activeManifest := maniCert.ValidationResults.ActiveManifest 225 if activeManifest != nil { 226 if activeManifest.Failure == nil { 227 return nil, fmt.Errorf("active manifest failure array not found?!") 228 } 229 if len(activeManifest.Failure) > 0 { 230 bs, _ := json.Marshal(activeManifest.Failure) 231 return nil, fmt.Errorf("active manifest has failures: %s", string(bs)) 232 } 233 } 234 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 235 if err != nil { 236 return nil, err 237 } 238 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 239 if err != nil { 240 return nil, err 241 } 242 mediaData, err := ParseSegmentMediaData(ctx, buf) 243 if err != nil { 244 return nil, err 245 } 246 return &ValidationResult{ 247 Pub: pub, 248 Meta: meta, 249 MediaData: mediaData, 250 Manifest: &maniCert.Manifest, 251 Cert: maniCert.Cert, 252 }, nil 253}