Skip to content

Commit

Permalink
Merge pull request #60 from alphatwirl/dev
Browse files Browse the repository at this point in the history
Clean code
  • Loading branch information
TaiSakuma authored Jul 4, 2024
2 parents 74147f7 + 423a2fc commit ad16185
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 73 deletions.
63 changes: 14 additions & 49 deletions atpbar/callback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from collections.abc import Iterator
from contextlib import contextmanager
from multiprocessing import Queue
from threading import Lock, current_thread, main_thread

from .machine import StateMachine
from .presentation import create_presentation
from .progress_report import ProgressReporter, ProgressReportPickup, Report
from .stream import StreamQueue, StreamRedirection, register_stream_queue
from .progress_report import ProgressReporter


class CallbackImp:
Expand All @@ -17,74 +14,43 @@ def __init__(self) -> None:
self._lock: Lock # to be set by the StateMachine

def on_active(self) -> None:

self.queue: Queue[Report] = Queue()
self.notices_from_sub_processes: Queue[bool] = Queue()
self.stream_queue: StreamQueue = Queue()
self.reporter = ProgressReporter(
queue=self.queue,
notices_from_sub_processes=self.notices_from_sub_processes,
stream_queue=self.stream_queue,
)

self._start_pickup()

if self.stream_redirection.disabled:
self.reporter.stream_redirection_enabled = False

def _start_pickup(self) -> None:
presentation = create_presentation()
self.pickup = ProgressReportPickup(self.queue, presentation)

self.stream_redirection = StreamRedirection(
queue=self.stream_queue, presentation=presentation
)
self.stream_redirection.start()

def _end_pickup(self) -> None:
self.pickup.end()

self.stream_redirection.end()

def _restart_pickup(self) -> None:
self._end_pickup()
self._start_pickup()
self.reporter = ProgressReporter()
self.reporter.start_pickup()

@contextmanager
def fetch_reporter_in_active(self) -> Iterator[ProgressReporter | None]:
assert self.reporter

if not in_main_thread():
self.to_restart_pickup = False
yield self.reporter
return

self.to_restart_pickup = True

while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()

self.reporter.empty_notices()
self._machine.on_yielded()

try:
yield self.reporter
finally:
self._machine.on_resumed()

while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()
if self.reporter.empty_notices():
self.to_restart_pickup = False

if not self.to_restart_pickup:
return

with self._lock:
self._restart_pickup()
with self._lock: # NOTE: This lock is probably unnecessary
self.reporter.restart_pickup()

def flush_in_active(self) -> None:
self._restart_pickup()
assert self.reporter
self.reporter.restart_pickup()

def shutdown_in_active(self) -> None:
self._end_pickup()
assert self.reporter
self.reporter.end_pickup()

@contextmanager
def fetch_reporter_in_yielded(self) -> Iterator[ProgressReporter | None]:
Expand All @@ -100,16 +66,15 @@ def on_registered(self, reporter: ProgressReporter | None) -> None:
self.reporter = reporter
if reporter is None:
return
if reporter.stream_redirection_enabled:
register_stream_queue(reporter.stream_queue)
reporter.register()

@contextmanager
def fetch_reporter_in_registered(self) -> Iterator[ProgressReporter | None]:
if self.reporter is None:
yield self.reporter
return

self.reporter.notices_from_sub_processes.put(True)
self.reporter.notice()
yield self.reporter

def on_disabled(self) -> None:
Expand Down
59 changes: 43 additions & 16 deletions atpbar/progress_report/reporter.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import time
from multiprocessing import Queue
from typing import TYPE_CHECKING
from uuid import UUID

from atpbar.presentation import create_presentation
from atpbar.stream import StreamQueue, StreamRedirection, register_stream_queue

from .complement import ProgressReportComplementer
from .pickup import ProgressReportPickup
from .report import Report

if TYPE_CHECKING:
from atpbar.stream import StreamQueue

DEFAULT_INTERVAL = 0.1 # [second]


