A Python port of the Invisible Internet Project (I2P)
1"""Tests for i2pcontrol JSON-RPC 2.0 management API."""
2import json
3
4import pytest
5
6from i2p_apps.i2pcontrol.server import (
7 JSONRPCRequest,
8 JSONRPCResponse,
9 parse_jsonrpc_request,
10 build_jsonrpc_response,
11 build_jsonrpc_error,
12 PARSE_ERROR,
13 INVALID_REQUEST,
14 METHOD_NOT_FOUND,
15 INTERNAL_ERROR,
16)
17from i2p_apps.i2pcontrol.methods import I2PControlMethods
18from i2p_apps.i2pcontrol.auth import TokenAuth
19
20
21# ---------------------------------------------------------------------------
22# JSON-RPC request parsing
23# ---------------------------------------------------------------------------
24
class TestParseJSONRPCRequest:
    """Checks for ``parse_jsonrpc_request`` on well-formed and malformed input."""

    def test_valid_request(self):
        """A complete request exposes jsonrpc, method, params and id."""
        payload = {
            "jsonrpc": "2.0",
            "method": "router.info",
            "params": {"key": "value"},
            "id": 1,
        }
        parsed = parse_jsonrpc_request(json.dumps(payload))
        assert parsed.jsonrpc == "2.0"
        assert parsed.method == "router.info"
        assert parsed.params == {"key": "value"}
        assert parsed.id == 1

    def test_valid_request_no_params(self):
        """When "params" is omitted the parsed params compare equal to ``{}``."""
        payload = {"jsonrpc": "2.0", "method": "router.info", "id": 1}
        parsed = parse_jsonrpc_request(json.dumps(payload))
        assert parsed.params == {}

    def test_invalid_json(self):
        """Text that is not JSON raises ValueError matching "Parse error"."""
        malformed = "not json{{{"
        with pytest.raises(ValueError, match="Parse error"):
            parse_jsonrpc_request(malformed)

    def test_missing_method(self):
        """A request without "method" raises ValueError mentioning it."""
        payload = {"jsonrpc": "2.0", "id": 1}
        encoded = json.dumps(payload)
        with pytest.raises(ValueError, match="method"):
            parse_jsonrpc_request(encoded)

    def test_missing_jsonrpc_field(self):
        """A request without "jsonrpc" raises ValueError mentioning it."""
        payload = {"method": "router.info", "id": 1}
        encoded = json.dumps(payload)
        with pytest.raises(ValueError, match="jsonrpc"):
            parse_jsonrpc_request(encoded)
58
59
60# ---------------------------------------------------------------------------
61# JSON-RPC response building
62# ---------------------------------------------------------------------------
63
class TestBuildJSONRPCResponse:
    """Checks for the response builders and ``JSONRPCResponse.to_dict``."""

    def test_success_response(self):
        """A result-only response carries version, result and id, but no error."""
        envelope = build_jsonrpc_response(result={"status": "ok"}, request_id=1)
        assert envelope["jsonrpc"] == "2.0"
        assert envelope["result"] == {"status": "ok"}
        assert envelope["id"] == 1
        assert "error" not in envelope

    def test_error_response(self):
        """An error-only response carries version, error and id, but no result."""
        failure = {"code": -32600, "message": "Invalid Request"}
        envelope = build_jsonrpc_response(error=failure, request_id=2)
        assert envelope["jsonrpc"] == "2.0"
        reported = envelope["error"]
        assert reported["code"] == -32600
        assert reported["message"] == "Invalid Request"
        assert envelope["id"] == 2
        assert "result" not in envelope

    def test_build_jsonrpc_error_helper(self):
        """``build_jsonrpc_error`` places code and message under "error"."""
        envelope = build_jsonrpc_error(PARSE_ERROR, "Parse error", request_id=3)
        reported = envelope["error"]
        assert reported["code"] == PARSE_ERROR
        assert reported["message"] == "Parse error"
        assert envelope["id"] == 3

    def test_response_dataclass_to_dict(self):
        """``to_dict`` on a result-only response omits the "error" key."""
        as_dict = JSONRPCResponse(result={"a": 1}, id=5).to_dict()
        assert as_dict["jsonrpc"] == "2.0"
        assert as_dict["result"] == {"a": 1}
        assert as_dict["id"] == 5
        assert "error" not in as_dict
