Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for GPIO #43

Merged
merged 19 commits into from
Sep 17, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use LineRequest in Request
tiagocoutinho committed Sep 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 843e87007ff2b6056823e53565a206ca1b791531
74 changes: 27 additions & 47 deletions linuxpy/gpio/device.py
Original file line number Diff line number Diff line change
@@ -214,29 +214,24 @@ def read_one(self) -> LineEvent:

class Request(ReentrantOpen):
def __init__(self, device, lines: list[int] | tuple[int], name: str = "", flags: LineFlag = LineFlag(0)):
self.device = device
self.name = name
self.flags = flags
self.indexes = {}
self.lines = lines
self.chunk_lines = []
for chunk_nb, chunk in enumerate(chunks(lines, 64)):
chunk_lines = []
for index, line in enumerate(chunk):
self.indexes[line] = chunk_nb, index
chunk_lines.append(line)
self.chunk_lines.append(chunk_lines)
self.requests = None
self.line_requests = []
self.line_map = {}
for chunk in chunks(lines, 64):
line_request = LineRequest(device, chunk, name, flags)
self.line_requests.append(line_request)
for line in chunk:
self.line_map[line] = line_request
super().__init__()

def iter_fileno(self):
return (request.fd for request in self.requests)

def __iter__(self):
return event_stream(self.iter_fileno())
return event_stream(self.line_requests)

async def __aiter__(self):
async for event in async_event_stream(self.iter_fileno()):
async for event in async_event_stream(self.line_requests):
yield event

def __getitem__(self, key: int | tuple | slice):
@@ -248,50 +243,35 @@ def __getitem__(self, key: int | tuple | slice):
return self.get_values(key)

def close(self):
for request in self.requests or ():
os.close(request.fd)
self.requests = None
for line_request in self.line_requests:
line_request.close()

def open(self):
self.requests = tuple(request_line(self.device, self.name, lines, self.flags) for lines in self.chunk_lines)
for line_request in self.line_requests:
line_request.open()

def get_values(self, lines: None | list[int] | tuple[int] = None):
if lines is None:
lines = tuple(line for lines in self.chunk_lines for line in lines)

chunks = {}
for line in lines:
chunk_nb, index = self.indexes[line]
chunks[chunk_nb] = chunks.get(chunk_nb, 0) | 1 << index

values = {}
for chunk_nb, mask in chunks.items():
request = self.requests[chunk_nb]
values[chunk_nb] = get_values(request.fd, mask)

results = {}
lines = self.lines
request_lines = {}
for line in lines:
chunk_nb, index = self.indexes[line]
results[line] = values[chunk_nb].bits >> index & 0b1
return results
request_line = self.line_map[line]
request_lines.setdefault(request_line, []).append(line)
result = {}
for request_line, local_lines in request_lines.items():
result.update(request_line.get_values(local_lines))
return result

def set_values(self, values: dict[int, int | bool]):
chunks = {}
request_lines = {}
for line, value in values.items():
chunk_nb, index = self.indexes[line]
mask, bits = chunks.get(chunk_nb, (0, 0))
mask |= 1 << index
if value:
bits |= 1 << index
chunks[chunk_nb] = mask, bits

for chunk_nb, (mask, bits) in chunks.items():
request = self.requests[chunk_nb]
set_values(request.fd, mask, bits)
request_line = self.line_map[line]
request_lines.setdefault(request_line, {})[line] = value
for request_line, local_lines in request_lines.items():
request_line.set_values(local_lines)

def read_one(self) -> LineEvent:
fd = next(iter(self))
return read_one_event(fd)
return next(iter(self))


class Device(BaseDevice):