diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 64d9d8c52a..0b2ba136d7 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -36,7 +36,46 @@ jobs: key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('dev-requirements.in', 'requirements.in')) }} - name: Install dependencies run: | - make setup && pip freeze + make setup + pip uninstall -y pandas + pip freeze + - name: Test with coverage + env: + PYTEST_OPTS: -n2 + run: | + make unit_test_codecov + - name: Codecov + uses: codecov/codecov-action@v3.1.4 + with: + fail_ci_if_error: false + files: coverage.xml + + build-with-pandas: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ ubuntu-latest ] + python-version: [ "3.11" ] + pandas: [ "pandas<2.0.0", "pandas>=2.0.0" ] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Cache pip + uses: actions/cache@v3 + with: + # This path is specific to Ubuntu + path: ~/.cache/pip + # Look to see if there is a cache hit for the corresponding requirements files + key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('dev-requirements.in', 'requirements.in')) }} + - name: Install dependencies + run: | + make setup + pip install --force-reinstall "${{ matrix.pandas }}" + pip freeze - name: Test with coverage env: PYTEST_OPTS: -n2 @@ -69,8 +108,7 @@ jobs: # Look to see if there is a cache hit for the corresponding requirements files key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('dev-requirements.in', 'requirements.in')) }} - name: Install dependencies - run: | - make setup && pip freeze + run: make setup && pip freeze - name: Test with coverage env: PYTEST_OPTS: -n2 diff --git a/dev-requirements.in b/dev-requirements.in index a59f6e4c1d..a548af4c42 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -36,11 +36,14 @@ torch<=2.0.0; python_version>='3.11' or platform_system!='Windows' # Once a solution is found, this should be updated to support Windows as well. python-magic; (platform_system=='Darwin' or platform_system=='Linux') -pillow -scikit-learn types-protobuf types-croniter types-mock autoflake + +pillow +numpy +pandas +scikit-learn types-requests prometheus-client diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index f79ddc3082..4bcbbeceef 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -995,7 +995,7 @@ def lazy_import_transformers(cls): from flytekit.extras import sklearn # noqa: F401 if is_imported("pandas"): try: - from flytekit.types import schema # noqa: F401 + from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter # noqa: F401 except ValueError: logger.debug("Transformer for pandas is already registered.") register_pandas_handlers() diff --git a/flytekit/extras/sqlite3/task.py b/flytekit/extras/sqlite3/task.py index 6bcbae5d4f..51dcaccd61 100644 --- a/flytekit/extras/sqlite3/task.py +++ b/flytekit/extras/sqlite3/task.py @@ -6,15 +6,18 @@ import typing from dataclasses import dataclass -import pandas as pd - -from flytekit import FlyteContext, kwtypes +from flytekit import FlyteContext, kwtypes, lazy_module from flytekit.configuration import DefaultImages, SerializationSettings from flytekit.core.base_sql_task import SQLTask from flytekit.core.python_customized_container_task import PythonCustomizedContainerTask from flytekit.core.shim_task import ShimTaskExecutor from flytekit.models import task as task_models +if typing.TYPE_CHECKING: + import pandas as pd +else: + pd = lazy_module("pandas") + def unarchive_file(local_path: str, to_dir: str): """ diff --git a/flytekit/lazy_import/lazy_module.py b/flytekit/lazy_import/lazy_module.py index 553386eb52..58f9923ff2 100644 --- a/flytekit/lazy_import/lazy_module.py +++ b/flytekit/lazy_import/lazy_module.py @@ -1,9 +1,19 @@ import importlib.util import sys +import types LAZY_MODULES = [] +class LazyModule(types.ModuleType): + def __init__(self, module_name: str): + super().__init__(module_name) + self._module_name = module_name + + def __getattribute__(self, attr): + raise ImportError(f"Module {object.__getattribute__(self, '_module_name')} is not yet installed.") + + def is_imported(module_name): """ This function is used to check if a module has been imported by the regular import. @@ -24,6 +34,10 @@ def lazy_module(fullname): return sys.modules[fullname] # https://docs.python.org/3/library/importlib.html#implementing-lazy-imports spec = importlib.util.find_spec(fullname) + if spec is None: + # Return a lazy module if the module is not found in the python environment, + # so that we can raise a proper error when the user tries to access an attribute in the module. + return LazyModule(fullname) loader = importlib.util.LazyLoader(spec.loader) spec.loader = loader module = importlib.util.module_from_spec(spec) diff --git a/flytekit/types/schema/__init__.py b/flytekit/types/schema/__init__.py index 3b3d0cf6c1..080927021a 100644 --- a/flytekit/types/schema/__init__.py +++ b/flytekit/types/schema/__init__.py @@ -9,4 +9,3 @@ SchemaReader, SchemaWriter, ) -from .types_pandas import PandasSchemaReader, PandasSchemaWriter diff --git a/flytekit/types/schema/types.py b/flytekit/types/schema/types.py index 6f42773ce6..fb3ad09d89 100644 --- a/flytekit/types/schema/types.py +++ b/flytekit/types/schema/types.py @@ -10,7 +10,6 @@ from typing import Type import numpy as _np -import pandas from dataclasses_json import config from marshmallow import fields from mashumaro.mixins.json import DataClassJSONMixin @@ -270,7 +269,7 @@ def supported_mode(self) -> SchemaOpenMode: return self._supported_mode def open( - self, dataframe_fmt: type = pandas.DataFrame, override_mode: typing.Optional[SchemaOpenMode] = None + self, dataframe_fmt: typing.Optional[type] = None, override_mode: typing.Optional[SchemaOpenMode] = None ) -> typing.Union[SchemaReader, SchemaWriter]: """ Returns a reader or writer depending on the mode of the object when created. This mode can be @@ -287,6 +286,9 @@ def open( raise AssertionError("Readonly schema cannot be opened in write mode!") mode = override_mode if override_mode else self._supported_mode + import pandas as pd + + dataframe_fmt = dataframe_fmt if dataframe_fmt else pd.DataFrame h = SchemaEngine.get_handler(dataframe_fmt) if not h.handles_remote_io: # The Schema Handler does not manage its own IO, and this it will expect the files are on local file-system diff --git a/flytekit/types/structured/basic_dfs.py b/flytekit/types/structured/basic_dfs.py index fabeb0e3bf..1c21a6b2d8 100644 --- a/flytekit/types/structured/basic_dfs.py +++ b/flytekit/types/structured/basic_dfs.py @@ -3,14 +3,11 @@ from pathlib import Path from typing import TypeVar -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq from botocore.exceptions import NoCredentialsError from fsspec.core import split_protocol, strip_protocol from fsspec.utils import get_protocol -from flytekit import FlyteContext, logger +from flytekit import FlyteContext, lazy_module, logger from flytekit.configuration import DataConfig from flytekit.core.data_persistence import get_fsspec_storage_options from flytekit.models import literals @@ -24,6 +21,13 @@ StructuredDatasetEncoder, ) +if typing.TYPE_CHECKING: + import pandas as pd + import pyarrow as pa +else: + pd = lazy_module("pandas") + pa = lazy_module("pyarrow") + T = TypeVar("T") @@ -70,7 +74,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pd.DataFrame: + ) -> "pd.DataFrame": uri = flyte_value.uri columns = None kwargs = get_pandas_storage_options(uri=uri, data_config=ctx.file_access.data_config) @@ -121,7 +125,7 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pd.DataFrame: + ) -> "pd.DataFrame": uri = flyte_value.uri columns = None kwargs = get_pandas_storage_options(uri=uri, data_config=ctx.file_access.data_config) @@ -145,6 +149,8 @@ def encode( structured_dataset: StructuredDataset, structured_dataset_type: StructuredDatasetType, ) -> literals.StructuredDataset: + import pyarrow.parquet as pq + uri = typing.cast(str, structured_dataset.uri) or ctx.file_access.join( ctx.file_access.raw_output_prefix, ctx.file_access.get_random_string() ) @@ -165,7 +171,9 @@ def decode( ctx: FlyteContext, flyte_value: literals.StructuredDataset, current_task_metadata: StructuredDatasetMetadata, - ) -> pa.Table: + ) -> "pa.Table": + import pyarrow.parquet as pq + uri = flyte_value.uri if not ctx.file_access.is_remote(uri): Path(uri).parent.mkdir(parents=True, exist_ok=True) diff --git a/flytekit/types/structured/bigquery.py b/flytekit/types/structured/bigquery.py index 049a21c07e..e9c3ae390d 100644 --- a/flytekit/types/structured/bigquery.py +++ b/flytekit/types/structured/bigquery.py @@ -1,12 +1,10 @@ import re import typing -import pandas as pd -import pyarrow as pa from google.cloud import bigquery, bigquery_storage from google.cloud.bigquery_storage_v1 import types -from flytekit import FlyteContext +from flytekit import FlyteContext, lazy_module from flytekit.models import literals from flytekit.models.types import StructuredDatasetType from flytekit.types.structured.structured_dataset import ( @@ -16,6 +14,13 @@ StructuredDatasetMetadata, ) +if typing.TYPE_CHECKING: + import pandas as pd + import pyarrow as pa +else: + pd = lazy_module("pandas") + pa = lazy_module("pyarrow") + BIGQUERY = "bq" diff --git a/plugins/flytekit-duckdb/setup.py b/plugins/flytekit-duckdb/setup.py index 9dd5a87ae7..ff16057728 100644 --- a/plugins/flytekit-duckdb/setup.py +++ b/plugins/flytekit-duckdb/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "duckdb"] +plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "duckdb", "pandas"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-mlflow/setup.py b/plugins/flytekit-mlflow/setup.py index fc5c073130..666aff4316 100644 --- a/plugins/flytekit-mlflow/setup.py +++ b/plugins/flytekit-mlflow/setup.py @@ -5,7 +5,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" # TODO: support mlflow 2.0+ -plugin_requires = ["flytekit>=1.1.0,<2.0.0", "plotly", "mlflow<2.0.0"] +plugin_requires = ["flytekit>=1.1.0,<2.0.0", "plotly", "mlflow<2.0.0", "pandas"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-pandera/flytekitplugins/pandera/schema.py b/plugins/flytekit-pandera/flytekitplugins/pandera/schema.py index 95dbbf20c2..cae739fdae 100644 --- a/plugins/flytekit-pandera/flytekitplugins/pandera/schema.py +++ b/plugins/flytekit-pandera/flytekitplugins/pandera/schema.py @@ -8,8 +8,9 @@ from flytekit.extend import TypeEngine, TypeTransformer from flytekit.models.literals import Literal, Scalar, Schema from flytekit.models.types import LiteralType, SchemaType -from flytekit.types.schema import FlyteSchema, PandasSchemaWriter, SchemaFormat, SchemaOpenMode +from flytekit.types.schema import FlyteSchema, SchemaFormat, SchemaOpenMode from flytekit.types.schema.types import FlyteSchemaTransformer +from flytekit.types.schema.types_pandas import PandasSchemaWriter T = typing.TypeVar("T") diff --git a/plugins/flytekit-pandera/setup.py b/plugins/flytekit-pandera/setup.py index 2da7f0bde0..d4b54c4142 100644 --- a/plugins/flytekit-pandera/setup.py +++ b/plugins/flytekit-pandera/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "pandera>=0.7.1"] +plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "pandera>=0.7.1", "pandas"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-polars/setup.py b/plugins/flytekit-polars/setup.py index 4acc9d1e46..483c3d18a4 100644 --- a/plugins/flytekit-polars/setup.py +++ b/plugins/flytekit-polars/setup.py @@ -4,10 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = [ - "flytekit>=1.3.0b2,<2.0.0", - "polars>=0.8.27,<0.17.0", -] +plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "polars>=0.8.27,<0.17.0", "pandas"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-spark/setup.py b/plugins/flytekit-spark/setup.py index 65fa580170..ac7b650ecb 100644 --- a/plugins/flytekit-spark/setup.py +++ b/plugins/flytekit-spark/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "pyspark>=3.0.0", "aiohttp", "flyteidl>=1.10.0"] +plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "pyspark>=3.0.0", "aiohttp", "flyteidl>=1.10.0", "pandas"] __version__ = "0.0.0+develop" diff --git a/plugins/flytekit-sqlalchemy/setup.py b/plugins/flytekit-sqlalchemy/setup.py index 8ffd2c8f64..4d59e31686 100644 --- a/plugins/flytekit-sqlalchemy/setup.py +++ b/plugins/flytekit-sqlalchemy/setup.py @@ -4,7 +4,7 @@ microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "sqlalchemy>=1.4.7"] +plugin_requires = ["flytekit>=1.3.0b2,<2.0.0", "sqlalchemy>=1.4.7", "pandas"] __version__ = "0.0.0+develop" diff --git a/pyproject.toml b/pyproject.toml index 541d92b376..2b18904e53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,14 +33,11 @@ dependencies = [ "keyring>=18.0.1", "kubernetes>=12.0.1", "marshmallow-enum", - # TODO: remove upper-bound after fixing change in contract "marshmallow-jsonschema>=0.12.0", "mashumaro>=3.9.1", - "numpy", - "pandas>=1.0.0,<2.0.0", # TODO: Remove upper-bound after protobuf community fixes it. https://github.com/flyteorg/flyte/issues/4359 "protobuf<4.25.0", - "pyarrow>=4.0.0", + "pyarrow", "python-json-logger>=2.0.0", "pytimeparse>=1.1.8,<2.0.0", "pyyaml!=6.0.0,!=5.4.0,!=5.4.1", # pyyaml is broken with cython 3: https://github.com/yaml/pyyaml/issues/601 diff --git a/tests/flytekit/unit/cli/pyflyte/test_run.py b/tests/flytekit/unit/cli/pyflyte/test_run.py index 86017ef0b9..be65099c93 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_run.py +++ b/tests/flytekit/unit/cli/pyflyte/test_run.py @@ -16,6 +16,8 @@ from flytekit.interaction.click_types import DirParamType, FileParamType from flytekit.remote import FlyteRemote +pytest.importorskip("pandas") + WORKFLOW_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "workflow.py") REMOTE_WORKFLOW_FILE = "https://raw.githubusercontent.com/flyteorg/flytesnacks/8337b64b33df046b2f6e4cba03c74b7bdc0c4fb1/cookbook/core/flyte_basics/basic_workflow.py" IMPERATIVE_WORKFLOW_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "imperative_wf.py") diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index 6dbe553bdb..f478ead79c 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -3,10 +3,11 @@ import pathlib import random import string +import sys import tempfile import mock -import pandas as pd +import pytest from azure.identity import ClientSecretCredential, DefaultAzureCredential from flytekit.core.data_persistence import FileAccessProvider @@ -27,8 +28,11 @@ def test_is_remote(): assert fp.is_remote("s3://my-bucket/foo/bar") is True +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @mock.patch("flytekit.core.data_persistence.UUID") def test_write_folder_put_raw(mock_uuid_class): + import pandas as pd + """ A test that writes this structure raw/ diff --git a/tests/flytekit/unit/core/test_dataclass.py b/tests/flytekit/unit/core/test_dataclass.py index 34350ca40b..b1b6b42761 100644 --- a/tests/flytekit/unit/core/test_dataclass.py +++ b/tests/flytekit/unit/core/test_dataclass.py @@ -1,12 +1,15 @@ +import sys from dataclasses import dataclass from typing import List +import pytest from dataclasses_json import DataClassJsonMixin from flytekit.core.task import task from flytekit.core.workflow import workflow +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_dataclass(): @dataclass class AppParams(DataClassJsonMixin): diff --git a/tests/flytekit/unit/core/test_imperative.py b/tests/flytekit/unit/core/test_imperative.py index ead5358316..24f37ff186 100644 --- a/tests/flytekit/unit/core/test_imperative.py +++ b/tests/flytekit/unit/core/test_imperative.py @@ -1,7 +1,7 @@ +import sys import typing from collections import OrderedDict -import pandas as pd import pytest import flytekit.configuration @@ -313,7 +313,10 @@ def t1(a: str) -> str: wb(in2="hello") -def test_nonfunction_task_and_df_input(): +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_non_function_task_and_df_input(): + import pandas as pd + @reference_task( project="flytesnacks", domain="development", diff --git a/tests/flytekit/unit/core/test_literals_resolver.py b/tests/flytekit/unit/core/test_literals_resolver.py index b37c81ec17..39b6c9c6ea 100644 --- a/tests/flytekit/unit/core/test_literals_resolver.py +++ b/tests/flytekit/unit/core/test_literals_resolver.py @@ -1,6 +1,6 @@ +import sys import typing -import pandas as pd import pytest from typing_extensions import Annotated @@ -50,7 +50,10 @@ def test_literals_resolver(literal_value, python_type, expected_python_value): assert out == expected_python_value +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_interface(): + import pandas as pd + ctx = FlyteContextManager.current_context() lt = TypeEngine.to_literal_type(pd.DataFrame) df = pd.DataFrame({"name": ["Tom", "Joseph"], "age": [20, 22]}) diff --git a/tests/flytekit/unit/core/test_local_cache.py b/tests/flytekit/unit/core/test_local_cache.py index 1569a258f4..c79930148f 100644 --- a/tests/flytekit/unit/core/test_local_cache.py +++ b/tests/flytekit/unit/core/test_local_cache.py @@ -1,10 +1,9 @@ import datetime +import sys import typing from dataclasses import dataclass from typing import Dict, List -import pandas -import pandas as pd import pytest from dataclasses_json import DataClassJsonMixin from pytest import fixture @@ -131,7 +130,10 @@ def check_oddness_wf2(n: int) -> bool: # TODO add test with typing.List[str] +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_sql_task(): + import pandas as pd + sql = SQLTask( "my-query", query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10", @@ -152,14 +154,14 @@ def my_wf() -> FlyteSchema: return sql(ds=dt) with task_mock(sql) as mock: - mock.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) + mock.return_value = pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) assert n_cached_task_calls == 0 - assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + assert (my_wf().open().all() == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() assert n_cached_task_calls == 1 # The second and third calls hit the cache - assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + assert (my_wf().open().all() == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() assert n_cached_task_calls == 1 - assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + assert (my_wf().open().all() == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() assert n_cached_task_calls == 1 @@ -198,7 +200,10 @@ def my_wf(a: int, b: str) -> (MyCustomType, int): assert n_cached_task_calls == 2 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf_schema_to_df(): + import pandas as pd + schema1 = FlyteSchema[kwtypes(x=int, y=str)] @task(cache=True, cache_version="v0") @@ -207,11 +212,11 @@ def t1() -> schema1: n_cached_task_calls += 1 s = schema1() - s.open().write(pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) + s.open().write(pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) return s @task(cache=True, cache_version="v1") - def t2(df: pandas.DataFrame) -> int: + def t2(df: pd.DataFrame) -> int: global n_cached_task_calls n_cached_task_calls += 1 @@ -299,6 +304,7 @@ def wf(a: int) -> int: assert n_cached_task_calls == 1 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_pass_annotated_to_downstream_tasks(): @task def t0(a: int) -> Annotated[int, HashMethod(function=str)]: @@ -326,21 +332,23 @@ def t1(a: int) -> int: assert n_cached_task_calls == 1 -def test_pandas_dataframe_hash(): +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_pd_dataframe_hash(): """ - Test that cache is hit in the case of pandas dataframes where we annotated dataframes to hash + Test that cache is hit in the case of pd dataframes where we annotated dataframes to hash the contents of the dataframes. """ + import pandas as pd - def hash_pandas_dataframe(df: pandas.DataFrame) -> str: - return str(pandas.util.hash_pandas_object(df)) + def hash_pd_dataframe(df: pd.DataFrame) -> str: + return str(pd.util.hash_pandas_object(df)) @task - def uncached_data_reading_task() -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]: - return pandas.DataFrame({"column_1": [1, 2, 3]}) + def uncached_data_reading_task() -> Annotated[pd.DataFrame, HashMethod(hash_pd_dataframe)]: + return pd.DataFrame({"column_1": [1, 2, 3]}) @task(cache=True, cache_version="0.1") - def cached_data_processing_task(data: pandas.DataFrame) -> pandas.DataFrame: + def cached_data_processing_task(data: pd.DataFrame) -> pd.DataFrame: global n_cached_task_calls n_cached_task_calls += 1 return data * 2 @@ -359,21 +367,23 @@ def my_workflow(): assert n_cached_task_calls == 1 -def test_list_of_pandas_dataframe_hash(): +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_list_of_pd_dataframe_hash(): """ - Test that cache is hit in the case of a list of pandas dataframes where we annotated dataframes to hash + Test that cache is hit in the case of a list of pd dataframes where we annotated dataframes to hash the contents of the dataframes. """ + import pandas as pd - def hash_pandas_dataframe(df: pandas.DataFrame) -> str: - return str(pandas.util.hash_pandas_object(df)) + def hash_pd_dataframe(df: pd.DataFrame) -> str: + return str(pd.util.hash_pandas_object(df)) @task - def uncached_data_reading_task() -> List[Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]]: - return [pandas.DataFrame({"column_1": [1, 2, 3]}), pandas.DataFrame({"column_1": [10, 20, 30]})] + def uncached_data_reading_task() -> List[Annotated[pd.DataFrame, HashMethod(hash_pd_dataframe)]]: + return [pd.DataFrame({"column_1": [1, 2, 3]}), pd.DataFrame({"column_1": [10, 20, 30]})] @task(cache=True, cache_version="0.1") - def cached_data_processing_task(data: List[pandas.DataFrame]) -> List[pandas.DataFrame]: + def cached_data_processing_task(data: List[pd.DataFrame]) -> List[pd.DataFrame]: global n_cached_task_calls n_cached_task_calls += 1 return [df * 2 for df in data] @@ -449,7 +459,10 @@ def test_stable_cache_key(): assert key == "task_name_1-31415-404b45f8556276183621d4bf37f50049" +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def calculate_cache_key_multiple_times(x, n=1000): + import pandas as pd + series = pd.Series( [ _calculate_cache_key( @@ -472,6 +485,7 @@ def calculate_cache_key_multiple_times(x, n=1000): return series +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.parametrize( "d", [ diff --git a/tests/flytekit/unit/core/test_partials.py b/tests/flytekit/unit/core/test_partials.py index 1e7b5d43ee..18fd29647b 100644 --- a/tests/flytekit/unit/core/test_partials.py +++ b/tests/flytekit/unit/core/test_partials.py @@ -1,8 +1,8 @@ +import sys import typing from collections import OrderedDict from functools import partial -import pandas as pd import pytest import flytekit.configuration @@ -25,9 +25,6 @@ ) -df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) - - def test_basics_1(): @task def t1(a: int, b: str, c: float) -> int: @@ -170,6 +167,7 @@ def t_list_of_lists(a: typing.List[typing.List[float]], b: int) -> str: map_task_fn(partial(t_list_of_lists, a=[[3.14]]))(b=[1, 2, 3, 4]) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.parametrize( "map_task_fn", [ @@ -178,6 +176,8 @@ def t_list_of_lists(a: typing.List[typing.List[float]], b: int) -> str: ], ) def test_everything(map_task_fn): + import pandas as pd + @task def get_static_list() -> typing.List[float]: return [3.14, 2.718] diff --git a/tests/flytekit/unit/core/test_promise.py b/tests/flytekit/unit/core/test_promise.py index b6decebd0d..3322d4dd59 100644 --- a/tests/flytekit/unit/core/test_promise.py +++ b/tests/flytekit/unit/core/test_promise.py @@ -1,3 +1,4 @@ +import sys import typing from dataclasses import dataclass from typing import Dict, List @@ -102,6 +103,7 @@ class MyDataclass(DataClassJsonMixin): a: typing.List[str] +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.parametrize( "input", [2.0, MyDataclass(i=1, a=["h", "e"]), [1, 2, 3], ["foo"] * 5], @@ -165,6 +167,7 @@ def func(foo: Optional[int] = None): wf() +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_promise_with_attr_path(): from dataclasses import dataclass from typing import Dict, List @@ -199,6 +202,7 @@ def my_workflow() -> (str, str, str): assert o3 == "b" +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_resolve_attr_path_in_promise(): @dataclass_json @dataclass diff --git a/tests/flytekit/unit/core/test_realworld_examples.py b/tests/flytekit/unit/core/test_realworld_examples.py index 86f86d4813..a8cc713c76 100644 --- a/tests/flytekit/unit/core/test_realworld_examples.py +++ b/tests/flytekit/unit/core/test_realworld_examples.py @@ -1,7 +1,8 @@ +import sys import typing from collections import OrderedDict -import pandas as pd +import pytest from typing_extensions import Annotated from flytekit import Resources @@ -11,7 +12,10 @@ from flytekit.types.schema import FlyteSchema +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_diabetes(): + import pandas as pd + # Since we are working with a specific dataset, we will create a strictly typed schema for the dataset. # If we wanted a generic data splitter we could use a Generic schema without any column type and name information # Example file: https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv diff --git a/tests/flytekit/unit/core/test_type_engine.py b/tests/flytekit/unit/core/test_type_engine.py index cb057fb21a..4d3661f329 100644 --- a/tests/flytekit/unit/core/test_type_engine.py +++ b/tests/flytekit/unit/core/test_type_engine.py @@ -2,6 +2,7 @@ import datetime import json import os +import sys import tempfile import typing from dataclasses import asdict, dataclass, field @@ -10,7 +11,6 @@ from typing import Optional, Type import mock -import pandas as pd import pyarrow as pa import pytest import typing_extensions @@ -21,7 +21,6 @@ from marshmallow_enum import LoadDumpOptions from marshmallow_jsonschema import JSONSchema from mashumaro.mixins.json import DataClassJSONMixin -from pandas._testing import assert_frame_equal from typing_extensions import Annotated, get_args, get_origin from flytekit import kwtypes @@ -61,7 +60,6 @@ from flytekit.types.pickle import FlytePickle from flytekit.types.pickle.pickle import BatchSize, FlytePickleTransformer from flytekit.types.schema import FlyteSchema -from flytekit.types.schema.types_pandas import PandasDataFrameTransformer from flytekit.types.structured.structured_dataset import StructuredDataset T = typing.TypeVar("T") @@ -1171,7 +1169,11 @@ def test_flyte_directory_in_dataclassjsonmixin(): assert o.b.e["hello"].path == ot.b.e["hello"].remote_source +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_structured_dataset_in_dataclass(): + import pandas as pd + from pandas._testing import assert_frame_equal + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) People = Annotated[StructuredDataset, "parquet", kwtypes(Name=str, Age=int)] @@ -1206,23 +1208,27 @@ class DatasetStruct(DataClassJsonMixin): @dataclass -class InnerDatasetStruct_dataclassjsonmixin(DataClassJSONMixin): +class InnerDatasetStructDataclassJsonMixin(DataClassJSONMixin): a: StructuredDataset b: typing.List[Annotated[StructuredDataset, "parquet"]] c: typing.Dict[str, Annotated[StructuredDataset, kwtypes(Name=str, Age=int)]] +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_structured_dataset_in_dataclassjsonmixin(): + import pandas as pd + from pandas._testing import assert_frame_equal + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) People = Annotated[StructuredDataset, "parquet"] @dataclass class DatasetStruct_dataclassjsonmixin(DataClassJSONMixin): a: People - b: InnerDatasetStruct_dataclassjsonmixin + b: InnerDatasetStructDataclassJsonMixin sd = StructuredDataset(dataframe=df, file_format="parquet") - o = DatasetStruct_dataclassjsonmixin(a=sd, b=InnerDatasetStruct_dataclassjsonmixin(a=sd, b=[sd], c={"hello": sd})) + o = DatasetStruct_dataclassjsonmixin(a=sd, b=InnerDatasetStructDataclassJsonMixin(a=sd, b=[sd], c={"hello": sd})) ctx = FlyteContext.current_context() tf = DataclassTransformer() @@ -1260,7 +1266,11 @@ class UnsupportedEnumValues(Enum): BLUE = 3 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_structured_dataset_type(): + import pandas as pd + from pandas._testing import assert_frame_equal + name = "Name" age = "Age" data = {name: ["Tom", "Joseph"], age: [20, 22]} @@ -1268,8 +1278,6 @@ def test_structured_dataset_type(): subset_cols = kwtypes(Name=str) df = pd.DataFrame(data) - from flytekit.types.structured.structured_dataset import StructuredDataset - tf = TypeEngine.get_transformer(StructuredDataset) lt = tf.get_literal_type(Annotated[StructuredDataset, superset_cols, "parquet"]) assert lt.structured_dataset_type is not None @@ -1875,10 +1883,12 @@ def test_nested_annotated(): assert v == 42 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_pass_annotated_to_downstream_tasks(): """ Test to confirm that the loaded dataframe is not affected and can be used in @dynamic. """ + import pandas as pd # pandas dataframe hash function def hash_pandas_dataframe(df: pd.DataFrame) -> str: @@ -1922,10 +1932,15 @@ def test_literal_hash_int_can_be_set(): assert lv.hash == "42" +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_literal_hash_to_python_value(): """ Test to confirm that literals can be converted to python values, regardless of the hash value set in the literal. """ + import pandas as pd + + from flytekit.types.schema.types_pandas import PandasDataFrameTransformer + ctx = FlyteContext.current_context() def constant_hash(df: pd.DataFrame) -> str: @@ -2020,7 +2035,10 @@ class Result(DataClassJsonMixin): schema: TestSchema # type: ignore +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_schema_in_dataclass(): + import pandas as pd + schema = TestSchema() df = pd.DataFrame(data={"some_str": ["a", "b", "c"]}) schema.open().write(df) @@ -2034,7 +2052,10 @@ def test_schema_in_dataclass(): assert o == ot +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_union_in_dataclass(): + import pandas as pd + schema = TestSchema() df = pd.DataFrame(data={"some_str": ["a", "b", "c"]}) schema.open().write(df) @@ -2060,7 +2081,12 @@ class Result_dataclassjsonmixin(DataClassJSONMixin): schema: TestSchema # type: ignore +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_schema_in_dataclassjsonmixin(): + import pandas as pd + + from flytekit.types.schema.types_pandas import PandasSchemaReader, PandasSchemaWriter # noqa: F401 + schema = TestSchema() df = pd.DataFrame(data={"some_str": ["a", "b", "c"]}) schema.open().write(df) diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 9e01b4a619..53c1e45173 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -4,18 +4,16 @@ import os import random import re +import sys import tempfile import typing from collections import OrderedDict from dataclasses import dataclass from enum import Enum -import pandas -import pandas as pd import pytest from dataclasses_json import DataClassJsonMixin from google.protobuf.struct_pb2 import Struct -from pandas._testing import assert_frame_equal from typing_extensions import Annotated, get_origin import flytekit @@ -335,7 +333,10 @@ def mimic_sub_wf(a: int) -> (str, str): assert context_manager.FlyteContextManager.size() == 1 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf1_with_sql(): + import pandas as pd + sql = SQLTask( "my-query", query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10", @@ -354,12 +355,15 @@ def my_wf() -> FlyteSchema: return sql(ds=dt) with task_mock(sql) as mock: - mock.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) - assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + mock.return_value = pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) + assert (my_wf().open().all() == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() assert context_manager.FlyteContextManager.size() == 1 +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf1_with_sql_with_patch(): + import pandas as pd + sql = SQLTask( "my-query", query_template="SELECT * FROM hive.city.fact_airport_sessions WHERE ds = '{{ .Inputs.ds }}' LIMIT 10", @@ -379,8 +383,8 @@ def my_wf() -> FlyteSchema: @patch(sql) def test_user_demo_test(mock_sql): - mock_sql.return_value = pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) - assert (my_wf().open().all() == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() + mock_sql.return_value = pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}) + assert (my_wf().open().all() == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})).all().all() # Have to call because tests inside tests don't run test_user_demo_test() @@ -464,7 +468,11 @@ def wf(path: str) -> os.PathLike: assert flyte_tmp_dir in wf(path="s3://somewhere").path +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_structured_dataset_in_dataclass(): + import pandas as pd + from pandas._testing import assert_frame_equal + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) @dataclass @@ -852,23 +860,26 @@ def t1(a: str) -> tuple: return (a, 3) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf1_df(): + import pandas as pd + @task - def t1(a: int) -> pandas.DataFrame: - return pandas.DataFrame(data={"col1": [a, 2], "col2": [a, 4]}) + def t1(a: int) -> pd.DataFrame: + return pd.DataFrame(data={"col1": [a, 2], "col2": [a, 4]}) @task - def t2(df: pandas.DataFrame) -> pandas.DataFrame: - return df.append(pandas.DataFrame(data={"col1": [5, 10], "col2": [5, 10]})) + def t2(df: pd.DataFrame) -> pd.DataFrame: + return pd.concat([df, pd.DataFrame(data={"col1": [5, 10], "col2": [5, 10]})]) @workflow - def my_wf(a: int) -> pandas.DataFrame: + def my_wf(a: int) -> pd.DataFrame: df = t1(a=a) return t2(df=df) x = my_wf(a=20) - assert isinstance(x, pandas.DataFrame) - result_df = x.reset_index(drop=True) == pandas.DataFrame( + assert isinstance(x, pd.DataFrame) + result_df = x.reset_index(drop=True) == pd.DataFrame( data={"col1": [20, 2, 5, 10], "col2": [20, 4, 5, 10]} ).reset_index(drop=True) assert result_df.all().all() @@ -928,13 +939,18 @@ def t1(a: tuple) -> (int, str): return a[0] + 2, str(a) + "-HELLO" +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf_typed_schema(): + import pandas as pd + + from flytekit.types.schema import FlyteSchema + schema1 = FlyteSchema[kwtypes(x=int, y=str)] @task def t1() -> schema1: s = schema1() - s.open().write(pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) + s.open().write(pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) return s @task @@ -949,33 +965,36 @@ def wf() -> FlyteSchema[kwtypes(x=int)]: w = t1() assert w is not None df = w.open(override_mode=SchemaOpenMode.READ).all() - result_df = df.reset_index(drop=True) == pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}).reset_index( - drop=True - ) + result_df = df.reset_index(drop=True) == pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]}).reset_index(drop=True) assert result_df.all().all() df = t2(s=w.as_readonly()) df = df.open(override_mode=SchemaOpenMode.READ).all() - result_df = df.reset_index(drop=True) == pandas.DataFrame(data={"x": [1, 2]}).reset_index(drop=True) + result_df = df.reset_index(drop=True) == pd.DataFrame(data={"x": [1, 2]}).reset_index(drop=True) assert result_df.all().all() x = wf() df = x.open().all() - result_df = df.reset_index(drop=True) == pandas.DataFrame(data={"x": [1, 2]}).reset_index(drop=True) + result_df = df.reset_index(drop=True) == pd.DataFrame(data={"x": [1, 2]}).reset_index(drop=True) assert result_df.all().all() +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_wf_schema_to_df(): + import pandas as pd + + from flytekit.types.schema import FlyteSchema + schema1 = FlyteSchema[kwtypes(x=int, y=str)] @task def t1() -> schema1: s = schema1() - s.open().write(pandas.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) + s.open().write(pd.DataFrame(data={"x": [1, 2], "y": ["3", "4"]})) return s @task - def t2(df: pandas.DataFrame) -> int: + def t2(df: pd.DataFrame) -> int: return len(df.columns.values) @workflow @@ -1636,7 +1655,12 @@ def wf2(a: int, b: str) -> typing.Tuple[int, str]: assert wf2.failure_node.flyte_entity == failure_handler +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_union_type(): + import pandas as pd + + from flytekit.types.schema import FlyteSchema + ut = typing.Union[int, str, float, FlyteFile, FlyteSchema, typing.List[int], typing.Dict[str, int]] @task @@ -1829,53 +1853,59 @@ def plus_two( assert output_lm.literals["o0"].hash == "6" +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_task_hash_return_pandas_dataframe(): + import pandas as pd + constant_value = "road-hash" - def constant_function(df: pandas.DataFrame) -> str: + def constant_function(df: pd.DataFrame) -> str: return constant_value @task - def t0() -> Annotated[pandas.DataFrame, HashMethod(constant_function)]: - return pandas.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) + def t0() -> Annotated[pd.DataFrame, HashMethod(constant_function)]: + return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) ctx = context_manager.FlyteContextManager.current_context() output_lm = t0.dispatch_execute(ctx, _literal_models.LiteralMap(literals={})) assert output_lm.literals["o0"].hash == constant_value # Confirm that the literal containing a hash does not have any effect on the scalar. - df = TypeEngine.to_python_value(ctx, output_lm.literals["o0"], pandas.DataFrame) - expected_df = pandas.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) + df = TypeEngine.to_python_value(ctx, output_lm.literals["o0"], pd.DataFrame) + expected_df = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) assert df.equals(expected_df) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_workflow_containing_multiple_annotated_tasks(): - def hash_function_t0(df: pandas.DataFrame) -> str: + import pandas as pd + + def hash_function_t0(df: pd.DataFrame) -> str: return "hash-0" @task - def t0() -> Annotated[pandas.DataFrame, HashMethod(hash_function_t0)]: - return pandas.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) + def t0() -> Annotated[pd.DataFrame, HashMethod(hash_function_t0)]: + return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}) - def hash_function_t1(df: pandas.DataFrame) -> str: + def hash_function_t1(df: pd.DataFrame) -> str: return "hash-1" @task - def t1() -> Annotated[pandas.DataFrame, HashMethod(hash_function_t1)]: - return pandas.DataFrame(data={"col1": [10, 20], "col2": [30, 40]}) + def t1() -> Annotated[pd.DataFrame, HashMethod(hash_function_t1)]: + return pd.DataFrame(data={"col1": [10, 20], "col2": [30, 40]}) @task - def t2() -> pandas.DataFrame: - return pandas.DataFrame(data={"col1": [100, 200], "col2": [300, 400]}) + def t2() -> pd.DataFrame: + return pd.DataFrame(data={"col1": [100, 200], "col2": [300, 400]}) # Auxiliary task used to sum up the dataframes. It demonstrates that the use of `Annotated` does not # have any impact in the definition and execution of cached or uncached downstream tasks @task - def sum_dataframes(df0: pandas.DataFrame, df1: pandas.DataFrame, df2: pandas.DataFrame) -> pandas.DataFrame: + def sum_dataframes(df0: pd.DataFrame, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: return df0 + df1 + df2 @workflow - def wf() -> pandas.DataFrame: + def wf() -> pd.DataFrame: df0 = t0() df1 = t1() df2 = t2() @@ -1883,32 +1913,35 @@ def wf() -> pandas.DataFrame: df = wf() - expected_df = pandas.DataFrame(data={"col1": [1 + 10 + 100, 2 + 20 + 200], "col2": [3 + 30 + 300, 4 + 40 + 400]}) + expected_df = pd.DataFrame(data={"col1": [1 + 10 + 100, 2 + 20 + 200], "col2": [3 + 30 + 300, 4 + 40 + 400]}) assert expected_df.equals(df) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_list_containing_multiple_annotated_pandas_dataframes(): - def hash_pandas_dataframe(df: pandas.DataFrame) -> str: - return str(pandas.util.hash_pandas_object(df)) + import pandas as pd + + def hash_pandas_dataframe(df: pd.DataFrame) -> str: + return str(pd.util.hash_pandas_object(df)) @task def produce_list_of_annotated_dataframes() -> ( - typing.List[Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]] + typing.List[Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]] ): - return [pandas.DataFrame({"column_1": [1, 2, 3]}), pandas.DataFrame({"column_1": [4, 5, 6]})] + return [pd.DataFrame({"column_1": [1, 2, 3]}), pd.DataFrame({"column_1": [4, 5, 6]})] @task(cache=True, cache_version="v0") - def sum_list_of_pandas_dataframes(lst: typing.List[pandas.DataFrame]) -> pandas.DataFrame: + def sum_list_of_pandas_dataframes(lst: typing.List[pd.DataFrame]) -> pd.DataFrame: return sum(lst) @workflow - def wf() -> pandas.DataFrame: + def wf() -> pd.DataFrame: lst = produce_list_of_annotated_dataframes() return sum_list_of_pandas_dataframes(lst=lst) df = wf() - expected_df = pandas.DataFrame({"column_1": [5, 7, 9]}) + expected_df = pd.DataFrame({"column_1": [5, 7, 9]}) assert expected_df.equals(df) diff --git a/tests/flytekit/unit/core/test_workflows.py b/tests/flytekit/unit/core/test_workflows.py index 974fad08ad..f0ba150f73 100644 --- a/tests/flytekit/unit/core/test_workflows.py +++ b/tests/flytekit/unit/core/test_workflows.py @@ -1,10 +1,9 @@ import os +import sys import typing from collections import OrderedDict -import pandas as pd import pytest -from pandas.testing import assert_frame_equal from typing_extensions import Annotated # type: ignore import flytekit.configuration @@ -16,7 +15,6 @@ from flytekit.core.workflow import WorkflowFailurePolicy, WorkflowMetadata, WorkflowMetadataDefaults, workflow from flytekit.exceptions.user import FlyteValidationException, FlyteValueException from flytekit.tools.translator import get_serializable -from flytekit.types.schema import FlyteSchema default_img = Image(name="default", fqn="test", tag="tag") serialization_settings = flytekit.configuration.SerializationSettings( @@ -356,66 +354,71 @@ def test_wf_docstring(): assert model_wf.template.interface.inputs["a"].description == "input a" -superset_cols = kwtypes(Name=str, Age=int, Height=int) -subset_cols = kwtypes(Name=str) -superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]}) -subset_df = pd.DataFrame({"Name": ["Tom", "Joseph"]}) - - -@task -def t1() -> Annotated[pd.DataFrame, superset_cols]: - return superset_df - - -@task -def t2(df: Annotated[pd.DataFrame, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]: - return df - - -@task -def t3(df: FlyteSchema[superset_cols]) -> FlyteSchema[superset_cols]: - return df - +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_structured_dataset_wf(): + import pandas as pd + from pandas.testing import assert_frame_equal -@task -def t4() -> FlyteSchema[superset_cols]: - return superset_df + from flytekit.types.schema import FlyteSchema + superset_cols = kwtypes(Name=str, Age=int, Height=int) + subset_cols = kwtypes(Name=str) + superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]}) + subset_df = pd.DataFrame({"Name": ["Tom", "Joseph"]}) -@task -def t5(sd: Annotated[StructuredDataset, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]: - return sd.open(pd.DataFrame).all() + @task + def t1() -> Annotated[pd.DataFrame, superset_cols]: + return superset_df + @task + def t2(df: Annotated[pd.DataFrame, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]: + return df -@workflow -def sd_wf() -> Annotated[pd.DataFrame, subset_cols]: - # StructuredDataset -> StructuredDataset - df = t1() - return t2(df=df) + @task + def t3(df: FlyteSchema[superset_cols]) -> FlyteSchema[superset_cols]: + return df + @task + def t4() -> FlyteSchema[superset_cols]: + return superset_df -@workflow -def sd_to_schema_wf() -> pd.DataFrame: - # StructuredDataset -> schema - df = t1() - return t3(df=df) + @task + def t5(sd: Annotated[StructuredDataset, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]: + return sd.open(pd.DataFrame).all() + @workflow + def sd_wf() -> Annotated[pd.DataFrame, subset_cols]: + # StructuredDataset -> StructuredDataset + df = t1() + return t2(df=df) -@workflow -def schema_to_sd_wf() -> typing.Tuple[pd.DataFrame, pd.DataFrame]: - # schema -> StructuredDataset - df = t4() - return t2(df=df), t5(sd=df) # type: ignore + @workflow + def sd_to_schema_wf() -> pd.DataFrame: + # StructuredDataset -> schema + df = t1() + return t3(df=df) + @workflow + def schema_to_sd_wf() -> typing.Tuple[pd.DataFrame, pd.DataFrame]: + # schema -> StructuredDataset + df = t4() + return t2(df=df), t5(sd=df) # type: ignore -def test_structured_dataset_wf(): assert_frame_equal(sd_wf(), subset_df) assert_frame_equal(sd_to_schema_wf(), superset_df) assert_frame_equal(schema_to_sd_wf()[0], subset_df) assert_frame_equal(schema_to_sd_wf()[1], subset_df) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_compile_wf_at_compile_time(): + import pandas as pd + + from flytekit.types.schema import FlyteSchema + + superset_cols = kwtypes(Name=str, Age=int, Height=int) + superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]}) + ctx = FlyteContextManager.current_context() with FlyteContextManager.with_context( ctx.with_execution_state( @@ -423,6 +426,10 @@ def test_compile_wf_at_compile_time(): ) ): + @task + def t4() -> FlyteSchema[superset_cols]: + return superset_df + @workflow def wf(): t4() diff --git a/tests/flytekit/unit/deck/test_deck.py b/tests/flytekit/unit/deck/test_deck.py index cce6c4c5fe..d6c71b52d6 100644 --- a/tests/flytekit/unit/deck/test_deck.py +++ b/tests/flytekit/unit/deck/test_deck.py @@ -1,6 +1,6 @@ import datetime +import sys -import pandas as pd import pytest from mock import mock @@ -10,7 +10,10 @@ from flytekit.deck.deck import _output_deck +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_deck(): + import pandas as pd + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [1, 22]}) ctx = FlyteContextManager.current_context() ctx.user_space_params._decks = [ctx.user_space_params.default_deck] @@ -66,6 +69,7 @@ def t1(a: int) -> str: assert len(ctx.user_space_params.decks) == expected_decks +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.filterwarnings("ignore:disable_deck was deprecated") @pytest.mark.parametrize( "enable_deck,disable_deck, expected_decks, expect_error", @@ -80,6 +84,8 @@ def t1(a: int) -> str: ], ) def test_deck_pandas_dataframe(enable_deck, disable_deck, expected_decks, expect_error): + import pandas as pd + ctx = FlyteContextManager.current_context() kwargs = {} diff --git a/tests/flytekit/unit/deck/test_renderer.py b/tests/flytekit/unit/deck/test_renderer.py index 257e320cfd..7263139acc 100644 --- a/tests/flytekit/unit/deck/test_renderer.py +++ b/tests/flytekit/unit/deck/test_renderer.py @@ -1,10 +1,12 @@ -import pandas as pd +import sys + import pyarrow as pa import pytest from flytekit.deck.renderer import DEFAULT_MAX_COLS, DEFAULT_MAX_ROWS, ArrowRenderer, TopFrameRenderer +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") @pytest.mark.parametrize( "rows, cols, max_rows, expected_max_rows, max_cols, expected_max_cols", [ @@ -20,6 +22,8 @@ ], ) def test_renderer(rows, cols, max_rows, expected_max_rows, max_cols, expected_max_cols): + import pandas as pd + df = pd.DataFrame({f"abc-{k}": list(range(rows)) for k in range(cols)}) pa_df = pa.Table.from_pandas(df) diff --git a/tests/flytekit/unit/experimental/test_eager_workflows.py b/tests/flytekit/unit/experimental/test_eager_workflows.py index 50ed29063c..6b01d2fc7a 100644 --- a/tests/flytekit/unit/experimental/test_eager_workflows.py +++ b/tests/flytekit/unit/experimental/test_eager_workflows.py @@ -1,10 +1,10 @@ import asyncio import os +import sys import typing from pathlib import Path import hypothesis.strategies as st -import pandas as pd import pytest from hypothesis import given, settings @@ -219,6 +219,8 @@ async def eager_wf(x: int) -> int: @task def create_structured_dataset() -> StructuredDataset: + import pandas as pd + df = pd.DataFrame({"a": [1, 2, 3]}) return StructuredDataset(dataframe=df) @@ -240,8 +242,10 @@ def create_directory() -> FlyteDirectory: return FlyteDirectory(path=dirname) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_eager_workflow_with_offloaded_types(): """Test eager workflow that eager workflows work with offloaded types.""" + import pandas as pd @eager async def eager_wf_structured_dataset() -> int: diff --git a/tests/flytekit/unit/extras/sqlite3/test_task.py b/tests/flytekit/unit/extras/sqlite3/test_task.py index f8014f244b..35e2c95fdb 100644 --- a/tests/flytekit/unit/extras/sqlite3/test_task.py +++ b/tests/flytekit/unit/extras/sqlite3/test_task.py @@ -1,6 +1,6 @@ import os +import sys -import pandas import pytest from flytekit import kwtypes, task, workflow @@ -25,6 +25,7 @@ ) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_task_static(): assert tk.output_columns is None @@ -32,6 +33,7 @@ def test_task_static(): assert df is not None +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_task_schema(): # sqlite3_start @@ -52,9 +54,12 @@ def test_task_schema(): assert df is not None +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_workflow(): + import pandas as pd + @task - def my_task(df: pandas.DataFrame) -> int: + def my_task(df: pd.DataFrame) -> int: return len(df[df.columns[0]]) sql_task = SQLite3Task( diff --git a/tests/flytekit/unit/extras/tasks/test_shell.py b/tests/flytekit/unit/extras/tasks/test_shell.py index 35a55789a2..08e5bb92af 100644 --- a/tests/flytekit/unit/extras/tasks/test_shell.py +++ b/tests/flytekit/unit/extras/tasks/test_shell.py @@ -1,5 +1,6 @@ import datetime import os +import sys import tempfile import typing from dataclasses import dataclass @@ -213,6 +214,7 @@ def test_reuse_variables_for_both_inputs_and_outputs(): t(f=test_csv, y=testdata, j=datetime.datetime(2021, 11, 10, 12, 15, 0)) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_can_use_complex_types_for_inputs_to_f_string_template(): @dataclass class InputArgs(DataClassJsonMixin): diff --git a/tests/flytekit/unit/lazy_module/__init__.py b/tests/flytekit/unit/lazy_module/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/flytekit/unit/lazy_module/test_lazy_module.py b/tests/flytekit/unit/lazy_module/test_lazy_module.py new file mode 100644 index 0000000000..714b3052e7 --- /dev/null +++ b/tests/flytekit/unit/lazy_module/test_lazy_module.py @@ -0,0 +1,12 @@ +import pytest + +from flytekit.lazy_import.lazy_module import LazyModule, lazy_module + + +def test_lazy_module(): + mod = lazy_module("pyarrow") + assert mod.__name__ == "pyarrow" + mod = lazy_module("fake_module") + assert isinstance(mod, LazyModule) + with pytest.raises(ImportError, match="Module fake_module is not yet installed."): + print(mod.attr) diff --git a/tests/flytekit/unit/core/test_flyte_pickle.py b/tests/flytekit/unit/types/pickle/test_flyte_pickle.py similarity index 96% rename from tests/flytekit/unit/core/test_flyte_pickle.py rename to tests/flytekit/unit/types/pickle/test_flyte_pickle.py index 85f2bff08f..140f8b28d9 100644 --- a/tests/flytekit/unit/core/test_flyte_pickle.py +++ b/tests/flytekit/unit/types/pickle/test_flyte_pickle.py @@ -1,9 +1,10 @@ +import sys from collections import OrderedDict from collections.abc import Sequence from typing import Dict, List, Union import numpy as np -import pandas as pd +import pytest from typing_extensions import Annotated import flytekit.configuration @@ -94,7 +95,10 @@ def t1(a: int) -> List[Dict[str, Foo]]: ) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_union(): + import pandas as pd + @task def t1(data: Annotated[Union[np.ndarray, pd.DataFrame, Sequence], "some annotation"]): print(data) diff --git a/tests/flytekit/unit/core/test_schema_types.py b/tests/flytekit/unit/types/schema/test_schema_types.py similarity index 91% rename from tests/flytekit/unit/core/test_schema_types.py rename to tests/flytekit/unit/types/schema/test_schema_types.py index 6d2e386bef..64f23cd327 100644 --- a/tests/flytekit/unit/core/test_schema_types.py +++ b/tests/flytekit/unit/types/schema/test_schema_types.py @@ -1,6 +1,6 @@ +import sys from datetime import datetime, timedelta -import pandas as pd import pytest from flytekit import kwtypes @@ -9,7 +9,6 @@ from flytekit.core.type_engine import TypeEngine from flytekit.types.schema import FlyteSchema, SchemaFormat from flytekit.types.schema.types import FlyteSchemaTransformer -from flytekit.types.schema.types_pandas import PandasDataFrameTransformer def test_typed_schema(): @@ -55,7 +54,12 @@ def test_bad_conversion(): TypeEngine.guess_python_type(lt) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") def test_to_html(): + import pandas as pd + + from flytekit.types.schema.types_pandas import PandasDataFrameTransformer + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) tf = PandasDataFrameTransformer() output = tf.to_html(FlyteContextManager.current_context(), df, pd.DataFrame) diff --git a/tests/flytekit/unit/core/tracker/test_arrow_data.py b/tests/flytekit/unit/types/structured_dataset/test_arrow_data.py similarity index 53% rename from tests/flytekit/unit/core/tracker/test_arrow_data.py rename to tests/flytekit/unit/types/structured_dataset/test_arrow_data.py index 747e7f1651..9df8c9ba4b 100644 --- a/tests/flytekit/unit/core/tracker/test_arrow_data.py +++ b/tests/flytekit/unit/types/structured_dataset/test_arrow_data.py @@ -1,23 +1,26 @@ +import sys import typing -import pandas as pd import pyarrow as pa +import pytest from typing_extensions import Annotated from flytekit import kwtypes, task -cols = kwtypes(Name=str, Age=int) -subset_cols = kwtypes(Name=str) +@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.") +def test_structured_dataset_wf(): + import pandas as pd -@task -def t1( - df1: Annotated[pd.DataFrame, cols], df2: Annotated[pa.Table, cols] -) -> typing.Tuple[Annotated[pd.DataFrame, subset_cols], Annotated[pa.Table, subset_cols]]: - return df1, df2 + cols = kwtypes(Name=str, Age=int) + subset_cols = kwtypes(Name=str) + @task + def t1( + df1: Annotated[pd.DataFrame, cols], df2: Annotated[pa.Table, cols] + ) -> typing.Tuple[Annotated[pd.DataFrame, subset_cols], Annotated[pa.Table, subset_cols]]: + return df1, df2 -def test_structured_dataset_wf(): pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) pa_df = pa.Table.from_pandas(pd_df) diff --git a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py index 2877ed3743..76adb4adf2 100644 --- a/tests/flytekit/unit/types/structured_dataset/test_bigquery.py +++ b/tests/flytekit/unit/types/structured_dataset/test_bigquery.py @@ -1,9 +1,11 @@ import mock -import pandas as pd +import pytest from typing_extensions import Annotated from flytekit import StructuredDataset, kwtypes, task, workflow +pd = pytest.importorskip("pandas") + pd_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) my_cols = kwtypes(Name=str, Age=int) diff --git a/tests/flytekit/unit/core/test_structured_dataset.py b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py similarity index 99% rename from tests/flytekit/unit/core/test_structured_dataset.py rename to tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py index 1e4277f2f3..cbe3fc422a 100644 --- a/tests/flytekit/unit/core/test_structured_dataset.py +++ b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset.py @@ -2,7 +2,6 @@ import tempfile import typing -import pandas as pd import pyarrow as pa import pytest from fsspec.utils import get_protocol @@ -29,6 +28,8 @@ extract_cols_and_format, ) +pd = pytest.importorskip("pandas") + my_cols = kwtypes(w=typing.Dict[str, typing.Dict[str, int]], x=typing.List[typing.List[int]], y=int, z=str) fields = [("some_int", pa.int32()), ("some_string", pa.string())] diff --git a/tests/flytekit/unit/core/test_structured_dataset_handlers.py b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_handlers.py similarity index 99% rename from tests/flytekit/unit/core/test_structured_dataset_handlers.py rename to tests/flytekit/unit/types/structured_dataset/test_structured_dataset_handlers.py index b26349ceeb..b18da019ee 100644 --- a/tests/flytekit/unit/core/test_structured_dataset_handlers.py +++ b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_handlers.py @@ -1,7 +1,6 @@ import typing import mock -import pandas as pd import pyarrow as pa import pytest @@ -17,8 +16,8 @@ StructuredDatasetTransformerEngine, ) +pd = pytest.importorskip("pandas") my_cols = kwtypes(w=typing.Dict[str, typing.Dict[str, int]], x=typing.List[typing.List[int]], y=int, z=str) - fields = [("some_int", pa.int32()), ("some_string", pa.string())] arrow_schema = pa.schema(fields) diff --git a/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_workflow.py b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_workflow.py index 9beb2301db..3b0bf96e7a 100644 --- a/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_workflow.py +++ b/tests/flytekit/unit/types/structured_dataset/test_structured_dataset_workflow.py @@ -2,7 +2,6 @@ import typing import numpy as np -import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import pytest @@ -23,6 +22,8 @@ StructuredDatasetTransformerEngine, ) +pd = pytest.importorskip("pandas") + PANDAS_PATH = FlyteContextManager.current_context().file_access.get_random_local_directory() NUMPY_PATH = FlyteContextManager.current_context().file_access.get_random_local_directory() BQ_PATH = "bq://flyte-dataset:flyte.table"