"""Tests for i2pcontrol JSON-RPC 2.0 management API.""" import json import pytest from i2p_apps.i2pcontrol.server import ( JSONRPCRequest, JSONRPCResponse, parse_jsonrpc_request, build_jsonrpc_response, build_jsonrpc_error, PARSE_ERROR, INVALID_REQUEST, METHOD_NOT_FOUND, INTERNAL_ERROR, ) from i2p_apps.i2pcontrol.methods import I2PControlMethods from i2p_apps.i2pcontrol.auth import TokenAuth # --------------------------------------------------------------------------- # JSON-RPC request parsing # --------------------------------------------------------------------------- class TestParseJSONRPCRequest: def test_valid_request(self): data = json.dumps({ "jsonrpc": "2.0", "method": "router.info", "params": {"key": "value"}, "id": 1, }) req = parse_jsonrpc_request(data) assert req.jsonrpc == "2.0" assert req.method == "router.info" assert req.params == {"key": "value"} assert req.id == 1 def test_valid_request_no_params(self): data = json.dumps({"jsonrpc": "2.0", "method": "router.info", "id": 1}) req = parse_jsonrpc_request(data) assert req.params == {} def test_invalid_json(self): with pytest.raises(ValueError, match="Parse error"): parse_jsonrpc_request("not json{{{") def test_missing_method(self): data = json.dumps({"jsonrpc": "2.0", "id": 1}) with pytest.raises(ValueError, match="method"): parse_jsonrpc_request(data) def test_missing_jsonrpc_field(self): data = json.dumps({"method": "router.info", "id": 1}) with pytest.raises(ValueError, match="jsonrpc"): parse_jsonrpc_request(data) # --------------------------------------------------------------------------- # JSON-RPC response building # --------------------------------------------------------------------------- class TestBuildJSONRPCResponse: def test_success_response(self): resp = build_jsonrpc_response(result={"status": "ok"}, request_id=1) assert resp["jsonrpc"] == "2.0" assert resp["result"] == {"status": "ok"} assert resp["id"] == 1 assert "error" not in resp def test_error_response(self): resp = build_jsonrpc_response( error={"code": -32600, "message": "Invalid Request"}, request_id=2, ) assert resp["jsonrpc"] == "2.0" assert resp["error"]["code"] == -32600 assert resp["error"]["message"] == "Invalid Request" assert resp["id"] == 2 assert "result" not in resp def test_build_jsonrpc_error_helper(self): resp = build_jsonrpc_error(PARSE_ERROR, "Parse error", request_id=3) assert resp["error"]["code"] == PARSE_ERROR assert resp["error"]["message"] == "Parse error" assert resp["id"] == 3 def test_response_dataclass_to_dict(self): r = JSONRPCResponse(result={"a": 1}, id=5) d = r.to_dict() assert d["jsonrpc"] == "2.0" assert d["result"] == {"a": 1} assert d["id"] == 5 assert "error" not in d # --------------------------------------------------------------------------- # Method dispatch # --------------------------------------------------------------------------- class TestI2PControlMethods: def test_router_info_returns_status_dict(self): m = I2PControlMethods(router_context={"status": "testing", "uptime": 42}) result = m.router_info({}) assert result["status"] == "testing" assert result["uptime"] == 42 assert "version" in result def test_router_status_returns_state_string(self): m = I2PControlMethods(router_context={"status": "graceful_shutdown"}) result = m.router_status({}) assert result["status"] == "graceful_shutdown" def test_unknown_method_raises(self): m = I2PControlMethods() with pytest.raises(ValueError, match="Unknown method"): m.dispatch("nonexistent.method", {}) def test_dispatch_converts_dots_to_underscores(self): m = I2PControlMethods(router_context={"status": "running"}) result = m.dispatch("router.info", {}) assert "version" in result def test_tunnel_list_returns_list(self): tunnels = [{"id": "t1"}, {"id": "t2"}] m = I2PControlMethods(router_context={"tunnels": tunnels}) result = m.tunnel_list({}) assert result["tunnels"] == tunnels def test_peer_list_returns_list(self): peers = [{"hash": "abc"}, {"hash": "def"}, {"hash": "ghi"}] m = I2PControlMethods(router_context={"peers": peers}) result = m.peer_list({"limit": 2}) assert len(result["peers"]) == 2 def test_peer_list_default_limit(self): m = I2PControlMethods(router_context={"peers": []}) result = m.peer_list({}) assert result["peers"] == [] def test_netdb_stats_returns_dict(self): m = I2PControlMethods(router_context={ "total_routers": 1500, "total_leasesets": 300, }) result = m.netdb_stats({}) assert result["total_routers"] == 1500 assert result["total_leasesets"] == 300 def test_bw_status_returns_bandwidth_info(self): m = I2PControlMethods(router_context={ "inbound_bps": 50000, "outbound_bps": 30000, }) result = m.bw_status({}) assert result["inbound_bps"] == 50000 assert result["outbound_bps"] == 30000 def test_config_get_returns_value(self): m = I2PControlMethods() m._config["i2p.bandwidth.inbound"] = "500" result = m.config_get({"key": "i2p.bandwidth.inbound"}) assert result["key"] == "i2p.bandwidth.inbound" assert result["value"] == "500" def test_config_get_missing_key(self): m = I2PControlMethods() result = m.config_get({"key": "nonexistent"}) assert result["value"] is None def test_config_set_updates_value(self): m = I2PControlMethods() result = m.config_set({"key": "i2p.bandwidth.inbound", "value": "1000"}) assert result["success"] is True assert result["value"] == "1000" # Verify it persists assert m._config["i2p.bandwidth.inbound"] == "1000" # --------------------------------------------------------------------------- # Authentication # --------------------------------------------------------------------------- class TestTokenAuth: def test_valid_token_accepted(self): auth = TokenAuth() token = auth.generate_token() assert auth.validate_token(token) is True def test_invalid_token_rejected(self): auth = TokenAuth() assert auth.validate_token("bogus-token") is False def test_generate_new_token(self): auth = TokenAuth() t1 = auth.generate_token() t2 = auth.generate_token() assert t1 != t2 assert len(t1) == 32 # 16 bytes hex-encoded assert auth.validate_token(t1) is True assert auth.validate_token(t2) is True def test_revoke_token(self): auth = TokenAuth() token = auth.generate_token() assert auth.revoke_token(token) is True assert auth.validate_token(token) is False def test_revoke_nonexistent_token(self): auth = TokenAuth() assert auth.revoke_token("does-not-exist") is False