"""DCC helper — IRC DCC file transfer address rewriting. Manages DCC SEND/CHAT tunneling through I2P by rewriting IP addresses and ports in DCC CTCP commands. Ported from net.i2p.i2ptunnel.irc.DCCHelper / I2PTunnelDCCClient/Server. """ from __future__ import annotations import re import time from dataclasses import dataclass, field _DCC_RE = re.compile( r"DCC\s+(SEND|CHAT|RESUME|ACCEPT)\s+(\S+)\s+(\d+)\s+(\d+)(?:\s+(\d+))?", re.IGNORECASE, ) # Default transfer expiry: 30 minutes _DEFAULT_EXPIRY_SECONDS = 1800 @dataclass class DCCTransfer: """Tracks one DCC file transfer.""" remote_dest: str local_port: int dcc_type: str = "SEND" filename: str = "" expires_at: float = 0.0 def __post_init__(self): if self.expires_at == 0.0: self.expires_at = time.monotonic() + _DEFAULT_EXPIRY_SECONDS @property def is_expired(self) -> bool: return time.monotonic() > self.expires_at class DCCHelper: """Central manager for DCC address rewriting through I2P.""" def __init__( self, local_b32: str = "", max_transfers: int = 5, ) -> None: self._local_b32 = local_b32 self._max_transfers = max_transfers self._transfers: dict[str, DCCTransfer] = {} self._next_port = 30000 def _parse_dcc(self, text: str) -> dict[str, str] | None: """Parse a DCC command string. Returns dict with keys: type, filename, ip, port, size (optional). """ m = _DCC_RE.match(text) if not m: return None result = { "type": m.group(1).upper(), "filename": m.group(2), "ip": m.group(3), "port": m.group(4), } if m.group(5): result["size"] = m.group(5) return result def _can_add_transfer(self) -> bool: """Check if we can accept another transfer.""" self._cleanup_expired() return len(self._transfers) < self._max_transfers def _cleanup_expired(self) -> None: """Remove expired transfers.""" expired = [k for k, v in self._transfers.items() if v.is_expired] for k in expired: del self._transfers[k] def _allocate_port(self) -> int: """Allocate a local port for a DCC transfer.""" port = self._next_port self._next_port += 1 return port def rewrite_outbound(self, dcc_text: str) -> str | None: """Rewrite outbound DCC command: replace local IP:port with b32:port. Returns rewritten DCC string, or None if cannot rewrite. """ parsed = self._parse_dcc(dcc_text) if parsed is None: return None if not self._can_add_transfer(): return None port = self._allocate_port() key = f"out-{port}" self._transfers[key] = DCCTransfer( remote_dest="", local_port=port, dcc_type=parsed["type"], filename=parsed.get("filename", ""), ) # Rebuild DCC command with I2P b32 address parts = [f"DCC {parsed['type']} {parsed['filename']} {self._local_b32} {port}"] if "size" in parsed: parts[0] += f" {parsed['size']}" return parts[0] def rewrite_inbound(self, dcc_text: str, remote_dest: str) -> str | None: """Rewrite inbound DCC command: replace b32:port with local socket. Returns rewritten DCC string, or None if cannot rewrite. """ parsed = self._parse_dcc(dcc_text) if parsed is None: return None if not self._can_add_transfer(): return None local_port = self._allocate_port() key = f"in-{local_port}" self._transfers[key] = DCCTransfer( remote_dest=remote_dest, local_port=local_port, dcc_type=parsed["type"], filename=parsed.get("filename", ""), ) parts = [f"DCC {parsed['type']} {parsed['filename']} 2130706433 {local_port}"] if "size" in parsed: parts[0] += f" {parsed['size']}" return parts[0]