diff --git a/linuxpy/types.py b/linuxpy/types.py index 690ed8b..34e9e00 100644 --- a/linuxpy/types.py +++ b/linuxpy/types.py @@ -25,9 +25,11 @@ Union = typing.Union +Optional = typing.Optional PathLike = Union[str, pathlib.Path, os.PathLike] Iterable = collections.abc.Iterable Iterator = collections.abc.Iterator AsyncIterable = collections.abc.AsyncIterable AsyncIterator = collections.abc.AsyncIterator +Callable = collections.abc.Callable diff --git a/linuxpy/video/device.py b/linuxpy/video/device.py index d8af662..b610d7c 100644 --- a/linuxpy/video/device.py +++ b/linuxpy/video/device.py @@ -27,7 +27,7 @@ ) from linuxpy.io import IO from linuxpy.ioctl import ioctl -from linuxpy.types import AsyncIterator, Buffer, Iterable, Iterator, PathLike, Self +from linuxpy.types import AsyncIterator, Buffer, Callable, Iterable, Iterator, Optional, PathLike, Self from . import raw @@ -1110,7 +1110,7 @@ def __init__(self, device: Device, buffer_type: BufferType, size: int = 2): self.buffers = None self.name = type(self).__name__ - def formats(self): + def formats(self) -> list: formats = self.device.info.formats return [fmt for fmt in formats if fmt.type == self.type] @@ -1515,7 +1515,7 @@ async def aread(self): class FrameReader: - def __init__(self, device: Device, raw_read, max_queue_size=1): + def __init__(self, device: Device, raw_read: Callable[[], Buffer], max_queue_size: int = 1): self.device = device self.raw_read = raw_read self._loop = None @@ -1524,7 +1524,7 @@ def __init__(self, device: Device, raw_read, max_queue_size=1): self._max_queue_size = max_queue_size self._device_fd = None - async def __aenter__(self): + async def __aenter__(self) -> Self: if self.device.is_blocking: raise V4L2Error("Cannot use async frame reader on blocking device") self._device_fd = self.device.fileno() @@ -1547,13 +1547,13 @@ async def __aexit__(self, exc_type, exc_value, traceback): self._loop = None self._buffer = None - def __enter__(self): + def __enter__(self) -> Self: return self def __exit__(self, exc_type, exc_value, tb): pass - def _on_event(self): + def _on_event(self) -> None: task = self._loop.create_future() try: self._selector.poll(0) # avoid blocking @@ -1568,14 +1568,14 @@ def _on_event(self): buffer.get_nowait() buffer.put_nowait(task) - def read(self, timeout=None): + def read(self, timeout: Optional[float] = None) -> Frame: if not self.device.is_blocking: read, _, _ = self.device.io.select((self.device,), (), (), timeout) if not read: return return self.raw_read() - async def aread(self): + async def aread(self) -> Frame: """Wait for next frame or return last frame""" task = await self._buffer.get() return await task @@ -1609,18 +1609,18 @@ def __init__(self, buffer_manager: BufferManager): def device(self) -> Device: return self.buffer_manager.device - def raw_write(self, data): + def raw_write(self, data: Buffer) -> None: self.device.write(data) - def wait_write(self, data): + def wait_write(self, data: Buffer) -> None: device = self.device if device.io.select is not None: _, w, _ = device.io.select((), (device,), ()) if not w: raise OSError("Closed") - return self.raw_write(data) + self.raw_write(data) - def write(self, data: bytes): + def write(self, data: Buffer) -> None: # first time we check what mode device was opened (blocking vs non-blocking) # if file was opened with O_NONBLOCK: DQBUF will not block until a buffer # is available for write. So we need to do it here @@ -1630,10 +1630,10 @@ def write(self, data: bytes): self.write = self.wait_write return self.write(data) - def open(self): + def open(self) -> None: pass - def close(self): + def close(self) -> None: pass @@ -1680,7 +1680,7 @@ def close(self) -> None: self.buffer = None self.device.log.info("Video output closed") - def write(self, data: Buffer): + def write(self, data: Buffer) -> None: self.buffer.write(data)