A Python port of the Invisible Internet Project (I2P)
1"""SubscriptionManager — manages hostname subscription lists from trusted sources."""
2
3from __future__ import annotations
4
5from i2p_apps.addressbook.book import Addressbook
6
7
class SubscriptionManager:
    """Manage hostname subscription lists from trusted sources.

    Keeps an insertion-ordered, duplicate-free list of subscription URLs and
    offers helpers to parse a subscription response body and to merge the
    parsed entries into an addressbook.
    """

    def __init__(self) -> None:
        # Subscription URLs in insertion order; add_subscription() keeps
        # this list free of duplicates.
        self._urls: list[str] = []

    def add_subscription(self, url: str) -> None:
        """Add a subscription URL; a URL already present is ignored."""
        if url not in self._urls:
            self._urls.append(url)

    def remove_subscription(self, url: str) -> bool:
        """Remove a subscription URL.

        Returns:
            True if the URL was present and has been removed, False otherwise.
        """
        if url in self._urls:
            self._urls.remove(url)
            return True
        return False

    def get_subscriptions(self) -> list[str]:
        """Return a copy of the subscription URL list.

        A new list is returned so callers cannot mutate internal state.
        """
        return list(self._urls)

    def parse_subscription_response(self, text: str) -> dict[str, str]:
        """Parse a subscription response into a hostname -> destination map.

        Format: one entry per line, ``hostname=base64destination``.
        Lines starting with ``#`` are comments. Empty lines, lines without
        ``=``, and lines whose hostname or destination is empty are skipped.

        Hostnames are stripped and lower-cased; destinations are stripped.
        When a hostname occurs more than once, the last occurrence wins.

        Args:
            text: Raw response body.

        Returns:
            Mapping of normalized hostname to destination string.
        """
        result: dict[str, str] = {}
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Split on the FIRST "=" only: the destination part may itself
            # contain "=" characters (e.g. base64 padding).
            hostname, separator, destination = line.partition("=")
            if not separator:
                continue
            hostname = hostname.strip().lower()
            destination = destination.strip()
            # Malformed entries such as "=dest" or "host=" carry no usable
            # mapping; skip them instead of storing an empty key or value.
            if not hostname or not destination:
                continue
            result[hostname] = destination
        return result

    def merge_into_addressbook(
        self,
        book: Addressbook,
        new_entries: dict[str, str],
        overwrite: bool = False,
    ) -> int:
        """Merge subscription entries into an addressbook.

        Each merged entry is stored via ``book.add_entry`` with
        ``source="subscription"``.

        Args:
            book: Addressbook to merge into.
            new_entries: Mapping of hostname to destination.
            overwrite: If False, hostnames for which ``book.has_entry``
                returns true are left untouched. If True, ``add_entry`` is
                called for every entry.

        Returns:
            Count of entries passed to ``book.add_entry`` (added or updated).
        """
        count = 0
        for hostname, destination in new_entries.items():
            if not overwrite and book.has_entry(hostname):
                continue
            book.add_entry(hostname, destination, source="subscription")
            count += 1
        return count