# A Python port of the Invisible Internet Project (I2P).
"""UDP/Streamr tunnels — datagram pub/sub for streaming data over I2P.

StreamrClientTask subscribes to an I2P source and relays datagrams locally.
StreamrServerTask reads local UDP and broadcasts to I2P subscribers.

Ported from net.i2p.i2ptunnel.streamr.StreamrConsumer/StreamrProducer.
"""
8
9from __future__ import annotations
10
11import logging
12import time
13from dataclasses import dataclass, field
14
15from i2p_apps.i2ptunnel.config import TunnelDefinition
16from i2p_apps.i2ptunnel.tasks import TunnelTask
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Protocol bytes: single-byte control codes, named for the request each one
# denotes (subscribe / unsubscribe).
# NOTE(review): neither constant is referenced in the visible part of this
# file — presumably they are the payload of the subscribe/unsubscribe
# datagrams; confirm against the code that sends and receives them.
SUBSCRIBE_BYTE = b"\x00"
UNSUBSCRIBE_BYTE = b"\x01"
23
24
@dataclass
class Subscriber:
    """An I2P subscriber paired with the moment it was last heard from.

    ``last_seen`` is a ``time.monotonic()`` reading (filled in automatically
    at construction when not supplied), so age comparisons are made against
    the monotonic clock rather than wall-clock time.
    """

    destination: str
    last_seen: float = field(default_factory=time.monotonic)

    def refresh(self) -> None:
        """Mark the subscriber as seen right now."""
        self.last_seen = time.monotonic()

    def is_expired(self, timeout: float = 60.0) -> bool:
        """Return True when more than *timeout* seconds have elapsed since ``last_seen``."""
        idle_seconds = time.monotonic() - self.last_seen
        # Strictly greater-than: an age exactly equal to the timeout is
        # still considered alive.
        return idle_seconds > timeout
37
38
class StreamrClientTask(TunnelTask):
    """Client side of a Streamr tunnel: subscribes to an I2P source.

    The class records the source destination and the ping interval taken
    from the tunnel options. In the code shown here, ``open``/``close`` only
    toggle the open flag (and log); the interval is stored but not yet used.
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        """Store the ping interval and source destination from *config*.

        ``pingInterval`` is read from ``config.options`` (default ``"10"``)
        and converted to ``float``; a non-numeric value raises ``ValueError``.
        """
        super().__init__(config, session)
        raw_interval = config.options.get("pingInterval", "10")
        self._ping_interval = float(raw_interval)
        self._source_dest = config.target_destination

    async def open(self) -> None:
        """Mark the task open and log which source is being subscribed to."""
        self._open = True
        # NOTE(review): ``_config`` is not assigned in this class —
        # presumably set by TunnelTask.__init__; confirm.
        logger.info(
            "Streamr client %r subscribing to %s",
            self._config.name,
            self._source_dest,
        )

    async def close(self) -> None:
        """Mark the task closed."""
        self._open = False
57
58
class StreamrServerTask(TunnelTask):
    """Server side of a Streamr tunnel: tracks I2P subscribers.

    Subscribers are kept in a dict keyed by destination string. The table is
    capped at ``maxSubscribers`` entries, and entries older than
    ``subscriberTimeout`` seconds are evicted when room is needed.
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        """Read the subscriber limit and timeout from ``config.options``.

        ``maxSubscribers`` (default ``"10"``) is converted with ``int`` and
        ``subscriberTimeout`` (default ``"60"``) with ``float``; values that
        do not parse raise ``ValueError``.
        """
        super().__init__(config, session)
        options = config.options
        self._max_subscribers = int(options.get("maxSubscribers", "10"))
        self._subscriber_timeout = float(options.get("subscriberTimeout", "60"))
        self._subscribers: dict[str, Subscriber] = {}

    def _handle_subscribe(self, destination: str) -> None:
        """Register *destination*, or refresh it when already subscribed.

        A new destination is added only if the table has room; when it is
        full, expired entries are evicted first and the request is dropped
        (with a warning) if that frees nothing.
        """
        if destination in self._subscribers:
            # Known peer: just push its expiry forward.
            self._subscribers[destination].refresh()
            return

        has_room = len(self._subscribers) < self._max_subscribers
        if not has_room:
            # Expired entries are only purged lazily, at the moment the
            # limit would otherwise block a new subscriber.
            self._cleanup_expired()
            has_room = len(self._subscribers) < self._max_subscribers

        if has_room:
            self._subscribers[destination] = Subscriber(destination=destination)
            return

        # Only the first 20 characters of the destination are logged.
        logger.warning("Max subscribers reached (%d), rejecting %s",
                       self._max_subscribers, destination[:20])

    def _handle_unsubscribe(self, destination: str) -> None:
        """Drop *destination* from the table; unknown destinations are ignored."""
        self._subscribers.pop(destination, None)

    def _cleanup_expired(self) -> None:
        """Delete every subscriber older than the configured timeout."""
        timeout = self._subscriber_timeout
        # Walk a snapshot so entries can be deleted from the live dict.
        for dest, subscriber in tuple(self._subscribers.items()):
            if subscriber.is_expired(timeout):
                del self._subscribers[dest]

    async def open(self) -> None:
        """Mark the task open and log the configured target host and port."""
        self._open = True
        # NOTE(review): ``_config`` is not assigned in this class —
        # presumably set by TunnelTask.__init__; confirm.
        logger.info(
            "Streamr server %r started on %s:%d",
            self._config.name,
            self._config.target_host,
            self._config.target_port,
        )

    async def close(self) -> None:
        """Mark the task closed and forget all subscribers."""
        self._open = False
        self._subscribers.clear()