Add cpu_time_s to ProfileResult
- Add cpu_time_s to ProfileResult alongside duration_s: duration_s measures how
  long the task took to finish (wall-clock time), while cpu_time_s measures how
  much CPU time the task used
- Updated README.md wording
sharpener6 committed Aug 23, 2024
1 parent bafc407 commit 61f422b
Showing 5 changed files with 45 additions and 16 deletions.
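From the client's point of view, the new field sits next to duration_s on a profiled future. A minimal sketch, based on the README's Client example and the tests in this commit (the address is illustrative, and a running scheduler with at least one worker is assumed):

```python
# Minimal sketch: submit a CPU-bound task with profiling enabled and read both
# timings back. Assumes a scheduler and at least one worker are already running
# at this address, as in the README example.
from scaler import Client


def busy(n: int) -> int:
    total = 0
    for i in range(n):
        total += i * i  # CPU-bound work, so cpu_time_s should be close to duration_s
    return total


with Client(address="tcp://127.0.0.1:2345") as client:
    fut = client.submit(busy, 1_000_000, profiling=True)
    fut.result()

    info = fut.profiling_info()
    print(info.duration_s)  # wall-clock time: how long the task took to finish
    print(info.cpu_time_s)  # CPU time: how much CPU the task actually used (new in this commit)
```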
15 changes: 7 additions & 8 deletions README.md
@@ -39,24 +39,23 @@ with Client(address="tcp://127.0.0.1:2345") as client:
 ```
 
 Scaler is a suitable Dask replacement, offering significantly better scheduling performance for jobs with a large number
-of lightweight tasks, while addressing some long-running issues with Dask such as deadlocks, load balancing issues,
-messaging errors, among others.
+of lightweight tasks while improving on load balancing, messaging and deadlocks.
 
 ## Features
 
 - Distributed computing on **multiple cores and multiple servers**
 - **Python** reference implementation, with **language agnostic messaging protocol** built on top of
   [Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org)
 - **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you
-  can use [GraphBLAS](https://graphblas.org) for massive graph tasks
-- **Automated load balancing**. automatically balance busy workers' loads to idle workers, keep every worker as busy as
-  possible
-- **Automated recovery** from faulting workers or clients
-- Supports for **nested tasks**. Tasks can themselves submit new tasks
+  can use [GraphBLAS](https://graphblas.org) for very large graph tasks
+- **Automated load balancing**. automatically balances load from busy workers to idle workers and tries to keep workers
+  utilized as uniformly as possible
+- **Automated task recovery** from faulting workers who have died
+- Supports for **nested tasks**, tasks can themselves submit new tasks
 - `top`-like **monitoring tools**
 - GUI monitoring tool
 
-Scaler's scheduler can be run on PyPy, which will provide a performance boost
+Scaler's scheduler can be run on PyPy, which can provide a performance boost
 
 ## Installation
 
2 changes: 1 addition & 1 deletion scaler/about.py
@@ -1 +1 @@
-__version__ = "1.8.0"
+__version__ = "1.8.1"
5 changes: 3 additions & 2 deletions scaler/utility/metadata/profile_result.py
@@ -9,11 +9,12 @@
 class ProfileResult:
     duration_s: float = dataclasses.field(default=0.0)
     memory_peak: int = dataclasses.field(default=0)
+    cpu_time_s: float = dataclasses.field(default=0.0)
 
-    FORMAT = "!fQ"
+    FORMAT = "!fQf"  # duration, memory peak, CPU time
 
     def serialize(self) -> bytes:
-        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak)
+        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak, self.cpu_time_s)
 
     @staticmethod
     def deserialize(data: bytes) -> "ProfileResult":
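The deserialize body is collapsed above; as a rough sketch of how the new "!fQf" layout round-trips (an illustration with a renamed class, not the repository's exact code):

```python
# Round-trip sketch of the "!fQf" layout: network byte order, 32-bit float
# (duration_s), 64-bit unsigned int (memory_peak), 32-bit float (cpu_time_s).
import dataclasses
import struct


@dataclasses.dataclass
class ProfileResultSketch:
    duration_s: float = 0.0
    memory_peak: int = 0
    cpu_time_s: float = 0.0

    FORMAT = "!fQf"  # 4 + 8 + 4 = 16 bytes

    def serialize(self) -> bytes:
        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak, self.cpu_time_s)

    @staticmethod
    def deserialize(data: bytes) -> "ProfileResultSketch":
        duration_s, memory_peak, cpu_time_s = struct.unpack(ProfileResultSketch.FORMAT, data)
        return ProfileResultSketch(duration_s, memory_peak, cpu_time_s)


payload = ProfileResultSketch(duration_s=1.5, memory_peak=1 << 20, cpu_time_s=0.25).serialize()
assert len(payload) == 16
restored = ProfileResultSketch.deserialize(payload)
assert restored.cpu_time_s == 0.25  # 0.25 is exactly representable as a 32-bit float
```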
16 changes: 12 additions & 4 deletions scaler/worker/agent/profiling_manager.py
@@ -16,6 +16,7 @@ class _ProcessProfiler:
     current_task_id: Optional[bytes] = None
 
     start_time: Optional[float] = None
+    start_cpu_time: Optional[float] = None
     init_memory_rss: Optional[int] = None
     peak_memory_rss: Optional[int] = None
 
@@ -46,7 +47,8 @@ def on_task_start(self, pid: int, task_id: bytes):
 
         process = process_profiler.process
 
-        process_profiler.start_time = self.__process_cpu_time(process)
+        process_profiler.start_time = self.__process_time()
+        process_profiler.start_cpu_time = self.__process_cpu_time(process)
         process_profiler.init_memory_rss = self.__process_memory_rss(process)
         process_profiler.peak_memory_rss = process_profiler.init_memory_rss
 
@@ -65,14 +67,15 @@ def on_task_end(self, pid: int, task_id: bytes) -> ProfileResult:
 
         process = process_profiler.process
 
-        time_delta = self.__process_cpu_time(process) - process_profiler.start_time
+        time_delta = self.__process_time() - process_profiler.start_time
+        cpu_time_delta = self.__process_cpu_time(process) - process_profiler.start_cpu_time
         memory_delta = process_profiler.peak_memory_rss - process_profiler.init_memory_rss
 
         process_profiler.current_task_id = None
         process_profiler.init_memory_rss = None
         process_profiler.peak_memory_rss = None
 
-        return ProfileResult(time_delta, memory_delta)
+        return ProfileResult(time_delta, memory_delta, cpu_time_delta)
 
     async def routine(self):
         for process_profiler in self._process_profiler_by_pid.values():
@@ -82,9 +85,14 @@ async def routine(self):
             )
 
     @staticmethod
-    def __process_cpu_time(process: psutil.Process) -> float:
+    def __process_time():
+        return time.monotonic()
+
+    @staticmethod
+    def __process_cpu_time(process: psutil.Process) -> float:
         cpu_times = process.cpu_times()
         return cpu_times.user + cpu_times.system
 
     @staticmethod
     def __process_memory_rss(process: psutil.Process) -> int:
         return process.memory_info().rss
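For context on the two clocks the manager now samples separately, here is a standalone sketch (my illustration, not the worker agent itself) using the same time.monotonic() and psutil.Process.cpu_times() calls:

```python
# time.monotonic() gives wall-clock elapsed time (duration_s), while psutil's
# user + system CPU times give how long the process actually occupied a CPU
# (cpu_time_s).
import os
import time

import psutil


def profile(fn):
    process = psutil.Process(os.getpid())

    start_time = time.monotonic()
    start_cpu = process.cpu_times()

    fn()

    duration_s = time.monotonic() - start_time
    end_cpu = process.cpu_times()
    cpu_time_s = (end_cpu.user + end_cpu.system) - (start_cpu.user + start_cpu.system)
    return duration_s, cpu_time_s


# A sleeping task takes ~1 s of wall-clock time but burns almost no CPU time,
# which is exactly the distinction the new cpu_time_s field captures.
print(profile(lambda: time.sleep(1)))
```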
23 changes: 22 additions & 1 deletion tests/test_profiling.py
@@ -2,7 +2,6 @@
 import unittest
 
 from scaler import Client, SchedulerClusterCombo
-
 from tests.utility import get_available_tcp_port
 
 
@@ -11,6 +10,14 @@ def dummy(n: int):
     return n * n
 
 
+def busy_dummy(n: int):
+    start_time = time.time()
+    while time.time() - start_time < n:
+        pass
+
+    return n * n
+
+
 class TestProfiling(unittest.TestCase):
     def setUp(self):
         self.address = f"tcp://127.0.0.1:{get_available_tcp_port()}"
@@ -38,3 +45,17 @@ def test_future_incomplete(self) -> None:
         # Raises error because future is not done
         with self.assertRaises(ValueError):
             _ = fut.profiling_info().duration_s
+
+    def test_cpu_time_busy(self) -> None:
+        fut = self.client.submit(busy_dummy, 1, profiling=True)
+        fut.result()
+
+        cpu_time = fut.profiling_info().cpu_time_s
+        assert cpu_time > 0
+
+    def test_cpu_time_sleep(self) -> None:
+        fut = self.client.submit(dummy, 1, profiling=True)
+        fut.result()
+
+        cpu_time = fut.profiling_info().cpu_time_s
+        assert cpu_time < 1
