From 9e0bb4565953efd6efb3343fa5b6fb6786f4f5f3 Mon Sep 17 00:00:00 2001 From: egeakman Date: Mon, 12 Feb 2024 02:24:23 +0300 Subject: [PATCH 01/27] [Lib] Introduce has_demand method --- mjpeg_streamer/mjpeg_streamer.py | 27 +++++++++++++++++++++------ pyproject.toml | 4 ++-- test-perf/fast-od.py | 22 ++++++++++++++++++++++ test-perf/regular-od.py | 25 +++++++++++++++++++++++++ test-perf/regular.py | 19 +++++++++++++++++++ 5 files changed, 89 insertions(+), 8 deletions(-) create mode 100644 test-perf/fast-od.py create mode 100644 test-perf/regular-od.py create mode 100644 test-perf/regular.py diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index fdc5859..5ee87c8 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -26,18 +26,30 @@ def __init__( self.fps = fps self._frame = np.zeros((320, 240, 1), dtype=np.uint8) self._lock = asyncio.Lock() - self._byte_frame_window = deque(maxlen=30) + self._byte_frame_window = deque(maxlen=fps) self._bandwidth_last_modified_time = time.time() + self._background_task: Optional[asyncio.Task] = None + + async def __ensure_background_task(self) -> None: + if self._background_task is None or self._background_task.done(): + self._background_task = asyncio.create_task(self.__clear_deque()) + + async def __clear_deque(self) -> None: + while True: + await asyncio.sleep(1 / self.fps) + if ( + len(self._byte_frame_window) > 0 + and time.time() - self._bandwidth_last_modified_time >= 1 + ): + deque.clear(self._byte_frame_window) + + def has_demand(self) -> bool: + return len(self._byte_frame_window) > 0 def set_frame(self, frame: np.ndarray) -> None: self._frame = frame def get_bandwidth(self) -> float: - if ( - len(self._byte_frame_window) > 0 - and time.time() - self._bandwidth_last_modified_time >= 1 - ): - deque.clear(self._byte_frame_window) return sum(self._byte_frame_window) def __process_current_frame(self) -> np.ndarray: @@ -54,10 +66,13 @@ def __process_current_frame(self) -> np.ndarray: return frame async def get_frame(self) -> np.ndarray: + # A little hacky, if you have a better way, please let me know + await self.__ensure_background_task() async with self._lock: return self._frame async def get_frame_processed(self) -> np.ndarray: + await self.__ensure_background_task() # Ditto async with self._lock: return self.__process_current_frame() diff --git a/pyproject.toml b/pyproject.toml index 34a047d..947807a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,9 +79,9 @@ isolated = true path = "mjpeg_streamer/__init__.py" [tool.ruff] -exclude = ["examples"] +exclude = ["examples", "test*"] fix = false -ignore = [ +lint.ignore = [ "TID252", # Relative imports are banned | __init__.py "T201", # `print` found | TODO: Migrate to logging "S104", # Possible binding to all interfaces | False positive diff --git a/test-perf/fast-od.py b/test-perf/fast-od.py new file mode 100644 index 0000000..a8fa1a0 --- /dev/null +++ b/test-perf/fast-od.py @@ -0,0 +1,22 @@ +import os + +import cv2 + +from mjpeg_streamer import MjpegServer, Stream + +print(os.getpid()) + +cap = cv2.VideoCapture(0) + +stream = Stream("my_source", size=(640, 480), quality=50, fps=30) + +server = MjpegServer("localhost", 8080) +server.add_stream(stream) +server.start() + +while True: + if stream.has_demand(): + _, frame = cap.read() + stream.set_frame(frame) + else: + continue diff --git a/test-perf/regular-od.py b/test-perf/regular-od.py new file mode 100644 index 0000000..0cff62c --- /dev/null +++ b/test-perf/regular-od.py @@ -0,0 +1,25 @@ +import os + +import cv2 + +from mjpeg_streamer import MjpegServer, Stream + +print(os.getpid()) + +cap = cv2.VideoCapture(0) + +stream = Stream("my_source", size=(640, 480), quality=50, fps=30) + +server = MjpegServer("localhost", 8080) +server.add_stream(stream) +server.start() + +while True: + if stream.has_demand(): + if not cap.isOpened(): + cap.open(0) + _, frame = cap.read() + stream.set_frame(frame) + else: + if cap.isOpened(): + cap.release() diff --git a/test-perf/regular.py b/test-perf/regular.py new file mode 100644 index 0000000..90d510c --- /dev/null +++ b/test-perf/regular.py @@ -0,0 +1,19 @@ +import os + +import cv2 + +from mjpeg_streamer import MjpegServer, Stream + +print(os.getpid()) + +cap = cv2.VideoCapture(0) + +stream = Stream("my_source", size=(640, 480), quality=50, fps=30) + +server = MjpegServer("localhost", 8080) +server.add_stream(stream) +server.start() + +while True: + _, frame = cap.read() + stream.set_frame(frame) From e15e99d852755e9a62e60e2d82cac3f5ef0dc487 Mon Sep 17 00:00:00 2001 From: egeakman Date: Thu, 22 Feb 2024 22:25:21 +0100 Subject: [PATCH 02/27] Found the overhead, remove temporary tests --- requirements-dev.txt | 3 +++ test-perf/fast-od.py | 22 ---------------------- test-perf/regular-od.py | 25 ------------------------- test-perf/regular.py | 19 ------------------- 4 files changed, 3 insertions(+), 66 deletions(-) create mode 100644 requirements-dev.txt delete mode 100644 test-perf/fast-od.py delete mode 100644 test-perf/regular-od.py delete mode 100644 test-perf/regular.py diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..e7061e7 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +hatch +pre-commit +ruff diff --git a/test-perf/fast-od.py b/test-perf/fast-od.py deleted file mode 100644 index a8fa1a0..0000000 --- a/test-perf/fast-od.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -import cv2 - -from mjpeg_streamer import MjpegServer, Stream - -print(os.getpid()) - -cap = cv2.VideoCapture(0) - -stream = Stream("my_source", size=(640, 480), quality=50, fps=30) - -server = MjpegServer("localhost", 8080) -server.add_stream(stream) -server.start() - -while True: - if stream.has_demand(): - _, frame = cap.read() - stream.set_frame(frame) - else: - continue diff --git a/test-perf/regular-od.py b/test-perf/regular-od.py deleted file mode 100644 index 0cff62c..0000000 --- a/test-perf/regular-od.py +++ /dev/null @@ -1,25 +0,0 @@ -import os - -import cv2 - -from mjpeg_streamer import MjpegServer, Stream - -print(os.getpid()) - -cap = cv2.VideoCapture(0) - -stream = Stream("my_source", size=(640, 480), quality=50, fps=30) - -server = MjpegServer("localhost", 8080) -server.add_stream(stream) -server.start() - -while True: - if stream.has_demand(): - if not cap.isOpened(): - cap.open(0) - _, frame = cap.read() - stream.set_frame(frame) - else: - if cap.isOpened(): - cap.release() diff --git a/test-perf/regular.py b/test-perf/regular.py deleted file mode 100644 index 90d510c..0000000 --- a/test-perf/regular.py +++ /dev/null @@ -1,19 +0,0 @@ -import os - -import cv2 - -from mjpeg_streamer import MjpegServer, Stream - -print(os.getpid()) - -cap = cv2.VideoCapture(0) - -stream = Stream("my_source", size=(640, 480), quality=50, fps=30) - -server = MjpegServer("localhost", 8080) -server.add_stream(stream) -server.start() - -while True: - _, frame = cap.read() - stream.set_frame(frame) From 28e454c0df566da53dda0b1a524b12598e9c769d Mon Sep 17 00:00:00 2001 From: egeakman Date: Sat, 24 Feb 2024 20:52:41 +0300 Subject: [PATCH 03/27] Add .editorconfig --- .editorconfig | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c02ae14 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +charset = utf-8 +indent_size = 4 +indent_style = space +insert_final_newline = true +root = true +trim_trailing_whitespace = true From de8b61d44d6aeb7f711cdbcd03b63957d64ff19f Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 25 Feb 2024 01:30:10 +0300 Subject: [PATCH 04/27] Added a bunch of stuff, gonna need tons of debugging --- mjpeg_streamer/mjpeg_streamer.py | 259 +++++++++++++++++++++++++++---- test.py | 37 +++++ 2 files changed, 264 insertions(+), 32 deletions(-) create mode 100644 test.py diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index 5ee87c8..5c41b58 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -12,27 +12,25 @@ from aiohttp.web_runner import GracefulExit -class Stream: +class StreamBase: def __init__( self, name: str, - size: Optional[Tuple[int, int]] = None, - quality: int = 50, fps: int = 30, ) -> None: - self.name = name.lower().casefold().replace(" ", "_") - self.size = size - self.quality = max(1, min(quality, 100)) - self.fps = fps - self._frame = np.zeros((320, 240, 1), dtype=np.uint8) - self._lock = asyncio.Lock() - self._byte_frame_window = deque(maxlen=fps) - self._bandwidth_last_modified_time = time.time() - self._background_task: Optional[asyncio.Task] = None - - async def __ensure_background_task(self) -> None: - if self._background_task is None or self._background_task.done(): - self._background_task = asyncio.create_task(self.__clear_deque()) + if type(self) is StreamBase: + raise TypeError( + "StreamBase is an abstract class and cannot be instantiated" + ) + else: + self.name = name.lower().casefold().replace(" ", "_") + self.fps = fps + self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) + self._lock: asyncio.Lock = asyncio.Lock() + self._byte_frame_window: deque = deque(maxlen=fps) + self._bandwidth_last_modified_time: float = time.time() + self._deque_background_task: Optional[asyncio.Task] = None + self.settings() async def __clear_deque(self) -> None: while True: @@ -43,16 +41,194 @@ async def __clear_deque(self) -> None: ): deque.clear(self._byte_frame_window) + async def _ensure_background_tasks(self) -> None: + if self._deque_background_task is None or self._deque_background_task.done(): + self._deque_background_task = asyncio.create_task(self.__clear_deque()) + + def _check_encoding(self, frame: np.ndarray) -> str: + if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2: + # Check JPEG header (0xFFD8) and footer (0xFFD9) + if ( + frame[0] == 255 + and frame[1] == 216 + and frame[-2] == 255 + and frame[-1] == 217 + ): + return "jpeg" + else: + return "one-dim-non-jpeg" + elif isinstance(frame, np.ndarray): + return "multi-dim" + else: + return "unknown" + + def settings(self): + for key, value in self.__dict__.items(): + if key.startswith("_"): + continue + print(f"{key}: {value}") + def has_demand(self) -> bool: return len(self._byte_frame_window) > 0 + def get_bandwidth(self) -> float: + return sum(self._byte_frame_window) + + def set_fps(self, fps: int) -> None: + self.fps = fps + + # Method for delivering the frame to the StreamHandler + async def _get_frame(self) -> np.ndarray: + # A little hacky, if you have a better way, please let me know + await self._ensure_background_tasks() + self._byte_frame_window.append(len(self._frame.tobytes())) + self._bandwidth_last_modified_time = time.time() + async with self._lock: + return self._frame + def set_frame(self, frame: np.ndarray) -> None: + if self._check_encoding(frame) != "jpeg": + raise ValueError( + "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." + ) self._frame = frame - def get_bandwidth(self) -> float: - return sum(self._byte_frame_window) + # Not very useful, but it's here for the sake of completeness + # def get_frame(self) -> np.ndarray: + # return self._frame + + +class Stream(StreamBase): + def __init__( + self, + name: str, + fps: int = 30, + size: Optional[Tuple[int, int]] = None, + quality: int = 50, + ) -> None: + self.size = size + self.quality = max(1, min(quality, 100)) + self._is_encoded = False + super().__init__(name, fps) + + async def __process_current_frame(self) -> np.ndarray: + if not self._is_encoded: + frame = cv2.resize( + self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) + ) + val, frame = cv2.imencode( + ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] + ) + if not val: + raise ValueError("Error encoding frame") + self._is_encoded = True + return frame + return self._frame + + async def _get_frame(self) -> np.ndarray: + await self._ensure_background_tasks() + self._byte_frame_window.append(len(self._frame.tobytes())) + self._bandwidth_last_modified_time = time.time() + async with self._lock: + return await self.__process_current_frame() + + def set_size(self, size: Tuple[int, int]) -> None: + self.size = size + + def set_quality(self, quality: int) -> None: + self.quality = max(1, min(quality, 100)) + + def set_frame(self, frame: np.ndarray) -> None: + self._is_encoded = False + if self._check_encoding(frame) == "jpeg": + print( + "The frame is already encoded, will not encode again. Consider using CustomStream if you want to handle the processing yourself." + ) + self._is_encoded = True + self._frame = frame + + # def get_frame(self) -> np.ndarray: + # return super().get_frame() + + +class CustomStream(StreamBase): + # Same as StreamBase, but with a friendly name + def __init__( + self, + name: str, + fps: int = 30, + ) -> None: + super().__init__(name, fps) + + +class ManagedStream(StreamBase): + def __init__( + self, + name: str, + fps: int = 30, + size: Optional[Tuple[int, int]] = None, + quality: int = 50, + source: Union[int, str] = 0, + mode: str = "regular", + poll_delay: Optional[float] = None, + ) -> None: + self.mode = mode + self._available_modes: List[str,] = ["regular", "od", "fast-od"] + if self.mode not in self._available_modes: + raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") + self.size = size + self.quality = max(1, min(quality, 100)) + self.source = source + self.poll_delay = poll_delay or fps + self._cap_is_open = False + self._cap: Optional[cv2.VideoCapture] = None + self._cap_background_task: Optional[asyncio.Task] = None + super().__init__(name, fps) + + async def _ensure_background_tasks(self) -> None: + await super()._ensure_background_tasks() + if self._cap_background_task is None or self._cap_background_task.done(): + print("Creating a new background task") + await self.__open_cap() + self._cap_background_task = asyncio.create_task(self.__manage_cap_state()) + + async def __manage_cap_state(self) -> None: + while True: + await asyncio.sleep(self.poll_delay) + if self.mode == "od": + if self.has_demand() and not self._cap_is_open: + await self.__open_cap() + elif not self.has_demand() and self._cap_is_open: + await self.__close_cap() + + async def __open_cap(self) -> None: + if not self._cap_is_open: + self._cap = cv2.VideoCapture(self.source) + if not self._cap.isOpened(): + raise ValueError("Cannot open the capture device") + self._cap_is_open = True + + async def __close_cap(self) -> None: + if self._cap_is_open: + self._cap.release() + self._cap_is_open = False - def __process_current_frame(self) -> np.ndarray: + async def __read_frame(self) -> np.ndarray: + if self._cap_is_open: + val, frame = self._cap.read() + if not val: + raise ValueError("Error reading frame") + return frame + else: + print("Capture device is not open") + self.__open_cap() + + async def __process_current_frame(self) -> np.ndarray: + if self.mode in ["od", "fast-od"] and not self.has_demand(): + print("No demand, skipping frame") + return self._frame + # print("Processing frame") + frame = await self.__read_frame() frame = cv2.resize( self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) ) @@ -61,24 +237,43 @@ def __process_current_frame(self) -> np.ndarray: ) if not val: raise ValueError("Error encoding frame") - self._byte_frame_window.append(len(frame.tobytes())) - self._bandwidth_last_modified_time = time.time() return frame - async def get_frame(self) -> np.ndarray: - # A little hacky, if you have a better way, please let me know - await self.__ensure_background_task() + async def _get_frame(self) -> np.ndarray: + await self._ensure_background_tasks() + self._byte_frame_window.append(len(self._frame.tobytes())) + self._bandwidth_last_modified_time = time.time() async with self._lock: - return self._frame + # print("Returning frame") + return await self.__process_current_frame() - async def get_frame_processed(self) -> np.ndarray: - await self.__ensure_background_task() # Ditto - async with self._lock: - return self.__process_current_frame() + def set_size(self, size: Tuple[int, int]) -> None: + self.size = size + + def set_quality(self, quality: int) -> None: + self.quality = max(1, min(quality, 100)) + + def set_frame(self) -> None: + raise NotImplementedError( + "This method is not available for ManagedStream, use Stream or CustomStream instead." + ) + + def change_mode(self, mode: str) -> None: + if mode not in self._available_modes: + raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") + self.mode = mode + + def change_source(self, source: Union[int, str]) -> None: + self.source = source + self.__close_cap() + self.__open_cap() + + def set_poll_delay(self, poll_delay: float) -> None: + self.poll_delay = poll_delay class _StreamHandler: - def __init__(self, stream: Stream) -> None: + def __init__(self, stream: StreamBase) -> None: self._stream = stream async def __call__(self, request: web.Request) -> web.StreamResponse: @@ -93,7 +288,7 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: while True: await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream.get_frame_processed() + frame = await self._stream._get_frame() with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter: mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) try: @@ -136,7 +331,7 @@ async def __root_handler(self, _) -> web.Response: text += f"{route}\n
\n" return aiohttp.web.Response(text=text, content_type="text/html") - def add_stream(self, stream: Stream) -> None: + def add_stream(self, stream: StreamBase) -> None: if self.is_running(): raise RuntimeError("Cannot add stream after the server has started") route = f"/{stream.name}" diff --git a/test.py b/test.py new file mode 100644 index 0000000..730c4c2 --- /dev/null +++ b/test.py @@ -0,0 +1,37 @@ +import cv2 + +from mjpeg_streamer.mjpeg_streamer import MjpegServer, Stream + +# Create a server +server = MjpegServer() + +# # Create a stream +# stream = ManagedStream('test') + +# # Add the stream to the server +# server.add_stream(stream) + +# # Start the server +# server.start() + +# while True: +# pass + +# Create a stream +stream = Stream("test") + +# Add the stream to the server +server.add_stream(stream) + +# Start the server +server.start() +cap = cv2.VideoCapture(0) + +while True: + _, frame = cap.read() + stream.set_frame(frame) + cv2.imshow("frame", frame) + + +server.stop() +cap.release() From 3b1700edd4c9f61537bbabe531caed2aac86af9c Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 25 Feb 2024 05:12:20 +0300 Subject: [PATCH 05/27] [Lib] The classes look alright --- mjpeg_streamer/mjpeg_streamer.py | 102 ++++++++++++++++++------------- test.py | 37 ----------- 2 files changed, 60 insertions(+), 79 deletions(-) delete mode 100644 test.py diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index 5c41b58..397ce5d 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -1,6 +1,7 @@ import asyncio import threading import time +import uuid from collections import deque from typing import List, Optional, Tuple, Union @@ -30,8 +31,17 @@ def __init__( self._byte_frame_window: deque = deque(maxlen=fps) self._bandwidth_last_modified_time: float = time.time() self._deque_background_task: Optional[asyncio.Task] = None + self._active_viewers: set = set() self.settings() + async def _add_viewer(self): + viewer_token = str(uuid.uuid4()) + self._active_viewers.add(viewer_token) + return viewer_token + + async def _remove_viewer(self, viewer_token): + self._active_viewers.discard(viewer_token) + async def __clear_deque(self) -> None: while True: await asyncio.sleep(1 / self.fps) @@ -69,7 +79,7 @@ def settings(self): print(f"{key}: {value}") def has_demand(self) -> bool: - return len(self._byte_frame_window) > 0 + return len(self._active_viewers) > 0 def get_bandwidth(self) -> float: return sum(self._byte_frame_window) @@ -94,8 +104,8 @@ def set_frame(self, frame: np.ndarray) -> None: self._frame = frame # Not very useful, but it's here for the sake of completeness - # def get_frame(self) -> np.ndarray: - # return self._frame + def get_frame(self) -> np.ndarray: + return self._frame class Stream(StreamBase): @@ -121,14 +131,13 @@ async def __process_current_frame(self) -> np.ndarray: ) if not val: raise ValueError("Error encoding frame") - self._is_encoded = True + self._byte_frame_window.append(len(frame.tobytes())) + self._bandwidth_last_modified_time = time.time() return frame return self._frame async def _get_frame(self) -> np.ndarray: await self._ensure_background_tasks() - self._byte_frame_window.append(len(self._frame.tobytes())) - self._bandwidth_last_modified_time = time.time() async with self._lock: return await self.__process_current_frame() @@ -147,8 +156,8 @@ def set_frame(self, frame: np.ndarray) -> None: self._is_encoded = True self._frame = frame - # def get_frame(self) -> np.ndarray: - # return super().get_frame() + def get_frame(self) -> np.ndarray: + return super().get_frame() class CustomStream(StreamBase): @@ -169,17 +178,17 @@ def __init__( size: Optional[Tuple[int, int]] = None, quality: int = 50, source: Union[int, str] = 0, - mode: str = "regular", - poll_delay: Optional[float] = None, + mode: str = "fast-on-demand", + poll_delay_ms: Optional[float] = None, ) -> None: self.mode = mode - self._available_modes: List[str,] = ["regular", "od", "fast-od"] + self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"] if self.mode not in self._available_modes: raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") self.size = size self.quality = max(1, min(quality, 100)) self.source = source - self.poll_delay = poll_delay or fps + self.poll_delay_ms = poll_delay_ms / 1000 or 1 / fps self._cap_is_open = False self._cap: Optional[cv2.VideoCapture] = None self._cap_background_task: Optional[asyncio.Task] = None @@ -188,27 +197,28 @@ def __init__( async def _ensure_background_tasks(self) -> None: await super()._ensure_background_tasks() if self._cap_background_task is None or self._cap_background_task.done(): - print("Creating a new background task") - await self.__open_cap() self._cap_background_task = asyncio.create_task(self.__manage_cap_state()) async def __manage_cap_state(self) -> None: while True: - await asyncio.sleep(self.poll_delay) - if self.mode == "od": + await asyncio.sleep(self.poll_delay_ms) + if self.mode == "full-on-demand": if self.has_demand() and not self._cap_is_open: - await self.__open_cap() + self.__open_cap() elif not self.has_demand() and self._cap_is_open: - await self.__close_cap() + self.__close_cap() + else: + if not self._cap_is_open: + self.__open_cap() - async def __open_cap(self) -> None: + def __open_cap(self) -> None: if not self._cap_is_open: self._cap = cv2.VideoCapture(self.source) if not self._cap.isOpened(): raise ValueError("Cannot open the capture device") self._cap_is_open = True - async def __close_cap(self) -> None: + def __close_cap(self) -> None: if self._cap_is_open: self._cap.release() self._cap_is_open = False @@ -218,17 +228,14 @@ async def __read_frame(self) -> np.ndarray: val, frame = self._cap.read() if not val: raise ValueError("Error reading frame") - return frame + self._frame = frame else: - print("Capture device is not open") self.__open_cap() async def __process_current_frame(self) -> np.ndarray: - if self.mode in ["od", "fast-od"] and not self.has_demand(): - print("No demand, skipping frame") + if not self.has_demand(): return self._frame - # print("Processing frame") - frame = await self.__read_frame() + await self.__read_frame() frame = cv2.resize( self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) ) @@ -237,14 +244,13 @@ async def __process_current_frame(self) -> np.ndarray: ) if not val: raise ValueError("Error encoding frame") + self._byte_frame_window.append(len(frame.tobytes())) + self._bandwidth_last_modified_time = time.time() return frame async def _get_frame(self) -> np.ndarray: await self._ensure_background_tasks() - self._byte_frame_window.append(len(self._frame.tobytes())) - self._bandwidth_last_modified_time = time.time() async with self._lock: - # print("Returning frame") return await self.__process_current_frame() def set_size(self, size: Tuple[int, int]) -> None: @@ -268,8 +274,8 @@ def change_source(self, source: Union[int, str]) -> None: self.__close_cap() self.__open_cap() - def set_poll_delay(self, poll_delay: float) -> None: - self.poll_delay = poll_delay + def set_poll_delay_ms(self, poll_delay_ms: float) -> None: + self.poll_delay_ms = poll_delay_ms / 1000 class _StreamHandler: @@ -277,6 +283,7 @@ def __init__(self, stream: StreamBase) -> None: self._stream = stream async def __call__(self, request: web.Request) -> web.StreamResponse: + viewer_token = request.cookies.get("viewer_token") response = web.StreamResponse( status=200, reason="OK", @@ -285,17 +292,28 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: }, ) await response.prepare(request) - - while True: - await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream._get_frame() - with MultipartWriter("image/jpeg", boundary="image-boundary") as mpwriter: - mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) - try: - await mpwriter.write(response, close_boundary=False) - except (ConnectionResetError, ConnectionAbortedError): - return web.Response(status=499, text="Client closed the connection") - await response.write(b"\r\n") + if not viewer_token: + viewer_token = await self._stream._add_viewer() + response.set_cookie("viewer_token", viewer_token) + else: + if viewer_token not in self._stream._active_viewers: + await self._stream._add_viewer(viewer_token) + try: + while True: + await asyncio.sleep(1 / self._stream.fps) + frame = await self._stream._get_frame() + with MultipartWriter( + "image/jpeg", boundary="image-boundary" + ) as mpwriter: + mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) + try: + await mpwriter.write(response, close_boundary=False) + except (ConnectionResetError, ConnectionAbortedError): + break + await response.write(b"\r\n") + finally: + await self._stream._remove_viewer(viewer_token) + return response class MjpegServer: diff --git a/test.py b/test.py deleted file mode 100644 index 730c4c2..0000000 --- a/test.py +++ /dev/null @@ -1,37 +0,0 @@ -import cv2 - -from mjpeg_streamer.mjpeg_streamer import MjpegServer, Stream - -# Create a server -server = MjpegServer() - -# # Create a stream -# stream = ManagedStream('test') - -# # Add the stream to the server -# server.add_stream(stream) - -# # Start the server -# server.start() - -# while True: -# pass - -# Create a stream -stream = Stream("test") - -# Add the stream to the server -server.add_stream(stream) - -# Start the server -server.start() -cap = cv2.VideoCapture(0) - -while True: - _, frame = cap.read() - stream.set_frame(frame) - cv2.imshow("frame", frame) - - -server.stop() -cap.release() From 49a83fe724a9678023326c24eee6b7eb6a4c6e4c Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 25 Feb 2024 06:15:47 +0300 Subject: [PATCH 06/27] [Lib] typing + code quality --- mjpeg_streamer/mjpeg_streamer.py | 91 +++++++++++++++++--------------- test.py | 65 +++++++++++++++++++++++ 2 files changed, 112 insertions(+), 44 deletions(-) create mode 100644 test.py diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index 397ce5d..4327549 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -3,7 +3,7 @@ import time import uuid from collections import deque -from typing import List, Optional, Tuple, Union +from typing import Deque, List, Optional, Set, Tuple, Union import aiohttp import cv2 @@ -11,6 +11,7 @@ import numpy as np from aiohttp import MultipartWriter, web from aiohttp.web_runner import GracefulExit +from multidict import MultiDict class StreamBase: @@ -23,23 +24,21 @@ def __init__( raise TypeError( "StreamBase is an abstract class and cannot be instantiated" ) - else: - self.name = name.lower().casefold().replace(" ", "_") - self.fps = fps - self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) - self._lock: asyncio.Lock = asyncio.Lock() - self._byte_frame_window: deque = deque(maxlen=fps) - self._bandwidth_last_modified_time: float = time.time() - self._deque_background_task: Optional[asyncio.Task] = None - self._active_viewers: set = set() - self.settings() - - async def _add_viewer(self): - viewer_token = str(uuid.uuid4()) + self.name = name.lower().casefold().replace(" ", "_") + self.fps = fps + self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) + self._lock: asyncio.Lock = asyncio.Lock() + self._byte_frame_window: Deque[int,] = deque(maxlen=fps) + self._bandwidth_last_modified_time: float = time.time() + self._deque_background_task: Optional[asyncio.Task] = None + self._active_viewers: Set[str,] = set() + + async def _add_viewer(self, viewer_token: Optional[str] = None) -> str: + viewer_token = viewer_token or str(uuid.uuid4()) self._active_viewers.add(viewer_token) return viewer_token - async def _remove_viewer(self, viewer_token): + async def _remove_viewer(self, viewer_token: str) -> None: self._active_viewers.discard(viewer_token) async def __clear_deque(self) -> None: @@ -49,7 +48,7 @@ async def __clear_deque(self) -> None: len(self._byte_frame_window) > 0 and time.time() - self._bandwidth_last_modified_time >= 1 ): - deque.clear(self._byte_frame_window) + self._byte_frame_window.clear() async def _ensure_background_tasks(self) -> None: if self._deque_background_task is None or self._deque_background_task.done(): @@ -65,14 +64,12 @@ def _check_encoding(self, frame: np.ndarray) -> str: and frame[-1] == 217 ): return "jpeg" - else: - return "one-dim-non-jpeg" - elif isinstance(frame, np.ndarray): + return "one-dim-non-jpeg" + if isinstance(frame, np.ndarray): return "multi-dim" - else: - return "unknown" + return "unknown" - def settings(self): + def settings(self) -> None: for key, value in self.__dict__.items(): if key.startswith("_"): continue @@ -151,7 +148,8 @@ def set_frame(self, frame: np.ndarray) -> None: self._is_encoded = False if self._check_encoding(frame) == "jpeg": print( - "The frame is already encoded, will not encode again. Consider using CustomStream if you want to handle the processing yourself." + "The frame is already encoded, will not encode again. \ + Consider using CustomStream if you want to handle the processing yourself." ) self._is_encoded = True self._frame = frame @@ -179,7 +177,7 @@ def __init__( quality: int = 50, source: Union[int, str] = 0, mode: str = "fast-on-demand", - poll_delay_ms: Optional[float] = None, + poll_delay_ms: Optional[Union[float, int]] = None, ) -> None: self.mode = mode self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"] @@ -188,9 +186,9 @@ def __init__( self.size = size self.quality = max(1, min(quality, 100)) self.source = source - self.poll_delay_ms = poll_delay_ms / 1000 or 1 / fps - self._cap_is_open = False - self._cap: Optional[cv2.VideoCapture] = None + self.poll_delay_ms = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps + self._cap_is_open: bool = False + self._cap: cv2.VideoCapture = cv2.VideoCapture(self.source) self._cap_background_task: Optional[asyncio.Task] = None super().__init__(name, fps) @@ -207,9 +205,8 @@ async def __manage_cap_state(self) -> None: self.__open_cap() elif not self.has_demand() and self._cap_is_open: self.__close_cap() - else: - if not self._cap_is_open: - self.__open_cap() + elif not self._cap_is_open: + self.__open_cap() def __open_cap(self) -> None: if not self._cap_is_open: @@ -223,7 +220,7 @@ def __close_cap(self) -> None: self._cap.release() self._cap_is_open = False - async def __read_frame(self) -> np.ndarray: + async def __read_frame(self) -> None: if self._cap_is_open: val, frame = self._cap.read() if not val: @@ -259,7 +256,7 @@ def set_size(self, size: Tuple[int, int]) -> None: def set_quality(self, quality: int) -> None: self.quality = max(1, min(quality, 100)) - def set_frame(self) -> None: + def set_frame(self, frame: np.ndarray) -> None: raise NotImplementedError( "This method is not available for ManagedStream, use Stream or CustomStream instead." ) @@ -295,9 +292,8 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: if not viewer_token: viewer_token = await self._stream._add_viewer() response.set_cookie("viewer_token", viewer_token) - else: - if viewer_token not in self._stream._active_viewers: - await self._stream._add_viewer(viewer_token) + elif viewer_token not in self._stream._active_viewers: + await self._stream._add_viewer(viewer_token) try: while True: await asyncio.sleep(1 / self._stream.fps) @@ -305,7 +301,10 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: with MultipartWriter( "image/jpeg", boundary="image-boundary" ) as mpwriter: - mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) + mpwriter.append( + frame.tobytes(), + MultiDict({"Content-Type": "image/jpeg"}), + ) try: await mpwriter.write(response, close_boundary=False) except (ConnectionResetError, ConnectionAbortedError): @@ -317,9 +316,13 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: class MjpegServer: - def __init__(self, host: Union[str, int] = "localhost", port: int = 8080) -> None: + def __init__( + self, host: Union[str, List[str,]] = "localhost", port: int = 8080 + ) -> None: if isinstance(host, str) and host != "0.0.0.0": - self._host = [host] + self._host: List[str,] = [ + host, + ] elif isinstance(host, list): if "0.0.0.0" in host: host.remove("0.0.0.0") @@ -328,7 +331,7 @@ def __init__(self, host: Union[str, int] = "localhost", port: int = 8080) -> Non for iface in netifaces.interfaces() if netifaces.AF_INET in netifaces.ifaddresses(iface) ] - self._host = list(dict.fromkeys(host)) + self._host = list(set(host)) else: self._host = [ netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] @@ -336,12 +339,12 @@ def __init__(self, host: Union[str, int] = "localhost", port: int = 8080) -> Non if netifaces.AF_INET in netifaces.ifaddresses(iface) ] self._port = port - self._app = web.Application() - self._app.is_running = False + self._app: web.Application = web.Application() + self._app_is_running: bool = False self._cap_routes: List[str,] = [] def is_running(self) -> bool: - return self._app.is_running + return self._app_is_running async def __root_handler(self, _) -> web.Response: text = "

