diff --git a/README.md b/README.md
index 6d1bf82..2fb2600 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,8 @@
Human friendly interface to linux subsystems using python.
-Provides python access to several linux subsystems like V4L2, input and MIDI.
+Provides python access to several linux subsystems like V4L2, GPIO, Led, thermal,
+input and MIDI.
There is experimental, undocumented, incomplete and unstable access to USB.
diff --git a/docs/api/gpio.md b/docs/api/gpio.md
new file mode 100644
index 0000000..6a7caa0
--- /dev/null
+++ b/docs/api/gpio.md
@@ -0,0 +1,3 @@
+# ⚡ GPIO API
+
+::: linuxpy.gpio.device
diff --git a/docs/develop.md b/docs/develop.md
index f82beaa..9b74cc3 100644
--- a/docs/develop.md
+++ b/docs/develop.md
@@ -45,6 +45,7 @@ $ sudo addgroup video
$ sudo addgroup led
$ sudo adduser $USER input
$ sudo adduser $USER video
+$ sudo adduser $USER led
```
(reboot if necessary for those changes to take effect)
@@ -56,10 +57,18 @@ Create a new rules file (ex: `/etc/udev/rules.d/80-device.rules`):
```
KERNEL=="event[0-9]*", SUBSYSTEM=="input", GROUP="input", MODE:="0660"
KERNEL=="uinput", SUBSYSTEM=="misc", GROUP="input", MODE:="0660"
-SUBSYSTEM=="video4linux", MODE:="0666"
+SUBSYSTEM=="video4linux", GROUP="video", MODE:="0660"
KERNEL=="uleds", GROUP="input", MODE:="0660"
SUBSYSTEM=="leds", ACTION=="add", RUN+="/bin/chmod -R g=u,o=u /sys%p"
SUBSYSTEM=="leds", ACTION=="change", ENV{TRIGGER}!="none", RUN+="/bin/chmod -R g=u,o=u /sys%p"
+SUBSYSTEM=="gpio", GROUP="input", MODE:="0660"
+```
+
+Reload the rules:
+
+```console
+$ sudo udevadm control --reload-rules
+$ sudo udevadm trigger
```
Finally, make sure all kernel modules are installed:
diff --git a/docs/index.md b/docs/index.md
index 0335aab..3ad8409 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -13,11 +13,11 @@ hide:
Human friendly interface to linux subsystems using python.
-Provides python access to several linux subsystems like V4L2, input and MIDI.
+Provides python access to several linux subsystems like V4L2, input, GPIO and MIDI.
There is experimental, undocumented, incomplete and unstable access to USB.
-Need fine control over Webcams, MIDI devices, thermal sensors and cooling
+Need fine control over Webcams, GPIO, MIDI devices, thermal sensors and cooling
devices, joysticks, gamepads, keyboards, mice or even the keyboard light on
your laptop?
diff --git a/docs/user_guide/gpio.md b/docs/user_guide/gpio.md
new file mode 100644
index 0000000..cc9240a
--- /dev/null
+++ b/docs/user_guide/gpio.md
@@ -0,0 +1,14 @@
+# ⚡ GPIO
+
+Human friendly interface to linux GPIO handling.
+
+Without further ado:
+
+
+ python
+ from linuxpy.gpio import find
+ with find() as gpio:
+ with gpio[1, 2, 5:8] as lines:
+ print(lines[:])
+ {1: 0, 2: 1, 5: 0, 6: 1, 7:0}
+
diff --git a/linuxpy/codegen/base.py b/linuxpy/codegen/base.py
index bb146df..0707313 100644
--- a/linuxpy/codegen/base.py
+++ b/linuxpy/codegen/base.py
@@ -70,11 +70,11 @@ class {self.name}(enum.{self.klass}):
class CStruct:
- def __init__(self, node, node_members, pack=False):
+ def __init__(self, node, name, node_members, pack=False):
self.node = node
self.node_members = node_members
self.pack = pack
- self.name = node.get("name")
+ self.name = name
self.parent = None
self.fields = []
self.children = {}
@@ -215,7 +215,7 @@ def find_xml_base_type(etree, context, type_id):
type_id = node.get("type")
-def get_structs(header_filename, xml_filename):
+def get_structs(header_filename, xml_filename, decode_name):
etree = xml.etree.ElementTree.parse(xml_filename)
header_tag = etree.find(f"File[@name='{header_filename}']")
structs = {}
@@ -234,7 +234,10 @@ def get_structs(header_filename, xml_filename):
fields = (etree.find(f"*[@id='{member_id}']") for member_id in member_ids)
fields = [field for field in fields if field.tag not in {"Union", "Struct", "Unimplemented"}]
pack = int(node.get("align")) == 8
- struct = CStruct(node, fields, pack)
+ name = node.get("name")
+ if name:
+ name = decode_name(name)
+ struct = CStruct(node, name, fields, pack)
structs[struct.id] = struct
for struct in structs.values():
if struct.context_id != "_1":
@@ -278,7 +281,7 @@ def cname_to_pyname(
return "".join(map(str.capitalize, name.split(splitby)))
-def get_enums(header_filename, xml_filename, enums):
+def get_enums(header_filename, xml_filename, enums, decode_name):
etree = xml.etree.ElementTree.parse(xml_filename)
header_tag = etree.find(f"File[@name='{header_filename}']")
structs = {}
@@ -288,7 +291,9 @@ def get_enums(header_filename, xml_filename, enums):
nodes = etree.findall(f"Enumeration[@file='{header_id}']")
for node in nodes:
cname = node.get("name")
- py_name = cname_to_pyname(cname)
+ if not cname:
+ continue
+ py_name = cname_to_pyname(decode_name(cname))
prefix = cname.upper() + "_"
raw_names = [child.get("name") for child in node]
common_prefix = os.path.commonprefix(raw_names)
@@ -309,16 +314,28 @@ def get_enums(header_filename, xml_filename, enums):
def code_format(text, filename):
- cmd = ["ruff", "check", "--fix", "--stdin-filename", str(filename)]
- result = subprocess.run(cmd, capture_output=True, check=True, text=True, input=text)
- fixed_text = result.stdout
+ try:
+ cmd = ["ruff", "check", "--fix", "--stdin-filename", str(filename)]
+ result = subprocess.run(cmd, capture_output=True, check=True, text=True, input=text)
+ fixed_text = result.stdout
+ except Exception as error:
+ print(repr(error))
+ fixed_text = text
+
+ try:
+ cmd = ["ruff", "format", "--stdin-filename", str(filename)]
+ result = subprocess.run(cmd, capture_output=True, check=True, text=True, input=fixed_text)
+ fixed_text = result.stdout
+ except Exception as error:
+ print(repr(error))
+ return fixed_text
+
- cmd = ["ruff", "format", "--stdin-filename", str(filename)]
- result = subprocess.run(cmd, capture_output=True, check=True, text=True, input=fixed_text)
- return result.stdout
+def nullf(x):
+ return x
-def run(name, headers, template, macro_enums, output=None):
+def run(name, headers, template, macro_enums, output=None, decode_struct_name=nullf, decode_enum_name=nullf):
cache = {}
temp_dir = tempfile.mkdtemp()
logging.info("Starting %s...", name)
@@ -330,10 +347,10 @@ def run(name, headers, template, macro_enums, output=None):
xml_filename = os.path.join(temp_dir, base_header + ".xml")
cmd = f"castxml --castxml-output=1.0.0 -o {xml_filename} {header}"
assert os.system(cmd) == 0
- new_structs = get_structs(header, xml_filename)
+ new_structs = get_structs(header, xml_filename, decode_name=decode_struct_name)
structs += list(new_structs.values())
- get_enums(header, xml_filename, macro_enums)
+ get_enums(header, xml_filename, macro_enums, decode_name=decode_enum_name)
structs_definition = "\n\n".join(struct.class_text for struct in structs if struct.parent is None)
structs_fields = "\n".join(struct.fields_text for struct in structs if struct.parent is None)
diff --git a/linuxpy/codegen/cli.py b/linuxpy/codegen/cli.py
index ab2636b..6446820 100644
--- a/linuxpy/codegen/cli.py
+++ b/linuxpy/codegen/cli.py
@@ -1,5 +1,6 @@
import logging
+from .gpio import main as gpio_main
from .input import main as input_run
from .magic import main as magic_run
from .usbfs import main as usbfs_run
@@ -10,6 +11,7 @@
def main():
logging.basicConfig(level="INFO")
+ gpio_main()
magic_run()
input_run()
video_main()
diff --git a/linuxpy/codegen/gpio.py b/linuxpy/codegen/gpio.py
new file mode 100644
index 0000000..94f9b23
--- /dev/null
+++ b/linuxpy/codegen/gpio.py
@@ -0,0 +1,81 @@
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
+
+import pathlib
+
+from linuxpy.codegen.base import CEnum, run
+
+HEADERS = [
+ "/usr/include/linux/gpio.h",
+]
+
+
+TEMPLATE = """\
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
+
+# This file has been generated by {name}
+# Date: {date}
+# System: {system}
+# Release: {release}
+# Version: {version}
+
+import enum
+
+from linuxpy.ioctl import IOR as _IOR, IOW as _IOW, IOWR as _IOWR
+from linuxpy.ctypes import u8, u16, u32, cuint, cint, cchar, culonglong
+from linuxpy.ctypes import Struct, Union, POINTER, cvoidp
+
+
+class GpioLineEvent(enum.IntEnum):
+ REQUESTED = 1
+ RELEASED = 2
+ CONFIG = 3
+
+
+{enums_body}
+
+
+{structs_body}
+
+
+{iocs_body}"""
+
+
+class IOC(CEnum):
+ def __init__(self):
+ def filter(name, value):
+ return name.endswith("_IOCTL")
+
+ super().__init__("IOC", ["GPIO_GET_", "GPIO_V2_"], filter=filter)
+
+ def add_item(self, name, value):
+ name = name.removesuffix("_IOCTL")
+ return super().add_item(name, value)
+
+
+# macros from #define statements
+MACRO_ENUMS = [
+ IOC(),
+]
+
+
+this_dir = pathlib.Path(__file__).parent
+
+
+def decode_name(name: str) -> str:
+ return name.removeprefix("gpio_v2_").removeprefix("gpio_")
+
+
+def main(output=this_dir.parent / "gpio" / "raw.py"):
+ run(__name__, HEADERS, TEMPLATE, MACRO_ENUMS, output=output, decode_enum_name=decode_name)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/linuxpy/gpio/__init__.py b/linuxpy/gpio/__init__.py
new file mode 100644
index 0000000..24bcc0a
--- /dev/null
+++ b/linuxpy/gpio/__init__.py
@@ -0,0 +1,5 @@
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
diff --git a/linuxpy/gpio/device.py b/linuxpy/gpio/device.py
new file mode 100644
index 0000000..5dc8a42
--- /dev/null
+++ b/linuxpy/gpio/device.py
@@ -0,0 +1,335 @@
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
+
+"""
+Human friendly interface to linux GPIO subsystem.
+
+The heart of linuxpy GPIO library is the [`Device`][linuxpy.gpio.device.Device]
+class.
+The recommended way is to use one of the find methods to create a Device object
+and use it within a context manager like:
+
+```python
+from linuxpy.gpio.device import find, LineFlag
+
+with find() as gpio:
+ # request lines 5 and 6
+ lines = gpio[5, 6]
+ lines.flags = LineFlag.ACTIVE_LOW
+ with lines:
+ print(lines[:])
+
+```
+"""
+
+import collections
+import fcntl
+import functools
+import operator
+import os
+import pathlib
+import selectors
+
+from linuxpy.ctypes import sizeof
+from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files
+from linuxpy.gpio import raw
+from linuxpy.gpio.raw import IOC
+from linuxpy.ioctl import ioctl
+from linuxpy.types import Collection, Iterable, Optional, PathLike, Sequence, Union
+from linuxpy.util import astream, bit_indexes, chunks, make_find
+
+Info = collections.namedtuple("Info", "name label lines")
+ChipInfo = collections.namedtuple("ChipInfo", "name label lines")
+LineInfo = collections.namedtuple("LineInfo", "name consumer offset flags attributes")
+
+LineFlag = raw.LineFlag
+LineAttrId = raw.LineAttrId
+LineEventId = raw.LineEventId
+LineEvent = collections.namedtuple("LineEvent", "timestamp type line sequence line_sequence")
+
+
+class LineAttributes:
+ def __init__(self):
+ self.flags = LineFlag(0)
+ self.indexes = []
+ self.debounce_period = 0
+
+
+def get_chip_info(fd) -> ChipInfo:
+ info = raw.gpiochip_info()
+ ioctl(fd, IOC.CHIPINFO, info)
+ return ChipInfo(info.name.decode(), info.label.decode(), info.lines)
+
+
+def get_line_info(fd, line: int) -> LineInfo:
+ info = raw.gpio_v2_line_info(offset=line)
+ ioctl(fd, IOC.GET_LINEINFO, info)
+ attributes = LineAttributes()
+ for i in range(info.num_attrs):
+ attr = info.attrs[i]
+ if attr.id == LineAttrId.FLAGS:
+ attributes.flags = LineFlag(attr.flags)
+ elif attr.id == LineAttrId.OUTPUT_VALUES:
+ attributes.indexes = bit_indexes(attr.values)
+ elif attr.id == LineAttrId.DEBOUNCE:
+ attributes.debounce_period = attr.debounce_period_us / 1_000_000
+
+ return LineInfo(
+ info.name.decode(),
+ info.consumer.decode(),
+ info.offset,
+ LineFlag(info.flags),
+ attributes,
+ )
+
+
+def get_info(fd) -> Info:
+ chip = get_chip_info(fd)
+ return Info(chip.name, chip.label, [get_line_info(fd, line) for line in range(chip.lines)])
+
+
+def request_line(fd, consumer_name: str, lines: Sequence[int], flags: LineFlag, blocking=False):
+ num_lines = len(lines)
+ req = raw.gpio_v2_line_request()
+ req.consumer = consumer_name.encode()
+ req.num_lines = num_lines
+ req.offsets[:num_lines] = lines
+ req.config.flags = flags
+ req.config.num_attrs = 0 # for now only support generic config
+ ioctl(fd, IOC.GET_LINE, req)
+ if not blocking:
+ flag = fcntl.fcntl(req.fd, fcntl.F_GETFL)
+ fcntl.fcntl(req.fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
+ return req
+
+
+def get_values(req_fd, mask: int) -> raw.gpio_v2_line_values:
+ result = raw.gpio_v2_line_values(mask=mask)
+ return ioctl(req_fd, IOC.LINE_GET_VALUES, result)
+
+
+def set_values(req_fd, mask: int, bits: int) -> raw.gpio_v2_line_values:
+ result = raw.gpio_v2_line_values(mask=mask, bits=bits)
+ return ioctl(req_fd, IOC.LINE_SET_VALUES, result)
+
+
+def read_one_event(req_fd) -> LineEvent:
+ data = os.read(req_fd, sizeof(raw.gpio_v2_line_event))
+ event = raw.gpio_v2_line_event.from_buffer_copy(data)
+ return LineEvent(
+ event.timestamp_ns * 1e-9,
+ LineEventId(event.id),
+ event.offset,
+ event.seqno,
+ event.line_seqno,
+ )
+
+
+def event_selector(fds: Collection[int]) -> selectors.BaseSelector:
+ selector = selectors.DefaultSelector()
+ for fd in fds:
+ selector.register(fd, selectors.EVENT_READ)
+ return selector
+
+
+def fd_stream(fds: Collection[int], timeout: Optional[float] = None):
+ selector = event_selector(fds)
+ while True:
+ for key, _ in selector.select(timeout):
+ yield key.fileobj
+
+
+def event_stream(fds: Collection[int], timeout: Optional[float] = None):
+ for fd in fd_stream(fds, timeout):
+ yield read_one_event(fd)
+
+
+async def async_fd_stream(fds: Collection[int]):
+ selector = event_selector(fds)
+ stream = astream(selector.fileno(), selector.select)
+ try:
+ async for events in stream:
+ for key, _ in events:
+ yield key.fileobj
+ finally:
+ await stream.aclose()
+
+
+async def async_event_stream(fds: Collection[int]):
+ stream = async_fd_stream(fds)
+ try:
+ async for fd in stream:
+ yield read_one_event(fd)
+ finally:
+ await stream.aclose()
+
+
+def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
+ """Used internally in __getitem__ to expand the given key"""
+ if isinstance(key, int):
+ return [key]
+ if isinstance(key, slice):
+ start = minimum if key.start is None else key.start
+ key = slice(start, key.stop, key.step)
+ return list(range(maximum)[key])
+ return [line for item in key for line in expand_from_list(item, minimum, maximum)]
+
+
+class _Request(ReentrantOpen):
+ """Raw line request. Not to be used directly"""
+
+ def __init__(
+ self, device, lines: Sequence[int], name: str = "", flags: LineFlag = LineFlag(0), blocking: bool = False
+ ):
+ assert len(lines) <= 64
+ self.device = device
+ self.name = name
+ self.flags = flags
+ self.lines = lines
+ self.blocking = blocking
+ self.indexes = {line: index for index, line in enumerate(lines)}
+ self.fd = None
+ super().__init__()
+
+ def fileno(self):
+ return self.fd
+
+ def close(self):
+ if self.fd is None:
+ return
+ os.close(self.fd)
+ self.fd = None
+
+ def open(self):
+ self.fd = request_line(self.device, self.name, self.lines, self.flags, self.blocking).fd
+
+ def get_values(self, lines: Collection[int]) -> dict[int, int]:
+ mask = functools.reduce(operator.or_, (1 << self.indexes[line] for line in lines), 0)
+ values = get_values(self, mask)
+ return {line: (values.bits >> self.indexes[line]) & 1 for line in lines}
+
+ def set_values(self, values: dict[int, Union[int, bool]]):
+ mask, bits = 0, 0
+ for line, value in values.items():
+ index = self.indexes[line]
+ mask |= 1 << index
+ if value:
+ bits |= 1 << index
+ return set_values(self, mask, bits)
+
+
+class Request(ReentrantOpen):
+ def __init__(
+ self, device, lines: Sequence[int], name: str = "", flags: LineFlag = LineFlag(0), blocking: bool = False
+ ):
+ self.lines = lines
+ self.line_requests: list[_Request] = []
+ self.line_map: dict[int, _Request] = {}
+ for chunk in chunks(lines, 64):
+ line_request = _Request(device, chunk, name, flags, blocking)
+ self.line_requests.append(line_request)
+ for line in chunk:
+ self.line_map[line] = line_request
+ super().__init__()
+
+ def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
+ """Get values"""
+ if isinstance(key, int):
+ return self.get_values((key,))[key]
+ lines = expand_from_list(key, self.min_line, self.max_line + 1)
+ return self.get_values(lines)
+
+ def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
+ if isinstance(key, int):
+ values = {key: value}
+ else:
+ lines = expand_from_list(key, self.min_line, self.max_line + 1)
+ values = dict(zip(lines, value))
+ self.set_values(values)
+
+ def __iter__(self):
+ return event_stream(self.filenos())
+
+ def __aiter__(self):
+ return async_event_stream(self.filenos())
+
+ @property
+ def min_line(self):
+ if not hasattr(self, "_min_line"):
+ self._min_line = min(self.lines)
+ return self._min_line
+
+ @property
+ def max_line(self):
+ if not hasattr(self, "_max_line"):
+ self._max_line = max(self.lines)
+ return self._max_line
+
+ def filenos(self):
+ return [request.fd for request in self.line_requests]
+
+ def close(self):
+ for line_request in self.line_requests:
+ line_request.close()
+
+ def open(self):
+ for line_request in self.line_requests:
+ line_request.open()
+
+ def get_values(self, lines: Optional[Sequence[int]] = None):
+ if lines is None:
+ lines = self.lines
+ request_lines = {}
+ for line in lines:
+ request_line = self.line_map[line]
+ request_lines.setdefault(request_line, []).append(line)
+ result = {}
+ for request_line, local_lines in request_lines.items():
+ result.update(request_line.get_values(local_lines))
+ return result
+
+ def set_values(self, values: dict[int, Union[int, bool]]):
+ request_lines = {}
+ for line, value in values.items():
+ request_line = self.line_map[line]
+ request_lines.setdefault(request_line, {})[line] = value
+ for request_line, local_lines in request_lines.items():
+ request_line.set_values(local_lines)
+
+
+class Device(BaseDevice):
+ PREFIX = "/dev/gpiochip"
+
+ def __len__(self) -> int:
+ if not hasattr(self, "_len"):
+ self._len = get_chip_info(self).lines
+ return self._len
+
+ def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
+ """Request line(s)"""
+ lines = expand_from_list(key, 0, len(self))
+ return self.request(lines)
+
+ def get_info(self) -> Info:
+ return get_info(self)
+
+ def request(self, lines: Optional[Sequence[int]] = None, name: str = "", flags: LineFlag = LineFlag(0)) -> Request:
+ if lines is None:
+ lines = list(range(len(self)))
+ return Request(self, lines, name, flags)
+
+
+def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
+ """Returns an iterator over all GPIO chip files"""
+ return iter_device_files(path=path, pattern="gpio*")
+
+
+def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
+ """Returns an iterator over all GPIO chip devices"""
+ return (Device(name, **kwargs) for name in iter_gpio_files(path=path))
+
+
+find = make_find(iter_devices)
diff --git a/linuxpy/gpio/raw.py b/linuxpy/gpio/raw.py
new file mode 100644
index 0000000..8b39381
--- /dev/null
+++ b/linuxpy/gpio/raw.py
@@ -0,0 +1,238 @@
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
+
+# This file has been generated by __main__
+# Date: 2024-09-14 07:38:13.225082
+# System: Linux
+# Release: 6.8.0-44-generic
+# Version: #44-Ubuntu SMP PREEMPT_DYNAMIC Tue Aug 13 13:35:26 UTC 2024
+
+import enum
+
+from linuxpy.ctypes import Struct, Union, cchar, cint, cuint, culonglong, u32
+from linuxpy.ioctl import IOR as _IOR, IOWR as _IOWR
+
+
+class GpioLineEvent(enum.IntEnum):
+ REQUESTED = 1
+ RELEASED = 2
+ CONFIG = 3
+
+
+class LineFlag(enum.IntFlag):
+ USED = 1
+ ACTIVE_LOW = 2
+ INPUT = 4
+ OUTPUT = 8
+ EDGE_RISING = 16
+ EDGE_FALLING = 32
+ OPEN_DRAIN = 64
+ OPEN_SOURCE = 128
+ BIAS_PULL_UP = 256
+ BIAS_PULL_DOWN = 512
+ BIAS_DISABLED = 1024
+ EVENT_CLOCK_REALTIME = 2048
+ EVENT_CLOCK_HTE = 4096
+
+
+class LineAttrId(enum.IntEnum):
+ FLAGS = 1
+ OUTPUT_VALUES = 2
+ DEBOUNCE = 3
+
+
+class LineChangedType(enum.IntEnum):
+ REQUESTED = 1
+ RELEASED = 2
+ CONFIG = 3
+
+
+class LineEventId(enum.IntEnum):
+ RISING_EDGE = 1
+ FALLING_EDGE = 2
+
+
+class gpiochip_info(Struct):
+ pass
+
+
+gpiochip_info._fields_ = [("name", cchar * 32), ("label", cchar * 32), ("lines", cuint)]
+
+
+class gpio_v2_line_values(Struct):
+ pass
+
+
+gpio_v2_line_values._fields_ = [("bits", culonglong), ("mask", culonglong)]
+
+
+class gpio_v2_line_attribute(Struct):
+ class M1(Union):
+ pass
+
+ M1._fields_ = [("flags", culonglong), ("values", culonglong), ("debounce_period_us", cuint)]
+
+ _anonymous_ = ("m1",)
+
+
+gpio_v2_line_attribute._fields_ = [("id", cuint), ("padding", cuint), ("m1", gpio_v2_line_attribute.M1)]
+
+
+class gpio_v2_line_config_attribute(Struct):
+ pass
+
+
+gpio_v2_line_config_attribute._fields_ = [("attr", gpio_v2_line_attribute), ("mask", culonglong)]
+
+
+class gpio_v2_line_config(Struct):
+ pass
+
+
+gpio_v2_line_config._fields_ = [
+ ("flags", culonglong),
+ ("num_attrs", cuint),
+ ("padding", cuint * 5),
+ ("attrs", gpio_v2_line_config_attribute * 10),
+]
+
+
+class gpio_v2_line_request(Struct):
+ pass
+
+
+gpio_v2_line_request._fields_ = [
+ ("offsets", cuint * 64),
+ ("consumer", cchar * 32),
+ ("config", gpio_v2_line_config),
+ ("num_lines", cuint),
+ ("event_buffer_size", cuint),
+ ("padding", cuint * 5),
+ ("fd", cint),
+]
+
+
+class gpio_v2_line_info(Struct):
+ pass
+
+
+gpio_v2_line_info._fields_ = [
+ ("name", cchar * 32),
+ ("consumer", cchar * 32),
+ ("offset", cuint),
+ ("num_attrs", cuint),
+ ("flags", culonglong),
+ ("attrs", gpio_v2_line_attribute * 10),
+ ("padding", cuint * 4),
+]
+
+
+class gpio_v2_line_info_changed(Struct):
+ pass
+
+
+gpio_v2_line_info_changed._fields_ = [
+ ("info", gpio_v2_line_info),
+ ("timestamp_ns", culonglong),
+ ("event_type", cuint),
+ ("padding", cuint * 5),
+]
+
+
+class gpio_v2_line_event(Struct):
+ pass
+
+
+gpio_v2_line_event._fields_ = [
+ ("timestamp_ns", culonglong),
+ ("id", cuint),
+ ("offset", cuint),
+ ("seqno", cuint),
+ ("line_seqno", cuint),
+ ("padding", cuint * 6),
+]
+
+
+class gpioline_info(Struct):
+ pass
+
+
+gpioline_info._fields_ = [("line_offset", cuint), ("flags", cuint), ("name", cchar * 32), ("consumer", cchar * 32)]
+
+
+class gpioline_info_changed(Struct):
+ pass
+
+
+gpioline_info_changed._fields_ = [
+ ("info", gpioline_info),
+ ("timestamp", culonglong),
+ ("event_type", cuint),
+ ("padding", cuint * 5),
+]
+
+
+class gpiohandle_request(Struct):
+ pass
+
+
+gpiohandle_request._fields_ = [
+ ("lineoffsets", cuint * 64),
+ ("flags", cuint),
+ ("default_values", cchar * 64),
+ ("consumer_label", cchar * 32),
+ ("lines", cuint),
+ ("fd", cint),
+]
+
+
+class gpiohandle_config(Struct):
+ pass
+
+
+gpiohandle_config._fields_ = [("flags", cuint), ("default_values", cchar * 64), ("padding", cuint * 4)]
+
+
+class gpiohandle_data(Struct):
+ _pack_ = True
+
+
+gpiohandle_data._fields_ = [("values", cchar * 64)]
+
+
+class gpioevent_request(Struct):
+ pass
+
+
+gpioevent_request._fields_ = [
+ ("lineoffset", cuint),
+ ("handleflags", cuint),
+ ("eventflags", cuint),
+ ("consumer_label", cchar * 32),
+ ("fd", cint),
+]
+
+
+class gpioevent_data(Struct):
+ pass
+
+
+gpioevent_data._fields_ = [("timestamp", culonglong), ("id", cuint)]
+
+
+class IOC(enum.IntEnum):
+ CHIPINFO = _IOR(0xB4, 0x01, gpiochip_info)
+ LINEINFO_UNWATCH = _IOWR(0xB4, 0x0C, u32)
+ GET_LINEINFO = _IOWR(0xB4, 0x05, gpio_v2_line_info)
+ GET_LINEINFO_WATCH = _IOWR(0xB4, 0x06, gpio_v2_line_info)
+ GET_LINE = _IOWR(0xB4, 0x07, gpio_v2_line_request)
+ LINE_SET_CONFIG = _IOWR(0xB4, 0x0D, gpio_v2_line_config)
+ LINE_GET_VALUES = _IOWR(0xB4, 0x0E, gpio_v2_line_values)
+ LINE_SET_VALUES = _IOWR(0xB4, 0x0F, gpio_v2_line_values)
+ LINEINFO = _IOWR(0xB4, 0x02, gpioline_info)
+ LINEHANDLE = _IOWR(0xB4, 0x03, gpiohandle_request)
+ LINEEVENT = _IOWR(0xB4, 0x04, gpioevent_request)
+ LINEINFO_WATCH = _IOWR(0xB4, 0x0B, gpioline_info)
diff --git a/linuxpy/util.py b/linuxpy/util.py
index 7cbad21..567b69e 100644
--- a/linuxpy/util.py
+++ b/linuxpy/util.py
@@ -22,7 +22,7 @@ def iter_chunks(lst: Sequence, size: int) -> Iterable:
return (lst[i : i + size] for i in range(0, len(lst), size))
-def chunks(lst: Sequence, size) -> tuple:
+def chunks(lst: Sequence, size: int) -> tuple:
"""
Batch data from the sequence into groups of length n.
The last batch may be shorter than size.
diff --git a/mkdocs.yml b/mkdocs.yml
index 370defb..21ae5fa 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -80,6 +80,7 @@ nav:
- Linuxpy: index.md
- User guide:
- user_guide/index.md
+ - user_guide/gpio.md
- user_guide/input.md
- user_guide/led.md
- user_guide/midi.md
@@ -87,6 +88,7 @@ nav:
- user_guide/video.md
- API Reference:
- api/index.md
+ - api/gpio.md
- api/input.md
- api/led.md
- api/midi.md
diff --git a/tests/test_gpio.py b/tests/test_gpio.py
new file mode 100644
index 0000000..ba2b6ad
--- /dev/null
+++ b/tests/test_gpio.py
@@ -0,0 +1,400 @@
+#
+# This file is part of the linuxpy project
+#
+# Copyright (c) 2024 Tiago Coutinho
+# Distributed under the GPLv3 license. See LICENSE for more info.
+
+import os
+import socket
+from contextlib import ExitStack, contextmanager
+from inspect import isgenerator
+from itertools import count
+from math import ceil, isclose
+from pathlib import Path
+from unittest import mock
+
+from ward import each, fixture, raises, test
+
+from linuxpy.device import device_number
+from linuxpy.gpio import raw
+from linuxpy.gpio.device import Device, LineEventId, Request, expand_from_list, iter_devices, iter_gpio_files
+from linuxpy.util import bit_indexes
+
+
+def FD():
+ return next(FD._FD)
+
+
+FD._FD = count(10_001, 2)
+
+
+class Hardware:
+ nb_lines = 100
+
+ def __init__(self, filename="/dev/gpiochip99"):
+ self.filename = filename
+ self.fd = None
+ self.fobj = None
+ self.requests = {}
+ self.name = b"my-little-GPIO"
+ self.label = self.name.replace(b"-", b" ")
+ self.lines = [line % 2 for line in range(self.nb_lines)]
+
+ def __enter__(self):
+ self.stack = ExitStack()
+ opener = mock.patch("linuxpy.io.open", self.open)
+ self.stack.enter_context(opener)
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.stack.close()
+
+ def open(self, filename, mode, buffering=-1, opener=None):
+ ioctl = mock.patch("linuxpy.ioctl.fcntl.ioctl", self.ioctl)
+ fcntl = mock.patch("linuxpy.ioctl.fcntl.fcntl", self.fcntl)
+ blocking = mock.patch("linuxpy.device.os.get_blocking", self.get_blocking)
+ close = mock.patch("linuxpy.gpio.device.os.close", self.close)
+ self.stack.enter_context(ioctl)
+ self.stack.enter_context(fcntl)
+ self.stack.enter_context(blocking)
+ self.stack.enter_context(close)
+ self.fd = FD()
+ self.fobj = mock.Mock()
+ self.fobj.fileno.return_value = self.fd
+ self.fobj.get_blocking.return_value = False
+ self.fobj.closed = False
+ return self.fobj
+
+ def get_blocking(self, fd):
+ assert self.fd == fd
+ return self.fobj.get_blocking()
+
+ @property
+ def closed(self):
+ return self.fd is not None
+
+ def select(self, readers, writers, other, timeout=None):
+ assert readers[0].fileno() == self.fd
+ return readers, writers, other
+
+ def ioctl(self, fd, ioc, arg): # noqa: C901
+ self.ioctl_ioc = ioc
+ self.ioctl_arg = arg
+ if not isinstance(fd, int):
+ fd = fd.fileno()
+ if fd == self.fd:
+ if ioc == raw.IOC.CHIPINFO:
+ arg.name = self.name
+ arg.label = self.label
+ arg.lines = self.nb_lines
+ elif ioc == raw.IOC.GET_LINE:
+ reader, writer = socket.socketpair()
+ arg.fd = reader.fileno()
+ self.requests[arg.fd] = arg, (reader, writer)
+ elif ioc == raw.IOC.GET_LINEINFO:
+ arg.name = f"Line{arg.offset}".encode()
+ arg.flags = raw.LineFlag.INPUT
+ if arg.offset == 1:
+ arg.consumer = b"another fellow"
+ arg.flags = raw.LineFlag.USED | raw.LineFlag.OUTPUT
+ arg.num_attrs = 3
+ arg.attrs[0] = raw.gpio_v2_line_attribute(id=raw.LineAttrId.FLAGS, flags=raw.LineFlag.ACTIVE_LOW)
+ arg.attrs[1] = raw.gpio_v2_line_attribute(id=raw.LineAttrId.OUTPUT_VALUES, values=0)
+ arg.attrs[2] = raw.gpio_v2_line_attribute(id=raw.LineAttrId.DEBOUNCE, debounce_period_us=99_123_456)
+
+ else:
+ request, (reader, writer) = self.requests[fd]
+ if ioc == raw.IOC.LINE_GET_VALUES:
+ bits = 0
+ for index in bit_indexes(arg.mask):
+ line = request.offsets[index]
+ value = self.lines[line]
+ bits |= value << index
+ arg.bits = bits
+ elif ioc == raw.IOC.LINE_SET_VALUES:
+ for index in bit_indexes(arg.mask):
+ line = request.offsets[index]
+ value = (arg.bits >> index) & 1
+ self.lines[line] = value
+
+ def close(self, fd):
+ del self.requests[fd]
+
+ def fcntl(self, fd, cmd, arg=0):
+ assert fd in self.requests
+ return 0
+
+ def trigger_event(
+ self,
+ fd,
+ line: int,
+ event_type: LineEventId = LineEventId.RISING_EDGE,
+ seqno: int = 2,
+ line_seqno: int = 1,
+ timestamp_ns: int = 1_000_000,
+ ):
+ if not isinstance(fd, int):
+ fd = fd.fileno()
+ _, (reader, writer) = self.requests[fd]
+ event = raw.gpio_v2_line_event(
+ timestamp_ns=timestamp_ns, id=event_type, offset=line, seqno=seqno, line_seqno=line_seqno
+ )
+ writer.sendall(bytes(event))
+
+
+@contextmanager
+def gpio_files(paths=("/dev/gpiochip99")):
+ with mock.patch("linuxpy.device.pathlib.Path.glob") as glob:
+ expected_files = list(paths)
+ glob.return_value = expected_files
+ with mock.patch("linuxpy.device.pathlib.Path.is_char_device") as is_char_device:
+ is_char_device.return_value = True
+ with mock.patch("linuxpy.device.os.access") as access:
+ access.return_value = os.R_OK | os.W_OK
+ yield paths
+
+
+@fixture
+def emulate_gpiochip():
+ with Hardware() as hardware:
+ yield hardware
+
+
+@test("expand list")
+def _():
+ assert [5] == expand_from_list(5, None, None)
+ assert [5, 10] == expand_from_list((5, 10), None, None)
+ assert list(range(100)) == expand_from_list(slice(100), 0, 100)
+ assert list(range(10)) == expand_from_list(slice(10), 0, 100)
+
+
+@test("device number")
+def _(
+ filename=each("/dev/gpiochip0", "/dev/gpiochip1", "/dev/gpiochip99"),
+ expected=each(0, 1, 99),
+):
+ assert device_number(filename) == expected
+
+
+@test("gpio files")
+def _():
+ with gpio_files(["/dev/gpiochip33", "/dev/gpiochip55"]) as expected_files:
+ assert list(iter_gpio_files()) == expected_files
+
+
+@test("device list")
+def _():
+ assert isgenerator(iter_devices())
+
+ with gpio_files(["/dev/gpiochip33", "/dev/gpiochip55"]) as expected_files:
+ devices = list(iter_devices())
+ assert len(devices) == 2
+ for device in devices:
+ assert isinstance(device, Device)
+ assert {device.filename for device in devices} == {Path(filename) for filename in expected_files}
+
+
+@test("device creation")
+def _():
+ # This should not raise an error until open() is called
+ device = Device("/unknown")
+ assert str(device.filename) == "/unknown"
+ assert device.filename.name == "unknown"
+ assert device.closed
+
+ for name in (1, 1.1, True, [], {}, (), set()):
+ with raises(TypeError):
+ Device(name)
+
+
+@test("device creation from id")
+def _():
+ # This should not raise an error until open() is called
+ device = Device.from_id(33)
+ assert str(device.filename) == "/dev/gpiochip33"
+ assert device.filename.name == "gpiochip33"
+ assert device.closed
+
+
+@test("device open")
+def _(chip=emulate_gpiochip):
+ device = Device(chip.filename)
+ assert chip.fobj is None
+ assert device.closed
+ device.open()
+ assert not device.closed
+ assert device.fileno() == chip.fd
+
+
+@test("get info")
+def _(chip=emulate_gpiochip):
+ device = Device(chip.filename)
+
+ with raises(AttributeError):
+ device.get_info()
+
+ with device:
+ info = device.get_info()
+ assert info.name == chip.name.decode()
+ assert info.label == chip.label.decode()
+ assert len(info.lines) == chip.nb_lines
+ l1 = info.lines[1]
+ assert l1.flags == raw.LineFlag.USED | raw.LineFlag.OUTPUT
+ assert l1.name == "Line1"
+ assert l1.consumer == "another fellow"
+ assert l1.attributes.flags == raw.LineFlag.ACTIVE_LOW
+ assert isclose(l1.attributes.debounce_period, 99.123456)
+
+
+@test("make request")
+def _(chip=emulate_gpiochip):
+ nb_lines = chip.nb_lines
+ with Device(chip.filename) as device:
+ for blocking in (True, False):
+ for request in (device[:], device.request(), Request(None, list(range(nb_lines)))):
+ request.blocking = blocking
+ assert len(request.lines) == nb_lines
+ assert request.min_line == 0
+ assert request.max_line == nb_lines - 1
+ assert len(request.line_requests) == ceil(nb_lines / 64)
+
+ for request in (device[1], device.request([1]), Request(None, [1])):
+ assert len(request.lines) == 1
+ assert len(request.line_requests) == 1
+ assert request.min_line == 1
+ assert request.max_line == 1
+ assert request.lines == [1]
+
+ lines = [1, 5, 10, 12]
+ for request in (device[1, 5, 10:14:2], device.request(lines), Request(None, lines)):
+ assert len(request.lines) == 4
+ assert len(request.line_requests) == 1
+ assert request.min_line == 1
+ assert request.max_line == 12
+ assert request.lines == lines
+
+ # close the request to make sure it always succeeds
+ request.close()
+
+
+@test("get value")
+def _(chip=emulate_gpiochip):
+ def assert_request(request):
+ for values in (request[:], request.get_values()):
+ assert len(values) == 9
+ for i in range(1, 10):
+ expected = 1 if i % 2 else 0
+ assert values[i] == expected
+
+ lines = [2, 3, 5, 7]
+ for values in (request[2, 3:8:2], request.get_values(lines)):
+ assert len(values) == len(lines)
+ for i in lines:
+ expected = 1 if i % 2 else 0
+ assert values[i] == expected
+
+ expected = {2: 0, 1: 1}
+ assert request[1] == expected[1]
+ assert request[2] == expected[2]
+ assert request[1, 2] == expected
+ assert request[1:3] == expected
+
+ with raises(KeyError):
+ request[0]
+
+ with raises(KeyError):
+ request[20]
+
+ with raises(KeyError):
+ request[-1]
+
+ with Device(chip.filename) as device:
+ with device[1:10] as request:
+ assert_request(request)
+
+ for blocking in (False, True):
+ request = Request(device, list(range(1, 10)), name="Me", blocking=blocking)
+ with request:
+ assert_request(request)
+
+
+@test("set value")
+def _(chip=emulate_gpiochip):
+ with Device(chip.filename) as device:
+ with device[1:10] as request:
+ assert request[1, 2] == {2: 0, 1: 1}
+
+ request.set_values({1: 0})
+ assert request[1, 2] == {2: 0, 1: 0}
+
+ request.set_values({2: 1, 1: 1})
+ assert request[1, 2] == {2: 1, 1: 1}
+
+ request[1] = 0
+ assert request[1, 2] == {2: 1, 1: 0}
+
+ request[2] = 1
+ assert request[1, 2] == {2: 1, 1: 0}
+
+ request[1, 2] = [1, 0]
+ assert request[1, 2] == {2: 0, 1: 1}
+
+ request[1:3] = [0, 1]
+ assert request[1, 2] == {2: 1, 1: 0}
+
+ request[:] = 9 * [1]
+ assert request[:] == {i: 1 for i in range(1, 10)}
+
+
+@test("event stream")
+def _(chip=emulate_gpiochip):
+ with Device(chip.filename) as device:
+ line = 11
+ with device[line] as request:
+ line_request = request.line_requests[0]
+ chip.trigger_event(line_request, line, LineEventId.RISING_EDGE, 55, 22, 1_999_999_000)
+ chip.trigger_event(line_request, line, LineEventId.FALLING_EDGE, 57, 23, 2_999_999_000)
+ for i, event in enumerate(request):
+ assert event.line == line
+ if not i:
+ assert isclose(event.timestamp, 1.999_999)
+ assert event.sequence == 55
+ assert event.line_sequence == 22
+ assert event.type == LineEventId.RISING_EDGE
+ else:
+ assert isclose(event.timestamp, 2.999_999)
+ assert event.sequence == 57
+ assert event.line_sequence == 23
+ assert event.type == LineEventId.FALLING_EDGE
+ if i > 0:
+ break
+
+
+@test("async event stream")
+async def _(chip=emulate_gpiochip):
+ with Device(chip.filename) as device:
+ line = 11
+ with device[line] as request:
+ line_request = request.line_requests[0]
+ chip.trigger_event(line_request, line, LineEventId.RISING_EDGE, 55, 22, 1_999_999_000)
+ chip.trigger_event(line_request, line, LineEventId.FALLING_EDGE, 57, 23, 2_999_999_000)
+ stream = request.__aiter__()
+ try:
+ i = 0
+ async for event in stream:
+ assert event.line == line
+ if not i:
+ assert isclose(event.timestamp, 1.999_999)
+ assert event.sequence == 55
+ assert event.line_sequence == 22
+ assert event.type == LineEventId.RISING_EDGE
+ else:
+ assert isclose(event.timestamp, 2.999_999)
+ assert event.sequence == 57
+ assert event.line_sequence == 23
+ assert event.type == LineEventId.FALLING_EDGE
+ i += 1
+ if i > 0:
+ break
+ finally:
+ await stream.aclose()