class ProgressReporter:
'''A progress reporter
'''A progress reporter.
NOTE: This docstring is outdated.
This class sends progress reports. The reports will be picked up
by the pickup (`ProgressReportPickup`), which uses the reports,
Expand Down Expand Up @@ -45,24 +47,49 @@ class ProgressReporter:
The queue through which this class sends progress reports.
'''

def __init__(
self,
queue: 'Queue[Report]',
notices_from_sub_processes: 'Queue[bool]',
stream_queue: 'StreamQueue',
) -> None:
self.queue = queue
def __init__(self) -> None:
self.queue: Queue[Report] = Queue()
self.notices_from_sub_processes: Queue[bool] = Queue()
self.stream_queue: StreamQueue = Queue()
self.interval = DEFAULT_INTERVAL # [second]
self.last_time = dict[UUID, float]()
self.complete_report = ProgressReportComplementer()
self.notices_from_sub_processes = notices_from_sub_processes
self.stream_queue = stream_queue
self.stream_redirection_enabled = True

def __repr__(self) -> str:
return '{}(queue={!r}, interval={!r})'.format(
self.__class__.__name__, self.queue, self.interval
return f'{self.__class__.__name__}(queue={self.queue!r}, interval={self.interval!r})'

def start_pickup(self) -> None:
presentation = create_presentation()
self.pickup = ProgressReportPickup(self.queue, presentation)

self.stream_redirection = StreamRedirection(
queue=self.stream_queue, presentation=presentation
)
self.stream_redirection.start()
self.stream_redirection_enabled = not self.stream_redirection.disabled

def end_pickup(self) -> None:
self.pickup.end()
self.stream_redirection.end()

def restart_pickup(self) -> None:
self.end_pickup()
self.start_pickup()

def notice(self) -> None:
self.notices_from_sub_processes.put(True)

def empty_notices(self) -> bool:
ret = False
while not self.notices_from_sub_processes.empty():
_ = self.notices_from_sub_processes.get()
ret = True
return ret

def register(self) -> None:
if self.stream_redirection_enabled:
register_stream_queue(self.stream_queue)

def report(self, report: Report) -> None:
'''send ``report`` to a progress monitor
Expand Down
5 changes: 2 additions & 3 deletions tests/progress_report/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ def mock_time(monkeypatch: pytest.MonkeyPatch) -> mock.Mock:

@pytest.fixture()
def obj(mock_queue: mock.Mock, mock_time: mock.Mock) -> ProgressReporter:
ret = ProgressReporter(
mock_queue, notices_from_sub_processes=mock.Mock(), stream_queue=mock.Mock()
)
ret = ProgressReporter()
ret.queue = mock_queue
return ret


Expand Down
8 changes: 4 additions & 4 deletions tests/scenarios/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from atpbar import callback, funcs
from atpbar import funcs
from atpbar.progress_report import reporter

from .utils import mock_presentations
Expand All @@ -11,17 +11,17 @@ class MockException(Exception):

func_machine = funcs._machine
interval = reporter.DEFAULT_INTERVAL
create_presentation = callback.create_presentation
create_presentation = reporter.create_presentation
try:
with mock_presentations() as presentations:
assert func_machine is not funcs._machine
assert interval != reporter.DEFAULT_INTERVAL
assert create_presentation is not callback.create_presentation
assert create_presentation is not reporter.create_presentation
presentations
raise MockException()
except MockException:
pass
finally:
assert func_machine is funcs._machine
assert interval == reporter.DEFAULT_INTERVAL
assert create_presentation is callback.create_presentation
assert create_presentation is reporter.create_presentation
2 changes: 1 addition & 1 deletion tests/scenarios/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def monkeypatch_reporter_interval() -> Iterator[None]:
def mock_create_presentation() -> Iterator[MockCreatePresentation]:
y = MockCreatePresentation()
with MonkeyPatch.context() as m:
m.setattr(callback, 'create_presentation', y)
m.setattr(reporter, 'create_presentation', y)
yield y


Expand Down

0 comments on commit ad16185

Please sign in to comment.