Skip to content

Commit

Permalink
pin great expectations version and run local cache tests serially (#2205
Browse files Browse the repository at this point in the history
)

* pin great expectations version

Signed-off-by: Samhita Alla <[email protected]>

* Force test_local_cache tests to run serially

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Samhita Alla <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
samhita-alla and eapolinario authored Feb 22, 2024
1 parent d1d1bd9 commit 95ea92d
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
14 changes: 9 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ export REPOSITORY=flytekit
PIP_COMPILE = pip-compile --upgrade --verbose --resolver=backtracking
MOCK_FLYTE_REPO=tests/flytekit/integration/remote/mock_flyte_repo/workflows
PYTEST_OPTS ?= -n auto --dist=loadfile
PYTEST = pytest ${PYTEST_OPTS}
PYTEST_AND_OPTS = pytest ${PYTEST_OPTS}
PYTEST = pytest

.SILENT: help
.PHONY: help
Expand Down Expand Up @@ -62,19 +63,22 @@ unit_test_extras_codecov:
unit_test:
# Skip all extra tests and run them with the necessary env var set so that a working (albeit slower)
# library is used to serialize/deserialize protobufs is used.
$(PYTEST) -m "not sandbox_test" tests/flytekit/unit/ --ignore=tests/flytekit/unit/extras/ --ignore=tests/flytekit/unit/models ${CODECOV_OPTS}
$(PYTEST_AND_OPTS) -m "not (serial or sandbox_test)" tests/flytekit/unit/ --ignore=tests/flytekit/unit/extras/ --ignore=tests/flytekit/unit/models ${CODECOV_OPTS}
# Run serial tests without any parallelism
$(PYTEST) -m "serial" tests/flytekit/unit/ --ignore=tests/flytekit/unit/extras/ --ignore=tests/flytekit/unit/models ${CODECOV_OPTS}


.PHONY: unit_test_extras
unit_test_extras:
PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python $(PYTEST) tests/flytekit/unit/extras ${CODECOV_OPTS}
PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python $(PYTEST_AND_OPTS) tests/flytekit/unit/extras ${CODECOV_OPTS}

.PHONY: test_serialization_codecov
test_serialization_codecov:
$(MAKE) CODECOV_OPTS="--cov=./ --cov-report=xml --cov-append" test_serialization

.PHONY: test_serialization
test_serialization:
$(PYTEST) tests/flytekit/unit/models ${CODECOV_OPTS}
$(PYTEST_AND_OPTS) tests/flytekit/unit/models ${CODECOV_OPTS}


.PHONY: integration_test_codecov
Expand All @@ -83,7 +87,7 @@ integration_test_codecov:

.PHONY: integration_test
integration_test:
$(PYTEST) tests/flytekit/integration ${CODECOV_OPTS}
$(PYTEST_AND_OPTS) tests/flytekit/integration ${CODECOV_OPTS}

doc-requirements.txt: export CUSTOM_COMPILE_COMMAND := make doc-requirements.txt
doc-requirements.txt: doc-requirements.in install-piptools
Expand Down
2 changes: 1 addition & 1 deletion plugins/flytekit-greatexpectations/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

