Skip to content

Commit

Permalink
add tests and maintenance (#19)
Browse files Browse the repository at this point in the history
* update readme

* add tests

* run tests in CI

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add `coverage`

* fix tests

* bump zntrack pre-release

* add dev pandas

* fix tests

* test with datafile (WIP)

* more invovled graph

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* important bugfix

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* use logging

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix tests with logging instead of print

* ignore non-pipeline stages

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
PythonFZ and pre-commit-ci[bot] authored Sep 27, 2024
1 parent 152ba60 commit c14f04e
Show file tree
Hide file tree
Showing 8 changed files with 613 additions and 665 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: tests

on:
push:
branches: [ main ]
pull_request:
schedule:
- cron: '14 3 * * 1' # at 03:14 on Monday.

jobs:
pytest:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
python-version:
- "3.12"
- "3.11"
- "3.10"
os:
- ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install package
run: |
poetry install --no-interaction
- name: Setup git user
run: |
git config --global user.name "John Doe"
git config --global user.email [email protected]
git config --global init.defaultBranch "main"
- name: Pytest
run: |
poetry run python --version
poetry run coverage run -m pytest -vv
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,33 @@ For more information, run:
paraffin --help
```

## Labels

You can run `paraffin` in multiple processes (e.g. on different hardware with a
shared file system). To specify where a `stage` should run, you can assign
labels to each worker.

```
paraffin --labels GPU # on a GPU node
paraffin --label CPU intel # on a CPU node
```

To configure the stages you need to create a `paraffin.yaml` file as follows:

```yaml
labels:
GPU_TASK:
- GPU
CPU_TASK:
- CPU
SPECIAL_CPU_TASK:
- CPU
- intel
```
All `stages` that are not part of the `paraffin.yaml` will choose any of the
available workers.

> \[!TIP\] If you are building Python-based workflows with DVC, consider trying
> our other project [ZnTrack](https://zntrack.readthedocs.io/) for a more
> Pythonic way to define workflows.
110 changes: 71 additions & 39 deletions paraffin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
from dvc.stage.cache import RunCacheNotFoundError

log = logging.getLogger(__name__)
log.setLevel(logging.INFO) # Ensure the logger itself is set to INFO or lower

# Attach a logging handler to print info to stdout
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)

# Set a format for the handler
formatter = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(formatter)

log.addHandler(handler)

app = typer.Typer()

Expand Down Expand Up @@ -61,13 +72,13 @@ def run_stage(stage_name: str, max_retries: int) -> bool:
with dvc.repo.Repo() as repo:
for _ in range(max_retries):
with contextlib.suppress(LockError):
with repo.lock:
with dvc.repo.lock_repo(repo):
stages = list(repo.stage.collect(stage_name))
if len(stages) != 1:
raise RuntimeError(f"Stage {stage_name} not found.")
stage = stages[0]
if stage.already_cached():
print(f"Stage '{stage_name}' didn't change, skipping")
log.info(f"Stage '{stage_name}' didn't change, skipping")
return True
# try to restore the stage from the cache
# https://github.com/iterative/dvc/blob/main/dvc/stage/run.py#L166
Expand All @@ -78,13 +89,13 @@ def run_stage(stage_name: str, max_retries: int) -> bool:
# no LockError was raised and no return was
# executed -> the stage was not found in the cache

print(f"Running stage '{stage_name}':")
print(f"> {stage.cmd}")
log.info(f"Running stage '{stage_name}':")
log.info(f"> {stage.cmd}")
subprocess.check_call(stage.cmd, shell=True)

for _ in range(max_retries):
with contextlib.suppress(LockError):
with repo.lock:
with dvc.repo.lock_repo(repo):
stage.save()
stage.commit()
stage.dump(update_pipeline=False)
Expand Down Expand Up @@ -118,44 +129,54 @@ def execute_graph(
glob: bool = False,
):
with dvc.repo.Repo() as repo:
# graph: nx.DiGraph = repo.index.graph
# add to the existing graph
global graph
graph.add_nodes_from(repo.index.graph.nodes)
graph.add_edges_from(repo.index.graph.edges)

positions.update(get_tree_layout(graph))

# reverse the graph
graph = graph.reverse()

# construct a subgraph of the targets and their dependencies
if targets:
if not glob:
selected_stages = [
stage for stage in graph.nodes if stage.addressing in targets
]
else:
selected_stages = [
stage
for stage in graph.nodes
if any(
fnmatch.fnmatch(stage.addressing, target) for target in targets
)
]
log.debug(f"Selected stages: {selected_stages} from {targets}")

graph = get_predecessor_subgraph(graph, selected_stages)
log.debug(f"Graph: {graph}")
stages.extend(list(reversed(list(nx.topological_sort(graph)))))

print(f"Running {len(stages)} stages using {max_workers} workers.")
with dvc.repo.lock_repo(repo):
# graph: nx.DiGraph = repo.index.graph
# add to the existing graph
global graph
graph.add_nodes_from(repo.index.graph.nodes)
graph.add_edges_from(repo.index.graph.edges)

positions.update(get_tree_layout(graph))

# reverse the graph
graph = graph.reverse()

# construct a subgraph of the targets and their dependencies
if targets:
if not glob:
selected_stages = [
stage for stage in graph.nodes if stage.addressing in targets
]
else:
selected_stages = [
stage
for stage in graph.nodes
if any(
fnmatch.fnmatch(stage.addressing, target)
for target in targets
)
]
log.debug(f"Selected stages: {selected_stages} from {targets}")

graph = get_predecessor_subgraph(graph, selected_stages)
log.debug(f"Graph: {graph}")
stages.extend(list(reversed(list(nx.topological_sort(graph)))))
for stage in stages:
if stage.cmd is None:
# skip non-PipeLineStages
finished.add(stage.addressing)
# if stage.already_cached(): # this is not correct! If there are changed deps, this is true altough it should be false!
# finished.add(stage.addressing)
# warnings.warn(f"{stage.addressing} {stage.already_cached() = } ")

pipeline_stages = [x for x in stages if x.addressing not in finished]
log.info(f"Running {len(pipeline_stages)} stages using {max_workers} workers.")
try:
with ProcessPoolExecutor(max_workers=max_workers) as executor:
while len(finished) < len(stages):
# TODO: consider using proper workers / queues like celery with file system broker ?

for stage in stages:
for stage in pipeline_stages:
# shuffling the stages might lead to better performance with multiple workers
if (
len(submitted) >= max_workers
Expand Down Expand Up @@ -191,7 +212,7 @@ def execute_graph(
for stage_addressing in list(submitted.keys()):
get_paraffin_stage_file(stage_addressing).unlink(missing_ok=True)

print("Finished running all stages.")
log.info("Finished running all stages.")


@app.command()
Expand All @@ -214,6 +235,17 @@ def main(
),
):
"""Run DVC stages in parallel."""
# we need the globals for the dashboard >_<
global graph, stages, finished, submitted, positions

if not all(len(x) == 0 for x in [finished, submitted, positions, stages, graph]):
graph = nx.DiGraph()
stages.clear()
finished.clear()
submitted.clear()
positions.clear()
typer.echo("Found existing global variables, resetting.", err=True)

update_gitignore()

if verbose:
Expand Down
2 changes: 1 addition & 1 deletion paraffin/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dash import dcc, html
from dash.dependencies import Input, Output

from .cli import finished, graph, positions, submitted
from paraffin.cli import finished, graph, positions, submitted

log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
Expand Down
Loading

0 comments on commit c14f04e

Please sign in to comment.