A Python port of the Invisible Internet Project (I2P)
at main 109 lines 3.6 kB view raw
1"""UDP/Streamr tunnels — datagram pub/sub for streaming data over I2P. 2 3StreamrClientTask subscribes to an I2P source and relays datagrams locally. 4StreamrServerTask reads local UDP and broadcasts to I2P subscribers. 5 6Ported from net.i2p.i2ptunnel.streamr.StreamrConsumer/StreamrProducer. 7""" 8 9from __future__ import annotations 10 11import logging 12import time 13from dataclasses import dataclass, field 14 15from i2p_apps.i2ptunnel.config import TunnelDefinition 16from i2p_apps.i2ptunnel.tasks import TunnelTask 17 18logger = logging.getLogger(__name__) 19 20# Protocol bytes 21SUBSCRIBE_BYTE = b"\x00" 22UNSUBSCRIBE_BYTE = b"\x01" 23 24 25@dataclass 26class Subscriber: 27 """Tracks an I2P subscriber with expiry.""" 28 29 destination: str 30 last_seen: float = field(default_factory=time.monotonic) 31 32 def is_expired(self, timeout: float = 60.0) -> bool: 33 return (time.monotonic() - self.last_seen) > timeout 34 35 def refresh(self) -> None: 36 self.last_seen = time.monotonic() 37 38 39class StreamrClientTask(TunnelTask): 40 """Subscribe to I2P source, relay datagrams to local UDP. 41 42 Sends periodic subscribe pings to keep subscription alive. 43 """ 44 45 def __init__(self, config: TunnelDefinition, session) -> None: 46 super().__init__(config, session) 47 self._ping_interval = float(config.options.get("pingInterval", "10")) 48 self._source_dest = config.target_destination 49 50 async def open(self) -> None: 51 self._open = True 52 logger.info("Streamr client %r subscribing to %s", 53 self._config.name, self._source_dest) 54 55 async def close(self) -> None: 56 self._open = False 57 58 59class StreamrServerTask(TunnelTask): 60 """Read local UDP, broadcast to I2P subscribers. 61 62 Manages subscriber set with expiry and max limit. 63 """ 64 65 def __init__(self, config: TunnelDefinition, session) -> None: 66 super().__init__(config, session) 67 self._max_subscribers = int(config.options.get("maxSubscribers", "10")) 68 self._subscriber_timeout = float(config.options.get("subscriberTimeout", "60")) 69 self._subscribers: dict[str, Subscriber] = {} 70 71 def _handle_subscribe(self, destination: str) -> None: 72 """Handle a subscribe request from a peer.""" 73 if destination in self._subscribers: 74 self._subscribers[destination].refresh() 75 return 76 77 # Enforce max subscribers 78 if len(self._subscribers) >= self._max_subscribers: 79 # Try to evict expired ones first 80 self._cleanup_expired() 81 if len(self._subscribers) >= self._max_subscribers: 82 logger.warning("Max subscribers reached (%d), rejecting %s", 83 self._max_subscribers, destination[:20]) 84 return 85 86 self._subscribers[destination] = Subscriber(destination=destination) 87 88 def _handle_unsubscribe(self, destination: str) -> None: 89 """Handle an unsubscribe request from a peer.""" 90 self._subscribers.pop(destination, None) 91 92 def _cleanup_expired(self) -> None: 93 """Remove expired subscribers.""" 94 expired = [ 95 k for k, v in self._subscribers.items() 96 if v.is_expired(self._subscriber_timeout) 97 ] 98 for k in expired: 99 del self._subscribers[k] 100 101 async def open(self) -> None: 102 self._open = True 103 logger.info("Streamr server %r started on %s:%d", 104 self._config.name, self._config.target_host, 105 self._config.target_port) 106 107 async def close(self) -> None: 108 self._open = False 109 self._subscribers.clear()