Live video on the AT Protocol
at eli/docker-deployment-docs 196 lines 6.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "slices" 10 "strings" 11 "time" 12 13 "go.opentelemetry.io/otel" 14 "stream.place/streamplace/pkg/aqtime" 15 c2patypes "stream.place/streamplace/pkg/c2patypes" 16 "stream.place/streamplace/pkg/constants" 17 "stream.place/streamplace/pkg/crypto/signers" 18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/model" 21) 22 23type ManifestAndCert struct { 24 Manifest c2patypes.Manifest `json:"manifest"` 25 Cert string `json:"cert"` 26} 27 28func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 29 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 30 defer span.End() 31 buf, err := io.ReadAll(input) 32 if err != nil { 33 return err 34 } 35 var maniCert ManifestAndCert 36 maniStr, err := iroh_streamplace.GetManifestAndCert(buf) 37 if err != nil { 38 return err 39 } 40 err = json.Unmarshal([]byte(maniStr), &maniCert) 41 if err != nil { 42 return err 43 } 44 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 45 if err != nil { 46 return err 47 } 48 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 49 if err != nil { 50 return err 51 } 52 if meta.MetadataConfiguration != nil { 53 if meta.MetadataConfiguration.DistributionPolicy != nil { 54 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters 55 if allowedBroadcasters != nil { 56 if !slices.Contains(allowedBroadcasters, "*") && !slices.Contains(allowedBroadcasters, fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost)) { 57 return fmt.Errorf("broadcaster %s is not allowed to distribute content. Allowed broadcasters: %v", fmt.Sprintf("did:web:%s", mm.cli.BroadcasterHost), allowedBroadcasters) 58 } 59 } 60 } 61 } 62 mediaData, err := ParseSegmentMediaData(ctx, buf) 63 if err != nil { 64 return err 65 } 66 // special case for test signers that are only signed with a key 67 var repoDID string 68 var signingKeyDID string 69 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 70 signingKeyDID = meta.Creator 71 repoDID = meta.Creator 72 } else { 73 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 74 if err != nil { 75 return err 76 } 77 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 78 if err != nil { 79 return err 80 } 81 if signingKey == nil { 82 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 83 } 84 repoDID = repo.DID 85 signingKeyDID = signingKey.DID 86 } 87 88 err = mm.cli.StreamIsAllowed(repoDID) 89 if err != nil { 90 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 91 } 92 93 // Apply content filtering after metadata is parsed 94 if mm.cli.ContentFilters != nil { 95 if err := mm.applyContentFilters(ctx, meta); err != nil { 96 return err 97 } 98 } 99 100 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 101 if err != nil { 102 return err 103 } 104 defer fd.Close() 105 106 r := bytes.NewReader(buf) 107 if _, err := io.Copy(fd, r); err != nil { 108 return err 109 } 110 var deleteAfter *time.Time 111 if meta.DistributionPolicy != nil && meta.DistributionPolicy.ExpiresAt != nil { 112 deleteAfter = meta.DistributionPolicy.ExpiresAt 113 } 114 seg := &model.Segment{ 115 ID: *maniCert.Manifest.Label, 116 SigningKeyDID: signingKeyDID, 117 RepoDID: repoDID, 118 StartTime: meta.StartTime.Time(), 119 Title: meta.Title, 120 Size: len(buf), 121 MediaData: mediaData, 122 ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings), 123 ContentRights: meta.ContentRights, 124 DistributionPolicy: meta.DistributionPolicy, 125 DeleteAfter: deleteAfter, 126 } 127 mm.newSegmentSubsMutex.RLock() 128 defer mm.newSegmentSubsMutex.RUnlock() 129 not := &NewSegmentNotification{ 130 Segment: seg, 131 Data: buf, 132 Metadata: meta, 133 Local: local, 134 } 135 for _, ch := range mm.newSegmentSubs { 136 go func() { 137 select { 138 case ch <- not: 139 case <-ctx.Done(): 140 return 141 case <-time.After(1 * time.Minute): 142 log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *maniCert.Manifest.Label) 143 return 144 } 145 }() 146 } 147 aqt := aqtime.FromTime(meta.StartTime.Time()) 148 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label) 149 return nil 150} 151 152// applyContentFilters applies content filtering based on configured rules 153func (mm *MediaManager) applyContentFilters(ctx context.Context, meta *SegmentMetadata) error { 154 // Check content warnings (if enabled) 155 if mm.cli.ContentFilters.ContentWarnings.Enabled { 156 for _, warning := range meta.ContentWarnings { 157 if mm.isWarningBlocked(warning) { 158 reason := fmt.Sprintf("content warning blocked: %s", warning) 159 log.Log(ctx, "content filtered", 160 "reason", reason, 161 "filter_type", "content_warning", 162 "creator", meta.Creator, 163 "warning", warning) 164 return fmt.Errorf("content filtered: %s", reason) 165 } 166 } 167 } 168 169 // Check distribution policy (if enabled) 170 if mm.cli.ContentFilters.DistributionPolicy.Enabled && meta.DistributionPolicy != nil { 171 if meta.DistributionPolicy.ExpiresAt != nil { 172 if time.Now().After(*meta.DistributionPolicy.ExpiresAt) { 173 reason := fmt.Sprintf("distribution policy expired: segment expires at %s", meta.DistributionPolicy.ExpiresAt) 174 log.Log(ctx, "content filtered", 175 "reason", reason, 176 "filter_type", "distribution_policy", 177 "creator", meta.Creator, 178 "start_time", meta.StartTime, 179 "expires_at", *meta.DistributionPolicy.ExpiresAt) 180 return fmt.Errorf("content filtered: %s", reason) 181 } 182 } 183 } 184 185 return nil 186} 187 188// isWarningBlocked checks if a content warning is in the blocked list 189func (mm *MediaManager) isWarningBlocked(warning string) bool { 190 for _, blocked := range mm.cli.ContentFilters.ContentWarnings.BlockedWarnings { 191 if warning == blocked { 192 return true 193 } 194 } 195 return false 196}