Skip to content

Commit

Permalink
Add gpio API doc
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagocoutinho committed Oct 13, 2024
1 parent d8cea86 commit 5c05881
Showing 1 changed file with 192 additions and 21 deletions.
213 changes: 192 additions & 21 deletions linuxpy/gpio/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,29 @@ def __init__(self):


def get_chip_info(fd: FDLike) -> ChipInfo:
"""Reads the chip information"""
"""Reads the chip information
Args:
fd (FDLike): a gpiochip file number or file like object
Returns:
ChipInfo: chip info of the given file descriptor
"""
info = raw.gpiochip_info()
ioctl(fd, IOC.CHIPINFO, info)
return ChipInfo(info.name.decode(), info.label.decode(), info.lines)


def get_line_info(fd: FDLike, line: int) -> LineInfo:
"""Reads the given line information"""
"""Reads the given line information
Args:
fd (FDLike): a gpiochip file number or file like object
line (int): desired line to get information
Returns:
LineInfo: information for the given line and chip
"""
info = raw.gpio_v2_line_info(offset=line)
ioctl(fd, IOC.GET_LINEINFO, info)
attributes = LineAttributes()
Expand All @@ -88,15 +103,33 @@ def get_line_info(fd: FDLike, line: int) -> LineInfo:


def get_info(fd: FDLike) -> Info:
"""Reads the given line information"""
"""Reads the given chip the full information
Args:
fd (FDLike): a gpiochip file number or file like object
Returns:
Info: information for the given chip including all its lines
"""
chip = get_chip_info(fd)
return Info(chip.name, chip.label, [get_line_info(fd, line) for line in range(chip.lines)])


