A Python port of the Invisible Internet Project (I2P)
1"""Bencode — BitTorrent encoding/decoding.
2
3Implements the bencode binary format used by BitTorrent for
4.torrent files and peer wire protocol messages.
5
6Standard bencode types:
7- Integers: i42e
8- Strings: 4:spam (length-prefixed)
9- Lists: l...e
10- Dicts: d...e (keys must be sorted byte strings)
11"""
12
13from __future__ import annotations
14
15from io import BytesIO
16from typing import Any
17
18
def bencode(obj) -> bytes:
    """Encode a Python object to bencode.

    Supported types:

    - ``int`` (including ``bool``) -> integer, e.g. ``i42e``; ``True`` and
      ``False`` are written as ``i1e`` and ``i0e``
    - ``bytes`` -> length-prefixed byte string
    - ``str`` -> length-prefixed byte string of its UTF-8 encoding
    - ``list`` -> list of encoded items
    - ``dict`` -> dictionary; keys must be ``bytes`` or ``str`` and are
      written in ascending order of their encoded bytes

    Raises:
        TypeError: If *obj*, or anything nested inside it, has an
            unsupported type, or if a dict key is not ``bytes`` or ``str``.
    """
    if isinstance(obj, int):
        # bool is an int subclass; without int() True would render as "iTruee".
        return f"i{int(obj)}e".encode("ascii")
    if isinstance(obj, str):
        obj = obj.encode("utf-8")
    if isinstance(obj, bytes):
        return f"{len(obj)}:".encode("ascii") + obj
    if isinstance(obj, list):
        return b"l" + b"".join(bencode(item) for item in obj) + b"e"
    if isinstance(obj, dict):
        # Normalise keys to bytes before sorting so that str and bytes keys
        # can be mixed and the order follows the raw encoded bytes.
        entries = []
        for key, value in obj.items():
            if isinstance(key, str):
                key = key.encode("utf-8")
            elif not isinstance(key, bytes):
                raise TypeError(f"Cannot bencode dict key of {type(key)}")
            entries.append((key, value))
        entries.sort(key=lambda kv: kv[0])
        body = b"".join(bencode(k) + bencode(v) for k, v in entries)
        return b"d" + body + b"e"
    raise TypeError(f"Cannot bencode {type(obj)}")
36
37
def bdecode(data: bytes):
    """Decode the bencoded value at the start of *data*.

    The bytes are wrapped in an in-memory stream and a single value is
    decoded from it. Only that first value is consumed; any bytes that
    follow it are left unread and are not reported.
    """
    return _decode(BytesIO(data))
42
43
def _decode(stream: BytesIO):
    """Consume one type marker from *stream* and decode the value it starts.

    ``i`` starts an integer, ``l`` a list, ``d`` a dictionary, and an ASCII
    digit starts the length prefix of a byte string.

    Raises:
        ValueError: If the stream is already exhausted, or the next byte is
            not one of the markers above.
    """
    ch = stream.read(1)
    if not ch:
        raise ValueError("Unexpected end of data")

    if ch == b"i":
        return _decode_int(stream)
    if ch == b"l":
        return _decode_list(stream)
    if ch == b"d":
        return _decode_dict(stream)
    if ch.isdigit():
        # The digit is part of the length prefix, so hand it on.
        return _decode_string(stream, ch)

    raise ValueError(f"Invalid bencode character: {ch!r}")
60
61
def _decode_int(stream: BytesIO) -> int:
    """Decode an integer body; the leading ``i`` has already been consumed.

    Reads up to and including the terminating ``e`` and leaves the stream
    positioned just after it.

    Raises:
        ValueError: If the stream ends before the terminating ``e``, or the
            body is not an optional ``-`` followed by ASCII decimal digits.
    """
    digits = bytearray()
    while True:
        ch = stream.read(1)
        if not ch:
            # read() keeps returning b"" at EOF, so without this check a
            # truncated integer would spin forever.
            raise ValueError("Unexpected end of data")
        if ch == b"e":
            break
        digits += ch

    text = bytes(digits)
    # int() on its own also accepts "+5", " 5" and "1_0", which are not
    # bencode. Leading zeros and "-0" are still tolerated.
    unsigned = text[1:] if text.startswith(b"-") else text
    if not unsigned.isdigit():
        raise ValueError(f"Invalid bencode integer: {text!r}")
    return int(text)
69
70
def _decode_string(stream: BytesIO, first_digit: bytes) -> bytes:
    """Decode a byte string whose first length digit was already consumed.

    Args:
        stream: Stream positioned immediately after *first_digit*.
        first_digit: The first byte of the decimal length prefix.

    Returns:
        The string payload, exactly as many bytes as the prefix announces.

    Raises:
        ValueError: If the length prefix contains a non-digit, the stream
            ends before the ``:`` separator, or fewer payload bytes remain
            than the prefix announces.
    """
    length_digits = bytearray(first_digit)
    while True:
        ch = stream.read(1)
        if not ch:
            # read() keeps returning b"" at EOF; bail out instead of looping.
            raise ValueError("Unexpected end of data")
        if ch == b":":
            break
        if not ch.isdigit():
            # int() would accept forms such as "1_0", so reject them here.
            prefix = bytes(length_digits) + ch
            raise ValueError(f"Invalid bencode string length: {prefix!r}")
        length_digits += ch

    length = int(bytes(length_digits))
    try:
        payload = stream.read(length)
    except OverflowError:
        # A length too large for the stream's size type cannot be satisfied.
        raise ValueError("Unexpected end of data") from None
    if len(payload) != length:
        # A short read means the data was truncated mid-string.
        raise ValueError("Unexpected end of data")
    return payload
79
80
def _decode_list(stream: BytesIO) -> list[Any]:
    """Decode list items until the closing ``e``; ``l`` is already consumed.

    Returns:
        The decoded items in stream order.

    Raises:
        ValueError: If the stream ends before the closing ``e``, or an item
            is itself malformed.
    """
    result: list[Any] = []
    while True:
        ch = stream.read(1)
        if not ch:
            # Nothing was consumed at EOF, so rewinding here would re-parse
            # the previous byte instead of reporting the truncation.
            raise ValueError("Unexpected end of data")
        if ch == b"e":
            return result
        # Un-read the peeked byte so _decode() sees the item's type marker.
        stream.seek(stream.tell() - 1)
        result.append(_decode(stream))
89
90
def _decode_dict(stream: BytesIO) -> dict[Any, Any]:
    """Decode key/value pairs until the closing ``e``; ``d`` is already consumed.

    Keys must be byte strings. Key order and duplicates are not checked; a
    repeated key keeps the last value seen.

    Returns:
        A dict mapping each decoded ``bytes`` key to its decoded value.

    Raises:
        ValueError: If the stream ends before the closing ``e``, a key does
            not start with a string length digit, or a key or value is
            itself malformed.
    """
    result: dict[Any, Any] = {}
    while True:
        ch = stream.read(1)
        if not ch:
            # Nothing was consumed at EOF, so rewinding here would re-parse
            # the previous byte instead of reporting the truncation.
            raise ValueError("Unexpected end of data")
        if ch == b"e":
            return result
        if not ch.isdigit():
            # Only byte-string keys are valid. Rejecting other markers here
            # also keeps list/dict keys from failing later as an unhashable
            # TypeError.
            raise ValueError(f"Invalid bencode dict key: {ch!r}")
        # Un-read the peeked byte so _decode() sees the key's length prefix.
        stream.seek(stream.tell() - 1)
        key = _decode(stream)
        value = _decode(stream)
        result[key] = value