
Commit

Use capnp to serialize/deserialize
- Replace the custom ZMQ frame serialization with Cap'n Proto, paving
  the way for a language-agnostic scheduler protocol and making a C++
  scheduler implementation feasible (a short serialization sketch
  follows the file summary below)
- Replace legacy setup.py with pyproject.toml
- Add configuration for black, flake8, and mypy
- Change to monotonic time for profiling
- Fix mypy warnings

Signed-off-by: Sharpner6 <[email protected]>
sharpener6 committed Aug 22, 2024
1 parent c7df9a8 commit 0236fde
Showing 75 changed files with 1,927 additions and 1,223 deletions.
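The core of the commit is the serialization change: the hand-rolled ZMQ frame encoding is replaced by Cap'n Proto messages (`pycapnp` is added to requirements.txt below). As a minimal sketch only (the schema, struct, and field names here are invented for illustration and are not scaler's actual protocol), a pycapnp serialize/deserialize round trip looks roughly like this:

```python
import tempfile

import capnp  # pycapnp, newly added to requirements.txt

# Hypothetical schema for illustration only; scaler's real .capnp definitions
# are not part of this excerpt.
SCHEMA = """
@0xbf5147cbbecf40c1;

struct Task {
  taskId @0 :Data;
  functionName @1 :Text;
  payload @2 :Data;
}
"""

with tempfile.NamedTemporaryFile("w", suffix=".capnp", delete=False) as f:
    f.write(SCHEMA)
    schema_path = f.name

task_capnp = capnp.load(schema_path)

# Sender side: build a message and flatten it to bytes (e.g. one ZeroMQ frame).
task = task_capnp.Task.new_message()
task.taskId = b"\x00\x01"
task.functionName = "inc"
task.payload = b"\x2a"
wire_bytes = task.to_bytes()

# Receiver side: reconstruct the message from the raw bytes.
with task_capnp.Task.from_bytes(wire_bytes) as restored:
    print(restored.functionName)  # -> inc
```

The switch to monotonic time for profiling avoids skewed or negative durations when the wall clock is adjusted (NTP, DST): `time.monotonic()` only moves forward, unlike `time.time()`. A minimal sketch of the pattern:

```python
import time

start = time.monotonic()
total = sum(range(1_000_000))  # stand-in for the profiled work
elapsed = time.monotonic() - start
print(f"elapsed: {elapsed:.6f} seconds")
```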
39 changes: 39 additions & 0 deletions .github/workflows/linter.yml
@@ -0,0 +1,39 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: Python Linter And Unittest

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

permissions:
  contents: read

jobs:
  build:

    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4
    - name: Set up Python 3.8
      uses: actions/setup-python@v3
      with:
        python-version: "3.8"
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install flake8 pyproject-flake8 mypy
        pip install -r requirements.txt
    - name: Lint with flake8
      run: |
        pflake8 .
    - name: Lint with MyPy
      run: |
        mypy .
    - name: Run python unittest
      run: |
        python -m unittest discover -v tests
20 changes: 20 additions & 0 deletions .gitignore
@@ -0,0 +1,20 @@
__pycache__/
*.py[cod]
*$py.class

build/
dist/
sdist/
wheels/
eggs/
.eggs/
.idea/
*.egg-info/
*.egg
.mypy_cache/

venv*/
.vscode/

dask-worker-space/*
.pre-commit-config.yaml
6 changes: 3 additions & 3 deletions Makefile
@@ -12,7 +12,7 @@ _doc:
rm -fr docsvenv build; mkdir build
python3.8 -m venv docsvenv
. docsvenv/bin/activate; \
pip install -r docs/requirements_docs.txt; \
pip install -r requirements.txt; \
cd docs; make clean && make html
zip -r build/scaler_docs.zip docs/build/html/*
47 changes: 29 additions & 18 deletions README.md
@@ -3,7 +3,7 @@
<img src="https://github.com/citi.png" alt="Citi" width="80" height="80">
</a>

<h3 align="center">Citi/scaler</h3>

<p align="center">
Efficient, lightweight and reliable distributed computation engine.
@@ -22,6 +22,9 @@
with a stable and language agnostic protocol for client and worker communications.

```python
import math
from scaler import Client

with Client(address="tcp://127.0.0.1:2345") as client:
    # Submits 100 tasks
    futures = [
@@ -43,20 +46,22 @@ messaging errors, among others.

- Distributed computing on **multiple cores and multiple servers**
- **Python** reference implementation, with **language agnostic messaging protocol** built on top of
[ZeroMQ](https://zeromq.org)
[Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org)
- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you
can use [GraphBLAS](https://graphblas.org)
- **Automated load balancing**. When workers got full of tasks, these will be scheduled to idle workers
can use [GraphBLAS](https://graphblas.org) for massive graph tasks
- **Automated load balancing**: automatically balances busy workers' loads onto idle workers, keeping every worker as
  busy as possible
- **Automated recovery** from faulting workers or clients
- Support for **nested tasks**. Tasks can themselves submit new tasks
- `top`-like **monitoring tools**
- GUI monitoring tool

Scaler's scheduler can be run on PyPy, which will provide a performance boost

## Installation

```bash
$ pip instal scaler
$ pip install scaler

# or with graphblas and uvloop support
$ pip install scaler[graphblas,uvloop]
@@ -77,7 +82,6 @@ A local scheduler and a local set of workers can be conveniently spawned using `Sc
```python
from scaler import SchedulerClusterCombo


cluster = SchedulerClusterCombo(address="tcp://127.0.0.1:2345", n_workers=4)

...
@@ -154,9 +158,11 @@ from scaler import Client
def inc(i):
    return i + 1


def add(a, b):
    return a + b


def minus(a, b):
    return a - b

@@ -169,7 +175,7 @@ graph = {
"e": (minus, "d", "c") # e = d - c = 4 - 3 = 1
}

with Client(address="tcp://127.0.0.1:2345") as client
with Client(address="tcp://127.0.0.1:2345") as client:
    result = client.get(graph, keys=["e"])
    print(result)  # {"e": 1}
```
@@ -179,18 +185,21 @@ with Client(address="tcp://127.0.0.1:2345") as client
Scaler allows tasks to submit new tasks while being executed. Scaler also supports recursive task calls.

```python
def fibonacci(client: Client, n: int):
from scaler import Client


def fibonacci(clnt: Client, n: int):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        a = client.submit(fibonacci, client, n - 1)
        b = client.submit(fibonacci, client, n - 2)
        a = clnt.submit(fibonacci, clnt, n - 1)
        b = clnt.submit(fibonacci, clnt, n - 2)
        return a.result() + b.result()


with Client(address="tcp://127.0.0.1:2345") as client
with Client(address="tcp://127.0.0.1:2345") as client:
    result = client.submit(fibonacci, client, 8).result()
    print(result)  # 21
```
@@ -256,11 +265,11 @@ W|Linux|15943|a7fe8b5e+ 0.0% 30.7m 0.0% 28.3m 1000 0 0 |
- function_id_to_tasks section shows task count for each function used
- worker section shows worker details; you can use shortcuts to sort by columns, and the * character on a column
  header shows which column is currently sorted
- agt_cpu/agt_rss means cpu/memory usage of worker agent
- cpu/rss means cpu/memory usage of worker
- free means number of free task slots for this worker
- sent means how many tasks scheduler sent to the worker
- queued means how many tasks worker received and queued

### From the web UI

@@ -274,7 +283,8 @@ This will open a web server on port `8081`.

## Contributing

Your contributions are at the core of making this a true open source project. Any contributions you make are **greatly appreciated**.

We welcome you to:

@@ -297,4 +307,5 @@ This project is distributed under the [Apache-2.0 License](https://www.apache.or

## Contact

If you have a query or require support with this project, [raise an issue](https://github.com/Citi/scaler/issues). Otherwise, reach out to [[email protected]](mailto:[email protected]).
67 changes: 67 additions & 0 deletions pyproject.toml
@@ -0,0 +1,67 @@
[build-system]
requires = ["setuptools", "setuptools-scm", "mypy", "black", "flake8", "pyproject-flake8"]
build-backend = "setuptools.build_meta"

[project]
name = "scaler"
description = "Scaler Distribution Framework"
requires-python = ">=3.8"
readme = { file = "README.md", content-type = "text/markdown" }
license = { text = "Apache 2.0" }
authors = [{ name = "Citi", email = "[email protected]" }]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Topic :: System :: Distributed Computing",
]
dynamic = ["dependencies", "version"]

[project.urls]
Home = "https://github.com/Citi/scaler"

[project.scripts]
scaler_scheduler = "scaler.entry_points.scheduler:main"
scaler_cluster = "scaler.entry_points.cluster:main"
scaler_top = "scaler.entry_points.top:main"
scaler_ui = "scaler.entry_points.webui:main"

[project.optional-dependencies]
uvloop = ["uvloop"]
graphblas = ["python-graphblas", "numpy"]
gui = ["nicegui[plotly]"]
all = ["python-graphblas", "numpy", "uvloop", "nicegui[plotly]"]

[tool.setuptools]
packages = ["scaler"]
include-package-data = true

[tool.setuptools.dynamic]
dependencies = { file = "requirements.txt" }
version = { attr = "scaler.about.__version__" }

[tool.mypy]
no_strict_optional = true
check_untyped_defs = true
ignore_missing_imports = true
exclude = [
"^docs.*$",
"^benchmark.*$",
"^venv.*$"
]

[tool.flake8]
count = true
statistics = true
max-line-length = 120
extend-ignore = ["E203"]
exclude = "venv312"

[tool.black]
line-length = 120
skip-magic-trailing-comma = true

[metadata]
long_description = { file = "README.md" }
long_description_content_type = "text/markdown"
9 changes: 5 additions & 4 deletions requirements.txt
@@ -1,6 +1,7 @@
pyzmq
psutil
bidict
cloudpickle
graphlib-backport; python_version < '3.9'
psutil
pycapnp
pyzmq
tblib
bidict
graphlib-backport; python_version < '3.9'
3 changes: 2 additions & 1 deletion run_top.py
@@ -1,4 +1,5 @@
from scaler.entry_points.top import main
from scaler.utility.debug import pdb_wrapped

if __name__ == "__main__":
    main()
    pdb_wrapped(main)()
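`run_top.py` now routes `main` through `pdb_wrapped` from `scaler.utility.debug`. That helper's implementation is not shown in this diff; purely as an illustrative sketch (not the actual scaler code), a wrapper of this kind typically opens a post-mortem debugger when the wrapped entry point raises:

```python
import functools
import pdb
import sys
import traceback


def pdb_wrapped(func):
    """Illustrative sketch only; not the real scaler.utility.debug implementation."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Print the traceback, then open pdb at the frame that raised.
            traceback.print_exc()
            pdb.post_mortem(sys.exc_info()[2])
            raise

    return wrapper
```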
2 changes: 1 addition & 1 deletion scaler/about.py
@@ -1 +1 @@
__version__ = "1.7.14"
__version__ = "1.8.0"
6 changes: 3 additions & 3 deletions scaler/client/agent/client_agent.py
@@ -18,14 +18,14 @@
    ClientShutdownResponse,
    GraphTask,
    GraphTaskCancel,
    MessageVariant,
    ObjectInstruction,
    ObjectRequest,
    ObjectResponse,
    Task,
    TaskCancel,
    TaskResult,
)
from scaler.protocol.python.mixins import Message
from scaler.utility.event_loop import create_async_loop_routine
from scaler.utility.exceptions import ClientCancelledException, ClientQuitException, ClientShutdownException
from scaler.utility.zmq_config import ZMQConfig
@@ -113,7 +113,7 @@ def run(self):
        self.__initialize()
        self.__run_loop()

    async def __on_receive_from_client(self, message: MessageVariant):
    async def __on_receive_from_client(self, message: Message):
        if isinstance(message, ClientDisconnect):
            await self._disconnect_manager.on_client_disconnect(message)
            return
@@ -144,7 +144,7 @@ async def __on_receive_from_client(self, message: MessageVariant):

        raise TypeError(f"Unknown {message=}")

    async def __on_receive_from_scheduler(self, message: MessageVariant):
    async def __on_receive_from_scheduler(self, message: Message):
        if isinstance(message, ClientShutdownResponse):
            await self._disconnect_manager.on_client_shutdown_response(message)
            return
4 changes: 2 additions & 2 deletions scaler/client/agent/disconnect_manager.py
@@ -2,7 +2,7 @@

from scaler.client.agent.mixins import DisconnectManager
from scaler.io.async_connector import AsyncConnector
from scaler.protocol.python.message import ClientDisconnect, ClientShutdownResponse, DisconnectType
from scaler.protocol.python.message import ClientDisconnect, ClientShutdownResponse
from scaler.utility.exceptions import ClientQuitException, ClientShutdownException


@@ -18,7 +18,7 @@ def register(self, connector_internal: AsyncConnector, connector_external: Async
    async def on_client_disconnect(self, disconnect: ClientDisconnect):
        await self._connector_external.send(disconnect)

        if disconnect.type == DisconnectType.Disconnect:
        if disconnect.disconnect_type == ClientDisconnect.DisconnectType.Disconnect:
            raise ClientQuitException("client disconnecting")

    async def on_client_shutdown_response(self, response: ClientShutdownResponse):
8 changes: 5 additions & 3 deletions scaler/client/agent/future_manager.py
@@ -1,12 +1,13 @@
import logging
import threading
from concurrent.futures import InvalidStateError
from concurrent.futures import InvalidStateError, Future
from typing import Dict, Tuple

from scaler.client.agent.mixins import FutureManager
from scaler.client.future import ScalerFuture
from scaler.client.serializer.mixins import Serializer
from scaler.protocol.python.message import ObjectResponse, TaskResult, TaskStatus
from scaler.protocol.python.common import TaskStatus
from scaler.protocol.python.message import ObjectResponse, TaskResult
from scaler.utility.exceptions import DisconnectedError, NoWorkerError, TaskNotFoundError, WorkerDiedError
from scaler.utility.metadata.profile_result import retrieve_profiling_result_from_task_result
from scaler.utility.object_utility import deserialize_failure
@@ -20,7 +21,8 @@ def __init__(self, serializer: Serializer):
        self._task_id_to_future: Dict[bytes, ScalerFuture] = dict()
        self._object_id_to_future: Dict[bytes, Tuple[TaskStatus, ScalerFuture]] = dict()

    def add_future(self, future: ScalerFuture):
    def add_future(self, future: Future):
        assert isinstance(future, ScalerFuture)
        with self._lock:
            future.set_running_or_notify_cancel()
            self._task_id_to_future[future.task_id] = future
