A Python port of the Invisible Internet Project (I2P)
"""SAM stream session -- TCP-like connections over I2P tunnels.

Ported from net.i2p.sam.SAMStreamSession.
"""
5
6import asyncio
7import logging
8
9logger = logging.getLogger(__name__)
10
11
class SAMStreamSession:
    """Manages stream connections for a SAM session.

    On STREAM CONNECT: intended to connect to a remote destination and then
    "steal" the client socket for bidirectional raw tunnel I/O. This
    implementation has no router backend, so ``connect`` only reports an
    error status.

    On STREAM ACCEPT: waits for an incoming connection on the internal
    accept queue, then bridges it with the client socket.
    """

    # Seconds ``accept`` waits for a queued incoming connection before it
    # reports a TIMEOUT error to the client. Override on a subclass or an
    # instance to change the wait.
    ACCEPT_TIMEOUT_SECONDS: float = 300.0

    def __init__(self, nickname: str, destination_b64: str) -> None:
        """Create a stream session.

        Args:
            nickname: Session nickname; used in log messages.
            destination_b64: Base64 destination string that ``accept`` writes
                to the client before bridging.
        """
        self._nickname = nickname
        self._destination_b64 = destination_b64
        # Incoming (reader, writer) pairs waiting to be picked up by accept().
        self._accept_queue: asyncio.Queue[
            tuple[asyncio.StreamReader, asyncio.StreamWriter]
        ] = asyncio.Queue()

    @staticmethod
    def _close_quietly(stream_writer: asyncio.StreamWriter) -> None:
        """Close ``stream_writer`` best-effort, logging instead of raising."""
        try:
            stream_writer.close()
        except Exception:
            # Closing is cleanup only; a failure here must not mask whatever
            # ended the transfer, but it should still be visible in logs.
            logger.debug("Ignoring error while closing stream writer", exc_info=True)

    async def connect(
        self,
        target_dest: str,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
        from_port: int = 0,
        to_port: int = 0,
    ) -> None:
        """Handle STREAM CONNECT for the client socket.

        Without a live I2P router the connection cannot be established, so
        no tunnel is built and no data is bridged. Unless ``silent`` is set,
        a ``CANT_REACH_PEER`` stream error is written to the client.

        A full implementation would:
          1. Look up the target destination in the I2P network database
          2. Build tunnels to reach the destination
          3. Establish a streaming connection
          4. Bridge the client socket <-> I2P tunnel bidirectionally

        Args:
            target_dest: Base64-encoded target destination (currently unused).
            reader: Client's stream reader (currently unused).
            writer: Client's stream writer; receives the error status.
            silent: If True, suppress the status reply.
            from_port: Source port (virtual; currently unused).
            to_port: Destination port (virtual; currently unused).
        """
        if silent:
            return

        # Function-scope import -- presumably avoids a circular import with
        # i2p_sam.protocol; confirm before hoisting to module level.
        from i2p_sam.protocol import SAMReply

        status = SAMReply.stream_error("CANT_REACH_PEER", "No I2P router available")
        writer.write(status.encode("utf-8"))
        await writer.drain()

    async def accept(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
    ) -> None:
        """Wait for an incoming connection and bridge it with the client.

        Waits up to ``ACCEPT_TIMEOUT_SECONDS`` for a queued connection. On
        timeout a ``TIMEOUT`` stream error is written to the client and the
        method returns. Otherwise the destination line is sent (unless
        ``silent``) and data is piped in both directions until the bridge
        finishes.

        Args:
            reader: Client's stream reader.
            writer: Client's stream writer.
            silent: If True, suppress the initial destination line.

        Raises:
            Exception: Whatever writing the destination line to the client
                raises (for example ``ConnectionResetError``). The dequeued
                incoming connection is closed before the error propagates.
        """
        try:
            incoming_reader, incoming_writer = await asyncio.wait_for(
                self._accept_queue.get(), timeout=self.ACCEPT_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            # Function-scope import -- presumably avoids a circular import
            # with i2p_sam.protocol; confirm before hoisting.
            from i2p_sam.protocol import SAMReply

            # NOTE(review): the timeout error is sent even when silent=True;
            # confirm that is the intended protocol behavior.
            status = SAMReply.stream_error("TIMEOUT", "No incoming connection")
            writer.write(status.encode("utf-8"))
            await writer.drain()
            return

        if not silent:
            # NOTE(review): this writes the session's own destination; the
            # accept queue carries no peer destination. Confirm whether the
            # remote peer's destination was intended here.
            try:
                writer.write(f"{self._destination_b64}\n".encode("utf-8"))
                await writer.drain()
            except BaseException:
                # The client went away (or we were cancelled) before the
                # bridge took ownership of the incoming connection; close it
                # so the dequeued connection is not leaked, then re-raise.
                self._close_quietly(incoming_writer)
                raise

        await self._bridge(reader, writer, incoming_reader, incoming_writer)

    async def forward(
        self,
        listen_port: int,
        host: str = "127.0.0.1",
        silent: bool = False,
    ) -> None:
        """Handle STREAM FORWARD by logging the request.

        No listener is started here; a full implementation would start a
        local TCP listener and forward each connection through the I2P
        tunnel.

        Args:
            listen_port: Local TCP port to listen on.
            host: Local host to bind to.
            silent: If True, suppress status messages (currently unused).
        """
        logger.info("STREAM FORWARD on %s:%d for session %s",
                    host, listen_port, self._nickname)

    async def _bridge(
        self,
        r1: asyncio.StreamReader,
        w1: asyncio.StreamWriter,
        r2: asyncio.StreamReader,
        w2: asyncio.StreamWriter,
    ) -> None:
        """Pipe data r1 -> w2 and r2 -> w1 until both directions finish.

        Each direction closes its destination writer when it ends, whether
        by EOF, connection error, or cancellation. Errors inside a direction
        do not propagate to the caller; unexpected ones are logged.
        """
        async def _copy(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None:
            try:
                # read() returns b"" at EOF, which ends the loop.
                while chunk := await src.read(8192):
                    dst.write(chunk)
                    await dst.drain()
            except ConnectionError:
                # Peer reset / broken pipe / abort: a normal way for a
                # stream to end, so just stop copying.
                pass
            finally:
                self._close_quietly(dst)

        outcomes = await asyncio.gather(
            _copy(r1, w2),
            _copy(r2, w1),
            return_exceptions=True,
        )
        for outcome in outcomes:
            # gather() hands back exceptions as values here; surface the
            # unexpected ones instead of discarding them silently.
            if isinstance(outcome, Exception):
                logger.warning("Stream bridge direction failed: %r", outcome)