96
97
98# ---------------------------------------------------------------------------
99# Method dispatch
100# ---------------------------------------------------------------------------
101
class TestI2PControlMethods:
    """Checks for the ``I2PControlMethods`` handlers and its ``dispatch`` entry point."""

    def test_router_info_returns_status_dict(self):
        """``router_info`` echoes status and uptime and includes a version."""
        context = {"status": "testing", "uptime": 42}
        methods = I2PControlMethods(router_context=context)
        info = methods.router_info({})
        assert info["status"] == "testing"
        assert info["uptime"] == 42
        assert "version" in info

    def test_router_status_returns_state_string(self):
        """``router_status`` reports the status held in the router context."""
        methods = I2PControlMethods(router_context={"status": "graceful_shutdown"})
        state = methods.router_status({})
        assert state["status"] == "graceful_shutdown"

    def test_unknown_method_raises(self):
        """Dispatching an unregistered name raises ValueError("Unknown method...")."""
        methods = I2PControlMethods()
        with pytest.raises(ValueError, match="Unknown method"):
            methods.dispatch("nonexistent.method", {})

    def test_dispatch_converts_dots_to_underscores(self):
        """The dotted name "router.info" dispatches to a result with a version."""
        methods = I2PControlMethods(router_context={"status": "running"})
        info = methods.dispatch("router.info", {})
        assert "version" in info

    def test_tunnel_list_returns_list(self):
        """``tunnel_list`` returns the tunnels supplied in the router context."""
        expected_tunnels = [{"id": "t1"}, {"id": "t2"}]
        methods = I2PControlMethods(router_context={"tunnels": expected_tunnels})
        listing = methods.tunnel_list({})
        assert listing["tunnels"] == expected_tunnels

    def test_peer_list_returns_list(self):
        """With ``limit`` set to 2, only two of the three peers are returned."""
        known_peers = [{"hash": "abc"}, {"hash": "def"}, {"hash": "ghi"}]
        methods = I2PControlMethods(router_context={"peers": known_peers})
        listing = methods.peer_list({"limit": 2})
        assert len(listing["peers"]) == 2

    def test_peer_list_default_limit(self):
        """Without a ``limit`` an empty peer list is returned as empty."""
        methods = I2PControlMethods(router_context={"peers": []})
        listing = methods.peer_list({})
        assert listing["peers"] == []

    def test_netdb_stats_returns_dict(self):
        """``netdb_stats`` echoes the router and leaseset totals from the context."""
        context = {
            "total_routers": 1500,
            "total_leasesets": 300,
        }
        methods = I2PControlMethods(router_context=context)
        stats = methods.netdb_stats({})
        assert stats["total_routers"] == 1500
        assert stats["total_leasesets"] == 300

    def test_bw_status_returns_bandwidth_info(self):
        """``bw_status`` echoes the inbound and outbound rates from the context."""
        context = {
            "inbound_bps": 50000,
            "outbound_bps": 30000,
        }
        methods = I2PControlMethods(router_context=context)
        bandwidth = methods.bw_status({})
        assert bandwidth["inbound_bps"] == 50000
        assert bandwidth["outbound_bps"] == 30000

    def test_config_get_returns_value(self):
        """``config_get`` returns the key together with its stored value."""
        option = "i2p.bandwidth.inbound"
        methods = I2PControlMethods()
        # Seed the private config store directly so no setter is involved.
        methods._config[option] = "500"
        reply = methods.config_get({"key": option})
        assert reply["key"] == "i2p.bandwidth.inbound"
        assert reply["value"] == "500"

    def test_config_get_missing_key(self):
        """Looking up an unset key yields a ``None`` value."""
        methods = I2PControlMethods()
        reply = methods.config_get({"key": "nonexistent"})
        assert reply["value"] is None

    def test_config_set_updates_value(self):
        """``config_set`` reports success and stores the new value."""
        option = "i2p.bandwidth.inbound"
        methods = I2PControlMethods()
        reply = methods.config_set({"key": option, "value": "1000"})
        assert reply["success"] is True
        assert reply["value"] == "1000"
        # The value must also be visible in the underlying config store.
        assert methods._config[option] == "1000"
179
180
181# ---------------------------------------------------------------------------
182# Authentication
183# ---------------------------------------------------------------------------
184
class TestTokenAuth:
    """Checks for ``TokenAuth`` token generation, validation and revocation."""

    def test_valid_token_accepted(self):
        """A freshly generated token validates."""
        authenticator = TokenAuth()
        issued = authenticator.generate_token()
        assert authenticator.validate_token(issued) is True

    def test_invalid_token_rejected(self):
        """A token that was never issued does not validate."""
        authenticator = TokenAuth()
        assert authenticator.validate_token("bogus-token") is False

    def test_generate_new_token(self):
        """Two generated tokens differ, and both validate afterwards."""
        authenticator = TokenAuth()
        first = authenticator.generate_token()
        second = authenticator.generate_token()
        assert first != second
        assert len(first) == 32  # 16 bytes hex-encoded
        assert authenticator.validate_token(first) is True
        assert authenticator.validate_token(second) is True

    def test_revoke_token(self):
        """Revoking an issued token returns True and it stops validating."""
        authenticator = TokenAuth()
        issued = authenticator.generate_token()
        assert authenticator.revoke_token(issued) is True
        assert authenticator.validate_token(issued) is False

    def test_revoke_nonexistent_token(self):
        """Revoking a token that was never issued returns False."""
        authenticator = TokenAuth()
        assert authenticator.revoke_token("does-not-exist") is False
212 assert auth.revoke_token("does-not-exist") is False