"""DataHelper — I2P wire format serialization utilities. Ported from net.i2p.data.DataHelper. """ import base64 import io from datetime import datetime, timezone def read_long(stream: io.IOBase, num_bytes: int) -> int: """Read a big-endian unsigned integer of num_bytes from stream.""" data = stream.read(num_bytes) if len(data) != num_bytes: raise ValueError(f"Expected {num_bytes} bytes, got {len(data)}") return int.from_bytes(data, "big") def write_long(stream: io.IOBase, num_bytes: int, value: int) -> None: """Write a big-endian unsigned integer of num_bytes to stream.""" stream.write(value.to_bytes(num_bytes, "big")) def read_string(stream: io.IOBase) -> str: """Read a length-prefixed UTF-8 string (1-byte length prefix).""" length = read_long(stream, 1) if length == 0: return "" data = stream.read(length) if len(data) != length: raise ValueError(f"Expected {length} bytes for string, got {len(data)}") return data.decode("utf-8") def write_string(stream: io.IOBase, value: str) -> None: """Write a length-prefixed UTF-8 string (1-byte length prefix).""" encoded = value.encode("utf-8") if len(encoded) > 255: raise ValueError(f"String too long: {len(encoded)} > 255") write_long(stream, 1, len(encoded)) stream.write(encoded) def read_properties(stream: io.IOBase) -> dict[str, str]: """Read I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs.""" total_len = read_long(stream, 2) if total_len == 0: return {} data = stream.read(total_len) if len(data) != total_len: raise ValueError(f"Expected {total_len} bytes for properties, got {len(data)}") props = {} text = data.decode("utf-8") for line in text.split("\n"): line = line.strip() if not line or line == ";": continue if line.endswith(";"): line = line[:-1] if "=" in line: key, value = line.split("=", 1) props[key] = value return props def write_properties(stream: io.IOBase, props: dict[str, str]) -> None: """Write I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs.""" if not props: write_long(stream, 2, 0) return buf = io.BytesIO() for key in sorted(props.keys()): line = f"{key}={props[key]};\n" buf.write(line.encode("utf-8")) data = buf.getvalue() write_long(stream, 2, len(data)) stream.write(data) def to_date(timestamp_ms: int) -> datetime: """Convert millisecond timestamp to datetime (UTC).""" return datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc) def from_date(dt: datetime) -> int: """Convert datetime to millisecond timestamp.""" return int(dt.timestamp() * 1000) # I2P Base64 uses a modified alphabet: '-' instead of '+', '~' instead of '/' # This matches Java's net.i2p.data.Base64 implementation. _I2P_B64_ALTCHARS = b"-~" def to_base64(data: bytes) -> str: """Encode bytes to I2P-style Base64 (I2P alphabet, with padding). Java's net.i2p.data.Base64.encode() includes '=' padding, so we do too. """ return base64.b64encode(data, altchars=_I2P_B64_ALTCHARS).decode("ascii") def from_base64(s: str) -> bytes: """Decode I2P-style Base64 (add padding back, I2P alphabet).""" padding = 4 - (len(s) % 4) if padding != 4: s += "=" * padding return base64.b64decode(s, altchars=_I2P_B64_ALTCHARS) def to_base32(data: bytes) -> str: """Encode bytes to lowercase Base32 without padding.""" return base64.b32encode(data).rstrip(b"=").decode("ascii").lower() def from_base32(s: str) -> bytes: """Decode Base32 (add padding back).""" s = s.upper() padding = 8 - (len(s) % 8) if padding != 8: s += "=" * padding return base64.b32decode(s)