Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1#!/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3# -*- coding: utf-8 -*-
4#
5# Copyright (c) 2017 Benjamin Tissoires <benjamin.tissoires@gmail.com>
6# Copyright (c) 2017 Red Hat, Inc.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation; either version 2 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21import dataclasses
22import fcntl
23import functools
24import libevdev
25import os
26import threading
27
28try:
29 import pyudev
30except ImportError:
31 raise ImportError("UHID is not supported due to missing pyudev dependency")
32
33import logging
34
35import hidtools.hid as hid
36from hidtools.uhid import UHIDDevice
37from hidtools.util import BusType
38
39from pathlib import Path
40from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
41
42logger = logging.getLogger("hidtools.device.base_device")
43
44
class SysfsFile(object):
    """Accessor for a single sysfs attribute file.

    The content can be read or written either as an integer
    (:attr:`int_value`) or as a string (:attr:`str_value`). Every access
    opens the file again, nothing is cached.
    """

    def __init__(self, path):
        # Anything accepted by open(): a str or a path-like object.
        self.path = path

    def __set_value(self, value):
        """Write ``value`` plus a trailing newline; return what write() returns."""
        with open(self.path, "w") as sysfs_file:
            written = sysfs_file.write(f"{value}\n")
        return written

    def __get_value(self):
        """Return the file content with leading/trailing whitespace removed."""
        with open(self.path) as sysfs_file:
            content = sysfs_file.read()
        return content.strip()

    @property
    def int_value(self) -> int:
        """Content of the file converted with ``int()``."""
        raw = self.__get_value()
        return int(raw)

    @int_value.setter
    def int_value(self, v: int) -> None:
        self.__set_value(v)

    @property
    def str_value(self) -> str:
        """Content of the file as a stripped string."""
        text = self.__get_value()
        return text

    @str_value.setter
    def str_value(self, v: str) -> None:
        self.__set_value(v)
72
73
class LED(object):
    """One LED directory in sysfs, described by its brightness attributes."""

    def __init__(self, sys_path):
        # sys_path must support the "/" operator (e.g. a pathlib.Path).
        # max_brightness is read once, here; brightness is kept as a
        # SysfsFile so that every access goes back to the file.
        max_brightness_file = SysfsFile(sys_path / "max_brightness")
        self.max_brightness = max_brightness_file.int_value
        self.__brightness = SysfsFile(sys_path / "brightness")

    @property
    def brightness(self) -> int:
        """Current value of the ``brightness`` attribute, read from sysfs."""
        current = self.__brightness.int_value
        return current

    @brightness.setter
    def brightness(self, value: int) -> None:
        self.__brightness.int_value = value
86
87
class PowerSupply(object):
    """Represents Linux power_supply_class sysfs nodes."""

    def __init__(self, sys_path):
        # sys_path must support the "/" operator (e.g. a pathlib.Path).
        # The three attributes are wrapped lazily: nothing is read until
        # one of the properties below is accessed.
        self._capacity, self._status, self._type = (
            SysfsFile(sys_path / attribute)
            for attribute in ("capacity", "status", "type")
        )

    @property
    def capacity(self) -> int:
        """Value of the ``capacity`` attribute, as an integer."""
        capacity_file = self._capacity
        return capacity_file.int_value

    @property
    def status(self) -> str:
        """Value of the ``status`` attribute, as a string."""
        status_file = self._status
        return status_file.str_value

    @property
    def type(self) -> str:
        """Value of the ``type`` attribute, as a string."""
        type_file = self._type
        return type_file.str_value
107
108
@dataclasses.dataclass
class HidReadiness:
    """Readiness state tracked for one HID device."""

    # Whether the device is currently considered ready.
    is_ready: bool = dataclasses.field(default=False)
    # Counter kept next to the flag; starts at zero.
    count: int = dataclasses.field(default=0)
