Skip to content

Commit

Permalink
gpio: Add line info event
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagocoutinho committed Oct 14, 2024
1 parent 95668e4 commit 3321c8c
Showing 1 changed file with 103 additions and 18 deletions.
121 changes: 103 additions & 18 deletions linuxpy/gpio/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import os
import pathlib

from linuxpy.ctypes import sizeof
from linuxpy.ctypes import sizeof, u32
from linuxpy.device import BaseDevice, ReentrantOpen, iter_device_files
from linuxpy.gpio import raw
from linuxpy.gpio.raw import IOC
Expand All @@ -48,6 +48,7 @@
LineAttrId = raw.LineAttrId
LineEventId = raw.LineEventId
LineEvent = collections.namedtuple("LineEvent", "timestamp type line sequence line_sequence")
LineInfoEvent = collections.namedtuple("LineInfoEvent", "name consumer line flags attributes timestamp type")


class LineAttributes:
Expand All @@ -57,6 +58,26 @@ def __init__(self):
self.debounce_period = 0


def decode_line_info(info: raw.gpio_v2_line_info) -> LineInfo:
attributes = LineAttributes()
for i in range(info.num_attrs):
attr = info.attrs[i]
if attr.id == LineAttrId.FLAGS:
attributes.flags = LineFlag(attr.flags)
elif attr.id == LineAttrId.OUTPUT_VALUES:
attributes.indexes = bit_indexes(attr.values)
elif attr.id == LineAttrId.DEBOUNCE:
attributes.debounce_period = attr.debounce_period_us / 1_000_000

return LineInfo(
info.name.decode(),
info.consumer.decode(),
info.offset,
LineFlag(info.flags),
attributes,
)


def get_chip_info(fd: FDLike) -> ChipInfo:
"""Reads the chip information
Expand All @@ -83,23 +104,7 @@ def get_line_info(fd: FDLike, line: int) -> LineInfo:
"""
info = raw.gpio_v2_line_info(offset=line)
ioctl(fd, IOC.GET_LINEINFO, info)
attributes = LineAttributes()
for i in range(info.num_attrs):
attr = info.attrs[i]
if attr.id == LineAttrId.FLAGS:
attributes.flags = LineFlag(attr.flags)
elif attr.id == LineAttrId.OUTPUT_VALUES:
attributes.indexes = bit_indexes(attr.values)
elif attr.id == LineAttrId.DEBOUNCE:
attributes.debounce_period = attr.debounce_period_us / 1_000_000

return LineInfo(
info.name.decode(),
info.consumer.decode(),
info.offset,
LineFlag(info.flags),
attributes,
)
return decode_line_info(info)


def get_info(fd: FDLike) -> Info:
Expand Down Expand Up @@ -198,9 +203,38 @@ def read_one_event(req_fd: FDLike) -> LineEvent:
)


def watch_line_info(fd: FDLike, line: int):
info = raw.gpio_v2_line_info(offset=line)
return ioctl(fd, IOC.GET_LINEINFO_WATCH, info)


def unwatch_line_info(fd: FDLike, line: int):
return ioctl(fd, IOC.LINEINFO_UNWATCH, u32(line))


def read_one_line_info_event(fd: FDLike) -> LineInfoEvent:
if not isinstance(fd, int):
fd = fd.fileno()
data = os.read(fd, sizeof(raw.gpio_v2_line_info_changed))
event = raw.gpio_v2_line_info_changed.from_buffer_copy(data)
info = decode_line_info(event.info)
return LineInfoEvent(
info.name,
info.consumer,
info.line,
info.flags,
info.attributes,
event.timestamp_ns * 1e-9,
type=raw.GpioLineEvent(event.event_type),
)


event_stream = functools.partial(events, read=read_one_event)
async_event_stream = functools.partial(async_events, read=read_one_event)

event_info_stream = functools.partial(events, read=read_one_line_info_event)
async_event_info_stream = functools.partial(async_events, read=read_one_line_info_event)


def expand_from_list(key: Union[int, slice, tuple], minimum, maximum) -> list[int]:
"""Used internally in __getitem__ to expand the given key"""
Expand Down Expand Up @@ -448,6 +482,10 @@ class Device(BaseDevice):

PREFIX = "/dev/gpiochip"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._watch_lines = set()

def __len__(self) -> int:
"""The number of lines in this chip
Expand Down Expand Up @@ -475,6 +513,39 @@ def __getitem__(self, key: Union[int, tuple, slice]) -> Request:
lines = expand_from_list(key, 0, len(self))
return self.request(lines)

def __iter__(self) -> Iterable[LineInfoEvent]:
"""Infinite stream of line info events
Returns:
Iterable[LineInfoEvent]: the stream of line info events
"""
return event_info_stream((self,))

def __aiter__(self) -> AsyncIterator[LineInfoEvent]:
"""Asynchronous stream of line events
Returns:
AsyncIterator[LineInfoEvent]: the asynchronous stream of line info events
"""
return async_event_info_stream((self,))

def info_stream(self, lines: Collection[int]) -> Iterable[LineInfoEvent]:
watched = set(lines) - self._watch_lines
self.watch_lines(watched)
try:
yield from iter(self)
finally:
self.unwatch_lines(watched)

async def async_info_stream(self, lines: Collection[int]) -> AsyncIterator[LineInfoEvent]:
watched = set(lines) - self._watch_lines
self.watch_lines(watched)
try:
async for event in self:
yield event
finally:
self.unwatch_lines(watched)

def get_info(self) -> Info:
"""
Reads all information available including chip info and detailed information
Expand All @@ -485,6 +556,20 @@ def get_info(self) -> Info:
"""
return get_info(self)

def watch_line(self, line: int):
watch_line_info(self, line)

def unwatch_line(self, line: int):
unwatch_line_info(self, line)

def watch_lines(self, lines: Collection[int]):
for line in lines:
watch_line_info(self, line)

def unwatch_lines(self, lines: Collection[int]):
for line in lines:
unwatch_line_info(self, line)

def request(self, lines: Optional[Sequence[int]] = None, name: str = "", flags: LineFlag = LineFlag(0)) -> Request:
"""Create a request to reserve a list of lines on this chip
Expand Down

0 comments on commit 3321c8c

Please sign in to comment.