P2P support library for the beaver compute environment
at main 64 lines 1.8 kB view raw
1/* SPDX Id: AGPL-3.0-or-later */ 2 3use iroh::{ 4 endpoint::Connection, 5 protocol::{AcceptError, ProtocolHandler}, 6}; 7use log::{error, info}; 8 9use crate::{PeerEvent, packet::BasePacket, state::SharedState}; 10 11#[derive(Debug)] 12pub(crate) struct MessageProtocol { 13 state: SharedState, 14} 15 16impl MessageProtocol { 17 pub(crate) fn new(state: SharedState) -> Self { 18 Self { state } 19 } 20} 21 22/// Message protocol. 23/// Both sides can send messages back and forth with 24/// no pre-determined pattern. 25impl ProtocolHandler for MessageProtocol { 26 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { 27 let remote_id = connection.remote_id(); 28 println!( 29 "accepted connection on {:?} from {:?}", 30 String::from_utf8_lossy(connection.alpn()), 31 remote_id 32 ); 33 34 // 1. Accept the bidirectional stream. 35 let (sender, mut receiver) = connection.accept_bi().await?; 36 37 // 2. Store the sender in the state for that remote ID. 38 self.state 39 .lock() 40 .await 41 .set_message_sender(&remote_id, sender); 42 43 // 3. Relay all the incoming messages. 44 loop { 45 match BasePacket::recv(&mut receiver).await { 46 Ok(payload) => { 47 self.state 48 .lock() 49 .await 50 .notify(PeerEvent::Message(remote_id, payload)); 51 } 52 Err(err) => { 53 error!("Error reading message packet: {err}"); 54 break; 55 } 56 } 57 } 58 59 // 4. Done with that connection? 60 self.state.lock().await.remove_message_sender(&remote_id); 61 62 Ok(()) 63 } 64}