113
114
class HIDIsReady(object):
    """
    Companion class that binds to a kernel mechanism
    and that allows to know when a uhid device is ready or not.

    This base implementation binds to nothing and always reports a
    default (not ready) state. See :meth:`is_ready` for details.
    """

    def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
        # The uhid device whose readiness is being reported.
        self.uhid = uhid

    def is_ready(self: "HIDIsReady") -> HidReadiness:
        """
        Overwrite in subclasses: should return a :class:`HidReadiness`
        describing whether the attached uhid device is ready or not.

        The base class returns a freshly built, default ``HidReadiness``.
        """
        default_state = HidReadiness()
        return default_state
132
133
class UdevHIDIsReady(HIDIsReady):
    """
    Readiness tracker fed by udev events of the "hid" subsystem.

    All the state lives on the class: a single pyudev context and monitor
    are created the first time an instance is built, and the readiness of
    every device seen so far is kept in ``_uhid_devices``, keyed by the
    integer parsed from the end of the device sysfs path.
    """

    _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
    _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
    _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}

    # udev actions that change the readiness of a device
    _TRACKED_ACTIONS: ClassVar[Tuple[str, ...]] = ("bind", "remove", "unbind")

    def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
        super().__init__(uhid)
        self._init_pyudev()

    @classmethod
    def _init_pyudev(cls: Type["UdevHIDIsReady"]) -> None:
        """Create and start the shared udev monitor, once per process."""
        if cls._pyudev_context is None:
            cls._pyudev_context = pyudev.Context()
            cls._pyudev_monitor = pyudev.Monitor.from_netlink(cls._pyudev_context)
            cls._pyudev_monitor.filter_by("hid")
            cls._pyudev_monitor.start()

            # Have the uhid poll loop call us back whenever the monitor
            # file descriptor becomes readable.
            UHIDDevice._append_fd_to_poll(
                cls._pyudev_monitor.fileno(), cls._cls_udev_event_callback
            )

    @classmethod
    def _cls_udev_event_callback(cls: Type["UdevHIDIsReady"]) -> None:
        """Drain the pending udev events and update ``_uhid_devices``.

        Only "bind", "remove" and "unbind" are taken into account: "bind"
        marks the device as ready, the two others as not ready. ``count``
        is incremented on every not-ready -> ready transition.
        """
        if cls._pyudev_monitor is None:
            return
        event: pyudev.Device
        # poll() returns None once nothing shows up within the timeout,
        # which is the sentinel ending the iteration.
        for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
            if event.action not in cls._TRACKED_ACTIONS:
                # Skip unrelated actions but keep draining: returning here
                # would leave already queued bind/unbind events unprocessed
                # until the next wakeup of the poll loop.
                continue

            logger.debug(f"udev event: {event.action} -> {event}")

            # The key is the hexadecimal number following the last "."
            # of the sysfs path.
            hid_id = int(event.sys_path.strip().split(".")[-1], 16)

            readiness = cls._uhid_devices.setdefault(hid_id, HidReadiness())

            ready = event.action == "bind"
            if not readiness.is_ready and ready:
                readiness.count += 1

            readiness.is_ready = ready

    def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
        """Return the state recorded for ``self.uhid.hid_id``.

        A default (not ready) ``HidReadiness`` is returned when no event
        has been recorded for that id yet.
        """
        try:
            return self._uhid_devices[self.uhid.hid_id]
        except KeyError:
            return HidReadiness()
