A Python port of the Invisible Internet Project (I2P)
at main 82 lines 2.4 kB view raw
1"""I2CP session state machine.""" 2 3import enum 4from typing import Callable 5 6 7class SessionState(enum.Enum): 8 INIT = 0 9 OPENING = 1 10 OPEN = 2 11 CLOSING = 3 12 CLOSED = 4 13 ERROR = 5 14 15 16class SessionId: 17 """2-byte session identifier.""" 18 19 def __init__(self, value: int): 20 if value < 0 or value > 0xFFFF: 21 raise ValueError(f"SessionId must be 0..65535, got {value}") 22 self._value = value 23 24 def __int__(self) -> int: 25 return self._value 26 27 def __eq__(self, other) -> bool: 28 if not isinstance(other, SessionId): 29 return NotImplemented 30 return self._value == other._value 31 32 def __hash__(self) -> int: 33 return hash(self._value) 34 35 def __repr__(self) -> str: 36 return f"SessionId({self._value})" 37 38 39class I2CPSession: 40 """I2CP session state machine.""" 41 42 def __init__(self): 43 self.state = SessionState.INIT 44 self.session_id: SessionId | None = None 45 self.destination_data: bytes | None = None 46 self.config = None 47 self.error_message: str | None = None 48 self._message_handlers: list[Callable] = [] 49 50 def open(self, destination_data: bytes, config): 51 if self.state != SessionState.INIT: 52 raise RuntimeError(f"Cannot open from state {self.state}") 53 self.destination_data = destination_data 54 self.config = config 55 self.state = SessionState.OPENING 56 57 def confirm_open(self, session_id: SessionId): 58 if self.state != SessionState.OPENING: 59 raise RuntimeError(f"Cannot confirm_open from state {self.state}") 60 self.session_id = session_id 61 self.state = SessionState.OPEN 62 63 def close(self): 64 if self.state != SessionState.OPEN: 65 raise RuntimeError(f"Cannot close from state {self.state}") 66 self.state = SessionState.CLOSING 67 68 def confirm_close(self): 69 if self.state != SessionState.CLOSING: 70 raise RuntimeError(f"Cannot confirm_close from state {self.state}") 71 self.state = SessionState.CLOSED 72 73 def set_error(self, message: str): 74 self.error_message = message 75 self.state = SessionState.ERROR 76 77 def on_message(self, handler: Callable): 78 self._message_handlers.append(handler) 79 80 def deliver_message(self, payload: bytes): 81 for handler in self._message_handlers: 82 handler(payload)