P2P support library for the beaver compute environment
1/* SPDX Id: AGPL-3.0-or-later */
2
3use iroh::{
4 endpoint::Connection,
5 protocol::{AcceptError, ProtocolHandler},
6};
7use log::{error, info};
8
9use crate::{PeerEvent, packet::BasePacket, state::SharedState};
10
11#[derive(Debug)]
12pub(crate) struct MessageProtocol {
13 state: SharedState,
14}
15
16impl MessageProtocol {
17 pub(crate) fn new(state: SharedState) -> Self {
18 Self { state }
19 }
20}
21
22/// Message protocol.
23/// Both sides can send messages back and forth with
24/// no pre-determined pattern.
25impl ProtocolHandler for MessageProtocol {
26 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
27 let remote_id = connection.remote_id();
28 println!(
29 "accepted connection on {:?} from {:?}",
30 String::from_utf8_lossy(connection.alpn()),
31 remote_id
32 );
33
34 // 1. Accept the bidirectional stream.
35 let (sender, mut receiver) = connection.accept_bi().await?;
36
37 // 2. Store the sender in the state for that remote ID.
38 self.state
39 .lock()
40 .await
41 .set_message_sender(&remote_id, sender);
42
43 // 3. Relay all the incoming messages.
44 loop {
45 match BasePacket::recv(&mut receiver).await {
46 Ok(payload) => {
47 self.state
48 .lock()
49 .await
50 .notify(PeerEvent::Message(remote_id, payload));
51 }
52 Err(err) => {
53 error!("Error reading message packet: {err}");
54 break;
55 }
56 }
57 }
58
59 // 4. Done with that connection?
60 self.state.lock().await.remove_message_sender(&remote_id);
61
62 Ok(())
63 }
64}