181
182
class EvdevMatch(object):
    """
    Set of rules deciding whether an evdev device is the one we look for.

    All arguments are keyword-only and optional:

    - ``requires``: items for which ``evdev.has()`` must be true
    - ``excludes``: items for which ``evdev.has()`` must be false
    - ``req_properties``: items for which ``evdev.has_property()`` must be true
    - ``excl_properties``: items for which ``evdev.has_property()`` must be false

    An omitted argument results in an empty list that is private to the
    instance (the lists used to be shared mutable defaults, so a change made
    through one matcher leaked into every other default-constructed one).
    A list given by the caller is stored as is, not copied.
    """

    def __init__(
        self: "EvdevMatch",
        *,
        requires: Optional[List[Any]] = None,
        excludes: Optional[List[Any]] = None,
        req_properties: Optional[List[Any]] = None,
        excl_properties: Optional[List[Any]] = None,
    ) -> None:
        self.requires: List[Any] = [] if requires is None else requires
        self.excludes: List[Any] = [] if excludes is None else excludes
        self.req_properties: List[Any] = (
            [] if req_properties is None else req_properties
        )
        self.excl_properties: List[Any] = (
            [] if excl_properties is None else excl_properties
        )

    def is_a_match(self: "EvdevMatch", evdev: "libevdev.Device") -> bool:
        """Return True when ``evdev`` satisfies every rule of this matcher.

        The rules are checked in the order requires, excludes,
        req_properties, excl_properties and the evaluation stops at the
        first rule that fails. A matcher without any rule matches everything.
        """
        return (
            all(evdev.has(item) for item in self.requires)
            and not any(evdev.has(item) for item in self.excludes)
            and all(evdev.has_property(prop) for prop in self.req_properties)
            and not any(evdev.has_property(prop) for prop in self.excl_properties)
        )
211
212
class EvdevDevice(object):
    """
    Represents an Evdev node and its properties.
    This is a stub for the libevdev devices, as they are relying on
    uevent to get the data, saving us some ioctls to fetch the names
    and properties.
    """

    def __init__(self: "EvdevDevice", sysfs: Path) -> None:
        """Parse the parent uevent file of ``sysfs`` and open the node.

        ``sysfs`` is the sysfs path of the event node; its last component
        is reused as the node name under ``/dev/input``.
        """
        self.sysfs = sysfs
        self.event_node: Any = None
        self.libevdev: Optional[libevdev.Device] = None

        # all of the interesting properties are stored in the input uevent, so in the parent
        # so convert the uevent file of the parent input node into a dict
        self.uevents = self._read_uevents(sysfs.parent / "uevent")

        # we open all evdev nodes in order to not miss any event
        self.open()

    @staticmethod
    def _read_uevents(uevent_path: Path) -> Dict[str, str]:
        """Turn a ``KEY=value`` uevent file into a dict (values unquoted)."""
        uevents: Dict[str, str] = {}
        with open(uevent_path) as f:
            for line in f:
                # Split on the first "=" only: the value itself may contain
                # one (a quoted device name, for instance).
                key, separator, value = line.strip().partition("=")
                if not separator:
                    # blank line, or anything that is not KEY=value
                    continue
                uevents[key] = value.strip('"')
        return uevents

    @property
    def name(self: "EvdevDevice") -> str:
        """The ``NAME`` entry of the parent uevent file."""
        assert "NAME" in self.uevents

        return self.uevents["NAME"]

    @property
    def evdev(self: "EvdevDevice") -> Path:
        """Path of the device node under ``/dev/input``."""
        return Path("/dev/input") / self.sysfs.name

    def matches_application(
        self: "EvdevDevice", application: str, matches: "Dict[str, EvdevMatch]"
    ) -> bool:
        """Tell whether this node fulfils the rules registered for ``application``.

        Returns False when the node is not opened. Raises AssertionError
        (after logging an error) when ``application`` has no entry in
        ``matches``.
        """
        if self.libevdev is None:
            return False

        if application in matches:
            return matches[application].is_a_match(self.libevdev)

        message = f"application '{application}' is unknown, please update/fix hid-tools"
        logger.error(message)
        # hid-tools likely needs an update. Raised explicitly so that the
        # check is not stripped when python runs with -O.
        raise AssertionError(message)

    def open(self: "EvdevDevice") -> "libevdev.Device":
        """Open the device node, wrap it in libevdev and make it non-blocking."""
        self.event_node = open(self.evdev, "rb")
        self.libevdev = libevdev.Device(self.event_node)

        assert self.libevdev.fd is not None

        fd = self.libevdev.fd.fileno()
        # O_NONBLOCK is a file status flag: the current value has to be
        # read with F_GETFL (F_GETFD returns the descriptor flags, i.e.
        # FD_CLOEXEC, which is a different namespace).
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        return self.libevdev

    def close(self: "EvdevDevice") -> None:
        """Close the libevdev fd and the device node; safe to call twice."""
        if self.libevdev is not None and self.libevdev.fd is not None:
            self.libevdev.fd.close()
            self.libevdev = None
        if self.event_node is not None:
            self.event_node.close()
            self.event_node = None
