From 3a3bffb1e9bd30ac970dac3dac046299c7a03b5b Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 16 Jun 2020 23:30:54 -0700 Subject: [PATCH] Fix parallel runner (#21) * fix cache bug * update * update doc * add parallel runner * fix parallel runner --- adagio/__init__.py | 2 +- adagio/instances.py | 39 +++++++++--------------------------- tests/test_core_instances.py | 26 +++++++++++++----------- 3 files changed, 25 insertions(+), 42 deletions(-) diff --git a/adagio/__init__.py b/adagio/__init__.py index f1380ee..9cb17e7 100644 --- a/adagio/__init__.py +++ b/adagio/__init__.py @@ -1 +1 @@ -__version__ = "0.1.7" +__version__ = "0.1.8" diff --git a/adagio/instances.py b/adagio/instances.py index a1d16bf..caa8826 100644 --- a/adagio/instances.py +++ b/adagio/instances.py @@ -2,7 +2,6 @@ import logging import sys from abc import ABC, abstractmethod -from collections import defaultdict from enum import Enum from threading import Event, RLock from traceback import StackSummary, extract_stack @@ -151,48 +150,30 @@ def preprocess(self, wf: "_Workflow") -> List["_Task"]: wf._register(temp) if self._concurrency <= 1: return temp - tempdict = {x.execution_id: x for x in temp} - down: Dict[str, Set[str]] = defaultdict(set) - up: Dict[str, Set[str]] = {} - q: List[str] = [] - result: List["_Task"] = [] - for t in temp: - u = set(x.execution_id for x in t.upstream) # noqa: C401 - c = t.execution_id - up[c] = u - for x in u: - down[x].add(c) - if len(u) == 0: - q.append(c) - while len(q) > 0: - key = q.pop(0) - result.append(tempdict[key]) - for d in down[key]: - up[d].remove(key) - if len(up[d]) == 0: - q.append(d) - return result + return [t for t in temp if len(t.upstream) == 0] def run_tasks(self, tasks: List["_Task"]) -> None: if self._concurrency <= 1: for t in tasks: self.run_single(t) - else: - with cf.ThreadPoolExecutor(max_workers=self._concurrency) as e: - jobs = [] - for task in tasks: - jobs.append(e.submit(self.run_single, task)) + return + with cf.ThreadPoolExecutor(max_workers=self._concurrency) as e: + jobs = [e.submit(self.run_single, task) for task in tasks] + while jobs: for f in cf.as_completed(jobs): + jobs.remove(f) try: - f.result() + for task in f.result().downstream: + jobs.append(e.submit(self.run_single, task)) except Exception: self.context.abort() raise - def run_single(self, task: "_Task") -> None: + def run_single(self, task: "_Task") -> "_Task": task.update_by_cache() task.run() task.reraise() + return task class SequentialExecutionEngine(ParallelExecutionEngine): diff --git a/tests/test_core_instances.py b/tests/test_core_instances.py index f2cedc5..b9bc41e 100644 --- a/tests/test_core_instances.py +++ b/tests/test_core_instances.py @@ -15,6 +15,7 @@ from adagio.specs import InputSpec, OutputSpec, WorkflowSpec, _NodeSpec from pytest import raises from triad.exceptions import InvalidOperationError +from timeit import timeit def test_task_skip(): @@ -335,16 +336,18 @@ def test_workflow_run_parallel(): s.add("f", wait_task1, "c") hooks = MockHooks(None) ctx = WorkflowContext(hooks=hooks) - ctx._engine = ParallelExecutionEngine(2, ctx) - with raises(NotImplementedError): - ctx.run(s, {}) + ctx._engine = ParallelExecutionEngine(10, ctx) + + def run(): + with raises(NotImplementedError): + ctx.run(s, {}) + + t = timeit(run, number=1) + assert t < 0.2 # only a and b are executed + expected = {'a': 1} for k, v in expected.items(): assert v == hooks.res[k] - # theoretically c is not determined - assert "d" in hooks.skipped - assert "e" in hooks.skipped - assert "f" in hooks.skipped assert "b" in hooks.failed # order of execution @@ -358,11 +361,10 @@ def test_workflow_run_parallel(): hooks = MockHooks(None) ctx = WorkflowContext(hooks=hooks) ctx._engine = ParallelExecutionEngine(2, ctx) - ctx.run(s, {}) - res = list(hooks.res.keys()) - assert {"a", "b"} == set(res[0:2]) - assert {"c", "d"} == set(res[2:4]) - assert {"e", "f"} == set(res[4:6]) + t = timeit(lambda: ctx.run(s, {}), number=1) + assert t < 0.4 + assert 3 == hooks.res["e"] + assert 3 == hooks.res["f"] def test_workflow_run_with_exception():