diff --git a/atpbar/callback.py b/atpbar/callback.py index f889da1..6ed4be7 100644 --- a/atpbar/callback.py +++ b/atpbar/callback.py @@ -1,12 +1,9 @@ from collections.abc import Iterator from contextlib import contextmanager -from multiprocessing import Queue from threading import Lock, current_thread, main_thread from .machine import StateMachine -from .presentation import create_presentation -from .progress_report import ProgressReporter, ProgressReportPickup, Report -from .stream import StreamQueue, StreamRedirection, register_stream_queue +from .progress_report import ProgressReporter class CallbackImp: @@ -17,41 +14,12 @@ def __init__(self) -> None: self._lock: Lock # to be set by the StateMachine def on_active(self) -> None: - - self.queue: Queue[Report] = Queue() - self.notices_from_sub_processes: Queue[bool] = Queue() - self.stream_queue: StreamQueue = Queue() - self.reporter = ProgressReporter( - queue=self.queue, - notices_from_sub_processes=self.notices_from_sub_processes, - stream_queue=self.stream_queue, - ) - - self._start_pickup() - - if self.stream_redirection.disabled: - self.reporter.stream_redirection_enabled = False - - def _start_pickup(self) -> None: - presentation = create_presentation() - self.pickup = ProgressReportPickup(self.queue, presentation) - - self.stream_redirection = StreamRedirection( - queue=self.stream_queue, presentation=presentation - ) - self.stream_redirection.start() - - def _end_pickup(self) -> None: - self.pickup.end() - - self.stream_redirection.end() - - def _restart_pickup(self) -> None: - self._end_pickup() - self._start_pickup() + self.reporter = ProgressReporter() + self.reporter.start_pickup() @contextmanager def fetch_reporter_in_active(self) -> Iterator[ProgressReporter | None]: + assert self.reporter if not in_main_thread(): self.to_restart_pickup = False @@ -59,10 +27,7 @@ def fetch_reporter_in_active(self) -> Iterator[ProgressReporter | None]: return self.to_restart_pickup = True - - while not self.notices_from_sub_processes.empty(): - _ = self.notices_from_sub_processes.get() - + self.reporter.empty_notices() self._machine.on_yielded() try: @@ -70,21 +35,22 @@ def fetch_reporter_in_active(self) -> Iterator[ProgressReporter | None]: finally: self._machine.on_resumed() - while not self.notices_from_sub_processes.empty(): - _ = self.notices_from_sub_processes.get() + if self.reporter.empty_notices(): self.to_restart_pickup = False if not self.to_restart_pickup: return - with self._lock: - self._restart_pickup() + with self._lock: # NOTE: This lock is probably unnecessary + self.reporter.restart_pickup() def flush_in_active(self) -> None: - self._restart_pickup() + assert self.reporter + self.reporter.restart_pickup() def shutdown_in_active(self) -> None: - self._end_pickup() + assert self.reporter + self.reporter.end_pickup() @contextmanager def fetch_reporter_in_yielded(self) -> Iterator[ProgressReporter | None]: @@ -100,8 +66,7 @@ def on_registered(self, reporter: ProgressReporter | None) -> None: self.reporter = reporter if reporter is None: return - if reporter.stream_redirection_enabled: - register_stream_queue(reporter.stream_queue) + reporter.register() @contextmanager def fetch_reporter_in_registered(self) -> Iterator[ProgressReporter | None]: @@ -109,7 +74,7 @@ def fetch_reporter_in_registered(self) -> Iterator[ProgressReporter | None]: yield self.reporter return - self.reporter.notices_from_sub_processes.put(True) + self.reporter.notice() yield self.reporter def on_disabled(self) -> None: diff --git a/atpbar/progress_report/reporter.py b/atpbar/progress_report/reporter.py index b99a11f..90c745a 100755 --- a/atpbar/progress_report/reporter.py +++ b/atpbar/progress_report/reporter.py @@ -1,19 +1,21 @@ import time from multiprocessing import Queue -from typing import TYPE_CHECKING from uuid import UUID +from atpbar.presentation import create_presentation +from atpbar.stream import StreamQueue, StreamRedirection, register_stream_queue + from .complement import ProgressReportComplementer +from .pickup import ProgressReportPickup from .report import Report -if TYPE_CHECKING: - from atpbar.stream import StreamQueue - DEFAULT_INTERVAL = 0.1 # [second] class ProgressReporter: - '''A progress reporter + '''A progress reporter. + + NOTE: This docstring is outdated. This class sends progress reports. The reports will be picked up by the pickup (`ProgressReportPickup`), which uses the reports, @@ -45,24 +47,49 @@ class ProgressReporter: The queue through which this class sends progress reports. ''' - def __init__( - self, - queue: 'Queue[Report]', - notices_from_sub_processes: 'Queue[bool]', - stream_queue: 'StreamQueue', - ) -> None: - self.queue = queue + def __init__(self) -> None: + self.queue: Queue[Report] = Queue() + self.notices_from_sub_processes: Queue[bool] = Queue() + self.stream_queue: StreamQueue = Queue() self.interval = DEFAULT_INTERVAL # [second] self.last_time = dict[UUID, float]() self.complete_report = ProgressReportComplementer() - self.notices_from_sub_processes = notices_from_sub_processes - self.stream_queue = stream_queue self.stream_redirection_enabled = True def __repr__(self) -> str: - return '{}(queue={!r}, interval={!r})'.format( - self.__class__.__name__, self.queue, self.interval + return f'{self.__class__.__name__}(queue={self.queue!r}, interval={self.interval!r})' + + def start_pickup(self) -> None: + presentation = create_presentation() + self.pickup = ProgressReportPickup(self.queue, presentation) + + self.stream_redirection = StreamRedirection( + queue=self.stream_queue, presentation=presentation ) + self.stream_redirection.start() + self.stream_redirection_enabled = not self.stream_redirection.disabled + + def end_pickup(self) -> None: + self.pickup.end() + self.stream_redirection.end() + + def restart_pickup(self) -> None: + self.end_pickup() + self.start_pickup() + + def notice(self) -> None: + self.notices_from_sub_processes.put(True) + + def empty_notices(self) -> bool: + ret = False + while not self.notices_from_sub_processes.empty(): + _ = self.notices_from_sub_processes.get() + ret = True + return ret + + def register(self) -> None: + if self.stream_redirection_enabled: + register_stream_queue(self.stream_queue) def report(self, report: Report) -> None: '''send ``report`` to a progress monitor diff --git a/tests/progress_report/test_reporter.py b/tests/progress_report/test_reporter.py index 80d46d6..13b177e 100644 --- a/tests/progress_report/test_reporter.py +++ b/tests/progress_report/test_reporter.py @@ -24,9 +24,8 @@ def mock_time(monkeypatch: pytest.MonkeyPatch) -> mock.Mock: @pytest.fixture() def obj(mock_queue: mock.Mock, mock_time: mock.Mock) -> ProgressReporter: - ret = ProgressReporter( - mock_queue, notices_from_sub_processes=mock.Mock(), stream_queue=mock.Mock() - ) + ret = ProgressReporter() + ret.queue = mock_queue return ret diff --git a/tests/scenarios/test_utils.py b/tests/scenarios/test_utils.py index 187bb29..bc9d6a4 100644 --- a/tests/scenarios/test_utils.py +++ b/tests/scenarios/test_utils.py @@ -1,4 +1,4 @@ -from atpbar import callback, funcs +from atpbar import funcs from atpbar.progress_report import reporter from .utils import mock_presentations @@ -11,12 +11,12 @@ class MockException(Exception): func_machine = funcs._machine interval = reporter.DEFAULT_INTERVAL - create_presentation = callback.create_presentation + create_presentation = reporter.create_presentation try: with mock_presentations() as presentations: assert func_machine is not funcs._machine assert interval != reporter.DEFAULT_INTERVAL - assert create_presentation is not callback.create_presentation + assert create_presentation is not reporter.create_presentation presentations raise MockException() except MockException: @@ -24,4 +24,4 @@ class MockException(Exception): finally: assert func_machine is funcs._machine assert interval == reporter.DEFAULT_INTERVAL - assert create_presentation is callback.create_presentation + assert create_presentation is reporter.create_presentation diff --git a/tests/scenarios/utils.py b/tests/scenarios/utils.py index af050e9..c0fd96c 100644 --- a/tests/scenarios/utils.py +++ b/tests/scenarios/utils.py @@ -88,7 +88,7 @@ def monkeypatch_reporter_interval() -> Iterator[None]: def mock_create_presentation() -> Iterator[MockCreatePresentation]: y = MockCreatePresentation() with MonkeyPatch.context() as m: - m.setattr(callback, 'create_presentation', y) + m.setattr(reporter, 'create_presentation', y) yield y