tangled
alpha
login
or
join now
stream.place
/
streamplace
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
promising
Eli Mallon
3 months ago
fcec4fc3
3e72d470
+88
-25
6 changed files
expand all
collapse all
unified
split
hack
compare-hash.sh
pkg
config
config.go
media
clip.go
segmenter.go
validate.go
rust
iroh-streamplace
src
c2pa.rs
+4
hack/compare-hash.sh
···
107
echo " meld $(realpath 1.frames) $(realpath 2.frames)"
108
echo "Compare frame headers:"
109
echo " meld $(realpath 1.trace_headers) $(realpath 2.trace_headers)"
0
0
0
0
110
exit 1
···
107
echo " meld $(realpath 1.frames) $(realpath 2.frames)"
108
echo "Compare frame headers:"
109
echo " meld $(realpath 1.trace_headers) $(realpath 2.trace_headers)"
110
+
echo "Compare hex hashes:"
111
+
echo " meld $(realpath 1.xxd) $(realpath 2.xxd)"
112
+
echo "Compare framemd5 hashes:"
113
+
echo " meld $(realpath 1.md5) $(realpath 2.md5)"
114
exit 1
+2
pkg/config/config.go
···
134
Replicators []string
135
WebsocketURL string
136
BehindHTTPSProxy bool
0
137
}
138
139
// ContentFilters represents the content filtering configuration
···
219
fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
220
fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
221
fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
0
222
cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
223
fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
224
fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
···
134
Replicators []string
135
WebsocketURL string
136
BehindHTTPSProxy bool
137
+
SegmentDebugDir string
138
}
139
140
// ContentFilters represents the content filtering configuration
···
220
fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
221
fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
222
fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
223
+
fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
224
cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
225
fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
226
fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
+5
-9
pkg/media/clip.go
···
18
defer cancel()
19
20
pipelineSlice := []string{
21
-
fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime),
22
-
"capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.",
23
"opusparse name=audioparse ! queue ! muxer.",
24
}
25
···
93
return fmt.Errorf("failed to get mp4sink element: %w", err)
94
}
95
96
-
eos := make(chan struct{})
97
-
98
appSink := app.SinkFromElement(mp4Sink)
99
appSink.SetCallbacks(&app.SinkCallbacks{
100
NewSampleFunc: WriterNewSample(ctx, w),
101
-
EOSFunc: func(sink *app.Sink) {
102
-
close(eos)
103
-
},
104
})
105
106
// Start the pipeline
···
117
118
// Handle bus messages
119
err = HandleBusMessages(ctx, pipeline)
120
-
121
-
<-eos
0
122
123
if err != nil {
124
return fmt.Errorf("pipeline error: %w", err)
···
18
defer cancel()
19
20
pipelineSlice := []string{
21
+
fmt.Sprintf("mp4mux name=muxer faststart=true interleave-bytes=%d interleave-time=%d movie-timescale=60000 trak-timescale=60000 ! appsink sync=false name=mp4sink", InterleaveBytes, InterleaveTime),
22
+
"capsfilter caps=video/x-h264,parsed=true name=videoqueue ! queue ! muxer.",
23
"opusparse name=audioparse ! queue ! muxer.",
24
}
25
···
93
return fmt.Errorf("failed to get mp4sink element: %w", err)
94
}
95
0
0
96
appSink := app.SinkFromElement(mp4Sink)
97
appSink.SetCallbacks(&app.SinkCallbacks{
98
NewSampleFunc: WriterNewSample(ctx, w),
0
0
0
99
})
100
101
// Start the pipeline
···
112
113
// Handle bus messages
114
err = HandleBusMessages(ctx, pipeline)
115
+
if err != nil {
116
+
return fmt.Errorf("failed to handle bus messages: %w", err)
117
+
}
118
119
if err != nil {
120
return fmt.Errorf("pipeline error: %w", err)
+64
-3
pkg/media/segmenter.go
···
6
"fmt"
7
"io"
8
"os"
0
0
9
"strings"
10
"time"
11
12
"github.com/go-gst/go-gst/gst"
13
"github.com/go-gst/go-gst/gst/app"
0
14
"stream.place/streamplace/pkg/log"
15
)
16
···
70
if err != nil {
71
panic("error setting interleave-time" + err.Error())
72
}
0
0
0
0
0
0
0
0
0
0
0
0
73
})
74
if err != nil {
75
return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err)
···
109
err := cb(ctx, bs, now)
110
if err != nil {
111
log.Error(ctx, "error signing segment", "error", err)
0
112
return
113
}
114
close(mySegCh)
···
123
return elem, nil
124
}
125
0
0
126
func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
127
return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
128
if mm.cli.SmearAudio {
129
smearedBuf := &bytes.Buffer{}
130
err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf)
···
133
}
134
bs = smearedBuf.Bytes()
135
}
136
-
signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
137
if err != nil {
138
-
return err
139
}
140
-
return mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true)
0
0
0
0
0
141
})
142
}
143
···
6
"fmt"
7
"io"
8
"os"
9
+
"path/filepath"
10
+
"slices"
11
"strings"
12
"time"
13
14
"github.com/go-gst/go-gst/gst"
15
"github.com/go-gst/go-gst/gst/app"
16
+
"stream.place/streamplace/pkg/aqtime"
17
"stream.place/streamplace/pkg/log"
18
)
19
···
73
if err != nil {
74
panic("error setting interleave-time" + err.Error())
75
}
76
+
err = muxEle.SetProperty("faststart", true)
77
+
if err != nil {
78
+
panic("error setting faststart" + err.Error())
79
+
}
80
+
err = muxEle.SetProperty("movie-timescale", uint(60000))
81
+
if err != nil {
82
+
panic("error setting movie-timescale" + err.Error())
83
+
}
84
+
err = muxEle.SetProperty("trak-timescale", uint(60000))
85
+
if err != nil {
86
+
panic("error setting trak-timescale" + err.Error())
87
+
}
88
})
89
if err != nil {
90
return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err)
···
124
err := cb(ctx, bs, now)
125
if err != nil {
126
log.Error(ctx, "error signing segment", "error", err)
127
+
elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "Error signing segment", err.Error())
128
return
129
}
130
close(mySegCh)
···
139
return elem, nil
140
}
141
142
+
var MaxSegmentTries = 10
143
+
144
func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
145
return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error {
146
+
signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
147
+
if err != nil {
148
+
return fmt.Errorf("error calling SignMP4: %w", err)
149
+
}
150
+
previousBs := []byte{}
151
+
currentBs := signedBs
152
+
i := 0
153
+
for i = 0; i <= MaxSegmentTries; i++ {
154
+
if slices.Compare(previousBs, currentBs) == 0 {
155
+
break
156
+
}
157
+
if mm.cli.SegmentDebugDir != "" {
158
+
mydir := filepath.Join(mm.cli.SegmentDebugDir, ms.Streamer())
159
+
err := os.MkdirAll(mydir, 0755)
160
+
if err != nil {
161
+
return fmt.Errorf("failed to create debug directory: %w", err)
162
+
}
163
+
aqt := aqtime.FromMillis(now)
164
+
outFile := filepath.Join(mm.cli.SegmentDebugDir, fmt.Sprintf("%s-attempt-%03d.mp4", aqt.FileSafeString(), i))
165
+
err = os.WriteFile(outFile, currentBs, 0644)
166
+
if err != nil {
167
+
return fmt.Errorf("failed to write debug file: %w", err)
168
+
}
169
+
log.Log(ctx, "wrote debug file", "path", outFile)
170
+
}
171
+
buf := bytes.Buffer{}
172
+
err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf)
173
+
if err != nil {
174
+
return fmt.Errorf("failed to attempt segment convergence: %w", err)
175
+
}
176
+
previousBs = currentBs
177
+
currentBs = buf.Bytes()
178
+
}
179
+
if slices.Compare(previousBs, currentBs) != 0 {
180
+
return fmt.Errorf("failed to converge segment after %d tries", MaxSegmentTries)
181
+
}
182
+
bs = currentBs
183
+
log.Log(ctx, "converged segments", "tries", i, "size", len(bs))
184
if mm.cli.SmearAudio {
185
smearedBuf := &bytes.Buffer{}
186
err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf)
···
189
}
190
bs = smearedBuf.Bytes()
191
}
192
+
signedBs, err = ms.SignMP4(ctx, bytes.NewReader(bs), now)
193
if err != nil {
194
+
return fmt.Errorf("error calling SignMP4: %w", err)
195
}
196
+
log.Log(ctx, "signed segment", "size", len(signedBs))
197
+
err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true)
198
+
if err != nil {
199
+
return fmt.Errorf("error validating just-signed segment: %w", err)
200
+
}
201
+
return nil
202
})
203
}
204
+12
-13
pkg/media/validate.go
···
33
defer span.End()
34
buf, err := io.ReadAll(input)
35
if err != nil {
36
-
return err
37
}
38
39
valid, err := ValidateMP4Media(ctx, buf)
40
if err != nil {
41
-
return err
42
}
43
meta := valid.Meta
44
pub := valid.Pub
···
49
if label != nil && mm.model != nil {
50
oldSeg, err := mm.model.GetSegment(*label)
51
if err != nil {
52
-
return err
53
}
54
if oldSeg != nil {
55
log.Warn(ctx, "segment already exists, skipping", "segmentID", *label)
···
217
}
218
err = json.Unmarshal([]byte(maniStr), &maniCert)
219
if err != nil {
220
-
return nil, err
221
}
222
activeManifest := maniCert.ValidationResults.ActiveManifest
223
-
if activeManifest == nil {
224
-
return nil, fmt.Errorf("no active manifest found")
225
-
}
226
-
if activeManifest.Failure == nil {
227
-
return nil, fmt.Errorf("active manifest failure array not found?!")
228
-
}
229
-
if len(activeManifest.Failure) > 0 {
230
-
bs, _ := json.Marshal(activeManifest.Failure)
231
-
return nil, fmt.Errorf("active manifest has failures: %s", string(bs))
232
}
233
pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
234
if err != nil {
···
33
defer span.End()
34
buf, err := io.ReadAll(input)
35
if err != nil {
36
+
return fmt.Errorf("failed to read input: %w", err)
37
}
38
39
valid, err := ValidateMP4Media(ctx, buf)
40
if err != nil {
41
+
return fmt.Errorf("failed to validate MP4 media: %w", err)
42
}
43
meta := valid.Meta
44
pub := valid.Pub
···
49
if label != nil && mm.model != nil {
50
oldSeg, err := mm.model.GetSegment(*label)
51
if err != nil {
52
+
return fmt.Errorf("failed to get old segment: %w", err)
53
}
54
if oldSeg != nil {
55
log.Warn(ctx, "segment already exists, skipping", "segmentID", *label)
···
217
}
218
err = json.Unmarshal([]byte(maniStr), &maniCert)
219
if err != nil {
220
+
return nil, fmt.Errorf("failed to unmarshal manifest and cert: %w", err)
221
}
222
activeManifest := maniCert.ValidationResults.ActiveManifest
223
+
if activeManifest != nil {
224
+
if activeManifest.Failure == nil {
225
+
return nil, fmt.Errorf("active manifest failure array not found?!")
226
+
}
227
+
if len(activeManifest.Failure) > 0 {
228
+
bs, _ := json.Marshal(activeManifest.Failure)
229
+
return nil, fmt.Errorf("active manifest has failures: %s", string(bs))
230
+
}
0
231
}
232
pub, err := signers.ParseES256KCert([]byte(maniCert.Cert))
233
if err != nil {
+1
rust/iroh-streamplace/src/c2pa.rs
···
32
"manifest": manifest,
33
"cert": cert_chain,
34
"validation_results": reader.validation_results(),
0
35
});
36
37
return Ok(result.to_string());
···
32
"manifest": manifest,
33
"cert": cert_chain,
34
"validation_results": reader.validation_results(),
35
+
"validation_state": reader.validation_state(),
36
});
37
38
return Ok(result.to_string());