Live video on the AT Protocol
1use std::{ops::Deref, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use n0_future::{BufferedStreamExt, StreamExt, stream};
5use testresult::TestResult;
6
7use super::*;
8
9struct TestNode {
10 node: Arc<Node>,
11 public: Arc<crate::public_key::PublicKey>,
12 ticket: String,
13 #[allow(dead_code)]
14 private: Vec<u8>,
15}
16
17impl Deref for TestNode {
18 type Target = Node;
19
20 fn deref(&self) -> &Self::Target {
21 &self.node
22 }
23}
24
25impl TestNode {
26 /// Helper to create a test node with given config and handler mode.
27 async fn new(handler: HandlerMode) -> TestResult<TestNode> {
28 let config = Config {
29 key: vec![0_u8; 32], // will be replaced
30 topic: vec![0_u8; 32], // all nodes use the same topic
31 max_send_duration: Duration::from_secs(10),
32 disable_relay: false,
33 };
34 Self::new_with_config(handler, config).await
35 }
36
37 async fn new_with_config(handler: HandlerMode, mut config: Config) -> TestResult<TestNode> {
38 let key = iroh::SecretKey::generate(&mut rand::rng());
39 let key = key.to_bytes().to_vec();
40 config.key = key.clone();
41 let node = Node::new_in_runtime(config, handler).await?;
42 let public = node.node_id().await?;
43 let ticket = node.ticket().await?;
44 Ok(TestNode {
45 node,
46 private: key,
47 public,
48 ticket,
49 })
50 }
51}
52
53/// Helper to create multiple test nodes with given handler mode.
54async fn test_nodes(
55 n: usize,
56 handler: impl Fn(usize) -> HandlerMode + Send + Sync,
57 disable_relay: bool,
58) -> TestResult<Vec<TestNode>> {
59 const PAR: usize = 32;
60 let modes = (0..n).map(handler).collect::<Vec<_>>();
61
62 let config = Config {
63 key: vec![0_u8; 32], // will be replaced
64 topic: vec![0_u8; 32],
65 max_send_duration: Duration::from_secs(10),
66 disable_relay,
67 };
68 // create all nodes in parallel
69 let nodes = stream::iter(modes)
70 .map(|mode| TestNode::new_with_config(mode, config.clone()))
71 .buffered_unordered(PAR)
72 .collect::<Vec<_>>()
73 .await;
74 let nodes = nodes.into_iter().collect::<TestResult<Vec<_>>>()?;
75 // join everyone to everyone
76 let tickets = nodes.iter().map(|n| n.ticket.clone()).collect::<Vec<_>>();
77 let res = stream::iter(&nodes)
78 .map(|n| n.join_peers(tickets.clone()))
79 .buffered_unordered(PAR)
80 .collect::<Vec<_>>()
81 .await;
82 res.into_iter().collect::<Result<Vec<_>, _>>()?;
83 Ok(nodes)
84}
85
86#[tokio::test]
87async fn one_node() -> TestResult<()> {
88 tracing_subscriber::fmt::try_init().ok();
89 let node = TestNode::new(HandlerMode::Sender).await?.node;
90 let write = node.node_scope();
91 let db = node.db();
92 println!("Ticket: {}", node.ticket().await?);
93 write
94 .put(Some(b"stream1".to_vec()), b"s".to_vec(), b"y".to_vec())
95 .await?;
96 write
97 .put(Some(b"stream2".to_vec()), b"s".to_vec(), b"y".to_vec())
98 .await?;
99 let res = db.subscribe_with_opts(SubscribeOpts {
100 filter: Filter::new(),
101 mode: SubscribeMode::Both,
102 });
103 while let Some(item) = res.next_raw().await? {
104 if let SubscribeItem::CurrentDone = item {
105 break;
106 }
107 println!("Got item: {item:?}");
108 }
109 let res = db
110 .iter_with_opts(
111 Filter::new()
112 .stream(b"stream1".to_vec())
113 .scope(node.node_id().await?),
114 )
115 .await?;
116 println!("Iter result: {res:?}");
117 Ok(())
118}
119
120struct TestHandler<T> {
121 info: T,
122 sender: tokio::sync::mpsc::Sender<(T, String, Vec<u8>)>,
123}
124
125impl<T> TestHandler<T> {
126 fn new(info: T, sender: tokio::sync::mpsc::Sender<(T, String, Vec<u8>)>) -> Self {
127 Self { info, sender }
128 }
129}
130
131#[async_trait]
132impl<T: Clone + Send + Sync + 'static> DataHandler for TestHandler<T> {
133 async fn handle_data(&self, _from: Arc<public_key::PublicKey>, topic: String, data: Vec<u8>) {
134 self.sender
135 .send((self.info.clone(), topic, data))
136 .await
137 .ok();
138 }
139}
140
141#[tokio::test]
142async fn two_nodes_send_receive() -> TestResult<()> {
143 tracing_subscriber::fmt::try_init().ok();
144 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
145 let handler = Arc::new(TestHandler::new((), tx));
146 let sender = TestNode::new(HandlerMode::Sender).await?;
147 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
148 // join the sender to the receiver. This will also configure the receiver endpoint to be able to dial the sender.
149 receiver.join_peers(vec![sender.ticket.clone()]).await?;
150 let stream = "teststream".to_string();
151 receiver
152 .subscribe(stream.clone(), sender.public.clone())
153 .await?;
154 sender.send_segment(stream, b"segment1".to_vec()).await?;
155 let (_, stream, data) = rx.recv().await.expect("should get data");
156 assert_eq!(stream, "teststream");
157 assert_eq!(data, b"segment1".to_vec());
158 Ok(())
159}
160
161#[tokio::test]
162async fn three_nodes_send_forward_receive() -> TestResult<()> {
163 tracing_subscriber::fmt::try_init().ok();
164 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
165 let handler = Arc::new(TestHandler::new((), tx));
166 let sender = TestNode::new(HandlerMode::Sender).await?;
167 let forwarder = TestNode::new(HandlerMode::Forwarder).await?;
168 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
169 // join everyone to everyone, so the receiver can reach the sender via the forwarder.
170 let tickets = vec![
171 sender.ticket.clone(),
172 forwarder.ticket.clone(),
173 receiver.ticket.clone(),
174 ];
175 receiver.join_peers(tickets.clone()).await?;
176 forwarder.join_peers(tickets.clone()).await?;
177 sender.join_peers(tickets).await?;
178 let stream = "teststream".to_string();
179 receiver
180 .subscribe(stream.clone(), forwarder.public.clone())
181 .await?;
182 forwarder
183 .subscribe(stream.clone(), sender.public.clone())
184 .await?;
185 sender.send_segment(stream, b"segment1".to_vec()).await?;
186 let (_, stream, data) = rx.recv().await.expect("should get data");
187 assert_eq!(stream, "teststream");
188 assert_eq!(data, b"segment1".to_vec());
189 Ok(())
190}
191
192#[tokio::test]
193async fn meta_three_nodes_send_forward_receive() -> TestResult<()> {
194 tracing_subscriber::fmt::try_init().ok();
195 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
196 let handler = Arc::new(TestHandler::new((), tx));
197 let sender = TestNode::new(HandlerMode::Sender).await?;
198 let forwarder = TestNode::new(HandlerMode::Forwarder).await?;
199 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
200 // join everyone to everyone, so the receiver can reach the sender via the forwarder.
201 let tickets = vec![
202 sender.ticket.clone(),
203 forwarder.ticket.clone(),
204 receiver.ticket.clone(),
205 ];
206 receiver.join_peers(tickets.clone()).await?;
207 forwarder.join_peers(tickets.clone()).await?;
208 sender.join_peers(tickets).await?;
209 let stream = "teststream".to_string();
210 receiver
211 .subscribe(stream.clone(), forwarder.public.clone())
212 .await?;
213 forwarder
214 .subscribe(stream.clone(), sender.public.clone())
215 .await?;
216 sender.send_segment(stream, b"segment1".to_vec()).await?;
217 let (_, stream, data) = rx.recv().await.expect("should get data");
218 assert_eq!(stream, "teststream");
219 assert_eq!(data, b"segment1".to_vec());
220 let stream = receiver.db().subscribe(Filter::new());
221 while let Some(item) = stream.next_raw().await? {
222 println!("{}", subscribe_item_debug(&item));
223 }
224 Ok(())
225}
226
227async fn broadcast(
228 nsenders: usize,
229 nforwarders: usize,
230 nreceivers: usize,
231 nmsgs: usize,
232) -> TestResult<()> {
233 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
234 let ntotal = nsenders + nforwarders + nreceivers;
235 let senders = 0..nsenders;
236 let forwarders = nsenders..(nsenders + nforwarders);
237 let receivers = (nsenders + nforwarders)..ntotal;
238 let make_handler = |i: usize| {
239 if senders.contains(&i) {
240 HandlerMode::Sender
241 } else if forwarders.contains(&i) {
242 HandlerMode::Forwarder
243 } else {
244 HandlerMode::Receiver(Arc::new(TestHandler::new(i, tx.clone())))
245 }
246 };
247 let nodes = test_nodes(ntotal, make_handler, true).await?;
248 let senders = &nodes[senders];
249 let forwarders = &nodes[forwarders];
250 let receivers = &nodes[receivers];
251 let stream = "teststream".to_string();
252 // subscribe all forwarders to a sender, round robin
253 for (i, forwarder) in forwarders.iter().enumerate() {
254 let sender = &senders[i % senders.len()];
255 forwarder
256 .subscribe(stream.clone(), sender.public.clone())
257 .await?;
258 }
259 // subscribe all receivers to a forwarder, round robin
260 for (i, receiver) in receivers.iter().enumerate() {
261 let forwarder = &forwarders[i % forwarders.len()];
262 receiver
263 .subscribe(stream.clone(), forwarder.public.clone())
264 .await?;
265 }
266 for _ in 0..nmsgs {
267 for sender in senders {
268 sender
269 .send_segment(stream.clone(), b"segment1".to_vec())
270 .await?;
271 }
272 for _ in 0..receivers.len() {
273 let (i, stream, _) = rx.recv().await.expect("should get data");
274 println!("Node {i} got data on stream {stream}");
275 }
276 }
277 Ok(())
278}
279
280#[tokio::test]
281async fn broadcast_1_2_4() -> TestResult<()> {
282 tracing_subscriber::fmt().try_init().ok();
283 broadcast(1, 2, 4, 1).await?;
284 Ok(())
285}
286
287#[tokio::test]
288async fn broadcast_1_3_9() -> TestResult<()> {
289 tracing_subscriber::fmt().try_init().ok();
290 broadcast(1, 3, 9, 1).await?;
291 Ok(())
292}
293
294#[tokio::test]
295async fn broadcast_1_4_16() -> TestResult<()> {
296 tracing_subscriber::fmt().try_init().ok();
297 broadcast(1, 4, 16, 100).await?;
298 Ok(())
299}