Available streams:

" @@ -372,7 +375,7 @@ def start(self) -> None: if not self.is_running(): thread = threading.Thread(target=self.__start_func, daemon=True) thread.start() - self._app.is_running = True + self._app_is_running = True else: print("\nServer is already running\n") @@ -386,7 +389,7 @@ def start(self) -> None: def stop(self) -> None: if self.is_running(): - self._app.is_running = False + self._app_is_running = False print("\nStopping...\n") raise GracefulExit print("\nServer is not running\n") diff --git a/test.py b/test.py new file mode 100644 index 0000000..bac2d90 --- /dev/null +++ b/test.py @@ -0,0 +1,65 @@ +# import time + +# import cv2 + +# from mjpeg_streamer.mjpeg_streamer import ( +# CustomStream, +# ManagedStream, +# MjpegServer, +# Stream, +# ) + +# # Create a server +# server = MjpegServer("0.0.0.0") + +# # # Create a stream +# # stream = ManagedStream('test', mode="od") + +# # # Add the stream to the server +# # server.add_stream(stream) + +# # # Start the server +# # server.start() + +# # while True: +# # print(stream.get_bandwidth(), end="\r") + +# # Create a stream +# stream = ManagedStream( +# "test", +# fps=30, +# quality=50, +# size=(640, 480), +# source=0, +# mode="full-on-demand", +# poll_delay_ms=100, +# ) +# # stream = CustomStream('test') + +# # Add the stream to the server +# server.add_stream(stream) + +# # Start the server +# server.start() +# # cap = cv2.VideoCapture(0) + +# while True: +# print(round(stream.get_bandwidth() / 1024 / 1024, 2), end="\r") +# # _, frame = cap.read() +# # frame = cv2.resize(frame, (640, 480)) +# # frame = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 50])[1] +# # stream.set_frame(frame) +# # # print(stream.get_bandwidth()) +# # print(round(stream.get_bandwidth() / 1024 / 1024, 2), end="\r") +# # # print(stream.settings()) + + +# server.stop() +# # cap.release() + + +from mjpeg_streamer import MjpegServer + +server = MjpegServer(["192.168.1.4", "127.0.0.1", "192.168.1.4"], 8080) + +server.start() From 085117ff57aa3c691b6cd2812ab44fd1ea1bcc31 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 25 Feb 2024 06:22:02 +0300 Subject: [PATCH 07/27] [Lib] Update __init__.py --- .gitignore | 2 ++ mjpeg_streamer/__init__.py | 4 +-- test.py | 65 -------------------------------------- 3 files changed, 4 insertions(+), 67 deletions(-) delete mode 100644 test.py diff --git a/.gitignore b/.gitignore index a07a66d..3f61268 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ build/ .mypy_cache/ .pytest_cache/ .ruff_cache/ +test*.py +!test-*.py diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index a74aec0..360c7dd 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,4 +1,4 @@ -from .mjpeg_streamer import MjpegServer, Stream +from .mjpeg_streamer import CustomStream, ManagedStream, MjpegServer, Stream -__all__ = ["MjpegServer", "Stream"] +__all__ = ["MjpegServer", "Stream", "CustomStream", "ManagedStream"] __version__ = "2024.2.8" diff --git a/test.py b/test.py deleted file mode 100644 index bac2d90..0000000 --- a/test.py +++ /dev/null @@ -1,65 +0,0 @@ -# import time - -# import cv2 - -# from mjpeg_streamer.mjpeg_streamer import ( -# CustomStream, -# ManagedStream, -# MjpegServer, -# Stream, -# ) - -# # Create a server -# server = MjpegServer("0.0.0.0") - -# # # Create a stream -# # stream = ManagedStream('test', mode="od") - -# # # Add the stream to the server -# # server.add_stream(stream) - -# # # Start the server -# # server.start() - -# # while True: -# # print(stream.get_bandwidth(), end="\r") - -# # Create a stream -# stream = ManagedStream( -# "test", -# fps=30, -# quality=50, -# size=(640, 480), -# source=0, -# mode="full-on-demand", -# poll_delay_ms=100, -# ) -# # stream = CustomStream('test') - -# # Add the stream to the server -# server.add_stream(stream) - -# # Start the server -# server.start() -# # cap = cv2.VideoCapture(0) - -# while True: -# print(round(stream.get_bandwidth() / 1024 / 1024, 2), end="\r") -# # _, frame = cap.read() -# # frame = cv2.resize(frame, (640, 480)) -# # frame = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 50])[1] -# # stream.set_frame(frame) -# # # print(stream.get_bandwidth()) -# # print(round(stream.get_bandwidth() / 1024 / 1024, 2), end="\r") -# # # print(stream.settings()) - - -# server.stop() -# # cap.release() - - -from mjpeg_streamer import MjpegServer - -server = MjpegServer(["192.168.1.4", "127.0.0.1", "192.168.1.4"], 8080) - -server.start() From 5844b636de61fa3e99cf29113fe4523dff881fcc Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sun, 25 Feb 2024 06:25:49 +0300 Subject: [PATCH 08/27] Update __init__.py --- mjpeg_streamer/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index 360c7dd..74171ec 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,4 +1,4 @@ from .mjpeg_streamer import CustomStream, ManagedStream, MjpegServer, Stream -__all__ = ["MjpegServer", "Stream", "CustomStream", "ManagedStream"] +__all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream"] __version__ = "2024.2.8" From 7fa3b09c79d77c7e17ece9c7b0dc2040bda99a33 Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sun, 25 Feb 2024 06:45:52 +0300 Subject: [PATCH 09/27] Update requirements-dev.txt --- requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-dev.txt b/requirements-dev.txt index e7061e7..82ed91f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ hatch +mypy pre-commit ruff From b9f9e27a813bd9eab33a943744cf3be9f5e01569 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 10 Mar 2024 12:50:59 +0300 Subject: [PATCH 10/27] Update StreamBase._check_encoding --- .gitignore | 3 ++- mjpeg_streamer/mjpeg_streamer.py | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 3f61268..63bdc33 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ build/ .pytest_cache/ .ruff_cache/ test*.py -!test-*.py +!test_*.py +test-perf/ diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/mjpeg_streamer.py index 4327549..cbe3012 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/mjpeg_streamer.py @@ -88,16 +88,17 @@ def set_fps(self, fps: int) -> None: async def _get_frame(self) -> np.ndarray: # A little hacky, if you have a better way, please let me know await self._ensure_background_tasks() + # Checking here to avoid continous polling + if self._check_encoding(self._frame) != "jpeg": + raise ValueError( + "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." + ) self._byte_frame_window.append(len(self._frame.tobytes())) self._bandwidth_last_modified_time = time.time() async with self._lock: return self._frame def set_frame(self, frame: np.ndarray) -> None: - if self._check_encoding(frame) != "jpeg": - raise ValueError( - "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." - ) self._frame = frame # Not very useful, but it's here for the sake of completeness From 51a47f8cdbf350165c8a6a597f1045ae7850c17a Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 10 Mar 2024 23:54:08 +0300 Subject: [PATCH 11/27] Seperate files --- mjpeg_streamer/__init__.py | 3 +- mjpeg_streamer/server.py | 131 +++++++++++++++ .../{mjpeg_streamer.py => stream.py} | 158 +++--------------- 3 files changed, 160 insertions(+), 132 deletions(-) create mode 100644 mjpeg_streamer/server.py rename mjpeg_streamer/{mjpeg_streamer.py => stream.py} (63%) diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index 74171ec..5ed16c0 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,4 +1,5 @@ -from .mjpeg_streamer import CustomStream, ManagedStream, MjpegServer, Stream +from .server import MjpegServer +from .stream import CustomStream, ManagedStream, Stream __all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream"] __version__ = "2024.2.8" diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py new file mode 100644 index 0000000..d7b0fa9 --- /dev/null +++ b/mjpeg_streamer/server.py @@ -0,0 +1,131 @@ +import asyncio +import threading +from typing import List, Union + +import aiohttp +import netifaces +from aiohttp import MultipartWriter, web +from aiohttp.web_runner import GracefulExit +from multidict import MultiDict + +from .stream import StreamBase + + +class _StreamHandler: + def __init__(self, stream: StreamBase) -> None: + self._stream = stream + + async def __call__(self, request: web.Request) -> web.StreamResponse: + viewer_token = request.cookies.get("viewer_token") + response = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary" + }, + ) + await response.prepare(request) + if not viewer_token: + viewer_token = await self._stream._add_viewer() + response.set_cookie("viewer_token", viewer_token) + elif viewer_token not in self._stream._active_viewers: + await self._stream._add_viewer(viewer_token) + try: + while True: + await asyncio.sleep(1 / self._stream.fps) + frame = await self._stream._get_frame() + with MultipartWriter( + "image/jpeg", boundary="image-boundary" + ) as mpwriter: + mpwriter.append( + frame.tobytes(), + MultiDict({"Content-Type": "image/jpeg"}), + ) + try: + await mpwriter.write(response, close_boundary=False) + except (ConnectionResetError, ConnectionAbortedError): + break + await response.write(b"\r\n") + finally: + await self._stream._remove_viewer(viewer_token) + return response + + +class MjpegServer: + def __init__( + self, host: Union[str, List[str,]] = "localhost", port: int = 8080 + ) -> None: + if isinstance(host, str) and host != "0.0.0.0": + self._host: List[str,] = [ + host, + ] + elif isinstance(host, list): + if "0.0.0.0" in host: + host.remove("0.0.0.0") + host = host + [ + netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] + for iface in netifaces.interfaces() + if netifaces.AF_INET in netifaces.ifaddresses(iface) + ] + self._host = list(set(host)) + else: + self._host = [ + netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] + for iface in netifaces.interfaces() + if netifaces.AF_INET in netifaces.ifaddresses(iface) + ] + self._port = port + self._app: web.Application = web.Application() + self._app_is_running: bool = False + self._cap_routes: List[str,] = [] + + def is_running(self) -> bool: + return self._app_is_running + + async def __root_handler(self, _) -> web.Response: + text = "

