A Python port of the Invisible Internet Project (I2P)
at main 41 lines 1.1 kB view raw
1"""NTP-adjusted clock. 2 3Provides a time source that can be adjusted by an offset (e.g. from 4NTP or SSU time synchronization). 5 6Ported from net.i2p.util.Clock. 7""" 8 9from __future__ import annotations 10 11import threading 12import time 13 14 15class Clock: 16 """Clock with adjustable offset (in milliseconds).""" 17 18 def __init__(self) -> None: 19 self._lock = threading.Lock() 20 self._offset_ms: float = 0.0 21 self._updated: bool = False 22 23 def now(self) -> float: 24 """Current time in milliseconds, adjusted by offset.""" 25 return time.time() * 1000.0 + self._offset_ms 26 27 def get_offset(self) -> float: 28 """Current offset in milliseconds.""" 29 with self._lock: 30 return self._offset_ms 31 32 def set_offset(self, offset_ms: float) -> None: 33 """Set the clock offset in milliseconds.""" 34 with self._lock: 35 self._offset_ms = offset_ms 36 self._updated = True 37 38 def was_updated(self) -> bool: 39 """Whether set_offset has been called at least once.""" 40 with self._lock: 41 return self._updated