Live video on the AT Protocol
at natb/command-errors 299 lines 10 kB view raw
1use std::{ops::Deref, sync::Arc, time::Duration}; 2 3use async_trait::async_trait; 4use n0_future::{BufferedStreamExt, StreamExt, stream}; 5use testresult::TestResult; 6 7use super::*; 8 9struct TestNode { 10 node: Arc<Node>, 11 public: Arc<crate::public_key::PublicKey>, 12 ticket: String, 13 #[allow(dead_code)] 14 private: Vec<u8>, 15} 16 17impl Deref for TestNode { 18 type Target = Node; 19 20 fn deref(&self) -> &Self::Target { 21 &self.node 22 } 23} 24 25impl TestNode { 26 /// Helper to create a test node with given config and handler mode. 27 async fn new(handler: HandlerMode) -> TestResult<TestNode> { 28 let config = Config { 29 key: vec![0_u8; 32], // will be replaced 30 topic: vec![0_u8; 32], // all nodes use the same topic 31 max_send_duration: Duration::from_secs(10), 32 disable_relay: false, 33 }; 34 Self::new_with_config(handler, config).await 35 } 36 37 async fn new_with_config(handler: HandlerMode, mut config: Config) -> TestResult<TestNode> { 38 let key = iroh::SecretKey::generate(&mut rand::rng()); 39 let key = key.to_bytes().to_vec(); 40 config.key = key.clone(); 41 let node = Node::new_in_runtime(config, handler).await?; 42 let public = node.node_id().await?; 43 let ticket = node.ticket().await?; 44 Ok(TestNode { 45 node, 46 private: key, 47 public, 48 ticket, 49 }) 50 } 51} 52 53/// Helper to create multiple test nodes with given handler mode. 54async fn test_nodes( 55 n: usize, 56 handler: impl Fn(usize) -> HandlerMode + Send + Sync, 57 disable_relay: bool, 58) -> TestResult<Vec<TestNode>> { 59 const PAR: usize = 32; 60 let modes = (0..n).map(handler).collect::<Vec<_>>(); 61 62 let config = Config { 63 key: vec![0_u8; 32], // will be replaced 64 topic: vec![0_u8; 32], 65 max_send_duration: Duration::from_secs(10), 66 disable_relay, 67 }; 68 // create all nodes in parallel 69 let nodes = stream::iter(modes) 70 .map(|mode| TestNode::new_with_config(mode, config.clone())) 71 .buffered_unordered(PAR) 72 .collect::<Vec<_>>() 73 .await; 74 let nodes = nodes.into_iter().collect::<TestResult<Vec<_>>>()?; 75 // join everyone to everyone 76 let tickets = nodes.iter().map(|n| n.ticket.clone()).collect::<Vec<_>>(); 77 let res = stream::iter(&nodes) 78 .map(|n| n.join_peers(tickets.clone())) 79 .buffered_unordered(PAR) 80 .collect::<Vec<_>>() 81 .await; 82 res.into_iter().collect::<Result<Vec<_>, _>>()?; 83 Ok(nodes) 84} 85 86#[tokio::test] 87async fn one_node() -> TestResult<()> { 88 tracing_subscriber::fmt::try_init().ok(); 89 let node = TestNode::new(HandlerMode::Sender).await?.node; 90 let write = node.node_scope(); 91 let db = node.db(); 92 println!("Ticket: {}", node.ticket().await?); 93 write 94 .put(Some(b"stream1".to_vec()), b"s".to_vec(), b"y".to_vec()) 95 .await?; 96 write 97 .put(Some(b"stream2".to_vec()), b"s".to_vec(), b"y".to_vec()) 98 .await?; 99 let res = db.subscribe_with_opts(SubscribeOpts { 100 filter: Filter::new(), 101 mode: SubscribeMode::Both, 102 }); 103 while let Some(item) = res.next_raw().await? { 104 if let SubscribeItem::CurrentDone = item { 105 break; 106 } 107 println!("Got item: {item:?}"); 108 } 109 let res = db 110 .iter_with_opts( 111 Filter::new() 112 .stream(b"stream1".to_vec()) 113 .scope(node.node_id().await?), 114 ) 115 .await?; 116 println!("Iter result: {res:?}"); 117 Ok(()) 118} 119 120struct TestHandler<T> { 121 info: T, 122 sender: tokio::sync::mpsc::Sender<(T, String, Vec<u8>)>, 123} 124 125impl<T> TestHandler<T> { 126 fn new(info: T, sender: tokio::sync::mpsc::Sender<(T, String, Vec<u8>)>) -> Self { 127 Self { info, sender } 128 } 129} 130 131#[async_trait] 132impl<T: Clone + Send + Sync + 'static> DataHandler for TestHandler<T> { 133 async fn handle_data(&self, _from: Arc<public_key::PublicKey>, topic: String, data: Vec<u8>) { 134 self.sender 135 .send((self.info.clone(), topic, data)) 136 .await 137 .ok(); 138 } 139} 140 141#[tokio::test] 142async fn two_nodes_send_receive() -> TestResult<()> { 143 tracing_subscriber::fmt::try_init().ok(); 144 let (tx, mut rx) = tokio::sync::mpsc::channel(32); 145 let handler = Arc::new(TestHandler::new((), tx)); 146 let sender = TestNode::new(HandlerMode::Sender).await?; 147 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?; 148 // join the sender to the receiver. This will also configure the receiver endpoint to be able to dial the sender. 149 receiver.join_peers(vec![sender.ticket.clone()]).await?; 150 let stream = "teststream".to_string(); 151 receiver 152 .subscribe(stream.clone(), sender.public.clone()) 153 .await?; 154 sender.send_segment(stream, b"segment1".to_vec()).await?; 155 let (_, stream, data) = rx.recv().await.expect("should get data"); 156 assert_eq!(stream, "teststream"); 157 assert_eq!(data, b"segment1".to_vec()); 158 Ok(()) 159} 160 161#[tokio::test] 162async fn three_nodes_send_forward_receive() -> TestResult<()> { 163 tracing_subscriber::fmt::try_init().ok(); 164 let (tx, mut rx) = tokio::sync::mpsc::channel(32); 165 let handler = Arc::new(TestHandler::new((), tx)); 166 let sender = TestNode::new(HandlerMode::Sender).await?; 167 let forwarder = TestNode::new(HandlerMode::Forwarder).await?; 168 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?; 169 // join everyone to everyone, so the receiver can reach the sender via the forwarder. 170 let tickets = vec![ 171 sender.ticket.clone(), 172 forwarder.ticket.clone(), 173 receiver.ticket.clone(), 174 ]; 175 receiver.join_peers(tickets.clone()).await?; 176 forwarder.join_peers(tickets.clone()).await?; 177 sender.join_peers(tickets).await?; 178 let stream = "teststream".to_string(); 179 receiver 180 .subscribe(stream.clone(), forwarder.public.clone()) 181 .await?; 182 forwarder 183 .subscribe(stream.clone(), sender.public.clone()) 184 .await?; 185 sender.send_segment(stream, b"segment1".to_vec()).await?; 186 let (_, stream, data) = rx.recv().await.expect("should get data"); 187 assert_eq!(stream, "teststream"); 188 assert_eq!(data, b"segment1".to_vec()); 189 Ok(()) 190} 191 192#[tokio::test] 193async fn meta_three_nodes_send_forward_receive() -> TestResult<()> { 194 tracing_subscriber::fmt::try_init().ok(); 195 let (tx, mut rx) = tokio::sync::mpsc::channel(32); 196 let handler = Arc::new(TestHandler::new((), tx)); 197 let sender = TestNode::new(HandlerMode::Sender).await?; 198 let forwarder = TestNode::new(HandlerMode::Forwarder).await?; 199 let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?; 200 // join everyone to everyone, so the receiver can reach the sender via the forwarder. 201 let tickets = vec![ 202 sender.ticket.clone(), 203 forwarder.ticket.clone(), 204 receiver.ticket.clone(), 205 ]; 206 receiver.join_peers(tickets.clone()).await?; 207 forwarder.join_peers(tickets.clone()).await?; 208 sender.join_peers(tickets).await?; 209 let stream = "teststream".to_string(); 210 receiver 211 .subscribe(stream.clone(), forwarder.public.clone()) 212 .await?; 213 forwarder 214 .subscribe(stream.clone(), sender.public.clone()) 215 .await?; 216 sender.send_segment(stream, b"segment1".to_vec()).await?; 217 let (_, stream, data) = rx.recv().await.expect("should get data"); 218 assert_eq!(stream, "teststream"); 219 assert_eq!(data, b"segment1".to_vec()); 220 let stream = receiver.db().subscribe(Filter::new()); 221 while let Some(item) = stream.next_raw().await? { 222 println!("{}", subscribe_item_debug(&item)); 223 } 224 Ok(()) 225} 226 227async fn broadcast( 228 nsenders: usize, 229 nforwarders: usize, 230 nreceivers: usize, 231 nmsgs: usize, 232) -> TestResult<()> { 233 let (tx, mut rx) = tokio::sync::mpsc::channel(32); 234 let ntotal = nsenders + nforwarders + nreceivers; 235 let senders = 0..nsenders; 236 let forwarders = nsenders..(nsenders + nforwarders); 237 let receivers = (nsenders + nforwarders)..ntotal; 238 let make_handler = |i: usize| { 239 if senders.contains(&i) { 240 HandlerMode::Sender 241 } else if forwarders.contains(&i) { 242 HandlerMode::Forwarder 243 } else { 244 HandlerMode::Receiver(Arc::new(TestHandler::new(i, tx.clone()))) 245 } 246 }; 247 let nodes = test_nodes(ntotal, make_handler, true).await?; 248 let senders = &nodes[senders]; 249 let forwarders = &nodes[forwarders]; 250 let receivers = &nodes[receivers]; 251 let stream = "teststream".to_string(); 252 // subscribe all forwarders to a sender, round robin 253 for (i, forwarder) in forwarders.iter().enumerate() { 254 let sender = &senders[i % senders.len()]; 255 forwarder 256 .subscribe(stream.clone(), sender.public.clone()) 257 .await?; 258 } 259 // subscribe all receivers to a forwarder, round robin 260 for (i, receiver) in receivers.iter().enumerate() { 261 let forwarder = &forwarders[i % forwarders.len()]; 262 receiver 263 .subscribe(stream.clone(), forwarder.public.clone()) 264 .await?; 265 } 266 for _ in 0..nmsgs { 267 for sender in senders { 268 sender 269 .send_segment(stream.clone(), b"segment1".to_vec()) 270 .await?; 271 } 272 for _ in 0..receivers.len() { 273 let (i, stream, _) = rx.recv().await.expect("should get data"); 274 println!("Node {i} got data on stream {stream}"); 275 } 276 } 277 Ok(()) 278} 279 280#[tokio::test] 281async fn broadcast_1_2_4() -> TestResult<()> { 282 tracing_subscriber::fmt().try_init().ok(); 283 broadcast(1, 2, 4, 1).await?; 284 Ok(()) 285} 286 287#[tokio::test] 288async fn broadcast_1_3_9() -> TestResult<()> { 289 tracing_subscriber::fmt().try_init().ok(); 290 broadcast(1, 3, 9, 1).await?; 291 Ok(()) 292} 293 294#[tokio::test] 295async fn broadcast_1_4_16() -> TestResult<()> { 296 tracing_subscriber::fmt().try_init().ok(); 297 broadcast(1, 4, 16, 100).await?; 298 Ok(()) 299}