A Python port of the Invisible Internet Project (I2P)
at main 142 lines 4.2 kB view raw
1"""DCC helper — IRC DCC file transfer address rewriting. 2 3Manages DCC SEND/CHAT tunneling through I2P by rewriting 4IP addresses and ports in DCC CTCP commands. 5 6Ported from net.i2p.i2ptunnel.irc.DCCHelper / I2PTunnelDCCClient/Server. 7""" 8 9from __future__ import annotations 10 11import re 12import time 13from dataclasses import dataclass, field 14 15_DCC_RE = re.compile( 16 r"DCC\s+(SEND|CHAT|RESUME|ACCEPT)\s+(\S+)\s+(\d+)\s+(\d+)(?:\s+(\d+))?", 17 re.IGNORECASE, 18) 19 20# Default transfer expiry: 30 minutes 21_DEFAULT_EXPIRY_SECONDS = 1800 22 23 24@dataclass 25class DCCTransfer: 26 """Tracks one DCC file transfer.""" 27 28 remote_dest: str 29 local_port: int 30 dcc_type: str = "SEND" 31 filename: str = "" 32 expires_at: float = 0.0 33 34 def __post_init__(self): 35 if self.expires_at == 0.0: 36 self.expires_at = time.monotonic() + _DEFAULT_EXPIRY_SECONDS 37 38 @property 39 def is_expired(self) -> bool: 40 return time.monotonic() > self.expires_at 41 42 43class DCCHelper: 44 """Central manager for DCC address rewriting through I2P.""" 45 46 def __init__( 47 self, 48 local_b32: str = "", 49 max_transfers: int = 5, 50 ) -> None: 51 self._local_b32 = local_b32 52 self._max_transfers = max_transfers 53 self._transfers: dict[str, DCCTransfer] = {} 54 self._next_port = 30000 55 56 def _parse_dcc(self, text: str) -> dict[str, str] | None: 57 """Parse a DCC command string. 58 59 Returns dict with keys: type, filename, ip, port, size (optional). 60 """ 61 m = _DCC_RE.match(text) 62 if not m: 63 return None 64 result = { 65 "type": m.group(1).upper(), 66 "filename": m.group(2), 67 "ip": m.group(3), 68 "port": m.group(4), 69 } 70 if m.group(5): 71 result["size"] = m.group(5) 72 return result 73 74 def _can_add_transfer(self) -> bool: 75 """Check if we can accept another transfer.""" 76 self._cleanup_expired() 77 return len(self._transfers) < self._max_transfers 78 79 def _cleanup_expired(self) -> None: 80 """Remove expired transfers.""" 81 expired = [k for k, v in self._transfers.items() if v.is_expired] 82 for k in expired: 83 del self._transfers[k] 84 85 def _allocate_port(self) -> int: 86 """Allocate a local port for a DCC transfer.""" 87 port = self._next_port 88 self._next_port += 1 89 return port 90 91 def rewrite_outbound(self, dcc_text: str) -> str | None: 92 """Rewrite outbound DCC command: replace local IP:port with b32:port. 93 94 Returns rewritten DCC string, or None if cannot rewrite. 95 """ 96 parsed = self._parse_dcc(dcc_text) 97 if parsed is None: 98 return None 99 100 if not self._can_add_transfer(): 101 return None 102 103 port = self._allocate_port() 104 key = f"out-{port}" 105 self._transfers[key] = DCCTransfer( 106 remote_dest="", 107 local_port=port, 108 dcc_type=parsed["type"], 109 filename=parsed.get("filename", ""), 110 ) 111 112 # Rebuild DCC command with I2P b32 address 113 parts = [f"DCC {parsed['type']} {parsed['filename']} {self._local_b32} {port}"] 114 if "size" in parsed: 115 parts[0] += f" {parsed['size']}" 116 return parts[0] 117 118 def rewrite_inbound(self, dcc_text: str, remote_dest: str) -> str | None: 119 """Rewrite inbound DCC command: replace b32:port with local socket. 120 121 Returns rewritten DCC string, or None if cannot rewrite. 122 """ 123 parsed = self._parse_dcc(dcc_text) 124 if parsed is None: 125 return None 126 127 if not self._can_add_transfer(): 128 return None 129 130 local_port = self._allocate_port() 131 key = f"in-{local_port}" 132 self._transfers[key] = DCCTransfer( 133 remote_dest=remote_dest, 134 local_port=local_port, 135 dcc_type=parsed["type"], 136 filename=parsed.get("filename", ""), 137 ) 138 139 parts = [f"DCC {parsed['type']} {parsed['filename']} 2130706433 {local_port}"] 140 if "size" in parsed: 141 parts[0] += f" {parsed['size']}" 142 return parts[0]