Live video on the AT Protocol
at eli/buffer-thumbnails 112 lines 3.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "strings" 10 11 "go.opentelemetry.io/otel" 12 "stream.place/streamplace/pkg/aqtime" 13 c2patypes "stream.place/streamplace/pkg/c2patypes" 14 "stream.place/streamplace/pkg/constants" 15 "stream.place/streamplace/pkg/crypto/signers" 16 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/model" 19) 20 21type ManifestAndCert struct { 22 Manifest c2patypes.Manifest `json:"manifest"` 23 Cert string `json:"cert"` 24} 25 26func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error { 27 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 28 defer span.End() 29 buf, err := io.ReadAll(input) 30 if err != nil { 31 return err 32 } 33 var maniCert ManifestAndCert 34 maniStr, rustErr := iroh_streamplace.GetManifestAndCert(buf) 35 if rustErr.AsError() != nil { 36 return rustErr.AsError() 37 } 38 err = json.Unmarshal([]byte(maniStr), &maniCert) 39 if err != nil { 40 return err 41 } 42 pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 43 if err != nil { 44 return err 45 } 46 meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 47 if err != nil { 48 return err 49 } 50 mediaData, err := ParseSegmentMediaData(ctx, buf) 51 if err != nil { 52 return err 53 } 54 // special case for test signers that are only signed with a key 55 var repoDID string 56 var signingKeyDID string 57 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 58 signingKeyDID = meta.Creator 59 repoDID = meta.Creator 60 } else { 61 repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model) 62 if err != nil { 63 return err 64 } 65 signingKey, err := mm.model.GetSigningKey(ctx, pub.DIDKey(), repo.DID) 66 if err != nil { 67 return err 68 } 69 if signingKey == nil { 70 return fmt.Errorf("no signing key found for %s", pub.DIDKey()) 71 } 72 repoDID = repo.DID 73 signingKeyDID = signingKey.DID 74 } 75 76 err = mm.cli.StreamIsAllowed(repoDID) 77 if err != nil { 78 return fmt.Errorf("got valid segment, but user %s is not allowed: %w", repoDID, err) 79 } 80 fd, err := mm.cli.SegmentFileCreate(repoDID, meta.StartTime, "mp4") 81 if err != nil { 82 return err 83 } 84 defer fd.Close() 85 go mm.replicator.NewSegment(ctx, buf) 86 r := bytes.NewReader(buf) 87 if _, err := io.Copy(fd, r); err != nil { 88 return err 89 } 90 seg := &model.Segment{ 91 ID: *maniCert.Manifest.Label, 92 SigningKeyDID: signingKeyDID, 93 RepoDID: repoDID, 94 StartTime: meta.StartTime.Time(), 95 Title: meta.Title, 96 Size: len(buf), 97 MediaData: mediaData, 98 } 99 mm.newSegmentSubsMutex.RLock() 100 defer mm.newSegmentSubsMutex.RUnlock() 101 not := &NewSegmentNotification{ 102 Segment: seg, 103 Data: buf, 104 Metadata: meta, 105 } 106 for _, ch := range mm.newSegmentSubs { 107 go func() { ch <- not }() 108 } 109 aqt := aqtime.FromTime(meta.StartTime.Time()) 110 log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label) 111 return nil 112}