Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-context-recursion 187 lines 4.8 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "crypto" 7 "crypto/ecdsa" 8 "encoding/json" 9 "fmt" 10 "io" 11 "path/filepath" 12 "time" 13 14 "git.stream.place/streamplace/c2pa-go/pkg/c2pa" 15 "go.opentelemetry.io/otel" 16 "stream.place/streamplace/pkg/aqio" 17 "stream.place/streamplace/pkg/aqtime" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/crypto/aqpub" 21 "stream.place/streamplace/pkg/crypto/signers" 22 "stream.place/streamplace/pkg/log" 23 "stream.place/streamplace/pkg/spmetrics" 24) 25 26type MediaSigner interface { 27 SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) 28 Pub() aqpub.Pub 29 Streamer() string 30 DID() string 31} 32 33type MediaSignerLocal struct { 34 StreamerName string 35 Signer crypto.Signer 36 AQPub aqpub.Pub 37 Cert []byte 38 TAURL string 39 did string 40} 41 42func prepareCert(ctx context.Context, cli *config.CLI, signer crypto.Signer) ([]byte, string, error) { 43 pub, err := aqpub.FromPublicKey(signer.Public().(*ecdsa.PublicKey)) 44 if err != nil { 45 return nil, "", err 46 } 47 fSlice := []string{pub.String(), CertFile} 48 exists, err := cli.DataFileExists(fSlice) 49 if err != nil { 50 return nil, "", err 51 } 52 if !exists { 53 cert, err := signers.GenerateES256KCert(signer) 54 if err != nil { 55 return nil, "", err 56 } 57 r := bytes.NewReader(cert) 58 err = cli.DataFileWrite(fSlice, r, false) 59 if err != nil { 60 return nil, "", err 61 } 62 log.Log(ctx, "wrote new media signing certificate", "file", filepath.Join(pub.String(), CertFile)) 63 } 64 buf := bytes.Buffer{} 65 if err := cli.DataFileRead(fSlice, &buf); err != nil { 66 return nil, "", err 67 } 68 69 fPath := cli.DataFilePath(fSlice) 70 cert := buf.Bytes() 71 return cert, fPath, nil 72} 73 74func MakeMediaSigner(ctx context.Context, cli *config.CLI, streamer string, signer crypto.Signer) (MediaSigner, error) { 75 cert, _, err := prepareCert(ctx, cli, signer) 76 if err != nil { 77 return nil, err 78 } 79 pub, err := aqpub.FromPublicKey(signer.Public().(*ecdsa.PublicKey)) 80 if err != nil { 81 return nil, err 82 } 83 did, err := atproto.ParsePubKey(signer.Public().(*ecdsa.PublicKey)) 84 if err != nil { 85 return nil, err 86 } 87 return &MediaSignerLocal{ 88 Signer: signer, 89 Cert: cert, 90 StreamerName: streamer, 91 TAURL: cli.TAURL, 92 AQPub: pub, 93 did: did.DIDKey(), 94 }, nil 95} 96 97func (ms *MediaSignerLocal) Streamer() string { 98 return ms.StreamerName 99} 100 101func (ms *MediaSignerLocal) SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) { 102 startTime := time.Now() 103 ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4") 104 defer span.End() 105 title := "livestream" 106 mani := obj{ 107 "title": fmt.Sprintf("Livestream Segment at %s", aqtime.FromMillis(start)), 108 "assertions": []obj{ 109 { 110 "label": "c2pa.actions", 111 "data": obj{ 112 "actions": []obj{ 113 {"action": "c2pa.created"}, 114 {"action": "c2pa.published"}, 115 }, 116 }, 117 }, 118 { 119 "label": StreamplaceMetadata, 120 "data": obj{ 121 "@context": obj{ 122 "dc": "http://purl.org/dc/elements/1.1/", 123 }, 124 "dc:creator": ms.StreamerName, 125 "dc:title": []string{title}, 126 "dc:date": []string{aqtime.FromMillis(start).String()}, 127 }, 128 }, 129 }, 130 } 131 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_MarshalManifest") 132 manifestBs, err := json.Marshal(mani) 133 if err != nil { 134 return nil, fmt.Errorf("failed to marshal manifest: %w", err) 135 } 136 var manifest c2pa.ManifestDefinition 137 err = json.Unmarshal(manifestBs, &manifest) 138 if err != nil { 139 return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) 140 } 141 span.End() 142 143 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_GetSigningAlgorithm") 144 alg, err := c2pa.GetSigningAlgorithm(string(c2pa.ES256K)) 145 if err != nil { 146 return nil, fmt.Errorf("failed to get signing algorithm: %w", err) 147 } 148 span.End() 149 150 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_NewBuilder") 151 b, err := c2pa.NewBuilder(&manifest, &c2pa.BuilderParams{ 152 Cert: ms.Cert, 153 Signer: ms.Signer, 154 Algorithm: alg, 155 TAURL: ms.TAURL, 156 }) 157 if err != nil { 158 return nil, fmt.Errorf("failed to create C2PA builder: %w", err) 159 } 160 span.End() 161 162 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_Sign") 163 output := &aqio.ReadWriteSeeker{} 164 err = b.Sign(input, output, "video/mp4") 165 if err != nil { 166 return nil, fmt.Errorf("failed to sign MP4: %w", err) 167 } 168 span.End() 169 170 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_OutputBytes") 171 defer ctx.Done() 172 bs, err := output.Bytes() 173 if err != nil { 174 return nil, fmt.Errorf("failed to get output bytes: %w", err) 175 } 176 span.End() 177 spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds())) 178 return bs, nil 179} 180 181func (ms *MediaSignerLocal) Pub() aqpub.Pub { 182 return ms.AQPub 183} 184 185func (ms *MediaSignerLocal) DID() string { 186 return ms.did 187}