Update Fugue to use Fugue API #9

Merged · merged 2 commits · Oct 8, 2023
55 changes: 34 additions & 21 deletions .devcontainer/devcontainer.json
@@ -1,29 +1,42 @@
{
  "name": "Fugue Development Environment",
  "image": "fugueproject/devenv:0.4.4",
  "settings": {
    "terminal.integrated.shell.linux": "/bin/bash",
    "python.pythonPath": "/usr/local/bin/python",
    "python.linting.enabled": true,
    "python.linting.pylintEnabled": true,
    "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
    "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
    "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
    "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
    "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
    "python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
    "python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
    "python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
    "python.linting.pylintPath": "/usr/local/py-utils/bin/pylint"
  "_image": "fugueproject/devenv:0.7.7",
  "image": "mcr.microsoft.com/vscode/devcontainers/python:3.10",
  "customizations": {
    "vscode": {
      "settings": {
        "terminal.integrated.shell.linux": "/bin/bash",
        "python.pythonPath": "/usr/local/bin/python",
        "python.defaultInterpreterPath": "/usr/local/bin/python",
        "isort.interpreter": [
          "/usr/local/bin/python"
        ],
        "flake8.interpreter": [
          "/usr/local/bin/python"
        ],
        "pylint.interpreter": [
          "/usr/local/bin/python"
        ]
      },
      "extensions": [
        "ms-python.python",
        "ms-python.isort",
        "ms-python.flake8",
        "ms-python.pylint",
        "ms-python.mypy",
        "GitHub.copilot",
        "njpwerner.autodocstring"
      ]
    }
  },
  "extensions": [
    "ms-python.python"
  ],
  "forwardPorts": [
    8888
  ],
  "postCreateCommand": "make devenv",
  "mounts": [
    "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
  ]
  "features": {
    "ghcr.io/devcontainers/features/docker-in-docker:2": {},
    "ghcr.io/devcontainers/features/java:1": {
      "version": "11"
    }
  }
}
6 changes: 2 additions & 4 deletions .github/workflows/tests.yml
@@ -9,7 +9,6 @@ jobs:
    strategy:
      matrix:
        python-version:
          - "3.7"
          - "3.8"
          - "3.9"
          - "3.10"
@@ -30,10 +29,9 @@ jobs:
          python -m pip install --upgrade --upgrade-strategy eager -e ".[dev]"

      - name: Run tests
        env:
          PREFECT_ORION_DATABASE_CONNECTION_URL: "sqlite+aiosqlite:///./orion-tests.db"
        run: |
          prefect orion database reset -y
          prefect config set PREFECT_API_DATABASE_CONNECTION_URL="sqlite+aiosqlite:///./orion-tests.db"
          prefect server database reset -y
          coverage run --branch -m pytest tests -vv
          coverage report
      - name: "Upload coverage to Codecov"
5 changes: 0 additions & 5 deletions .pre-commit-config.yaml
@@ -1,9 +1,4 @@
repos:
  - repo: https://github.com/pycqa/isort
    rev: 5.10.1
    hooks:
      - id: isort
        language_version: python3
  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
3 changes: 3 additions & 0 deletions Makefile
@@ -14,6 +14,9 @@ devenv:
	pre-commit install
	fugue-jupyter install startup

login:
	prefect cloud login -k $(PREFECT_API_KEY)

docs:
	rm -rf docs/api
	rm -rf docs/build
76 changes: 56 additions & 20 deletions README.md
@@ -32,12 +32,10 @@ The best way to get started with Fugue is to work through the 10 minute tutorial

### Python setup

Requires an installation of Python 3.7+.
Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the [Prefect documentation](https://orion-docs.prefect.io/).

### Installation

Install `prefect-fugue` with `pip`:
@@ -80,36 +78,71 @@ For example, a Databricks Block could look like:
}
```

As long as `prefect_fugue` is installed, Fugue is able to recognize and convert a block expression to a `FugueExecutionEngine`. For example, if you have a block with the path `fugue/databricks`, then the expression `prefect:fugue/databricks` becomes a valid execution engine expression. When Fugue parses this expression, it will load the `Block` from `fugue/databricks` and then, based on the fields of the block, construct a `DatabricksExecutionEngine` for your Fugue operations.
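
As a hedged sketch (not part of this diff), such a block might be saved like this; all field values and the `databricks` engine name are illustrative, and real Databricks settings come from the fugue-cloudprovider docs:

```python
from prefect_fugue import FugueEngine

# A minimal sketch, assuming the FugueEngine block type saves under the
# `fugue/` path as shown above; every value below is a placeholder.
FugueEngine(
    engine="databricks",  # assumed Fugue engine name behind this block
    conf={"cluster": {"num_workers": 2}},  # hypothetical non-secret config
    secret_conf={"token": "dapi-..."},  # hypothetical secret config
).save("databricks")  # then loadable via the block path fugue/databricks
```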

### Using a Spark Cluster Inside a Flow

Let’s start by running code on top of Databricks. `databricks-connect` is already installed in this environment. This section may have a lot of logs because of the monitoring provided by Prefect. This section also assumes that the user has Prefect configured to the right workspace.

Below we have one task that takes in a `SparkSession` and uses it to run some Spark code. We can then use this in the Prefect Flow with the `fugue_engine` context. This `fugue_engine` will create an ephemeral cluster to run the code underneath, and then turn off when finished.
Below we have one task that takes in a `SparkSession` and uses it to run some Spark code. We can then use this in the Prefect Flow with `fugue.api.engine_context`. This will create an ephemeral cluster to run the code underneath, and then shut it down when finished.

```python
from prefect import task, flow
from prefect_fugue import fugue_engine
import fugue.api as fa

@task
def my_spark_task(spark, n=1):
    df = spark.createDataFrame([[f"hello spark {n}"]], "a string")
    df.show()

@flow
def native_spark_flow(engine):
    with fugue_engine(engine) as engine:
        my_spark_task(engine.spark_session, 1)
def spark_flow(engine):
    with fa.engine_context(engine) as spark_engine:
        my_spark_task(spark_engine.spark_session, 1)

native_spark_flow("fugue/databricks")
spark_flow("prefect:fugue/databricks") # pay attention to the engine expression
```

Similarly, if you don’t use Databricks but have your own way to get a `SparkSession`, you can directly pass the `SparkSession` into the Flow. The `fugue_engine` context will be able to interpret this.
Similarly, if you don’t use Databricks but have your own way to get a `SparkSession`, you can directly pass the `SparkSession` into the Flow.

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

native_spark_flow(spark)
spark_flow(spark)
```

## More Flexibility

`fugue.api.engine_context` creates a context under which all Fugue operations use the context engine by default. This works whether the Fugue code sits directly under the context or inside tasks or flows that are invoked under this context.

```python
from prefect import task, flow
import pandas as pd
import fugue.api as fa

def my_transformer(df: pd.DataFrame) -> pd.DataFrame:
    return df

@task
def sub_task(path: str):
    df = fa.load(path)
    df = fa.transform(df, my_transformer, schema="*")
    fa.save(df, path + ".output.parquet")

@flow
def sub_flow(path: str):
    df = fa.load(path)
    df = fa.transform(df, my_transformer, schema="*")
    fa.save(df, path + ".output.parquet")

@flow
def main_flow(path, engine=None):
    with fa.engine_context(engine):
        sub_task(path)
        sub_flow(path)

main_flow("<local path>") # test locally, all tasks and flows run without Spark
main_flow("<dbfs path>", "prefect:fugue/databricks") # all tasks and flows will run on Databricks
```

## Testing Locally Before Running Map Jobs on Spark, Dask, and Ray
@@ -121,7 +154,8 @@ When testing the Flow, we can pass `None` as the engine so everything runs on Pandas.
```python
from time import sleep
import pandas as pd
from prefect_fugue import transform
import fugue.api as fa
from prefect import task, flow

@task
def create_jobs(n) -> pd.DataFrame:
@@ -135,7 +169,7 @@ def run_one_job(df:pd.DataFrame) -> pd.DataFrame:
@flow
def run_all_jobs(n, engine=None):
    jobs = create_jobs(n)
    with fugue_engine(engine):
    with fa.engine_context(engine):
        return fa.transform(jobs, run_one_job, partition="per_row", as_local=True)
```

@@ -148,15 +182,15 @@ run_all_jobs(1) # run locally on Pandas
Because it succeeded, we can now attach our Fugue Databricks `Block` to run on Databricks. Now we run 8 jobs, and we’ll see that parallelization from the Spark cluster makes this Flow execute faster.

```python
run_all_jobs(8, "fugue/databricks") # run on databricks
run_all_jobs(8, "prefect:fugue/databricks") # run on databricks
```

There is still some overhead with sending the work, but the time is decreased compared to the expected execution time if run sequentially (40 seconds).

We can also use local Dask by passing the string `"dask"`, pass a Dask `Client`, or use the Fugue Engine `Block` with [Coiled](https://coiled.io/). More information can be found in the [Coiled cloudprovider docs](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/coiled.html).

```python
run_all_jobs(4, "dask")
run_all_jobs(4, dask_client)
```
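
For reference, a minimal sketch (not part of this diff) of creating the `dask_client` passed above; it assumes `dask[distributed]` is installed:

```python
from dask.distributed import Client

dask_client = Client()  # starts a local Dask cluster and connects to it
run_all_jobs(4, dask_client)  # Fugue treats the Client as a Dask engine
```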

## Running SQL on any Spark, Dask, and Duckdb
@@ -166,16 +200,18 @@ Prototyping locally, and then running the full job on the cluster is also possible.
Here we load data and perform a query with FugueSQL. FugueSQL has additional keywords such as `LOAD` and `SAVE`, so everything from loading to processing to saving can run on DuckDB or SparkSQL. More information on FugueSQL can be found in the FugueSQL tutorials.

```python
import fugue.api as fa

@flow
def run_sql_full(top, engine):
    with fugue_engine(engine):
        fsql("""
def run_sql(top, engine):
    with fa.engine_context(engine):
        fa.fugue_sql_flow("""
        df = LOAD "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet"

        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
        PRINT
        """, top=top)
        """, top=top).run()
```

To debug locally without SparkSQL, we can use DuckDB as the engine.
@@ -187,7 +223,7 @@ run_sql(2, "duckdb"); # debug/develop without spark
Again, to run on the cluster, we can use the Databricks Block.

```python
run_sql(10, "fugue/databricks")
run_sql(10, "prefect:fugue/databricks")
```

## Resources
Binary file added orion-tests.db
Binary file not shown.
4 changes: 1 addition & 3 deletions prefect_fugue/__init__.py
@@ -1,6 +1,4 @@
from . import _version
from .context import fugue_engine # noqa
from .tasks import fsql, transform # noqa
from .blocks import FugueEngine # noqa
from .blocks import FugueEngine, block_to_fugue_engine # noqa

__version__ = _version.get_versions()["version"]
83 changes: 50 additions & 33 deletions prefect_fugue/blocks.py
@@ -1,14 +1,45 @@
from typing import Any, Dict, Optional

from fugue import (
    ExecutionEngine,
    make_execution_engine,
    parse_execution_engine,
    register_execution_engine,
)
from typing import Any, Dict, Optional, Type

from fugue import ExecutionEngine
from fugue._utils.registry import fugue_plugin
from fugue.dev import make_execution_engine
from prefect.blocks.core import Block
from pydantic import Field, SecretStr
from triad import ParamDict, assert_or_throw, run_at_def
from triad import ParamDict


def block_to_fugue_engine(block_type: Type[Block]) -> Any:
    """
    Register a converter from a Prefect Block type to a Fugue ExecutionEngine

    Args:
        block_type (Type[Block]): the Block type whose instances
            should be converted

    Returns:
        Any: a decorator that registers the conversion function

    Example:

        .. code-block:: python

            from prefect_fugue import block_to_fugue_engine
            from fugue.dev import make_execution_engine

            @block_to_fugue_engine(MyBlock)
            def my_block_to_fugue_engine(
                block: MyBlock, conf: Any = None) -> ExecutionEngine:
                return make_execution_engine(block.name, conf)

    """
    return _block_to_fugue_engine.candidate(
        lambda block, *args, **kwargs: isinstance(block, block_type)
    )  # noqa


@fugue_plugin
def _block_to_fugue_engine(block: Block, conf: Any = None) -> ExecutionEngine:
    raise NotImplementedError(f"{type(block)} to Fugue Engine is not registered")


class FugueEngine(Block):
@@ -46,28 +77,14 @@ class FugueEngine(Block):
        description="A JSON-dict-compatible value",
    )

    def make_engine(self, custom_conf: Any = None) -> ExecutionEngine:
        conf = ParamDict(self.conf)
        if self.secret_conf is not None:
            for k, v in self.secret_conf.items():
                conf[k] = v.get_secret_value()
        conf.update(ParamDict(custom_conf))
        return make_execution_engine(self.engine, conf)


@parse_execution_engine.candidate(
    matcher=lambda engine, conf, **kwargs: isinstance(engine, str) and "/" in engine
)
def _parse_config_engine(engine: Any, conf: Any, **kwargs: Any) -> ExecutionEngine:
    block = Block.load(engine)
    assert_or_throw(
        isinstance(block, FugueEngine), f"{block} is not a FugueEngine config block"
    )
    return make_execution_engine(block, conf=conf)


@run_at_def
def _register():
    register_execution_engine(
        FugueEngine, lambda engine, conf, **kwargs: engine.make_engine(conf)
    )
@block_to_fugue_engine(FugueEngine)
def _fugue_block_to_fugue_engine(
    block: FugueEngine, conf: Any = None
) -> ExecutionEngine:
    _conf = ParamDict(block.conf)
    if block.secret_conf is not None:
        for k, v in block.secret_conf.items():
            _conf[k] = v.get_secret_value()
    _conf.update(ParamDict(conf))
    return make_execution_engine(block.engine, _conf)
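
A hedged usage sketch (not part of the diff): once `prefect_fugue` is imported, a saved `FugueEngine` block resolves through this conversion; the block path below is hypothetical.

```python
import fugue.api as fa
import prefect_fugue  # noqa: F401 - importing registers the conversion

# "prefect:fugue/databricks" assumes a FugueEngine block saved at
# fugue/databricks; Fugue loads the Block and builds the engine from it.
with fa.engine_context("prefect:fugue/databricks"):
    ...  # Fugue operations here default to the Databricks engine
```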