Live video on the AT Protocol

determinism: introduce deterministic muxing

+1056 -221
+96 -2
Cargo.lock
··· 615 615 ] 616 616 617 617 [[package]] 618 + name = "c2pa" 619 + version = "0.58.0" 620 + source = "git+https://github.com/streamplace/c2pa-rs.git?rev=544825abfaf9e588813e18dba70c8d5afd039d46#544825abfaf9e588813e18dba70c8d5afd039d46" 621 + dependencies = [ 622 + "asn1-rs", 623 + "async-generic", 624 + "async-recursion", 625 + "async-trait", 626 + "atree", 627 + "base64 0.22.1", 628 + "bcder", 629 + "byteorder", 630 + "byteordered", 631 + "bytes", 632 + "chrono", 633 + "ciborium", 634 + "config", 635 + "console_log", 636 + "const-hex", 637 + "const-oid 0.9.6", 638 + "conv", 639 + "coset", 640 + "der 0.7.10", 641 + "ed25519-dalek 2.2.0", 642 + "env_logger", 643 + "extfmt", 644 + "getrandom 0.2.16", 645 + "hex", 646 + "hex-literal", 647 + "http", 648 + "id3", 649 + "img-parts", 650 + "iref", 651 + "jfifdump", 652 + "js-sys", 653 + "lazy_static", 654 + "log", 655 + "memchr", 656 + "mp4", 657 + "nom", 658 + "non-empty-string", 659 + "nonempty-collections", 660 + "num-bigint-dig", 661 + "openssl", 662 + "pem", 663 + "pkcs1", 664 + "pkcs8 0.10.2", 665 + "png_pong", 666 + "quick-xml", 667 + "rand 0.8.5", 668 + "rand_chacha 0.3.1", 669 + "rand_core 0.9.3", 670 + "range-set", 671 + "rasn", 672 + "rasn-cms", 673 + "rasn-ocsp", 674 + "rasn-pkix", 675 + "regex", 676 + "reqwest", 677 + "riff", 678 + "ring", 679 + "rsa", 680 + "serde", 681 + "serde-transcode", 682 + "serde-wasm-bindgen", 683 + "serde_bytes", 684 + "serde_cbor", 685 + "serde_derive", 686 + "serde_json", 687 + "serde_with", 688 + "sha1", 689 + "sha2 0.10.9", 690 + "spki 0.7.3", 691 + "static-iref", 692 + "tempfile", 693 + "thiserror 2.0.16", 694 + "toml 0.8.23", 695 + "treeline", 696 + "ureq", 697 + "url", 698 + "uuid", 699 + "wasm-bindgen", 700 + "wasm-bindgen-futures", 701 + "web-sys", 702 + "web-time", 703 + "windows-core", 704 + "wstd", 705 + "x509-certificate", 706 + "x509-parser", 707 + "zeroize", 708 + "zip", 709 + ] 710 + 711 + [[package]] 618 712 name = "camino" 619 713 version = "1.1.11" 620 714 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1429 1523 version = "0.0.0" 1430 1524 dependencies = [ 1431 1525 "anyhow", 1432 - "c2pa", 1526 + "c2pa 0.58.0 (git+https://github.com/hyphacoop/c2pa-rs.git?rev=1b84d40219b27340a30fc309250e774e8a7b7761)", 1433 1527 "schemars 0.8.22", 1434 1528 "serde", 1435 1529 "serde_json", ··· 2641 2735 "anyhow", 2642 2736 "async-trait", 2643 2737 "bytes", 2644 - "c2pa", 2738 + "c2pa 0.58.0 (git+https://github.com/streamplace/c2pa-rs.git?rev=544825abfaf9e588813e18dba70c8d5afd039d46)", 2645 2739 "hex", 2646 2740 "iroh", 2647 2741 "iroh-base",
+12
hack/deterministic-mux.sh
··· 1 + #!/bin/bash 2 + 3 + set -euo pipefail 4 + 5 + TMPDIR=$(mktemp -d) 6 + SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) 7 + $SCRIPT_DIR/../build-linux-amd64/streamplace clip --out "$TMPDIR/combined1.mp4" "$@" 8 + sleep 2 9 + $SCRIPT_DIR/../build-linux-amd64/streamplace clip --out "$TMPDIR/combined2.mp4" "$@" 10 + xxd "$TMPDIR/combined1.mp4" > "$TMPDIR/combined1.mp4.xxd" 11 + xxd "$TMPDIR/combined2.mp4" > "$TMPDIR/combined2.mp4.xxd" 12 + diff --color=always "$TMPDIR/combined1.mp4.xxd" "$TMPDIR/combined2.mp4.xxd"
+1 -6
pkg/api/stream_key.go
··· 82 82 return nil, err 83 83 } 84 84 85 - var mediaSigner media.MediaSigner 86 - if a.CLI.ExternalSigning { 87 - mediaSigner, err = media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes, a.Model) 88 - } else { 89 - mediaSigner, err = media.MakeMediaSigner(ctx, a.CLI, did, signer, a.Model) 90 - } 85 + mediaSigner, err := media.MakeMediaSigner(ctx, a.CLI, did, signer, a.Model) 91 86 if err != nil { 92 87 return nil, fmt.Errorf("invalid authorization key (not valid secp256k1): %w", err) 93 88 }
+14
pkg/cmd/streamplace.go
··· 141 141 return Clip(ctx, fs.Args(), *out) 142 142 } 143 143 144 + if len(os.Args) > 1 && os.Args[1] == "segment" { 145 + cli := config.CLI{Build: build} 146 + fs := cli.NewFlagSet("streamplace clip") 147 + out := fs.String("out-dir", "", "output directory") 148 + 149 + err := cli.Parse(fs, os.Args[2:]) 150 + if err != nil { 151 + return err 152 + } 153 + ctx := context.Background() 154 + ctx = log.WithDebugValue(ctx, cli.Debug) 155 + return media.SegmentFile(ctx, fs.Args()[0], *out) 156 + } 157 + 144 158 if len(os.Args) > 1 && os.Args[1] == "self-test" { 145 159 err := media.RunSelfTest(context.Background()) 146 160 if err != nil {
+3 -2
pkg/config/config.go
··· 195 195 fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout") 196 196 fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters") 197 197 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 198 - fs.Bool("insecure", false, "DEPRECATED, does nothing.") 199 198 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 200 199 fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)") 201 200 fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate") 202 201 fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters") 203 202 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 204 203 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 205 - fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)") 206 204 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 207 205 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 208 206 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip") ··· 229 227 cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)") 230 228 fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)") 231 229 fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS") 230 + 231 + fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 232 + fs.Bool("insecure", false, "DEPRECATED, does nothing.") 232 233 233 234 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 234 235 _ = starter.NewLivepeerConfig(lpFlags)
+112
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.go
··· 363 363 } 364 364 { 365 365 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 366 + return C.uniffi_iroh_streamplace_checksum_func_get_manifests() 367 + }) 368 + if checksum != 17 { 369 + // If this happens try cleaning and rebuilding your project 370 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_get_manifests: UniFFI API checksum mismatch") 371 + } 372 + } 373 + { 374 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 366 375 return C.uniffi_iroh_streamplace_checksum_func_init_logging() 367 376 }) 368 377 if checksum != 40911 { ··· 390 399 } 391 400 { 392 401 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 402 + return C.uniffi_iroh_streamplace_checksum_func_resign() 403 + }) 404 + if checksum != 33363 { 405 + // If this happens try cleaning and rebuilding your project 406 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_resign: UniFFI API checksum mismatch") 407 + } 408 + } 409 + { 410 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 393 411 return C.uniffi_iroh_streamplace_checksum_func_sign() 394 412 }) 395 413 if checksum != 23786 { 396 414 // If this happens try cleaning and rebuilding your project 397 415 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_sign: UniFFI API checksum mismatch") 416 + } 417 + } 418 + { 419 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 420 + return C.uniffi_iroh_streamplace_checksum_func_sign_with_ingredients() 421 + }) 422 + if checksum != 63680 { 423 + // If this happens try cleaning and rebuilding your project 424 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_sign_with_ingredients: UniFFI API checksum mismatch") 398 425 } 399 426 } 400 427 { ··· 4540 4567 } 4541 4568 } 4542 4569 4570 + type FfiConverterSequenceBytes struct{} 4571 + 4572 + var FfiConverterSequenceBytesINSTANCE = FfiConverterSequenceBytes{} 4573 + 4574 + func (c FfiConverterSequenceBytes) Lift(rb RustBufferI) [][]byte { 4575 + return LiftFromRustBuffer[[][]byte](c, rb) 4576 + } 4577 + 4578 + func (c FfiConverterSequenceBytes) Read(reader io.Reader) [][]byte { 4579 + length := readInt32(reader) 4580 + if length == 0 { 4581 + return nil 4582 + } 4583 + result := make([][]byte, 0, length) 4584 + for i := int32(0); i < length; i++ { 4585 + result = append(result, FfiConverterBytesINSTANCE.Read(reader)) 4586 + } 4587 + return result 4588 + } 4589 + 4590 + func (c FfiConverterSequenceBytes) Lower(value [][]byte) C.RustBuffer { 4591 + return LowerIntoRustBuffer[[][]byte](c, value) 4592 + } 4593 + 4594 + func (c FfiConverterSequenceBytes) Write(writer io.Writer, value [][]byte) { 4595 + if len(value) > math.MaxInt32 { 4596 + panic("[][]byte is too large to fit into Int32") 4597 + } 4598 + 4599 + writeInt32(writer, int32(len(value))) 4600 + for _, item := range value { 4601 + FfiConverterBytesINSTANCE.Write(writer, item) 4602 + } 4603 + } 4604 + 4605 + type FfiDestroyerSequenceBytes struct{} 4606 + 4607 + func (FfiDestroyerSequenceBytes) Destroy(sequence [][]byte) { 4608 + for _, value := range sequence { 4609 + FfiDestroyerBytes{}.Destroy(value) 4610 + } 4611 + } 4612 + 4543 4613 type FfiConverterSequencePublicKey struct{} 4544 4614 4545 4615 var FfiConverterSequencePublicKeyINSTANCE = FfiConverterSequencePublicKey{} ··· 4703 4773 } 4704 4774 } 4705 4775 4776 + func GetManifests(data []byte) (string, error) { 4777 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4778 + return GoRustBuffer{ 4779 + inner: C.uniffi_iroh_streamplace_fn_func_get_manifests(FfiConverterBytesINSTANCE.Lower(data), _uniffiStatus), 4780 + } 4781 + }) 4782 + if _uniffiErr != nil { 4783 + var _uniffiDefaultValue string 4784 + return _uniffiDefaultValue, _uniffiErr 4785 + } else { 4786 + return FfiConverterStringINSTANCE.Lift(_uniffiRV), nil 4787 + } 4788 + } 4789 + 4706 4790 // Initialize logging with the default subscriber that respects RUST_LOG environment variable. 4707 4791 // This function is safe to call multiple times - it will only initialize logging once. 4708 4792 func InitLogging() { ··· 4737 4821 } 4738 4822 } 4739 4823 4824 + func Resign(unsignedSegLabel string, unsignedSegData []byte, signedConcatData []byte, certs []byte, gosigner GoSigner) ([]byte, error) { 4825 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4826 + return GoRustBuffer{ 4827 + inner: C.uniffi_iroh_streamplace_fn_func_resign(FfiConverterStringINSTANCE.Lower(unsignedSegLabel), FfiConverterBytesINSTANCE.Lower(unsignedSegData), FfiConverterBytesINSTANCE.Lower(signedConcatData), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 4828 + } 4829 + }) 4830 + if _uniffiErr != nil { 4831 + var _uniffiDefaultValue []byte 4832 + return _uniffiDefaultValue, _uniffiErr 4833 + } else { 4834 + return FfiConverterBytesINSTANCE.Lift(_uniffiRV), nil 4835 + } 4836 + } 4837 + 4740 4838 func Sign(manifest string, data []byte, certs []byte, gosigner GoSigner) ([]byte, error) { 4741 4839 _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4742 4840 return GoRustBuffer{ 4743 4841 inner: C.uniffi_iroh_streamplace_fn_func_sign(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterBytesINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 4842 + } 4843 + }) 4844 + if _uniffiErr != nil { 4845 + var _uniffiDefaultValue []byte 4846 + return _uniffiDefaultValue, _uniffiErr 4847 + } else { 4848 + return FfiConverterBytesINSTANCE.Lift(_uniffiRV), nil 4849 + } 4850 + } 4851 + 4852 + func SignWithIngredients(manifest string, data []byte, certs []byte, ingredients [][]byte, gosigner GoSigner) ([]byte, error) { 4853 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4854 + return GoRustBuffer{ 4855 + inner: C.uniffi_iroh_streamplace_fn_func_sign_with_ingredients(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterBytesINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterSequenceBytesINSTANCE.Lower(ingredients), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 4744 4856 } 4745 4857 }) 4746 4858 if _uniffiErr != nil {
+33
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.h
··· 738 738 RustBuffer uniffi_iroh_streamplace_fn_func_get_manifest_and_cert(RustBuffer data, RustCallStatus *out_status 739 739 ); 740 740 #endif 741 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFESTS 742 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFESTS 743 + RustBuffer uniffi_iroh_streamplace_fn_func_get_manifests(RustBuffer data, RustCallStatus *out_status 744 + ); 745 + #endif 741 746 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_INIT_LOGGING 742 747 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_INIT_LOGGING 743 748 void uniffi_iroh_streamplace_fn_func_init_logging(RustCallStatus *out_status ··· 754 759 void* uniffi_iroh_streamplace_fn_func_node_id_from_ticket(RustBuffer ticket_str, RustCallStatus *out_status 755 760 ); 756 761 #endif 762 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_RESIGN 763 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_RESIGN 764 + RustBuffer uniffi_iroh_streamplace_fn_func_resign(RustBuffer unsigned_seg_label, RustBuffer unsigned_seg_data, RustBuffer signed_concat_data, RustBuffer certs, void* gosigner, RustCallStatus *out_status 765 + ); 766 + #endif 757 767 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN 758 768 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN 759 769 RustBuffer uniffi_iroh_streamplace_fn_func_sign(RustBuffer manifest, RustBuffer data, RustBuffer certs, void* gosigner, RustCallStatus *out_status 770 + ); 771 + #endif 772 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN_WITH_INGREDIENTS 773 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN_WITH_INGREDIENTS 774 + RustBuffer uniffi_iroh_streamplace_fn_func_sign_with_ingredients(RustBuffer manifest, RustBuffer data, RustBuffer certs, RustBuffer ingredients, void* gosigner, RustCallStatus *out_status 760 775 ); 761 776 #endif 762 777 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SUBSCRIBE_ITEM_DEBUG ··· 1050 1065 1051 1066 ); 1052 1067 #endif 1068 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_GET_MANIFESTS 1069 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_GET_MANIFESTS 1070 + uint16_t uniffi_iroh_streamplace_checksum_func_get_manifests(void 1071 + 1072 + ); 1073 + #endif 1053 1074 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_INIT_LOGGING 1054 1075 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_INIT_LOGGING 1055 1076 uint16_t uniffi_iroh_streamplace_checksum_func_init_logging(void ··· 1068 1089 1069 1090 ); 1070 1091 #endif 1092 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_RESIGN 1093 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_RESIGN 1094 + uint16_t uniffi_iroh_streamplace_checksum_func_resign(void 1095 + 1096 + ); 1097 + #endif 1071 1098 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_SIGN 1072 1099 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_SIGN 1073 1100 uint16_t uniffi_iroh_streamplace_checksum_func_sign(void 1101 + 1102 + ); 1103 + #endif 1104 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_SIGN_WITH_INGREDIENTS 1105 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_FUNC_SIGN_WITH_INGREDIENTS 1106 + uint16_t uniffi_iroh_streamplace_checksum_func_sign_with_ingredients(void 1074 1107 1075 1108 ); 1076 1109 #endif
+2 -2
pkg/media/clip.go
··· 43 43 defer cancel() 44 44 45 45 pipelineSlice := []string{ 46 - "mp4mux faststart=true name=muxer ! appsink sync=false name=mp4sink", 47 - "h264parse name=videoparse ! h264timestamper ! muxer.video_0", 46 + "mp4mux name=muxer ! appsink sync=false name=mp4sink", 47 + "h264parse name=videoparse ! muxer.video_0", 48 48 "opusparse name=audioparse ! muxer.audio_0", 49 49 } 50 50
+142
pkg/media/deterministic_muxing_test.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "crypto/sha256" 6 + "encoding/json" 7 + "fmt" 8 + "os" 9 + "path/filepath" 10 + "reflect" 11 + "sort" 12 + "testing" 13 + 14 + "github.com/stretchr/testify/require" 15 + "stream.place/streamplace/test/remote" 16 + ) 17 + 18 + var muxTestCount = 2 19 + 20 + func TestDeterministicMuxing(t *testing.T) { 21 + withNoGSTLeaks(t, func() { 22 + tempDir, err := os.MkdirTemp("", "deterministic_muxing_test") 23 + require.NoError(t, err) 24 + 25 + startFile := remote.RemoteArchive("14ba49843a56c0510e2b5059123abd2f98a502b1f4c7d706b0ae1066d438468c/BigBuckBunny_1sGOP_4kp60_NoBframes.1min.tar.gz") 26 + 27 + require.NoError(t, err) 28 + for i := 0; i < muxTestCount; i++ { 29 + splitAndCombineTest(t, tempDir, startFile) 30 + require.NoError(t, err) 31 + } 32 + }) 33 + } 34 + 35 + func splitAndCombineTest(t *testing.T, tempDir string, inputDir string) string { 36 + var err error 37 + tempDir, err = os.MkdirTemp(tempDir, "splitAndCombineTest") 38 + require.NoError(t, err) 39 + 40 + firstReport, err := makeSegDirReport(t, inputDir) 41 + require.NoError(t, err) 42 + 43 + combinedHashes := []string{} 44 + combinedFiles := []string{} 45 + for i := 0; i < muxTestCount; i++ { 46 + outFilePath := filepath.Join(tempDir, fmt.Sprintf("combined_%d.mp4", i)) 47 + combinedFiles = append(combinedFiles, outFilePath) 48 + outFile, err := os.Create(outFilePath) 49 + require.NoError(t, err) 50 + defer outFile.Close() 51 + err = Clip(context.Background(), firstReport.Segs, outFile) 52 + require.NoError(t, err) 53 + hash, err := hashFile(outFilePath) 54 + require.NoError(t, err) 55 + combinedHashes = append(combinedHashes, hash) 56 + } 57 + 58 + for _, hash := range combinedHashes { 59 + require.Equal(t, hash, combinedHashes[0]) 60 + } 61 + 62 + for i := 0; i < muxTestCount; i++ { 63 + segDir, err := os.MkdirTemp(tempDir, "segs") 64 + require.NoError(t, err) 65 + err = SegmentFile(context.Background(), combinedFiles[0], segDir) 66 + require.NoError(t, err) 67 + report, err := makeSegDirReport(t, segDir) 68 + require.NoError(t, err) 69 + require.NoError(t, report.CheckEquals(firstReport), "round-trip muxing is not deterministic") 70 + } 71 + 72 + return combinedFiles[0] 73 + } 74 + 75 + type SegDirReport struct { 76 + Dir string 77 + Segs []string 78 + Hashes []string 79 + } 80 + 81 + func makeSegDirReport(t *testing.T, segDir string) (*SegDirReport, error) { 82 + segs := []string{} 83 + segEntries, err := os.ReadDir(segDir) 84 + require.NoError(t, err) 85 + for _, segEntry := range segEntries { 86 + if segEntry.Type().IsRegular() { 87 + segPath := filepath.Join(segDir, segEntry.Name()) 88 + segs = append(segs, segPath) 89 + } 90 + } 91 + sort.Strings(segs) 92 + hashes := make([]string, len(segs)) 93 + for i, segPath := range segs { 94 + hash, err := hashFile(segPath) 95 + if err != nil { 96 + return nil, err 97 + } 98 + hashes[i] = hash 99 + } 100 + 101 + return &SegDirReport{ 102 + Dir: segDir, 103 + Segs: segs, 104 + Hashes: hashes, 105 + }, nil 106 + } 107 + 108 + func (s *SegDirReport) Equals(other *SegDirReport) bool { 109 + if len(s.Segs) != len(other.Segs) { 110 + return false 111 + } 112 + if len(s.Hashes) != len(other.Hashes) { 113 + return false 114 + } 115 + return reflect.DeepEqual(s.Hashes, other.Hashes) 116 + } 117 + 118 + func hashFile(path string) (string, error) { 119 + bs, err := os.ReadFile(path) 120 + if err != nil { 121 + return "", err 122 + } 123 + hash := sha256.Sum256(bs) 124 + return fmt.Sprintf("%x", hash), nil 125 + } 126 + 127 + func (s *SegDirReport) CheckEquals(other *SegDirReport) error { 128 + if !s.Equals(other) { 129 + str1 := s.ToString() 130 + str2 := other.ToString() 131 + return fmt.Errorf("files should be equal: %s\n%s", str1, str2) 132 + } 133 + return nil 134 + } 135 + 136 + func (s *SegDirReport) ToString() string { 137 + bs, err := json.MarshalIndent(s, "", " ") 138 + if err != nil { 139 + panic(err) 140 + } 141 + return string(bs) 142 + }
+156
pkg/media/ingredient_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "os" 9 + "path/filepath" 10 + "sort" 11 + "testing" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/util" 15 + "github.com/stretchr/testify/require" 16 + c2patypes "stream.place/streamplace/pkg/c2patypes" 17 + "stream.place/streamplace/pkg/config" 18 + ct "stream.place/streamplace/pkg/config/configtesting" 19 + "stream.place/streamplace/pkg/crypto/spkey" 20 + "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 + "stream.place/streamplace/test/remote" 22 + ) 23 + 24 + var testTimestamp = "2025-01-01T00:00:00.000Z" 25 + 26 + type ManifestResult struct { 27 + Manifests map[string]c2patypes.Manifest `json:"manifests"` 28 + } 29 + 30 + type ManifestAndMetadata struct { 31 + Manifest c2patypes.Manifest 32 + SegmentMetadata *SegmentMetadata 33 + } 34 + 35 + func TestIngredientConcat(t *testing.T) { 36 + withNoGSTLeaks(t, func() { 37 + tempDir, err := os.MkdirTemp("", "ingredient_test") 38 + require.NoError(t, err) 39 + // defer os.RemoveAll(tempDir) 40 + segments := remote.RemoteArchive("14ba49843a56c0510e2b5059123abd2f98a502b1f4c7d706b0ae1066d438468c/BigBuckBunny_1sGOP_4kp60_NoBframes.1min.tar.gz") 41 + // segments := "/home/iameli/testvids/three" 42 + testVids := []string{} 43 + sort.Strings(testVids) 44 + segEntries, err := os.ReadDir(segments) 45 + require.NoError(t, err) 46 + for _, segEntry := range segEntries { 47 + if segEntry.Type().IsRegular() { 48 + testVids = append(testVids, filepath.Join(segments, segEntry.Name())) 49 + } 50 + } 51 + firstReport, err := makeSegDirReport(t, segments) 52 + require.NoError(t, err) 53 + priv, _, err := spkey.GenerateStreamKey() 54 + require.NoError(t, err) 55 + signer, err := spkey.KeyToSigner(priv) 56 + require.NoError(t, err) 57 + cli := ct.CLI(t, &config.CLI{ 58 + TAURL: "http://timestamp.digicert.com", 59 + WideOpen: true, 60 + }) 61 + msInterface, err := MakeMediaSigner(context.Background(), cli, "test-person", signer, nil) 62 + require.NoError(t, err) 63 + ms := msInterface.(*MediaSignerLocal) 64 + buf := bytes.Buffer{} 65 + err = Clip(context.Background(), testVids, &buf) 66 + require.NoError(t, err) 67 + ingredients := [][]byte{} 68 + startTS, err := time.Parse(util.ISO8601, testTimestamp) 69 + require.NoError(t, err) 70 + signedSegDir := makeTestSubdir(t, tempDir, "signed-segments") 71 + for i, vid := range testVids { 72 + ts := startTS.Add(time.Duration(i) * time.Second) 73 + bs, err := os.ReadFile(vid) 74 + require.NoError(t, err) 75 + signedBS, err := ms.SignMP4(context.Background(), bytes.NewReader(bs), ts.UnixMilli()) 76 + require.NoError(t, err) 77 + ingredients = append(ingredients, signedBS) 78 + err = os.WriteFile(filepath.Join(signedSegDir, fmt.Sprintf("signed_%06d.mp4", i)), signedBS, 0644) 79 + require.NoError(t, err) 80 + } 81 + signedReport, err := makeSegDirReport(t, signedSegDir) 82 + require.NoError(t, err) 83 + signedConcatBS, err := ms.SignConcatMP4(context.Background(), bytes.NewReader(buf.Bytes()), ingredients) 84 + require.NoError(t, err) 85 + require.Greater(t, len(signedConcatBS), 0) 86 + concatSegment := filepath.Join(tempDir, "ingredient-concat.mp4") 87 + err = os.WriteFile(concatSegment, signedConcatBS, 0644) 88 + require.NoError(t, err) 89 + splitSegDir := makeTestSubdir(t, tempDir, "split-segments") 90 + err = SegmentFile(context.Background(), concatSegment, splitSegDir) 91 + require.NoError(t, err) 92 + splitReport, err := makeSegDirReport(t, splitSegDir) 93 + require.NoError(t, err) 94 + require.NoError(t, splitReport.CheckEquals(firstReport), "split segments are not equal to original segments") 95 + DoReplay = true 96 + // splitSignedSegDir := makeTestSubdir(t, tempDir, "split-signed-segments") 97 + // for i, vid := range splitReport.Segs { 98 + // ts := startTS.Add(time.Duration(i) * time.Second) 99 + // bs, err := os.ReadFile(vid) 100 + // require.NoError(t, err) 101 + // signedBS, err := ms.SignMP4(context.Background(), bytes.NewReader(bs), ts.UnixMilli()) 102 + // require.NoError(t, err) 103 + // err = os.WriteFile(filepath.Join(splitSignedSegDir, fmt.Sprintf("signed_%06d.mp4", i)), signedBS, 0644) 104 + // require.NoError(t, err) 105 + // } 106 + // signedSplitReport, err := makeSegDirReport(t, splitSignedSegDir) 107 + // require.NoError(t, err) 108 + // require.NoError(t, signedSplitReport.CheckEquals(signedReport), "split signed segments are not equal to signed segments") 109 + manifestsStr, err := iroh_streamplace.GetManifests(signedConcatBS) 110 + require.NoError(t, err) 111 + var manifests ManifestResult 112 + err = json.Unmarshal([]byte(manifestsStr), &manifests) 113 + require.NoError(t, err) 114 + require.Greater(t, len(manifests.Manifests), 0) 115 + manifestList := []ManifestAndMetadata{} 116 + for _, manifest := range manifests.Manifests { 117 + metadata, err := ParseSegmentAssertions(context.Background(), &manifest) 118 + if err == ErrMissingMetadata { 119 + continue 120 + } 121 + require.NoError(t, err) 122 + manifestList = append(manifestList, ManifestAndMetadata{ 123 + Manifest: manifest, 124 + SegmentMetadata: metadata, 125 + }) 126 + } 127 + sort.Slice(manifestList, func(i, j int) bool { 128 + m1 := manifestList[i] 129 + m2 := manifestList[j] 130 + return m1.SegmentMetadata.StartTime.Time().Before(m2.SegmentMetadata.StartTime.Time()) 131 + }) 132 + signedSplitSegDir := makeTestSubdir(t, tempDir, "signed-split-segments") 133 + for i, vid := range testVids { 134 + bs, err := os.ReadFile(vid) 135 + require.NoError(t, err) 136 + rustCallbackSigner := &RustCallbackSigner{ 137 + Signer: ms.Signer, 138 + } 139 + fmt.Println("resigning", *manifestList[i].Manifest.Label) 140 + signedBS, err := iroh_streamplace.Resign(*manifestList[i].Manifest.Label, bs, signedConcatBS, ms.Cert, rustCallbackSigner) 141 + require.NoError(t, err) 142 + err = os.WriteFile(filepath.Join(signedSplitSegDir, fmt.Sprintf("signed_%06d.mp4", i)), signedBS, 0644) 143 + require.NoError(t, err) 144 + } 145 + signedSplitReport, err := makeSegDirReport(t, signedSplitSegDir) 146 + require.NoError(t, err) 147 + require.NoError(t, signedSplitReport.CheckEquals(signedReport), "split signed segments are not equal to signed segments") 148 + }) 149 + } 150 + 151 + func makeTestSubdir(t *testing.T, tempDir, subdir string) string { 152 + subDir := filepath.Join(tempDir, subdir) 153 + err := os.MkdirAll(subDir, 0755) 154 + require.NoError(t, err) 155 + return subDir 156 + }
+2 -1
pkg/media/media.go
··· 197 197 Livestream *streamplace.Livestream 198 198 } 199 199 200 + var ErrMissingMetadata = errors.New("missing segment metadata") 200 201 var ErrInvalidMetadata = errors.New("invalid segment metadata") 201 202 202 203 func ParseSegmentAssertions(ctx context.Context, mani *c2patypes.Manifest) (*SegmentMetadata, error) { ··· 210 211 } 211 212 } 212 213 if ass == nil { 213 - return nil, fmt.Errorf("couldn't find %s assertions", StreamplaceMetadata) 214 + return nil, ErrMissingMetadata 214 215 } 215 216 proc := ld.NewJsonLdProcessor() 216 217 options := ld.NewJsonLdOptions("")
+78
pkg/media/media_signer.go
··· 29 29 Pub() aqpub.Pub 30 30 Streamer() string 31 31 DID() string 32 + SignConcatMP4(ctx context.Context, input io.ReadSeeker, ingredients [][]byte) ([]byte, error) 32 33 } 34 + 35 + var DoReplay = false 33 36 34 37 type MediaSignerLocal struct { 35 38 StreamerName string ··· 40 43 did string 41 44 manifestBuilder *ManifestBuilder 42 45 PrebuiltManifest []byte // Optional: use this manifest instead of building one 46 + sigs [][]byte 43 47 } 44 48 45 49 func prepareCert(ctx context.Context, cli *config.CLI, signer crypto.Signer) ([]byte, error) { ··· 170 174 spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds())) 171 175 return bs, nil 172 176 } 177 + 178 + func (ms *MediaSignerLocal) SignConcatMP4(ctx context.Context, input io.ReadSeeker, ingredients [][]byte) ([]byte, error) { 179 + startTime := time.Now() 180 + ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4") 181 + defer span.End() 182 + for _, ingredient := range ingredients { 183 + _, err := iroh_streamplace.GetManifestAndCert(ingredient) 184 + if err != nil { 185 + return nil, err 186 + } 187 + } 188 + // title := "livestream" 189 + mani := obj{ 190 + "title": "Livestream Clip", 191 + // "assertions": []obj{ 192 + // { 193 + // "label": "c2pa.actions", 194 + // "data": obj{ 195 + // "actions": []obj{ 196 + // {"action": "c2pa.created"}, 197 + // {"action": "c2pa.published"}, 198 + // }, 199 + // }, 200 + // }, 201 + // { 202 + // "label": StreamplaceMetadata, 203 + // "data": obj{ 204 + // "@context": obj{ 205 + // "dc": "http://purl.org/dc/elements/1.1/", 206 + // }, 207 + // "dc:creator": ms.StreamerName, 208 + // "dc:title": []string{title}, 209 + // "dc:date": []string{aqtime.FromMillis(start).String()}, 210 + // }, 211 + // }, 212 + // }, 213 + } 214 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_MarshalManifest") 215 + manifestBs, err := json.Marshal(mani) 216 + if err != nil { 217 + return nil, fmt.Errorf("failed to marshal manifest: %w", err) 218 + } 219 + var manifest c2patypes.ManifestDefinition 220 + err = json.Unmarshal(manifestBs, &manifest) 221 + if err != nil { 222 + return nil, fmt.Errorf("failed to unmarshal manifest: %w", err) 223 + } 224 + span.End() 225 + 226 + bs, err := io.ReadAll(input) 227 + if err != nil { 228 + return nil, fmt.Errorf("failed to read input: %w", err) 229 + } 230 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_Sign") 231 + rustCallbackSigner := &RustCallbackSigner{ 232 + Signer: ms.Signer, 233 + } 234 + bs, err = iroh_streamplace.SignWithIngredients(string(manifestBs), bs, ms.Cert, ingredients, rustCallbackSigner) 235 + if err != nil { 236 + return nil, err 237 + } 238 + span.End() 239 + 240 + ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_OutputBytes") 241 + defer ctx.Done() 242 + if err != nil { 243 + return nil, fmt.Errorf("failed to get output bytes: %w", err) 244 + } 245 + span.End() 246 + spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds())) 247 + return bs, nil 248 + } 249 + 250 + // don't call externally! this is used as a callback for the rust library 173 251 174 252 func (ms *MediaSignerLocal) Pub() aqpub.Pub { 175 253 return ms.AQPub
-154
pkg/media/media_signer_ext.go
··· 1 - package media 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "crypto" 7 - "crypto/ecdsa" 8 - "fmt" 9 - "io" 10 - "os" 11 - "os/exec" 12 - "time" 13 - 14 - "github.com/decred/dcrd/dcrec/secp256k1" 15 - "github.com/mr-tron/base58" 16 - "go.opentelemetry.io/otel" 17 - "stream.place/streamplace/pkg/atproto" 18 - "stream.place/streamplace/pkg/config" 19 - "stream.place/streamplace/pkg/crypto/aqpub" 20 - "stream.place/streamplace/pkg/log" 21 - "stream.place/streamplace/pkg/model" 22 - "stream.place/streamplace/pkg/spmetrics" 23 - ) 24 - 25 - type MediaSignerExt struct { 26 - cli *config.CLI 27 - signer crypto.Signer 28 - pub aqpub.Pub 29 - certPath string 30 - streamer string 31 - keyBs []byte 32 - taURL string 33 - did string 34 - manifestBuilder *ManifestBuilder 35 - } 36 - 37 - func MakeMediaSignerExt(ctx context.Context, cli *config.CLI, streamer string, keyBs []byte, model model.Model) (MediaSigner, error) { 38 - key, _ := secp256k1.PrivKeyFromBytes(keyBs) 39 - if key == nil { 40 - return nil, fmt.Errorf("invalid authorization key (not valid secp256k1)") 41 - } 42 - var signer crypto.Signer = key.ToECDSA() 43 - certBs, err := prepareCert(ctx, cli, signer) 44 - if err != nil { 45 - return nil, err 46 - } 47 - // Write certificate to a temporary file 48 - certFile, err := os.CreateTemp("", "cert-*.pem") 49 - if err != nil { 50 - return nil, fmt.Errorf("failed to create temp cert file: %w", err) 51 - } 52 - defer certFile.Close() 53 - 54 - if _, err := certFile.Write(certBs); err != nil { 55 - return nil, fmt.Errorf("failed to write cert to temp file: %w", err) 56 - } 57 - 58 - certPath := certFile.Name() 59 - pub, err := aqpub.FromPublicKey(signer.Public().(*ecdsa.PublicKey)) 60 - if err != nil { 61 - return nil, err 62 - } 63 - did, err := atproto.ParsePubKey(signer.Public().(*ecdsa.PublicKey)) 64 - if err != nil { 65 - return nil, err 66 - } 67 - return &MediaSignerExt{ 68 - signer: signer, 69 - certPath: certPath, 70 - streamer: streamer, 71 - pub: pub, 72 - keyBs: keyBs, 73 - taURL: cli.TAURL, 74 - did: did.DIDKey(), 75 - manifestBuilder: NewManifestBuilder(model), 76 - }, nil 77 - } 78 - 79 - func (ms *MediaSignerExt) SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) { 80 - startTime := time.Now() 81 - _, span := otel.Tracer("signer").Start(ctx, "SignMP4_Ext") 82 - defer span.End() 83 - 84 - // Build manifest with metadata from database 85 - manifestBs, err := ms.manifestBuilder.BuildManifest(ctx, ms.streamer, start) 86 - if err != nil { 87 - log.Error(ctx, "MediaSignerExt: failed to build manifest", "error", err) 88 - return nil, fmt.Errorf("failed to build manifest: %w", err) 89 - } 90 - 91 - // Get the path to the current executable 92 - execPath, err := os.Executable() 93 - if err != nil { 94 - return nil, fmt.Errorf("failed to get executable path: %w", err) 95 - } 96 - 97 - enc := base58.Encode(ms.keyBs) 98 - 99 - // Prepare command with manifest JSON 100 - cmd := exec.Command(execPath, "sign", 101 - "--key", enc, 102 - "--cert", ms.certPath, 103 - "--ta-url", ms.taURL, 104 - "--streamer", ms.streamer, 105 - "--start-time", fmt.Sprintf("%d", start), 106 - "--manifest", string(manifestBs)) 107 - 108 - // overwrite so that our subprocesses don't do their own leak checking 109 - cmd.Env = append(os.Environ(), "LD_PRELOAD=") 110 - 111 - // Set up pipes for stdin and stdout 112 - stdin, err := cmd.StdinPipe() 113 - if err != nil { 114 - return nil, fmt.Errorf("failed to create stdin pipe: %w", err) 115 - } 116 - 117 - stdout := &bytes.Buffer{} 118 - cmd.Stdout = stdout 119 - stderr := &bytes.Buffer{} 120 - cmd.Stderr = stderr 121 - 122 - // Start the command 123 - if err := cmd.Start(); err != nil { 124 - return nil, fmt.Errorf("failed to start command: %w", err) 125 - } 126 - 127 - // Copy input to stdin 128 - _, err = io.Copy(stdin, input) 129 - if err != nil { 130 - return nil, fmt.Errorf("failed to write to stdin: %w stderr=%s", err, stderr.String()) 131 - } 132 - stdin.Close() 133 - 134 - // Wait for the command to complete 135 - if err := cmd.Wait(); err != nil { 136 - log.Error(ctx, "MediaSignerExt: subprocess failed", "error", err, "stderr", stderr.String()) 137 - return nil, fmt.Errorf("command failed: %w, stderr: %s", err, stderr.String()) 138 - } 139 - 140 - spmetrics.SigningDuration.WithLabelValues(ms.streamer).Observe(float64(time.Since(startTime).Milliseconds())) 141 - return stdout.Bytes(), nil 142 - } 143 - 144 - func (ms *MediaSignerExt) Pub() aqpub.Pub { 145 - return ms.pub 146 - } 147 - 148 - func (ms *MediaSignerExt) Streamer() string { 149 - return ms.streamer 150 - } 151 - 152 - func (ms *MediaSignerExt) DID() string { 153 - return ms.did 154 - }
+150 -28
pkg/media/segmenter.go
··· 4 4 "bytes" 5 5 "context" 6 6 "fmt" 7 + "os" 8 + "strings" 9 + "sync/atomic" 7 10 "time" 8 11 9 12 "github.com/go-gst/go-gst/gst" 10 13 "github.com/go-gst/go-gst/gst/app" 11 - "go.opentelemetry.io/otel" 12 - "go.opentelemetry.io/otel/attribute" 13 - "go.opentelemetry.io/otel/trace" 14 - "stream.place/streamplace/pkg/globalerror" 14 + "golang.org/x/sync/errgroup" 15 15 "stream.place/streamplace/pkg/log" 16 16 ) 17 17 18 18 // element that takes the input stream, muxes to mp4, and signs the result 19 - func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 19 + func SegmentElem(ctx context.Context, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 20 20 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 21 21 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 22 22 "name": "signer", ··· 55 55 } 56 56 }() 57 57 58 + // we didn't need faststart but i'm leaving this commented here in case 59 + // you want to change any other muxer properties in the future 60 + 61 + _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) { 62 + err := muxEle.SetProperty("presentation-time", false) 63 + if err != nil { 64 + panic("error setting interleave-bytes to 4000: " + err.Error()) 65 + } 66 + }) 67 + if err != nil { 68 + return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err) 69 + } 70 + 71 + // channel to make sure data is emitted in order 72 + var ch chan struct{} 73 + 58 74 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) { 75 + previousSegCh := ch 76 + mySegCh := make(chan struct{}, 1) 77 + ch = mySegCh 59 78 buf := &bytes.Buffer{} 79 + err := sinkEle.SetProperty("sync", false) 80 + if err != nil { 81 + panic("error setting sync to false: " + err.Error()) 82 + } 60 83 appsink := app.SinkFromElement(sinkEle) 61 84 if appsink == nil { 62 85 panic("appsink should not be nil") ··· 65 88 appsink.SetCallbacks(&app.SinkCallbacks{ 66 89 NewSampleFunc: WriterNewSample(ctx, buf), 67 90 EOSFunc: func(sink *app.Sink) { 68 - ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 69 - attribute.String("streamer", ms.Streamer()), 70 - )) 71 - defer span.End() 91 + // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes( 92 + // attribute.String("streamer", ms.Streamer()), 93 + // )) 94 + // defer span.End() 95 + now := time.Now().UnixMilli() 72 96 resetTimer <- struct{}{} 73 - now := time.Now().UnixMilli() 74 97 bs := buf.Bytes() 75 - if mm.cli.SmearAudio { 76 - smearedBuf := &bytes.Buffer{} 77 - err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf) 78 - if err != nil { 79 - log.Error(ctx, "error smearing audio timestamps", "error", err) 80 - return 81 - } 82 - bs = smearedBuf.Bytes() 98 + 99 + if previousSegCh != nil { 100 + <-previousSegCh 83 101 } 84 - bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 102 + err := cb(ctx, bs, now) 85 103 if err != nil { 86 104 log.Error(ctx, "error signing segment", "error", err) 87 105 return 88 106 } 107 + close(mySegCh) 89 108 90 - err = mm.ValidateMP4(ctx, bytes.NewReader(bs), true) 91 - if err != nil { 92 - log.Error(ctx, "error validating segment", "error", err) 93 - globalerror.GlobalError(err) 94 - // We don't want to stop the pipeline here because we want to keep the stream 95 - // alive. Lots of weird invalid data coming in from WebRTC connections on 96 - // phones. Better we drop one weird segment than force the stream to restart 97 - return 98 - } 99 109 }, 100 110 }) 101 111 }) ··· 105 115 106 116 return elem, nil 107 117 } 118 + 119 + func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 120 + return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error { 121 + if mm.cli.SmearAudio { 122 + smearedBuf := &bytes.Buffer{} 123 + err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf) 124 + if err != nil { 125 + return fmt.Errorf("error smearing audio timestamps: %w", err) 126 + } 127 + bs = smearedBuf.Bytes() 128 + } 129 + signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now) 130 + if err != nil { 131 + return err 132 + } 133 + return mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true) 134 + }) 135 + } 136 + 137 + func SegmentFile(ctx context.Context, input string, outDir string) error { 138 + ctx, cancel := context.WithCancel(ctx) 139 + defer cancel() 140 + g, ctx := errgroup.WithContext(ctx) 141 + pipelineSlice := []string{ 142 + "filesrc name=filesrc ! qtdemux name=demux", 143 + "demux. ! queue ! h264parse ! rtph264pay ! rtph264depay ! h264parse name=videoparse", 144 + "demux. ! queue ! opusparse name=audioparse", 145 + } 146 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 147 + if err != nil { 148 + return fmt.Errorf("error creating MKVIngest pipeline: %w", err) 149 + } 150 + 151 + srcele, err := pipeline.GetElementByName("filesrc") 152 + if err != nil { 153 + return err 154 + } 155 + if err := srcele.Set("location", input); err != nil { 156 + return err 157 + } 158 + 159 + videoParseEle, err := pipeline.GetElementByName("videoparse") 160 + if err != nil { 161 + return err 162 + } 163 + 164 + var segCount atomic.Int64 165 + 166 + segmenter, err := SegmentElem(ctx, func(ctx context.Context, buf []byte, now int64) error { 167 + seg := segCount.Load() 168 + segCount.Add(1) 169 + g.Go(func() error { 170 + fpath := fmt.Sprintf("%s/%06d.mp4", outDir, seg) 171 + log.Log(ctx, "writing segment", "path", fpath) 172 + fd, err := os.Create(fpath) 173 + if err != nil { 174 + return err 175 + } 176 + defer fd.Close() 177 + _, err = fd.Write(buf) 178 + return err 179 + }) 180 + return nil 181 + }) 182 + if err != nil { 183 + return err 184 + } 185 + 186 + err = pipeline.Add(segmenter) 187 + if err != nil { 188 + return err 189 + } 190 + err = videoParseEle.Link(segmenter) 191 + if err != nil { 192 + return err 193 + } 194 + audioparse, err := pipeline.GetElementByName("audioparse") 195 + if err != nil { 196 + return err 197 + } 198 + err = audioparse.Link(segmenter) 199 + if err != nil { 200 + return err 201 + } 202 + 203 + busErr := make(chan error) 204 + go func() { 205 + err := HandleBusMessages(ctx, pipeline) 206 + cancel() 207 + busErr <- err 208 + }() 209 + 210 + err = pipeline.SetState(gst.StatePlaying) 211 + if err != nil { 212 + return err 213 + } 214 + 215 + defer func() { 216 + err := pipeline.SetState(gst.StateNull) 217 + if err != nil { 218 + log.Error(ctx, "error setting pipeline to null state", "error", err) 219 + } 220 + }() 221 + 222 + <-busErr 223 + err = g.Wait() 224 + if err != nil { 225 + return err 226 + } 227 + 228 + return nil 229 + }
+55 -23
pkg/media/validate.go
··· 10 10 "strings" 11 11 "time" 12 12 13 + "github.com/bluesky-social/indigo/atproto/crypto" 13 14 "go.opentelemetry.io/otel" 14 15 "stream.place/streamplace/pkg/aqtime" 15 16 c2patypes "stream.place/streamplace/pkg/c2patypes" ··· 32 33 if err != nil { 33 34 return err 34 35 } 35 - var maniCert ManifestAndCert 36 - maniStr, err := iroh_streamplace.GetManifestAndCert(buf) 36 + 37 + valid, err := ValidateMP4Media(ctx, buf) 37 38 if err != nil { 38 39 return err 39 40 } 40 - err = json.Unmarshal([]byte(maniStr), &maniCert) 41 - if err != nil { 42 - return err 43 - } 44 - label := maniCert.Manifest.Label 41 + meta := valid.Meta 42 + pub := valid.Pub 43 + mediaData := valid.MediaData 44 + manifest := valid.Manifest 45 + 46 + label := manifest.Label 45 47 if label != nil && mm.model != nil { 46 48 oldSeg, err := mm.model.GetSegment(*label) 47 49 if err != nil { ··· 52 54 return nil 53 55 } 54 56 } 55 - pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 56 - if err != nil { 57 - return err 58 - } 59 - meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 60 - if err != nil { 61 - return err 62 - } 57 + 63 58 if meta.MetadataConfiguration != nil { 64 59 if meta.MetadataConfiguration.DistributionPolicy != nil { 65 60 allowedBroadcasters := meta.MetadataConfiguration.DistributionPolicy.AllowedBroadcasters ··· 70 65 } 71 66 } 72 67 } 73 - mediaData, err := ParseSegmentMediaData(ctx, buf) 74 - if err != nil { 75 - return err 76 - } 77 - // special case for test signers that are only signed with a key 68 + 78 69 var repoDID string 79 70 var signingKeyDID string 71 + // special case for test signers that are only signed with a key 80 72 if strings.HasPrefix(meta.Creator, constants.DID_KEY_PREFIX) { 81 73 signingKeyDID = meta.Creator 82 74 repoDID = meta.Creator ··· 123 115 deleteAfter = meta.DistributionPolicy.ExpiresAt 124 116 } 125 117 seg := &model.Segment{ 126 - ID: *maniCert.Manifest.Label, 118 + ID: *label, 127 119 SigningKeyDID: signingKeyDID, 128 120 RepoDID: repoDID, 129 121 StartTime: meta.StartTime.Time(), ··· 150 142 case <-ctx.Done(): 151 143 return 152 144 case <-time.After(1 * time.Minute): 153 - log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *maniCert.Manifest.Label) 145 + log.Warn(ctx, "failed to send segment to channel, timing out", "streamer", repoDID, "signingKey", signingKeyDID, "segmentID", *label) 154 146 return 155 147 } 156 148 }() 157 149 } 158 150 aqt := aqtime.FromTime(meta.StartTime.Time()) 159 - log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *maniCert.Manifest.Label) 151 + log.Log(ctx, "successfully ingested segment", "user", repoDID, "signingKey", signingKeyDID, "timestamp", aqt.FileSafeString(), "segmentID", *label) 160 152 return nil 161 153 } 162 154 ··· 205 197 } 206 198 return false 207 199 } 200 + 201 + type ValidationResult struct { 202 + Pub *crypto.PublicKeyK256 203 + Meta *SegmentMetadata 204 + MediaData *model.SegmentMediaData 205 + Manifest *c2patypes.Manifest 206 + Cert string 207 + } 208 + 209 + // validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot 210 + func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) { 211 + var maniCert ManifestAndCert 212 + maniStr, err := iroh_streamplace.GetManifestAndCert(buf) 213 + if err != nil { 214 + return nil, err 215 + } 216 + err = json.Unmarshal([]byte(maniStr), &maniCert) 217 + if err != nil { 218 + return nil, err 219 + } 220 + pub, err := signers.ParseES256KCert([]byte(maniCert.Cert)) 221 + if err != nil { 222 + return nil, err 223 + } 224 + meta, err := ParseSegmentAssertions(ctx, &maniCert.Manifest) 225 + if err != nil { 226 + return nil, err 227 + } 228 + mediaData, err := ParseSegmentMediaData(ctx, buf) 229 + if err != nil { 230 + return nil, err 231 + } 232 + return &ValidationResult{ 233 + Pub: pub, 234 + Meta: meta, 235 + MediaData: mediaData, 236 + Manifest: &maniCert.Manifest, 237 + Cert: maniCert.Cert, 238 + }, nil 239 + }
+1 -1
rust/iroh-streamplace/Cargo.toml
··· 25 25 postcard = { version = "1.1.3", features = ["use-std"] } 26 26 irpc = "0.9.0" 27 27 irpc-iroh = "0.9.0" 28 - c2pa = { git = "https://github.com/hyphacoop/c2pa-rs.git", rev = "1b84d40219b27340a30fc309250e774e8a7b7761", features = [ 28 + c2pa = { git = "https://github.com/streamplace/c2pa-rs.git", rev = "544825abfaf9e588813e18dba70c8d5afd039d46", features = [ 29 29 "openssl", 30 30 "file_io", 31 31 ] }
+131 -1
rust/iroh-streamplace/src/c2pa.rs
··· 1 1 use std::{io::Cursor, sync::Arc}; 2 2 3 - use c2pa::{Builder, CallbackSigner, Reader, settings::Settings}; 3 + use c2pa::Builder; 4 + use c2pa::CallbackSigner; 5 + use c2pa::Reader; 6 + use c2pa::settings::Settings; 7 + 8 + use c2pa::Ingredient; 9 + use c2pa::jumbf_io; 10 + use c2pa::status_tracker::StatusTracker; 11 + use c2pa::store::Store; 12 + 4 13 use serde_json; 5 14 6 15 #[derive(Debug, thiserror::Error, uniffi::Error)] ··· 16 25 pub fn get_manifest_and_cert(data: Vec<u8>) -> Result<String, SPError> { 17 26 let reader = Reader::from_stream("video/mp4", Cursor::new(data)) 18 27 .map_err(|e| SPError::C2paError(e.to_string()))?; 28 + 19 29 if let Some(manifest) = reader.active_manifest() { 20 30 let cert_chain = if let Some(si) = manifest.signature_info() { 21 31 si.cert_chain() ··· 110 120 Settings::from_toml(TOML_SETTINGS).map_err(|e| SPError::C2paError(e.to_string()))?; 111 121 let callback_signer = CallbackSigner::new( 112 122 move |_context: *const (), data: &[u8]| { 123 + let signature = gosigner 124 + .sign(data.to_vec()) 125 + .map_err(|e| c2pa::Error::BadParam(e.to_string()))?; 126 + Ok(signature) 127 + }, 128 + c2pa::SigningAlg::Es256K, 129 + certs, 130 + ); 131 + let mut builder = 132 + Builder::from_json(&manifest).map_err(|e| SPError::C2paError(e.to_string()))?; 133 + let mut output = Vec::new(); 134 + let mut input_cursor = Cursor::new(data); 135 + let mut output_cursor = Cursor::new(&mut output); 136 + builder 137 + .sign( 138 + &callback_signer, 139 + "video/mp4", 140 + &mut input_cursor, 141 + &mut output_cursor, 142 + ) 143 + .map_err(|e| SPError::C2paError(e.to_string()))?; 144 + Ok(output) 145 + } 146 + 147 + #[uniffi::export] 148 + pub fn get_manifests(data: Vec<u8>) -> Result<String, SPError> { 149 + let store = Reader::from_stream("video/mp4", Cursor::new(data)) 150 + .map_err(|e| SPError::C2paError(e.to_string()))?; 151 + let result = serde_json::json!({ 152 + "manifests": store.manifests() 153 + }); 154 + Ok(result.to_string()) 155 + } 156 + 157 + #[uniffi::export] 158 + pub fn resign( 159 + unsigned_seg_label: String, 160 + unsigned_seg_data: Vec<u8>, 161 + signed_concat_data: Vec<u8>, 162 + certs: Vec<u8>, 163 + gosigner: Arc<dyn GoSigner>, 164 + ) -> Result<Vec<u8>, SPError> { 165 + let callback_signer = CallbackSigner::new( 166 + move |_context: *const (), data: &[u8]| { 167 + gosigner 168 + .sign(data.to_vec()) 169 + .map_err(|e| c2pa::Error::BadParam(e.to_string())) 170 + }, 171 + c2pa::SigningAlg::Es256K, 172 + certs, 173 + ); 174 + 175 + let mut validation_log = StatusTracker::default(); 176 + 177 + let combined_store = Store::from_stream( 178 + "video/mp4", 179 + Cursor::new(signed_concat_data), 180 + true, 181 + &mut validation_log, 182 + ) 183 + .map_err(|e| SPError::C2paError(format!("from_stream failed: {}", e)))?; 184 + 185 + let seg_claim = combined_store 186 + .get_claim(unsigned_seg_label.as_str()) 187 + .ok_or(SPError::C2paError(format!( 188 + "Segment claim not found: {}", 189 + unsigned_seg_label 190 + )))?; 191 + 192 + let seg_claim_clone = seg_claim.clone(); 193 + 194 + let mut seg_store = Store::new(); 195 + let _provenance = seg_store 196 + .commit_claim(seg_claim_clone) 197 + .map_err(|e| SPError::C2paError(format!("commit_claim failed: {}", e)))?; 198 + 199 + let mut output = Vec::new(); 200 + let mut output_cursor = Cursor::new(&mut output); 201 + let mut input_cursor = Cursor::new(unsigned_seg_data); 202 + 203 + let jumbf_bytes = seg_store 204 + .to_jumbf(&callback_signer) 205 + .map_err(|e| SPError::C2paError(format!("to_jumbf failed: {}", e)))?; 206 + 207 + jumbf_io::save_jumbf_to_stream( 208 + "video/mp4", 209 + &mut input_cursor, 210 + &mut output_cursor, 211 + &jumbf_bytes, 212 + ) 213 + .map_err(|e| SPError::C2paError(format!("save_jumbf_to_stream failed: {}", e)))?; 214 + 215 + // seg_store 216 + // .save_to_stream( 217 + // "video/mp4", 218 + // &mut input_cursor, 219 + // &mut output_cursor, 220 + // &callback_signer, 221 + // ) 222 + // .map_err(|e| SPError::C2paError(format!("save_to_stream failed: {}", e)))?; 223 + Ok(output) 224 + } 225 + 226 + #[uniffi::export] 227 + pub fn sign_with_ingredients( 228 + manifest: String, 229 + data: Vec<u8>, 230 + certs: Vec<u8>, 231 + ingredients: Vec<Vec<u8>>, 232 + gosigner: Arc<dyn GoSigner>, 233 + ) -> Result<Vec<u8>, SPError> { 234 + Settings::from_toml(TOML_SETTINGS).map_err(|e| SPError::C2paError(e.to_string()))?; 235 + let callback_signer = CallbackSigner::new( 236 + move |_context: *const (), data: &[u8]| { 113 237 gosigner 114 238 .sign(data.to_vec()) 115 239 .map_err(|e| c2pa::Error::BadParam(e.to_string())) ··· 119 243 ); 120 244 let mut builder = 121 245 Builder::from_json(&manifest).map_err(|e| SPError::C2paError(e.to_string()))?; 246 + for ingredient in ingredients { 247 + let mut cursor = Cursor::new(ingredient); 248 + let ingredient = Ingredient::from_stream("video/mp4", &mut cursor) 249 + .map_err(|e| SPError::C2paError(e.to_string()))?; 250 + builder.add_ingredient(ingredient); 251 + } 122 252 let mut output = Vec::new(); 123 253 let mut input_cursor = Cursor::new(data); 124 254 let mut output_cursor = Cursor::new(&mut output);
+1 -1
subprojects/gstreamer-full.wrap
··· 1 1 [wrap-git] 2 2 url = https://gitlab.freedesktop.org/iameli/gstreamer.git 3 - revision = 5c372f9595b7024d1074e45ee50b68b7a9a3f5e3 3 + revision = 587f2f9d8678df804515dac40aaac0a732c32c00 4 4 depth = 1
+67
test/remote/remote-fixtures.go
··· 1 1 package remote 2 2 3 3 import ( 4 + "archive/tar" 5 + "compress/gzip" 4 6 "crypto/sha256" 5 7 "encoding/hex" 6 8 "fmt" ··· 80 82 81 83 return finalPath 82 84 } 85 + 86 + // takes a tarball, returns a directory with the contents 87 + func RemoteArchive(name string) string { 88 + fpath := RemoteFixture(name) 89 + 90 + // Create extracted directory adjacent to the archive file 91 + dir := filepath.Dir(fpath) 92 + extractedDir := filepath.Join(dir, "extracted") 93 + 94 + if err := os.MkdirAll(extractedDir, 0755); err != nil { 95 + panic(err) 96 + } 97 + 98 + // Extract the tarball contents into the directory 99 + file, err := os.Open(fpath) 100 + if err != nil { 101 + panic(err) 102 + } 103 + defer file.Close() 104 + 105 + // Create gzip reader 106 + gzr, err := gzip.NewReader(file) 107 + if err != nil { 108 + panic(err) 109 + } 110 + defer gzr.Close() 111 + 112 + tr := tar.NewReader(gzr) 113 + for { 114 + header, err := tr.Next() 115 + if err == io.EOF { 116 + break 117 + } 118 + if err != nil { 119 + panic(err) 120 + } 121 + 122 + target := filepath.Join(extractedDir, header.Name) 123 + 124 + switch header.Typeflag { 125 + case tar.TypeDir: 126 + if err := os.MkdirAll(target, 0755); err != nil { 127 + panic(err) 128 + } 129 + case tar.TypeReg: 130 + // Create parent directories if needed 131 + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { 132 + panic(err) 133 + } 134 + 135 + outFile, err := os.Create(target) 136 + if err != nil { 137 + panic(err) 138 + } 139 + 140 + if _, err := io.Copy(outFile, tr); err != nil { 141 + outFile.Close() 142 + panic(err) 143 + } 144 + outFile.Close() 145 + } 146 + } 147 + 148 + return extractedDir 149 + }