# A Python port of the Invisible Internet Project (I2P)
"""Bidirectional async socket forwarder for I2PTunnel.

Replaces Java's I2PTunnelRunner with async coroutine pairs.
"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10from typing import Callable
11
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maximum number of bytes requested from the source stream per read() call
# in forward().
BUFFER_SIZE = 4096
15
16
async def forward(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    name: str = "",
    on_data: Callable[[int], None] | None = None,
) -> None:
    """Copy data from reader to writer until EOF or error.

    Reads chunks of at most BUFFER_SIZE bytes, writes each chunk to
    ``writer`` and awaits ``writer.drain()`` before reading the next one.
    On exit (EOF, error or cancellation) ``writer.close()`` is requested
    unless its transport is already closing.

    Args:
        reader: Source stream.
        writer: Destination stream; closed when this coroutine finishes.
        name: Label for logging.
        on_data: Optional callback invoked with the byte count of each chunk
            after the chunk has been written and drained.

    Raises:
        Exception: Anything that is not an OSError (for example an error
            raised by ``on_data``) propagates to the caller after the close
            attempt in the ``finally`` block has run.
    """
    try:
        while True:
            data = await reader.read(BUFFER_SIZE)
            if not data:
                # An empty read signals EOF on the source side.
                break
            writer.write(data)
            await writer.drain()
            if on_data is not None:
                on_data(len(data))
    except OSError as e:
        # ConnectionResetError and BrokenPipeError are OSError subclasses,
        # so this single clause covers every connection-level failure.
        logger.debug("Forward %s ended: %s", name, e)
    finally:
        # Closing is best-effort and must never raise out of the cleanup
        # path, but a failure is logged instead of being silently dropped.
        try:
            if not writer.transport.is_closing():
                writer.close()
        except Exception as e:
            logger.debug("Forward %s: closing writer failed: %s", name, e)
48
49
async def bridge(
    local_reader: asyncio.StreamReader,
    local_writer: asyncio.StreamWriter,
    remote_reader: asyncio.StreamReader,
    remote_writer: asyncio.StreamWriter,
    on_data: Callable[[int], None] | None = None,
) -> None:
    """Run bidirectional forwarding between local and remote streams.

    Two forward() coroutines run concurrently via asyncio.gather(), and this
    coroutine returns once both directions have finished. An exception
    raised by either direction is not re-raised here; it is logged so that
    unexpected failures (for example a faulty ``on_data`` callback) do not
    vanish silently.

    Args:
        local_reader: Local side reader.
        local_writer: Local side writer.
        remote_reader: Remote side (I2P) reader.
        remote_writer: Remote side (I2P) writer.
        on_data: Optional callback for byte counting; the same callback is
            passed to both directions.
    """
    directions = ("local->remote", "remote->local")
    outcomes = await asyncio.gather(
        forward(local_reader, remote_writer, name=directions[0], on_data=on_data),
        forward(remote_reader, local_writer, name=directions[1], on_data=on_data),
        return_exceptions=True,
    )
    # return_exceptions=True hands failures back as values; surface them in
    # the log rather than discarding the result list.
    for direction, outcome in zip(directions, outcomes):
        if isinstance(outcome, Exception):
            logger.warning("Forward %s failed: %r", direction, outcome)
73
74
async def bridge_with_initial_data(
    local_reader: asyncio.StreamReader,
    local_writer: asyncio.StreamWriter,
    remote_reader: asyncio.StreamReader,
    remote_writer: asyncio.StreamWriter,
    initial_data: bytes,
    on_data: Callable[[int], None] | None = None,
) -> None:
    """Like bridge() but sends initial_data to remote before starting.

    Used by HTTP proxy to send modified request headers.

    Args:
        local_reader: Local side reader.
        local_writer: Local side writer.
        remote_reader: Remote side (I2P) reader.
        remote_writer: Remote side (I2P) writer.
        initial_data: Bytes written to ``remote_writer`` and drained before
            bidirectional forwarding begins.
        on_data: Optional byte-count callback passed through to bridge().
            The initial_data write itself is not reported to it.

    Raises:
        Exception: Any error raised while writing or draining initial_data
            propagates to the caller; bridge() is not started in that case.
    """
    remote_writer.write(initial_data)
    await remote_writer.drain()
    await bridge(
        local_reader,
        local_writer,
        remote_reader,
        remote_writer,
        on_data=on_data,
    )