Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import ksft_setup
8from lib.py import cmd, ethtool, ip
9from lib.py import NetNS, NetdevSimDev
10from .remote import Remote
11
12
13def _load_env_file(src_path):
14 env = os.environ.copy()
15
16 src_dir = Path(src_path).parent.resolve()
17 if not (src_dir / "net.config").exists():
18 return ksft_setup(env)
19
20 with open((src_dir / "net.config").as_posix(), 'r') as fp:
21 for line in fp.readlines():
22 full_file = line
23 # Strip comments
24 pos = line.find("#")
25 if pos >= 0:
26 line = line[:pos]
27 line = line.strip()
28 if not line:
29 continue
30 pair = line.split('=', maxsplit=1)
31 if len(pair) != 2:
32 raise Exception("Can't parse configuration line:", full_file)
33 env[pair[0]] = pair[1]
34 return ksft_setup(env)
35
36
37class NetDrvEnv:
38 """
39 Class for a single NIC / host env, with no remote end
40 """
41 def __init__(self, src_path, **kwargs):
42 self._ns = None
43
44 self.env = _load_env_file(src_path)
45
46 if 'NETIF' in self.env:
47 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
48 else:
49 self._ns = NetdevSimDev(**kwargs)
50 self.dev = self._ns.nsims[0].dev
51 self.ifindex = self.dev['ifindex']
52
53 def __enter__(self):
54 ip(f"link set dev {self.dev['ifname']} up")
55
56 return self
57
58 def __exit__(self, ex_type, ex_value, ex_tb):
59 """
60 __exit__ gets called at the end of a "with" block.
61 """
62 self.__del__()
63
64 def __del__(self):
65 if self._ns:
66 self._ns.remove()
67 self._ns = None
68
69
70class NetDrvEpEnv:
71 """
72 Class for an environment with a local device and "remote endpoint"
73 which can be used to send traffic in.
74
75 For local testing it creates two network namespaces and a pair
76 of netdevsim devices.
77 """
78
79 # Network prefixes used for local tests
80 nsim_v4_pfx = "192.0.2."
81 nsim_v6_pfx = "2001:db8::"
82
83 def __init__(self, src_path, nsim_test=None):
84
85 self.env = _load_env_file(src_path)
86
87 self._stats_settle_time = None
88
89 # Things we try to destroy
90 self.remote = None
91 # These are for local testing state
92 self._netns = None
93 self._ns = None
94 self._ns_peer = None
95
96 if "NETIF" in self.env:
97 if nsim_test is True:
98 raise KsftXfailEx("Test only works on netdevsim")
99 self._check_env()
100
101 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
102
103 self.v4 = self.env.get("LOCAL_V4")
104 self.v6 = self.env.get("LOCAL_V6")
105 self.remote_v4 = self.env.get("REMOTE_V4")
106 self.remote_v6 = self.env.get("REMOTE_V6")
107 kind = self.env["REMOTE_TYPE"]
108 args = self.env["REMOTE_ARGS"]
109 else:
110 if nsim_test is False:
111 raise KsftXfailEx("Test does not work on netdevsim")
112
113 self.create_local()
114
115 self.dev = self._ns.nsims[0].dev
116
117 self.v4 = self.nsim_v4_pfx + "1"
118 self.v6 = self.nsim_v6_pfx + "1"
119 self.remote_v4 = self.nsim_v4_pfx + "2"
120 self.remote_v6 = self.nsim_v6_pfx + "2"
121 kind = "netns"
122 args = self._netns.name
123
124 self.remote = Remote(kind, args, src_path)
125
126 self.addr = self.v6 if self.v6 else self.v4
127 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
128
129 self.addr_ipver = "6" if self.v6 else "4"
130 # Bracketed addresses, some commands need IPv6 to be inside []
131 self.baddr = f"[{self.v6}]" if self.v6 else self.v4
132 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
133
134 self.ifname = self.dev['ifname']
135 self.ifindex = self.dev['ifindex']
136
137 self._required_cmd = {}
138
139 def create_local(self):
140 self._netns = NetNS()
141 self._ns = NetdevSimDev()
142 self._ns_peer = NetdevSimDev(ns=self._netns)
143
144 with open("/proc/self/ns/net") as nsfd0, \
145 open("/var/run/netns/" + self._netns.name) as nsfd1:
146 ifi0 = self._ns.nsims[0].ifindex
147 ifi1 = self._ns_peer.nsims[0].ifindex
148 NetdevSimDev.ctrl_write('link_device',
149 f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
150
151 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
152 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
153 ip(f" link set dev {self._ns.nsims[0].ifname} up")
154
155 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
156 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
157 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
158
159 def _check_env(self):
160 vars_needed = [
161 ["LOCAL_V4", "LOCAL_V6"],
162 ["REMOTE_V4", "REMOTE_V6"],
163 ["REMOTE_TYPE"],
164 ["REMOTE_ARGS"]
165 ]
166 missing = []
167
168 for choice in vars_needed:
169 for entry in choice:
170 if entry in self.env:
171 break
172 else:
173 missing.append(choice)
174 # Make sure v4 / v6 configs are symmetric
175 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
176 missing.append(["LOCAL_V6", "REMOTE_V6"])
177 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
178 missing.append(["LOCAL_V4", "REMOTE_V4"])
179 if missing:
180 raise Exception("Invalid environment, missing configuration:", missing,
181 "Please see tools/testing/selftests/drivers/net/README.rst")
182
183 def __enter__(self):
184 return self
185
186 def __exit__(self, ex_type, ex_value, ex_tb):
187 """
188 __exit__ gets called at the end of a "with" block.
189 """
190 self.__del__()
191
192 def __del__(self):
193 if self._ns:
194 self._ns.remove()
195 self._ns = None
196 if self._ns_peer:
197 self._ns_peer.remove()
198 self._ns_peer = None
199 if self._netns:
200 del self._netns
201 self._netns = None
202 if self.remote:
203 del self.remote
204 self.remote = None
205
206 def require_v4(self):
207 if not self.v4 or not self.remote_v4:
208 raise KsftSkipEx("Test requires IPv4 connectivity")
209
210 def require_v6(self):
211 if not self.v6 or not self.remote_v6:
212 raise KsftSkipEx("Test requires IPv6 connectivity")
213
214 def _require_cmd(self, comm, key, host=None):
215 cached = self._required_cmd.get(comm, {})
216 if cached.get(key) is None:
217 cached[key] = cmd("command -v -- " + comm, fail=False,
218 shell=True, host=host).ret == 0
219 self._required_cmd[comm] = cached
220 return cached[key]
221
222 def require_cmd(self, comm, local=True, remote=False):
223 if local:
224 if not self._require_cmd(comm, "local"):
225 raise KsftSkipEx("Test requires command: " + comm)
226 if remote:
227 if not self._require_cmd(comm, "remote"):
228 raise KsftSkipEx("Test requires (remote) command: " + comm)
229
230 def wait_hw_stats_settle(self):
231 """
232 Wait for HW stats to become consistent, some devices DMA HW stats
233 periodically so events won't be reflected until next sync.
234 Good drivers will tell us via ethtool what their sync period is.
235 """
236 if self._stats_settle_time is None:
237 data = ethtool("-c " + self.ifname, json=True)[0]
238
239 self._stats_settle_time = 0.025 + \
240 data.get('stats-block-usecs', 0) / 1000 / 1000
241
242 time.sleep(self._stats_settle_time)