Live video on the AT Protocol

rust: move to streaming adapter for most things

+881 -70
+14 -1
pkg/aqio/aqio.go
··· 2 2 3 3 import ( 4 4 "errors" 5 + "fmt" 5 6 "io" 6 7 7 8 "github.com/johncgriffin/overflow" 8 9 ) 9 10 11 + func NewReadWriteSeeker(buf []byte) *ReadWriteSeeker { 12 + return &ReadWriteSeeker{buf: buf, pos: 0} 13 + } 14 + 10 15 // ReadWriteSeeker is an in-memory io.ReadWriteSeeker implementation 11 16 type ReadWriteSeeker struct { 12 17 buf []byte ··· 15 20 16 21 // Write implements the io.Writer interface 17 22 func (rws *ReadWriteSeeker) Write(p []byte) (n int, err error) { 23 + fmt.Printf("Write: pos=%d len(p)=%d\n", rws.pos, len(p)) 18 24 minCap := overflow.Addp(rws.pos, len(p)) 19 25 if minCap > cap(rws.buf) { // Make sure buf has enough capacity: 20 - buf2 := make([]byte, len(rws.buf), overflow.Addp(minCap, len(p))) // add some extra 26 + fmt.Printf("Write: pos=%d len(p)=%d minCap=%d\n", rws.pos, len(p), minCap) 27 + newCap := cap(rws.buf) * 2 28 + if newCap == 0 { 29 + newCap = 128 30 + } else if newCap < minCap { 31 + newCap = minCap * 2 32 + } 33 + buf2 := make([]byte, len(rws.buf), newCap) // double 21 34 copy(buf2, rws.buf) 22 35 rws.buf = buf2 23 36 }
+123
pkg/c2patypes/stream_adapter.go
··· 1 + package c2patypes 2 + 3 + import ( 4 + "errors" 5 + "fmt" 6 + "io" 7 + ) 8 + 9 + // type Stream interface { 10 + // // Read a stream of bytes from the stream 11 + // ReadStream(length uint64) ([]byte, error) 12 + // // Seek to a position in the stream 13 + // SeekStream(pos int64, mode uint64) (uint64, error) 14 + // // Write a stream of bytes to the stream 15 + // WriteStream(data []byte) (uint64, error) 16 + // } 17 + 18 + // pub enum SeekMode { 19 + // Start = 0, 20 + // End = 1, 21 + // Current = 2, 22 + // } 23 + 24 + const ( 25 + SeekModeStart uint64 = 0 26 + SeekModeEnd uint64 = 1 27 + SeekModeCurrent uint64 = 2 28 + ) 29 + 30 + func NewReader(rs io.ReadSeeker) *C2PAStreamReader { 31 + return &C2PAStreamReader{ReadSeeker: rs} 32 + } 33 + 34 + func NewWriter(rws io.ReadWriteSeeker) *C2PAStreamWriter { 35 + return &C2PAStreamWriter{ReadWriteSeeker: rws} 36 + } 37 + 38 + // Wrapped io.ReadSeeker for passing to Rust. Doesn't write. 39 + type C2PAStreamReader struct { 40 + io.ReadSeeker 41 + } 42 + 43 + func (s *C2PAStreamReader) ReadStream(length uint64) ([]byte, error) { 44 + return readStream(s.ReadSeeker, length) 45 + } 46 + 47 + func (s *C2PAStreamReader) SeekStream(pos int64, mode uint64) (uint64, error) { 48 + return seekStream(s.ReadSeeker, pos, mode) 49 + } 50 + 51 + func (s *C2PAStreamReader) WriteStream(data []byte) (uint64, error) { 52 + return 0, fmt.Errorf("Writing is not implemented for C2PAStreamReader") 53 + } 54 + 55 + // Wrapped io.Writer for passing to Rust. 56 + type C2PAStreamWriter struct { 57 + io.ReadWriteSeeker 58 + } 59 + 60 + func (s *C2PAStreamWriter) ReadStream(length uint64) ([]byte, error) { 61 + return readStream(s.ReadWriteSeeker, length) 62 + } 63 + 64 + func (s *C2PAStreamWriter) SeekStream(pos int64, mode uint64) (uint64, error) { 65 + return seekStream(s.ReadWriteSeeker, pos, mode) 66 + } 67 + 68 + func (s *C2PAStreamWriter) WriteStream(data []byte) (uint64, error) { 69 + return writeStream(s.ReadWriteSeeker, data) 70 + } 71 + 72 + func readStream(r io.ReadSeeker, length uint64) ([]byte, error) { 73 + // fmt.Printf("read length=%d\n", length) 74 + bs := make([]byte, length) 75 + read, err := r.Read(bs) 76 + if err != nil { 77 + if errors.Is(err, io.EOF) { 78 + if read == 0 { 79 + // fmt.Printf("read EOF read=%d returning empty?", read) 80 + return []byte{}, nil 81 + } 82 + // partial := bs[read:] 83 + // return partial, nil 84 + } 85 + // fmt.Printf("io error=%s\n", err) 86 + return []byte{}, err 87 + } 88 + if uint64(read) < length { 89 + partial := bs[:read] 90 + // fmt.Printf("read returning partial read=%d len=%d\n", read, len(partial)) 91 + return partial, nil 92 + } 93 + // fmt.Printf("read returning full read=%d len=%d\n", read, len(bs)) 94 + return bs, nil 95 + } 96 + 97 + func seekStream(r io.ReadSeeker, pos int64, mode uint64) (uint64, error) { 98 + // fmt.Printf("seek pos=%d\n", pos) 99 + var seekMode int 100 + if mode == SeekModeCurrent { 101 + seekMode = io.SeekCurrent 102 + } else if mode == SeekModeStart { 103 + seekMode = io.SeekStart 104 + } else if mode == SeekModeEnd { 105 + seekMode = io.SeekEnd 106 + } else { 107 + // fmt.Printf("seek mode unsupported mode=%d\n", mode) 108 + return 0, fmt.Errorf("unknown seek mode: %d", mode) 109 + } 110 + newPos, err := r.Seek(pos, seekMode) 111 + if err != nil { 112 + return 0, err 113 + } 114 + return uint64(newPos), nil 115 + } 116 + 117 + func writeStream(w io.ReadWriteSeeker, data []byte) (uint64, error) { 118 + wrote, err := w.Write(data) 119 + if err != nil { 120 + return uint64(wrote), err 121 + } 122 + return uint64(wrote), nil 123 + }
+337 -24
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.go
··· 338 338 339 339 FfiConverterDataHandlerINSTANCE.register() 340 340 FfiConverterGoSignerINSTANCE.register() 341 + FfiConverterStreamINSTANCE.register() 341 342 uniffiCheckChecksums() 342 343 } 343 344 ··· 356 357 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 357 358 return C.uniffi_iroh_streamplace_checksum_func_get_manifest_and_cert() 358 359 }) 359 - if checksum != 17550 { 360 + if checksum != 36028 { 360 361 // If this happens try cleaning and rebuilding your project 361 362 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_get_manifest_and_cert: UniFFI API checksum mismatch") 362 363 } ··· 365 366 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 366 367 return C.uniffi_iroh_streamplace_checksum_func_get_manifests() 367 368 }) 368 - if checksum != 17 { 369 + if checksum != 2548 { 369 370 // If this happens try cleaning and rebuilding your project 370 371 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_get_manifests: UniFFI API checksum mismatch") 371 372 } ··· 401 402 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 402 403 return C.uniffi_iroh_streamplace_checksum_func_resign() 403 404 }) 404 - if checksum != 32728 { 405 + if checksum != 7588 { 405 406 // If this happens try cleaning and rebuilding your project 406 407 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_resign: UniFFI API checksum mismatch") 407 408 } ··· 410 411 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 411 412 return C.uniffi_iroh_streamplace_checksum_func_sign() 412 413 }) 413 - if checksum != 23786 { 414 + if checksum != 50601 { 414 415 // If this happens try cleaning and rebuilding your project 415 416 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_sign: UniFFI API checksum mismatch") 416 417 } ··· 419 420 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 420 421 return C.uniffi_iroh_streamplace_checksum_func_sign_with_ingredients() 421 422 }) 422 - if checksum != 63680 { 423 + if checksum != 34840 { 423 424 // If this happens try cleaning and rebuilding your project 424 425 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_func_sign_with_ingredients: UniFFI API checksum mismatch") 425 426 } ··· 714 715 } 715 716 { 716 717 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 718 + return C.uniffi_iroh_streamplace_checksum_method_stream_read_stream() 719 + }) 720 + if checksum != 62815 { 721 + // If this happens try cleaning and rebuilding your project 722 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_stream_read_stream: UniFFI API checksum mismatch") 723 + } 724 + } 725 + { 726 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 727 + return C.uniffi_iroh_streamplace_checksum_method_stream_seek_stream() 728 + }) 729 + if checksum != 56397 { 730 + // If this happens try cleaning and rebuilding your project 731 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_stream_seek_stream: UniFFI API checksum mismatch") 732 + } 733 + } 734 + { 735 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 736 + return C.uniffi_iroh_streamplace_checksum_method_stream_write_stream() 737 + }) 738 + if checksum != 59847 { 739 + // If this happens try cleaning and rebuilding your project 740 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_stream_write_stream: UniFFI API checksum mismatch") 741 + } 742 + } 743 + { 744 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 717 745 return C.uniffi_iroh_streamplace_checksum_method_subscriberesponse_next_raw() 718 746 }) 719 747 if checksum != 55650 { ··· 818 846 type FfiDestroyerUint64 struct{} 819 847 820 848 func (FfiDestroyerUint64) Destroy(_ uint64) {} 849 + 850 + type FfiConverterInt64 struct{} 851 + 852 + var FfiConverterInt64INSTANCE = FfiConverterInt64{} 853 + 854 + func (FfiConverterInt64) Lower(value int64) C.int64_t { 855 + return C.int64_t(value) 856 + } 857 + 858 + func (FfiConverterInt64) Write(writer io.Writer, value int64) { 859 + writeInt64(writer, value) 860 + } 861 + 862 + func (FfiConverterInt64) Lift(value C.int64_t) int64 { 863 + return int64(value) 864 + } 865 + 866 + func (FfiConverterInt64) Read(reader io.Reader) int64 { 867 + return readInt64(reader) 868 + } 869 + 870 + type FfiDestroyerInt64 struct{} 871 + 872 + func (FfiDestroyerInt64) Destroy(_ int64) {} 821 873 822 874 type FfiConverterBool struct{} 823 875 ··· 2393 2445 value.Destroy() 2394 2446 } 2395 2447 2448 + // This allows for a callback stream over the Uniffi interface. 2449 + // Implement these stream functions in the foreign language 2450 + // and this will provide Rust Stream trait implementations 2451 + // This is necessary since the Rust traits cannot be implemented directly 2452 + // as uniffi callbacks 2453 + type Stream interface { 2454 + // Read a stream of bytes from the stream 2455 + ReadStream(length uint64) ([]byte, error) 2456 + // Seek to a position in the stream 2457 + SeekStream(pos int64, mode uint64) (uint64, error) 2458 + // Write a stream of bytes to the stream 2459 + WriteStream(data []byte) (uint64, error) 2460 + } 2461 + 2462 + // This allows for a callback stream over the Uniffi interface. 2463 + // Implement these stream functions in the foreign language 2464 + // and this will provide Rust Stream trait implementations 2465 + // This is necessary since the Rust traits cannot be implemented directly 2466 + // as uniffi callbacks 2467 + type StreamImpl struct { 2468 + ffiObject FfiObject 2469 + } 2470 + 2471 + // Read a stream of bytes from the stream 2472 + func (_self *StreamImpl) ReadStream(length uint64) ([]byte, error) { 2473 + _pointer := _self.ffiObject.incrementPointer("Stream") 2474 + defer _self.ffiObject.decrementPointer() 2475 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 2476 + return GoRustBuffer{ 2477 + inner: C.uniffi_iroh_streamplace_fn_method_stream_read_stream( 2478 + _pointer, FfiConverterUint64INSTANCE.Lower(length), _uniffiStatus), 2479 + } 2480 + }) 2481 + if _uniffiErr != nil { 2482 + var _uniffiDefaultValue []byte 2483 + return _uniffiDefaultValue, _uniffiErr 2484 + } else { 2485 + return FfiConverterBytesINSTANCE.Lift(_uniffiRV), nil 2486 + } 2487 + } 2488 + 2489 + // Seek to a position in the stream 2490 + func (_self *StreamImpl) SeekStream(pos int64, mode uint64) (uint64, error) { 2491 + _pointer := _self.ffiObject.incrementPointer("Stream") 2492 + defer _self.ffiObject.decrementPointer() 2493 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) C.uint64_t { 2494 + return C.uniffi_iroh_streamplace_fn_method_stream_seek_stream( 2495 + _pointer, FfiConverterInt64INSTANCE.Lower(pos), FfiConverterUint64INSTANCE.Lower(mode), _uniffiStatus) 2496 + }) 2497 + if _uniffiErr != nil { 2498 + var _uniffiDefaultValue uint64 2499 + return _uniffiDefaultValue, _uniffiErr 2500 + } else { 2501 + return FfiConverterUint64INSTANCE.Lift(_uniffiRV), nil 2502 + } 2503 + } 2504 + 2505 + // Write a stream of bytes to the stream 2506 + func (_self *StreamImpl) WriteStream(data []byte) (uint64, error) { 2507 + _pointer := _self.ffiObject.incrementPointer("Stream") 2508 + defer _self.ffiObject.decrementPointer() 2509 + _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) C.uint64_t { 2510 + return C.uniffi_iroh_streamplace_fn_method_stream_write_stream( 2511 + _pointer, FfiConverterBytesINSTANCE.Lower(data), _uniffiStatus) 2512 + }) 2513 + if _uniffiErr != nil { 2514 + var _uniffiDefaultValue uint64 2515 + return _uniffiDefaultValue, _uniffiErr 2516 + } else { 2517 + return FfiConverterUint64INSTANCE.Lift(_uniffiRV), nil 2518 + } 2519 + } 2520 + func (object *StreamImpl) Destroy() { 2521 + runtime.SetFinalizer(object, nil) 2522 + object.ffiObject.destroy() 2523 + } 2524 + 2525 + type FfiConverterStream struct { 2526 + handleMap *concurrentHandleMap[Stream] 2527 + } 2528 + 2529 + var FfiConverterStreamINSTANCE = FfiConverterStream{ 2530 + handleMap: newConcurrentHandleMap[Stream](), 2531 + } 2532 + 2533 + func (c FfiConverterStream) Lift(pointer unsafe.Pointer) Stream { 2534 + result := &StreamImpl{ 2535 + newFfiObject( 2536 + pointer, 2537 + func(pointer unsafe.Pointer, status *C.RustCallStatus) unsafe.Pointer { 2538 + return C.uniffi_iroh_streamplace_fn_clone_stream(pointer, status) 2539 + }, 2540 + func(pointer unsafe.Pointer, status *C.RustCallStatus) { 2541 + C.uniffi_iroh_streamplace_fn_free_stream(pointer, status) 2542 + }, 2543 + ), 2544 + } 2545 + runtime.SetFinalizer(result, (*StreamImpl).Destroy) 2546 + return result 2547 + } 2548 + 2549 + func (c FfiConverterStream) Read(reader io.Reader) Stream { 2550 + return c.Lift(unsafe.Pointer(uintptr(readUint64(reader)))) 2551 + } 2552 + 2553 + func (c FfiConverterStream) Lower(value Stream) unsafe.Pointer { 2554 + // TODO: this is bad - all synchronization from ObjectRuntime.go is discarded here, 2555 + // because the pointer will be decremented immediately after this function returns, 2556 + // and someone will be left holding onto a non-locked pointer. 2557 + pointer := unsafe.Pointer(uintptr(c.handleMap.insert(value))) 2558 + return pointer 2559 + 2560 + } 2561 + 2562 + func (c FfiConverterStream) Write(writer io.Writer, value Stream) { 2563 + writeUint64(writer, uint64(uintptr(c.Lower(value)))) 2564 + } 2565 + 2566 + type FfiDestroyerStream struct{} 2567 + 2568 + func (_ FfiDestroyerStream) Destroy(value Stream) { 2569 + if val, ok := value.(*StreamImpl); ok { 2570 + val.Destroy() 2571 + } else { 2572 + panic("Expected *StreamImpl") 2573 + } 2574 + } 2575 + 2576 + //export iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod0 2577 + func iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod0(uniffiHandle C.uint64_t, length C.uint64_t, uniffiOutReturn *C.RustBuffer, callStatus *C.RustCallStatus) { 2578 + handle := uint64(uniffiHandle) 2579 + uniffiObj, ok := FfiConverterStreamINSTANCE.handleMap.tryGet(handle) 2580 + if !ok { 2581 + panic(fmt.Errorf("no callback in handle map: %d", handle)) 2582 + } 2583 + 2584 + res, err := 2585 + uniffiObj.ReadStream( 2586 + FfiConverterUint64INSTANCE.Lift(length), 2587 + ) 2588 + 2589 + if err != nil { 2590 + var actualError *SpError 2591 + if errors.As(err, &actualError) { 2592 + *callStatus = C.RustCallStatus{ 2593 + code: C.int8_t(uniffiCallbackResultError), 2594 + errorBuf: FfiConverterSpErrorINSTANCE.Lower(actualError), 2595 + } 2596 + } else { 2597 + *callStatus = C.RustCallStatus{ 2598 + code: C.int8_t(uniffiCallbackUnexpectedResultError), 2599 + } 2600 + } 2601 + return 2602 + } 2603 + 2604 + *uniffiOutReturn = FfiConverterBytesINSTANCE.Lower(res) 2605 + } 2606 + 2607 + //export iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod1 2608 + func iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod1(uniffiHandle C.uint64_t, pos C.int64_t, mode C.uint64_t, uniffiOutReturn *C.uint64_t, callStatus *C.RustCallStatus) { 2609 + handle := uint64(uniffiHandle) 2610 + uniffiObj, ok := FfiConverterStreamINSTANCE.handleMap.tryGet(handle) 2611 + if !ok { 2612 + panic(fmt.Errorf("no callback in handle map: %d", handle)) 2613 + } 2614 + 2615 + res, err := 2616 + uniffiObj.SeekStream( 2617 + FfiConverterInt64INSTANCE.Lift(pos), 2618 + FfiConverterUint64INSTANCE.Lift(mode), 2619 + ) 2620 + 2621 + if err != nil { 2622 + var actualError *SpError 2623 + if errors.As(err, &actualError) { 2624 + *callStatus = C.RustCallStatus{ 2625 + code: C.int8_t(uniffiCallbackResultError), 2626 + errorBuf: FfiConverterSpErrorINSTANCE.Lower(actualError), 2627 + } 2628 + } else { 2629 + *callStatus = C.RustCallStatus{ 2630 + code: C.int8_t(uniffiCallbackUnexpectedResultError), 2631 + } 2632 + } 2633 + return 2634 + } 2635 + 2636 + *uniffiOutReturn = FfiConverterUint64INSTANCE.Lower(res) 2637 + } 2638 + 2639 + //export iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod2 2640 + func iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod2(uniffiHandle C.uint64_t, data C.RustBuffer, uniffiOutReturn *C.uint64_t, callStatus *C.RustCallStatus) { 2641 + handle := uint64(uniffiHandle) 2642 + uniffiObj, ok := FfiConverterStreamINSTANCE.handleMap.tryGet(handle) 2643 + if !ok { 2644 + panic(fmt.Errorf("no callback in handle map: %d", handle)) 2645 + } 2646 + 2647 + res, err := 2648 + uniffiObj.WriteStream( 2649 + FfiConverterBytesINSTANCE.Lift(GoRustBuffer{ 2650 + inner: data, 2651 + }), 2652 + ) 2653 + 2654 + if err != nil { 2655 + var actualError *SpError 2656 + if errors.As(err, &actualError) { 2657 + *callStatus = C.RustCallStatus{ 2658 + code: C.int8_t(uniffiCallbackResultError), 2659 + errorBuf: FfiConverterSpErrorINSTANCE.Lower(actualError), 2660 + } 2661 + } else { 2662 + *callStatus = C.RustCallStatus{ 2663 + code: C.int8_t(uniffiCallbackUnexpectedResultError), 2664 + } 2665 + } 2666 + return 2667 + } 2668 + 2669 + *uniffiOutReturn = FfiConverterUint64INSTANCE.Lower(res) 2670 + } 2671 + 2672 + var UniffiVTableCallbackInterfaceStreamINSTANCE = C.UniffiVTableCallbackInterfaceStream{ 2673 + readStream: (C.UniffiCallbackInterfaceStreamMethod0)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod0), 2674 + seekStream: (C.UniffiCallbackInterfaceStreamMethod1)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod1), 2675 + writeStream: (C.UniffiCallbackInterfaceStreamMethod2)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod2), 2676 + 2677 + uniffiFree: (C.UniffiCallbackInterfaceFree)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceStreamFree), 2678 + } 2679 + 2680 + //export iroh_streamplace_cgo_dispatchCallbackInterfaceStreamFree 2681 + func iroh_streamplace_cgo_dispatchCallbackInterfaceStreamFree(handle C.uint64_t) { 2682 + FfiConverterStreamINSTANCE.handleMap.remove(uint64(handle)) 2683 + } 2684 + 2685 + func (c FfiConverterStream) register() { 2686 + C.uniffi_iroh_streamplace_fn_init_callback_vtable_stream(&UniffiVTableCallbackInterfaceStreamINSTANCE) 2687 + } 2688 + 2396 2689 // A response to a subscribe request. 2397 2690 // 2398 2691 // This can be used as a stream of [`SubscribeItem`]s. ··· 3613 3906 // Err* are used for checking error type with `errors.Is` 3614 3907 var ErrSpErrorNoCertificateChainFound = fmt.Errorf("SpErrorNoCertificateChainFound") 3615 3908 var ErrSpErrorC2paError = fmt.Errorf("SpErrorC2paError") 3909 + var ErrSpErrorIoError = fmt.Errorf("SpErrorIoError") 3616 3910 3617 3911 // Variant structs 3618 3912 type SpErrorNoCertificateChainFound struct { ··· 3653 3947 return target == ErrSpErrorC2paError 3654 3948 } 3655 3949 3950 + type SpErrorIoError struct { 3951 + message string 3952 + } 3953 + 3954 + func NewSpErrorIoError() *SpError { 3955 + return &SpError{err: &SpErrorIoError{}} 3956 + } 3957 + 3958 + func (e SpErrorIoError) destroy() { 3959 + } 3960 + 3961 + func (err SpErrorIoError) Error() string { 3962 + return fmt.Sprintf("IoError: %s", err.message) 3963 + } 3964 + 3965 + func (self SpErrorIoError) Is(target error) bool { 3966 + return target == ErrSpErrorIoError 3967 + } 3968 + 3656 3969 type FfiConverterSpError struct{} 3657 3970 3658 3971 var FfiConverterSpErrorINSTANCE = FfiConverterSpError{} ··· 3674 3987 return &SpError{&SpErrorNoCertificateChainFound{message}} 3675 3988 case 2: 3676 3989 return &SpError{&SpErrorC2paError{message}} 3990 + case 3: 3991 + return &SpError{&SpErrorIoError{message}} 3677 3992 default: 3678 3993 panic(fmt.Sprintf("Unknown error code %d in FfiConverterSpError.Read()", errorID)) 3679 3994 } ··· 3686 4001 writeInt32(writer, 1) 3687 4002 case *SpErrorC2paError: 3688 4003 writeInt32(writer, 2) 4004 + case *SpErrorIoError: 4005 + writeInt32(writer, 3) 3689 4006 default: 3690 4007 _ = variantValue 3691 4008 panic(fmt.Sprintf("invalid error value `%v` in FfiConverterSpError.Write", value)) ··· 3699 4016 case SpErrorNoCertificateChainFound: 3700 4017 variantValue.destroy() 3701 4018 case SpErrorC2paError: 4019 + variantValue.destroy() 4020 + case SpErrorIoError: 3702 4021 variantValue.destroy() 3703 4022 default: 3704 4023 _ = variantValue ··· 4759 5078 guard <- struct{}{} 4760 5079 } 4761 5080 4762 - func GetManifestAndCert(data []byte) (string, error) { 5081 + func GetManifestAndCert(data Stream) (string, error) { 4763 5082 _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4764 5083 return GoRustBuffer{ 4765 - inner: C.uniffi_iroh_streamplace_fn_func_get_manifest_and_cert(FfiConverterBytesINSTANCE.Lower(data), _uniffiStatus), 5084 + inner: C.uniffi_iroh_streamplace_fn_func_get_manifest_and_cert(FfiConverterStreamINSTANCE.Lower(data), _uniffiStatus), 4766 5085 } 4767 5086 }) 4768 5087 if _uniffiErr != nil { ··· 4773 5092 } 4774 5093 } 4775 5094 4776 - func GetManifests(data []byte) (string, error) { 5095 + func GetManifests(data Stream) (string, error) { 4777 5096 _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4778 5097 return GoRustBuffer{ 4779 - inner: C.uniffi_iroh_streamplace_fn_func_get_manifests(FfiConverterBytesINSTANCE.Lower(data), _uniffiStatus), 5098 + inner: C.uniffi_iroh_streamplace_fn_func_get_manifests(FfiConverterStreamINSTANCE.Lower(data), _uniffiStatus), 4780 5099 } 4781 5100 }) 4782 5101 if _uniffiErr != nil { ··· 4821 5140 } 4822 5141 } 4823 5142 4824 - func Resign(unsignedSegData [][]byte, signedConcatData []byte, manifestList []string, certs [][]byte) ([][]byte, error) { 5143 + func Resign(unsignedSegData [][]byte, signedConcatData Stream, manifestList []string, certs [][]byte) ([][]byte, error) { 4825 5144 _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4826 5145 return GoRustBuffer{ 4827 - inner: C.uniffi_iroh_streamplace_fn_func_resign(FfiConverterSequenceBytesINSTANCE.Lower(unsignedSegData), FfiConverterBytesINSTANCE.Lower(signedConcatData), FfiConverterSequenceStringINSTANCE.Lower(manifestList), FfiConverterSequenceBytesINSTANCE.Lower(certs), _uniffiStatus), 5146 + inner: C.uniffi_iroh_streamplace_fn_func_resign(FfiConverterSequenceBytesINSTANCE.Lower(unsignedSegData), FfiConverterStreamINSTANCE.Lower(signedConcatData), FfiConverterSequenceStringINSTANCE.Lower(manifestList), FfiConverterSequenceBytesINSTANCE.Lower(certs), _uniffiStatus), 4828 5147 } 4829 5148 }) 4830 5149 if _uniffiErr != nil { ··· 4835 5154 } 4836 5155 } 4837 5156 4838 - func Sign(manifest string, data []byte, certs []byte, gosigner GoSigner) ([]byte, error) { 5157 + func Sign(manifest string, data Stream, certs []byte, gosigner GoSigner) ([]byte, error) { 4839 5158 _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4840 5159 return GoRustBuffer{ 4841 - inner: C.uniffi_iroh_streamplace_fn_func_sign(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterBytesINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 5160 + inner: C.uniffi_iroh_streamplace_fn_func_sign(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterStreamINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 4842 5161 } 4843 5162 }) 4844 5163 if _uniffiErr != nil { ··· 4849 5168 } 4850 5169 } 4851 5170 4852 - func SignWithIngredients(manifest string, data []byte, certs []byte, ingredients [][]byte, gosigner GoSigner) ([]byte, error) { 4853 - _uniffiRV, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) RustBufferI { 4854 - return GoRustBuffer{ 4855 - inner: C.uniffi_iroh_streamplace_fn_func_sign_with_ingredients(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterBytesINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterSequenceBytesINSTANCE.Lower(ingredients), FfiConverterGoSignerINSTANCE.Lower(gosigner), _uniffiStatus), 4856 - } 5171 + func SignWithIngredients(manifest string, data Stream, certs []byte, ingredients [][]byte, gosigner GoSigner, output Stream) error { 5172 + _, _uniffiErr := rustCallWithError[SpError](FfiConverterSpError{}, func(_uniffiStatus *C.RustCallStatus) bool { 5173 + C.uniffi_iroh_streamplace_fn_func_sign_with_ingredients(FfiConverterStringINSTANCE.Lower(manifest), FfiConverterStreamINSTANCE.Lower(data), FfiConverterBytesINSTANCE.Lower(certs), FfiConverterSequenceBytesINSTANCE.Lower(ingredients), FfiConverterGoSignerINSTANCE.Lower(gosigner), FfiConverterStreamINSTANCE.Lower(output), _uniffiStatus) 5174 + return false 4857 5175 }) 4858 - if _uniffiErr != nil { 4859 - var _uniffiDefaultValue []byte 4860 - return _uniffiDefaultValue, _uniffiErr 4861 - } else { 4862 - return FfiConverterBytesINSTANCE.Lift(_uniffiRV), nil 4863 - } 5176 + return _uniffiErr.AsError() 4864 5177 } 4865 5178 4866 5179 func SubscribeItemDebug(item SubscribeItem) string {
+109 -5
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.h
··· 406 406 407 407 408 408 #endif 409 + #ifndef UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD0 410 + #define UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD0 411 + typedef void (*UniffiCallbackInterfaceStreamMethod0)(uint64_t uniffi_handle, uint64_t length, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ); 412 + 413 + // Making function static works arround: 414 + // https://github.com/golang/go/issues/11263 415 + static void call_UniffiCallbackInterfaceStreamMethod0( 416 + UniffiCallbackInterfaceStreamMethod0 cb, uint64_t uniffi_handle, uint64_t length, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ) 417 + { 418 + return cb(uniffi_handle, length, uniffi_out_return, callStatus ); 419 + } 420 + 421 + 422 + #endif 423 + #ifndef UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD1 424 + #define UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD1 425 + typedef void (*UniffiCallbackInterfaceStreamMethod1)(uint64_t uniffi_handle, int64_t pos, uint64_t mode, uint64_t* uniffi_out_return, RustCallStatus* callStatus ); 426 + 427 + // Making function static works arround: 428 + // https://github.com/golang/go/issues/11263 429 + static void call_UniffiCallbackInterfaceStreamMethod1( 430 + UniffiCallbackInterfaceStreamMethod1 cb, uint64_t uniffi_handle, int64_t pos, uint64_t mode, uint64_t* uniffi_out_return, RustCallStatus* callStatus ) 431 + { 432 + return cb(uniffi_handle, pos, mode, uniffi_out_return, callStatus ); 433 + } 434 + 435 + 436 + #endif 437 + #ifndef UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD2 438 + #define UNIFFI_FFIDEF_CALLBACK_INTERFACE_STREAM_METHOD2 439 + typedef void (*UniffiCallbackInterfaceStreamMethod2)(uint64_t uniffi_handle, RustBuffer data, uint64_t* uniffi_out_return, RustCallStatus* callStatus ); 440 + 441 + // Making function static works arround: 442 + // https://github.com/golang/go/issues/11263 443 + static void call_UniffiCallbackInterfaceStreamMethod2( 444 + UniffiCallbackInterfaceStreamMethod2 cb, uint64_t uniffi_handle, RustBuffer data, uint64_t* uniffi_out_return, RustCallStatus* callStatus ) 445 + { 446 + return cb(uniffi_handle, data, uniffi_out_return, callStatus ); 447 + } 448 + 449 + 450 + #endif 409 451 #ifndef UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_DATA_HANDLER 410 452 #define UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_DATA_HANDLER 411 453 typedef struct UniffiVTableCallbackInterfaceDataHandler { ··· 420 462 UniffiCallbackInterfaceGoSignerMethod0 sign; 421 463 UniffiCallbackInterfaceFree uniffiFree; 422 464 } UniffiVTableCallbackInterfaceGoSigner; 465 + 466 + #endif 467 + #ifndef UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_STREAM 468 + #define UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_STREAM 469 + typedef struct UniffiVTableCallbackInterfaceStream { 470 + UniffiCallbackInterfaceStreamMethod0 readStream; 471 + UniffiCallbackInterfaceStreamMethod1 seekStream; 472 + UniffiCallbackInterfaceStreamMethod2 writeStream; 473 + UniffiCallbackInterfaceFree uniffiFree; 474 + } UniffiVTableCallbackInterfaceStream; 423 475 424 476 #endif 425 477 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_DATAHANDLER ··· 698 750 RustBuffer uniffi_iroh_streamplace_fn_method_publickey_uniffi_trait_display(void* ptr, RustCallStatus *out_status 699 751 ); 700 752 #endif 753 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_STREAM 754 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_STREAM 755 + void* uniffi_iroh_streamplace_fn_clone_stream(void* ptr, RustCallStatus *out_status 756 + ); 757 + #endif 758 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_STREAM 759 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_STREAM 760 + void uniffi_iroh_streamplace_fn_free_stream(void* ptr, RustCallStatus *out_status 761 + ); 762 + #endif 763 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_INIT_CALLBACK_VTABLE_STREAM 764 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_INIT_CALLBACK_VTABLE_STREAM 765 + void uniffi_iroh_streamplace_fn_init_callback_vtable_stream(UniffiVTableCallbackInterfaceStream* vtable 766 + ); 767 + #endif 768 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_READ_STREAM 769 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_READ_STREAM 770 + RustBuffer uniffi_iroh_streamplace_fn_method_stream_read_stream(void* ptr, uint64_t length, RustCallStatus *out_status 771 + ); 772 + #endif 773 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_SEEK_STREAM 774 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_SEEK_STREAM 775 + uint64_t uniffi_iroh_streamplace_fn_method_stream_seek_stream(void* ptr, int64_t pos, uint64_t mode, RustCallStatus *out_status 776 + ); 777 + #endif 778 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_WRITE_STREAM 779 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_STREAM_WRITE_STREAM 780 + uint64_t uniffi_iroh_streamplace_fn_method_stream_write_stream(void* ptr, RustBuffer data, RustCallStatus *out_status 781 + ); 782 + #endif 701 783 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SUBSCRIBERESPONSE 702 784 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SUBSCRIBERESPONSE 703 785 void* uniffi_iroh_streamplace_fn_clone_subscriberesponse(void* ptr, RustCallStatus *out_status ··· 735 817 #endif 736 818 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFEST_AND_CERT 737 819 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFEST_AND_CERT 738 - RustBuffer uniffi_iroh_streamplace_fn_func_get_manifest_and_cert(RustBuffer data, RustCallStatus *out_status 820 + RustBuffer uniffi_iroh_streamplace_fn_func_get_manifest_and_cert(void* data, RustCallStatus *out_status 739 821 ); 740 822 #endif 741 823 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFESTS 742 824 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_GET_MANIFESTS 743 - RustBuffer uniffi_iroh_streamplace_fn_func_get_manifests(RustBuffer data, RustCallStatus *out_status 825 + RustBuffer uniffi_iroh_streamplace_fn_func_get_manifests(void* data, RustCallStatus *out_status 744 826 ); 745 827 #endif 746 828 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_INIT_LOGGING ··· 761 843 #endif 762 844 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_RESIGN 763 845 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_RESIGN 764 - RustBuffer uniffi_iroh_streamplace_fn_func_resign(RustBuffer unsigned_seg_data, RustBuffer signed_concat_data, RustBuffer manifest_list, RustBuffer certs, RustCallStatus *out_status 846 + RustBuffer uniffi_iroh_streamplace_fn_func_resign(RustBuffer unsigned_seg_data, void* signed_concat_data, RustBuffer manifest_list, RustBuffer certs, RustCallStatus *out_status 765 847 ); 766 848 #endif 767 849 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN 768 850 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN 769 - RustBuffer uniffi_iroh_streamplace_fn_func_sign(RustBuffer manifest, RustBuffer data, RustBuffer certs, void* gosigner, RustCallStatus *out_status 851 + RustBuffer uniffi_iroh_streamplace_fn_func_sign(RustBuffer manifest, void* data, RustBuffer certs, void* gosigner, RustCallStatus *out_status 770 852 ); 771 853 #endif 772 854 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN_WITH_INGREDIENTS 773 855 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SIGN_WITH_INGREDIENTS 774 - RustBuffer uniffi_iroh_streamplace_fn_func_sign_with_ingredients(RustBuffer manifest, RustBuffer data, RustBuffer certs, RustBuffer ingredients, void* gosigner, RustCallStatus *out_status 856 + void uniffi_iroh_streamplace_fn_func_sign_with_ingredients(RustBuffer manifest, void* data, RustBuffer certs, RustBuffer ingredients, void* gosigner, void* output, RustCallStatus *out_status 775 857 ); 776 858 #endif 777 859 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FUNC_SUBSCRIBE_ITEM_DEBUG ··· 1299 1381 1300 1382 ); 1301 1383 #endif 1384 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_READ_STREAM 1385 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_READ_STREAM 1386 + uint16_t uniffi_iroh_streamplace_checksum_method_stream_read_stream(void 1387 + 1388 + ); 1389 + #endif 1390 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_SEEK_STREAM 1391 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_SEEK_STREAM 1392 + uint16_t uniffi_iroh_streamplace_checksum_method_stream_seek_stream(void 1393 + 1394 + ); 1395 + #endif 1396 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_WRITE_STREAM 1397 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_STREAM_WRITE_STREAM 1398 + uint16_t uniffi_iroh_streamplace_checksum_method_stream_write_stream(void 1399 + 1400 + ); 1401 + #endif 1302 1402 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SUBSCRIBERESPONSE_NEXT_RAW 1303 1403 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SUBSCRIBERESPONSE_NEXT_RAW 1304 1404 uint16_t uniffi_iroh_streamplace_checksum_method_subscriberesponse_next_raw(void ··· 1364 1464 void iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerFree(uint64_t handle); 1365 1465 void iroh_streamplace_cgo_dispatchCallbackInterfaceGoSignerMethod0(uint64_t uniffi_handle, RustBuffer data, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ); 1366 1466 void iroh_streamplace_cgo_dispatchCallbackInterfaceGoSignerFree(uint64_t handle); 1467 + void iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod0(uint64_t uniffi_handle, uint64_t length, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ); 1468 + void iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod1(uint64_t uniffi_handle, int64_t pos, uint64_t mode, uint64_t* uniffi_out_return, RustCallStatus* callStatus ); 1469 + void iroh_streamplace_cgo_dispatchCallbackInterfaceStreamMethod2(uint64_t uniffi_handle, RustBuffer data, uint64_t* uniffi_out_return, RustCallStatus* callStatus ); 1470 + void iroh_streamplace_cgo_dispatchCallbackInterfaceStreamFree(uint64_t handle); 1367 1471 1368 1472 void iroh_streamplace_uniffiFutureContinuationCallback(uint64_t, int8_t); 1369 1473 void iroh_streamplace_uniffiFreeGorutine(uint64_t);
+11 -13
pkg/media/media_signer.go
··· 12 12 "time" 13 13 14 14 "go.opentelemetry.io/otel" 15 + "stream.place/streamplace/pkg/aqio" 15 16 "stream.place/streamplace/pkg/aqtime" 16 17 "stream.place/streamplace/pkg/atproto" 17 18 c2patypes "stream.place/streamplace/pkg/c2patypes" ··· 159 160 rustCallbackSigner := &RustCallbackSigner{ 160 161 Signer: ms.Signer, 161 162 } 162 - bs, err = iroh_streamplace.Sign(string(manifestBs), bs, ms.Cert, rustCallbackSigner) 163 + bs, err = iroh_streamplace.Sign(string(manifestBs), c2patypes.NewReader(aqio.NewReadWriteSeeker(bs)), ms.Cert, rustCallbackSigner) 163 164 if err != nil { 164 165 return nil, err 165 166 } ··· 179 180 startTime := time.Now() 180 181 ctx, span := otel.Tracer("signer").Start(ctx, "SignMP4") 181 182 defer span.End() 182 - for _, ingredient := range ingredients { 183 - _, err := iroh_streamplace.GetManifestAndCert(ingredient) 184 - if err != nil { 185 - return nil, err 186 - } 187 - } 183 + // for _, ingredient := range ingredients { 184 + // _, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(ingredient))) 185 + // if err != nil { 186 + // return nil, err 187 + // } 188 + // } 188 189 // title := "livestream" 189 190 mani := obj{ 190 191 "title": "Livestream Clip", ··· 223 224 } 224 225 span.End() 225 226 226 - bs, err := io.ReadAll(input) 227 - if err != nil { 228 - return nil, fmt.Errorf("failed to read input: %w", err) 229 - } 230 227 ctx, span = otel.Tracer("signer").Start(ctx, "SignMP4_Sign") 231 228 rustCallbackSigner := &RustCallbackSigner{ 232 229 Signer: ms.Signer, 233 230 } 234 - bs, err = iroh_streamplace.SignWithIngredients(string(manifestBs), bs, ms.Cert, ingredients, rustCallbackSigner) 231 + rws := aqio.NewReadWriteSeeker([]byte{}) 232 + err = iroh_streamplace.SignWithIngredients(string(manifestBs), c2patypes.NewReader(input), ms.Cert, ingredients, rustCallbackSigner, c2patypes.NewWriter(rws)) 235 233 if err != nil { 236 234 return nil, err 237 235 } ··· 244 242 } 245 243 span.End() 246 244 spmetrics.SigningDuration.WithLabelValues(ms.StreamerName).Observe(float64(time.Since(startTime).Milliseconds())) 247 - return bs, nil 245 + return rws.Bytes() 248 246 } 249 247 250 248 // don't call externally! this is used as a callback for the rust library
+3 -2
pkg/media/segment_split.go
··· 7 7 "fmt" 8 8 "sort" 9 9 10 + "stream.place/streamplace/pkg/aqio" 10 11 c2patypes "stream.place/streamplace/pkg/c2patypes" 11 12 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 12 13 "stream.place/streamplace/pkg/log" ··· 29 30 30 31 // split a signed concatenated mp4 into its constituent signed segments 31 32 func SplitSegments(ctx context.Context, input []byte) ([]SplitSegment, error) { 32 - manifestsStr, err := iroh_streamplace.GetManifests(input) 33 + manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(aqio.NewReadWriteSeeker(input))) 33 34 if err != nil { 34 35 return nil, fmt.Errorf("failed to get manifests: %w", err) 35 36 } ··· 68 69 if err != nil { 69 70 return nil, fmt.Errorf("failed to segment file: %w", err) 70 71 } 71 - resignedSegs, err := iroh_streamplace.Resign(unsignedSegs, input, manifestStrs, certList) 72 + resignedSegs, err := iroh_streamplace.Resign(unsignedSegs, c2patypes.NewReader(aqio.NewReadWriteSeeker(input)), manifestStrs, certList) 72 73 if err != nil { 73 74 return nil, fmt.Errorf("failed to resign segments: %w", err) 74 75 }
+2 -1
pkg/media/validate.go
··· 12 12 13 13 "github.com/bluesky-social/indigo/atproto/crypto" 14 14 "go.opentelemetry.io/otel" 15 + "stream.place/streamplace/pkg/aqio" 15 16 "stream.place/streamplace/pkg/aqtime" 16 17 c2patypes "stream.place/streamplace/pkg/c2patypes" 17 18 "stream.place/streamplace/pkg/constants" ··· 209 210 // validate a signed mp4 file unto itself, ignoring whether this user is allowed and whatnot 210 211 func ValidateMP4Media(ctx context.Context, buf []byte) (*ValidationResult, error) { 211 212 var maniCert ManifestAndCert 212 - maniStr, err := iroh_streamplace.GetManifestAndCert(buf) 213 + maniStr, err := iroh_streamplace.GetManifestAndCert(c2patypes.NewReader(aqio.NewReadWriteSeeker(buf))) 213 214 if err != nil { 214 215 return nil, err 215 216 }
+20 -24
rust/iroh-streamplace/src/c2pa.rs
··· 10 10 use c2pa::status_tracker::StatusTracker; 11 11 use c2pa::store::Store; 12 12 13 - use serde_json; 13 + use crate::streams::Stream; 14 + use crate::streams::StreamAdapter; 14 15 15 - #[derive(Debug, thiserror::Error, uniffi::Error)] 16 - #[uniffi(flat_error)] 17 - pub enum SPError { 18 - #[error("No certificate chain found")] 19 - NoCertificateChainFound, 20 - #[error("C2PA error: {0}")] 21 - C2paError(String), 22 - } 16 + use serde_json; 17 + use tracing::info; 23 18 19 + use crate::error::SPError; 24 20 #[uniffi::export] 25 - pub fn get_manifest_and_cert(data: Vec<u8>) -> Result<String, SPError> { 26 - let reader = Reader::from_stream("video/mp4", Cursor::new(data)) 21 + pub fn get_manifest_and_cert(data: &dyn Stream) -> Result<String, SPError> { 22 + let reader = Reader::from_stream("video/mp4", StreamAdapter::from(data)) 27 23 .map_err(|e| SPError::C2paError(e.to_string()))?; 28 24 29 25 if let Some(manifest) = reader.active_manifest() { ··· 113 109 #[uniffi::export] 114 110 pub fn sign( 115 111 manifest: String, 116 - data: Vec<u8>, 112 + data: &dyn Stream, 117 113 certs: Vec<u8>, 118 114 gosigner: Arc<dyn GoSigner>, 119 115 ) -> Result<Vec<u8>, SPError> { ··· 131 127 let mut builder = 132 128 Builder::from_json(&manifest).map_err(|e| SPError::C2paError(e.to_string()))?; 133 129 let mut output = Vec::new(); 134 - let mut input_cursor = Cursor::new(data); 130 + let mut input_cursor = StreamAdapter::from(data); 135 131 let mut output_cursor = Cursor::new(&mut output); 136 132 builder 137 133 .sign( ··· 145 141 } 146 142 147 143 #[uniffi::export] 148 - pub fn get_manifests(data: Vec<u8>) -> Result<String, SPError> { 149 - let store = Reader::from_stream("video/mp4", Cursor::new(data)) 144 + pub fn get_manifests(data: &dyn Stream) -> Result<String, SPError> { 145 + let store = Reader::from_stream("video/mp4", StreamAdapter::from(data)) 150 146 .map_err(|e| SPError::C2paError(e.to_string()))?; 151 147 let mut certs: std::collections::HashMap<String, String> = std::collections::HashMap::new(); 152 148 for (label, manifest) in store.manifests() { ··· 169 165 #[uniffi::export] 170 166 pub fn resign( 171 167 unsigned_seg_data: Vec<Vec<u8>>, 172 - signed_concat_data: Vec<u8>, 168 + signed_concat_data: &dyn Stream, 173 169 manifest_list: Vec<String>, 174 170 certs: Vec<Vec<u8>>, 175 171 ) -> Result<Vec<Vec<u8>>, SPError> { ··· 177 173 178 174 let combined_store = Store::from_stream( 179 175 "video/mp4", 180 - Cursor::new(signed_concat_data), 176 + StreamAdapter::from(signed_concat_data), 181 177 true, 182 178 &mut validation_log, 183 179 ) ··· 236 232 #[uniffi::export] 237 233 pub fn sign_with_ingredients( 238 234 manifest: String, 239 - data: Vec<u8>, 235 + data: &dyn Stream, 240 236 certs: Vec<u8>, 241 237 ingredients: Vec<Vec<u8>>, 242 238 gosigner: Arc<dyn GoSigner>, 243 - ) -> Result<Vec<u8>, SPError> { 239 + output: &dyn Stream, 240 + ) -> Result<(), SPError> { 244 241 Settings::from_toml(TOML_SETTINGS).map_err(|e| SPError::C2paError(e.to_string()))?; 245 242 let callback_signer = CallbackSigner::new( 246 243 move |_context: *const (), data: &[u8]| { ··· 253 250 ); 254 251 let mut builder = 255 252 Builder::from_json(&manifest).map_err(|e| SPError::C2paError(e.to_string()))?; 256 - for ingredient in ingredients { 253 + for (i, ingredient) in ingredients.into_iter().enumerate() { 257 254 let mut cursor = Cursor::new(ingredient); 258 255 let ingredient = Ingredient::from_stream("video/mp4", &mut cursor) 259 256 .map_err(|e| SPError::C2paError(e.to_string()))?; 260 257 builder.add_ingredient(ingredient); 261 258 } 262 - let mut output = Vec::new(); 263 - let mut input_cursor = Cursor::new(data); 264 - let mut output_cursor = Cursor::new(&mut output); 259 + let mut input_cursor = StreamAdapter::from(data); 260 + let mut output_cursor = StreamAdapter::from(output); 265 261 builder 266 262 .sign( 267 263 &callback_signer, ··· 270 266 &mut output_cursor, 271 267 ) 272 268 .map_err(|e| SPError::C2paError(e.to_string()))?; 273 - Ok(output) 269 + Ok(()) 274 270 }
+10
rust/iroh-streamplace/src/error.rs
··· 1 + #[derive(Debug, thiserror::Error, uniffi::Error)] 2 + #[uniffi(flat_error)] 3 + pub enum SPError { 4 + #[error("No certificate chain found")] 5 + NoCertificateChainFound, 6 + #[error("C2PA error: {0}")] 7 + C2paError(String), 8 + #[error("IO Error: {0}")] 9 + IOError(String), 10 + }
+5
rust/iroh-streamplace/src/lib.rs
··· 1 1 uniffi::setup_scaffolding!(); 2 2 3 3 pub mod c2pa; 4 + pub mod error; 4 5 pub mod node_addr; 5 6 pub mod public_key; 7 + pub mod streams; 6 8 7 9 use std::sync::{LazyLock, Once}; 8 10 ··· 10 12 pub use db::*; 11 13 #[cfg(test)] 12 14 mod tests; 15 + 16 + #[cfg(test)] 17 + mod test_stream; 13 18 14 19 /// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime. 15 20 static RUNTIME: LazyLock<tokio::runtime::Runtime> =
+167
rust/iroh-streamplace/src/streams.rs
··· 1 + // Copyright 2023 Adobe. All rights reserved. 2 + // This file is licensed to you under the Apache License, 3 + // Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0) 4 + // or the MIT license (http://opensource.org/licenses/MIT), 5 + // at your option. 6 + 7 + // Unless required by applicable law or agreed to in writing, 8 + // this software is distributed on an "AS IS" BASIS, WITHOUT 9 + // WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or 10 + // implied. See the LICENSE-MIT and LICENSE-APACHE files for the 11 + // specific language governing permissions and limitations under 12 + // each license. 13 + 14 + use crate::error::SPError; 15 + use std::io::{Read, Seek, SeekFrom, Write}; 16 + use std::sync::Arc; 17 + 18 + // #[repr(C)] 19 + // #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 20 + // pub enum SeekMode { 21 + // Start = 0, 22 + // End = 1, 23 + // Current = 2, 24 + // } 25 + 26 + /// This allows for a callback stream over the Uniffi interface. 27 + /// Implement these stream functions in the foreign language 28 + /// and this will provide Rust Stream trait implementations 29 + /// This is necessary since the Rust traits cannot be implemented directly 30 + /// as uniffi callbacks 31 + #[uniffi::export(with_foreign)] 32 + pub trait Stream: Send + Sync { 33 + /// Read a stream of bytes from the stream 34 + fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError>; 35 + /// Seek to a position in the stream 36 + fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError>; 37 + /// Write a stream of bytes to the stream 38 + fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError>; 39 + } 40 + 41 + impl Stream for Arc<dyn Stream> { 42 + fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError> { 43 + (**self).read_stream(length) 44 + } 45 + 46 + fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError> { 47 + (**self).seek_stream(pos, mode) 48 + } 49 + 50 + fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError> { 51 + (**self).write_stream(data) 52 + } 53 + } 54 + 55 + impl AsMut<dyn Stream> for dyn Stream { 56 + fn as_mut(&mut self) -> &mut Self { 57 + self 58 + } 59 + } 60 + 61 + pub struct StreamAdapter<'a> { 62 + pub stream: &'a mut dyn Stream, 63 + } 64 + 65 + impl<'a> StreamAdapter<'a> { 66 + pub fn from_stream_mut(stream: &'a mut dyn Stream) -> Self { 67 + Self { stream } 68 + } 69 + } 70 + 71 + impl<'a> From<&'a dyn Stream> for StreamAdapter<'a> { 72 + #[allow(invalid_reference_casting)] 73 + fn from(stream: &'a dyn Stream) -> Self { 74 + let stream = &*stream as *const dyn Stream as *mut dyn Stream; 75 + let stream = unsafe { &mut *stream }; 76 + Self { stream } 77 + } 78 + } 79 + 80 + // impl<'a> c2pa::CAIRead for StreamAdapter<'a> {} 81 + 82 + // impl<'a> c2pa::CAIReadWrite for StreamAdapter<'a> {} 83 + 84 + impl<'a> Read for StreamAdapter<'a> { 85 + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { 86 + let mut bytes = self 87 + .stream 88 + .read_stream(buf.len() as u64) 89 + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; 90 + let len = bytes.len(); 91 + buf.iter_mut().zip(bytes.drain(..)).for_each(|(dest, src)| { 92 + *dest = src; 93 + }); 94 + //println!("read: {:?}", len); 95 + Ok(len) 96 + } 97 + } 98 + 99 + impl<'a> Seek for StreamAdapter<'a> { 100 + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> { 101 + let (pos, mode) = match pos { 102 + SeekFrom::Current(pos) => (pos, 2), 103 + SeekFrom::Start(pos) => (pos as i64, 0), 104 + SeekFrom::End(pos) => (pos, 1), 105 + }; 106 + //println!("Stream Seek {}", pos); 107 + self.stream 108 + .seek_stream(pos, mode) 109 + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 110 + } 111 + } 112 + 113 + impl<'a> Write for StreamAdapter<'a> { 114 + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 115 + let len = self 116 + .stream 117 + .write_stream(buf.to_vec()) 118 + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; 119 + Ok(len as usize) 120 + } 121 + 122 + fn flush(&mut self) -> std::io::Result<()> { 123 + Ok(()) 124 + } 125 + } 126 + 127 + #[cfg(test)] 128 + mod tests { 129 + use super::*; 130 + 131 + use crate::test_stream::TestStream; 132 + 133 + #[test] 134 + fn test_stream_read() { 135 + let mut test = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); 136 + let mut stream = StreamAdapter::from_stream_mut(&mut test); 137 + let mut buf = [0u8; 5]; 138 + let len = stream.read(&mut buf).unwrap(); 139 + assert_eq!(len, 5); 140 + assert_eq!(buf, [0, 1, 2, 3, 4]); 141 + } 142 + 143 + #[test] 144 + fn test_stream_seek() { 145 + let mut test = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); 146 + let mut stream = StreamAdapter { stream: &mut test }; 147 + let pos = stream.seek(SeekFrom::Start(5)).unwrap(); 148 + assert_eq!(pos, 5); 149 + let mut buf = [0u8; 5]; 150 + let len = stream.read(&mut buf).unwrap(); 151 + assert_eq!(len, 5); 152 + assert_eq!(buf, [5, 6, 7, 8, 9]); 153 + } 154 + 155 + #[test] 156 + fn test_stream_write() { 157 + let mut test = TestStream::new(); 158 + let mut stream = StreamAdapter { stream: &mut test }; 159 + let len = stream.write(&[0, 1, 2, 3, 4]).unwrap(); 160 + assert_eq!(len, 5); 161 + stream.seek(SeekFrom::Start(0)).unwrap(); 162 + let mut buf = [0u8; 5]; 163 + let len = stream.read(&mut buf).unwrap(); 164 + assert_eq!(len, 5); 165 + assert_eq!(buf, [0, 1, 2, 3, 4]); 166 + } 167 + }
+80
rust/iroh-streamplace/src/test_stream.rs
··· 1 + // Copyright 2023 Adobe. All rights reserved. 2 + // This file is licensed to you under the Apache License, 3 + // Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0) 4 + // or the MIT license (http://opensource.org/licenses/MIT), 5 + // at your option. 6 + 7 + // Unless required by applicable law or agreed to in writing, 8 + // this software is distributed on an "AS IS" BASIS, WITHOUT 9 + // WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or 10 + // implied. See the LICENSE-MIT and LICENSE-APACHE files for the 11 + // specific language governing permissions and limitations under 12 + // each license. 13 + 14 + use std::io::{Read, Seek, SeekFrom, Write}; 15 + use std::sync::RwLock; 16 + 17 + use crate::error::SPError; 18 + use crate::streams::Stream; 19 + use std::io::Cursor; 20 + 21 + pub struct TestStream { 22 + stream: RwLock<Cursor<Vec<u8>>>, 23 + } 24 + 25 + impl TestStream { 26 + pub fn new() -> Self { 27 + Self { 28 + stream: RwLock::new(Cursor::new(Vec::new())), 29 + } 30 + } 31 + pub fn from_memory(data: Vec<u8>) -> Self { 32 + Self { 33 + stream: RwLock::new(Cursor::new(data)), 34 + } 35 + } 36 + } 37 + 38 + impl Stream for TestStream { 39 + fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError> { 40 + if let Ok(mut stream) = RwLock::write(&self.stream) { 41 + let mut data = vec![0u8; length as usize]; 42 + let bytes_read = stream 43 + .read(&mut data) 44 + .map_err(|e| SPError::IOError(e.to_string()))?; 45 + data.truncate(bytes_read); 46 + //println!("read_stream: {:?}, pos {:?}", data.len(), (*stream).position()); 47 + Ok(data) 48 + } else { 49 + Err(SPError::IOError("RwLock".to_string()))? 50 + } 51 + } 52 + 53 + fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError> { 54 + if let Ok(mut stream) = RwLock::write(&self.stream) { 55 + //stream.seek(SeekFrom::Start(pos as u64)).map_err(|e| StreamError::Io{ reason: e.to_string()})?; 56 + let whence = match mode { 57 + 0 => SeekFrom::Start(pos as u64), 58 + 1 => SeekFrom::End(pos as i64), 59 + 2 => SeekFrom::Current(pos as i64), 60 + 3_u64..=u64::MAX => unimplemented!(), 61 + }; 62 + Ok(stream 63 + .seek(whence) 64 + .map_err(|e| SPError::IOError(e.to_string()))?) 65 + } else { 66 + Err(SPError::IOError("RwLock".to_string())) 67 + } 68 + } 69 + 70 + fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError> { 71 + if let Ok(mut stream) = RwLock::write(&self.stream) { 72 + let len = stream 73 + .write(&data) 74 + .map_err(|e| SPError::IOError(e.to_string()))?; 75 + Ok(len as u64) 76 + } else { 77 + Err(SPError::IOError("RwLock".to_string()))? 78 + } 79 + } 80 + }