A Python port of the Invisible Internet Project (I2P)
1"""NTP-adjusted clock.
2
3Provides a time source that can be adjusted by an offset (e.g. from
4NTP or SSU time synchronization).
5
6Ported from net.i2p.util.Clock.
7"""
8
9from __future__ import annotations
10
11import threading
12import time
13
14
class Clock:
    """Time source whose reading can be shifted by an offset in milliseconds.

    The offset and the "was updated" flag are guarded by an internal,
    non-reentrant lock. Every accessor, including :meth:`now`, reads that
    state while holding the lock.
    """

    def __init__(self) -> None:
        """Create a clock with a zero offset that has not been updated yet."""
        self._lock = threading.Lock()
        # Correction added to the system clock reading, in milliseconds.
        self._offset_ms: float = 0.0
        # Set to True by set_offset(); nothing in this class resets it.
        self._updated: bool = False

    def now(self) -> float:
        """Return the current time in milliseconds, adjusted by the offset.

        The result is ``time.time() * 1000.0`` plus the current offset.
        """
        # Snapshot the offset under the lock, consistent with get_offset(),
        # but keep the system clock read outside the critical section.
        with self._lock:
            offset_ms = self._offset_ms
        return time.time() * 1000.0 + offset_ms

    def get_offset(self) -> float:
        """Return the current offset in milliseconds."""
        with self._lock:
            return self._offset_ms

    def set_offset(self, offset_ms: float) -> None:
        """Set the clock offset and mark the clock as updated.

        Args:
            offset_ms: Offset in milliseconds to add to the system time.
        """
        # Both fields change under one lock acquisition so readers never
        # observe a new offset paired with a stale "updated" flag.
        with self._lock:
            self._offset_ms = offset_ms
            self._updated = True

    def was_updated(self) -> bool:
        """Return whether set_offset() has been called at least once."""
        with self._lock:
            return self._updated