Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup, wait_file
8from lib.py import cmd, ethtool, ip, CmdExitFailure
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
class NetDrvEnvBase:
    """
    Base class for a NIC / host environments

    Loads the test configuration (a copy of the process environment,
    optionally extended by a net.config file located in the same directory
    as the test) and provides the context manager plumbing shared by the
    concrete environments.

    Attributes:
      src_path: Path of the test source file, as passed by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: configuration mapping, as returned by ksft_setup()
      dev: device under test, indexable by 'ifname'; None until an
           inheriting class sets it
    """
    def __init__(self, src_path):
        """
        Args:
          src_path: path of the test source file; net.config is looked up
                    in the directory containing it
        """
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    @staticmethod
    def _parse_config_line(line):
        """
        Parse a single line of a net.config file.

        Everything from the first "#" onwards is treated as a comment and
        surrounding whitespace is dropped. What remains is split on the
        first "=" only, so values may themselves contain "=".

        Returns:
          A (name, value) tuple, or None for blank / comment-only lines.

        Raises:
          Exception: the line has content but no "=".
        """
        content = line.partition("#")[0].strip()
        if not content:
            return None
        name, sep, value = content.partition("=")
        if not sep:
            raise Exception("Can't parse configuration line:", line)
        return name, value

    def _load_env_file(self):
        """
        Build the configuration for the test.

        Starts from a copy of os.environ and, if a net.config file exists
        next to the test, adds the assignments found in it on top (entries
        from the file override the environment).

        Returns:
          Whatever ksft_setup() returns for the merged mapping.
        """
        env = os.environ.copy()

        config_path = Path(self.src_path).parent.resolve() / "net.config"
        if not config_path.exists():
            return ksft_setup(env)

        with open(config_path, 'r') as fp:
            for line in fp:
                pair = self._parse_config_line(line)
                if pair is None:
                    continue
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release here; inheriting classes override this to tear
        # down the resources they created.
        pass

    def __enter__(self):
        """
        Bring the device up and wait for its sysfs carrier file to read "1".

        Requires self.dev to have been set by the inheriting class.
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.

        Runs the same cleanup as __del__ so resources are released when the
        block ends. Returns None, so exceptions raised inside the block are
        not suppressed.
        """
        self.__del__()
69
70
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the configuration that interface is used,
    otherwise a netdevsim device is created for the test.

    Attributes:
      dev: entry for the device under test
      ifname: interface name of the device under test
      ifindex: interface index of the device under test
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Args:
          src_path: path of the test source file
          nsim_test: None if the test can run on any device, True if it
                     only works on netdevsim, False if it does not work
                     on netdevsim
          **kwargs: passed to NetdevSimDev() when a netdevsim device is
                    created; unused when NETIF is configured

        Raises:
          KsftXfailEx: nsim_test does not match the kind of device the
                       configuration selects
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # The finalizer also runs on objects whose __init__ raised before
        # _ns was assigned (e.g. the base class failed to load the config),
        # so do not assume the attribute exists.
        if getattr(self, "_ns", None):
            self._ns.remove()
            # Cleared so a second call (__exit__ followed by garbage
            # collection) does not remove the device twice.
            self._ns = None
98
99
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing (no NETIF in the configuration) it creates a network
    namespace for the peer and a pair of linked netdevsim devices.

    Attributes:
      dev: entry for the local device under test
      ifname / ifindex: name and index of the local device
      remote: Remote object used to run commands on the remote endpoint
      remote_ifname: name of the remote interface holding the remote address
      addr_v / remote_addr_v: local / remote addresses keyed by "4" and "6",
                              None for an IP version that is not configured
      addr_ipver: "6" if a local IPv6 address is configured, "4" otherwise
      addr / remote_addr: addresses for addr_ipver
      baddr / remote_baddr: same, with IPv6 addresses inside []
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Args:
          src_path: path of the test source file
          nsim_test: None if the test can run on any device, True if it
                     only works on netdevsim, False if it does not work
                     on netdevsim

        Raises:
          KsftXfailEx: nsim_test does not match the kind of device the
                       configuration selects
          Exception: the configuration is incomplete, or the remote
                     interface can't be resolved
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is used as the default address family whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        if self.addr_v["6"]:
            self.baddr = f"[{self.addr_v['6']}]"
        else:
            self.baddr = self.addr_v["4"]
        if self.remote_addr_v["6"]:
            self.remote_baddr = f"[{self.remote_addr_v['6']}]"
        else:
            self.remote_baddr = self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of command availability: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

    def create_local(self):
        """
        Set up the local-only test bed: a netdevsim device in the current
        namespace, a peer netdevsim device in a new namespace, the link
        between the two, and the nsim_v4_pfx / nsim_v6_pfx addresses
        (".1"/"1" locally, ".2"/"2" on the peer).
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The link_device control file takes "<netns fd>:<ifindex>" for each
        # end, so both namespaces have to stay open while it is written.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration used with a real (NETIF) device.

        Each inner list of vars_needed is a set of alternatives, at least
        one of which must be present. On top of that an IP version must be
        configured for both the local and the remote end, or for neither.

        Raises:
          Exception: listing the groups of variables that are missing
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # No alternative of this group is configured
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which holds the remote
        address(es), by running "ip addr show to <addr>" on the remote.

        Returns:
          The interface name, taken from the IPv6 lookup if it matched
          anything and from the IPv4 lookup otherwise.

        Raises:
          Exception: the v4 and v6 addresses sit on different interfaces,
                     an address matches more than one interface, or no
                     interface matches at all
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if not v4 and not v6:
            # Without this check the lookup below fails with an opaque
            # TypeError / IndexError instead of saying what is wrong.
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # The finalizer also runs on objects whose __init__ raised before
        # these attributes were assigned (e.g. the base class failed to load
        # the config), so do not assume they exist. Each one is reset to
        # None so that a second call (__exit__ followed by garbage
        # collection) is a no-op.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            # Dropping our reference; presumably NetNS cleans up after
            # itself once it is no longer referenced - TODO confirm
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test unless both ends have an address of the given
        IP version ("4" or "6").
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Xfail the test unless the local device is a netdevsim created by
        this environment.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm is available, via "command -v".

        The result is cached per command under key ("local" / "remote"),
        so each location is probed at most once.

        Returns:
          True if the command was found, False otherwise.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test unless command comm is available locally (if local
        is set) and on the remote endpoint (if remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # A device without coalescing support is fine, fall back to
                # the base delay; anything else is a real failure.
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported stats period (usec -> sec)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)