Skip to content

Commit

Permalink
More typing
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagocoutinho committed Nov 3, 2023
1 parent 341c6a8 commit 50b52c7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
2 changes: 2 additions & 0 deletions linuxpy/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@


Union = typing.Union
Optional = typing.Optional
PathLike = Union[str, pathlib.Path, os.PathLike]

Iterable = collections.abc.Iterable
Iterator = collections.abc.Iterator
AsyncIterable = collections.abc.AsyncIterable
AsyncIterator = collections.abc.AsyncIterator
Callable = collections.abc.Callable
30 changes: 15 additions & 15 deletions linuxpy/video/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from linuxpy.io import IO
from linuxpy.ioctl import ioctl
from linuxpy.types import AsyncIterator, Buffer, Iterable, Iterator, PathLike, Self
from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self

from . import raw

Expand Down Expand Up @@ -1110,7 +1110,7 @@ def __init__(self, device: Device, buffer_type: BufferType, size: int = 2):
self.buffers = None
self.name = type(self).__name__

def formats(self):
def formats(self) -> list:
formats = self.device.info.formats
return [fmt for fmt in formats if fmt.type == self.type]

Expand Down Expand Up @@ -1515,7 +1515,7 @@ async def aread(self):


class FrameReader:
def __init__(self, device: Device, raw_read, max_queue_size=1):
def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1):
self.device = device
self.raw_read = raw_read
self._loop = None
Expand All @@ -1524,7 +1524,7 @@ def __init__(self, device: Device, raw_read, max_queue_size=1):
self._max_queue_size = max_queue_size
self._device_fd = None

async def __aenter__(self):
async def __aenter__(self) -> Self:
if self.device.is_blocking:
raise V4L2Error("Cannot use async frame reader on blocking device")
self._device_fd = self.device.fileno()
Expand All @@ -1547,13 +1547,13 @@ async def __aexit__(self, exc_type, exc_value, traceback):
self._loop = None
self._buffer = None

def __enter__(self):
def __enter__(self) -> Self:
return self

def __exit__(self, exc_type, exc_value, tb):
pass

def _on_event(self):
def _on_event(self) -> None:
task = self._loop.create_future()
try:
self._selector.poll(0) # avoid blocking
Expand All @@ -1568,14 +1568,14 @@ def _on_event(self):
buffer.get_nowait()
buffer.put_nowait(task)

def read(self, timeout=None):
def read(self, timeout: Optional[float] = None) -> Frame:
if not self.device.is_blocking:
read, _, _ = self.device.io.select((self.device,), (), (), timeout)
if not read:
return
return self.raw_read()

async def aread(self):
async def aread(self) -> Frame:
"""Wait for next frame or return last frame"""
task = await self._buffer.get()
return await task
Expand Down Expand Up @@ -1609,18 +1609,18 @@ def __init__(self, buffer_manager: BufferManager):
def device(self) -> Device:
return self.buffer_manager.device

def raw_write(self, data):
def raw_write(self, data: Buffer) -> None:
self.device.write(data)

def wait_write(self, data):
def wait_write(self, data: Buffer) -> None:
device = self.device
if device.io.select is not None:
_, w, _ = device.io.select((), (device,), ())
if not w:
raise OSError("Closed")
return self.raw_write(data)
self.raw_write(data)

def write(self, data: bytes):
def write(self, data: Buffer) -> None:
# first time we check what mode device was opened (blocking vs non-blocking)
# if file was opened with O_NONBLOCK: DQBUF will not block until a buffer
# is available for write. So we need to do it here
Expand All @@ -1630,10 +1630,10 @@ def write(self, data: bytes):
self.write = self.wait_write
return self.write(data)

def open(self):
def open(self) -> None:
pass

def close(self):
def close(self) -> None:
pass


Expand Down Expand Up @@ -1680,7 +1680,7 @@ def close(self) -> None:
self.buffer = None
self.device.log.info("Video output closed")

def write(self, data: Buffer):
def write(self, data: Buffer) -> None:
self.buffer.write(data)


Expand Down

0 comments on commit 50b52c7

Please sign in to comment.