"""Bencode — BitTorrent encoding/decoding. Implements the bencode binary format used by BitTorrent for .torrent files and peer wire protocol messages. Standard bencode types: - Integers: i42e - Strings: 4:spam (length-prefixed) - Lists: l...e - Dicts: d...e (keys must be sorted byte strings) """ from __future__ import annotations from io import BytesIO from typing import Any def bencode(obj) -> bytes: """Encode a Python object to bencode.""" if isinstance(obj, int): return f"i{obj}e".encode("ascii") elif isinstance(obj, bytes): return f"{len(obj)}:".encode("ascii") + obj elif isinstance(obj, str): encoded = obj.encode("utf-8") return f"{len(encoded)}:".encode("ascii") + encoded elif isinstance(obj, list): return b"l" + b"".join(bencode(item) for item in obj) + b"e" elif isinstance(obj, dict): items = sorted(obj.items(), key=lambda kv: kv[0]) encoded = b"".join(bencode(k) + bencode(v) for k, v in items) return b"d" + encoded + b"e" else: raise TypeError(f"Cannot bencode {type(obj)}") def bdecode(data: bytes): """Decode bencode data to a Python object.""" stream = BytesIO(data) return _decode(stream) def _decode(stream: BytesIO): """Decode one value from the stream.""" ch = stream.read(1) if not ch: raise ValueError("Unexpected end of data") if ch == b"i": return _decode_int(stream) elif ch == b"l": return _decode_list(stream) elif ch == b"d": return _decode_dict(stream) elif ch.isdigit(): return _decode_string(stream, ch) else: raise ValueError(f"Invalid bencode character: {ch!r}") def _decode_int(stream: BytesIO) -> int: buf = b"" while True: ch = stream.read(1) if ch == b"e": return int(buf) buf += ch def _decode_string(stream: BytesIO, first_digit: bytes) -> bytes: length_str = first_digit while True: ch = stream.read(1) if ch == b":": length = int(length_str) return stream.read(length) length_str += ch def _decode_list(stream: BytesIO) -> list[Any]: result: list[Any] = [] while True: ch = stream.read(1) if ch == b"e": return result stream.seek(stream.tell() - 1) result.append(_decode(stream)) def _decode_dict(stream: BytesIO) -> dict[Any, Any]: result: dict[Any, Any] = {} while True: ch = stream.read(1) if ch == b"e": return result stream.seek(stream.tell() - 1) key = _decode(stream) value = _decode(stream) result[key] = value