A Python port of the Invisible Internet Project (I2P)
at main 212 lines 7.5 kB view raw
1"""Tests for i2pcontrol JSON-RPC 2.0 management API.""" 2import json 3 4import pytest 5 6from i2p_apps.i2pcontrol.server import ( 7 JSONRPCRequest, 8 JSONRPCResponse, 9 parse_jsonrpc_request, 10 build_jsonrpc_response, 11 build_jsonrpc_error, 12 PARSE_ERROR, 13 INVALID_REQUEST, 14 METHOD_NOT_FOUND, 15 INTERNAL_ERROR, 16) 17from i2p_apps.i2pcontrol.methods import I2PControlMethods 18from i2p_apps.i2pcontrol.auth import TokenAuth 19 20 21# --------------------------------------------------------------------------- 22# JSON-RPC request parsing 23# --------------------------------------------------------------------------- 24 25class TestParseJSONRPCRequest: 26 def test_valid_request(self): 27 data = json.dumps({ 28 "jsonrpc": "2.0", 29 "method": "router.info", 30 "params": {"key": "value"}, 31 "id": 1, 32 }) 33 req = parse_jsonrpc_request(data) 34 assert req.jsonrpc == "2.0" 35 assert req.method == "router.info" 36 assert req.params == {"key": "value"} 37 assert req.id == 1 38 39 def test_valid_request_no_params(self): 40 data = json.dumps({"jsonrpc": "2.0", "method": "router.info", "id": 1}) 41 req = parse_jsonrpc_request(data) 42 assert req.params == {} 43 44 def test_invalid_json(self): 45 with pytest.raises(ValueError, match="Parse error"): 46 parse_jsonrpc_request("not json{{{") 47 48 def test_missing_method(self): 49 data = json.dumps({"jsonrpc": "2.0", "id": 1}) 50 with pytest.raises(ValueError, match="method"): 51 52 parse_jsonrpc_request(data) 53 54 def test_missing_jsonrpc_field(self): 55 data = json.dumps({"method": "router.info", "id": 1}) 56 with pytest.raises(ValueError, match="jsonrpc"): 57 parse_jsonrpc_request(data) 58 59 60# --------------------------------------------------------------------------- 61# JSON-RPC response building 62# --------------------------------------------------------------------------- 63 64class TestBuildJSONRPCResponse: 65 def test_success_response(self): 66 resp = build_jsonrpc_response(result={"status": "ok"}, request_id=1) 67 assert resp["jsonrpc"] == "2.0" 68 assert resp["result"] == {"status": "ok"} 69 assert resp["id"] == 1 70 assert "error" not in resp 71 72 def test_error_response(self): 73 resp = build_jsonrpc_response( 74 error={"code": -32600, "message": "Invalid Request"}, 75 request_id=2, 76 ) 77 assert resp["jsonrpc"] == "2.0" 78 assert resp["error"]["code"] == -32600 79 assert resp["error"]["message"] == "Invalid Request" 80 assert resp["id"] == 2 81 assert "result" not in resp 82 83 def test_build_jsonrpc_error_helper(self): 84 resp = build_jsonrpc_error(PARSE_ERROR, "Parse error", request_id=3) 85 assert resp["error"]["code"] == PARSE_ERROR 86 assert resp["error"]["message"] == "Parse error" 87 assert resp["id"] == 3 88 89 def test_response_dataclass_to_dict(self): 90 r = JSONRPCResponse(result={"a": 1}, id=5) 91 d = r.to_dict() 92 assert d["jsonrpc"] == "2.0" 93 assert d["result"] == {"a": 1} 94 assert d["id"] == 5 95 assert "error" not in d 96 97 98# --------------------------------------------------------------------------- 99# Method dispatch 100# --------------------------------------------------------------------------- 101 102class TestI2PControlMethods: 103 def test_router_info_returns_status_dict(self): 104 m = I2PControlMethods(router_context={"status": "testing", "uptime": 42}) 105 result = m.router_info({}) 106 assert result["status"] == "testing" 107 assert result["uptime"] == 42 108 assert "version" in result 109 110 def test_router_status_returns_state_string(self): 111 m = I2PControlMethods(router_context={"status": "graceful_shutdown"}) 112 result = m.router_status({}) 113 assert result["status"] == "graceful_shutdown" 114 115 def test_unknown_method_raises(self): 116 m = I2PControlMethods() 117 with pytest.raises(ValueError, match="Unknown method"): 118 m.dispatch("nonexistent.method", {}) 119 120 def test_dispatch_converts_dots_to_underscores(self): 121 m = I2PControlMethods(router_context={"status": "running"}) 122 result = m.dispatch("router.info", {}) 123 assert "version" in result 124 125 def test_tunnel_list_returns_list(self): 126 tunnels = [{"id": "t1"}, {"id": "t2"}] 127 m = I2PControlMethods(router_context={"tunnels": tunnels}) 128 result = m.tunnel_list({}) 129 assert result["tunnels"] == tunnels 130 131 def test_peer_list_returns_list(self): 132 peers = [{"hash": "abc"}, {"hash": "def"}, {"hash": "ghi"}] 133 m = I2PControlMethods(router_context={"peers": peers}) 134 result = m.peer_list({"limit": 2}) 135 assert len(result["peers"]) == 2 136 137 def test_peer_list_default_limit(self): 138 m = I2PControlMethods(router_context={"peers": []}) 139 result = m.peer_list({}) 140 assert result["peers"] == [] 141 142 def test_netdb_stats_returns_dict(self): 143 m = I2PControlMethods(router_context={ 144 "total_routers": 1500, 145 "total_leasesets": 300, 146 }) 147 result = m.netdb_stats({}) 148 assert result["total_routers"] == 1500 149 assert result["total_leasesets"] == 300 150 151 def test_bw_status_returns_bandwidth_info(self): 152 m = I2PControlMethods(router_context={ 153 "inbound_bps": 50000, 154 "outbound_bps": 30000, 155 }) 156 result = m.bw_status({}) 157 assert result["inbound_bps"] == 50000 158 assert result["outbound_bps"] == 30000 159 160 def test_config_get_returns_value(self): 161 m = I2PControlMethods() 162 m._config["i2p.bandwidth.inbound"] = "500" 163 result = m.config_get({"key": "i2p.bandwidth.inbound"}) 164 assert result["key"] == "i2p.bandwidth.inbound" 165 assert result["value"] == "500" 166 167 def test_config_get_missing_key(self): 168 m = I2PControlMethods() 169 result = m.config_get({"key": "nonexistent"}) 170 assert result["value"] is None 171 172 def test_config_set_updates_value(self): 173 m = I2PControlMethods() 174 result = m.config_set({"key": "i2p.bandwidth.inbound", "value": "1000"}) 175 assert result["success"] is True 176 assert result["value"] == "1000" 177 # Verify it persists 178 assert m._config["i2p.bandwidth.inbound"] == "1000" 179 180 181# --------------------------------------------------------------------------- 182# Authentication 183# --------------------------------------------------------------------------- 184 185class TestTokenAuth: 186 def test_valid_token_accepted(self): 187 auth = TokenAuth() 188 token = auth.generate_token() 189 assert auth.validate_token(token) is True 190 191 def test_invalid_token_rejected(self): 192 auth = TokenAuth() 193 assert auth.validate_token("bogus-token") is False 194 195 def test_generate_new_token(self): 196 auth = TokenAuth() 197 t1 = auth.generate_token() 198 t2 = auth.generate_token() 199 assert t1 != t2 200 assert len(t1) == 32 # 16 bytes hex-encoded 201 assert auth.validate_token(t1) is True 202 assert auth.validate_token(t2) is True 203 204 def test_revoke_token(self): 205 auth = TokenAuth() 206 token = auth.generate_token() 207 assert auth.revoke_token(token) is True 208 assert auth.validate_token(token) is False 209 210 def test_revoke_nonexistent_token(self): 211 auth = TokenAuth() 212 assert auth.revoke_token("does-not-exist") is False