"""Stream connection state machine."""

import enum

from i2p_streaming.options import StreamOptions


class ConnectionState(enum.Enum):
    """Lifecycle states a ``StreamConnection`` can be in."""

    NEW = 0
    SYN_SENT = 1
    SYN_RECEIVED = 2
    ESTABLISHED = 3
    CLOSE_WAIT = 4
    LAST_ACK = 5
    FIN_WAIT = 6
    CLOSING = 7
    TIME_WAIT = 8
    CLOSED = 9
    RESET = 10


class StreamConnection:
    """TCP-like connection state machine for I2P streaming.

    Each event method checks that the connection is in a state from which
    the event is legal and then moves to the next state.  An event that is
    not legal in the current state raises ``RuntimeError`` and leaves the
    state untouched.

    Transitions implemented here::

        NEW          --send_syn-->          SYN_SENT
        NEW          --receive_syn-->       SYN_RECEIVED
        SYN_SENT     --receive_syn_ack-->   ESTABLISHED
        SYN_RECEIVED --send_syn_ack-->      ESTABLISHED
        ESTABLISHED  --send_close-->        FIN_WAIT
        ESTABLISHED  --receive_close-->     CLOSE_WAIT
        CLOSE_WAIT   --send_close-->        LAST_ACK
        LAST_ACK     --receive_ack-->       CLOSED
        FIN_WAIT     --receive_close-->     CLOSING
        FIN_WAIT     --receive_close_ack--> TIME_WAIT
        CLOSING      --receive_close_ack--> TIME_WAIT
        TIME_WAIT    --timeout-->           CLOSED
        (any)        --reset-->             RESET
    """

    def __init__(self, options: StreamOptions | None = None) -> None:
        """Create a connection in the ``NEW`` state.

        Args:
            options: Stream options; a default ``StreamOptions()`` is used
                when none is given.  Both windows start at the options'
                ``initial_window_size``.
        """
        self._options = options or StreamOptions()
        self.state = ConnectionState.NEW
        self.next_send_seq = 0
        self.next_recv_seq = 0
        self.send_window = self._options.initial_window_size
        self.recv_window = self._options.initial_window_size
        self.destination: bytes = b""

    def _require_state(self, *states: ConnectionState) -> None:
        """Raise ``RuntimeError`` unless the current state is one of *states*."""
        if self.state not in states:
            raise RuntimeError(f"Invalid transition from {self.state}")

    def send_syn(self) -> None:
        """Move ``NEW`` -> ``SYN_SENT``.

        Raises:
            RuntimeError: If the connection is not ``NEW``.
        """
        self._require_state(ConnectionState.NEW)
        self.state = ConnectionState.SYN_SENT

    def receive_syn(self) -> None:
        """Move ``NEW`` -> ``SYN_RECEIVED``.

        Raises:
            RuntimeError: If the connection is not ``NEW``.
        """
        self._require_state(ConnectionState.NEW)
        self.state = ConnectionState.SYN_RECEIVED

    def receive_syn_ack(self) -> None:
        """Move ``SYN_SENT`` -> ``ESTABLISHED``.

        Raises:
            RuntimeError: If the connection is not ``SYN_SENT``.
        """
        self._require_state(ConnectionState.SYN_SENT)
        self.state = ConnectionState.ESTABLISHED

    def send_syn_ack(self) -> None:
        """Move ``SYN_RECEIVED`` -> ``ESTABLISHED``.

        Raises:
            RuntimeError: If the connection is not ``SYN_RECEIVED``.
        """
        self._require_state(ConnectionState.SYN_RECEIVED)
        self.state = ConnectionState.ESTABLISHED

    def send_close(self) -> None:
        """Record that this side sent a close.

        ``ESTABLISHED`` -> ``FIN_WAIT``; ``CLOSE_WAIT`` -> ``LAST_ACK``.

        Raises:
            RuntimeError: If the connection is in any other state.
        """
        self._require_state(
            ConnectionState.ESTABLISHED, ConnectionState.CLOSE_WAIT
        )
        if self.state == ConnectionState.ESTABLISHED:
            self.state = ConnectionState.FIN_WAIT
        elif self.state == ConnectionState.CLOSE_WAIT:
            self.state = ConnectionState.LAST_ACK

    def receive_close(self) -> None:
        """Record that a close was received.

        ``ESTABLISHED`` -> ``CLOSE_WAIT``; ``FIN_WAIT`` -> ``CLOSING``.

        Raises:
            RuntimeError: If the connection is in any other state.
        """
        self._require_state(
            ConnectionState.ESTABLISHED, ConnectionState.FIN_WAIT
        )
        if self.state == ConnectionState.ESTABLISHED:
            self.state = ConnectionState.CLOSE_WAIT
        elif self.state == ConnectionState.FIN_WAIT:
            self.state = ConnectionState.CLOSING

    def receive_ack(self) -> None:
        """Move ``LAST_ACK`` -> ``CLOSED``.

        Raises:
            RuntimeError: If the connection is not ``LAST_ACK``.
        """
        self._require_state(ConnectionState.LAST_ACK)
        self.state = ConnectionState.CLOSED

    def receive_close_ack(self) -> None:
        """Move ``FIN_WAIT`` or ``CLOSING`` -> ``TIME_WAIT``.

        ``CLOSING`` is accepted as well as ``FIN_WAIT``: ``receive_close``
        can put the connection into ``CLOSING``, and without this
        transition that state could only be left through ``reset``.

        Raises:
            RuntimeError: If the connection is in any other state.
        """
        self._require_state(ConnectionState.FIN_WAIT, ConnectionState.CLOSING)
        self.state = ConnectionState.TIME_WAIT

    def timeout(self) -> None:
        """Move ``TIME_WAIT`` -> ``CLOSED``.

        Raises:
            RuntimeError: If the connection is not ``TIME_WAIT``.
        """
        self._require_state(ConnectionState.TIME_WAIT)
        self.state = ConnectionState.CLOSED

    def reset(self) -> None:
        """Force the connection into ``RESET`` from any state (never raises)."""
        self.state = ConnectionState.RESET

    def increment_send_seq(self) -> None:
        """Advance ``next_send_seq`` by one."""
        self.next_send_seq += 1

    def increment_recv_seq(self) -> None:
        """Advance ``next_recv_seq`` by one."""
        self.next_recv_seq += 1