Add cpu_time_s to ProfileResult #11

Merged · 1 commit · Aug 23, 2024
README.md (7 additions, 8 deletions)
@@ -39,24 +39,23 @@ with Client(address="tcp://127.0.0.1:2345") as client:
 ```
 
 Scaler is a suitable Dask replacement, offering significantly better scheduling performance for jobs with a large number
-of lightweight tasks, while addressing some long-running issues with Dask such as deadlocks, load balancing issues,
-messaging errors, among others.
+of lightweight tasks while improving on load balancing, messaging and deadlocks.
 
 ## Features
 
 - Distributed computing on **multiple cores and multiple servers**
 - **Python** reference implementation, with **language agnostic messaging protocol** built on top of
   [Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org)
 - **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you
-  can use [GraphBLAS](https://graphblas.org) for massive graph tasks
-- **Automated load balancing**. automatically balance busy workers' loads to idle workers, keep every worker as busy as
-  possible
-- **Automated recovery** from faulting workers or clients
-- Supports for **nested tasks**. Tasks can themselves submit new tasks
+  can use [GraphBLAS](https://graphblas.org) for very large graph tasks
+- **Automated load balancing**. automatically balances load from busy workers to idle workers and tries to keep workers
+  utilized as uniformly as possible
+- **Automated task recovery** from faulting workers who have died
+- Supports for **nested tasks**, tasks can themselves submit new tasks
 - `top`-like **monitoring tools**
 - GUI monitoring tool
 
-Scaler's scheduler can be run on PyPy, which will provide a performance boost
+Scaler's scheduler can be run on PyPy, which can provide a performance boost
 
 ## Installation
 
scaler/about.py (1 addition, 1 deletion)
@@ -1 +1 @@
-__version__ = "1.8.0"
+__version__ = "1.8.1"
scaler/utility/metadata/profile_result.py (3 additions, 2 deletions)
@@ -9,11 +9,12 @@
 class ProfileResult:
     duration_s: float = dataclasses.field(default=0.0)
     memory_peak: int = dataclasses.field(default=0)
+    cpu_time_s: float = dataclasses.field(default=0.0)
 
-    FORMAT = "!fQ"
+    FORMAT = "!fQf"  # duration, memory peak, CPU time
 
     def serialize(self) -> bytes:
-        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak)
+        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak, self.cpu_time_s)
 
     @staticmethod
     def deserialize(data: bytes) -> "ProfileResult":
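The updated `deserialize` body is collapsed out of this hunk; given the new `FORMAT`, it presumably unpacks the three fields in declaration order. A minimal round-trip sketch under that assumption (note that `"!fQf"` stores both timings as 32-bit floats, so they only round-trip with float32 precision):

```python
import dataclasses
import struct


@dataclasses.dataclass
class ProfileResult:
    duration_s: float = dataclasses.field(default=0.0)
    memory_peak: int = dataclasses.field(default=0)
    cpu_time_s: float = dataclasses.field(default=0.0)

    # network byte order: float32 (duration), uint64 (memory peak), float32 (CPU time)
    FORMAT = "!fQf"

    def serialize(self) -> bytes:
        return struct.pack(self.FORMAT, self.duration_s, self.memory_peak, self.cpu_time_s)

    @staticmethod
    def deserialize(data: bytes) -> "ProfileResult":
        # unpack returns (duration_s, memory_peak, cpu_time_s) in field order
        return ProfileResult(*struct.unpack(ProfileResult.FORMAT, data))


# 1.5 and 0.25 are exactly representable in float32, so equality holds here
result = ProfileResult(duration_s=1.5, memory_peak=1024, cpu_time_s=0.25)
assert ProfileResult.deserialize(result.serialize()) == result
```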
scaler/worker/agent/profiling_manager.py (12 additions, 4 deletions)
@@ -16,6 +16,7 @@ class _ProcessProfiler:
     current_task_id: Optional[bytes] = None
 
     start_time: Optional[float] = None
+    start_cpu_time: Optional[float] = None
     init_memory_rss: Optional[int] = None
     peak_memory_rss: Optional[int] = None
 
@@ -46,7 +47,8 @@ def on_task_start(self, pid: int, task_id: bytes):
 
         process = process_profiler.process
 
-        process_profiler.start_time = self.__process_cpu_time(process)
+        process_profiler.start_time = self.__process_time()
+        process_profiler.start_cpu_time = self.__process_cpu_time(process)
         process_profiler.init_memory_rss = self.__process_memory_rss(process)
         process_profiler.peak_memory_rss = process_profiler.init_memory_rss
 
@@ -65,14 +67,15 @@ def on_task_end(self, pid: int, task_id: bytes) -> ProfileResult:
 
         process = process_profiler.process
 
-        time_delta = self.__process_cpu_time(process) - process_profiler.start_time
+        time_delta = self.__process_time() - process_profiler.start_time
+        cpu_time_delta = self.__process_cpu_time(process) - process_profiler.start_cpu_time
         memory_delta = process_profiler.peak_memory_rss - process_profiler.init_memory_rss
 
         process_profiler.current_task_id = None
         process_profiler.init_memory_rss = None
         process_profiler.peak_memory_rss = None
 
-        return ProfileResult(time_delta, memory_delta)
+        return ProfileResult(time_delta, memory_delta, cpu_time_delta)
 
     async def routine(self):
         for process_profiler in self._process_profiler_by_pid.values():
@@ -82,9 +85,14 @@ async def routine(self):
         )
 
     @staticmethod
-    def __process_cpu_time(process: psutil.Process) -> float:
+    def __process_time():
+        return time.monotonic()
+
+    @staticmethod
+    def __process_cpu_time(process: psutil.Process) -> float:
        cpu_times = process.cpu_times()
        return cpu_times.user + cpu_times.system
 
     @staticmethod
     def __process_memory_rss(process: psutil.Process) -> int:
         return process.memory_info().rss
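The net effect of these hunks: `duration_s` now measures wall-clock time via `time.monotonic()`, while the new `cpu_time_s` measures the worker process's user plus system CPU time via psutil. A standalone sketch of the same measurement pattern, showing why the two clocks diverge for a task that blocks rather than computes:

```python
import os
import time

import psutil

process = psutil.Process(os.getpid())

start_wall = time.monotonic()
start_cpu_times = process.cpu_times()
start_cpu = start_cpu_times.user + start_cpu_times.system

time.sleep(1.0)  # blocked, not computing: wall time advances, CPU time barely does

end_cpu_times = process.cpu_times()
wall_delta = time.monotonic() - start_wall                           # ~1.0
cpu_delta = (end_cpu_times.user + end_cpu_times.system) - start_cpu  # ~0.0

print(f"wall={wall_delta:.2f}s cpu={cpu_delta:.2f}s")
```

This divergence is what the tests below rely on: a busy-looping task accrues measurable CPU time, while a trivial one stays well under a second.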
tests/test_profiling.py (22 additions, 1 deletion)
@@ -2,7 +2,6 @@
 import unittest
 
 from scaler import Client, SchedulerClusterCombo
-
 from tests.utility import get_available_tcp_port
 
 
@@ -11,6 +10,14 @@ def dummy(n: int):
     return n * n
 
 
+def busy_dummy(n: int):
+    start_time = time.time()
+    while time.time() - start_time < n:
+        pass
+
+    return n * n
+
+
 class TestProfiling(unittest.TestCase):
     def setUp(self):
         self.address = f"tcp://127.0.0.1:{get_available_tcp_port()}"
@@ -38,3 +45,17 @@ def test_future_incomplete(self) -> None:
         # Raises error because future is not done
         with self.assertRaises(ValueError):
             _ = fut.profiling_info().duration_s
+
+    def test_cpu_time_busy(self) -> None:
+        fut = self.client.submit(busy_dummy, 1, profiling=True)
+        fut.result()
+
+        cpu_time = fut.profiling_info().cpu_time_s
+        assert cpu_time > 0
+
+    def test_cpu_time_sleep(self) -> None:
+        fut = self.client.submit(dummy, 1, profiling=True)
+        fut.result()
+
+        cpu_time = fut.profiling_info().cpu_time_s
+        assert cpu_time < 1
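Putting it together, a hedged sketch of how client code would read the new field. `Client`, `submit(..., profiling=True)` and `profiling_info()` appear in the README and tests above; the address and the `spin` helper here are illustrative:

```python
import time

from scaler import Client


def spin(n: int) -> int:
    # burn CPU for ~n seconds so cpu_time_s is clearly non-zero
    deadline = time.time() + n
    count = 0
    while time.time() < deadline:
        count += 1
    return count


# assumes a scheduler is already listening on this address
# (e.g. the SchedulerClusterCombo used in the tests)
with Client(address="tcp://127.0.0.1:2345") as client:
    fut = client.submit(spin, 1, profiling=True)
    fut.result()

    info = fut.profiling_info()
    print(f"wall: {info.duration_s:.2f}s  cpu: {info.cpu_time_s:.2f}s  memory peak: {info.memory_peak} bytes")
```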