diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index e04fb7c..06732ab 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,29 +1,42 @@ { "name": "Fugue Development Environment", - "image": "fugueproject/devenv:0.4.4", - "settings": { - "terminal.integrated.shell.linux": "/bin/bash", - "python.pythonPath": "/usr/local/bin/python", - "python.linting.enabled": true, - "python.linting.pylintEnabled": true, - "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8", - "python.formatting.blackPath": "/usr/local/py-utils/bin/black", - "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf", - "python.linting.banditPath": "/usr/local/py-utils/bin/bandit", - "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8", - "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy", - "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle", - "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle", - "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint" + "_image": "fugueproject/devenv:0.7.7", + "image": "mcr.microsoft.com/vscode/devcontainers/python:3.10", + "customizations": { + "vscode": { + "settings": { + "terminal.integrated.shell.linux": "/bin/bash", + "python.pythonPath": "/usr/local/bin/python", + "python.defaultInterpreterPath": "/usr/local/bin/python", + "isort.interpreter": [ + "/usr/local/bin/python" + ], + "flake8.interpreter": [ + "/usr/local/bin/python" + ], + "pylint.interpreter": [ + "/usr/local/bin/python" + ] + }, + "extensions": [ + "ms-python.python", + "ms-python.isort", + "ms-python.flake8", + "ms-python.pylint", + "ms-python.mypy", + "GitHub.copilot", + "njpwerner.autodocstring" + ] + } }, - "extensions": [ - "ms-python.python" - ], "forwardPorts": [ 8888 ], "postCreateCommand": "make devenv", - "mounts": [ - "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind" - ] + "features": { + "ghcr.io/devcontainers/features/docker-in-docker:2": {}, + "ghcr.io/devcontainers/features/java:1": { + "version": "11" + } + } } diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8cc05a4..b7ed7a9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,7 +9,6 @@ jobs: strategy: matrix: python-version: - - "3.7" - "3.8" - "3.9" - "3.10" @@ -30,10 +29,9 @@ jobs: python -m pip install --upgrade --upgrade-strategy eager -e ".[dev]" - name: Run tests - env: - PREFECT_ORION_DATABASE_CONNECTION_URL: "sqlite+aiosqlite:///./orion-tests.db" run: | - prefect orion database reset -y + prefect config set PREFECT_API_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///./orion-tests.db" + prefect server database reset -y coverage run --branch -m pytest tests -vv coverage report - name: "Upload coverage to Codecov" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7aaa1ad..5542627 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,4 @@ repos: - - repo: https://github.com/pycqa/isort - rev: 5.10.1 - hooks: - - id: isort - language_version: python3 - repo: https://github.com/psf/black rev: 22.3.0 hooks: diff --git a/Makefile b/Makefile index b566eba..4234530 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,9 @@ devenv: pre-commit install fugue-jupyter install startup +login: + prefect cloud login -k $(PREFECT_API_KEY) + docs: rm -rf docs/api rm -rf docs/build diff --git a/README.md b/README.md index aa4105a..759d104 100644 --- a/README.md +++ b/README.md @@ -32,12 +32,10 @@ The best way to 
get started with Fugue is to work through the 10 minute tutorial

### Python setup

-Requires an installation of Python 3.7+.
+Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

-These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the [Prefect documentation](https://orion-docs.prefect.io/).
-
### Installation

Install `prefect-fugue` with `pip`:
@@ -80,15 +78,17 @@ For example, a Databricks Block could look like:
}
```

+As long as `prefect_fugue` is installed, Fugue can recognize a block expression and convert it to a Fugue `ExecutionEngine`. For example, if you have a block with path `fugue/databricks`, then the expression `prefect:fugue/databricks` becomes a valid execution engine expression. When Fugue parses this expression, it loads the `Block` from `fugue/databricks` and, based on the fields of the block, constructs a `DatabricksExecutionEngine` for your Fugue operations.
+
### Using a Spark Cluster Inside a Flow

Let’s start by running code on top of Databricks. `databricks-connect` is already installed in this environment. This section may have a lot of logs because of the monitoring provided by Prefect. This section also assumes that the user has Prefect configured to the right workspace.

-Below we have one task that takes in a `SparkSession` and uses it to run some Spark code. We can then use this in the Prefect Flow with the `fugue_engine` context. This `fugue_engine` will create an ephemeral cluster to run the code underneath, and then turn off when finished.
+Below we have one task that takes in a `SparkSession` and uses it to run some Spark code. We can then use this in the Prefect Flow with `fugue.api.engine_context`. This context will create an ephemeral cluster to run the code underneath, and then shut it down when finished.

```python
from prefect import task, flow
-from prefect_fugue import fugue_engine
+import fugue.api as fa

@task
def my_spark_task(spark, n=1):
@@ -96,20 +96,53 @@ def my_spark_task(spark, n=1):
    df.show()

@flow
-def native_spark_flow(engine):
-    with fugue_engine(engine) as engine:
-        my_spark_task(engine.spark_session, 1)
+def spark_flow(engine):
+    with fa.engine_context(engine) as spark_engine:
+        my_spark_task(spark_engine.spark_session, 1)

-native_spark_flow("fugue/databricks")
+spark_flow("prefect:fugue/databricks") # pay attention to the engine expression
```

-Similarly, if you don’t use Databricks but have your own way to get a `SparkSession`, you can directly pass the `SparkSession` into the Flow. The `fugue_engine` context will be able to interpret this.
+Similarly, if you don’t use Databricks but have your own way to get a `SparkSession`, you can directly pass the `SparkSession` into the Flow.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
-native_spark_flow(spark)
+spark_flow(spark)
+```
+
+## More Flexibility
+
+`fugue.api.engine_context` creates a context under which all Fugue operations use the context engine by default. This works whether the Fugue code runs directly under the context or inside tasks or flows invoked under this context. 
+
+```python
+from prefect import task, flow
+import pandas as pd
+import fugue.api as fa
+
+def my_transformer(df:pd.DataFrame) -> pd.DataFrame:
+    return df
+
+@task
+def sub_task(path:str):
+    df = fa.load(path)
+    df = fa.transform(df, my_transformer, schema="*")
+    fa.save(df, path+".output.parquet")
+
+@flow
+def sub_flow(path:str):
+    df = fa.load(path)
+    df = fa.transform(df, my_transformer, schema="*")
+    fa.save(df, path+".output.parquet")
+
+@flow
+def main_flow(path, engine=None):
+    with fa.engine_context(engine):
+        sub_task(path)
+        sub_flow(path)
+
+main_flow("") # test locally, all tasks and flows run without Spark
+main_flow("", "prefect:fugue/databricks") # all tasks and flows will run on Databricks
+```

## Testing Locally Before Running Map Jobs on Spark, Dask, and Ray
@@ -121,7 +154,8 @@ When testing the Flow, we can pass `None` as the engine so everything runs on Pa

```python
from time import sleep
import pandas as pd
-from prefect_fugue import transform
+import fugue.api as fa
+from prefect import task, flow

@task
def create_jobs(n) -> pd.DataFrame:
@@ -135,7 +169,7 @@ def run_one_job(df:pd.DataFrame) -> pd.DataFrame:
@flow
def run_all_jobs(n, engine=None):
    jobs = create_jobs(n)
-    with fugue_engine(engine):
-        return transform(jobs, run_one_job, partition="per_row", as_local=True)
+    with fa.engine_context(engine):
+        return fa.transform(jobs, run_one_job, partition="per_row", as_local=True)
```
@@ -148,7 +182,7 @@ run_all_jobs(1) # run locally on Pandas

Because it succeeded, we can now attach our Fugue Databricks `Block` to run on Databricks. Now we run on 8 jobs, and we’ll see that parallelization from the Spark cluster will make this Flow execute faster.

```python
-run_all_jobs(8, "fugue/databricks") # run on databricks
+run_all_jobs(8, "prefect:fugue/databricks") # run on databricks
```

There is still some overhead with sending the work, but the time is decreased compared to the expected execution time if run sequentially (40 seconds).
@@ -156,7 +190,7 @@ There is still some overhead with sending the work, but the time is decreased co

We can also use local Dask by passing the string `"dask"`. We can also pass a Dask `Client` or use the Fugue Engine `Block` with [Coiled](https://coiled.io/). More information can be found in the [Coiled cloudprovider docs](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/coiled.html).

```python
-run_all_jobs(4, "dask")
+from dask.distributed import Client
+
+dask_client = Client()  # or a Client connected to your own Dask cluster
+run_all_jobs(4, dask_client)
```

## Running SQL on any Spark, Dask, and DuckDB
@@ -166,16 +200,18 @@ Prototyping locally, and then running the full job on the cluster is also possib

Here we can load in data and perform a query with FugueSQL. FugueSQL has additional keywords such as LOAD and SAVE so we can run everything from loading, processing, and saving all on DuckDB or SparkSQL. More information on FugueSQL can be found in the FugueSQL tutorials.

```python
+import fugue.api as fa
+
@flow
-def run_sql_full(top, engine):
-    with fugue_engine(engine):
-        fsql("""
+def run_sql(top, engine):
+    with fa.engine_context(engine):
+        fa.fugue_sql_flow("""
    df = LOAD "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet"
    SELECT PULocationID, COUNT(*) AS ct FROM df GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
    PRINT
-    """, top=top)
+    """, top=top).run()
```

To debug locally without SparkSQL, we can use DuckDB as the engine.

run_sql(2, "duckdb"); # debug/develop without spark
@@ -187,7 +223,7 @@ Again to run on the cluster, we can use the Databricks Block. 
```python
-run_sql(10, "fugue/databricks")
+run_sql(10, "prefect:fugue/databricks")
```

## Resources
diff --git a/orion-tests.db b/orion-tests.db
new file mode 100644
index 0000000..639a5b0
Binary files /dev/null and b/orion-tests.db differ
diff --git a/prefect_fugue/__init__.py b/prefect_fugue/__init__.py
index 72854e6..9c38dda 100644
--- a/prefect_fugue/__init__.py
+++ b/prefect_fugue/__init__.py
@@ -1,6 +1,4 @@
from . import _version
-from .context import fugue_engine # noqa
-from .tasks import fsql, transform # noqa
-from .blocks import FugueEngine # noqa
+from .blocks import FugueEngine, block_to_fugue_engine # noqa

__version__ = _version.get_versions()["version"]
diff --git a/prefect_fugue/blocks.py b/prefect_fugue/blocks.py
index f56b752..76a7428 100644
--- a/prefect_fugue/blocks.py
+++ b/prefect_fugue/blocks.py
@@ -1,14 +1,45 @@
-from typing import Any, Dict, Optional
-
-from fugue import (
-    ExecutionEngine,
-    make_execution_engine,
-    parse_execution_engine,
-    register_execution_engine,
-)
+from typing import Any, Dict, Optional, Type
+
+from fugue import ExecutionEngine
+from fugue._utils.registry import fugue_plugin
+from fugue.dev import make_execution_engine
from prefect.blocks.core import Block
from pydantic import Field, SecretStr
-from triad import ParamDict, assert_or_throw, run_at_def
+from triad import ParamDict
+
+
+def block_to_fugue_engine(block_type: Type[Block]) -> Any:
+    """
+    The decorator for registering a function that converts a Prefect Block
+    to a Fugue ExecutionEngine
+
+    Args:
+        block_type (Type[Block]): the Prefect Block type to be handled
+            by the decorated converter function
+
+    Returns:
+        Any: the decorator that registers the converter function
+
+    Example:
+
+        .. code-block:: python
+
+            from prefect_fugue import block_to_fugue_engine
+            from fugue.dev import make_execution_engine
+
+            @block_to_fugue_engine(MyBlock)
+            def my_block_to_fugue_engine(
+                block: MyBlock, conf: Any = None) -> ExecutionEngine:
+                return make_execution_engine(block.name, conf)
+
+    """
+    return _block_to_fugue_engine.candidate(
+        lambda block, *args, **kwargs: isinstance(block, block_type)
+    ) # noqa
+
+
+@fugue_plugin
+def _block_to_fugue_engine(block: Block, conf: Any = None) -> ExecutionEngine:
+    raise NotImplementedError(f"{type(block)} to Fugue Engine is not registered")


class FugueEngine(Block):
@@ -46,28 +77,14 @@ class FugueEngine(Block):
        description="A JSON-dict-compatible value",
    )

-    def make_engine(self, custom_conf: Any = None) -> ExecutionEngine:
-        conf = ParamDict(self.conf)
-        if self.secret_conf is not None:
-            for k, v in self.secret_conf.items():
-                conf[k] = v.get_secret_value()
-        conf.update(ParamDict(custom_conf))
-        return make_execution_engine(self.engine, conf)
-
-
-@parse_execution_engine.candidate(
-    matcher=lambda engine, conf, **kwargs: isinstance(engine, str) and "/" in engine
-)
-def _parse_config_engine(engine: Any, conf: Any, **kwargs: Any) -> ExecutionEngine:
-    block = Block.load(engine)
-    assert_or_throw(
-        isinstance(block, FugueEngine), f"{block} is not a FugueEngine config block"
-    )
-    return make_execution_engine(block, conf=conf)
-
-@run_at_def
-def _register():
-    register_execution_engine(
-        FugueEngine, lambda engine, conf, **kwargs: engine.make_engine(conf)
-    )
+@block_to_fugue_engine(FugueEngine)
+def _fugue_block_to_fugue_engine(
+    block: FugueEngine, conf: Any = None
+) -> ExecutionEngine:
+    _conf = ParamDict(block.conf)
+    if block.secret_conf is not None:
+        for k, v in block.secret_conf.items():
+            _conf[k] = v.get_secret_value()
+    _conf.update(ParamDict(conf))
+    return make_execution_engine(block.engine, 
_conf) diff --git a/prefect_fugue/context.py b/prefect_fugue/context.py deleted file mode 100644 index 71499f1..0000000 --- a/prefect_fugue/context.py +++ /dev/null @@ -1,124 +0,0 @@ -from contextlib import contextmanager -from typing import Any, List, Optional - -from fugue import ExecutionEngine, make_execution_engine -from prefect.context import ContextModel, ContextVar - - -class FugueEngineContext(ContextModel): - """ - The context for Fugue ExecutionEngine. - - Attributes: - engines: A stack of Fugue ExecutionEngines - """ - - engines: List[ExecutionEngine] = [] - - @classmethod - def get(cls) -> "FugueEngineContext": - # Return an empty `FugueEngineContext` instead of `None` if no context exists - return cls.__var__.get(FugueEngineContext()) - - def __exit__(self, *_): - try: - if len(self.engines) > 0: - self.engines.pop()._prefect_context_stop() - finally: - super().__exit__(*_) - - __var__ = ContextVar("fugue_engine") - - -def current_fugue_engine() -> Optional[ExecutionEngine]: - """ - Get the current Fugue ExecutionEngine created by the latest context manager - - Returns: - ExecutionEngine, optional: if within a context, then the latest Fugue - ExecutionEngine created by ``fugue_engine``, else None. - """ - - engines = FugueEngineContext.get().engines - return None if len(engines) == 0 else engines[-1] - - -def get_current_checkpoint( - checkpoint: Optional[bool] = None, -) -> bool: - """ - Get the current checkpoint setting - - Args: - - checkpoint (bool, optional): get the checkpoint setting, defaults to None. - - Returns: - bool: if ``checkpoint`` is not None then the value itself, else if it is in - a ``fugue_engine`` context manager, then return the checkpoint setting of the - current engine, else False. - """ - if checkpoint is not None: - return checkpoint - current_engine = current_fugue_engine() - if current_engine is not None: - return current_engine._prefect_default_checkpoint - return False - - -@contextmanager -def fugue_engine( - engine: Any = None, - conf: Any = None, - checkpoint: bool = False, -) -> ExecutionEngine: - """ - Context manager to create a new Fugue Execution Engine. - - Args: - - engine (object, optional): the object that can be converted to a - Fugue ``ExecutionEngine``. - - conf (object, optional): the object that can be converted to a - dict of Fugue configs. - - checkpoint (bool): for the steps using this engine, whether to - enable checkpoint, defaults to False. 
- - - Yields: - The current Fugue Execution Engine - - Examples: - - ```python - from prefect import flow - from prefect_fugue import fugue_engine, fsql - - @flow - def my_flow(): - with fugue_engine("duckdb"): - res = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x") - fsql("PRINT x", res) - - my_flow() - - @flow - def flexible_flow(engine): - with fugue_engine(engine, {"some_config":"hello"}): - res = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x") - fsql("PRINT x", res) - - flexible_flow("duckdb") # using DuckDB backend - flexible_flow("fugue/my_engine_conf") # using a FugueEngine block - ``` - - """ - engines = FugueEngineContext.get().engines - new_engine = make_execution_engine(engine, conf=conf) - new_engine._prefect_context_stop = new_engine.stop - new_engine.stop = _no_op_stop - new_engine._prefect_default_checkpoint = checkpoint - with FugueEngineContext(engines=list(engines) + [new_engine]): - yield new_engine - - -def _no_op_stop() -> None: - pass diff --git a/prefect_fugue/registry.py b/prefect_fugue/registry.py new file mode 100644 index 0000000..fb4ce22 --- /dev/null +++ b/prefect_fugue/registry.py @@ -0,0 +1,18 @@ +from typing import Any, Union + +from fugue import ExecutionEngine +from fugue.plugins import parse_execution_engine +from prefect.blocks.core import Block + +from .blocks import _block_to_fugue_engine + + +@parse_execution_engine.candidate( + matcher=lambda engine, *args, **kwargs: isinstance(engine, Block) + or (isinstance(engine, str) and engine.startswith("prefect:")) +) +def _parse_prefect_engine( + engine: Union[str, Block], conf: Any = None +) -> ExecutionEngine: + block = Block.load(engine.split(":", 1)[1]) if isinstance(engine, str) else engine + return _block_to_fugue_engine(block, conf) diff --git a/prefect_fugue/tasks.py b/prefect_fugue/tasks.py deleted file mode 100644 index fcd6139..0000000 --- a/prefect_fugue/tasks.py +++ /dev/null @@ -1,290 +0,0 @@ -from typing import Any, Dict, List, Optional - -import fugue -import fugue_sql -from fugue.constants import FUGUE_CONF_SQL_IGNORE_CASE -from fugue.extensions.transformer.convert import _to_transformer -from prefect import get_run_logger, task -from prefect.utilities.hashing import hash_objects -from triad.utils.convert import get_caller_global_local_vars - -from ._utils import suffix -from .context import current_fugue_engine, get_current_checkpoint - -_TASK_NAME_MAX_LEN = 50 - - -def fsql( - query: str, - yields: Any = None, - engine: Any = None, - engine_conf: Any = None, - checkpoint: Optional[bool] = None, - fsql_ignore_case: bool = False, - **kwargs: Any -) -> dict: - """ - Function for running Fugue SQL. - - This function generates the Prefect task that runs Fugue SQL. - - Args: - query (str): the Fugue SQL query - yields (Any): the yielded dataframes from the previous tasks, - defaults to None. It can be a single yielded result or an array of - yielded results (see example) - engine (Any): execution engine expression that can be recognized by Fugue, - default to None (the default ExecutionEngine of Fugue) - engine_conf (Any): extra execution engine configs, defaults to None - checkpoint (bool, optional): whether to checkpoint this task in Prefect, - defaults to None (determined by the ``fugue_engine`` context). 
- **kwargs (Any, optional): additional kwargs to pass to Fugue's `fsql` function - - References: - See: [Fugue SQL - Tutorial](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes_sql.html) - - - Example: - ```python - from prefect import flow, task - from prefect.tasks.fugue import fsql, fugue_engine - import pandas as pd - - # Basic usage - @flow - def flow1() - res1 = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x") - res2 = fsql("CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS y") - fsql(''' - SELECT * FROM x UNION SELECT * FROM y - SELECT * WHERE a<2 - PRINT - ''', [res1, res2]) # SQL union using pandas - fsql(''' - SELECT * FROM x UNION SELECT * FROM y - SELECT * WHERE a<2 - PRINT - ''', [res1, res2], engine="duckdb") # SQL union using duckdb (if installed) - - # Pass in other parameters and dataframes - @task - def gen_df(): - return pd.DataFrame(dict(a=[1])) - - @task - def gen_path(): - return "/tmp/t.parquet" - - @flow - def flow2() - df = gen_df() - path = gen_path() - fsql(''' - SELECT a+1 AS a FROM df - SAVE OVERWRITE {{path}} - ''', df=df, path=path) - - # Disable checkpoint for distributed dataframes - from pyspark.sql import SparkSession - - spark = SparkSession.builder.getOrCreate() - - @flow - def flow3() - with fugue_engine(spark, checkpoint=False): - # res1 needs to turn off checkpoint because it yields - # a Spark DataFrame - res1 = fsql(''' - CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS y - ''') - - # res2 doesn't need to turn off checkpoint because it yields - # a local DataFrame (most likely Pandas DataFrame) - res2 = fsql(''' - CREATE [[1],[2]] SCHEMA a:int YIELD LOCAL DATAFRAME AS y - ''', checkpoint=True) - - # res3 doesn't need to turn off checkpoint because it yields - # a file (the dataframe is cached in the file) - res3 = fsql(''' - CREATE [[-1],[3]] SCHEMA a:int YIELD FILE AS z - ''', checkpoint=True) - - # this step doesn't need to turn off checkpoint because it - # doesn't have any output - fsql(''' - SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z - SELECT * WHERE a<2 - PRINT - ''', [res1, res2, res3]) - ``` - - Note: The best practice is always yielding files or local dataframes. If - you want to yield a distributed dataframe such as Spark or Dask, think it twice. - `YIELD FILE` is always preferred when Fugue SQL is running as a Prefect task. - If you feel `YIELD FILE` is too heavy, that means your - SQL logic may not be heavy enough to be broken into multiple tasks. 
- """ - tn = _truncate_name(query) - if engine is None and engine_conf is None: - engine = current_fugue_engine() - elif checkpoint is None: - checkpoint = False - - global_vars, local_vars = get_caller_global_local_vars() - - @task( - name=tn + suffix(), - description=query, - cache_key_fn=_get_cache_key_fn(checkpoint), - ) - def run_fsql( - query: str, - yields: Any, - engine: Any = None, - engine_conf: Any = None, - ) -> dict: - logger = get_run_logger() - logger.debug(query) - dag = fugue_sql.FugueSQLWorkflow( - None, {FUGUE_CONF_SQL_IGNORE_CASE: fsql_ignore_case} - ) - try: - dag._sql( - query, global_vars, local_vars, *_normalize_yields(yields), **kwargs - ) - except SyntaxError as ex: - raise SyntaxError(str(ex)).with_traceback(None) from None - dag.run(engine, engine_conf) - result: Dict[str, Any] = {} - for k, v in dag.yields.items(): - if isinstance(v, fugue.dataframe.YieldedDataFrame): - result[k] = v.result # type: ignore - else: - result[k] = v # type: ignore - return result - - return run_fsql(query=query, yields=yields, engine=engine, engine_conf=engine_conf) - - -def transform( - df: Any, - transformer: Any, - engine: Any = None, - engine_conf: Any = None, - checkpoint: Optional[bool] = None, - **kwargs -) -> Any: - """ - Function for running Fugue transform function. - - This function generates the Prefect task that runs Fugue transform. - - Args: - df (Any): a dataframe or a file path generated from the previous steps - transformer (Any): a function or class that be recognized by - Fugue as a transformer - engine (Any): execution engine expression that can be recognized by Fugue, - default to None (the default ExecutionEngine of Fugue) - engine_conf (Any): extra execution engine configs, defaults to None - checkpoint (bool, optional): whether to checkpoint this task in Prefect, - defaults to None (determined by the ``fugue_engine`` context). 
- **kwargs (Any, optional): additional kwargs to pass to - Fugue's `transform` function - - References: - See: [Fugue - Transform](https://fugue-tutorials.readthedocs.io/tutorials/extensions/transformer.html) - - Example: - ```python - from prefect import flow, task - from prefect.tasks.fugue import transform, fsql, fugue_engine - from dask.distributed import Client - import pandas as pd - - client = Client(processes=True) - - # Basic usage - @task - def gen_df() -> pd.DataFrame: - return pd.DataFrame(dict(a=[1])) - - @task - def show_df(dask_df): - print(dask_df.compute()) - - def add_col(df:pd.DataFrame) -> pd.DataFrame - return df.assign(b=2) - - @flow - def flow1(): - df = gen_df() - dask_df = transform(df, add_col, schema="*,b:int", engine=client) - show_df(dask_df) - - # Turning on checkpoint when returning a local dataframe - @flow - def flow2(): - df = gen_df() - local_df = transform(df, add_col, schema="*,b:int", - engine=client, as_local=True, checkpoint=True) - - # fsql + transform - @flow - def flow3(): - with fugue_engine(client, checkpoint=False): - dfs = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x") - dask_df = transform(dfs["x"], add_col, schema="*,b:int") - fsql(''' - SELECT * FROM df WHERE b<3 - PRINT - ''', df=dask_df) - ``` - """ - tn = transformer.__name__ + " (transfomer)" - if engine is None and engine_conf is None: - engine = current_fugue_engine() - elif checkpoint is None: - checkpoint = False - - _t = _to_transformer(transformer, kwargs.get("schema", None)) - - @task(name=tn + suffix(), cache_key_fn=_get_cache_key_fn(checkpoint)) - def _run_with_func(df: Any, **kwargs): - kw = dict(kwargs) - kw.pop("schema", None) - return fugue.transform(df, _t, **kw) - - return _run_with_func(df, engine=engine, engine_conf=engine_conf, **kwargs) - - -def _truncate_name(name: str, max_len: int = _TASK_NAME_MAX_LEN) -> str: - if name is None: - raise ValueError("task name can't be None") - if len(name) <= max_len: - return name.strip() - return name[:max_len].strip() + "..." 
- - -def _normalize_yields(yields: Any) -> List[Any]: - if yields is None: - return [] - elif isinstance(yields, (list, tuple)): - return list(yields) - else: # single item - return [yields] - - -def _get_cache_key_fn(checkpoint: Optional[bool]) -> Any: - def extract_kwargs(kw: Dict[str, Any]) -> Dict[str, Any]: - return {k: v for k, v in kw.items() if k not in ["engine", "engine_conf"]} - - def _key(ctx, kwargs): - return hash_objects(ctx.task.fn.__code__.co_code, extract_kwargs(kwargs)) - - ck = get_current_checkpoint(checkpoint) - if not ck: - return None - return _key diff --git a/requirements.txt b/requirements.txt index 7638f16..5ce77ef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ prefect>=2.0.1 -fugue>=0.7.1 \ No newline at end of file +fugue>=0.8.6 \ No newline at end of file diff --git a/setup.py b/setup.py index 433c3d6..5aad67a 100644 --- a/setup.py +++ b/setup.py @@ -24,16 +24,18 @@ version=versioneer.get_version(), cmdclass=versioneer.get_cmdclass(), packages=find_packages(exclude=("tests", "docs")), - python_requires=">=3.7", + python_requires=">=3.8", install_requires=install_requires, extras_require={"dev": dev_requires}, + entry_points={ + "fugue.plugins": ["prefect = prefect_fugue.registry"], + }, classifiers=[ "Natural Language :: English", "Intended Audience :: Developers", "Intended Audience :: System Administrators", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", diff --git a/tests/test_block.py b/tests/test_block.py index 87a6d34..247e779 100644 --- a/tests/test_block.py +++ b/tests/test_block.py @@ -1,10 +1,11 @@ from typing import Any, List +import fugue.api as fa from fugue import ExecutionEngine from prefect import flow from pydantic import SecretStr -from prefect_fugue import FugueEngine, fsql, fugue_engine +from prefect_fugue import FugueEngine def test_fugue_engine_block(mocker): @@ -19,13 +20,24 @@ def test_fugue_engine_block(mocker): @flow def fft1(): - with fugue_engine("fugue/ddb"): - res = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS a") - fsql("OUTPUT a USING assert_result", res) - - with fugue_engine("native", conf={"a": "xyz", "b": 1}): - res = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS a") - fsql("OUTPUT a USING assert_result", res) + with fa.engine_context("prefect:fugue/ddb"): + res = fa.fugue_sql_flow( + "CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS a" + ).run() + fa.fugue_sql_flow("OUTPUT a USING assert_result", res).run() + + fa.fugue_sql_flow( + """ + CREATE [[0]] SCHEMA a:int + OUTPUT USING assert_result + """ + ).run("prefect:fugue/ddb") + + with fa.engine_context("native", {"a": "xyz", "b": 1}): + res = fa.fugue_sql_flow( + "CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS a" + ).run() + fa.fugue_sql_flow("OUTPUT a USING assert_result", res).run() fft1() diff --git a/tests/test_context.py b/tests/test_context.py deleted file mode 100644 index e41a08f..0000000 --- a/tests/test_context.py +++ /dev/null @@ -1,80 +0,0 @@ -import pandas as pd -from prefect import flow - -from prefect_fugue import fsql, fugue_engine, transform - - -def test_fugue_engine(): - @flow(name="fet1") - def fet1(): - with fugue_engine("duckdb"): - res = fsql("""CREATE [['abc'],['aa']] SCHEMA a:str YIELD DATAFRAME AS df""") - fsql( - """ - SELECT * FROM df WHERE a LIKE '%bc' - PRINT - """, - res, - ) - - fet1() - - # 
schema: * - def t(df: pd.DataFrame) -> pd.DataFrame: - return df - - @flow(name="fet1") - def fet2(): - with fugue_engine("duckdb"): - res = fsql("""CREATE [['abc'],['aa']] SCHEMA a:str YIELD DATAFRAME AS df""") - with fugue_engine("dask"): - res = transform(res["df"], t, as_local=True) - fsql( - """ - SELECT * FROM ddf WHERE a LIKE '%bc' - PRINT - """, - ddf=res, - ) - - fet2() - - -def test_checkpoint(): - # schema: * - def t(df: pd.DataFrame) -> pd.DataFrame: - return df - - @flow(name="fct1") - def fct1(): - with fugue_engine(checkpoint=True): - res = fsql("""CREATE [['abc'],['aa']] SCHEMA a:str YIELD DATAFRAME AS df""") - res = transform(res["df"], t, as_local=True) - fsql( - """ - SELECT * FROM df - PRINT - """, - df=res, - ) - - fct1() - - @flow(name="fct2") - def fct2(): - with fugue_engine("duckdb"): - res = fsql( - """CREATE [['abc'],['aa']] SCHEMA a:str YIELD DATAFRAME AS df""", - engine="native", - ) - res = transform(res["df"], t, as_local=True, engine="native") - fsql( - """ - SELECT * FROM df - PRINT - """, - df=res, - engine="native", - ) - - fct2() diff --git a/tests/test_tasks.py b/tests/test_tasks.py deleted file mode 100644 index fc7e1e8..0000000 --- a/tests/test_tasks.py +++ /dev/null @@ -1,144 +0,0 @@ -from typing import Any, List - -from fugue import register_global_conf -from fugue.constants import FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH -from prefect import flow, task -from prefect.task_runners import SequentialTaskRunner -from pytest import raises - -from prefect_fugue import fsql, transform -from prefect_fugue.tasks import _normalize_yields, _truncate_name - - -def test_fsql(tmpdir): - register_global_conf({FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH: str(tmpdir)}) - - # simplest case - @flow(retries=3, task_runner=SequentialTaskRunner()) - def test_fsql1(): - result = fsql("""CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x""") - wf_assert(result, lambda dfs: dfs["x"].as_array() == [[0]]) - - test_fsql1() - - # with simple parameter - @flow(retries=3) - def test_fsql2(): - result = fsql("""CREATE [[{{x}}]] SCHEMA a:int YIELD DATAFRAME AS x""", x=0) - wf_assert(result, lambda dfs: dfs["x"].as_array() == [[0]]) - - test_fsql2() - - # with Prefect parameter, also with very long query - s = ( - "as;f;lkasdf;lkasd;lfalk;sdflk;asdl;kf;lkasdf;lkas;l;;" - "lka;sldkgfj;lkasdf;lkasd;lkfa;lksdf;lajsdf;lka;lskdf" - ) - - @flow(retries=3) - def test_fsql3(x): - result = fsql("""CREATE [["{{x}}"]] SCHEMA a:str YIELD DATAFRAME AS x""", x=x) - wf_assert(result, lambda dfs: dfs["x"].as_array() == [[x]]) - - test_fsql3(x=s) - - # with df parameter - @flow(retries=3) - def test_fsql4(d): - result1 = fsql("""CREATE [[0]] SCHEMA a:int YIELD FILE AS x""") - # pass result1 as yields - result2 = fsql( - """SELECT a+{{d}} AS a FROM x YIELD DATAFRAME AS y""", result1, d=d - ) - # pass the specific dataframe - result3 = fsql("""SELECT * FROM df YIELD DATAFRAME AS y""", df=result1["x"]) - wf_assert(result2, lambda dfs: dfs["y"].as_array() == [[1]]) - wf_assert(result3, lambda dfs: dfs["y"].as_array() == [[0]]) - - test_fsql4(d=1) - - @task - def gen_query(): - return """ - CREATE [[0]] SCHEMA a:int YIELD LOCAL DATAFRAME AS x - CREATE [[1]] SCHEMA a:int YIELD FILE AS y - """ - - @flow(retries=3) - def test_fsql5(): - result = fsql(gen_query(), checkpoint=False) - wf_assert(result, lambda dfs: dfs["x"].as_array() == [[0]]) - - test_fsql5() - - @flow() - def test_fsql6(): - fsql("CREATE SCHEMA") - - with raises(SyntaxError): - test_fsql6() - - -def test_transform(): - def t1(df: List[List[Any]]) -> 
List[List[Any]]: - return df - - # schema: * - def t2(df: List[List[Any]]) -> List[List[Any]]: - return df - - # simplest case - @flow(retries=3) - def test_transform1(): - dfs = fsql("""CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x""") - - result = transform(dfs["x"], t1, schema="*", force_output_fugue_dataframe=True) - wf_assert(result, lambda df: df.as_array() == [[0]]) - - result = transform(dfs["x"], t2, force_output_fugue_dataframe=True) - wf_assert(result, lambda df: df.as_array() == [[0]]) - - test_transform1() - - @task - def provide_func(): - return t2 - - @task - def provide_schema(): - return "*" - - # with dependency - @flow(retries=3) - def test_transform2(): - dfs = fsql("""CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x""") - - result = transform( - dfs["x"], t1, schema=provide_schema(), force_output_fugue_dataframe=True - ) - wf_assert(result, lambda df: df.as_array() == [[0]]) - - result = transform(dfs["x"], provide_func(), force_output_fugue_dataframe=True) - wf_assert(result, lambda df: df.as_array() == [[0]]) - - test_transform2() - - -def test_truncate_name(): - raises(ValueError, lambda: _truncate_name(None)) - assert _truncate_name("abc") == "abc" - assert _truncate_name("abc", max_len=3) == "abc" - assert _truncate_name("abc", max_len=2) == "ab..." - - -def test_normalize_yields(): - assert _normalize_yields(1) == [1] - assert _normalize_yields(None) == [] - assert _normalize_yields((1, 2)) == [1, 2] - assert _normalize_yields([1, 2]) == [1, 2] - - -@task -def wf_assert(data, validator) -> None: - print(validator(data)) - assert validator(data)