Available streams:

" + for route in self._cap_routes: + text += f"{route}\n
\n" + return aiohttp.web.Response(text=text, content_type="text/html") + + def add_stream(self, stream: StreamBase) -> None: + if self.is_running(): + raise RuntimeError("Cannot add stream after the server has started") + route = f"/{stream.name}" + if route in self._cap_routes: + raise ValueError(f"A stream with the name {route} already exists") + self._cap_routes.append(route) + self._app.router.add_route("GET", route, _StreamHandler(stream)) + + def __start_func(self) -> None: + self._app.router.add_route("GET", "/", self.__root_handler) + runner = web.AppRunner(self._app) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(runner.setup()) + site = web.TCPSite(runner, self._host, self._port) + loop.run_until_complete(site.start()) + loop.run_forever() + + def start(self) -> None: + if not self.is_running(): + thread = threading.Thread(target=self.__start_func, daemon=True) + thread.start() + self._app_is_running = True + else: + print("\nServer is already running\n") + + for addr in self._host: + print(f"\nStreams index: http://{addr}:{self._port!s}") + print("Available streams:\n") + for route in self._cap_routes: # route has a leading slash + print(f"http://{addr}:{self._port!s}{route}") + print("--------------------------------\n") + print("\nPress Ctrl+C to stop the server\n") + + def stop(self) -> None: + if self.is_running(): + self._app_is_running = False + print("\nStopping...\n") + raise GracefulExit + print("\nServer is not running\n") diff --git a/mjpeg_streamer/mjpeg_streamer.py b/mjpeg_streamer/stream.py similarity index 63% rename from mjpeg_streamer/mjpeg_streamer.py rename to mjpeg_streamer/stream.py index cbe3012..266be99 100644 --- a/mjpeg_streamer/mjpeg_streamer.py +++ b/mjpeg_streamer/stream.py @@ -1,17 +1,11 @@ import asyncio -import threading import time import uuid from collections import deque from typing import Deque, List, Optional, Set, Tuple, Union -import aiohttp import cv2 -import netifaces import numpy as np -from aiohttp import MultipartWriter, web -from aiohttp.web_runner import GracefulExit -from multidict import MultiDict class StreamBase: @@ -28,7 +22,7 @@ def __init__( self.fps = fps self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) self._lock: asyncio.Lock = asyncio.Lock() - self._byte_frame_window: Deque[int,] = deque(maxlen=fps) + self._frame_buffer: Deque[int,] = deque(maxlen=fps) self._bandwidth_last_modified_time: float = time.time() self._deque_background_task: Optional[asyncio.Task] = None self._active_viewers: Set[str,] = set() @@ -45,10 +39,10 @@ async def __clear_deque(self) -> None: while True: await asyncio.sleep(1 / self.fps) if ( - len(self._byte_frame_window) > 0 + len(self._frame_buffer) > 0 and time.time() - self._bandwidth_last_modified_time >= 1 ): - self._byte_frame_window.clear() + self._frame_buffer.clear() async def _ensure_background_tasks(self) -> None: if self._deque_background_task is None or self._deque_background_task.done(): @@ -78,8 +72,11 @@ def settings(self) -> None: def has_demand(self) -> bool: return len(self._active_viewers) > 0 + def active_viewers(self) -> int: + return len(self._active_viewers) + def get_bandwidth(self) -> float: - return sum(self._byte_frame_window) + return sum(self._frame_buffer) def set_fps(self, fps: int) -> None: self.fps = fps @@ -93,7 +90,7 @@ async def _get_frame(self) -> np.ndarray: raise ValueError( "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." ) - self._byte_frame_window.append(len(self._frame.tobytes())) + self._frame_buffer.append(len(self._frame.tobytes())) self._bandwidth_last_modified_time = time.time() async with self._lock: return self._frame @@ -129,7 +126,7 @@ async def __process_current_frame(self) -> np.ndarray: ) if not val: raise ValueError("Error encoding frame") - self._byte_frame_window.append(len(frame.tobytes())) + self._frame_buffer.append(len(frame.tobytes())) self._bandwidth_last_modified_time = time.time() return frame return self._frame @@ -149,7 +146,7 @@ def set_frame(self, frame: np.ndarray) -> None: self._is_encoded = False if self._check_encoding(frame) == "jpeg": print( - "The frame is already encoded, will not encode again. \ + "The frame is already encoded, I will not encode it again. \ Consider using CustomStream if you want to handle the processing yourself." ) self._is_encoded = True @@ -191,6 +188,7 @@ def __init__( self._cap_is_open: bool = False self._cap: cv2.VideoCapture = cv2.VideoCapture(self.source) self._cap_background_task: Optional[asyncio.Task] = None + self._is_running: bool = False super().__init__(name, fps) async def _ensure_background_tasks(self) -> None: @@ -242,11 +240,13 @@ async def __process_current_frame(self) -> np.ndarray: ) if not val: raise ValueError("Error encoding frame") - self._byte_frame_window.append(len(frame.tobytes())) + self._frame_buffer.append(len(frame.tobytes())) self._bandwidth_last_modified_time = time.time() return frame async def _get_frame(self) -> np.ndarray: + if not self._is_running: + raise RuntimeError("Stream has not started yet") await self._ensure_background_tasks() async with self._lock: return await self.__process_current_frame() @@ -269,128 +269,24 @@ def change_mode(self, mode: str) -> None: def change_source(self, source: Union[int, str]) -> None: self.source = source - self.__close_cap() - self.__open_cap() + if self._cap_is_open: + self.__close_cap() + self.__open_cap() def set_poll_delay_ms(self, poll_delay_ms: float) -> None: self.poll_delay_ms = poll_delay_ms / 1000 - -class _StreamHandler: - def __init__(self, stream: StreamBase) -> None: - self._stream = stream - - async def __call__(self, request: web.Request) -> web.StreamResponse: - viewer_token = request.cookies.get("viewer_token") - response = web.StreamResponse( - status=200, - reason="OK", - headers={ - "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary" - }, - ) - await response.prepare(request) - if not viewer_token: - viewer_token = await self._stream._add_viewer() - response.set_cookie("viewer_token", viewer_token) - elif viewer_token not in self._stream._active_viewers: - await self._stream._add_viewer(viewer_token) - try: - while True: - await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream._get_frame() - with MultipartWriter( - "image/jpeg", boundary="image-boundary" - ) as mpwriter: - mpwriter.append( - frame.tobytes(), - MultiDict({"Content-Type": "image/jpeg"}), - ) - try: - await mpwriter.write(response, close_boundary=False) - except (ConnectionResetError, ConnectionAbortedError): - break - await response.write(b"\r\n") - finally: - await self._stream._remove_viewer(viewer_token) - return response - - -class MjpegServer: - def __init__( - self, host: Union[str, List[str,]] = "localhost", port: int = 8080 - ) -> None: - if isinstance(host, str) and host != "0.0.0.0": - self._host: List[str,] = [ - host, - ] - elif isinstance(host, list): - if "0.0.0.0" in host: - host.remove("0.0.0.0") - host = host + [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] - self._host = list(set(host)) - else: - self._host = [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] - self._port = port - self._app: web.Application = web.Application() - self._app_is_running: bool = False - self._cap_routes: List[str,] = [] - - def is_running(self) -> bool: - return self._app_is_running - - async def __root_handler(self, _) -> web.Response: - text = "

