A Python port of the Invisible Internet Project (I2P)
at main 35 lines 882 B view raw
"""Thread-safe object counter.

Ported from net.i2p.util.ObjectCounter.
"""

from __future__ import annotations

import threading


class ObjectCounter:
    """Thread-safe counter for named keys.

    Every public method holds a single internal lock for the duration of
    its read or read-modify-write, so concurrent callers never observe a
    half-applied update. Counts never drop below zero, and a key whose
    count is zero is not stored at all.
    """

    def __init__(self) -> None:
        """Create an empty counter."""
        self._lock = threading.Lock()
        # Only keys with a strictly positive count are kept in this map.
        self._counts: dict[str, int] = {}

    def increment(self, key: str) -> int:
        """Add one to the count for *key* and return the new count."""
        with self._lock:
            updated = self._counts.get(key, 0) + 1
            self._counts[key] = updated
            return updated

    def decrement(self, key: str) -> int:
        """Subtract one from the count for *key* and return the new count.

        The count is clamped at zero: decrementing a key that is absent or
        already at zero returns 0 and leaves the counter unchanged.
        """
        with self._lock:
            remaining = self._counts.get(key, 0) - 1
            if remaining <= 0:
                # Drop the entry rather than storing 0, so keys that were
                # never counted (or have been fully released) do not
                # accumulate in the map.
                self._counts.pop(key, None)
                return 0
            self._counts[key] = remaining
            return remaining

    def count(self, key: str) -> int:
        """Return the current count for *key*, or 0 if it is not tracked."""
        with self._lock:
            return self._counts.get(key, 0)

    def total(self) -> int:
        """Return the sum of the counts of all keys."""
        with self._lock:
            return sum(self._counts.values())