A Python port of the Invisible Internet Project (I2P)
at main 96 lines 2.8 kB view raw
1"""Torrent metainfo — .torrent file parsing. 2 3Parses BitTorrent .torrent files (bencoded metainfo dictionaries) 4to extract tracker URL, file list, piece hashes, and info hash. 5 6Ported from org.klomp.snark.MetaInfo. 7""" 8 9from __future__ import annotations 10 11import hashlib 12import math 13from dataclasses import dataclass, field 14 15from i2p_apps.snark.bencode import bdecode, bencode 16 17 18@dataclass 19class FileEntry: 20 """A file within a torrent.""" 21 path: str 22 length: int 23 24 25class MetaInfo: 26 """Parsed torrent metainfo.""" 27 28 def __init__( 29 self, 30 announce: str, 31 name: str, 32 piece_length: int, 33 pieces: bytes, 34 total_size: int, 35 files: list[FileEntry], 36 info_hash: bytes, 37 ) -> None: 38 self.announce = announce 39 self.name = name 40 self.piece_length = piece_length 41 self.pieces = pieces 42 self.total_size = total_size 43 self.files = files 44 self.info_hash = info_hash 45 46 @property 47 def piece_count(self) -> int: 48 return max(1, math.ceil(self.total_size / self.piece_length)) 49 50 @property 51 def is_i2p(self) -> bool: 52 """Check if this torrent uses an I2P tracker.""" 53 return ".i2p" in self.announce 54 55 @classmethod 56 def from_bytes(cls, data: bytes) -> MetaInfo: 57 """Parse a .torrent file from raw bytes.""" 58 torrent = bdecode(data) 59 if not isinstance(torrent, dict): 60 raise ValueError("Invalid torrent: not a dictionary") 61 62 announce = torrent.get(b"announce", b"").decode("utf-8", errors="replace") 63 info = torrent[b"info"] 64 65 name = info.get(b"name", b"unknown").decode("utf-8", errors="replace") 66 piece_length = info[b"piece length"] 67 pieces = info[b"pieces"] 68 69 # Calculate info hash 70 info_hash = hashlib.sha1(bencode(info)).digest() # nosec B324 — BitTorrent protocol requires SHA-1 info hash 71 72 # Parse files 73 if b"files" in info: 74 # Multi-file torrent 75 files = [] 76 total_size = 0 77 for f in info[b"files"]: 78 path_parts = [p.decode("utf-8", errors="replace") for p in f[b"path"]] 79 path = "/".join(path_parts) 80 length = f[b"length"] 81 files.append(FileEntry(path=path, length=length)) 82 total_size += length 83 else: 84 # Single-file torrent 85 total_size = info[b"length"] 86 files = [FileEntry(path=name, length=total_size)] 87 88 return cls( 89 announce=announce, 90 name=name, 91 piece_length=piece_length, 92 pieces=pieces, 93 total_size=total_size, 94 files=files, 95 info_hash=info_hash, 96 )