A Python port of the Invisible Internet Project (I2P)
at main 88 lines 2.6 kB view raw
1"""Bidirectional async socket forwarder for I2PTunnel. 2 3Replaces Java's I2PTunnelRunner with async coroutine pairs. 4""" 5 6from __future__ import annotations 7 8import asyncio 9import logging 10from typing import Callable 11 12logger = logging.getLogger(__name__) 13 14BUFFER_SIZE = 4096 15 16 17async def forward( 18 reader: asyncio.StreamReader, 19 writer: asyncio.StreamWriter, 20 name: str = "", 21 on_data: Callable[[int], None] | None = None, 22) -> None: 23 """Copy data from reader to writer until EOF or error. 24 25 Args: 26 reader: Source stream. 27 writer: Destination stream. 28 name: Label for logging. 29 on_data: Optional callback with byte count for each chunk. 30 """ 31 try: 32 while True: 33 data = await reader.read(BUFFER_SIZE) 34 if not data: 35 break 36 writer.write(data) 37 await writer.drain() 38 if on_data: 39 on_data(len(data)) 40 except (ConnectionResetError, BrokenPipeError, OSError) as e: 41 logger.debug("Forward %s ended: %s", name, e) 42 finally: 43 try: 44 if not writer.transport.is_closing(): 45 writer.close() 46 except Exception: 47 pass 48 49 50async def bridge( 51 local_reader: asyncio.StreamReader, 52 local_writer: asyncio.StreamWriter, 53 remote_reader: asyncio.StreamReader, 54 remote_writer: asyncio.StreamWriter, 55 on_data: Callable[[int], None] | None = None, 56) -> None: 57 """Run bidirectional forwarding between local and remote streams. 58 59 Two forward tasks run concurrently via asyncio.gather(). 60 61 Args: 62 local_reader: Local side reader. 63 local_writer: Local side writer. 64 remote_reader: Remote side (I2P) reader. 65 remote_writer: Remote side (I2P) writer. 66 on_data: Optional callback for byte counting. 67 """ 68 await asyncio.gather( 69 forward(local_reader, remote_writer, name="local->remote", on_data=on_data), 70 forward(remote_reader, local_writer, name="remote->local", on_data=on_data), 71 return_exceptions=True, 72 ) 73 74 75async def bridge_with_initial_data( 76 local_reader: asyncio.StreamReader, 77 local_writer: asyncio.StreamWriter, 78 remote_reader: asyncio.StreamReader, 79 remote_writer: asyncio.StreamWriter, 80 initial_data: bytes, 81) -> None: 82 """Like bridge() but sends initial_data to remote before starting. 83 84 Used by HTTP proxy to send modified request headers. 85 """ 86 remote_writer.write(initial_data) 87 await remote_writer.drain() 88 await bridge(local_reader, local_writer, remote_reader, remote_writer)