Live video on the AT Protocol

feat(iroh-streamplace): peer list hydration, leaving message type

b5 d23479c7 15272a6a

+149 -42
+135 -39
rust/iroh-streamplace/src/api.rs
```diff
···
 //! Protocol API

 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, BTreeSet, HashMap},
     time::Duration,
 };
···
     data: Bytes,
 }

+/// Ask a remote peer to return a stream of its known PeerInfos
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct RequestPeerInfos {
+    remote_id: NodeId,
+}
+
+/// Return a stream of peer infos
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct HandlePeerInfosRequest {}
+
+/// Return this peer's local state
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct MyPeerInfo {}
+
 /// List all peers, and the subscriptions that they're believed to have
 /// "believed", because subscription info can be out of date
 #[derive(Debug, Clone, Serialize, Deserialize)]
-struct Peers {}
+struct MyPeers {}

 /// Prune peers that haven't been seen since the given timestamp
 #[derive(Debug, Clone, Serialize, Deserialize)]
-struct PrunePeers {
+struct PruneMyPeers {
     cutoff_timestamp: u64,
 }
···
     info: PeerInfo,
 }

-/// list out our local subscriptions
-#[derive(Debug, Clone, Serialize, Deserialize)]
-struct MySubscriptions {}
-
 /// Request a node list out its current subscriptions
 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct GetSubscriptions {
···
     node_id: NodeId,
 }

+/// Tell all remote peers that we're leaving the network
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct BroadcastLeaving {}
+
+/// Tell a remote node that we're leaving the network
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct HandleLeaving {
+    /// the node id of the peer leaving
+    node_id: NodeId,
+}
+
 /// details about a peer in the network
 #[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
 pub(crate) struct PeerInfo {
···
 enum Protocol {
     // swarm coordination
     #[rpc(tx=mpsc::Sender<PeerInfo>)]
-    Peers(Peers),
+    MyPeers(MyPeers),
     #[rpc(tx=oneshot::Sender<PeerInfo>)]
-    MyPeerInfo(MySubscriptions),
+    MyPeerInfo(MyPeerInfo),
+    #[rpc(tx=oneshot::Sender<()>)]
+    PruneMyPeers(PruneMyPeers),
+    #[rpc(tx=oneshot::Sender<()>)]
+    BroadcastLeaving(BroadcastLeaving),
+    #[rpc(tx=oneshot::Sender<()>)]
+    HandleLeaving(HandleLeaving),
+
     #[rpc(tx=oneshot::Sender<()>)]
-    PrunePeers(PrunePeers),
+    RequestPeerInfos(RequestPeerInfos),
+    #[rpc(tx=mpsc::Sender<PeerInfo>)]
+    HandlePeerInfosRequest(HandlePeerInfosRequest),
+
     #[rpc(tx=oneshot::Sender<()>)]
     SendPeerInfo(SendPeerInfo),
     #[rpc(tx=oneshot::Sender<()>)]
···
     /// peers we'll permanently broadcast to
     anchor_peers: Vec<NodeId>,
     /// set of all peers we believe to be alive in the swarm
-    peers: BTreeSet<PeerInfo>,
+    peers: HashMap<NodeId, PeerInfo>,
     /// set of stream subscriptions we're receiving data for
     subscriptions: BTreeMap<String, BTreeSet<NodeId>>,
     /// pool of open RPC connections
···
             endpoint: endpoint.clone(),
             recv: rx,
             anchor_peers,
-            peers: BTreeSet::new(),
+            peers: HashMap::new(),
             subscriptions: BTreeMap::new(),
             connections: BTreeMap::new(),
             handler: Box::new(handler),
···
     async fn handle(&mut self, msg: Message) {
         match msg {
             // swarm coordination
-            Message::Peers(sub) => {
+            Message::MyPeers(sub) => {
                 debug!("peers {:?}", sub);
                 let WithChannels { tx, .. } = sub;
···
                 let mut sent = BTreeSet::new();

                 // stream over the list of peers we know about
-                for sub in &self.peers {
-                    sent.insert(&sub.node_id);
+                for (id, sub) in &self.peers {
+                    sent.insert(*id);
                     if tx.send(sub.clone()).await.is_err() {
                         break;
                     }
                 }

-                // send over any anchor peers we know about, but haven't already sent
-                // these go with empty subscription sets, which isn't great.
+                // send over any anchor peers we know about, but haven't already
+                // sent from our peers list. these go with empty subscription
+                // sets, which isn't great.
                 for anchor in &self.anchor_peers {
                     if sent.contains(anchor) {
                         continue;
···
                 let WithChannels { tx, .. } = sub;
                 tx.send(self.my_peer_info()).await.ok();
             }
+            Message::PruneMyPeers(sub) => {
+                let WithChannels { tx, inner, .. } = sub;
+                // prune peers that haven't been seen since the given timestamp
+                self.peers
+                    .retain(|_, peer| peer.timestamp >= inner.cutoff_timestamp);
+                tx.send(()).await.ok();
+            }
+
+            Message::RequestPeerInfos(list) => {
+                let WithChannels { inner, tx, .. } = list;
+                let conn = self.get_conn(&inner.remote_id).await;
+                let mut rx = conn
+                    .rpc
+                    .server_streaming(HandlePeerInfosRequest {}, 1000)
+                    .await
+                    .unwrap();
+                while let Some(mut peer_info) = rx.recv().await.unwrap() {
+                    // update our tracked state about this peer, using timestamps
+                    // to avoid confusion from external sources
+                    peer_info.timestamp = timestamp();
+                    self.peers.insert(peer_info.node_id, peer_info);
+                }
+
+                tx.send(()).await.ok();
+            }
+            Message::HandlePeerInfosRequest(list) => {
+                let WithChannels { tx, .. } = list;
+                for (_, peer) in self.peers.clone() {
+                    if let Err(e) = tx.send(peer).await {
+                        tracing::error!("send peer error: {:?}", e);
+                    }
+                }
+            }
+
             Message::SendPeerInfo(info) => {
                 let WithChannels { inner, tx, .. } = info;
                 debug!(
···
                 // update our tracked state about this peer, using timestamps
                 // to avoid confusion from external sources
                 inner.timestamp = timestamp();
-                self.peers.insert(inner);
+                self.peers.insert(inner.node_id, inner);
                 tx.send(()).await.ok();
             }
-            Message::PrunePeers(sub) => {
-                let WithChannels { tx, inner, .. } = sub;
-                // prune peers that haven't been seen since the given timestamp
-                self.peers
-                    .retain(|peer| peer.timestamp >= inner.cutoff_timestamp);
+            Message::BroadcastLeaving(leaving) => {
+                let WithChannels { tx, .. } = leaving;
+                let node_id = self.endpoint.node_id();
+                let remotes = self
+                    .peers
+                    .values()
+                    .map(|peer| peer.node_id)
+                    .collect::<Vec<_>>();
+                for remote_node_id in remotes {
+                    // ensure connection
+                    let conn = self.get_conn(&remote_node_id).await;
+                    if let Err(err) = conn.rpc.rpc(HandleLeaving { node_id }).await {
+                        tracing::error!("failed to handle leaving: {}", err);
+                    }
+                }
+                tx.send(()).await.ok();
+            }
+            Message::HandleLeaving(leaving) => {
+                let WithChannels { tx, inner, .. } = leaving;
+                self.peers.remove(&inner.node_id);
                 tx.send(()).await.ok();
             }
···
         peer_info_broadcast_interval: Duration,
         peer_prune_interval: Duration,
     ) -> Self {
+        let anchors = anchor_peers.clone();
         let api = Actor::spawn(endpoint, anchor_peers, handler);

+        // hydrate our peers list from anchor nodes
+        let api2 = api.clone();
+        n0_future::task::spawn(async move {
+            for anchor in anchors {
+                if let Err(e) = api2.inner.rpc(RequestPeerInfos { remote_id: anchor }).await {
+                    tracing::error!("requesting peer infos: {:?}", e);
+                }
+            }
+        });
+
         // re-broadcast our subscriptions every interval
         if peer_info_broadcast_interval > Duration::from_millis(0) {
             let api2 = api.clone();
···
             loop {
                 tokio::time::sleep(peer_prune_interval).await;
                 let cutoff_timestamp = timestamp() - peer_prune_interval.as_secs();
-                if let Err(e) = api2.inner.rpc(PrunePeers { cutoff_timestamp }).await {
+                if let Err(e) = api2.inner.rpc(PruneMyPeers { cutoff_timestamp }).await {
                     tracing::error!("pruning stale subscriptions: {:?}", e);
                 }
             }
···

     /// List all peers we know about, and the subscriptions they have
     pub(crate) async fn peers(&self) -> irpc::Result<Vec<PeerInfo>> {
-        let mut rx = self.inner.server_streaming(Peers {}, 1000).await?;
+        let mut rx = self.inner.server_streaming(MyPeers {}, 1000).await?;
         let mut peers = Vec::new();
         while let Some(peer) = rx.recv().await? {
             peers.push(peer);
···
         Ok(peers)
     }

-    pub(crate) async fn my_subscriptions(&self) -> irpc::Result<PeerInfo> {
-        self.inner.rpc(MySubscriptions {}).await
+    pub(crate) async fn my_peer_info(&self) -> irpc::Result<PeerInfo> {
+        self.inner.rpc(MyPeerInfo {}).await
+    }
+
+    async fn broadcast_peer_info(&self) -> irpc::Result<JoinHandle<()>> {
+        let peers = self.peers().await?;
+        let subs = self.my_peer_info().await?;
+        let client = self.inner.clone();
+        let handle = n0_future::task::spawn(async move {
+            if let Err(e) = broadcast_peer_info_inner(client, peers, subs).await {
+                tracing::error!("Peer announcement task failed: {:?}", e);
+            }
+        });
+        Ok(handle)
+    }
+
+    pub(crate) async fn leaving(&self) -> irpc::Result<()> {
+        self.inner.rpc(BroadcastLeaving {}).await
     }

     pub(crate) async fn subscribe(&self, key: String, self_id: NodeId) -> irpc::Result<()> {
···
         self.broadcast_peer_info().await?;

         Ok(())
-    }
-
-    async fn broadcast_peer_info(&self) -> irpc::Result<JoinHandle<()>> {
-        let peers = self.peers().await?;
-        let subs = self.my_subscriptions().await?;
-        let client = self.inner.clone();
-        let handle = n0_future::task::spawn(async move {
-            if let Err(e) = broadcast_peer_info_inner(client, peers, subs).await {
-                tracing::error!("Peer announcement task failed: {:?}", e);
-            }
-        });
-        Ok(handle)
     }

     pub(crate) async fn unsubscribe(&self, key: String, self_id: NodeId) -> irpc::Result<()> {
```
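Worth noting in the diff above: keying `peers` by `NodeId` in a `HashMap` means a re-announced `PeerInfo` overwrites the old entry rather than accumulating next to it, which is what lets the timestamp-based pruning work. Below is a minimal standalone sketch of that prune logic, using stand-in types (`NodeId` as `u64`, a trimmed `PeerInfo`) rather than the real iroh types:

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// stand-ins for the real iroh / streamplace types
type NodeId = u64;

#[derive(Debug, Clone)]
struct PeerInfo {
    node_id: NodeId,
    timestamp: u64, // unix seconds; last time we heard from this peer
}

fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs()
}

// same shape as the PruneMyPeers handler: keep only peers seen at or
// after the cutoff
fn prune(peers: &mut HashMap<NodeId, PeerInfo>, cutoff_timestamp: u64) {
    peers.retain(|_, peer| peer.timestamp >= cutoff_timestamp);
}

fn main() {
    let now = timestamp();
    let mut peers = HashMap::new();
    peers.insert(1, PeerInfo { node_id: 1, timestamp: now });
    peers.insert(2, PeerInfo { node_id: 2, timestamp: now - 120 });

    // mirror the prune task: cutoff = now - prune interval (60s here)
    prune(&mut peers, now - 60);
    assert!(peers.contains_key(&1));
    assert!(!peers.contains_key(&2));
}
```

The cutoff matches the prune task in the diff: `timestamp() - peer_prune_interval.as_secs()`.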
+8
rust/iroh-streamplace/src/receiver.rs
```diff
···
         Ok(())
     }

+    /// Get our node address
     #[uniffi::method(async_runtime = "tokio")]
     pub async fn node_addr(&self) -> NodeAddr {
         self.endpoint.node_addr().await
+    }
+
+    /// tell the network that we're leaving. This should only be called just before disconnecting.
+    #[uniffi::method(async_runtime = "tokio")]
+    pub async fn leaving(&self) -> Result<(), Error> {
+        self.api.leaving().await?;
+        Ok(())
     }
 }

```
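From the embedding application's side, `leaving` is a best-effort courtesy call on the way out. A sketch of a shutdown path, with stubbed `Receiver` and `Error` types standing in for the real ones in receiver.rs (the `shutdown` helper is hypothetical, not part of this PR):

```rust
// stubs so the sketch stands alone; the real Receiver is the uniffi
// object from receiver.rs above
struct Receiver;

#[derive(Debug)]
struct Error;

impl Receiver {
    async fn leaving(&self) -> Result<(), Error> {
        Ok(())
    }
}

// hypothetical shutdown helper
async fn shutdown(receiver: &Receiver) {
    // best-effort: tell the swarm we're going away so peers can drop
    // our entry immediately instead of waiting out the prune interval
    if let Err(e) = receiver.leaving().await {
        // a failed broadcast is fine; stale entries get pruned anyway
        eprintln!("leaving broadcast failed: {:?}", e);
    }
    // ...then tear down the endpoint / drop the Receiver
}

#[tokio::main]
async fn main() {
    shutdown(&Receiver).await;
}
```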
+6 -3
rust/iroh-streamplace/swarm.md
```diff
···
 * when a node unsubscribes from a feed, it broadcasts its updated `PeerInfo` to all known peers
 * every `DEFAULT_PEER_INFO_REPUBLISH_INTERVAL`, a node broadcasts its current `PeerInfo` to all known peers

-every `DEFAULT_PEER_PRUNE_INTERVAL`, nodes will examine their local list of peers, and prune any who's latest timestamp is older than the current time, minus the prune interval, this is to purge peers that die off without notice
+every `DEFAULT_PEER_PRUNE_INTERVAL`, nodes examine their local list of peers and prune any whose latest timestamp is older than the current time minus the prune interval. This purges peers that die off without notice.

 ### Anchor Peers
-Anchor peers are _always_ transmitted to. They're expcted to be high-availability nodes. Any broadcast message will always try to
+Anchor peers are _always_ transmitted to. They're expected to be high-availability nodes. Any broadcast message is always sent to anchor peers, regardless of whether they are online at the time.
+
+### Peer Listing Messages
+At startup, new nodes send a `RequestPeerInfos` request to all anchor nodes. Each anchor node responds with its list of `PeerInfo`s, informing new nodes of its current view of the swarm. There's room to grow on maintaining swarm health, but this message type is a good primitive to start with.

 ### FFI API
-The FFI API to goland is a single method on the `Receiver`: `peers`. It returns an array of `PeerInfo`, representing the nodes current view of the swarm.
+The FFI API to Go exposes two methods on the `Receiver`: `peers` and `leaving`. `peers` returns an array of `PeerInfo`, representing the node's current view of the swarm. `leaving` should be called just before a node shuts down, to notify the network that the node is going away. It's not critical that `leaving` is called, but doing so cuts down on stale data living in the network.
```
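To make the hydration semantics concrete: a new node asks each anchor for its peer list, restamps every entry on receipt, and inserts by `NodeId`. A minimal sketch with stand-in types and a hypothetical `fetch_peer_infos` in place of the real `RequestPeerInfos`/`HandlePeerInfosRequest` RPC pair:

```rust
use std::collections::HashMap;

type NodeId = u64;

#[derive(Debug, Clone)]
struct PeerInfo {
    node_id: NodeId,
    timestamp: u64,
}

// hypothetical stand-in for one RequestPeerInfos round trip; the real
// version streams PeerInfos over an irpc connection to the anchor
fn fetch_peer_infos(_anchor: NodeId) -> Vec<PeerInfo> {
    vec![PeerInfo { node_id: 7, timestamp: 0 }]
}

fn now() -> u64 {
    42 // stand-in clock
}

fn hydrate(anchors: &[NodeId], peers: &mut HashMap<NodeId, PeerInfo>) {
    for anchor in anchors {
        for mut info in fetch_peer_infos(*anchor) {
            // restamp on receipt, as the actor does, so timestamps from
            // external sources don't leak into our prune bookkeeping
            info.timestamp = now();
            peers.insert(info.node_id, info);
        }
    }
}

fn main() {
    let mut peers = HashMap::new();
    hydrate(&[1, 2], &mut peers);
    assert!(peers.contains_key(&7));
}
```

Restamping locally means every hydrated entry gets a full prune interval before it's eligible for removal, regardless of how old the anchor's own record was.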