"""Bidirectional async socket forwarder for I2PTunnel. Replaces Java's I2PTunnelRunner with async coroutine pairs. """ from __future__ import annotations import asyncio import logging from typing import Callable logger = logging.getLogger(__name__) BUFFER_SIZE = 4096 async def forward( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, name: str = "", on_data: Callable[[int], None] | None = None, ) -> None: """Copy data from reader to writer until EOF or error. Args: reader: Source stream. writer: Destination stream. name: Label for logging. on_data: Optional callback with byte count for each chunk. """ try: while True: data = await reader.read(BUFFER_SIZE) if not data: break writer.write(data) await writer.drain() if on_data: on_data(len(data)) except (ConnectionResetError, BrokenPipeError, OSError) as e: logger.debug("Forward %s ended: %s", name, e) finally: try: if not writer.transport.is_closing(): writer.close() except Exception: pass async def bridge( local_reader: asyncio.StreamReader, local_writer: asyncio.StreamWriter, remote_reader: asyncio.StreamReader, remote_writer: asyncio.StreamWriter, on_data: Callable[[int], None] | None = None, ) -> None: """Run bidirectional forwarding between local and remote streams. Two forward tasks run concurrently via asyncio.gather(). Args: local_reader: Local side reader. local_writer: Local side writer. remote_reader: Remote side (I2P) reader. remote_writer: Remote side (I2P) writer. on_data: Optional callback for byte counting. """ await asyncio.gather( forward(local_reader, remote_writer, name="local->remote", on_data=on_data), forward(remote_reader, local_writer, name="remote->local", on_data=on_data), return_exceptions=True, ) async def bridge_with_initial_data( local_reader: asyncio.StreamReader, local_writer: asyncio.StreamWriter, remote_reader: asyncio.StreamReader, remote_writer: asyncio.StreamWriter, initial_data: bytes, ) -> None: """Like bridge() but sends initial_data to remote before starting. Used by HTTP proxy to send modified request headers. """ remote_writer.write(initial_data) await remote_writer.drain() await bridge(local_reader, local_writer, remote_reader, remote_writer)