A Python port of the Invisible Internet Project (I2P)
at main 718 lines 26 kB view raw
1"""Router bootstrap orchestrator — startup sequence and main event loop. 2 3Orchestrates the full I2P router startup: 41. Load or generate router identity (Ed25519 + X25519 keys) 52. Start NTCP2 listener 63. Reseed from HTTPS servers if netdb is sparse 74. Parse RouterInfos, store in NetDB 85. Connect to initial peers 96. Exchange RouterInfo via DatabaseStore I2NP messages 107. Run the async main event loop 11 12Ported from net.i2p.router.Router bootstrap sequence. 13""" 14 15from __future__ import annotations 16 17import asyncio 18import hashlib 19import logging 20import os 21import random 22import resource 23import struct 24import time 25from dataclasses import dataclass 26from typing import TYPE_CHECKING 27 28if TYPE_CHECKING: 29 from i2p_transport.ntcp2_real_server import NTCP2RealListener 30 31from i2p_data.i2np_codec import ( 32 MSG_TYPE_DATABASE_STORE, 33 encode_i2np_short, 34) 35from i2p_data.router import RouterIdentity, RouterInfo 36from i2p_netdb.datastore import DataStore, EntryType, NetDBEntry 37from i2p_netdb.reseed import ReseedClient 38from i2p_router.config import RouterConfig 39from i2p_router.core import RouterContext 40from i2p_router.identity import ( 41 RouterKeyBundle, 42 build_router_identity, 43 build_router_info, 44 create_full_router_identity, 45) 46from i2p_router.peer_connector import ConnectionPool, PeerConnector, extract_ntcp2_address 47from i2p_router.health import start_health_server 48from i2p_router.maintenance import ( 49 keepalive_loop, 50 reconnect_loop, 51 routerinfo_broadcast_loop, 52 netdb_eviction_loop, 53) 54from i2p_netdb.sqlite_netdb import SqliteNetDB 55 56logger = logging.getLogger(__name__) 57 58# Minimum netdb entries before we skip reseed 59_MIN_NETDB_FOR_RESEED = 50 60 61# Target number of initial peer connections 62_INITIAL_PEER_COUNT = 5 63 64# Timeout for individual peer connection attempts (seconds) 65_PEER_CONNECT_TIMEOUT = 15 66 67 68def build_database_store_i2np(router_hash: bytes, ri_bytes: bytes) -> bytes: 69 """Build an I2NP DatabaseStore message with short header. 70 71 The DatabaseStore payload format: 72 - key (32 bytes): the router hash 73 - type (1 byte): 0 = RouterInfo 74 - reply_token (4 bytes): 0 (no reply requested) 75 - data: the RouterInfo bytes 76 77 Returns the full I2NP short-header message. 78 """ 79 # DatabaseStore payload: key(32) + type(1) + reply_token(4) + data 80 db_store_payload = router_hash + struct.pack("!BI", 0, 0) + ri_bytes 81 82 msg_id = int.from_bytes(os.urandom(4), "big") 83 expiration = int(time.time()) + 60 # 60s from now 84 85 return encode_i2np_short( 86 MSG_TYPE_DATABASE_STORE, msg_id, expiration, db_store_payload, 87 ) 88 89 90class MessageReader: 91 """Reads I2NP blocks from NTCP2 frames and dispatches to RouterContext. 92 93 Parses the I2NP short header (type, msg_id, expiration, size, payload) 94 and forwards to the RouterContext's inbound message handler. 95 """ 96 97 def __init__(self, context: RouterContext) -> None: 98 self._context = context 99 100 def handle_i2np_block(self, i2np_data: bytes, bootstrap: RouterBootstrap | None = None) -> None: 101 """Parse an I2NP short-header message and dispatch it. 102 103 Args: 104 i2np_data: Raw I2NP message with short header (11+ bytes). 105 bootstrap: Optional bootstrap instance to increment counters. 106 """ 107 if len(i2np_data) < 11: 108 logger.warning("I2NP block too short: %d bytes", len(i2np_data)) 109 return 110 111 msg_type, msg_id, expiration, size = struct.unpack_from("!BIiH", i2np_data) 112 payload = i2np_data[11:11 + size] 113 114 if bootstrap is not None: 115 bootstrap._i2np_messages_received += 1 116 117 logger.debug( 118 "I2NP message: type=%d id=%d size=%d", msg_type, msg_id, size, 119 ) 120 self._context.process_inbound(msg_type, payload) 121 122 123class RouterBootstrap: 124 """Orchestrates the full I2P router startup sequence. 125 126 Manages the lifecycle from key generation through peer discovery, 127 connection establishment, and the main event loop. 128 """ 129 130 def __init__(self, config: RouterConfig) -> None: 131 self._config = config 132 self._state = "stopped" 133 self._key_bundle: RouterKeyBundle | None = None 134 self._our_identity: RouterIdentity | None = None 135 self._our_router_info: RouterInfo | None = None 136 self._context: RouterContext | None = None 137 self._conn_pool = ConnectionPool(max_connections=50) 138 self._peer_connector: PeerConnector | None = None 139 self._reseed_client = ReseedClient( 140 reseed_urls=config.reseed_urls or None, 141 ) 142 self._listener: NTCP2RealListener | None = None 143 self._message_reader: MessageReader | None = None 144 self._main_loop_task: asyncio.Task | None = None 145 self._started_at: float | None = None 146 self._health_server: asyncio.Server | None = None 147 self._sqlite_netdb: SqliteNetDB | None = None 148 self._maintenance_tasks: list[asyncio.Task] = [] 149 # Counters for observability 150 self._peers_attempted = 0 151 self._peers_successful = 0 152 self._peers_failed = 0 153 self._i2np_messages_received = 0 154 self._i2np_messages_sent = 0 155 156 @property 157 def config(self) -> RouterConfig: 158 return self._config 159 160 @property 161 def state(self) -> str: 162 return self._state 163 164 @property 165 def peer_count(self) -> int: 166 return self._conn_pool.active_count 167 168 async def start(self) -> None: 169 """Run the full bootstrap sequence. 170 171 1. Load or generate identity 172 2. Start NTCP2 listener 173 3. Reseed if needed 174 4. Connect to initial peers 175 5. Exchange RouterInfo 176 6. Start main loop 177 """ 178 if self._state == "running": 179 return 180 181 self._state = "starting" 182 self._started_at = time.monotonic() 183 184 try: 185 self._state = "loading_identity" 186 await self._load_or_generate_identity() 187 188 self._state = "starting_listener" 189 await self._start_listener() 190 191 # Start health server EARLY so we have visibility during reseed 192 self._state = "starting_health" 193 await self._start_health_server() 194 195 self._state = "loading_netdb_cache" 196 self._load_netdb_from_disk() 197 198 self._state = "reseeding" 199 await self._reseed_if_needed() 200 self._persist_netdb_to_disk() 201 202 self._state = "connecting_peers" 203 await self._connect_initial_peers() 204 205 self._state = "exchanging_routerinfo" 206 await self._exchange_router_info() 207 208 self._start_maintenance_tasks() 209 self._state = "running" 210 logger.info("Router bootstrap complete, state=running") 211 except Exception: 212 logger.exception("Bootstrap failed in state=%s", self._state) 213 self._state = "failed" 214 raise 215 216 async def _load_or_generate_identity(self) -> None: 217 """Load existing keys or generate new ones. Creates RouterContext.""" 218 data_dir = os.path.expanduser(self._config.data_dir) 219 os.makedirs(data_dir, exist_ok=True) 220 221 key_path = os.path.join(data_dir, "router.keys.json") 222 bundle = RouterKeyBundle.load(key_path) 223 224 if bundle is None: 225 logger.info("No existing identity found, generating new keys") 226 bundle = RouterKeyBundle.generate() 227 bundle.save(key_path) 228 logger.info("New identity saved to %s", key_path) 229 else: 230 logger.info("Loaded existing identity from %s", key_path) 231 232 self._key_bundle = bundle 233 234 # Build RouterIdentity and signed RouterInfo 235 identity, ri = create_full_router_identity( 236 bundle, 237 self._config.listen_host, 238 self._config.listen_port, 239 ) 240 self._our_identity = identity 241 self._our_router_info = ri 242 243 # Create RouterContext with our identity hash 244 ri_hash = hashlib.sha256(identity.to_bytes()).digest() 245 self._context = RouterContext(router_hash=ri_hash) 246 247 # Create PeerConnector with our own RouterInfo for msg3 248 self._peer_connector = PeerConnector( 249 our_static_keypair=(bundle.ntcp2_private, bundle.ntcp2_public), 250 our_iv=bundle.ntcp2_iv, 251 our_ri_bytes=ri.to_bytes(), 252 ) 253 254 # Create MessageReader 255 self._message_reader = MessageReader(self._context) 256 257 async def _start_listener(self) -> None: 258 """Start the NTCP2 listener for inbound connections.""" 259 if self._key_bundle is None or self._our_router_info is None or self._our_identity is None: 260 raise RuntimeError("Identity not loaded") 261 262 from i2p_transport.ntcp2_real_server import NTCP2RealListener 263 264 # AES key for inbound obfuscation = SHA-256 of our RouterIdentity bytes 265 ri_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() 266 267 self._listener = NTCP2RealListener( 268 host=self._config.listen_host, 269 port=self._config.listen_port, 270 our_static_key=(self._key_bundle.ntcp2_private, self._key_bundle.ntcp2_public), 271 our_ri_hash=ri_hash, 272 our_iv=self._key_bundle.ntcp2_iv, 273 on_connection=self._handle_inbound_connection, 274 ) 275 276 server = await self._listener.start() 277 actual_port = server.sockets[0].getsockname()[1] if server.sockets else 0 278 logger.info("NTCP2 listener started on %s:%d", self._config.listen_host, actual_port) 279 280 async def _handle_inbound_connection(self, conn) -> None: 281 """Handle a new inbound NTCP2 connection.""" 282 peer_hash = conn.remote_hash 283 if not self._conn_pool.add(peer_hash, conn): 284 logger.warning("Connection pool full, rejecting inbound peer") 285 await conn.close() 286 return 287 logger.info("Accepted inbound peer connection (hash=%s...)", peer_hash[:4].hex()) 288 # Start reading messages from this connection 289 asyncio.create_task(self._read_peer_messages(peer_hash, conn)) 290 291 async def _reseed_if_needed(self) -> int: 292 """Fetch RouterInfos from reseed servers if netdb is sparse. 293 294 Returns the number of new RouterInfos stored. 295 """ 296 if self._context is None: 297 return 0 298 299 current_count = self._context.datastore.count() 300 if current_count >= _MIN_NETDB_FOR_RESEED: 301 logger.info("NetDB has %d entries, skipping reseed", current_count) 302 return 0 303 304 logger.info("NetDB has %d entries (<%d), reseeding...", current_count, _MIN_NETDB_FOR_RESEED) 305 306 try: 307 ri_bytes_list = await self._reseed_client.reseed() 308 except Exception: 309 logger.exception("Reseed failed") 310 return 0 311 312 stored = 0 313 for ri_bytes in ri_bytes_list: 314 try: 315 ri = RouterInfo.from_bytes(ri_bytes) 316 ri_hash = hashlib.sha256(ri.identity.to_bytes()).digest() 317 entry = NetDBEntry( 318 key=ri_hash, 319 entry_type=EntryType.ROUTER_INFO, 320 data=ri_bytes, 321 received_ms=int(time.time() * 1000), 322 ) 323 self._context.datastore.put(entry) 324 stored += 1 325 except Exception: 326 logger.debug("Failed to parse RouterInfo from reseed", exc_info=True) 327 continue 328 329 logger.info("Stored %d/%d RouterInfos from reseed", stored, len(ri_bytes_list)) 330 return stored 331 332 async def _connect_initial_peers(self) -> int: 333 """Connect to peers from the netdb. 334 335 Returns the number of successful connections. 336 """ 337 if self._context is None or self._peer_connector is None: 338 return 0 339 340 # Get all known RouterInfos from datastore, shuffle for diversity 341 all_entries = self._context.datastore.get_all() 342 random.shuffle(all_entries) 343 connected = 0 344 attempts = 0 345 max_attempts = _INITIAL_PEER_COUNT * 6 # Try up to 30 peers 346 347 for entry in all_entries: 348 if connected >= _INITIAL_PEER_COUNT or attempts >= max_attempts: 349 break 350 351 if entry.entry_type != EntryType.ROUTER_INFO: 352 continue 353 354 try: 355 ri = RouterInfo.from_bytes(entry.data) 356 params = extract_ntcp2_address(ri) 357 if params is None: 358 continue 359 360 # Skip if already connected 361 peer_hash = entry.key 362 if self._conn_pool.is_connected(peer_hash): 363 continue 364 365 attempts += 1 366 self._peers_attempted += 1 367 host, port_num = params[0], params[1] 368 logger.info( 369 "Attempting peer %d: %s:%d (hash=%s...)", 370 attempts, host, port_num, peer_hash[:4].hex(), 371 ) 372 conn = await asyncio.wait_for( 373 self._peer_connector.connect(ri), 374 timeout=_PEER_CONNECT_TIMEOUT, 375 ) 376 if conn is not None: 377 self._conn_pool.add(peer_hash, conn) 378 connected += 1 379 self._peers_successful += 1 380 logger.info( 381 "Connected to peer %d/%d: %s:%d (hash=%s...)", 382 connected, _INITIAL_PEER_COUNT, host, port_num, peer_hash[:4].hex(), 383 ) 384 # Start reading messages 385 asyncio.create_task(self._read_peer_messages(peer_hash, conn)) 386 387 except asyncio.TimeoutError: 388 self._peers_failed += 1 389 logger.info( 390 "Peer connection timed out after %ds: %s...", 391 _PEER_CONNECT_TIMEOUT, entry.key[:4].hex(), 392 ) 393 continue 394 except Exception as e: 395 self._peers_failed += 1 396 logger.info( 397 "Failed to connect to peer %s...: %s", 398 entry.key[:4].hex(), e, 399 ) 400 logger.debug("Peer connect traceback", exc_info=True) 401 continue 402 403 logger.info( 404 "Initial peer connections: %d successful, %d failed, %d attempted", 405 connected, attempts - connected, attempts, 406 ) 407 return connected 408 409 async def _exchange_router_info(self) -> None: 410 """Send our RouterInfo to all connected peers via DatabaseStore.""" 411 if self._our_router_info is None or self._our_identity is None: 412 return 413 414 our_ri_bytes = self._our_router_info.to_bytes() 415 our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() 416 db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes) 417 418 peer_hashes = self._conn_pool.get_all_peer_hashes() 419 for peer_hash in peer_hashes: 420 conn = self._conn_pool.get(peer_hash) 421 if conn is None: 422 continue 423 try: 424 await conn.send_i2np(db_store_msg) 425 self._i2np_messages_sent += 1 426 logger.debug("Sent DatabaseStore to peer %s...", peer_hash[:4].hex()) 427 except Exception as e: 428 logger.info("Failed to send DatabaseStore to peer %s...: %s", peer_hash[:4].hex(), e) 429 self._conn_pool.remove(peer_hash) 430 431 def _load_netdb_from_disk(self) -> None: 432 """Load RouterInfo entries from SQLite into the in-memory DataStore.""" 433 data_dir = os.path.expanduser(self._config.data_dir) 434 self._sqlite_netdb = SqliteNetDB(data_dir) 435 436 entries = self._sqlite_netdb.load_all() 437 if not entries: 438 logger.info("No cached NetDB entries on disk") 439 return 440 assert self._context is not None 441 442 stored = 0 443 for key, data in entries: 444 entry = NetDBEntry( 445 key=key, 446 entry_type=EntryType.ROUTER_INFO, 447 data=data, 448 received_ms=int(time.time() * 1000), 449 ) 450 self._context.datastore.put(entry) 451 stored += 1 452 453 logger.info("Loaded %d cached NetDB entries from disk", stored) 454 455 def _persist_netdb_to_disk(self) -> None: 456 """Persist current in-memory NetDB entries to SQLite.""" 457 if self._sqlite_netdb is None or self._context is None: 458 return 459 460 count = 0 461 for entry in self._context.datastore.get_all(): 462 if entry.entry_type == EntryType.ROUTER_INFO: 463 self._sqlite_netdb.store(entry.key, entry.data) 464 count += 1 465 466 logger.info("Persisted %d NetDB entries to disk", count) 467 468 async def _start_health_server(self) -> None: 469 """Start the HTTP health endpoint.""" 470 self._health_server = await start_health_server( 471 get_status_fn=self.get_status, 472 host=self._config.listen_host, 473 port=self._config.health_port, 474 ) 475 476 def _start_maintenance_tasks(self) -> None: 477 """Spawn background maintenance loops.""" 478 assert self._our_router_info is not None and self._our_identity is not None 479 assert self._context is not None 480 # Build the DatabaseStore message for broadcast 481 our_ri_bytes = self._our_router_info.to_bytes() 482 our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() 483 db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes) 484 485 self._maintenance_tasks = [ 486 asyncio.create_task( 487 keepalive_loop(self._conn_pool, interval=60.0), 488 name="keepalive", 489 ), 490 asyncio.create_task( 491 reconnect_loop( 492 self._conn_pool, 493 self._context.datastore, 494 self._peer_connector, 495 target_peers=_INITIAL_PEER_COUNT, 496 interval=30.0, 497 ), 498 name="reconnect", 499 ), 500 asyncio.create_task( 501 routerinfo_broadcast_loop( 502 self._conn_pool, 503 db_store_msg, 504 interval=1800.0, 505 ), 506 name="broadcast", 507 ), 508 asyncio.create_task( 509 netdb_eviction_loop( 510 self._context.datastore, 511 sqlite_netdb=self._sqlite_netdb, 512 interval=600.0, 513 max_age_ms=86_400_000, 514 ), 515 name="eviction", 516 ), 517 ] 518 logger.info("Started %d maintenance tasks", len(self._maintenance_tasks)) 519 520 async def _read_peer_messages(self, peer_hash: bytes, conn) -> None: 521 """Continuously read frames from a peer connection. 522 523 Dispatches I2NP blocks to the MessageReader. 524 Removes the peer from the pool on disconnect. 525 """ 526 from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_TERMINATION 527 528 try: 529 while conn.is_alive(): 530 blocks = await conn.recv_frame() 531 for block in blocks: 532 if block.block_type == BLOCK_I2NP and self._message_reader: 533 self._message_reader.handle_i2np_block(block.data, bootstrap=self) 534 elif block.block_type == BLOCK_TERMINATION: 535 reason = block.data[-1] if block.data else -1 536 logger.info( 537 "Peer %s... sent termination (reason=%d, data=%s)", 538 peer_hash[:4].hex(), reason, block.data.hex(), 539 ) 540 return 541 except Exception as e: 542 logger.info("Peer %s... disconnected: %s", peer_hash[:4].hex(), e) 543 logger.debug("Peer %s... disconnect traceback", peer_hash[:4].hex(), exc_info=True) 544 finally: 545 self._conn_pool.remove(peer_hash) 546 547 async def run_main_loop(self, duration_seconds: float = 0) -> None: 548 """Run the main event loop. 549 550 If duration_seconds > 0, runs for that duration then stops. 551 If 0, runs until shutdown() is called. 552 """ 553 if self._state != "running": 554 await self.start() 555 556 logger.info("Main loop started") 557 558 try: 559 if duration_seconds > 0: 560 await asyncio.sleep(duration_seconds) 561 else: 562 # Run forever until cancelled 563 while self._state == "running": 564 await asyncio.sleep(1) 565 except asyncio.CancelledError: 566 logger.info("Main loop cancelled") 567 finally: 568 await self.shutdown() 569 570 async def shutdown(self) -> None: 571 """Gracefully shut down the router.""" 572 if self._state == "stopped": 573 return 574 575 logger.info("Shutting down router...") 576 self._state = "shutting_down" 577 578 # Cancel maintenance tasks 579 for task in self._maintenance_tasks: 580 task.cancel() 581 for task in self._maintenance_tasks: 582 try: 583 await task 584 except asyncio.CancelledError: 585 pass 586 self._maintenance_tasks.clear() 587 588 # Stop health server 589 if self._health_server is not None: 590 self._health_server.close() 591 await self._health_server.wait_closed() 592 self._health_server = None 593 594 # Close all peer connections 595 for peer_hash in list(self._conn_pool.get_all_peer_hashes()): 596 conn = self._conn_pool.get(peer_hash) 597 if conn is not None: 598 try: 599 await conn.close() 600 except Exception: 601 pass 602 self._conn_pool.remove(peer_hash) 603 604 # Stop listener 605 if self._listener is not None: 606 self._listener.close() 607 await self._listener.wait_closed() 608 self._listener = None 609 610 # Persist NetDB to disk before exit 611 self._persist_netdb_to_disk() 612 613 # Close SQLite 614 if self._sqlite_netdb is not None: 615 self._sqlite_netdb.close() 616 self._sqlite_netdb = None 617 618 self._state = "stopped" 619 logger.info("Router shut down") 620 621 def get_status(self) -> dict: 622 """Return current router status as a detailed dict. 623 624 Designed to be the single source of truth during long bootstrap 625 and sustained operation. Rich enough to diagnose issues from 626 a curl without needing container logs. 627 """ 628 uptime = 0.0 629 if self._started_at is not None: 630 uptime = time.monotonic() - self._started_at 631 632 router_hash = "" 633 if self._our_identity is not None: 634 router_hash = hashlib.sha256( 635 self._our_identity.to_bytes() 636 ).digest().hex() 637 638 netdb_size = 0 639 netdb_ri_count = 0 640 if self._context is not None: 641 netdb_size = self._context.datastore.count() 642 netdb_ri_count = self._context.datastore.count_by_type(EntryType.ROUTER_INFO) 643 644 sqlite_count = 0 645 if self._sqlite_netdb is not None: 646 try: 647 sqlite_count = self._sqlite_netdb.count() 648 except Exception: 649 pass 650 651 # Per-peer connection details 652 peers = [] 653 for peer_hash in self._conn_pool.get_all_peer_hashes(): 654 conn = self._conn_pool.get(peer_hash) 655 if conn is None: 656 continue 657 peer_info = { 658 "hash": peer_hash[:8].hex(), 659 "alive": conn.is_alive(), 660 } 661 if hasattr(conn, "seconds_since_last_read"): 662 peer_info["idle_read_s"] = round(conn.seconds_since_last_read(), 1) 663 peer_info["idle_write_s"] = round(conn.seconds_since_last_write(), 1) 664 if hasattr(conn, "_frames_sent"): 665 peer_info["frames_sent"] = conn._frames_sent 666 peer_info["frames_received"] = conn._frames_received 667 peers.append(peer_info) 668 669 # Maintenance task status 670 tasks_status = {} 671 for task in self._maintenance_tasks: 672 name = task.get_name() 673 if task.done(): 674 exc = task.exception() if not task.cancelled() else None 675 tasks_status[name] = f"stopped: {exc}" if exc else "stopped" 676 else: 677 tasks_status[name] = "running" 678 679 # Resource usage 680 rusage = resource.getrusage(resource.RUSAGE_SELF) 681 rss_mb = rusage.ru_maxrss / 1024 # Linux reports in KB 682 try: 683 import psutil 684 proc = psutil.Process() 685 cpu_percent = proc.cpu_percent(interval=0) 686 rss_mb = proc.memory_info().rss / (1024 * 1024) 687 except ImportError: 688 cpu_percent = None 689 690 return { 691 "state": self._state, 692 "uptime_seconds": round(uptime, 1), 693 "router_hash": router_hash[:16] + "..." if router_hash else "", 694 "peer_count": self._conn_pool.active_count, 695 "peers_attempted": self._peers_attempted, 696 "peers_successful": self._peers_successful, 697 "peers_failed": self._peers_failed, 698 "peers": peers, 699 "i2np_messages_received": self._i2np_messages_received, 700 "i2np_messages_sent": self._i2np_messages_sent, 701 "netdb_memory": netdb_size, 702 "netdb_routerinfos": netdb_ri_count, 703 "netdb_disk": sqlite_count, 704 "resources": { 705 "rss_mb": round(rss_mb, 1), 706 "cpu_percent": cpu_percent, 707 "max_rss_mb": round(rusage.ru_maxrss / 1024, 1), 708 "user_time_s": round(rusage.ru_utime, 2), 709 "system_time_s": round(rusage.ru_stime, 2), 710 }, 711 "maintenance_tasks": tasks_status, 712 "config": { 713 "listen_port": self._config.listen_port, 714 "health_port": self._config.health_port, 715 "data_dir": self._config.data_dir, 716 "bandwidth_limit_kbps": self._config.bandwidth_limit_kbps, 717 }, 718 }