"""Clock — time with offset management. Ported from net.i2p.util.Clock (lives in util/ in Java, but logically belongs with the time module). """ import threading import time as _time from typing import Protocol, Set from i2p_time.build_time import BuildTime class ClockUpdateListener(Protocol): """Listener for clock offset changes.""" def offset_changed(self, delta: int) -> None: ... def _system_millis() -> int: """Current system time in milliseconds since epoch.""" return int(_time.time() * 1000) class Clock: """Alternate time source that maintains an offset from system clock. The offset compensates for system clock drift relative to a reference (e.g., NTP). Positive offset means system clock is slow; negative means system clock is fast. """ MAX_OFFSET = 3 * 24 * 60 * 60 * 1000 # 3 days MAX_LIVE_OFFSET = 10 * 60 * 1000 # 10 minutes MIN_OFFSET_CHANGE = 5 * 1000 # 5 seconds def __init__(self) -> None: self._lock = threading.Lock() self._listeners: Set[ClockUpdateListener] = set() self._offset: int = 0 self._already_changed: bool = False self._stat_created: bool = False now = _system_millis() min_time = BuildTime.get_earliest_time() max_time = BuildTime.get_latest_time() if now < min_time: self._offset = min_time - now self._is_system_clock_bad = True now = min_time elif now > max_time: self._offset = max_time - now self._is_system_clock_bad = True now = max_time else: self._is_system_clock_bad = False self._started_on = now def now(self) -> int: """Current time adjusted by offset, in milliseconds since epoch.""" return self._offset + _system_millis() def get_offset(self) -> int: """Current delta from system clock in milliseconds.""" with self._lock: return self._offset def get_updated_successfully(self) -> bool: """Whether the clock offset has been set at least once.""" return self._already_changed def set_offset(self, offset_ms: int, force: bool = False) -> None: """Set the clock offset. Args: offset_ms: Delta from system time in ms. Positive = system slow. force: If True, bypass sanity checks. """ with self._lock: delta = offset_ms - self._offset if not force: if not self._is_system_clock_bad and abs(offset_ms) > self.MAX_OFFSET: return if self._already_changed and ( _system_millis() - self._started_on > 10 * 60 * 1000 ): if abs(delta) > self.MAX_LIVE_OFFSET: return if abs(delta) < self.MIN_OFFSET_CHANGE: self._already_changed = True return self._already_changed = True self._offset = offset_ms self._fire_offset_changed(delta) def set_now(self, real_time: int, stratum: int = 0) -> None: """Set the clock to a known-correct time. Args: real_time: The actual current time in ms since epoch. stratum: NTP stratum (1-15, lower is better). Ignored in base Clock. """ if ( real_time < BuildTime.get_earliest_time() or real_time > BuildTime.get_latest_time() ): return diff = real_time - _system_millis() self.set_offset(diff) def add_update_listener(self, listener: ClockUpdateListener) -> None: self._listeners.add(listener) def remove_update_listener(self, listener: ClockUpdateListener) -> None: self._listeners.discard(listener) def _fire_offset_changed(self, delta: int) -> None: for listener in self._listeners: listener.offset_changed(delta)