Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "crypto"
7 "crypto/ecdsa"
8 "encoding/json"
9 "fmt"
10 "io"
11 "path/filepath"
12 "time"
13
14 "git.stream.place/streamplace/c2pa-go/pkg/c2pa"
15 "go.opentelemetry.io/otel"
16 "stream.place/streamplace/pkg/aqio"
17 "stream.place/streamplace/pkg/aqtime"
18 "stream.place/streamplace/pkg/atproto"
19 "stream.place/streamplace/pkg/config"
20 "stream.place/streamplace/pkg/crypto/aqpub"
21 "stream.place/streamplace/pkg/crypto/signers"
22 "stream.place/streamplace/pkg/log"
23 "stream.place/streamplace/pkg/spmetrics"
24)
25
26type MediaSigner interface {
27 SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error)
28 Pub() aqpub.Pub
29 Streamer() string
30 DID() string
31}
32
33type MediaSignerLocal struct {
34 StreamerName string
35 Signer crypto.Signer
36 AQPub aqpub.Pub
37 Cert []byte
38 TAURL string
39 did string
40}
41
42func prepareCert(ctx context.Context, cli *config.CLI, signer crypto.Signer) ([]byte, string, error) {
43 pub, err := aqpub.FromPublicKey(signer.Public().(*ecdsa.PublicKey))
44 if err != nil {
45 return nil, "", err
46 }
47 fSlice := []string{pub.String(), CertFile}
48 exists, err := cli.DataFileExists(fSlice)
49 if err != nil {
50 return nil, "", err
51 }
52 if !exists {
53 cert, err := signers.GenerateES256KCert(signer)
54 if err != nil {
55 return nil, "", err
56 }
57 r := bytes.NewReader(cert)
58 err = cli.DataFileWrite(fSlice, r, false)
59 if err != nil {
60 return nil, "", err
61 }
62 log.Log(ctx, "wrote new media signing certificate", "file", filepath.Join(pub.String(), CertFile))
63 }
64 buf := bytes.Buffer{}
65 if err := cli.DataFileRead(fSlice, &buf); err != nil {
66 return nil, "", err
67 }
68
69 fPath := cli.DataFilePath(fSlice)
70 cert := buf.Bytes()
71 return cert, fPath, nil
72}
73
74func MakeMediaSigner(ctx context.Context, cli *config.CLI, streamer string, signer crypto.Signer) (MediaSigner, error) {
75 cert, _, err := prepareCert(ctx, cli, signer)
76 if err != nil {
77 return nil, err
78 }
79 pub, err := aqpub.FromPublicKey(signer.Public().(*ecdsa.PublicKey))
80 if err != nil {
81 return nil, err
82 }
83 did, err := atproto.ParsePubKey(signer.Public().(*ecdsa.PublicKey))
84 if err != nil {
85 return nil, err
86 }
87 return &MediaSignerLocal{
88 Signer: signer,
89 Cert: cert,
90 StreamerName: streamer,
91 TAURL: cli.TAURL,
92 AQPub: pub,
93 did: did.DIDKey(),
94 }, nil
95}
96
97func (ms *MediaSignerLocal) Streamer() string {
98 return ms.StreamerName
99}
100
101func (ms *MediaSignerLocal) SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) {
102 startTime := time.Now()
103 ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4")
104 defer span.End()
105 title := "livestream"
106 mani := obj{
107 "title": fmt.Sprintf("Livestream Segment at %s", aqtime.FromMillis(start)),
108 "assertions": []obj{
109 {
110 "label": "c2pa.actions",
111 "data": obj{
112 "actions": []obj{
113 {"action": "c2pa.created"},
114 {"action": "c2pa.published"},
115 },
116 },
117 },
118 {
119 "label": StreamplaceMetadata,
120 "data": obj{
121 "@context": obj{
122 "dc": "http://purl.org/dc/elements/1.1/",
123 },
124 "dc:creator": ms.StreamerName,
125 "dc:title": []string{title},
126 "dc:date": []string{aqtime.FromMillis(start).String()},
127 },
128 },
129 },
130 }
131 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_MarshalManifest")
132 manifestBs, err := json.Marshal(mani)
133 if err != nil {
134 return nil, fmt.Errorf("failed to marshal manifest: %w", err)
135 }
136 var manifest c2pa.ManifestDefinition
137 err = json.Unmarshal(manifestBs, &manifest)
138 if err != nil {
139 return nil, fmt.Errorf("failed to unmarshal manifest: %w", err)
140 }
141 span.End()
142
143 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_GetSigningAlgorithm")
144 alg, err := c2pa.GetSigningAlgorithm(string(c2pa.ES256K))
145 if err != nil {
146 return nil, fmt.Errorf("failed to get signing algorithm: %w", err)
147 }
148 span.End()
149
150 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_NewBuilder")
151 b, err := c2pa.NewBuilder(&manifest, &c2pa.BuilderParams{
152 Cert: ms.Cert,
153 Signer: ms.Signer,
154 Algorithm: alg,
155 TAURL: ms.TAURL,
156 })
157 if err != nil {
158 return nil, fmt.Errorf("failed to create C2PA builder: %w", err)
159 }
160 span.End()
161
162 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_Sign")
163 output := &aqio.ReadWriteSeeker{}
164 err = b.Sign(input, output, "video/mp4")
165 if err != nil {
166 return nil, fmt.Errorf("failed to sign MP4: %w", err)
167 }
168 span.End()
169
170 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_OutputBytes")
171 defer ctx.Done()
172 bs, err := output.Bytes()
173 if err != nil {
174 return nil, fmt.Errorf("failed to get output bytes: %w", err)
175 }
176 span.End()
177 spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds()))
178 return bs, nil
179}
180
181func (ms *MediaSignerLocal) Pub() aqpub.Pub {
182 return ms.AQPub
183}
184
185func (ms *MediaSignerLocal) DID() string {
186 return ms.did
187}