A Python port of the Invisible Internet Project (I2P)
1"""DataHelper — I2P wire format serialization utilities.
2
3Ported from net.i2p.data.DataHelper.
4"""
5import base64
6import io
7from datetime import datetime, timezone
8
9
10def read_long(stream: io.IOBase, num_bytes: int) -> int:
11 """Read a big-endian unsigned integer of num_bytes from stream."""
12 data = stream.read(num_bytes)
13 if len(data) != num_bytes:
14 raise ValueError(f"Expected {num_bytes} bytes, got {len(data)}")
15 return int.from_bytes(data, "big")
16
17
18def write_long(stream: io.IOBase, num_bytes: int, value: int) -> None:
19 """Write a big-endian unsigned integer of num_bytes to stream."""
20 stream.write(value.to_bytes(num_bytes, "big"))
21
22
23def read_string(stream: io.IOBase) -> str:
24 """Read a length-prefixed UTF-8 string (1-byte length prefix)."""
25 length = read_long(stream, 1)
26 if length == 0:
27 return ""
28 data = stream.read(length)
29 if len(data) != length:
30 raise ValueError(f"Expected {length} bytes for string, got {len(data)}")
31 return data.decode("utf-8")
32
33
34def write_string(stream: io.IOBase, value: str) -> None:
35 """Write a length-prefixed UTF-8 string (1-byte length prefix)."""
36 encoded = value.encode("utf-8")
37 if len(encoded) > 255:
38 raise ValueError(f"String too long: {len(encoded)} > 255")
39 write_long(stream, 1, len(encoded))
40 stream.write(encoded)
41
42
43def read_properties(stream: io.IOBase) -> dict[str, str]:
44 """Read I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs."""
45 total_len = read_long(stream, 2)
46 if total_len == 0:
47 return {}
48 data = stream.read(total_len)
49 if len(data) != total_len:
50 raise ValueError(f"Expected {total_len} bytes for properties, got {len(data)}")
51 props = {}
52 text = data.decode("utf-8")
53 for line in text.split("\n"):
54 line = line.strip()
55 if not line or line == ";":
56 continue
57 if line.endswith(";"):
58 line = line[:-1]
59 if "=" in line:
60 key, value = line.split("=", 1)
61 props[key] = value
62 return props
63
64
65def write_properties(stream: io.IOBase, props: dict[str, str]) -> None:
66 """Write I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs."""
67 if not props:
68 write_long(stream, 2, 0)
69 return
70 buf = io.BytesIO()
71 for key in sorted(props.keys()):
72 line = f"{key}={props[key]};\n"
73 buf.write(line.encode("utf-8"))
74 data = buf.getvalue()
75 write_long(stream, 2, len(data))
76 stream.write(data)
77
78
79def to_date(timestamp_ms: int) -> datetime:
80 """Convert millisecond timestamp to datetime (UTC)."""
81 return datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc)
82
83
84def from_date(dt: datetime) -> int:
85 """Convert datetime to millisecond timestamp."""
86 return int(dt.timestamp() * 1000)
87
88
89# I2P Base64 uses a modified alphabet: '-' instead of '+', '~' instead of '/'
90# This matches Java's net.i2p.data.Base64 implementation.
91_I2P_B64_ALTCHARS = b"-~"
92
93
94def to_base64(data: bytes) -> str:
95 """Encode bytes to I2P-style Base64 (I2P alphabet, with padding).
96
97 Java's net.i2p.data.Base64.encode() includes '=' padding, so we do too.
98 """
99 return base64.b64encode(data, altchars=_I2P_B64_ALTCHARS).decode("ascii")
100
101
102def from_base64(s: str) -> bytes:
103 """Decode I2P-style Base64 (add padding back, I2P alphabet)."""
104 padding = 4 - (len(s) % 4)
105 if padding != 4:
106 s += "=" * padding
107 return base64.b64decode(s, altchars=_I2P_B64_ALTCHARS)
108
109
110def to_base32(data: bytes) -> str:
111 """Encode bytes to lowercase Base32 without padding."""
112 return base64.b32encode(data).rstrip(b"=").decode("ascii").lower()
113
114
115def from_base32(s: str) -> bytes:
116 """Decode Base32 (add padding back)."""
117 s = s.upper()
118 padding = 8 - (len(s) % 8)
119 if padding != 8:
120 s += "=" * padding
121 return base64.b32decode(s)