Commit e9064630 authored by Benjamin Tissoires's avatar Benjamin Tissoires

selftests/hid: add support for HID-BPF pre-loading before starting a test

few required changes:
- we need to count how many times a udev 'bind' event happens
- we need to tell `udev-hid-bpf` to not automatically attach the
  provided HID-BPF objects
- we need to manually attach the ones from the kernel tree, and wait
  for the second udev 'bind' event to happen

Link: https://lore.kernel.org/r/20240410-bpf_sources-v1-11-a8bf16033ef8@kernel.orgReviewed-by: default avatarPeter Hutterer <peter.hutterer@who-t.net>
Signed-off-by: default avatarBenjamin Tissoires <bentiss@kernel.org>
parent a7def2e5
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
import libevdev import libevdev
import os import os
import pytest import pytest
import subprocess
import time import time
import logging import logging
...@@ -157,6 +158,17 @@ class BaseTestCase: ...@@ -157,6 +158,17 @@ class BaseTestCase:
# for example ("playstation", "hid-playstation") # for example ("playstation", "hid-playstation")
kernel_modules: List[Tuple[str, str]] = [] kernel_modules: List[Tuple[str, str]] = []
# List of in kernel HID-BPF object files to load
# before starting the test
# Any existing pre-loaded HID-BPF module will be removed
# before the ones in this list will be manually loaded.
# Each Element is a tuple '(hid_bpf_object, rdesc_fixup_present)',
# for example '("xppen-ArtistPro16Gen2.bpf.o", True)'
# If 'rdesc_fixup_present' is True, the test needs to wait
# for one unbind and rebind before it can be sure the kernel is
# ready
hid_bpfs: List[Tuple[str, bool]] = []
def assertInputEventsIn(self, expected_events, effective_events): def assertInputEventsIn(self, expected_events, effective_events):
effective_events = effective_events.copy() effective_events = effective_events.copy()
for ev in expected_events: for ev in expected_events:
...@@ -211,8 +223,6 @@ class BaseTestCase: ...@@ -211,8 +223,6 @@ class BaseTestCase:
# we don't know beforehand the name of the module from modinfo # we don't know beforehand the name of the module from modinfo
sysfs_path = Path("/sys/module") / kernel_module.replace("-", "_") sysfs_path = Path("/sys/module") / kernel_module.replace("-", "_")
if not sysfs_path.exists(): if not sysfs_path.exists():
import subprocess
ret = subprocess.run(["/usr/sbin/modprobe", kernel_module]) ret = subprocess.run(["/usr/sbin/modprobe", kernel_module])
if ret.returncode != 0: if ret.returncode != 0:
pytest.skip( pytest.skip(
...@@ -225,6 +235,60 @@ class BaseTestCase: ...@@ -225,6 +235,60 @@ class BaseTestCase:
self._load_kernel_module(kernel_driver, kernel_module) self._load_kernel_module(kernel_driver, kernel_module)
yield yield
def load_hid_bpfs(self):
script_dir = Path(os.path.dirname(os.path.realpath(__file__)))
root_dir = (script_dir / "../../../../..").resolve()
bpf_dir = root_dir / "drivers/hid/bpf/progs"
wait = False
for _, rdesc_fixup in self.hid_bpfs:
if rdesc_fixup:
wait = True
for hid_bpf, _ in self.hid_bpfs:
# We need to start `udev-hid-bpf` in the background
# and dispatch uhid events in case the kernel needs
# to fetch features on the device
process = subprocess.Popen(
[
"udev-hid-bpf",
"--verbose",
"add",
str(self.uhdev.sys_path),
str(bpf_dir / hid_bpf),
],
)
while process.poll() is None:
self.uhdev.dispatch(1)
if process.poll() != 0:
pytest.fail(
f"Couldn't insert hid-bpf program '{hid_bpf}', marking the test as failed"
)
if wait:
# the HID-BPF program exports a rdesc fixup, so it needs to be
# unbound by the kernel and then rebound.
# Ensure we get the bound event exactly 2 times (one for the normal
# uhid loading, and then the reload from HID-BPF)
now = time.time()
while self.uhdev.kernel_ready_count < 2 and time.time() - now < 2:
self.uhdev.dispatch(1)
if self.uhdev.kernel_ready_count < 2:
pytest.fail(
f"Couldn't insert hid-bpf programs, marking the test as failed"
)
def unload_hid_bpfs(self):
ret = subprocess.run(
["udev-hid-bpf", "--verbose", "remove", str(self.uhdev.sys_path)],
)
if ret.returncode != 0:
pytest.fail(
f"Couldn't unload hid-bpf programs, marking the test as failed"
)
@pytest.fixture() @pytest.fixture()
def new_uhdev(self, load_kernel_module): def new_uhdev(self, load_kernel_module):
return self.create_device() return self.create_device()
...@@ -248,12 +312,18 @@ class BaseTestCase: ...@@ -248,12 +312,18 @@ class BaseTestCase:
now = time.time() now = time.time()
while not self.uhdev.is_ready() and time.time() - now < 5: while not self.uhdev.is_ready() and time.time() - now < 5:
self.uhdev.dispatch(1) self.uhdev.dispatch(1)
if self.hid_bpfs:
self.load_hid_bpfs()
if self.uhdev.get_evdev() is None: if self.uhdev.get_evdev() is None:
logger.warning( logger.warning(
f"available list of input nodes: (default application is '{self.uhdev.application}')" f"available list of input nodes: (default application is '{self.uhdev.application}')"
) )
logger.warning(self.uhdev.input_nodes) logger.warning(self.uhdev.input_nodes)
yield yield
if self.hid_bpfs:
self.unload_hid_bpfs()
self.uhdev = None self.uhdev = None
except PermissionError: except PermissionError:
pytest.skip("Insufficient permissions, run me as root") pytest.skip("Insufficient permissions, run me as root")
...@@ -313,8 +383,6 @@ class HIDTestUdevRule(object): ...@@ -313,8 +383,6 @@ class HIDTestUdevRule(object):
self.reload_udev_rules() self.reload_udev_rules()
def reload_udev_rules(self): def reload_udev_rules(self):
import subprocess
subprocess.run("udevadm control --reload-rules".split()) subprocess.run("udevadm control --reload-rules".split())
subprocess.run("systemd-hwdb update".split()) subprocess.run("systemd-hwdb update".split())
...@@ -330,10 +398,11 @@ class HIDTestUdevRule(object): ...@@ -330,10 +398,11 @@ class HIDTestUdevRule(object):
delete=False, delete=False,
) as f: ) as f:
f.write( f.write(
'KERNELS=="*input*", ATTRS{name}=="*uhid test *", ENV{LIBINPUT_IGNORE_DEVICE}="1"\n' """
) KERNELS=="*input*", ATTRS{name}=="*uhid test *", ENV{LIBINPUT_IGNORE_DEVICE}="1"
f.write( KERNELS=="*hid*", ENV{HID_NAME}=="*uhid test *", ENV{HID_BPF_IGNORE_DEVICE}="1"
'KERNELS=="*input*", ATTRS{name}=="*uhid test * System Multi Axis", ENV{ID_INPUT_TOUCHSCREEN}="", ENV{ID_INPUT_SYSTEM_MULTIAXIS}="1"\n' KERNELS=="*input*", ATTRS{name}=="*uhid test * System Multi Axis", ENV{ID_INPUT_TOUCHSCREEN}="", ENV{ID_INPUT_SYSTEM_MULTIAXIS}="1"
"""
) )
self.rulesfile = f self.rulesfile = f
......
...@@ -35,7 +35,7 @@ from hidtools.uhid import UHIDDevice ...@@ -35,7 +35,7 @@ from hidtools.uhid import UHIDDevice
from hidtools.util import BusType from hidtools.util import BusType
from pathlib import Path from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Type, Union from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union
logger = logging.getLogger("hidtools.device.base_device") logger = logging.getLogger("hidtools.device.base_device")
...@@ -126,7 +126,7 @@ class HIDIsReady(object): ...@@ -126,7 +126,7 @@ class HIDIsReady(object):
class UdevHIDIsReady(HIDIsReady): class UdevHIDIsReady(HIDIsReady):
_pyudev_context: ClassVar[Optional[pyudev.Context]] = None _pyudev_context: ClassVar[Optional[pyudev.Context]] = None
_pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
_uhid_devices: ClassVar[Dict[int, bool]] = {} _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {}
def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None: def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
super().__init__(uhid) super().__init__(uhid)
...@@ -150,20 +150,25 @@ class UdevHIDIsReady(HIDIsReady): ...@@ -150,20 +150,25 @@ class UdevHIDIsReady(HIDIsReady):
return return
event: pyudev.Device event: pyudev.Device
for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None): for event in iter(functools.partial(cls._pyudev_monitor.poll, 0.02), None):
if event.action not in ["bind", "remove"]: if event.action not in ["bind", "remove", "unbind"]:
return return
logger.debug(f"udev event: {event.action} -> {event}") logger.debug(f"udev event: {event.action} -> {event}")
id = int(event.sys_path.strip().split(".")[-1], 16) id = int(event.sys_path.strip().split(".")[-1], 16)
cls._uhid_devices[id] = event.action == "bind" device_ready, count = cls._uhid_devices.get(id, (False, 0))
def is_ready(self: "UdevHIDIsReady") -> bool: ready = event.action == "bind"
if not device_ready and ready:
count += 1
cls._uhid_devices[id] = (ready, count)
def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]:
try: try:
return self._uhid_devices[self.uhid.hid_id] return self._uhid_devices[self.uhid.hid_id]
except KeyError: except KeyError:
return False return (False, 0)
class EvdevMatch(object): class EvdevMatch(object):
...@@ -317,7 +322,11 @@ class BaseDevice(UHIDDevice): ...@@ -317,7 +322,11 @@ class BaseDevice(UHIDDevice):
@property @property
def kernel_is_ready(self: "BaseDevice") -> bool: def kernel_is_ready(self: "BaseDevice") -> bool:
return self._kernel_is_ready.is_ready() and self.started return self._kernel_is_ready.is_ready()[0] and self.started
@property
def kernel_ready_count(self: "BaseDevice") -> int:
return self._kernel_is_ready.is_ready()[1]
@property @property
def input_nodes(self: "BaseDevice") -> List[EvdevDevice]: def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment