Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2
3import errno
4import json as _json
5import random
6import re
7import socket
8import subprocess
9import time
10
11
12class CmdExitFailure(Exception):
13 pass
14
15
16class cmd:
17 def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None, timeout=5):
18 if ns:
19 comm = f'ip netns exec {ns} ' + comm
20
21 self.stdout = None
22 self.stderr = None
23 self.ret = None
24
25 self.comm = comm
26 if host:
27 self.proc = host.cmd(comm)
28 else:
29 self.proc = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE,
30 stderr=subprocess.PIPE)
31 if not background:
32 self.process(terminate=False, fail=fail, timeout=timeout)
33
34 def process(self, terminate=True, fail=None, timeout=5):
35 if fail is None:
36 fail = not terminate
37
38 if terminate:
39 self.proc.terminate()
40 stdout, stderr = self.proc.communicate(timeout)
41 self.stdout = stdout.decode("utf-8")
42 self.stderr = stderr.decode("utf-8")
43 self.proc.stdout.close()
44 self.proc.stderr.close()
45 self.ret = self.proc.returncode
46
47 if self.proc.returncode != 0 and fail:
48 if len(stderr) > 0 and stderr[-1] == "\n":
49 stderr = stderr[:-1]
50 raise CmdExitFailure("Command failed: %s\nSTDOUT: %s\nSTDERR: %s" %
51 (self.proc.args, stdout, stderr))
52
53
54class bkg(cmd):
55 def __init__(self, comm, shell=True, fail=None, ns=None, host=None,
56 exit_wait=False):
57 super().__init__(comm, background=True,
58 shell=shell, fail=fail, ns=ns, host=host)
59 self.terminate = not exit_wait
60 self.check_fail = fail
61
62 def __enter__(self):
63 return self
64
65 def __exit__(self, ex_type, ex_value, ex_tb):
66 return self.process(terminate=self.terminate, fail=self.check_fail)
67
68
69global_defer_queue = []
70
71
72class defer:
73 def __init__(self, func, *args, **kwargs):
74 global global_defer_queue
75
76 if not callable(func):
77 raise Exception("defer created with un-callable object, did you call the function instead of passing its name?")
78
79 self.func = func
80 self.args = args
81 self.kwargs = kwargs
82
83 self._queue = global_defer_queue
84 self._queue.append(self)
85
86 def __enter__(self):
87 return self
88
89 def __exit__(self, ex_type, ex_value, ex_tb):
90 return self.exec()
91
92 def exec_only(self):
93 self.func(*self.args, **self.kwargs)
94
95 def cancel(self):
96 self._queue.remove(self)
97
98 def exec(self):
99 self.cancel()
100 self.exec_only()
101
102
103def tool(name, args, json=None, ns=None, host=None):
104 cmd_str = name + ' '
105 if json:
106 cmd_str += '--json '
107 cmd_str += args
108 cmd_obj = cmd(cmd_str, ns=ns, host=host)
109 if json:
110 return _json.loads(cmd_obj.stdout)
111 return cmd_obj
112
113
114def ip(args, json=None, ns=None, host=None):
115 if ns:
116 args = f'-netns {ns} ' + args
117 return tool('ip', args, json=json, host=host)
118
119
120def ethtool(args, json=None, ns=None, host=None):
121 return tool('ethtool', args, json=json, ns=ns, host=host)
122
123
124def rand_port():
125 """
126 Get a random unprivileged port, try to make sure it's not already used.
127 """
128 for _ in range(1000):
129 port = random.randint(10000, 65535)
130 try:
131 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
132 s.bind(("", port))
133 return port
134 except OSError as e:
135 if e.errno != errno.EADDRINUSE:
136 raise
137 raise Exception("Can't find any free unprivileged port")
138
139
140def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5):
141 end = time.monotonic() + deadline
142
143 pattern = f":{port:04X} .* "
144 if proto == "tcp": # for tcp protocol additionally check the socket state
145 pattern += "0A"
146 pattern = re.compile(pattern)
147
148 while True:
149 data = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout
150 for row in data.split("\n"):
151 if pattern.search(row):
152 return
153 if time.monotonic() > end:
154 raise Exception("Waiting for port listen timed out")
155 time.sleep(sleep)