Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean code #60

Merged
merged 5 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions atpbar/callback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from collections.abc import Iterator
from contextlib import contextmanager
from multiprocessing import Queue
from threading import Lock, current_thread, main_thread

from .machine import StateMachine
from .presentation import create_presentation
from .progress_report import ProgressReporter, ProgressReportPickup, Report
from .stream import StreamQueue, StreamRedirection, register_stream_queue
from .progress_report import ProgressReporter


class CallbackImp:
Expand All @@ -17,74 +14,43 @@ def __init__(self) -> None:
self._lock: Lock # to be set by the StateMachine

def on_active(self) -> None:

self.queue: Queue[Report] = Queue()
self.notices_from_sub_processes: Queue[bool] = Queue()
self.stream_queue: StreamQueue = Queue()
self.reporter = ProgressReporter(
queue=self.queue,
notices_from_sub_processes=self.notices_from_sub_processes,
stream_queue=self.stream_queue,
)

self._start_pickup()

if self.stream_redirection.disabled:
self.reporter.stream_redirection_enabled = False

def _start_pickup(self) -> None:
presentation = create_presentation()
self.pickup = ProgressReportPickup(self.queue, presentation)

self.stream_redirection = StreamRedirection(
queue=self.stream_queue, presentation=presentation
)
self.stream_redirection.start()

def _end_pickup(self) -> None:
self.pickup.end()

self.stream_redirection.end()

def _restart_pickup(self) -> None:
self._end_pickup()
self._start_pickup()
self.reporter = ProgressReporter()
self.reporter.start_pickup()

@contextmanager
def fetch_reporter_in_active(self) -> Iterator[ProgressReporter | None]:
assert self.reporter

if not in_main_thread():
self.to_restart_pickup = False
yield self.reporter
return

self.to_restart_pickup = True

while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()

self.reporter.empty_notices()
self._machine.on_yielded()

try:
yield self.reporter
finally:
self._machine.on_resumed()

while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()
if self.reporter.empty_notices():
self.to_restart_pickup = False

if not self.to_restart_pickup:
return

with self._lock:
self._restart_pickup()
with self._lock: # NOTE: This lock is probably unnecessary
self.reporter.restart_pickup()

def flush_in_active(self) -> None:
self._restart_pickup()
assert self.reporter
self.reporter.restart_pickup()

def shutdown_in_active(self) -> None:
self._end_pickup()
assert self.reporter
self.reporter.end_pickup()

@contextmanager
def fetch_reporter_in_yielded(self) -> Iterator[ProgressReporter | None]:
Expand All @@ -100,16 +66,15 @@ def on_registered(self, reporter: ProgressReporter | None) -> None:
self.reporter = reporter
if reporter is None:
return
if reporter.stream_redirection_enabled:
register_stream_queue(reporter.stream_queue)
reporter.register()

@contextmanager
def fetch_reporter_in_registered(self) -> Iterator[ProgressReporter | None]:
if self.reporter is None:
yield self.reporter
return

self.reporter.notices_from_sub_processes.put(True)
self.reporter.notice()
yield self.reporter

def on_disabled(self) -> None:
Expand Down
59 changes: 43 additions & 16 deletions atpbar/progress_report/reporter.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import time
from multiprocessing import Queue
from typing import TYPE_CHECKING
from uuid import UUID

from atpbar.presentation import create_presentation
from atpbar.stream import StreamQueue, StreamRedirection, register_stream_queue

from .complement import ProgressReportComplementer
from .pickup import ProgressReportPickup
from .report import Report

if TYPE_CHECKING:
from atpbar.stream import StreamQueue

DEFAULT_INTERVAL = 0.1 # [second]


class ProgressReporter:
'''A progress reporter
'''A progress reporter.

NOTE: This docstring is outdated.

This class sends progress reports. The reports will be picked up
by the pickup (`ProgressReportPickup`), which uses the reports,
Expand Down Expand Up @@ -45,24 +47,49 @@ class ProgressReporter:
The queue through which this class sends progress reports.
'''

def __init__(
self,
queue: 'Queue[Report]',
notices_from_sub_processes: 'Queue[bool]',
stream_queue: 'StreamQueue',
) -> None:
self.queue = queue
def __init__(self) -> None:
self.queue: Queue[Report] = Queue()
self.notices_from_sub_processes: Queue[bool] = Queue()
self.stream_queue: StreamQueue = Queue()
self.interval = DEFAULT_INTERVAL # [second]
self.last_time = dict[UUID, float]()
self.complete_report = ProgressReportComplementer()
self.notices_from_sub_processes = notices_from_sub_processes
self.stream_queue = stream_queue
self.stream_redirection_enabled = True

def __repr__(self) -> str:
return '{}(queue={!r}, interval={!r})'.format(
self.__class__.__name__, self.queue, self.interval
return f'{self.__class__.__name__}(queue={self.queue!r}, interval={self.interval!r})'

def start_pickup(self) -> None:
presentation = create_presentation()
self.pickup = ProgressReportPickup(self.queue, presentation)

self.stream_redirection = StreamRedirection(
queue=self.stream_queue, presentation=presentation
)
self.stream_redirection.start()
self.stream_redirection_enabled = not self.stream_redirection.disabled

def end_pickup(self) -> None:
self.pickup.end()
self.stream_redirection.end()

def restart_pickup(self) -> None:
self.end_pickup()
self.start_pickup()

def notice(self) -> None:
self.notices_from_sub_processes.put(True)

def empty_notices(self) -> bool:
ret = False
while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()
ret = True
return ret

def register(self) -> None:
if self.stream_redirection_enabled:
register_stream_queue(self.stream_queue)

def report(self, report: Report) -> None:
'''send ``report`` to a progress monitor
Expand Down
5 changes: 2 additions & 3 deletions tests/progress_report/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ def mock_time(monkeypatch: pytest.MonkeyPatch) -> mock.Mock:

@pytest.fixture()
def obj(mock_queue: mock.Mock, mock_time: mock.Mock) -> ProgressReporter:
ret = ProgressReporter(
mock_queue, notices_from_sub_processes=mock.Mock(), stream_queue=mock.Mock()
)
ret = ProgressReporter()
ret.queue = mock_queue
return ret


Expand Down
8 changes: 4 additions & 4 deletions tests/scenarios/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from atpbar import callback, funcs
from atpbar import funcs
from atpbar.progress_report import reporter

from .utils import mock_presentations
Expand All @@ -11,17 +11,17 @@ class MockException(Exception):

func_machine = funcs._machine
interval = reporter.DEFAULT_INTERVAL
create_presentation = callback.create_presentation
create_presentation = reporter.create_presentation
try:
with mock_presentations() as presentations:
assert func_machine is not funcs._machine
assert interval != reporter.DEFAULT_INTERVAL
assert create_presentation is not callback.create_presentation
assert create_presentation is not reporter.create_presentation
presentations
raise MockException()
except MockException:
pass
finally:
assert func_machine is funcs._machine
assert interval == reporter.DEFAULT_INTERVAL
assert create_presentation is callback.create_presentation
assert create_presentation is reporter.create_presentation
2 changes: 1 addition & 1 deletion tests/scenarios/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def monkeypatch_reporter_interval() -> Iterator[None]:
def mock_create_presentation() -> Iterator[MockCreatePresentation]:
y = MockCreatePresentation()
with MonkeyPatch.context() as m:
m.setattr(callback, 'create_presentation', y)
m.setattr(reporter, 'create_presentation', y)
yield y


Expand Down