A Python port of the Invisible Internet Project (I2P)
at main 66 lines 2.2 kB view raw
"""SubscriptionManager — manages hostname subscription lists from trusted sources."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Addressbook is referenced only in annotations, which are never evaluated
    # at runtime under ``from __future__ import annotations``.
    from i2p_apps.addressbook.book import Addressbook


class SubscriptionManager:
    """Manages hostname subscription lists from trusted sources.

    Keeps an ordered, duplicate-free list of subscription URLs, parses the
    text of a subscription response into hostname/destination pairs, and
    merges such pairs into an addressbook. Fetching the URLs is not done
    here.
    """

    def __init__(self) -> None:
        # Subscription URLs in insertion order, without duplicates.
        self._urls: list[str] = []

    def add_subscription(self, url: str) -> None:
        """Add a subscription URL; a URL that is already present is ignored."""
        if url not in self._urls:
            self._urls.append(url)

    def remove_subscription(self, url: str) -> bool:
        """Remove a subscription URL.

        Returns:
            True if the URL was present and has been removed, False otherwise.
        """
        if url in self._urls:
            self._urls.remove(url)
            return True
        return False

    def get_subscriptions(self) -> list[str]:
        """Return a copy of the subscription URL list, in insertion order."""
        # Copy so callers cannot mutate the internal list.
        return list(self._urls)

    def parse_subscription_response(self, text: str) -> dict[str, str]:
        """Parse a subscription response into a hostname -> destination map.

        Format: one entry per line, ``hostname=base64destination``.
        Blank lines and lines starting with ``#`` are skipped, as are
        malformed lines: those without ``=`` and those whose hostname or
        destination is empty after stripping whitespace.

        Hostnames are lower-cased. Only the first ``=`` separates the two
        fields, so any later ``=`` stays in the destination. If a hostname
        occurs more than once, the last occurrence wins.
        """
        entries: dict[str, str] = {}
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            name, separator, dest = line.partition("=")
            if not separator:
                continue
            hostname = name.strip().lower()
            destination = dest.strip()
            if not hostname or not destination:
                # "=dest" or "host=" carries no usable mapping; treat it
                # like any other malformed line instead of storing "".
                continue
            entries[hostname] = destination
        return entries

    def merge_into_addressbook(
        self,
        book: Addressbook,
        new_entries: dict[str, str],
        overwrite: bool = False,
    ) -> int:
        """Merge subscription entries into an addressbook.

        Each merged entry is written with
        ``book.add_entry(hostname, destination, source="subscription")``.

        Args:
            book: Target addressbook; must provide ``has_entry`` and
                ``add_entry``.
            new_entries: Mapping of hostname to destination to merge.
            overwrite: If False, hostnames for which ``book.has_entry``
                is true are skipped. If True, every entry is written and
                ``has_entry`` is not consulted.

        Returns:
            The number of ``add_entry`` calls made (entries added or
            updated).
        """
        count = 0
        for hostname, destination in new_entries.items():
            if not overwrite and book.has_entry(hostname):
                continue
            book.add_entry(hostname, destination, source="subscription")
            count += 1
        return count