A Python port of the Invisible Internet Project (I2P)
at main 100 lines 2.7 kB view raw
1"""Bencode — BitTorrent encoding/decoding. 2 3Implements the bencode binary format used by BitTorrent for 4.torrent files and peer wire protocol messages. 5 6Standard bencode types: 7- Integers: i42e 8- Strings: 4:spam (length-prefixed) 9- Lists: l...e 10- Dicts: d...e (keys must be sorted byte strings) 11""" 12 13from __future__ import annotations 14 15from io import BytesIO 16from typing import Any 17 18 19def bencode(obj) -> bytes: 20 """Encode a Python object to bencode.""" 21 if isinstance(obj, int): 22 return f"i{obj}e".encode("ascii") 23 elif isinstance(obj, bytes): 24 return f"{len(obj)}:".encode("ascii") + obj 25 elif isinstance(obj, str): 26 encoded = obj.encode("utf-8") 27 return f"{len(encoded)}:".encode("ascii") + encoded 28 elif isinstance(obj, list): 29 return b"l" + b"".join(bencode(item) for item in obj) + b"e" 30 elif isinstance(obj, dict): 31 items = sorted(obj.items(), key=lambda kv: kv[0]) 32 encoded = b"".join(bencode(k) + bencode(v) for k, v in items) 33 return b"d" + encoded + b"e" 34 else: 35 raise TypeError(f"Cannot bencode {type(obj)}") 36 37 38def bdecode(data: bytes): 39 """Decode bencode data to a Python object.""" 40 stream = BytesIO(data) 41 return _decode(stream) 42 43 44def _decode(stream: BytesIO): 45 """Decode one value from the stream.""" 46 ch = stream.read(1) 47 if not ch: 48 raise ValueError("Unexpected end of data") 49 50 if ch == b"i": 51 return _decode_int(stream) 52 elif ch == b"l": 53 return _decode_list(stream) 54 elif ch == b"d": 55 return _decode_dict(stream) 56 elif ch.isdigit(): 57 return _decode_string(stream, ch) 58 else: 59 raise ValueError(f"Invalid bencode character: {ch!r}") 60 61 62def _decode_int(stream: BytesIO) -> int: 63 buf = b"" 64 while True: 65 ch = stream.read(1) 66 if ch == b"e": 67 return int(buf) 68 buf += ch 69 70 71def _decode_string(stream: BytesIO, first_digit: bytes) -> bytes: 72 length_str = first_digit 73 while True: 74 ch = stream.read(1) 75 if ch == b":": 76 length = int(length_str) 77 return stream.read(length) 78 length_str += ch 79 80 81def _decode_list(stream: BytesIO) -> list[Any]: 82 result: list[Any] = [] 83 while True: 84 ch = stream.read(1) 85 if ch == b"e": 86 return result 87 stream.seek(stream.tell() - 1) 88 result.append(_decode(stream)) 89 90 91def _decode_dict(stream: BytesIO) -> dict[Any, Any]: 92 result: dict[Any, Any] = {} 93 while True: 94 ch = stream.read(1) 95 if ch == b"e": 96 return result 97 stream.seek(stream.tell() - 1) 98 key = _decode(stream) 99 value = _decode(stream) 100 result[key] = value