A Python port of the Invisible Internet Project (I2P)
at main 121 lines 3.9 kB view raw
"""DataHelper — I2P wire format serialization utilities.

Ported from net.i2p.data.DataHelper.
"""
import base64
import io
from datetime import datetime, timezone


def _read_exact(stream: io.IOBase, num_bytes: int) -> bytes:
    """Read up to num_bytes from stream, retrying after short reads.

    A single ``read(n)`` on a raw or unbuffered stream may return fewer than
    ``n`` bytes even though more data follows, so keep reading until the
    request is satisfied or the stream reports that nothing more is available.
    The result is shorter than ``num_bytes`` only when the stream ran dry;
    callers compare lengths to detect truncation.
    """
    parts = []
    remaining = num_bytes
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            # b"" signals end-of-stream; None comes from a non-blocking
            # stream with nothing buffered. Either way, stop here.
            break
        parts.append(chunk)
        remaining -= len(chunk)
    return b"".join(parts)


def read_long(stream: io.IOBase, num_bytes: int) -> int:
    """Read a big-endian unsigned integer of num_bytes from stream.

    Raises:
        ValueError: if the stream ends before num_bytes bytes were read.
    """
    data = _read_exact(stream, num_bytes)
    if len(data) != num_bytes:
        raise ValueError(f"Expected {num_bytes} bytes, got {len(data)}")
    return int.from_bytes(data, "big")


def write_long(stream: io.IOBase, num_bytes: int, value: int) -> None:
    """Write a big-endian unsigned integer of num_bytes to stream.

    Raises:
        OverflowError: if value is negative or does not fit in num_bytes
            bytes (raised by ``int.to_bytes``).
    """
    stream.write(value.to_bytes(num_bytes, "big"))


def read_string(stream: io.IOBase) -> str:
    """Read a length-prefixed UTF-8 string (1-byte length prefix).

    The prefix counts encoded bytes, not characters. A zero prefix yields
    the empty string without reading any further bytes.

    Raises:
        ValueError: if the stream ends before the prefix or the announced
            number of bytes could be read (``UnicodeDecodeError``, a
            ``ValueError`` subclass, if the bytes are not valid UTF-8).
    """
    length = read_long(stream, 1)
    if length == 0:
        return ""
    data = _read_exact(stream, length)
    if len(data) != length:
        raise ValueError(f"Expected {length} bytes for string, got {len(data)}")
    return data.decode("utf-8")


def write_string(stream: io.IOBase, value: str) -> None:
    """Write a length-prefixed UTF-8 string (1-byte length prefix).

    Raises:
        ValueError: if the UTF-8 encoding of value is longer than 255 bytes,
            the most a 1-byte prefix can describe.
    """
    encoded = value.encode("utf-8")
    if len(encoded) > 255:
        raise ValueError(f"String too long: {len(encoded)} > 255")
    write_long(stream, 1, len(encoded))
    stream.write(encoded)


def read_properties(stream: io.IOBase) -> dict[str, str]:
    """Read I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs.

    Parsing is deliberately lenient: the body is split on newlines, each line
    is stripped of surrounding whitespace, one trailing ';' is dropped, and
    the remainder is split at the first '='. Lines without '=' are ignored
    and a repeated key keeps its last value. As a consequence, keys or values
    containing a newline, and keys with leading whitespace, do not survive a
    round trip.

    NOTE(review): the I2P Mapping structure appears to encode each key and
    value as a length-prefixed String rather than bare text — confirm this
    textual form is the intended wire layout.

    Raises:
        ValueError: if the stream ends before the length prefix or the
            announced number of bytes could be read (``UnicodeDecodeError``
            if the body is not valid UTF-8).
    """
    total_len = read_long(stream, 2)
    if total_len == 0:
        return {}
    data = _read_exact(stream, total_len)
    if len(data) != total_len:
        raise ValueError(f"Expected {total_len} bytes for properties, got {len(data)}")
    props: dict[str, str] = {}
    for raw_line in data.decode("utf-8").split("\n"):
        # Blank lines and a lone ';' reduce to "" here and are skipped below
        # because they contain no '='.
        entry = raw_line.strip().removesuffix(";")
        key, separator, value = entry.partition("=")
        if separator:
            props[key] = value
    return props


