Live video on the AT Protocol
at eli/multitesting 207 lines 6.2 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "slices" 10 "strings" 11 "time" 12 13 "go.opentelemetry.io/otel" 14 "stream.place/streamplace/pkg/aqtime" 15 c2patypes "stream.place/streamplace/pkg/c2patypes" 16 "stream.place/streamplace/pkg/constants" 17 "stream.place/streamplace/pkg/crypto/signers" 18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/model" 21) 22 23type ManifestAndCert struct { 24 Manifest c2patypes.Manifest `json:"manifest"` 25 Cert string `json:"cert"` 26} 27 28func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 29 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 30 defer span.End() 31 buf, err := io.ReadAll(input) 32 if err != nil { 33 return err 34 } 35 var maniCert ManifestAndCert 36 maniStr, err := iroh_streamplace.GetManifestAndCert(buf) 37 if err != nil { 38 return err 39 } 40 err = json.Unmarshal([]byte(maniStr), &maniCert) 41 if err != nil { 42 return err 43 } 44 label := maniCert.Manifest.Label 45 if label != nil && mm.model != nil { 46 oldSeg, err := mm.model.GetSegment(*label) 47 if err != nil { 48 return err 49 } 50 if oldSeg != nil { 51 log.Warn(ctx, "segment already exists, skipping", "segmentID", *label) 52 return nil 53 } 54 } 55 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 56 if err != nil { 57 return err 58 } 59 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 60 if err != nil { 61 return err 62 } 63 if meta.MetadataConfiguration != nil { 64 if meta.MetadataConfiguration.DistributionPolicy != nil { 65 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters 66 if allowedBroadcasters != nil { 67 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) { 68 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters) 69 } 70 } 71 } 72 } 73 mediaData, err := ParseSegmentMediaData(ctx, buf) 74 if err != nil { 75 return err 76 } 77 // special case for test signers that are only signed with a key 78 var repoDID string 79 var signingKeyDID string 80 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 81 signingKeyDID = meta.Creator 82 repoDID = meta.Creator 83 } else { 84 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 85 if err != nil { 86 return err 87 } 88 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 89 if err != nil { 90 return err 91 } 92 if signingKey == nil { 93 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 94 } 95 repoDID = repo.DID 96 signingKeyDID = signingKey.DID 97 } 98 99 err = mm.cli.StreamIsAllowed(repoDID) 100 if err != nil { 101 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 102 } 103 104 // Apply content filtering after metadata is parsed 105 if mm.cli.ContentFilters != nil { 106 if err := mm.applyContentFilters(ctx, meta); err != nil { 107 return err 108 } 109 } 110 111 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 112 if err != nil { 113 return err 114 } 115 defer fd.Close() 116 117 r := bytes.NewReader(buf) 118 if _, err := io.Copy(fd, r); err != nil { 119 return err 120 } 121 var deleteAfter *time.Time 122 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil { 123 deleteAfter = meta.DistributionPolicy.ExpiresAt 124 } 125 seg := &model.Segment{ 126 ID: *maniCert.Manifest.Label, 127 SigningKeyDID: signingKeyDID, 128 RepoDID: repoDID, 129 StartTime: meta.StartTime.Time(), 130 Title: meta.Title, 131 Size: len(buf), 132 MediaData: mediaData, 133 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings), 134 ContentRights: meta.ContentRights, 135 DistributionPolicy: meta.DistributionPolicy, 136 DeleteAfter: deleteAfter, 137 } 138 mm.newSegmentSubsMutex.RLock() 139 defer mm.newSegmentSubsMutex.RUnlock() 140 not := &NewSegmentNotification{ 141 Segment: seg, 142 Data: buf, 143 Metadata: meta, 144 Local: local, 145 } 146 for _, ch := range mm.newSegmentSubs { 147 go func() { 148 select { 149 case ch <- not: 150 case <-ctx.Done(): 151 return 152 case <-time.After(1 * time.Minute): 153 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *maniCert.Manifest.Label) 154 return 155 } 156 }() 157 } 158 aqt := aqtime.FromTime(meta.StartTime.Time()) 159 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label) 160 return nil 161} 162 163// applyContentFilters applies content filtering based on configured rules 164func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error { 165 // Check content warnings (if enabled) 166 if mm.cli.ContentFilters.ContentWarnings.Enabled { 167 for _, warning := range meta.ContentWarnings { 168 if mm.isWarningBlocked(warning) { 169 reason := fmt.Sprintf("content warning blocked: %s", warning) 170 log.Log(ctx, "content filtered", 171 "reason", reason, 172 "filter_type", "content_warning", 173 "creator", meta.Creator, 174 "warning", warning) 175 return fmt.Errorf("content filtered: %s", reason) 176 } 177 } 178 } 179 180 // Check distribution policy (if enabled) 181 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil { 182 if meta.DistributionPolicy.ExpiresAt != nil { 183 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) { 184 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt) 185 log.Log(ctx, "content filtered", 186 "reason", reason, 187 "filter_type", "distribution_policy", 188 "creator", meta.Creator, 189 "start_time", meta.StartTime, 190 "expires_at", *meta.DistributionPolicy.ExpiresAt) 191 return fmt.Errorf("content filtered: %s", reason) 192 } 193 } 194 } 195 196 return nil 197} 198 199// isWarningBlocked checks if a content warning is in the blocked list 200func (mm *MediaManager) isWarningBlocked(warning string) bool { 201 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings { 202 if warning == blocked { 203 return true 204 } 205 } 206 return false 207}