Available streams:

" - for route in self._cap_routes: - text += f"{route}\n
\n" - return aiohttp.web.Response(text=text, content_type="text/html") - - def add_stream(self, stream: StreamBase) -> None: - if self.is_running(): - raise RuntimeError("Cannot add stream after the server has started") - route = f"/{stream.name}" - if route in self._cap_routes: - raise ValueError(f"A stream with the name {route} already exists") - self._cap_routes.append(route) - self._app.router.add_route("GET", route, _StreamHandler(stream)) - - def __start_func(self) -> None: - self._app.router.add_route("GET", "/", self.__root_handler) - runner = web.AppRunner(self._app) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(runner.setup()) - site = web.TCPSite(runner, self._host, self._port) - loop.run_until_complete(site.start()) - loop.run_forever() - def start(self) -> None: - if not self.is_running(): - thread = threading.Thread(target=self.__start_func, daemon=True) - thread.start() - self._app_is_running = True + if not self._is_running: + self._is_running = True else: - print("\nServer is already running\n") - - for addr in self._host: - print(f"\nStreams index: http://{addr}:{self._port!s}") - print("Available streams:\n") - for route in self._cap_routes: # route has a leading slash - print(f"http://{addr}:{self._port!s}{route}") - print("--------------------------------\n") - print("\nPress Ctrl+C to stop the server\n") + raise RuntimeError("Stream has already started") def stop(self) -> None: - if self.is_running(): - self._app_is_running = False - print("\nStopping...\n") - raise GracefulExit - print("\nServer is not running\n") + if self._is_running: + self._is_running = False + self._cap_background_task.cancel() + if self._cap_is_open: + self.__close_cap() + else: + raise RuntimeError("Stream has already stopped") From 5bba6c34415b41b8232dd9f59d15c0bd23accae7 Mon Sep 17 00:00:00 2001 From: egeakman Date: Mon, 11 Mar 2024 02:12:45 +0300 Subject: [PATCH 12/27] update --- mjpeg_streamer/cli.py | 5 ++- mjpeg_streamer/server.py | 32 +++++++++--------- mjpeg_streamer/stream.py | 70 ++++++++++++++++++++-------------------- 3 files changed, 56 insertions(+), 51 deletions(-) diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py index 5f70a4d..fb5f55f 100644 --- a/mjpeg_streamer/cli.py +++ b/mjpeg_streamer/cli.py @@ -1,3 +1,5 @@ +# WON'T WORK RIGHT NOW. USE THE MAIN BRANCH FOR THE CLI + import argparse import re import threading @@ -5,7 +7,8 @@ import cv2 -from mjpeg_streamer import MjpegServer, Stream +from .server import MjpegServer +from .stream import Stream def parse_args() -> argparse.Namespace: diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py index d7b0fa9..fbf98a1 100644 --- a/mjpeg_streamer/server.py +++ b/mjpeg_streamer/server.py @@ -32,20 +32,20 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: await self._stream._add_viewer(viewer_token) try: while True: - await asyncio.sleep(1 / self._stream.fps) - frame = await self._stream._get_frame() - with MultipartWriter( - "image/jpeg", boundary="image-boundary" - ) as mpwriter: - mpwriter.append( - frame.tobytes(), - MultiDict({"Content-Type": "image/jpeg"}), - ) - try: + try: + await asyncio.sleep(1 / self._stream.fps) + frame = await self._stream._get_frame() + with MultipartWriter( + "image/jpeg", boundary="image-boundary" + ) as mpwriter: + mpwriter.append( + frame.tobytes(), + MultiDict({"Content-Type": "image/jpeg"}), + ) await mpwriter.write(response, close_boundary=False) - except (ConnectionResetError, ConnectionAbortedError): - break - await response.write(b"\r\n") + await response.write(b"\r\n") + except (ConnectionResetError, ConnectionAbortedError): + break finally: await self._stream._remove_viewer(viewer_token) return response @@ -127,5 +127,7 @@ def stop(self) -> None: if self.is_running(): self._app_is_running = False print("\nStopping...\n") - raise GracefulExit - print("\nServer is not running\n") + GracefulExit() + print("\nServer stopped\n") + else: + print("\nServer is not running\n") diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index 266be99..cc5b00c 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -113,23 +113,25 @@ def __init__( ) -> None: self.size = size self.quality = max(1, min(quality, 100)) - self._is_encoded = False super().__init__(name, fps) async def __process_current_frame(self) -> np.ndarray: - if not self._is_encoded: - frame = cv2.resize( - self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) - ) + frame = self._frame + if not self._check_encoding(frame) == "jpeg": + frame = cv2.resize(frame, self.size or (frame.shape[1], frame.shape[0])) val, frame = cv2.imencode( ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] ) if not val: raise ValueError("Error encoding frame") - self._frame_buffer.append(len(frame.tobytes())) - self._bandwidth_last_modified_time = time.time() - return frame - return self._frame + else: + print( + "The frame is already encoded, I will not encode nor resize it again. \ +Consider using CustomStream if you want to handle the processing yourself." + ) + self._frame_buffer.append(len(frame.tobytes())) + self._bandwidth_last_modified_time = time.time() + return frame async def _get_frame(self) -> np.ndarray: await self._ensure_background_tasks() @@ -143,13 +145,6 @@ def set_quality(self, quality: int) -> None: self.quality = max(1, min(quality, 100)) def set_frame(self, frame: np.ndarray) -> None: - self._is_encoded = False - if self._check_encoding(frame) == "jpeg": - print( - "The frame is already encoded, I will not encode it again. \ - Consider using CustomStream if you want to handle the processing yourself." - ) - self._is_encoded = True self._frame = frame def get_frame(self) -> np.ndarray: @@ -170,23 +165,23 @@ class ManagedStream(StreamBase): def __init__( self, name: str, + source: Union[int, str] = 0, fps: int = 30, size: Optional[Tuple[int, int]] = None, quality: int = 50, - source: Union[int, str] = 0, mode: str = "fast-on-demand", poll_delay_ms: Optional[Union[float, int]] = None, ) -> None: + self.source = source self.mode = mode self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"] if self.mode not in self._available_modes: raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") self.size = size self.quality = max(1, min(quality, 100)) - self.source = source - self.poll_delay_ms = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps + self.poll_delay_seconds = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps self._cap_is_open: bool = False - self._cap: cv2.VideoCapture = cv2.VideoCapture(self.source) + self._cap: cv2.VideoCapture = None self._cap_background_task: Optional[asyncio.Task] = None self._is_running: bool = False super().__init__(name, fps) @@ -198,7 +193,7 @@ async def _ensure_background_tasks(self) -> None: async def __manage_cap_state(self) -> None: while True: - await asyncio.sleep(self.poll_delay_ms) + await asyncio.sleep(self.poll_delay_seconds) if self.mode == "full-on-demand": if self.has_demand() and not self._cap_is_open: self.__open_cap() @@ -208,19 +203,19 @@ async def __manage_cap_state(self) -> None: self.__open_cap() def __open_cap(self) -> None: - if not self._cap_is_open: + if not self._cap_is_open and self._is_running: self._cap = cv2.VideoCapture(self.source) if not self._cap.isOpened(): raise ValueError("Cannot open the capture device") self._cap_is_open = True def __close_cap(self) -> None: - if self._cap_is_open: + if self._cap_is_open and self._is_running: self._cap.release() self._cap_is_open = False async def __read_frame(self) -> None: - if self._cap_is_open: + if self._cap_is_open and self._is_running: val, frame = self._cap.read() if not val: raise ValueError("Error reading frame") @@ -239,6 +234,11 @@ async def __process_current_frame(self) -> np.ndarray: ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] ) if not val: + if self._cap.getBackendName() == "FFMPEG": + raise ValueError( + "Seems like you are using a video file as the source. \ +The media might have ended." + ) raise ValueError("Error encoding frame") self._frame_buffer.append(len(frame.tobytes())) self._bandwidth_last_modified_time = time.time() @@ -246,7 +246,8 @@ async def __process_current_frame(self) -> np.ndarray: async def _get_frame(self) -> np.ndarray: if not self._is_running: - raise RuntimeError("Stream has not started yet") + print("Stream is not running, please call the start method first.") + return self._frame await self._ensure_background_tasks() async with self._lock: return await self.__process_current_frame() @@ -264,29 +265,28 @@ def set_frame(self, frame: np.ndarray) -> None: def change_mode(self, mode: str) -> None: if mode not in self._available_modes: - raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") + print(f"Invalid mode. Available modes: {self._available_modes}") self.mode = mode def change_source(self, source: Union[int, str]) -> None: self.source = source - if self._cap_is_open: - self.__close_cap() - self.__open_cap() + self.__close_cap() + self.__open_cap() def set_poll_delay_ms(self, poll_delay_ms: float) -> None: - self.poll_delay_ms = poll_delay_ms / 1000 + self.poll_delay_seconds = poll_delay_ms / 1000 def start(self) -> None: if not self._is_running: self._is_running = True else: - raise RuntimeError("Stream has already started") + print("Stream has already started") def stop(self) -> None: if self._is_running: - self._is_running = False self._cap_background_task.cancel() - if self._cap_is_open: - self.__close_cap() + self.__close_cap() + self._is_running = False + self._cap_is_open = False else: - raise RuntimeError("Stream has already stopped") + print("Stream has already stopped") From 568ae1daa223d483ff3d999c2c839f621622eea2 Mon Sep 17 00:00:00 2001 From: egeakman Date: Thu, 4 Apr 2024 00:13:14 +0300 Subject: [PATCH 13/27] Fix #12 and #13 --- mjpeg_streamer/__init__.py | 4 ++-- mjpeg_streamer/server.py | 28 +++++++++++++--------------- mjpeg_streamer/stream.py | 26 +++++++++++++++++--------- pyproject.toml | 1 - 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index 5ed16c0..78c0667 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,5 +1,5 @@ -from .server import MjpegServer +from .server import MjpegServer, Server from .stream import CustomStream, ManagedStream, Stream -__all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream"] +__all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream", "Server"] __version__ = "2024.2.8" diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py index fbf98a1..023a747 100644 --- a/mjpeg_streamer/server.py +++ b/mjpeg_streamer/server.py @@ -3,7 +3,6 @@ from typing import List, Union import aiohttp -import netifaces from aiohttp import MultipartWriter, web from aiohttp.web_runner import GracefulExit from multidict import MultiDict @@ -51,29 +50,20 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: return response -class MjpegServer: +class Server: def __init__( self, host: Union[str, List[str,]] = "localhost", port: int = 8080 ) -> None: - if isinstance(host, str) and host != "0.0.0.0": + if isinstance(host, str): self._host: List[str,] = [ host, ] elif isinstance(host, list): if "0.0.0.0" in host: - host.remove("0.0.0.0") - host = host + [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] + host = ["0.0.0.0"] + if "localhost" in host and "127.0.0.1" in host: + host.remove("localhost") self._host = list(set(host)) - else: - self._host = [ - netifaces.ifaddresses(iface)[netifaces.AF_INET][0]["addr"] - for iface in netifaces.interfaces() - if netifaces.AF_INET in netifaces.ifaddresses(iface) - ] self._port = port self._app: web.Application = web.Application() self._app_is_running: bool = False @@ -131,3 +121,11 @@ def stop(self) -> None: print("\nServer stopped\n") else: print("\nServer is not running\n") + + +class MjpegServer(Server): + # Alias for Server, to maintain backwards compatibility + def __init__( + self, host: Union[str, List[str,]] = "localhost", port: int = 8080 + ) -> None: + super().__init__(host, port) diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index cc5b00c..77bd595 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -27,14 +27,6 @@ def __init__( self._deque_background_task: Optional[asyncio.Task] = None self._active_viewers: Set[str,] = set() - async def _add_viewer(self, viewer_token: Optional[str] = None) -> str: - viewer_token = viewer_token or str(uuid.uuid4()) - self._active_viewers.add(viewer_token) - return viewer_token - - async def _remove_viewer(self, viewer_token: str) -> None: - self._active_viewers.discard(viewer_token) - async def __clear_deque(self) -> None: while True: await asyncio.sleep(1 / self.fps) @@ -44,6 +36,14 @@ async def __clear_deque(self) -> None: ): self._frame_buffer.clear() + async def _add_viewer(self, viewer_token: Optional[str] = None) -> str: + viewer_token = viewer_token or str(uuid.uuid4()) + self._active_viewers.add(viewer_token) + return viewer_token + + async def _remove_viewer(self, viewer_token: str) -> None: + self._active_viewers.discard(viewer_token) + async def _ensure_background_tasks(self) -> None: if self._deque_background_task is None or self._deque_background_task.done(): self._deque_background_task = asyncio.create_task(self.__clear_deque()) @@ -85,7 +85,8 @@ def set_fps(self, fps: int) -> None: async def _get_frame(self) -> np.ndarray: # A little hacky, if you have a better way, please let me know await self._ensure_background_tasks() - # Checking here to avoid continous polling + # Checking the encoding here instead of set_frame + # to avoid continous polling if self._check_encoding(self._frame) != "jpeg": raise ValueError( "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." @@ -113,6 +114,7 @@ def __init__( ) -> None: self.size = size self.quality = max(1, min(quality, 100)) + self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) super().__init__(name, fps) async def __process_current_frame(self) -> np.ndarray: @@ -131,9 +133,12 @@ async def __process_current_frame(self) -> np.ndarray: ) self._frame_buffer.append(len(frame.tobytes())) self._bandwidth_last_modified_time = time.time() + self._last_processed_frame = frame return frame async def _get_frame(self) -> np.ndarray: + if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps: + return self._last_processed_frame await self._ensure_background_tasks() async with self._lock: return await self.__process_current_frame() @@ -242,12 +247,15 @@ async def __process_current_frame(self) -> np.ndarray: raise ValueError("Error encoding frame") self._frame_buffer.append(len(frame.tobytes())) self._bandwidth_last_modified_time = time.time() + self._last_processed_frame = frame return frame async def _get_frame(self) -> np.ndarray: if not self._is_running: print("Stream is not running, please call the start method first.") return self._frame + if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps: + return self._last_processed_frame await self._ensure_background_tasks() async with self._lock: return await self.__process_current_frame() diff --git a/pyproject.toml b/pyproject.toml index 947807a..5bea27e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,6 @@ dependencies = [ 'aiohttp==3.8.6; python_version >= "3.6" and python_version <= "3.7"', 'aiohttp==3.9.1; python_version == "3.8"', 'aiohttp; python_version >= "3.9"', - "netifaces", 'numpy==1.19.5; python_version == "3.6"', 'numpy==1.21.6; python_version == "3.7"', 'numpy==1.24.4; python_version == "3.8"', From 1342a95f4beb3d027e0e3ef78e667dc3e634d0ad Mon Sep 17 00:00:00 2001 From: egeakman Date: Fri, 5 Apr 2024 05:41:14 +0300 Subject: [PATCH 14/27] Clean up a bit --- mjpeg_streamer/stream.py | 191 +++++++++++++++++---------------------- 1 file changed, 85 insertions(+), 106 deletions(-) diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index 77bd595..ae45724 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -14,41 +14,51 @@ def __init__( name: str, fps: int = 30, ) -> None: - if type(self) is StreamBase: - raise TypeError( - "StreamBase is an abstract class and cannot be instantiated" - ) - self.name = name.lower().casefold().replace(" ", "_") + self.name = name.casefold().replace(" ", "_") self.fps = fps self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) self._lock: asyncio.Lock = asyncio.Lock() - self._frame_buffer: Deque[int,] = deque(maxlen=fps) + self._frames_buffer: Deque[int] = deque(maxlen=fps) self._bandwidth_last_modified_time: float = time.time() - self._deque_background_task: Optional[asyncio.Task] = None - self._active_viewers: Set[str,] = set() + self._active_viewers: Set[str] = set() + self._bandwidth_background_task: Optional[asyncio.Task] = None + + def __new__(self, *args, **kwargs): + raise TypeError("Cannot instantiate an abstract class") + + async def _ensure_background_tasks(self) -> None: + if ( + self._bandwidth_background_task is None + or self._bandwidth_background_task.done() + ): + self._bandwidth_background_task = asyncio.create_task( + self.__clear_bandwidth() + ) - async def __clear_deque(self) -> None: + async def __clear_bandwidth(self) -> None: while True: - await asyncio.sleep(1 / self.fps) + await asyncio.sleep(1.0 / self.fps) if ( - len(self._frame_buffer) > 0 + len(self._frames_buffer) > 0 and time.time() - self._bandwidth_last_modified_time >= 1 ): - self._frame_buffer.clear() + self._frames_buffer.clear() async def _add_viewer(self, viewer_token: Optional[str] = None) -> str: viewer_token = viewer_token or str(uuid.uuid4()) - self._active_viewers.add(viewer_token) + async with self._lock: + self._active_viewers.add(viewer_token) return viewer_token async def _remove_viewer(self, viewer_token: str) -> None: - self._active_viewers.discard(viewer_token) + async with self._lock: + self._active_viewers.discard(viewer_token) - async def _ensure_background_tasks(self) -> None: - if self._deque_background_task is None or self._deque_background_task.done(): - self._deque_background_task = asyncio.create_task(self.__clear_deque()) + async def _process_current_frame(self) -> np.ndarray: + self._last_processed_frame = self._frame + return self._frame - def _check_encoding(self, frame: np.ndarray) -> str: + async def __check_encoding(self, frame: np.ndarray) -> str: if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2: # Check JPEG header (0xFFD8) and footer (0xFFD9) if ( @@ -63,6 +73,20 @@ def _check_encoding(self, frame: np.ndarray) -> str: return "multi-dim" return "unknown" + async def _resize_and_encode_frame( + self, frame: np.ndarray, size: Tuple[int, int], quality: int + ) -> np.ndarray: + resized_frame = cv2.resize(frame, size) + if not await self.__check_encoding(resized_frame) == "jpeg": + val, encoded_frame = cv2.imencode( + ".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality] + ) + if not val: + raise ValueError( + f"Error encoding frame. Format/shape: {await self.__check_encoding(resized_frame)}" + ) + return encoded_frame + def settings(self) -> None: for key, value in self.__dict__.items(): if key.startswith("_"): @@ -76,32 +100,26 @@ def active_viewers(self) -> int: return len(self._active_viewers) def get_bandwidth(self) -> float: - return sum(self._frame_buffer) + return sum(self._frames_buffer) def set_fps(self, fps: int) -> None: self.fps = fps + self._frames_buffer = deque(maxlen=fps) - # Method for delivering the frame to the StreamHandler async def _get_frame(self) -> np.ndarray: - # A little hacky, if you have a better way, please let me know - await self._ensure_background_tasks() - # Checking the encoding here instead of set_frame - # to avoid continous polling - if self._check_encoding(self._frame) != "jpeg": - raise ValueError( - "Input is not an encoded JPEG frame. Use OpenCV's imencode method to encode the frame to JPEG." - ) - self._frame_buffer.append(len(self._frame.tobytes())) - self._bandwidth_last_modified_time = time.time() + await self._ensure_background_tasks() # A little hacky + if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps: + return self._last_processed_frame async with self._lock: - return self._frame + self._frames_buffer.append(len(self._frame.tobytes())) + self._bandwidth_last_modified_time = time.time() + return await self._process_current_frame() def set_frame(self, frame: np.ndarray) -> None: self._frame = frame - # Not very useful, but it's here for the sake of completeness - def get_frame(self) -> np.ndarray: - return self._frame + +CustomStream = StreamBase class Stream(StreamBase): @@ -112,59 +130,26 @@ def __init__( size: Optional[Tuple[int, int]] = None, quality: int = 50, ) -> None: + super().__init__(name, fps) self.size = size self.quality = max(1, min(quality, 100)) self._last_processed_frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) - super().__init__(name, fps) - async def __process_current_frame(self) -> np.ndarray: - frame = self._frame - if not self._check_encoding(frame) == "jpeg": - frame = cv2.resize(frame, self.size or (frame.shape[1], frame.shape[0])) - val, frame = cv2.imencode( - ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] - ) - if not val: - raise ValueError("Error encoding frame") - else: - print( - "The frame is already encoded, I will not encode nor resize it again. \ -Consider using CustomStream if you want to handle the processing yourself." - ) - self._frame_buffer.append(len(frame.tobytes())) - self._bandwidth_last_modified_time = time.time() + async def _process_current_frame(self) -> np.ndarray: + frame = await self._resize_and_encode_frame( + self._frame, + self.size or (self._frame.shape[1], self._frame.shape[0]), + self.quality, + ) self._last_processed_frame = frame return frame - async def _get_frame(self) -> np.ndarray: - if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps: - return self._last_processed_frame - await self._ensure_background_tasks() - async with self._lock: - return await self.__process_current_frame() - def set_size(self, size: Tuple[int, int]) -> None: self.size = size def set_quality(self, quality: int) -> None: self.quality = max(1, min(quality, 100)) - def set_frame(self, frame: np.ndarray) -> None: - self._frame = frame - - def get_frame(self) -> np.ndarray: - return super().get_frame() - - -class CustomStream(StreamBase): - # Same as StreamBase, but with a friendly name - def __init__( - self, - name: str, - fps: int = 30, - ) -> None: - super().__init__(name, fps) - class ManagedStream(StreamBase): def __init__( @@ -184,7 +169,7 @@ def __init__( raise ValueError(f"Invalid mode. Available modes: {self._available_modes}") self.size = size self.quality = max(1, min(quality, 100)) - self.poll_delay_seconds = poll_delay_ms / 1000 if poll_delay_ms else 1 / fps + self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps self._cap_is_open: bool = False self._cap: cv2.VideoCapture = None self._cap_background_task: Optional[asyncio.Task] = None @@ -201,20 +186,23 @@ async def __manage_cap_state(self) -> None: await asyncio.sleep(self.poll_delay_seconds) if self.mode == "full-on-demand": if self.has_demand() and not self._cap_is_open: - self.__open_cap() + async with self._lock: + await self.__open_cap() elif not self.has_demand() and self._cap_is_open: - self.__close_cap() + async with self._lock: + await self.__close_cap() elif not self._cap_is_open: - self.__open_cap() + async with self._lock: + await self.__open_cap() - def __open_cap(self) -> None: + async def __open_cap(self) -> None: if not self._cap_is_open and self._is_running: self._cap = cv2.VideoCapture(self.source) if not self._cap.isOpened(): raise ValueError("Cannot open the capture device") self._cap_is_open = True - def __close_cap(self) -> None: + async def __close_cap(self) -> None: if self._cap_is_open and self._is_running: self._cap.release() self._cap_is_open = False @@ -223,42 +211,33 @@ async def __read_frame(self) -> None: if self._cap_is_open and self._is_running: val, frame = self._cap.read() if not val: - raise ValueError("Error reading frame") + async with self._lock: + val, frame = self._cap.read() + if not val: + raise RuntimeError("Error reading frame") self._frame = frame else: - self.__open_cap() + await self.__open_cap() - async def __process_current_frame(self) -> np.ndarray: + async def _process_current_frame(self) -> np.ndarray: if not self.has_demand(): - return self._frame + return self._last_processed_frame + print("reading frame") await self.__read_frame() - frame = cv2.resize( - self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) - ) - val, frame = cv2.imencode( - ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] + frame = await self._resize_and_encode_frame( + self._frame, + self.size or (self._frame.shape[1], self._frame.shape[0]), + self.quality, ) - if not val: - if self._cap.getBackendName() == "FFMPEG": - raise ValueError( - "Seems like you are using a video file as the source. \ -The media might have ended." - ) - raise ValueError("Error encoding frame") - self._frame_buffer.append(len(frame.tobytes())) - self._bandwidth_last_modified_time = time.time() self._last_processed_frame = frame return frame async def _get_frame(self) -> np.ndarray: if not self._is_running: - print("Stream is not running, please call the start method first.") - return self._frame - if time.time() - self._bandwidth_last_modified_time <= 1 / self.fps: - return self._last_processed_frame - await self._ensure_background_tasks() - async with self._lock: - return await self.__process_current_frame() + raise RuntimeError( + "Stream is not running, please call the start method first." + ) + return await super()._get_frame() def set_size(self, size: Tuple[int, int]) -> None: self.size = size @@ -282,7 +261,7 @@ def change_source(self, source: Union[int, str]) -> None: self.__open_cap() def set_poll_delay_ms(self, poll_delay_ms: float) -> None: - self.poll_delay_seconds = poll_delay_ms / 1000 + self.poll_delay_seconds = poll_delay_ms / 1000.0 def start(self) -> None: if not self._is_running: From 1c56c86c2f209d21cfde46b7f036658a0d8ba4d3 Mon Sep 17 00:00:00 2001 From: egeakman Date: Fri, 5 Apr 2024 05:57:05 +0300 Subject: [PATCH 15/27] #12 and #13 were reintroduced in 1342a95f4beb3d027e0e3ef78e667dc3e634d0ad + remove __new__ override --- mjpeg_streamer/stream.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index ae45724..edd236c 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -14,6 +14,10 @@ def __init__( name: str, fps: int = 30, ) -> None: + if type(self) is StreamBase: + raise TypeError( + "StreamBase is an abstract class and cannot be instantiated." + ) self.name = name.casefold().replace(" ", "_") self.fps = fps self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) @@ -23,9 +27,6 @@ def __init__( self._active_viewers: Set[str] = set() self._bandwidth_background_task: Optional[asyncio.Task] = None - def __new__(self, *args, **kwargs): - raise TypeError("Cannot instantiate an abstract class") - async def _ensure_background_tasks(self) -> None: if ( self._bandwidth_background_task is None From e57fa8825b083e0f3267792012d9a66a88d9e56d Mon Sep 17 00:00:00 2001 From: egeakman Date: Fri, 3 May 2024 17:56:57 +0300 Subject: [PATCH 16/27] This fixes #12 but only tried on Windows --- mjpeg_streamer/__init__.py | 4 ++-- mjpeg_streamer/server.py | 10 +++++----- mjpeg_streamer/stream.py | 38 ++++++++++++-------------------------- tests/test_fast_od.py | 21 +++++++++++++++++++++ tests/test_full_od.py | 21 +++++++++++++++++++++ tests/test_stream.py | 14 ++++++++++++++ tests/test_streambase.py | 16 ++++++++++++++++ 7 files changed, 91 insertions(+), 33 deletions(-) create mode 100644 tests/test_fast_od.py create mode 100644 tests/test_full_od.py create mode 100644 tests/test_stream.py create mode 100644 tests/test_streambase.py diff --git a/mjpeg_streamer/__init__.py b/mjpeg_streamer/__init__.py index 78c0667..ee5b55e 100644 --- a/mjpeg_streamer/__init__.py +++ b/mjpeg_streamer/__init__.py @@ -1,5 +1,5 @@ from .server import MjpegServer, Server -from .stream import CustomStream, ManagedStream, Stream +from .stream import ManagedStream, Stream, StreamBase -__all__ = ["CustomStream", "ManagedStream", "MjpegServer", "Stream", "Server"] +__all__ = ["StreamBase", "ManagedStream", "MjpegServer", "Stream", "Server"] __version__ = "2024.2.8" diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py index 023a747..5316b75 100644 --- a/mjpeg_streamer/server.py +++ b/mjpeg_streamer/server.py @@ -23,7 +23,10 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: "Content-Type": "multipart/x-mixed-replace;boundary=image-boundary" }, ) - await response.prepare(request) + try: + await response.prepare(request) + except (ConnectionResetError, ConnectionAbortedError): + pass if not viewer_token: viewer_token = await self._stream._add_viewer() response.set_cookie("viewer_token", viewer_token) @@ -125,7 +128,4 @@ def stop(self) -> None: class MjpegServer(Server): # Alias for Server, to maintain backwards compatibility - def __init__( - self, host: Union[str, List[str,]] = "localhost", port: int = 8080 - ) -> None: - super().__init__(host, port) + pass diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index edd236c..fc678c8 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -2,7 +2,7 @@ import time import uuid from collections import deque -from typing import Deque, List, Optional, Set, Tuple, Union +from typing import Deque, Dict, List, Optional, Set, Tuple, Union import cv2 import numpy as np @@ -14,10 +14,6 @@ def __init__( name: str, fps: int = 30, ) -> None: - if type(self) is StreamBase: - raise TypeError( - "StreamBase is an abstract class and cannot be instantiated." - ) self.name = name.casefold().replace(" ", "_") self.fps = fps self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) @@ -25,18 +21,16 @@ def __init__( self._frames_buffer: Deque[int] = deque(maxlen=fps) self._bandwidth_last_modified_time: float = time.time() self._active_viewers: Set[str] = set() - self._bandwidth_background_task: Optional[asyncio.Task] = None + self._tasks: Dict[str, asyncio.Task] = {"_clear_bandwidth": None} async def _ensure_background_tasks(self) -> None: - if ( - self._bandwidth_background_task is None - or self._bandwidth_background_task.done() - ): - self._bandwidth_background_task = asyncio.create_task( - self.__clear_bandwidth() - ) + for task_name, task in self._tasks.items(): + if task is None or task.done(): + self._tasks[task_name] = asyncio.create_task( + eval(f"self.{task_name}()") + ) - async def __clear_bandwidth(self) -> None: + async def _clear_bandwidth(self) -> None: while True: await asyncio.sleep(1.0 / self.fps) if ( @@ -120,9 +114,6 @@ def set_frame(self, frame: np.ndarray) -> None: self._frame = frame -CustomStream = StreamBase - - class Stream(StreamBase): def __init__( self, @@ -163,6 +154,7 @@ def __init__( mode: str = "fast-on-demand", poll_delay_ms: Optional[Union[float, int]] = None, ) -> None: + super().__init__(name, fps) self.source = source self.mode = mode self._available_modes: List[str,] = ["fast-on-demand", "full-on-demand"] @@ -175,14 +167,9 @@ def __init__( self._cap: cv2.VideoCapture = None self._cap_background_task: Optional[asyncio.Task] = None self._is_running: bool = False - super().__init__(name, fps) - - async def _ensure_background_tasks(self) -> None: - await super()._ensure_background_tasks() - if self._cap_background_task is None or self._cap_background_task.done(): - self._cap_background_task = asyncio.create_task(self.__manage_cap_state()) + self._tasks["_manage_cap_state"] = None - async def __manage_cap_state(self) -> None: + async def _manage_cap_state(self) -> None: while True: await asyncio.sleep(self.poll_delay_seconds) if self.mode == "full-on-demand": @@ -223,7 +210,6 @@ async def __read_frame(self) -> None: async def _process_current_frame(self) -> np.ndarray: if not self.has_demand(): return self._last_processed_frame - print("reading frame") await self.__read_frame() frame = await self._resize_and_encode_frame( self._frame, @@ -248,7 +234,7 @@ def set_quality(self, quality: int) -> None: def set_frame(self, frame: np.ndarray) -> None: raise NotImplementedError( - "This method is not available for ManagedStream, use Stream or CustomStream instead." + "This method is not available for ManagedStream, use Stream or StreamBase instead." ) def change_mode(self, mode: str) -> None: diff --git a/tests/test_fast_od.py b/tests/test_fast_od.py new file mode 100644 index 0000000..06c9670 --- /dev/null +++ b/tests/test_fast_od.py @@ -0,0 +1,21 @@ +import time + +from mjpeg_streamer import ManagedStream, Server + +server = Server() +stream = ManagedStream( + "test", + source=0, + fps=30, + size=(640, 480), + quality=50, + mode="fast-on-demand", + poll_delay_ms=None, +) + +server.add_stream(stream) +server.start() +stream.start() + +while True: + time.sleep(1 / 30) diff --git a/tests/test_full_od.py b/tests/test_full_od.py new file mode 100644 index 0000000..afc28e8 --- /dev/null +++ b/tests/test_full_od.py @@ -0,0 +1,21 @@ +import time + +from mjpeg_streamer import ManagedStream, Server + +server = Server() +stream = ManagedStream( + "test", + source=0, + fps=30, + size=(640, 480), + quality=50, + mode="full-on-demand", + poll_delay_ms=None, +) + +server.add_stream(stream) +server.start() +stream.start() + +while True: + time.sleep(1 / 30) diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 0000000..e26a044 --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,14 @@ +import cv2 + +from mjpeg_streamer import Server, Stream + +capture = cv2.VideoCapture(0) + +server = Server() +stream = Stream("test", fps=30, size=(640, 480), quality=50) +server.add_stream(stream) +server.start() + +while True: + frame = capture.read()[1] + stream.set_frame(frame) diff --git a/tests/test_streambase.py b/tests/test_streambase.py new file mode 100644 index 0000000..e3f4ef7 --- /dev/null +++ b/tests/test_streambase.py @@ -0,0 +1,16 @@ +import cv2 + +from mjpeg_streamer import Server, StreamBase + +capture = cv2.VideoCapture(0) + +server = Server() +stream = StreamBase("test", fps=30) + +server.add_stream(stream) +server.start() + +while True: + frame = capture.read()[1] + frame = cv2.imencode(".jpg", frame)[1] + stream.set_frame(frame) From 2ea77e3f1017f59a5637fa8ac1426ba595f1c983 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sat, 4 May 2024 17:07:39 +0300 Subject: [PATCH 17/27] Add performance test results + #13 fix is validated, it works on Linux too --- mjpeg_streamer/stream.py | 2 +- perf/py-spy-results.md | 37 +++++++++++++++++++++++++++++ perf/usage-tests.md | 51 ++++++++++++++++++++++++++++++++++++++++ tests/test_fast_od.py | 2 +- tests/test_full_od.py | 2 +- tests/test_stream.py | 2 +- tests/test_streambase.py | 2 +- 7 files changed, 93 insertions(+), 5 deletions(-) create mode 100644 perf/py-spy-results.md create mode 100644 perf/usage-tests.md diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index fc678c8..d1a43bf 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -104,7 +104,7 @@ def set_fps(self, fps: int) -> None: async def _get_frame(self) -> np.ndarray: await self._ensure_background_tasks() # A little hacky if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps: - return self._last_processed_frame + return self._last_processed_frame # Prevents redundant processing async with self._lock: self._frames_buffer.append(len(self._frame.tobytes())) self._bandwidth_last_modified_time = time.time() diff --git a/perf/py-spy-results.md b/perf/py-spy-results.md new file mode 100644 index 0000000..ebdcde8 --- /dev/null +++ b/perf/py-spy-results.md @@ -0,0 +1,37 @@ +# PY-SPY Tests + +Performance results gathered using py-spy. [Test scripts](tests/) were run with the following command: + +```bash +py-spy top -- python tests/.py +``` + +## Results + +### test_streambase: +- no demand: +``GIL: 0% Active: 5-15% Threads: 2`` +- demand: +``GIL: 0% Active: 5-15% Threads: 2`` + +### test_stream: +- no demand: +``GIL: 0% Active: 0-4% Threads: 2`` +- demand: +``GIL: 0-2% Active: 3-10% Threads: 2`` + +### test_full_od: + - initial no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - no demand: + ``GIL: 0% Active: 0% Threads: 2`` + - demand: + ``GIL: 0-2% Active: 2-10% Threads: 2`` + +### test_fast_od: + - initial no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - no demand: + ``GIL: 0% Active: 0-1% Threads: 2`` + - demand: + ``GIL: 0-2% Active: 2-10% Threads: 2`` diff --git a/perf/usage-tests.md b/perf/usage-tests.md new file mode 100644 index 0000000..4602d52 --- /dev/null +++ b/perf/usage-tests.md @@ -0,0 +1,51 @@ +# Usage Tests + +Results gathered from running the tests in the [tests](tests/) directory. These results are very arbitrary and should not be taken as a definitive measure of performance. They are meant to give a rough idea of how the system behaves under different conditions. + +## Computer Specs + +```bash +$ neofetch --off +OS: Ubuntu 23.10 x86_64 +Host: Victus by HP Laptop 16-e0xxx +Kernel: 6.5.0-28-generic +Shell: zsh 5.9 +DE: GNOME 45.2 +Terminal: gnome-terminal +CPU: AMD Ryzen 7 5800H with Radeon Graphics (16) @ 4.463GHz +GPU: AMD ATI Radeon Vega Series / Radeon Vega Mobile Series +GPU: NVIDIA GeForce RTX 3050 Ti Mobile +Memory: 15313MiB +``` + +*Note:* Irrelevant information has been removed from the output. + +## Results + +### test_streambase: +- no demand: +``CPU: 14-16% Memory: 39.2 MB`` +- demand: +``CPU: 15-19% Memory: 39.5 MB`` + +### test_stream: +- no demand: +``CPU: 14-16% Memory: 39.7 MB`` +- demand: +``CPU: 14-17% Memory: 41.7 MB`` + +### test_full_od: + - initial no demand: + ``CPU: 0% Memory: 37 MB`` + - no demand: + ``CPU: 0% Memory: 39.9 MB`` + - demand: + ``CPU: 15-18% Memory: 41 MB`` + +### test_fast_od: + - initial no demand: + ``CPU: 0% Memory: 37.2 MB`` + - no demand: + ``CPU: 0-0.1% Memory: 41.1 MB`` + - demand: + ``CPU: 15-18% Memory: 41.1 MB`` diff --git a/tests/test_fast_od.py b/tests/test_fast_od.py index 06c9670..dc5d6a4 100644 --- a/tests/test_fast_od.py +++ b/tests/test_fast_od.py @@ -2,7 +2,7 @@ from mjpeg_streamer import ManagedStream, Server -server = Server() +server = Server(host="0.0.0.0", port=8080) stream = ManagedStream( "test", source=0, diff --git a/tests/test_full_od.py b/tests/test_full_od.py index afc28e8..78221a3 100644 --- a/tests/test_full_od.py +++ b/tests/test_full_od.py @@ -2,7 +2,7 @@ from mjpeg_streamer import ManagedStream, Server -server = Server() +server = Server(host="0.0.0.0", port=8080) stream = ManagedStream( "test", source=0, diff --git a/tests/test_stream.py b/tests/test_stream.py index e26a044..a66a99b 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -4,7 +4,7 @@ capture = cv2.VideoCapture(0) -server = Server() +server = Server(host="0.0.0.0", port=8080) stream = Stream("test", fps=30, size=(640, 480), quality=50) server.add_stream(stream) server.start() diff --git a/tests/test_streambase.py b/tests/test_streambase.py index e3f4ef7..509d5c9 100644 --- a/tests/test_streambase.py +++ b/tests/test_streambase.py @@ -4,7 +4,7 @@ capture = cv2.VideoCapture(0) -server = Server() +server = Server(host="0.0.0.0", port=8080) stream = StreamBase("test", fps=30) server.add_stream(stream) From 0ada5d0fc286cea2516f659167a8960af291fde9 Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sat, 4 May 2024 17:11:01 +0300 Subject: [PATCH 18/27] Update py-spy-results.md --- perf/py-spy-results.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/perf/py-spy-results.md b/perf/py-spy-results.md index ebdcde8..9fe861c 100644 --- a/perf/py-spy-results.md +++ b/perf/py-spy-results.md @@ -8,6 +8,8 @@ py-spy top -- python tests/.py ## Results +*Python version: 3.11.8* + ### test_streambase: - no demand: ``GIL: 0% Active: 5-15% Threads: 2`` From 37179decc062644ba2faa3d38cf42528295f11e0 Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sat, 4 May 2024 17:11:12 +0300 Subject: [PATCH 19/27] Update usage-tests.md --- perf/usage-tests.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/perf/usage-tests.md b/perf/usage-tests.md index 4602d52..5877cba 100644 --- a/perf/usage-tests.md +++ b/perf/usage-tests.md @@ -22,6 +22,8 @@ Memory: 15313MiB ## Results +*Python version: 3.11.8* + ### test_streambase: - no demand: ``CPU: 14-16% Memory: 39.2 MB`` From a97bdb6c7a9745d576a5a8324abf8498d82db94d Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sat, 4 May 2024 17:12:10 +0300 Subject: [PATCH 20/27] Update py-spy-results.md --- perf/py-spy-results.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perf/py-spy-results.md b/perf/py-spy-results.md index 9fe861c..13627d6 100644 --- a/perf/py-spy-results.md +++ b/perf/py-spy-results.md @@ -1,6 +1,6 @@ # PY-SPY Tests -Performance results gathered using py-spy. [Test scripts](tests/) were run with the following command: +Performance results gathered using py-spy. [Test scripts](../tests/) were run with the following command: ```bash py-spy top -- python tests/.py From d938e4e967153714675f7fb7979c74c43af23cf8 Mon Sep 17 00:00:00 2001 From: Ege Akman Date: Sat, 4 May 2024 17:12:20 +0300 Subject: [PATCH 21/27] Update usage-tests.md --- perf/usage-tests.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perf/usage-tests.md b/perf/usage-tests.md index 5877cba..fc01ac6 100644 --- a/perf/usage-tests.md +++ b/perf/usage-tests.md @@ -1,6 +1,6 @@ # Usage Tests -Results gathered from running the tests in the [tests](tests/) directory. These results are very arbitrary and should not be taken as a definitive measure of performance. They are meant to give a rough idea of how the system behaves under different conditions. +Results gathered from running the tests in the [tests](../tests/) directory. These results are very arbitrary and should not be taken as a definitive measure of performance. They are meant to give a rough idea of how the system behaves under different conditions. ## Computer Specs From b6d257f563dc8652123fd512d357b1bf32059423 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 5 May 2024 21:16:57 +0300 Subject: [PATCH 22/27] Update tests --- tests/test_fast_od.py | 3 +++ tests/test_full_od.py | 3 +++ tests/test_stream.py | 3 +++ tests/test_streambase.py | 3 +++ 4 files changed, 12 insertions(+) diff --git a/tests/test_fast_od.py b/tests/test_fast_od.py index dc5d6a4..7cb9518 100644 --- a/tests/test_fast_od.py +++ b/tests/test_fast_od.py @@ -17,5 +17,8 @@ server.start() stream.start() +print(stream.settings()) + while True: time.sleep(1 / 30) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_full_od.py b/tests/test_full_od.py index 78221a3..a61dca0 100644 --- a/tests/test_full_od.py +++ b/tests/test_full_od.py @@ -17,5 +17,8 @@ server.start() stream.start() +print(stream.settings()) + while True: time.sleep(1 / 30) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_stream.py b/tests/test_stream.py index a66a99b..8c382fc 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -9,6 +9,9 @@ server.add_stream(stream) server.start() +print(stream.settings()) + while True: frame = capture.read()[1] stream.set_frame(frame) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") diff --git a/tests/test_streambase.py b/tests/test_streambase.py index 509d5c9..bf5a450 100644 --- a/tests/test_streambase.py +++ b/tests/test_streambase.py @@ -10,7 +10,10 @@ server.add_stream(stream) server.start() +print(stream.settings()) + while True: frame = capture.read()[1] frame = cv2.imencode(".jpg", frame)[1] stream.set_frame(frame) + print(stream.get_bandwidth() / 1024, "KB/s", end="\r") From 38a13fec96c0ffa46b408853dc68bbda7931c334 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 5 May 2024 21:54:09 +0300 Subject: [PATCH 23/27] Fix #24 --- mjpeg_streamer/stream.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index d1a43bf..e0f09d5 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -17,6 +17,9 @@ def __init__( self.name = name.casefold().replace(" ", "_") self.fps = fps self._frame: np.ndarray = np.zeros((320, 240, 1), dtype=np.uint8) + self._last_processed_frame: np.ndarray = cv2.imencode( + ".jpg", self._frame, [cv2.IMWRITE_JPEG_QUALITY, 1] + )[1] self._lock: asyncio.Lock = asyncio.Lock() self._frames_buffer: Deque[int] = deque(maxlen=fps) self._bandwidth_last_modified_time: float = time.time() @@ -55,15 +58,15 @@ async def _process_current_frame(self) -> np.ndarray: async def __check_encoding(self, frame: np.ndarray) -> str: if isinstance(frame, np.ndarray) and frame.ndim == 1 and frame.size > 2: - # Check JPEG header (0xFFD8) and footer (0xFFD9) + # Check JPG header (0xFFD8) and footer (0xFFD9) if ( frame[0] == 255 and frame[1] == 216 and frame[-2] == 255 and frame[-1] == 217 ): - return "jpeg" - return "one-dim-non-jpeg" + return "jpg" + return "one-dim-non-jpg" if isinstance(frame, np.ndarray): return "multi-dim" return "unknown" @@ -72,7 +75,7 @@ async def _resize_and_encode_frame( self, frame: np.ndarray, size: Tuple[int, int], quality: int ) -> np.ndarray: resized_frame = cv2.resize(frame, size) - if not await self.__check_encoding(resized_frame) == "jpeg": + if not await self.__check_encoding(resized_frame) == "jpg": val, encoded_frame = cv2.imencode( ".jpg", resized_frame, [cv2.IMWRITE_JPEG_QUALITY, quality] ) @@ -106,7 +109,7 @@ async def _get_frame(self) -> np.ndarray: if time.time() - self._bandwidth_last_modified_time <= 1.0 / self.fps: return self._last_processed_frame # Prevents redundant processing async with self._lock: - self._frames_buffer.append(len(self._frame.tobytes())) + self._frames_buffer.append(len(self._last_processed_frame.tobytes())) self._bandwidth_last_modified_time = time.time() return await self._process_current_frame() From 2b4e5be8c88899b1ff1420dbca019ff6c8bc76c7 Mon Sep 17 00:00:00 2001 From: egeakman Date: Sun, 5 May 2024 22:05:44 +0300 Subject: [PATCH 24/27] update gitignore --- .gitignore | 3 --- 1 file changed, 3 deletions(-) diff --git a/.gitignore b/.gitignore index 63bdc33..a07a66d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,3 @@ build/ .mypy_cache/ .pytest_cache/ .ruff_cache/ -test*.py -!test_*.py -test-perf/ From 9d4d5a8d471c643f36b6d940c4758653d5f82509 Mon Sep 17 00:00:00 2001 From: egeakman Date: Wed, 8 May 2024 23:21:48 +0300 Subject: [PATCH 25/27] Make CLI use ManagedStream --- mjpeg_streamer/cli.py | 75 ++++++++++++---------------------------- mjpeg_streamer/stream.py | 9 +++-- 2 files changed, 29 insertions(+), 55 deletions(-) diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py index fb5f55f..4fd4289 100644 --- a/mjpeg_streamer/cli.py +++ b/mjpeg_streamer/cli.py @@ -1,14 +1,10 @@ -# WON'T WORK RIGHT NOW. USE THE MAIN BRANCH FOR THE CLI - import argparse import re -import threading -from typing import List, Tuple, Union - -import cv2 +import time +from typing import Dict, List, Tuple, Union -from .server import MjpegServer -from .stream import Stream +from .server import Server +from .stream import ManagedStream def parse_args() -> argparse.Namespace: @@ -16,7 +12,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--host", type=str, default="localhost") parser.add_argument("--port", type=int, default=8080) parser.add_argument( - "--prefix", type=str, default="source", help="Name prefix for the streams" + "--prefix", type=str, default="", help="Name prefix for the streams" ) parser.add_argument( "--source", @@ -37,84 +33,59 @@ def parse_args() -> argparse.Namespace: ) args: argparse.Namespace = parser.parse_args() args.prefix = re.sub("[^0-9a-zA-Z]+", "_", args.prefix) - args.source: List[Union[int, str],] = [[0]] if args.source is None else args.source + args.source = [[0]] if args.source is None else args.source args.source = [item for sublist in args.source for item in sublist] args.source = list(set(args.source)) return args -def run( - cap: cv2.VideoCapture, - stream: Stream, - stop_event: threading.Event, - show_bandwidth: bool, -) -> None: - while not stop_event.is_set(): - ret, frame = cap.read() - if not ret: - stop_event.set() - break - stream.set_frame(frame) - if show_bandwidth: - global bandwidth - bandwidth[stream.name] = stream.get_bandwidth() - cap.release() - - def main() -> None: args = parse_args() size: Tuple[int, int] = (args.width, args.height) - server = MjpegServer(args.host, args.port) - threads: List[threading.Thread,] = [] - stop_events: List[threading.Event,] = [] + streams: List[ManagedStream] = [] + server = Server(args.host, args.port) if args.show_bandwidth: - global bandwidth - bandwidth = {} # dict[str, int] + bandwidth: Dict[str, int] = {} for source in args.source: source: Union[int, str] = int(source) if str(source).isdigit() else source - cap = cv2.VideoCapture(source) source_display = ( re.sub("[^0-9a-zA-Z]+", "_", source) if isinstance(source, str) else source ) - stream = Stream( - f"{args.prefix}_{source_display!s}", + stream = ManagedStream( + f"{args.prefix}{'_' if args.prefix else ''}{source_display!s}", + source=source, size=size, quality=args.quality, fps=args.fps, ) server.add_stream(stream) - stop_event = threading.Event() - stop_events.append(stop_event) - thread = threading.Thread( - target=run, args=(cap, stream, stop_event, args.show_bandwidth) - ) - threads.append(thread) + streams.append(stream) try: - for thread in threads: - thread.start() + for stream in streams: + stream.start() server.start() while True: if args.show_bandwidth: + for stream in streams: + bandwidth[stream.name] = stream.get_bandwidth() print( f"{' | '.join([f'{k}: {round(v / 1024, 2)} KB/s' for k, v in bandwidth.items()])}", end="\r", ) + else: + time.sleep(1) # Keep the main thread alive, but don't consume CPU except KeyboardInterrupt: - for stop_event in stop_events: - stop_event.set() + for stream in streams: + stream.stop() server.stop() - for thread in threads: - thread.join() except Exception as e: print(e) - for stop_event in stop_events: - stop_event.set() + for stream in streams: + stream.stop() server.stop() - for thread in threads: - thread.join() if __name__ == "__main__": diff --git a/mjpeg_streamer/stream.py b/mjpeg_streamer/stream.py index e0f09d5..894bc02 100644 --- a/mjpeg_streamer/stream.py +++ b/mjpeg_streamer/stream.py @@ -168,7 +168,6 @@ def __init__( self.poll_delay_seconds = poll_delay_ms / 1000.0 if poll_delay_ms else 1.0 / fps self._cap_is_open: bool = False self._cap: cv2.VideoCapture = None - self._cap_background_task: Optional[asyncio.Task] = None self._is_running: bool = False self._tasks["_manage_cap_state"] = None @@ -261,8 +260,12 @@ def start(self) -> None: def stop(self) -> None: if self._is_running: - self._cap_background_task.cancel() - self.__close_cap() + for task in self._tasks.values(): + if task and not task.done(): + task.cancel() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.__close_cap()) + loop.close() if loop.is_running() else None self._is_running = False self._cap_is_open = False else: From 56b0034fae77ae5127ad29e4c9c67e3f3ab4f3c3 Mon Sep 17 00:00:00 2001 From: egeakman Date: Thu, 9 May 2024 00:48:48 +0300 Subject: [PATCH 26/27] small improvement --- mjpeg_streamer/cli.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mjpeg_streamer/cli.py b/mjpeg_streamer/cli.py index 4fd4289..f8b777c 100644 --- a/mjpeg_streamer/cli.py +++ b/mjpeg_streamer/cli.py @@ -78,11 +78,10 @@ def main() -> None: else: time.sleep(1) # Keep the main thread alive, but don't consume CPU except KeyboardInterrupt: - for stream in streams: - stream.stop() - server.stop() + print("\nExiting...") except Exception as e: - print(e) + print("Error:", e) + finally: for stream in streams: stream.stop() server.stop() From 4e63aaf4ec2d2967dd5ac57252e12392998e6cb9 Mon Sep 17 00:00:00 2001 From: egeakman Date: Thu, 9 May 2024 19:53:38 +0300 Subject: [PATCH 27/27] Handle ConnectionError --- mjpeg_streamer/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mjpeg_streamer/server.py b/mjpeg_streamer/server.py index 5316b75..3f2901f 100644 --- a/mjpeg_streamer/server.py +++ b/mjpeg_streamer/server.py @@ -25,7 +25,7 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: ) try: await response.prepare(request) - except (ConnectionResetError, ConnectionAbortedError): + except (ConnectionResetError, ConnectionAbortedError, ConnectionError): pass if not viewer_token: viewer_token = await self._stream._add_viewer() @@ -46,7 +46,7 @@ async def __call__(self, request: web.Request) -> web.StreamResponse: ) await mpwriter.write(response, close_boundary=False) await response.write(b"\r\n") - except (ConnectionResetError, ConnectionAbortedError): + except (ConnectionResetError, ConnectionAbortedError, ConnectionError): break finally: await self._stream._remove_viewer(viewer_token)