A Python port of the Invisible Internet Project (I2P)
1"""DCC helper — IRC DCC file transfer address rewriting.
2
3Manages DCC SEND/CHAT tunneling through I2P by rewriting
4IP addresses and ports in DCC CTCP commands.
5
6Ported from net.i2p.i2ptunnel.irc.DCCHelper / I2PTunnelDCCClient/Server.
7"""
8
9from __future__ import annotations
10
11import re
12import time
13from dataclasses import dataclass, field
14
# Matches "DCC <type> <filename> <ip> <port> [<size>]", case-insensitively.
# Capture groups: 1 = type, 2 = filename, 3 = ip, 4 = port, 5 = optional size.
# The filename is either a double-quoted string (it may contain spaces; the
# surrounding quotes are kept in the capture) or a single token without
# whitespace.
_DCC_RE = re.compile(
    r'DCC\s+(SEND|CHAT|RESUME|ACCEPT)\s+("[^"]*"|\S+)\s+(\d+)\s+(\d+)(?:\s+(\d+))?',
    re.IGNORECASE,
)

# Default transfer expiry: 30 minutes
_DEFAULT_EXPIRY_SECONDS = 30 * 60
22
23
@dataclass
class DCCTransfer:
    """State record for a single DCC transfer.

    ``expires_at`` is a deadline on the ``time.monotonic()`` clock. Leaving
    it at ``0.0`` requests the default lifetime, which is filled in when the
    instance is constructed.
    """

    remote_dest: str
    local_port: int
    dcc_type: str = "SEND"
    filename: str = ""
    expires_at: float = 0.0

    def __post_init__(self) -> None:
        # A non-zero deadline was supplied explicitly; keep it untouched.
        if self.expires_at != 0.0:
            return
        self.expires_at = _DEFAULT_EXPIRY_SECONDS + time.monotonic()

    @property
    def is_expired(self) -> bool:
        """Whether the monotonic clock has moved past ``expires_at``."""
        now = time.monotonic()
        return self.expires_at < now
41
42
class DCCHelper:
    """Central manager for DCC address rewriting through I2P.

    Keeps a bounded table of pending transfers and rewrites the address and
    port fields of DCC commands: outbound commands advertise the local b32
    address, inbound commands are pointed at the loopback address.
    """

    # Local ports are handed out round-robin from this inclusive range.
    _FIRST_PORT = 30000
    _LAST_PORT = 65535
    # 127.0.0.1 written as the decimal integer used in the DCC address field.
    _LOOPBACK_IP = "2130706433"

    def __init__(
        self,
        local_b32: str = "",
        max_transfers: int = 5,
    ) -> None:
        """Create a helper.

        Args:
            local_b32: Address substituted into outbound DCC commands.
            max_transfers: Maximum number of unexpired transfers tracked at once.
        """
        self._local_b32 = local_b32
        self._max_transfers = max_transfers
        self._transfers: dict[str, DCCTransfer] = {}
        self._next_port = self._FIRST_PORT

    def _parse_dcc(self, text: str) -> dict[str, str] | None:
        """Parse a DCC command string.

        The command must begin at the start of *text*.

        Returns a dict with keys ``type`` (upper-cased), ``filename``, ``ip``,
        ``port`` and, when a trailing numeric field is present, ``size``;
        ``None`` when *text* is not a recognised DCC command.

        NOTE(review): DCC RESUME/ACCEPT are normally sent as
        ``filename port position`` with no address field, so for those types
        the ``ip``/``port`` keys presumably hold port/position. Confirm how
        callers expect RESUME/ACCEPT to be rewritten.
        """
        m = _DCC_RE.match(text)
        if m is None:
            return None
        dcc_type, filename, ip, port, size = m.group(1, 2, 3, 4, 5)
        result = {
            "type": dcc_type.upper(),
            "filename": filename,
            "ip": ip,
            "port": port,
        }
        if size:
            result["size"] = size
        return result

    def _can_add_transfer(self) -> bool:
        """Drop expired transfers, then report whether another one fits."""
        self._cleanup_expired()
        # Never track more transfers than there are ports to give them.
        port_count = self._LAST_PORT - self._FIRST_PORT + 1
        return len(self._transfers) < min(self._max_transfers, port_count)

    def _cleanup_expired(self) -> None:
        """Remove expired transfers."""
        # Collect keys first: the dict must not change size while iterated.
        expired = [key for key, transfer in self._transfers.items() if transfer.is_expired]
        for key in expired:
            del self._transfers[key]

    def _allocate_port(self) -> int:
        """Allocate a local port for a DCC transfer.

        Ports are taken in increasing order and wrap back to the start of the
        range instead of growing past 65535. Ports still held by a tracked
        transfer are skipped.

        Raises:
            RuntimeError: if every port in the range is held by a transfer.
        """
        in_use = {transfer.local_port for transfer in self._transfers.values()}
        for _ in range(self._LAST_PORT - self._FIRST_PORT + 1):
            port = self._next_port
            self._next_port = self._FIRST_PORT if port >= self._LAST_PORT else port + 1
            if port not in in_use:
                return port
        raise RuntimeError("no free local port for DCC transfer")

    def _rewrite(
        self,
        dcc_text: str,
        key_prefix: str,
        remote_dest: str,
        address: str,
    ) -> str | None:
        """Register a transfer and rebuild the DCC command around *address*."""
        parsed = self._parse_dcc(dcc_text)
        # Parse first so unparseable input returns before any cleanup runs.
        if parsed is None or not self._can_add_transfer():
            return None

        port = self._allocate_port()
        self._transfers[f"{key_prefix}-{port}"] = DCCTransfer(
            remote_dest=remote_dest,
            local_port=port,
            dcc_type=parsed["type"],
            filename=parsed["filename"],
        )

        fields = ["DCC", parsed["type"], parsed["filename"], address, str(port)]
        if "size" in parsed:
            fields.append(parsed["size"])
        return " ".join(fields)

    def rewrite_outbound(self, dcc_text: str) -> str | None:
        """Rewrite outbound DCC command: replace local IP:port with b32:port.

        Returns rewritten DCC string, or None if the text is not a DCC
        command or the transfer limit has been reached.
        """
        return self._rewrite(dcc_text, "out", "", self._local_b32)

    def rewrite_inbound(self, dcc_text: str, remote_dest: str) -> str | None:
        """Rewrite inbound DCC command: replace b32:port with local socket.

        *remote_dest* is recorded on the tracked transfer.

        Returns rewritten DCC string, or None if the text is not a DCC
        command or the transfer limit has been reached.
        """
        return self._rewrite(dcc_text, "in", remote_dest, self._LOOPBACK_IP)