def write_properties(stream: io.IOBase, props: dict[str, str]) -> None:
    """Write I2P wire format properties: 2-byte total length, then 'key=value;\\n' pairs.

    Pairs are emitted in sorted key order so the same mapping always produces
    the same bytes. An empty (or None) mapping is written as a bare zero
    length. The length prefix counts the UTF-8 encoded body; whatever
    write_long raises for a length that does not fit in two bytes propagates
    to the caller.

    NOTE(review): the I2P Mapping structure appears to encode each key and
    value as a length-prefixed String rather than bare text — confirm this
    textual form is the intended wire layout.
    """
    if not props:
        write_long(stream, 2, 0)
        return
    body = "".join(f"{key}={props[key]};\n" for key in sorted(props))
    data = body.encode("utf-8")
    write_long(stream, 2, len(data))
    stream.write(data)


# Reference instant for millisecond timestamps (1970-01-01T00:00:00Z).
_UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_date(timestamp_ms: int) -> datetime:
    """Convert millisecond timestamp to datetime (UTC).

    Uses exact epoch + timedelta arithmetic instead of
    ``datetime.fromtimestamp``: that call goes through float seconds and the
    platform's C time functions, which can reject some values (for example
    negative timestamps on some platforms).

    Raises:
        OverflowError: if the result falls outside the range datetime can
            represent.
    """
    # Local import: the file-level import only brings in datetime and timezone.
    from datetime import timedelta

    return _UNIX_EPOCH + timedelta(milliseconds=timestamp_ms)


def from_date(dt: datetime) -> int:
    """Convert datetime to millisecond timestamp.

    A naive datetime is interpreted as local time, matching
    ``datetime.timestamp()``. The conversion uses integer arithmetic so the
    result cannot be off by one through float rounding; sub-millisecond
    precision is truncated toward zero.
    """
    delta = dt.astimezone(timezone.utc) - _UNIX_EPOCH
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    # Truncate toward zero (not floor) so instants before the epoch round the
    # same way int() did on the former float value.
    millis = abs(micros) // 1000
    return millis if micros >= 0 else -millis


# I2P Base64 uses a modified alphabet: '-' instead of '+', '~' instead of '/'
# This matches Java's net.i2p.data.Base64 implementation.
_I2P_B64_ALTCHARS = b"-~"


def to_base64(data: bytes) -> str:
    """Encode bytes to I2P-style Base64 (I2P alphabet, with padding).

    Java's net.i2p.data.Base64.encode() includes '=' padding, so we do too.
    """
    return base64.b64encode(data, altchars=_I2P_B64_ALTCHARS).decode("ascii")


def from_base64(s: str) -> bytes:
    """Decode I2P-style Base64 (add padding back, I2P alphabet).

    Whitespace is removed before the missing padding is computed: the decoder
    ignores it anyway, so counting it would produce the wrong number of '='
    and reject otherwise valid unpadded input such as "QQ\\n".

    Raises:
        binascii.Error: if the input cannot be padded to valid Base64.
    """
    if isinstance(s, (bytes, bytearray)):
        # Correctly padded bytes input used to decode fine; keep accepting it.
        s = bytes(s).decode("ascii")
    s = "".join(s.split())
    s += "=" * (-len(s) % 4)
    return base64.b64decode(s, altchars=_I2P_B64_ALTCHARS)


def to_base32(data: bytes) -> str:
    """Encode bytes to lowercase Base32 without padding."""
    return base64.b32encode(data).rstrip(b"=").decode("ascii").lower()


def from_base32(s: str) -> bytes:
    """Decode Base32 (add padding back).

    Input is upper-cased first, so lowercase text as produced by to_base32 is
    accepted.

    Raises:
        binascii.Error: if the padded input is not valid Base32.
    """
    s = s.upper()
    s += "=" * (-len(s) % 8)
    return base64.b32decode(s)