A Python port of the Invisible Internet Project (I2P)
at main 88 lines 2.7 kB view raw
1"""Piece management — tracks download state of torrent pieces. 2 3Manages which pieces have been downloaded, which are in progress, 4and selects the next piece to request from peers. 5 6Ported from org.klomp.snark.PeerCoordinator / Storage piece tracking. 7""" 8 9from __future__ import annotations 10 11import enum 12import random 13 14 15class PieceState(enum.Enum): 16 MISSING = "missing" 17 REQUESTED = "requested" 18 COMPLETE = "complete" 19 20 21class PieceManager: 22 """Tracks piece download state for a torrent.""" 23 24 def __init__( 25 self, 26 total_pieces: int, 27 piece_length: int, 28 total_size: int, 29 ) -> None: 30 self.total_pieces = total_pieces 31 self.piece_length = piece_length 32 self.total_size = total_size 33 self._states: list[PieceState] = [PieceState.MISSING] * total_pieces 34 35 @property 36 def completed_count(self) -> int: 37 return sum(1 for s in self._states if s == PieceState.COMPLETE) 38 39 @property 40 def is_complete(self) -> bool: 41 return self.completed_count == self.total_pieces 42 43 @property 44 def percent_complete(self) -> float: 45 if self.total_pieces == 0: 46 return 100.0 47 return (self.completed_count / self.total_pieces) * 100.0 48 49 def state(self, piece_index: int) -> PieceState: 50 return self._states[piece_index] 51 52 def mark_complete(self, piece_index: int) -> None: 53 self._states[piece_index] = PieceState.COMPLETE 54 55 def mark_requested(self, piece_index: int) -> None: 56 if self._states[piece_index] == PieceState.MISSING: 57 self._states[piece_index] = PieceState.REQUESTED 58 59 def mark_missing(self, piece_index: int) -> None: 60 if self._states[piece_index] != PieceState.COMPLETE: 61 self._states[piece_index] = PieceState.MISSING 62 63 def next_needed(self, available: set[int]) -> int | None: 64 """Select the next piece to download from available pieces. 65 66 Uses random selection from missing pieces that the peer has. 67 """ 68 candidates = [ 69 i for i in available 70 if i < self.total_pieces and self._states[i] == PieceState.MISSING 71 ] 72 if not candidates: 73 return None 74 return random.choice(candidates) 75 76 def bitfield(self) -> bytes: 77 """Generate a bitfield representing completed pieces. 78 79 Each bit represents one piece (MSB first). 80 """ 81 num_bytes = (self.total_pieces + 7) // 8 82 result = bytearray(num_bytes) 83 for i, state in enumerate(self._states): 84 if state == PieceState.COMPLETE: 85 byte_idx = i // 8 86 bit_idx = 7 - (i % 8) 87 result[byte_idx] |= (1 << bit_idx) 88 return bytes(result)