Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup, wait_file
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
class NetDrvEnvBase:
    """
    Base class for a NIC / host environments

    Attributes:
      src_path: Path of the test source file, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: environment returned by ksft_setup(), built from os.environ
           plus the entries of the optional net.config file
      dev: device description, None until an inheriting class sets it
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    @staticmethod
    def _parse_env_line(line):
        """
        Parse a single line of a net.config file.

        Everything after the first '#' is treated as a comment. Whitespace
        around the key and around the value is dropped, so "KEY = value"
        and "KEY=value" are equivalent.

        Returns a (key, value) tuple, or None for blank / comment-only lines.
        Raises Exception if the line has content but no '=' separator.
        """
        # Strip comments
        content = line.split("#", 1)[0].strip()
        if not content:
            return None

        key, sep, value = content.partition("=")
        if not sep:
            raise Exception("Can't parse configuration line:", line)
        # Without the strip() a line like "NETIF = eth0" would be stored
        # under the key "NETIF " and never be found by later lookups.
        return key.strip(), value.strip()

    def _load_env_file(self):
        """
        Build the test environment.

        Starts from a copy of os.environ and, if a net.config file exists
        next to the test source, overlays the KEY=VALUE pairs found there.
        The resulting dict is passed through ksft_setup() and returned.
        """
        env = os.environ.copy()

        cfg_path = Path(self.src_path).parent.resolve() / "net.config"
        if not cfg_path.exists():
            return ksft_setup(env)

        with open(cfg_path.as_posix(), 'r') as fp:
            for line in fp:
                pair = self._parse_env_line(line)
                if pair is None:
                    continue
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release in the base class, inheriting classes override
        pass

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.

        Brings the device in self.dev up and waits (via wait_file) for its
        sysfs carrier file to read "1". self.dev must have been set by
        the inheriting class at this point.
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        # Run the cleanup right away rather than waiting for the object
        # to be garbage collected
        self.__del__()
69
70
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Set up the environment.

        If NETIF is present in the environment the named interface is
        used, otherwise a netdevsim device is created.

        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if it works on both
          kwargs: passed to NetdevSimDev() when a netdevsim is created

        Raises:
          KsftXfailEx: if nsim_test conflicts with the selected device
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # __del__ also runs on objects whose __init__ raised before
        # self._ns was assigned (e.g. while loading net.config), use
        # getattr() so that case does not end in an AttributeError.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
98
99
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device and the remote endpoint.

        If NETIF is present in the environment, the addresses and the
        remote description are taken from the environment, otherwise
        a local netdevsim pair is created.

        Args:
          src_path: path of the test source file
          nsim_test: True if the test only works on netdevsim, False if it
                     does not work on netdevsim, None if it works on both

        Raises:
          KsftXfailEx: if nsim_test conflicts with the selected device
          Exception: if the environment is incomplete or the remote
                     interface can't be resolved
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever a v6 address is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()
        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
                             host=self.remote, json=True)[0]

        # Cache of command availability checks, see _require_cmd()
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a network namespace and two netdevsim
        devices, one in the current namespace and one in the new one.

        The two devices are handed to the netdevsim 'link_device' control
        file, then each gets its v4 and v6 address from the nsim prefixes
        (.1 / ::1 locally, .2 / ::2 for the peer) and is brought up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # Devices are identified as "<netns fd>:<ifindex>", the namespace
        # files only need to stay open for the duration of the write.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration needed to talk to a remote endpoint.

        At least one variable of each group in vars_needed must be present
        and the v4 / v6 addresses must be given for both ends or neither.

        Raises:
          Exception: listing the groups of variables which are missing
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # None of the alternatives in this group is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which carries the configured
        remote address(es).

        Returns:
          name of the remote interface

        Raises:
          Exception: if the v4 and v6 addresses sit on different
                     interfaces, if an address matches more than one
                     interface, or if no interface matches at all
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        # Both lookups may come back empty (or never run), report that
        # explicitly instead of failing on the indexing below.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # __del__ also runs on objects whose __init__ raised before the
        # attributes below were assigned (e.g. while loading net.config),
        # use getattr() so that case does not end in an AttributeError.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Raise KsftSkipEx unless both the local and the remote address
        for the given IP version ("4" or "6") are configured.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the environment runs on a netdevsim
        device created by this class.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm can be found on the given host.

        The result of "command -v" is cached per command in
        self._required_cmd under key, so each (comm, key) pair is
        only looked up once.

        Returns:
          True if the command was found, False otherwise
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) or on the remote endpoint (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support fall back to the
                # fixed delay, any other failure is unexpected
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec of slack plus the reported period, usecs -> seconds
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)