plugin_requires = [
"flytekit>=1.5.0,<2.0.0",
"great-expectations>=0.13.30",
"great-expectations>=0.13.30,<=0.18.8",
"sqlalchemy>=1.4.23,<2.0.0",
"pyspark==3.3.1",
"s3fs<2023.6.0",
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ norecursedirs = ["common", "workflows", "spark", "fsspec"]
log_cli = true
log_cli_level = 20
markers = [
# unit tests that are really integration tests that run on a sandbox environment
"sandbox_test: fake integration tests",
"sandbox_test: fake integration tests", # unit tests that are really integration tests that run on a sandbox environment
"serial: tests to avoid using with pytest-xdist",
]

[tool.coverage.report]
Expand Down
18 changes: 18 additions & 0 deletions tests/flytekit/unit/core/test_local_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def setup():
LocalTaskCache.clear()


@pytest.mark.serial
def test_to_confirm_that_cache_keys_include_function_name():
"""
This test confirms that the function name is part of the cache key. It does so by defining 2 tasks with
Expand Down Expand Up @@ -67,6 +68,7 @@ def wf(n: int) -> typing.Tuple[int, int]:
assert wf(n=1) == (1, 2)


@pytest.mark.serial
def test_single_task_workflow():
@task(cache=True, cache_version="v1")
def is_even(n: int) -> bool:
Expand Down Expand Up @@ -101,6 +103,7 @@ def check_evenness(n: int) -> bool:
assert n_cached_task_calls == 2


@pytest.mark.serial
def test_cache_can_be_disabled(monkeypatch):
monkeypatch.setenv("FLYTE_LOCAL_CACHE_ENABLED", "false")

Expand All @@ -120,6 +123,7 @@ def is_even(n: int) -> bool:
assert n_cached_task_calls == 2


@pytest.mark.serial
def test_cache_can_be_overwrited(monkeypatch):
@task(cache=True, cache_version="v1")
def is_even(n: int) -> bool:
Expand Down Expand Up @@ -157,6 +161,7 @@ def is_even(n: int) -> bool:
assert n_cached_task_calls == 2


@pytest.mark.serial
def test_shared_tasks_in_two_separate_workflows():
@task(cache=True, cache_version="0.0.1")
def is_odd(n: int) -> bool:
Expand Down Expand Up @@ -188,6 +193,7 @@ def check_oddness_wf2(n: int) -> bool:


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
@pytest.mark.serial
def test_sql_task():
import pandas as pd

Expand Down Expand Up @@ -222,6 +228,7 @@ def my_wf() -> FlyteSchema:
assert n_cached_task_calls == 1


@pytest.mark.serial
def test_wf_custom_types():
@dataclass
class MyCustomType(DataClassJsonMixin):
Expand Down Expand Up @@ -258,6 +265,7 @@ def my_wf(a: int, b: str) -> (MyCustomType, int):


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
@pytest.mark.serial
def test_wf_schema_to_df():
import pandas as pd

Expand Down Expand Up @@ -293,6 +301,7 @@ def wf() -> int:
assert n_cached_task_calls == 2


@pytest.mark.serial
def test_dict_wf_with_constants():
@task(cache=True, cache_version="v99")
def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str):
Expand Down Expand Up @@ -324,6 +333,7 @@ def my_wf(a: int, b: str) -> (int, str):
assert n_cached_task_calls == 2


@pytest.mark.serial
def test_set_integer_literal_hash_is_cached():
"""
Test to confirm that the local cache is set in the case of integers, even if we
Expand Down Expand Up @@ -362,6 +372,7 @@ def wf(a: int) -> int:


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
@pytest.mark.serial
def test_pass_annotated_to_downstream_tasks():
@task
def t0(a: int) -> Annotated[int, HashMethod(function=str)]:
Expand Down Expand Up @@ -390,6 +401,7 @@ def t1(a: int) -> int:


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
@pytest.mark.serial
def test_pd_dataframe_hash():
"""
Test that cache is hit in the case of pd dataframes where we annotated dataframes to hash
Expand Down Expand Up @@ -425,6 +437,7 @@ def my_workflow():


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
@pytest.mark.serial
def test_list_of_pd_dataframe_hash():
"""
Test that cache is hit in the case of a list of pd dataframes where we annotated dataframes to hash
Expand Down Expand Up @@ -459,6 +472,7 @@ def my_workflow():
assert n_cached_task_calls == 1


@pytest.mark.serial
def test_cache_key_repetition():
pt = Dict
lt = TypeEngine.to_literal_type(pt)
Expand All @@ -482,6 +496,7 @@ def test_cache_key_repetition():
assert len(keys) == 1


@pytest.mark.serial
def test_stable_cache_key():
"""
The intent of this test is to ensure cache keys are stable across releases and python versions.
Expand Down Expand Up @@ -551,10 +566,12 @@ def calculate_cache_key_multiple_times(x, n=1000):
dict(xs=[dict(a=1, b=2, c=3), dict(y=dict(a=10, b=20, c=30))]),
],
)
@pytest.mark.serial
def test_cache_key_consistency(d):
assert len(calculate_cache_key_multiple_times(d)) == 1


@pytest.mark.serial
def test_literal_hash_placement():
"""
Test that hashes on literal collections and maps are preserved by the
Expand All @@ -577,5 +594,6 @@ def t2(n: int) -> int:
return n + 1


@pytest.mark.serial
def test_checkpoint_cached_task():
assert t2(n=5) == 6

0 comments on commit 95ea92d

Please sign in to comment.