280
281
class BaseDevice(UHIDDevice):
    """
    Base class of the emulated uhid devices.

    On top of UHIDDevice it keeps track of the opened/started state,
    reports when the kernel is done binding the device, and gives access
    to the evdev nodes, LEDs and power supply the kernel created for it.
    """

    # default _application_matches that matches nothing. This needs
    # to be set in the subclasses to have get_evdev() working
    _application_matches: Dict[str, EvdevMatch] = {}

    def __init__(
        self,
        name,
        application,
        rdesc_str: Optional[str] = None,
        rdesc: Optional[Union[hid.ReportDescriptor, str, bytes]] = None,
        input_info=None,
    ) -> None:
        """
        :param name: name of the device; when None a name is built from
            the class name
        :param application: default application used by :meth:`get_evdev`
        :param rdesc_str: report descriptor in its human readable form,
            only used when ``rdesc`` is None
        :param rdesc: report descriptor, takes precedence over ``rdesc_str``
        :param input_info: value stored in ``self.info``; defaults to
            ``(BusType.USB, 1, 2)``
        :raises Exception: when neither ``rdesc`` nor ``rdesc_str`` is given
        """
        # Built before UHIDDevice.__init__() runs, which makes sure the udev
        # monitor is set up first.
        self._kernel_is_ready: HIDIsReady = UdevHIDIsReady(self)
        if rdesc_str is None and rdesc is None:
            raise Exception("Please provide at least a rdesc or rdesc_str")
        super().__init__()
        if name is None:
            name = f"uhid gamepad test {self.__class__.__name__}"
        if input_info is None:
            input_info = (BusType.USB, 1, 2)
        self.name = name
        self.info = input_info
        self.default_reportID = None
        self.opened = False
        self.started = False
        self.application = application
        # None means "not looked up yet", see input_nodes
        self._input_nodes: Optional[List[EvdevDevice]] = None
        if rdesc is None:
            assert rdesc_str is not None
            self.rdesc = hid.ReportDescriptor.from_human_descr(rdesc_str)  # type: ignore
        else:
            self.rdesc = rdesc  # type: ignore

    @property
    def power_supply_class(self: "BaseDevice") -> Optional[PowerSupply]:
        """The first power supply found in sysfs for this device, or None."""
        ps = self.walk_sysfs("power_supply", "power_supply/*")
        if ps is None or len(ps) < 1:
            return None

        return PowerSupply(ps[0])

    @property
    def led_classes(self: "BaseDevice") -> List[LED]:
        """One LED per ``max_brightness`` file found in sysfs for this device."""
        leds = self.walk_sysfs("led", "**/max_brightness")
        if leds is None:
            return []

        return [LED(led.parent) for led in leds]

    @property
    def kernel_is_ready(self: "BaseDevice") -> bool:
        """True when the readiness tracker reports ready and we are started."""
        return self._kernel_is_ready.is_ready().is_ready and self.started

    @property
    def kernel_ready_count(self: "BaseDevice") -> int:
        """The ``count`` field of the current readiness state."""
        return self._kernel_is_ready.is_ready().count

    @property
    def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
        """
        The evdev nodes of this device, opened on first access.

        Returns an empty list, without caching it, as long as the kernel
        is not ready or the device is not started. Once the lookup has
        been done its result is cached and returned as is.
        """
        if self._input_nodes is not None:
            return self._input_nodes

        if not self.kernel_is_ready or not self.started:
            return []

        # Starting with kernel v6.16, an event is emitted when
        # userspace opens a kernel device, and for some devices
        # this translates into a SET_REPORT.
        # Because EvdevDevice(path) opens every single evdev node
        # we need to have a separate thread to process the incoming
        # SET_REPORT or we end up having to wait for the kernel
        # timeout of 5 seconds.
        nodes_opened = threading.Event()

        def dispatch() -> None:
            while not nodes_opened.is_set():
                self.dispatch(1)

        dispatcher = threading.Thread(target=dispatch)
        dispatcher.start()

        try:
            # "or []" mirrors the None checks done by power_supply_class
            # and led_classes on the result of walk_sysfs().
            self._input_nodes = [
                EvdevDevice(path)
                for path in self.walk_sysfs("input", "input/input*/event*") or []
            ]
        finally:
            # Always stop the helper thread, even when opening a node
            # raised: it is not a daemon thread and would otherwise keep
            # dispatching forever.
            nodes_opened.set()
            dispatcher.join()
        return self._input_nodes

    def match_evdev_rule(self, application, evdev):
        """Replace this in subclasses if the device has multiple reports
        of the same type and we need to filter based on the actual evdev
        node.

        returning True will append the corresponding report to
        `self.input_nodes[type]`
        returning False will ignore this report / type combination
        for the device.
        """
        return True

    def open(self):
        """Record that the device has been opened."""
        self.opened = True

    def _close_all_opened_evdev(self):
        """Close every evdev node opened through :attr:`input_nodes`."""
        # getattr(): this also runs from __del__, possibly on an object
        # whose __init__() raised before _input_nodes was assigned.
        nodes = getattr(self, "_input_nodes", None)
        if nodes is not None:
            for node in nodes:
                node.close()

    def __del__(self):
        self._close_all_opened_evdev()

    def close(self):
        """Record that the device has been closed."""
        self.opened = False

    def start(self, flags):
        """Record that the device has been started (``flags`` is not used here)."""
        self.started = True

    def stop(self):
        """Record that the device has been stopped and close its evdev nodes."""
        self.started = False
        self._close_all_opened_evdev()

    def next_sync_events(self, application=None):
        """Return the list of events pending on the evdev node matching
        ``application``, or an empty list when there is no such node."""
        evdev = self.get_evdev(application)
        if evdev is not None:
            return list(evdev.events())
        return []

    @property
    def application_matches(self: "BaseDevice") -> Dict[str, EvdevMatch]:
        """The rules used by :meth:`get_evdev` to pick a node per application."""
        return self._application_matches

    @application_matches.setter
    def application_matches(self: "BaseDevice", data: Dict[str, EvdevMatch]) -> None:
        self._application_matches = data

    def get_evdev(self, application=None):
        """
        Return the libevdev device matching ``application``, or None.

        ``application`` defaults to ``self.application``. With a single
        input node only :meth:`match_evdev_rule` is consulted; with
        several of them a node also has to fulfil the rules found in
        :attr:`application_matches`.
        """
        if application is None:
            application = self.application

        if not self.input_nodes:
            return None

        assert self._input_nodes is not None

        if len(self._input_nodes) == 1:
            evdev = self._input_nodes[0]
            if self.match_evdev_rule(application, evdev.libevdev):
                return evdev.libevdev
        else:
            for _evdev in self._input_nodes:
                if _evdev.matches_application(application, self.application_matches):
                    if self.match_evdev_rule(application, _evdev.libevdev):
                        return _evdev.libevdev

        # no node qualified
        return None

    def is_ready(self):
        """Returns whether a UHID device is ready. Can be overwritten in
        subclasses to add extra conditions on when to consider a UHID
        device ready. This can be:

        - we need to wait on different types of input devices to be ready
          (Touch Screen and Pen for example)
        - we need to have at least 4 LEDs present
          (len(self.uhdev.led_classes) == 4)
        - or any other combinations"""
        return self.kernel_is_ready