def request_line(
fd: FDLike, consumer_name: str, lines: Sequence[int], flags: LineFlag, blocking=False
) -> raw.gpio_v2_line_request:
"""Make a request to reserve the given line(s)"""
"""Make a request to reserve the given line(s)
Args:
fd (FDLike): a gpiochip file number or file like object
consumer_name (str): consumer name (max 32 chars)
lines (Sequence[int]): list of lines to reserve
flags (LineFlag): common flags
blocking (bool, optional): Make the return FD blocking. Defaults to False.
Returns:
raw.gpio_v2_line_request: The details of the request. Field `fd` contains the new open file descritor
"""
num_lines = len(lines)
req = raw.gpio_v2_line_request()
req.consumer = consumer_name.encode()
Expand All @@ -112,31 +145,47 @@ def request_line(


def get_values(req_fd: FDLike, mask: int) -> raw.gpio_v2_line_values:
"""
Read line values.
The mask is a bitmap identifying the lines, with each bit number corresponding to
the index of the line reserved in the given req_fd.
"""Read lines values.
Args:
req_fd (FDLike): a gpiochip file number or file like object
mask (int): a bitmap identifying the lines, with each bit number corresponding to
the index of the line reserved in the given req_fd.
Returns:
raw.gpio_v2_line_values: the current line values
"""
result = raw.gpio_v2_line_values(mask=mask)
return ioctl(req_fd, IOC.LINE_GET_VALUES, result)


def set_values(req_fd: FDLike, mask: int, bits: int) -> raw.gpio_v2_line_values:
"""
Set line values.
Set lines values.
Parameters:
Args:
req_fd (FDLike): a gpiochip file number or file like object
mask: The mask is a bitmap identifying the lines, with each bit number corresponding to
the index of the line reserved in the given req_fd.
bits: The bits is a bitmap containing the value of the lines, set to 1 for active and 0
for inactive.
Returns:
raw.gpio_v2_line_values: the underlying object sent to the ioctl call
"""
result = raw.gpio_v2_line_values(mask=mask, bits=bits)
return ioctl(req_fd, IOC.LINE_SET_VALUES, result)


def read_one_event(req_fd: FDLike) -> LineEvent:
"""Read one event from the given request file descriptor"""
"""Read one event from the given request file descriptor
Args:
req_fd (FDLike): _description_
Returns:
LineEvent: _description_
"""
fd = req_fd if isinstance(req_fd, int) else req_fd.fileno()
data = os.read(fd, sizeof(raw.gpio_v2_line_event))
event = raw.gpio_v2_line_event.from_buffer_copy(data)
Expand Down Expand Up @@ -213,6 +262,11 @@ def set_values(self, values: dict[int, Union[int, bool]]) -> raw.gpio_v2_line_va


class Request(ReentrantOpen):
"""A lazy request to reserve lines on a chip
Prefered creation from the `Device.request()` method.
"""

def __init__(
self, device, lines: Sequence[int], name: str = "", flags: LineFlag = LineFlag(0), blocking: bool = False
):
Expand All @@ -230,10 +284,24 @@ def __init__(

@property
def name(self) -> str:
"""Requestor name
Change the requestor name must be called before the request is open
to take effect
Returns:
str: consumer name
"""
return self._name

@name.setter
def name(self, name) -> None:
def name(self, name: str) -> None:
"""Set requestor name. Must be called before the request is open
to take effect
Args:
name (str): new requestor name
"""
self._name = name
for req in self.line_requests:
req.name = name
Expand All @@ -249,13 +317,29 @@ def flags(self, flags: LineFlag):
req.flags = flags

def __getitem__(self, key: Union[int, tuple, slice]) -> Union[int, dict]:
"""Get values"""
"""Reads lines values
Args:
key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
Returns:
Union[int, dict]: a dict where key is line number and value its value
"""
if isinstance(key, int):
return self.get_values((key,))[key]
lines = expand_from_list(key, self.min_line, self.max_line + 1)
return self.get_values(lines)

def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[int]]):
"""Sets the given lines values
Args:
key (Union[int, tuple, slice]): a line number, a slice, or a list of line numbers or slices
value (Union[int, Sequence[int]]): the value(s) to write for the given lines
Raises:
ValueError: if key is a line number and value is not a number (0 or 1)
"""
if isinstance(key, int):
if not isinstance(value, int):
raise ValueError("set value for single line must be 0 or 1")
Expand All @@ -268,35 +352,72 @@ def __setitem__(self, key: Union[int, tuple, slice], value: Union[int, Sequence[
self.set_values(values)

def __iter__(self) -> Iterable[LineEvent]:
"""Infinite stream of line events
Returns:
Iterable[LineEvent]: the stream of events
"""
return event_stream(self.filenos())

def __aiter__(self) -> AsyncIterator[LineEvent]:
"""Asynchronous stream of line events
Returns:
AsyncIterator[LineEvent]: the asynchronous stream of events
"""
return async_event_stream(self.filenos())

@property
def min_line(self) -> int:
"""The smallest line number in the request
Returns:
int: The smallest line number in the request
"""
if not hasattr(self, "_min_line"):
self._min_line = min(self.lines)
return self._min_line

@property
def max_line(self) -> int:
"""The biggest line number in the request
Returns:
int: The biggest line number in the request
"""
if not hasattr(self, "_max_line"):
self._max_line = max(self.lines)
return self._max_line

def filenos(self) -> list[int]:
"""List of underlying request file numbers
Returns:
list[int]: List of underlying request file numbers
"""
return [request.fd for request in self.line_requests]

def close(self):
"""Closes the underling request files. If request is not
open nothing is done.
"""
for line_request in self.line_requests:
line_request.close()

def open(self):
"""Opens the underling request files effectively reserving the lines"""
for line_request in self.line_requests:
line_request.open()

def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
"""Reads values for the given lines
Args:
lines (Optional[Sequence[int]], optional): A collection of lines. Defaults to None. Default means read all lines
Returns:
dict[int, int]: line values. Key is line number and value its value
"""
if lines is None:
lines = self.lines
request_lines = {}
Expand All @@ -309,6 +430,11 @@ def get_values(self, lines: Optional[Sequence[int]] = None) -> dict[int, int]:
return result

def set_values(self, values: dict[int, Union[int, bool]]):
"""Writes new values on the given lines
Args:
values (dict[int, Union[int, bool]]): key is line number and value its value
"""
request_lines = {}
for line, value in values.items():
request_line = self.line_map[line]
Expand All @@ -318,21 +444,34 @@ def set_values(self, values: dict[int, Union[int, bool]]):


class Device(BaseDevice):
"""
A device represents a connection to the underlying gpio chip
"""
"""A device represents a connection to the underlying gpio chip"""

PREFIX = "/dev/gpiochip"

def __len__(self) -> int:
"""The number of lines in this chip
Returns:
int: The number of lines in this chip
"""
if not hasattr(self, "_len"):
self._len = get_chip_info(self).lines
return self._len

def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
"""Request line(s)"""
"""create a request for the given lines. Equivalent to `device.request(key)`
!!! note
The request is not active after this call. You need to use the request object returned
by this method in a context manager or manually call open/close.
Args:
key (Union[int, tuple, slice]): the line number, slice or a list of line numbers, or slices
Returns:
Request: A new request object
"""
lines = expand_from_list(key, 0, len(self))
return self.request(lines)

Expand All @@ -347,18 +486,50 @@ def get_info(self) -> Info:
return get_info(self)

def request(self, lines: Optional[Sequence[int]] = None, name: str = "", flags: LineFlag = LineFlag(0)) -> Request:
"""Create a request to reserve a list of lines on this chip
!!! note
The request is not active after this call. You need to use the request object returned
by this method in a context manager or manually call open/close.
Args:
lines (Optional[Sequence[int]], optional): Lists of lines. Defaults to None. Default is to reserve all lines
name (str, optional): the requestor (aka consumer) name. Defaults to "".
flags (LineFlag, optional): Flags on the request. Defaults to LineFlag(0).
Returns:
Request: A new request object
"""
if lines is None:
lines = list(range(len(self)))
return Request(self, lines, name, flags)


def iter_gpio_files(path: PathLike = "/dev") -> Iterable[pathlib.Path]:
"""Returns an iterator over all GPIO chip files"""
"""Returns an iterator over all GPIO chip files.
!!! warning
Only files for which the current user has read and write access are returned
Args:
path (PathLike, optional): root path. Defaults to "/dev".
Returns:
Iterable[pathlib.Path]: an iterator over the gpiochip files found on the system
"""
return iter_device_files(path=path, pattern="gpio*")


def iter_devices(path: PathLike = "/dev", **kwargs) -> Iterable[Device]:
"""Returns an iterator over all GPIO chip devices"""
"""Returns an iterator over all GPIO chip devices
Args:
path (PathLike, optional): root path. Defaults to "/dev".
Returns:
Iterable[Device]: an iterator over the gpiochip devices found on the system
"""
return (Device(name, **kwargs) for name in iter_gpio_files(path=path))


Expand Down

0 comments on commit 5c05881

Please sign in to comment.