From 61f422bf54d8d9972fc630b9e000be9e35594785 Mon Sep 17 00:00:00 2001 From: Sharpner6 <1sc2l4qi@duck.com> Date: Thu, 22 Aug 2024 20:15:24 -0400 Subject: [PATCH] Add cpu_time_s to ProfileResult - Add cpu_time_s to ProfileResult on the side of duration_s, duration_s means how long the task took to finish, cpu_time_s means how long the task used the cpu time - Updated README.md for wording --- README.md | 15 +++++++-------- scaler/about.py | 2 +- scaler/utility/metadata/profile_result.py | 5 +++-- scaler/worker/agent/profiling_manager.py | 16 ++++++++++++---- tests/test_profiling.py | 23 ++++++++++++++++++++++- 5 files changed, 45 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 5b24d26..084d07a 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,7 @@ with Client(address="tcp://127.0.0.1:2345") as client: ``` Scaler is a suitable Dask replacement, offering significantly better scheduling performance for jobs with a large number -of lightweight tasks, while addressing some long-running issues with Dask such as deadlocks, load balancing issues, -messaging errors, among others. +of lightweight tasks while improving on load balancing, messaging and deadlocks. ## Features @@ -48,15 +47,15 @@ messaging errors, among others. - **Python** reference implementation, with **language agnostic messaging protocol** built on top of [Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org) - **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you - can use [GraphBLAS](https://graphblas.org) for massive graph tasks -- **Automated load balancing**. automatically balance busy workers' loads to idle workers, keep every worker as busy as - possible -- **Automated recovery** from faulting workers or clients -- Supports for **nested tasks**. Tasks can themselves submit new tasks + can use [GraphBLAS](https://graphblas.org) for very large graph tasks +- **Automated load balancing**. automatically balances load from busy workers to idle workers and tries to keep workers + utilized as uniformly as possible +- **Automated task recovery** from faulting workers who have died +- Supports for **nested tasks**, tasks can themselves submit new tasks - `top`-like **monitoring tools** - GUI monitoring tool -Scaler's scheduler can be run on PyPy, which will provide a performance boost +Scaler's scheduler can be run on PyPy, which can provide a performance boost ## Installation diff --git a/scaler/about.py b/scaler/about.py index 29654ee..2d986fc 100644 --- a/scaler/about.py +++ b/scaler/about.py @@ -1 +1 @@ -__version__ = "1.8.0" +__version__ = "1.8.1" diff --git a/scaler/utility/metadata/profile_result.py b/scaler/utility/metadata/profile_result.py index 210fcc8..eb389db 100644 --- a/scaler/utility/metadata/profile_result.py +++ b/scaler/utility/metadata/profile_result.py @@ -9,11 +9,12 @@ class ProfileResult: duration_s: float = dataclasses.field(default=0.0) memory_peak: int = dataclasses.field(default=0) + cpu_time_s: float = dataclasses.field(default=0.0) - FORMAT = "!fQ" + FORMAT = "!fQf" # duration, memory peak, CPU time def serialize(self) -> bytes: - return struct.pack(self.FORMAT, self.duration_s, self.memory_peak) + return struct.pack(self.FORMAT, self.duration_s, self.memory_peak, self.cpu_time_s) @staticmethod def deserialize(data: bytes) -> "ProfileResult": diff --git a/scaler/worker/agent/profiling_manager.py b/scaler/worker/agent/profiling_manager.py index 06e4939..5bd7365 100644 --- a/scaler/worker/agent/profiling_manager.py +++ b/scaler/worker/agent/profiling_manager.py @@ -16,6 +16,7 @@ class _ProcessProfiler: current_task_id: Optional[bytes] = None start_time: Optional[float] = None + start_cpu_time: Optional[float] = None init_memory_rss: Optional[int] = None peak_memory_rss: Optional[int] = None @@ -46,7 +47,8 @@ def on_task_start(self, pid: int, task_id: bytes): process = process_profiler.process - process_profiler.start_time = self.__process_cpu_time(process) + process_profiler.start_time = self.__process_time() + process_profiler.start_cpu_time = self.__process_cpu_time(process) process_profiler.init_memory_rss = self.__process_memory_rss(process) process_profiler.peak_memory_rss = process_profiler.init_memory_rss @@ -65,14 +67,15 @@ def on_task_end(self, pid: int, task_id: bytes) -> ProfileResult: process = process_profiler.process - time_delta = self.__process_cpu_time(process) - process_profiler.start_time + time_delta = self.__process_time() - process_profiler.start_time + cpu_time_delta = self.__process_cpu_time(process) - process_profiler.start_cpu_time memory_delta = process_profiler.peak_memory_rss - process_profiler.init_memory_rss process_profiler.current_task_id = None process_profiler.init_memory_rss = None process_profiler.peak_memory_rss = None - return ProfileResult(time_delta, memory_delta) + return ProfileResult(time_delta, memory_delta, cpu_time_delta) async def routine(self): for process_profiler in self._process_profiler_by_pid.values(): @@ -82,9 +85,14 @@ async def routine(self): ) @staticmethod - def __process_cpu_time(process: psutil.Process) -> float: + def __process_time(): return time.monotonic() + @staticmethod + def __process_cpu_time(process: psutil.Process) -> float: + cpu_times = process.cpu_times() + return cpu_times.user + cpu_times.system + @staticmethod def __process_memory_rss(process: psutil.Process) -> int: return process.memory_info().rss diff --git a/tests/test_profiling.py b/tests/test_profiling.py index cb174bf..f12adc8 100644 --- a/tests/test_profiling.py +++ b/tests/test_profiling.py @@ -2,7 +2,6 @@ import unittest from scaler import Client, SchedulerClusterCombo - from tests.utility import get_available_tcp_port @@ -11,6 +10,14 @@ def dummy(n: int): return n * n +def busy_dummy(n: int): + start_time = time.time() + while time.time() - start_time < n: + pass + + return n * n + + class TestProfiling(unittest.TestCase): def setUp(self): self.address = f"tcp://127.0.0.1:{get_available_tcp_port()}" @@ -38,3 +45,17 @@ def test_future_incomplete(self) -> None: # Raises error because future is not done with self.assertRaises(ValueError): _ = fut.profiling_info().duration_s + + def test_cpu_time_busy(self) -> None: + fut = self.client.submit(busy_dummy, 1, profiling=True) + fut.result() + + cpu_time = fut.profiling_info().cpu_time_s + assert cpu_time > 0 + + def test_cpu_time_sleep(self) -> None: + fut = self.client.submit(dummy, 1, profiling=True) + fut.result() + + cpu_time = fut.profiling_info().cpu_time_s + assert cpu_time < 1