Live video on the AT Protocol
1// Copyright 2023 Adobe. All rights reserved.
2// This file is licensed to you under the Apache License,
3// Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
4// or the MIT license (http://opensource.org/licenses/MIT),
5// at your option.
6
7// Unless required by applicable law or agreed to in writing,
8// this software is distributed on an "AS IS" BASIS, WITHOUT
9// WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or
10// implied. See the LICENSE-MIT and LICENSE-APACHE files for the
11// specific language governing permissions and limitations under
12// each license.
13
14use crate::error::SPError;
15use std::io::{Read, Seek, SeekFrom, Write};
16use std::sync::Arc;
17
18// #[repr(C)]
19// #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
20// pub enum SeekMode {
21// Start = 0,
22// End = 1,
23// Current = 2,
24// }
25
26/// This allows for a callback stream over the Uniffi interface.
27/// Implement these stream functions in the foreign language
28/// and this will provide Rust Stream trait implementations
29/// This is necessary since the Rust traits cannot be implemented directly
30/// as uniffi callbacks
31#[uniffi::export(with_foreign)]
32pub trait Stream: Send + Sync {
33 /// Read a stream of bytes from the stream
34 fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError>;
35 /// Seek to a position in the stream
36 fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError>;
37 /// Write a stream of bytes to the stream
38 fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError>;
39}
40
41impl Stream for Arc<dyn Stream> {
42 fn read_stream(&self, length: u64) -> Result<Vec<u8>, SPError> {
43 (**self).read_stream(length)
44 }
45
46 fn seek_stream(&self, pos: i64, mode: u64) -> Result<u64, SPError> {
47 (**self).seek_stream(pos, mode)
48 }
49
50 fn write_stream(&self, data: Vec<u8>) -> Result<u64, SPError> {
51 (**self).write_stream(data)
52 }
53}
54
55impl AsMut<dyn Stream> for dyn Stream {
56 fn as_mut(&mut self) -> &mut Self {
57 self
58 }
59}
60
61pub struct StreamAdapter<'a> {
62 pub stream: &'a dyn Stream,
63}
64
65impl<'a> StreamAdapter<'a> {
66 pub fn from_stream_mut(stream: &'a mut dyn Stream) -> Self {
67 Self { stream }
68 }
69}
70
71impl<'a> From<&'a dyn Stream> for StreamAdapter<'a> {
72 fn from(stream: &'a dyn Stream) -> Self {
73 Self { stream }
74 }
75}
76
77impl<'a> Read for StreamAdapter<'a> {
78 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
79 let mut bytes = self
80 .stream
81 .read_stream(buf.len() as u64)
82 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
83 let len = bytes.len();
84 buf.iter_mut().zip(bytes.drain(..)).for_each(|(dest, src)| {
85 *dest = src;
86 });
87 //println!("read: {:?}", len);
88 Ok(len)
89 }
90}
91
92impl<'a> Seek for StreamAdapter<'a> {
93 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
94 let (pos, mode) = match pos {
95 SeekFrom::Current(pos) => (pos, 2),
96 SeekFrom::Start(pos) => (pos as i64, 0),
97 SeekFrom::End(pos) => (pos, 1),
98 };
99 //println!("Stream Seek {}", pos);
100 self.stream
101 .seek_stream(pos, mode)
102 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
103 }
104}
105
106impl<'a> Write for StreamAdapter<'a> {
107 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
108 let len = self
109 .stream
110 .write_stream(buf.to_vec())
111 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
112 Ok(len as usize)
113 }
114
115 fn flush(&mut self) -> std::io::Result<()> {
116 Ok(())
117 }
118}
119
120#[uniffi::export(with_foreign)]
121pub trait ManyStreams: Send + Sync {
122 /// Get the next stream from the many streams
123 fn next(&self) -> Option<Arc<dyn Stream>>;
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    use crate::test_stream::TestStream;

    /// Reading through the adapter yields the leading bytes of the data.
    #[test]
    fn test_stream_read() {
        let mut source = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let mut adapter = StreamAdapter::from_stream_mut(&mut source);

        let mut out = [0u8; 5];
        let count = adapter.read(&mut out).unwrap();

        assert_eq!(count, 5);
        assert_eq!(out, [0, 1, 2, 3, 4]);
    }

    /// Seeking from the start moves the position used by the next read.
    #[test]
    fn test_stream_seek() {
        let mut source = TestStream::from_memory(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let mut adapter = StreamAdapter {
            stream: &mut source,
        };

        let position = adapter.seek(SeekFrom::Start(5)).unwrap();
        assert_eq!(position, 5);

        let mut out = [0u8; 5];
        let count = adapter.read(&mut out).unwrap();

        assert_eq!(count, 5);
        assert_eq!(out, [5, 6, 7, 8, 9]);
    }

    /// Bytes written through the adapter can be read back after rewinding.
    #[test]
    fn test_stream_write() {
        let mut sink = TestStream::new();
        let mut adapter = StreamAdapter { stream: &mut sink };

        let written = adapter.write(&[0, 1, 2, 3, 4]).unwrap();
        assert_eq!(written, 5);

        adapter.seek(SeekFrom::Start(0)).unwrap();

        let mut out = [0u8; 5];
        let count = adapter.read(&mut out).unwrap();

        assert_eq!(count, 5);
        assert_eq!(out, [0, 1, 2, 3, 4]);
    }
}