"""NTP-adjusted clock.

A time source whose readings can be shifted by an offset (e.g. one obtained
from NTP or SSU time synchronization).

Ported from net.i2p.util.Clock.
"""

from __future__ import annotations

import threading
import time


class Clock:
    """Wall-clock time source shifted by a settable offset in milliseconds."""

    def __init__(self) -> None:
        # No correction is applied until set_offset() is called.
        self._offset_ms: float = 0.0
        self._updated: bool = False
        # Guards _offset_ms and _updated in the accessors and the setter.
        self._lock = threading.Lock()

    def now(self) -> float:
        """Return the offset-adjusted current time in milliseconds.

        The offset is read here without acquiring the lock.
        """
        wall_ms = time.time() * 1000.0
        return wall_ms + self._offset_ms

    def get_offset(self) -> float:
        """Return the offset, in milliseconds, currently being applied."""
        with self._lock:
            current = self._offset_ms
        return current

    def set_offset(self, offset_ms: float) -> None:
        """Replace the offset (milliseconds) and record that it was set."""
        with self._lock:
            self._offset_ms, self._updated = offset_ms, True

    def was_updated(self) -> bool:
        """Return True once set_offset() has been called at least once."""
        with self._lock:
            flag = self._updated
        return flag