Skip to content

Commit

Permalink
Fix cache bug (#19)
Browse files Browse the repository at this point in the history
* fix cache bug

* update

* update doc
  • Loading branch information
Han Wang authored May 30, 2020
1 parent 760b82e commit 7bf4cfe
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![PyPI license](https://img.shields.io/pypi/l/adagio.svg)](https://pypi.python.org/pypi/adagio/)
[![PyPI version](https://badge.fury.io/py/adagio.svg)](https://pypi.python.org/pypi/adagio/)
[![Coverage Status](https://coveralls.io/repos/github/fugue-project/adagio/badge.svg)](https://coveralls.io/github/fugue-project/adagio)
[![Doc](https://readthedocs.org/projects/adagio/badge)](https://traid.readthedocs.org)
[![Doc](https://readthedocs.org/projects/adag-io/badge)](https://adag-io.readthedocs.org)

A Dag IO framework for Fugue projects

Expand Down
2 changes: 1 addition & 1 deletion adagio/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.1.5"
__version__ = "0.1.6"
3 changes: 2 additions & 1 deletion adagio/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def preprocess(self, wf: "_Workflow") -> List["_Task"]:

def run_tasks(self, tasks: List["_Task"]) -> None:
for t in tasks:
t.update_by_cache()
t.run()
t.reraise()

Expand Down Expand Up @@ -659,7 +660,7 @@ def skip(self) -> None:

def run(self) -> None:
with self._lock:
if self.state == _State.SKIPPED:
if self.state in [_State.SKIPPED, _State.FINISHED]:
return
elif self.abort_requested:
self.skip()
Expand Down
39 changes: 38 additions & 1 deletion tests/test_core_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def test_workflow_run():
assert v == hooks.res[k]


def test_workflow_run():
def test_workflow_run_with_exception():
s = SimpleSpec()
s.add("a", example_helper_task0)
s.add("b", example_helper_task1e)
Expand All @@ -306,6 +306,41 @@ def test_workflow_run():
raises(NotImplementedError, lambda: ctx.run(s, {}))


def test_workflow_run_with_cache():
s = SimpleSpec()
s.add("a", example_helper_task0)
s.add("c", example_helper_task1)
cache = MockCache()
hooks1 = MockHooks(None)
ctx = WorkflowContext(cache=cache, hooks=hooks1)
ctx.run(s, {})
assert 2 == cache.get_called
assert 0 == cache.hit
assert 2 == cache.set_called
expected = {"a": 10, "c": 11}
for k, v in expected.items():
assert v == hooks1.res[k]
# for the second run, get cache will be called, set will not
ctx.run(s, {})
assert 4 == cache.get_called
assert 2 == cache.hit
assert 2 == cache.set_called
expected = {"a": 10, "c": 11}
for k, v in expected.items():
assert v == hooks1.res[k]
# for the third run, get cache will be called, set will not
hooks2 = MockHooks(None)
ctx = WorkflowContext(cache=cache, hooks=hooks2)
ctx.run(s, {})
assert 6 == cache.get_called
assert 4 == cache.hit
assert 2 == cache.set_called
expected = {"a": 10, "c": 11}
for k, v in expected.items():
assert v == hooks1.res[k]
# TODO: skip is not tested


def t1(ctx: TaskContext):
a = ctx.inputs.get_or_throw("a", int)
b = ctx.configs.get_or_throw("b", str)
Expand Down Expand Up @@ -393,6 +428,7 @@ def __init__(self, ctx=None):
self.set_called = 0
self.skip_called = 0
self.get_called = 0
self.hit = 0

def set(self, key: str, value: Any) -> None:
self.tb[key] = (False, value)
Expand All @@ -410,6 +446,7 @@ def get(self, key: str) -> Tuple[bool, bool, Any]:
return False, False, None
x = self.tb[key]
print("get", key)
self.hit += 1
return True, x[0], x[1]


Expand Down

0 comments on commit 7bf4cfe

Please sign in to comment.