Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2
3import os
4import time
5from pathlib import Path
6from lib.py import KsftSkipEx, KsftXfailEx
7from lib.py import cmd, ethtool, ip
8from lib.py import NetNS, NetdevSimDev
9from .remote import Remote
10
11
12def _load_env_file(src_path):
13 env = os.environ.copy()
14
15 src_dir = Path(src_path).parent.resolve()
16 if not (src_dir / "net.config").exists():
17 return env
18
19 with open((src_dir / "net.config").as_posix(), 'r') as fp:
20 for line in fp.readlines():
21 full_file = line
22 # Strip comments
23 pos = line.find("#")
24 if pos >= 0:
25 line = line[:pos]
26 line = line.strip()
27 if not line:
28 continue
29 pair = line.split('=', maxsplit=1)
30 if len(pair) != 2:
31 raise Exception("Can't parse configuration line:", full_file)
32 env[pair[0]] = pair[1]
33 return env
34
35
36class NetDrvEnv:
37 """
38 Class for a single NIC / host env, with no remote end
39 """
40 def __init__(self, src_path, **kwargs):
41 self._ns = None
42
43 self.env = _load_env_file(src_path)
44
45 if 'NETIF' in self.env:
46 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
47 else:
48 self._ns = NetdevSimDev(**kwargs)
49 self.dev = self._ns.nsims[0].dev
50 self.ifindex = self.dev['ifindex']
51
52 def __enter__(self):
53 ip(f"link set dev {self.dev['ifname']} up")
54
55 return self
56
57 def __exit__(self, ex_type, ex_value, ex_tb):
58 """
59 __exit__ gets called at the end of a "with" block.
60 """
61 self.__del__()
62
63 def __del__(self):
64 if self._ns:
65 self._ns.remove()
66 self._ns = None
67
68
69class NetDrvEpEnv:
70 """
71 Class for an environment with a local device and "remote endpoint"
72 which can be used to send traffic in.
73
74 For local testing it creates two network namespaces and a pair
75 of netdevsim devices.
76 """
77
78 # Network prefixes used for local tests
79 nsim_v4_pfx = "192.0.2."
80 nsim_v6_pfx = "2001:db8::"
81
82 def __init__(self, src_path, nsim_test=None):
83
84 self.env = _load_env_file(src_path)
85
86 self._stats_settle_time = None
87
88 # Things we try to destroy
89 self.remote = None
90 # These are for local testing state
91 self._netns = None
92 self._ns = None
93 self._ns_peer = None
94
95 if "NETIF" in self.env:
96 if nsim_test is True:
97 raise KsftXfailEx("Test only works on netdevsim")
98 self._check_env()
99
100 self.dev = ip("link show dev " + self.env['NETIF'], json=True)[0]
101
102 self.v4 = self.env.get("LOCAL_V4")
103 self.v6 = self.env.get("LOCAL_V6")
104 self.remote_v4 = self.env.get("REMOTE_V4")
105 self.remote_v6 = self.env.get("REMOTE_V6")
106 kind = self.env["REMOTE_TYPE"]
107 args = self.env["REMOTE_ARGS"]
108 else:
109 if nsim_test is False:
110 raise KsftXfailEx("Test does not work on netdevsim")
111
112 self.create_local()
113
114 self.dev = self._ns.nsims[0].dev
115
116 self.v4 = self.nsim_v4_pfx + "1"
117 self.v6 = self.nsim_v6_pfx + "1"
118 self.remote_v4 = self.nsim_v4_pfx + "2"
119 self.remote_v6 = self.nsim_v6_pfx + "2"
120 kind = "netns"
121 args = self._netns.name
122
123 self.remote = Remote(kind, args, src_path)
124
125 self.addr = self.v6 if self.v6 else self.v4
126 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
127
128 self.addr_ipver = "6" if self.v6 else "4"
129 # Bracketed addresses, some commands need IPv6 to be inside []
130 self.baddr = f"[{self.v6}]" if self.v6 else self.v4
131 self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4
132
133 self.ifname = self.dev['ifname']
134 self.ifindex = self.dev['ifindex']
135
136 self._required_cmd = {}
137
138 def create_local(self):
139 self._netns = NetNS()
140 self._ns = NetdevSimDev()
141 self._ns_peer = NetdevSimDev(ns=self._netns)
142
143 with open("/proc/self/ns/net") as nsfd0, \
144 open("/var/run/netns/" + self._netns.name) as nsfd1:
145 ifi0 = self._ns.nsims[0].ifindex
146 ifi1 = self._ns_peer.nsims[0].ifindex
147 NetdevSimDev.ctrl_write('link_device',
148 f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
149
150 ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
151 ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
152 ip(f" link set dev {self._ns.nsims[0].ifname} up")
153
154 ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
155 ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
156 ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
157
158 def _check_env(self):
159 vars_needed = [
160 ["LOCAL_V4", "LOCAL_V6"],
161 ["REMOTE_V4", "REMOTE_V6"],
162 ["REMOTE_TYPE"],
163 ["REMOTE_ARGS"]
164 ]
165 missing = []
166
167 for choice in vars_needed:
168 for entry in choice:
169 if entry in self.env:
170 break
171 else:
172 missing.append(choice)
173 # Make sure v4 / v6 configs are symmetric
174 if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
175 missing.append(["LOCAL_V6", "REMOTE_V6"])
176 if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
177 missing.append(["LOCAL_V4", "REMOTE_V4"])
178 if missing:
179 raise Exception("Invalid environment, missing configuration:", missing,
180 "Please see tools/testing/selftests/drivers/net/README.rst")
181
182 def __enter__(self):
183 return self
184
185 def __exit__(self, ex_type, ex_value, ex_tb):
186 """
187 __exit__ gets called at the end of a "with" block.
188 """
189 self.__del__()
190
191 def __del__(self):
192 if self._ns:
193 self._ns.remove()
194 self._ns = None
195 if self._ns_peer:
196 self._ns_peer.remove()
197 self._ns_peer = None
198 if self._netns:
199 del self._netns
200 self._netns = None
201 if self.remote:
202 del self.remote
203 self.remote = None
204
205 def require_v4(self):
206 if not self.v4 or not self.remote_v4:
207 raise KsftSkipEx("Test requires IPv4 connectivity")
208
209 def require_v6(self):
210 if not self.v6 or not self.remote_v6:
211 raise KsftSkipEx("Test requires IPv6 connectivity")
212
213 def _require_cmd(self, comm, key, host=None):
214 cached = self._required_cmd.get(comm, {})
215 if cached.get(key) is None:
216 cached[key] = cmd("command -v -- " + comm, fail=False,
217 shell=True, host=host).ret == 0
218 self._required_cmd[comm] = cached
219 return cached[key]
220
221 def require_cmd(self, comm, local=True, remote=False):
222 if local:
223 if not self._require_cmd(comm, "local"):
224 raise KsftSkipEx("Test requires command: " + comm)
225 if remote:
226 if not self._require_cmd(comm, "remote"):
227 raise KsftSkipEx("Test requires (remote) command: " + comm)
228
229 def wait_hw_stats_settle(self):
230 """
231 Wait for HW stats to become consistent, some devices DMA HW stats
232 periodically so events won't be reflected until next sync.
233 Good drivers will tell us via ethtool what their sync period is.
234 """
235 if self._stats_settle_time is None:
236 data = ethtool("-c " + self.ifname, json=True)[0]
237
238 self._stats_settle_time = 0.025 + \
239 data.get('stats-block-usecs', 0) / 1000 / 1000
240
241 time.sleep(self._stats_settle_time)