diff --git a/flytekit/__init__.py b/flytekit/__init__.py index f48d21d51a..e39bc204b3 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.9.4' +__version__ = '0.10.0' diff --git a/flytekit/common/constants.py b/flytekit/common/constants.py index 34c30e2cad..9b6ab77a35 100644 --- a/flytekit/common/constants.py +++ b/flytekit/common/constants.py @@ -22,6 +22,8 @@ class SdkTaskType(object): SENSOR_TASK = "sensor-task" PRESTO_TASK = "presto" PYTORCH_TASK = "pytorch" + # Raw container task is just a name, it defaults to using the regular container task (like python etc), but sets the data_config in the container + RAW_CONTAINER_TASK = "raw-container" GLOBAL_INPUT_NODE_ID = '' diff --git a/flytekit/common/tasks/presto_task.py b/flytekit/common/tasks/presto_task.py index dfc4b5b47f..0c3a4cca41 100644 --- a/flytekit/common/tasks/presto_task.py +++ b/flytekit/common/tasks/presto_task.py @@ -1,24 +1,20 @@ from __future__ import absolute_import -import six as _six - +import datetime as _datetime from google.protobuf.json_format import MessageToDict as _MessageToDict -from flytekit import __version__ +from flytekit import __version__ from flytekit.common import constants as _constants +from flytekit.common import interface as _interface +from flytekit.common.exceptions import scopes as _exception_scopes from flytekit.common.tasks import task as _base_task from flytekit.models import ( interface as _interface_model ) from flytekit.models import literals as _literals, types as _types, \ task as _task_model - -from flytekit.common import interface as _interface -import datetime as _datetime from flytekit.models import presto as _presto_models -from flytekit.common.types import helpers as _type_helpers -from flytekit.common.exceptions import scopes as _exception_scopes class SdkPrestoTask(_base_task.SdkTask): @@ -39,7 +35,6 @@ def __init__( discovery_version=None, retries=1, timeout=None, - deprecated=None ): """ 
:param Text statement: Presto query specification @@ -52,8 +47,6 @@ def __init__( :param Text discovery_version: String describing the version for task discovery purposes :param int retries: Number of retries to attempt :param datetime.timedelta timeout: - :param Text deprecated: This string can be used to mark the task as deprecated. Consumers of the task will - receive deprecation warnings. """ # Set as class fields which are used down below to configure implicit @@ -72,7 +65,7 @@ def __init__( _literals.RetryStrategy(retries), interruptible, discovery_version, - deprecated + "This is deprecated!" ) presto_query = _presto_models.PrestoQuery( @@ -117,38 +110,16 @@ def __init__( # Set user provided inputs task_inputs(self) - def _add_implicit_inputs(self, inputs): - """ - :param dict[Text,Any] inputs: - :param inputs: - :return: - """ - inputs["__implicit_routing_group"] = self.routing_group - inputs["__implicit_catalog"] = self.catalog - inputs["__implicit_schema"] = self.schema - return inputs - # Override method in order to set the implicit inputs def __call__(self, *args, **kwargs): - kwargs = self._add_implicit_inputs(kwargs) + kwargs["__implicit_routing_group"] = self.routing_group + kwargs["__implicit_catalog"] = self.catalog + kwargs["__implicit_schema"] = self.schema return super(SdkPrestoTask, self).__call__( *args, **kwargs ) - # Override method in order to set the implicit inputs - def _python_std_input_map_to_literal_map(self, inputs): - """ - :param dict[Text,Any] inputs: A dictionary of Python standard inputs that will be type-checked and compiled - to a LiteralMap - :rtype: flytekit.models.literals.LiteralMap - """ - inputs = self._add_implicit_inputs(inputs) - return _type_helpers.pack_python_std_map_to_literal_map(inputs, { - k: _type_helpers.get_sdk_type_from_literal_type(v.type) - for k, v in _six.iteritems(self.interface.inputs) - }) - @_exception_scopes.system_entry_point def add_inputs(self, inputs): """ diff --git 
a/flytekit/common/tasks/raw_container.py b/flytekit/common/tasks/raw_container.py new file mode 100644 index 0000000000..a352a7f63a --- /dev/null +++ b/flytekit/common/tasks/raw_container.py @@ -0,0 +1,253 @@ +from __future__ import absolute_import + +import datetime as _datetime +from typing import Dict, List + +from flytekit import __version__ +from flytekit.common import constants as _constants +from flytekit.common import interface as _interface +from flytekit.common.exceptions import scopes as _exception_scopes +from flytekit.common.tasks import task as _base_task +from flytekit.common.types.base_sdk_types import FlyteSdkType +from flytekit.configuration import resources as _resource_config +from flytekit.models import literals as _literals, task as _task_models +from flytekit.models.interface import Variable + + +def types_to_variable(t: Dict[str, FlyteSdkType]) -> Dict[str, Variable]: + var = {} + if t: + for k, v in t.items(): + var[k] = Variable(v.to_flyte_literal_type(), "") + return var + + +def _get_container_definition( + image: str, + command: List[str], + args: List[str], + data_loading_config: _task_models.DataLoadingConfig, + storage_request: str = None, + cpu_request: str = None, + gpu_request: str = None, + memory_request: str = None, + storage_limit: str = None, + cpu_limit: str = None, + gpu_limit: str = None, + memory_limit: str = None, + environment: Dict[str, str] = None, +) -> _task_models.Container: + storage_limit = storage_limit or _resource_config.DEFAULT_STORAGE_LIMIT.get() + storage_request = storage_request or _resource_config.DEFAULT_STORAGE_REQUEST.get() + cpu_limit = cpu_limit or _resource_config.DEFAULT_CPU_LIMIT.get() + cpu_request = cpu_request or _resource_config.DEFAULT_CPU_REQUEST.get() + gpu_limit = gpu_limit or _resource_config.DEFAULT_GPU_LIMIT.get() + gpu_request = gpu_request or _resource_config.DEFAULT_GPU_REQUEST.get() + memory_limit = memory_limit or _resource_config.DEFAULT_MEMORY_LIMIT.get() + memory_request = 
memory_request or _resource_config.DEFAULT_MEMORY_REQUEST.get() + + requests = [] + if storage_request: + requests.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.STORAGE, + storage_request + ) + ) + if cpu_request: + requests.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.CPU, + cpu_request + ) + ) + if gpu_request: + requests.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.GPU, + gpu_request + ) + ) + if memory_request: + requests.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.MEMORY, + memory_request + ) + ) + + limits = [] + if storage_limit: + limits.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.STORAGE, + storage_limit + ) + ) + if cpu_limit: + limits.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.CPU, + cpu_limit + ) + ) + if gpu_limit: + limits.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.GPU, + gpu_limit + ) + ) + if memory_limit: + limits.append( + _task_models.Resources.ResourceEntry( + _task_models.Resources.ResourceName.MEMORY, + memory_limit + ) + ) + + if environment is None: + environment = {} + + return _task_models.Container( + image=image, + command=command, + args=args, + resources=_task_models.Resources(limits=limits, requests=requests), + env=environment, + config={}, + data_loading_config=data_loading_config, + ) + + +class SdkRawContainerTask(_base_task.SdkTask): + """ + Use this task when you want to run an arbitrary container as a task (e.g. external tools, binaries compiled + separately as a container completely separate from the container where your Flyte workflow is defined. 
+ """ + METADATA_FORMAT_JSON = _task_models.DataLoadingConfig.LITERALMAP_FORMAT_JSON + METADATA_FORMAT_YAML = _task_models.DataLoadingConfig.LITERALMAP_FORMAT_YAML + METADATA_FORMAT_PROTO = _task_models.DataLoadingConfig.LITERALMAP_FORMAT_PROTO + + def __init__( + self, + inputs: Dict[str, FlyteSdkType], + image: str, + outputs: Dict[str, FlyteSdkType] = None, + input_data_dir: str = None, + output_data_dir: str = None, + metadata_format: int = METADATA_FORMAT_JSON, + io_strategy: _task_models.IOStrategy=None, + command: List[str] = None, + args: List[str] = None, + storage_request: str = None, + cpu_request: str = None, + gpu_request: str = None, + memory_request: str = None, + storage_limit: str = None, + cpu_limit: str = None, + gpu_limit: str = None, + memory_limit: str = None, + environment: Dict[str, str] = None, + interruptible: bool = False, + discoverable: bool = False, + discovery_version: str = None, + retries: int = 1, + timeout: _datetime.timedelta = None, + ): + """ + :param inputs: + :param outputs: + :param image: + :param command: + :param args: + :param storage_request: + :param cpu_request: + :param gpu_request: + :param memory_request: + :param storage_limit: + :param cpu_limit: + :param gpu_limit: + :param memory_limit: + :param environment: + :param interruptible: + :param discoverable: + :param discovery_version: + :param retries: + :param timeout: + :param input_data_dir: This is the directory where data will be downloaded to + :param output_data_dir: This is the directory where data will be uploaded from + :param metadata_format: Format in which the metadata will be available for the script + """ + + # Set as class fields which are used down below to configure implicit + # parameters + self._data_loading_config = _task_models.DataLoadingConfig( + input_path=input_data_dir, + output_path=output_data_dir, + format=metadata_format, + enabled=True, + io_strategy=io_strategy, + ) + + metadata = _task_models.TaskMetadata( + discoverable, + # This 
needs to have the proper version reflected in it + _task_models.RuntimeMetadata( + _task_models.RuntimeMetadata.RuntimeType.FLYTE_SDK, __version__, + "python"), + timeout or _datetime.timedelta(seconds=0), + _literals.RetryStrategy(retries), + interruptible, + discovery_version, + None + ) + + # The interface is defined using the inputs and outputs + i = _interface.TypedInterface(inputs=types_to_variable(inputs), outputs=types_to_variable(outputs)) + + # This sets the base SDKTask with container etc + super(SdkRawContainerTask, self).__init__( + _constants.SdkTaskType.RAW_CONTAINER_TASK, + metadata, + i, + None, + container=_get_container_definition( + image=image, + args=args, + command=command, + data_loading_config=self._data_loading_config, + storage_request=storage_request, + cpu_request=cpu_request, + gpu_request=gpu_request, + memory_request=memory_request, + storage_limit=storage_limit, + cpu_limit=cpu_limit, + gpu_limit=gpu_limit, + memory_limit=memory_limit, + environment=environment, + ) + ) + + + @_exception_scopes.system_entry_point + def add_inputs(self, inputs: Dict[str, Variable]): + """ + Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given + name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in + the wrapped function. + :param dict[Text, flytekit.models.interface.Variable] inputs: names and variables + """ + self._validate_inputs(inputs) + self.interface.inputs.update(inputs) + + @_exception_scopes.system_entry_point + def add_outputs(self, outputs: Dict[str, Variable]): + """ + Adds the outputs to this task. This can be called multiple times, but it will fail if an input with a given + name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in + the wrapped function. 
+ :param dict[Text, flytekit.models.interface.Variable] outputs: names and variables + """ + self._validate_outputs(outputs) + self.interface.outputs.update(outputs) diff --git a/flytekit/common/tasks/task.py b/flytekit/common/tasks/task.py index 64149bf41a..89166460e6 100644 --- a/flytekit/common/tasks/task.py +++ b/flytekit/common/tasks/task.py @@ -279,7 +279,7 @@ def _produce_deterministic_version(self, version=None): :return Text: """ - if self.container is not None and self.container.data_config is None: + if self.container is not None and self.container.data_loading_config is None: # Only in the case of raw container tasks (which are the only valid tasks with container definitions that # can assign a client-side task version) their data config will be None. raise ValueError("Client-side task versions are not supported for {} task type".format(self.type)) diff --git a/flytekit/models/task.py b/flytekit/models/task.py index 67187b5070..bf849f9ecf 100644 --- a/flytekit/models/task.py +++ b/flytekit/models/task.py @@ -16,7 +16,6 @@ class Resources(_common.FlyteIdlEntity): - class ResourceName(object): UNKNOWN = _core_task.Resources.UNKNOWN CPU = _core_task.Resources.CPU @@ -112,7 +111,6 @@ def from_flyte_idl(cls, pb2_object): class RuntimeMetadata(_common.FlyteIdlEntity): - class RuntimeType(object): OTHER = 0 FLYTE_SDK = 1 @@ -177,7 +175,8 @@ def from_flyte_idl(cls, pb2_object): class TaskMetadata(_common.FlyteIdlEntity): - def __init__(self, discoverable, runtime, timeout, retries, interruptible, discovery_version, deprecated_error_message): + def __init__(self, discoverable, runtime, timeout, retries, interruptible, discovery_version, + deprecated_error_message): """ Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts, and retries. 
@@ -650,7 +649,7 @@ def from_flyte_idl(cls, pb2_object): application_type = _spark_type.R return cls( - type= application_type, + type=application_type, spark_conf=pb2_object.sparkConf, application_file=pb2_object.mainApplicationFile, main_class=pb2_object.mainClass, @@ -659,20 +658,96 @@ def from_flyte_idl(cls, pb2_object): ) +class IOStrategy(_common.FlyteIdlEntity): + """ + Provides methods to manage data in and out of the Raw container using Download Modes. This can only be used if DataLoadingConfig is enabled. + """ + DOWNLOAD_MODE_EAGER = _core_task.IOStrategy.DOWNLOAD_EAGER + DOWNLOAD_MODE_STREAM = _core_task.IOStrategy.DOWNLOAD_STREAM + DOWNLOAD_MODE_NO_DOWNLOAD = _core_task.IOStrategy.DO_NOT_DOWNLOAD + + UPLOAD_MODE_EAGER = _core_task.IOStrategy.UPLOAD_EAGER + UPLOAD_MODE_ON_EXIT = _core_task.IOStrategy.UPLOAD_ON_EXIT + UPLOAD_MODE_NO_UPLOAD = _core_task.IOStrategy.DO_NOT_UPLOAD + + def __init__(self, + download_mode: _core_task.IOStrategy.DownloadMode=DOWNLOAD_MODE_EAGER, + upload_mode: _core_task.IOStrategy.UploadMode=UPLOAD_MODE_ON_EXIT): + self._download_mode = download_mode + self._upload_mode = upload_mode + + def to_flyte_idl(self) -> _core_task.IOStrategy: + return _core_task.IOStrategy( + download_mode=self._download_mode, + upload_mode=self._upload_mode + ) + + @classmethod + def from_flyte_idl(cls, pb2_object: _core_task.IOStrategy): + if pb2_object is None: + return None + return cls( + download_mode=pb2_object.download_mode, + upload_mode=pb2_object.upload_mode, + ) + + +class DataLoadingConfig(_common.FlyteIdlEntity): + LITERALMAP_FORMAT_PROTO = _core_task.DataLoadingConfig.PROTO + LITERALMAP_FORMAT_JSON = _core_task.DataLoadingConfig.JSON + LITERALMAP_FORMAT_YAML = _core_task.DataLoadingConfig.YAML + _LITERALMAP_FORMATS = frozenset([LITERALMAP_FORMAT_JSON, LITERALMAP_FORMAT_PROTO, LITERALMAP_FORMAT_YAML]) + + def __init__(self, input_path: str, output_path: str, enabled: bool = True, + format: _core_task.DataLoadingConfig.LiteralMapFormat = 
LITERALMAP_FORMAT_PROTO, io_strategy: IOStrategy=None): + if format not in self._LITERALMAP_FORMATS: + raise ValueError( + "Metadata format {} not supported. Should be one of {}".format(format, self._LITERALMAP_FORMATS)) + self._input_path = input_path + self._output_path = output_path + self._enabled = enabled + self._format = format + self._io_strategy = io_strategy + + def to_flyte_idl(self) -> _core_task.DataLoadingConfig: + return _core_task.DataLoadingConfig( + input_path=self._input_path, + output_path=self._output_path, + format=self._format, + enabled=self._enabled, + io_strategy=self._io_strategy.to_flyte_idl() if self._io_strategy is not None else None, + ) + + @classmethod + def from_flyte_idl(cls, pb2: _core_task.DataLoadingConfig): + # TODO use python 3.7+ only and then https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel -> DataLoadingConfig: + if pb2 is None: + return None + return cls( + input_path=pb2.input_path, + output_path=pb2.output_path, + enabled=pb2.enabled, + format=pb2.format, + io_strategy=IOStrategy.from_flyte_idl(pb2.io_strategy) if pb2.HasField("io_strategy") else None, + ) + + class Container(_common.FlyteIdlEntity): - def __init__(self, image, command, args, resources, env, config): + def __init__(self, image, command, args, resources, env, config, data_loading_config=None): """ - This defines a container target. It will execute the appropriate command line on the appropriate image with - the given configurations. + This defines a container target. It will execute the appropriate command line on the appropriate image with + the given configurations. - :param Text image: The fully-qualified identifier for the image. - :param list[Text] command: A list of 'words' for the command. i.e. ['aws', 's3', 'ls'] - :param list[Text] args: A list of arguments for the command. i.e. 
['s3://some/path', '/tmp/local/path'] - :param Resources resources: A definition of requisite compute resources. - :param dict[Text, Text] env: A definition of key-value pairs for environment variables. - :param dict[Text, Text] config: A definition of configuration key-value pairs. - """ + :param Text image: The fully-qualified identifier for the image. + :param list[Text] command: A list of 'words' for the command. i.e. ['aws', 's3', 'ls'] + :param list[Text] args: A list of arguments for the command. i.e. ['s3://some/path', '/tmp/local/path'] + :param Resources resources: A definition of requisite compute resources. + :param dict[Text, Text] env: A definition of key-value pairs for environment variables. + :param dict[Text, Text] config: A definition of configuration key-value pairs. + :type DataLoadingConfig data_loading_config: object + """ + self._data_loading_config = data_loading_config self._image = image self._command = command self._args = args @@ -730,6 +805,13 @@ def config(self): """ return self._config + @property + def data_loading_config(self): + """ + :rtype: DataLoadingConfig + """ + return self._data_loading_config + def to_flyte_idl(self): """ :rtype: flyteidl.core.tasks_pb2.Container @@ -740,7 +822,8 @@ def to_flyte_idl(self): args=self.args, resources=self.resources.to_flyte_idl(), env=[_literals_pb2.KeyValuePair(key=k, value=v) for k, v in _six.iteritems(self.env)], - config=[_literals_pb2.KeyValuePair(key=k, value=v) for k, v in _six.iteritems(self.config)] + config=[_literals_pb2.KeyValuePair(key=k, value=v) for k, v in _six.iteritems(self.config)], + data_config=self._data_loading_config.to_flyte_idl() if self._data_loading_config else None, ) @classmethod @@ -755,7 +838,9 @@ def from_flyte_idl(cls, pb2_object): args=pb2_object.args, resources=Resources.from_flyte_idl(pb2_object.resources), env={kv.key: kv.value for kv in pb2_object.env}, - config={kv.key: kv.value for kv in pb2_object.config} + config={kv.key: kv.value for kv in 
pb2_object.config}, + data_loading_config=DataLoadingConfig.from_flyte_idl(pb2_object.data_config) + if pb2_object.HasField("data_config") else None, ) diff --git a/sample-notebooks/image.py b/sample-notebooks/image.py new file mode 100644 index 0000000000..d21323349c --- /dev/null +++ b/sample-notebooks/image.py @@ -0,0 +1,19 @@ +import cv2 +import sys + +def filter_edges(input_image_path: str, output_image_path: str): + print("Reading {}".format(input_image_path)) + img = cv2.imread(input_image_path, 0) + if img is None: + raise Exception("Failed to read image") + edges = cv2.Canny(img, 50, 200) # hysteresis thresholds + cv2.imwrite(output_image_path, edges) + return output_image_path + +if __name__ == "__main__": + inp = sys.argv[1] + out = sys.argv[2] + out = "{}.png".format(out) + print("filtering only edges from {}\n".format(inp)) + filter_edges(inp, out) + print("Done, created {}".format(out)) diff --git a/sample-notebooks/notebook.config b/sample-notebooks/notebook.config new file mode 100644 index 0000000000..3688a1e31c --- /dev/null +++ b/sample-notebooks/notebook.config @@ -0,0 +1,10 @@ +[sdk] +workflow_packages=tests.flytekit.common.workflows +python_venv=service_venv + +[auth] +assumable_iam_role=arn:aws:iam::test + +[platform] +url=localhost:30081 +insecure=True diff --git a/sample-notebooks/raw-container-shell.ipynb b/sample-notebooks/raw-container-shell.ipynb new file mode 100644 index 0000000000..8353dfafef --- /dev/null +++ b/sample-notebooks/raw-container-shell.ipynb @@ -0,0 +1,186 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.configuration import set_flyte_config_file, platform\n", + "set_flyte_config_file(\"notebook.config\")\n", + "\n", + "print(\"Connected to {}\".format(platform.URL.get()))\n", + "\n", + "def print_console_url(exc):\n", + " print(\"http://{}/console/projects/{}/domains/{}/executions/{}\".format(platform.URL.get(), exc.id.project, 
exc.id.domain, exc.id.name))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Demo: Workflow without flytekit dependency in containers\n", + "\n", + "This is a simple workflow where we will show how one can use flytekit to create a workflow that needs no container building, but uses an existing set of containers to run logic. It also demonstrates that even in this case memoization, cataloging and input/output types are preserved.\n", + "\n", + "## Step I: Let's declare a couple of tasks.\n", + "\n", + "### Task 1: square\n", + "Given an integer, returns the square of the integer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.common.tasks.raw_container import SdkRawContainerTask\n", + "from flytekit.sdk.types import Types\n", + "\n", + "square = SdkRawContainerTask(\n", + " input_data_dir=\"/var/inputs\",\n", + " output_data_dir=\"/var/outputs\",\n", + " inputs={\"val\": Types.Integer},\n", + " outputs={\"out\": Types.Integer},\n", + " image=\"alpine\",\n", + " command=[\"sh\", \"-c\", \"echo $(( {{.Inputs.val}} * {{.Inputs.val}} )) | tee /var/outputs/out\"],\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Task 2: sum\n", + "Given two integers x & y, returns the sum (x + y)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sum = SdkRawContainerTask(\n", + " input_data_dir=\"/var/flyte/inputs\",\n", + " output_data_dir=\"/var/flyte/outputs\",\n", + " inputs={\"x\": Types.Integer, \"y\": Types.Integer},\n", + " outputs={\"out\": Types.Integer},\n", + " image=\"alpine\",\n", + " command=[\"sh\", \"-c\", \"echo $(( {{.Inputs.x}} + {{.Inputs.y}} )) | tee /var/flyte/outputs/out\"],\n", + ")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step II: Declare a workflow\n", + "#### Sum of Squares\n", + "Given two integers x & y, return 
(x^2 + y^2)\n", + "This is composed using two functions\n", + " sum_of_squares = sum(square(x),square(y))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.sdk.workflow import workflow_class, Input, Output\n", + "\n", + "@workflow_class\n", + "class SumOfSquares(object):\n", + " val1 = Input(Types.Integer)\n", + " val2 = Input(Types.Integer)\n", + " sq1 = square(val=val1)\n", + " sq2 = square(val=val2)\n", + " sm = sum(x=sq1.outputs.out, y=sq2.outputs.out)\n", + " sum_of_squares = Output(sm.outputs.out, sdk_type=Types.Integer)\n", + " \n", + "SumOfSquares_lp = SumOfSquares.create_launch_plan() " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Step III: Register the workflow & launch plan\n", + "Register the workflow under project: flyteexamples " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Currently in flytekit you have to register the tasks first and then register the workflow\n", + "square.register(name=\"square\", project=\"flyteexamples\", domain=\"development\", version=\"1\")\n", + "sum.register(name=\"sum\", project=\"flyteexamples\", domain=\"development\", version=\"1\")\n", + "\n", + "# It will use the name of the tasks to find the registrations\n", + "SumOfSquares.register(name=\"SumOfSquares\", project=\"flyteexamples\", domain=\"development\", version=\"1\")\n", + "SumOfSquares_lp.register(name=\"SumOfSquares\", project=\"flyteexamples\", domain=\"development\", version=\"1\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exc = SumOfSquares_lp.execute(\"flyteexamples\", \"development\", inputs={\"val1\": 3, \"val2\": 2})\n", + "print_console_url(exc)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exc.wait_for_completion()" + ] + }, + { 
+ "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exc.outputs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.7.4 64-bit ('flytekit': virtualenv)", + "language": "python", + "name": "python37464bitflytekitvirtualenv72cbb5e9968e4a299c6026c09cce8d4c" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/sample-notebooks/raw-container.ipynb b/sample-notebooks/raw-container.ipynb new file mode 100644 index 0000000000..9afa501c96 --- /dev/null +++ b/sample-notebooks/raw-container.ipynb @@ -0,0 +1,292 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.configuration import set_flyte_config_file\n", + "set_flyte_config_file(\"notebook.config\")\n", + "\n", + "def print_console_url(exc):\n", + " print(\"http://localhost:30081/console/projects/{}/domains/{}/executions/{}\".format(exc.id.project, exc.id.domain, exc.id.name))" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "from botocore.client import Config\n", + "\n", + "s3 = boto3.resource('s3',\n", + " endpoint_url='http://localhost:30084',\n", + " aws_access_key_id='minio',\n", + " aws_secret_access_key='miniostorage',\n", + " config=Config(signature_version='s3v4'),\n", + " region_name='us-east-1')\n", + "\n", + "def upload_file(f, ref):\n", + " mod = ref.lstrip(\"s3://\")\n", + " bucket, path = mod.split(\"/\", 1)\n", + " s3.Bucket(bucket).upload_file('image.py',path)" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Image, display\n", + "def display_images(paths):\n", + " for p in paths:\n", + " display(Image(p))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Demo: Remote iteration with an OpenCV Script, using an open source OpenCV Image\n", + "\n", + "We are using the filter_edges method that is available in the adjoining module - image.py\n", + "> filter_edges" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step I: Run the filter_edges example within Jupyter Notebook First" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from image import filter_edges" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "inp = \"image.jpg\" # This should be a path to an image that is available locally \n", + "out = \"edges.png\" # This is the path to where we want to create the image\n", + "filter_edges(inp, out)\n", + "# Invoke the helper method that displays the image in Jupyter\n", + "display_images([inp, out])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step II: Create a Flyte Task for this function\n", + "In this case we will use an SdkRawContainerTask. A raw container task is essentially a container task, where we tell Flyte, that this container does not have flytekit. So all the inputs and outputs should be auto-mounted and uploaded.\n", + "The task can use an open source container `docker.io/jjanzic/docker-python3-opencv`. This container has python and OpenCV already installed.\n", + "\n", + "### Hot load the code\n", + "If you notice the edges task accepts 2 inputs\n", + "\"image\": the image to be converted\n", + "\"script\": the script to execute. 
It is assumed the script to be executed is available in some bucket (s3) that is accessible by your Flyte task.\n", + "\n", + "### Command\n", + "The most important part is the command\n", + "```python\n", + "[\"python\", \"/inputs/script\", \"/inputs/image\", \"/outputs/edges\"],\n", + "```\n", + "the command is just running python and the passed in script. Note the input name and output name\n", + "The input name is the name of the \"input variable\". If this was a list of images, then this would be a directory.\n", + "The output is also the name of the \"output variable\"\n", + "\n", + "The names are extremely important, as Flyte will only download the image locally to a file that has the name that matches the variable name. Also it will upload a file whose name matches the output variable name. The extension of the file does not matter." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.common.tasks.raw_container import SdkRawContainerTask\n", + "from flytekit.sdk.types import Types\n", + "\n", + "edges = SdkRawContainerTask(\n", + " input_data_dir=\"/inputs\",\n", + " output_data_dir=\"/outputs\",\n", + " inputs={\"image\": Types.Blob, \"script\": Types.Blob},\n", + " outputs={\"edges\": Types.Blob},\n", + " image=\"docker.io/jjanzic/docker-python3-opencv\",\n", + " command=[\"python\", \"/inputs/script\", \"/inputs/image\", \"/outputs/edges\"],\n", + ")\n", + "\n", + " " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step IIIa: Just for this exercise\n", + "To make the dynamic loading of the script work, we have to **upload the script to some s3 bucket**. Since we are testing this locally, I have originally created an s3 client that points to Flyte installed minio. 
Upload the code there too\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "script_path=\"s3://my-s3-bucket/code/image.py\"\n", + "upload_file(\"image.py\", script_path)\n", + "example_image=\"https://www.naturephotographysimplified.com/wp-content/uploads/2019/06/How-to-get-sharp-images-Birds-in-flight-Bharatpur-Bird-Sanctuary-bird-Photography-by-Prathap-DK-bronze-winged-jacana-Greater-Spotted-Eagle-750x500.jpg\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step III: Launch an execution for the task\n", + "This creates an execution of just the task. Remember in Flyte, Task is a standalone top level entity, so you can execute it" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "exc = edges.register_and_launch(\"flyteexamples\", \"development\", inputs={\"image\":example_image, \"script\":script_path})\n", + "\n", + "print_console_url(exc)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exc.wait_for_completion()\n", + "#exc.outputs\n", + "# Outputs are not working with Minio" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step IV: Optional. Create a Workflow\n", + "Of course, you can use this task in a workflow. 
We are creating a trivial workflow in this case that has only one task" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from flytekit.sdk.workflow import workflow_class, Input, Output\n", + "@workflow_class\n", + "class EdgeDetector(object):\n", + " script = Input(Types.Blob)\n", + " image = Input(Types.Blob)\n", + " edge_task = edges(script=script, image=image)\n", + " out = Output(edge_task.outputs.edges, sdk_type=Types.Blob)\n", + "\n", + "EdgeDetector_lp = EdgeDetector.create_launch_plan() " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step V: Optional. Register and execute the workflow\n", + "To make the dynamic loading of the script work, we have to upload the script to some s3 bucket. Since we are testing this locally, I have originally created an s3 client that points to Flyte installed minio. Upload the code there too\n", + "```python\n", + "s3.Bucket('my-s3-bucket').upload_file('image.py','code/image.py')\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "edges.register(name=\"EdgeDetectorFunc\", project=\"flyteexamples\", domain=\"development\", version=\"5\")\n", + "EdgeDetector.register(name=\"EdgeDetector\", project=\"flyteexamples\", domain=\"development\", version=\"5\")\n", + "EdgeDetector_lp.register(name=\"EdgeDetector\", project=\"flyteexamples\", domain=\"development\", version=\"5\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "exc = EdgeDetector_lp.execute(\"flyteexamples\", \"development\", inputs={\"image\":example_image, \"script\":script_path})\n", + "print_console_url(exc)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step VI: Visualize the results\n", + "You can retrieve the results and visualize them here" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "key=\"/gn/ff554920363ff4da1903-edge-task-0/edges\"\n", + "s3.Bucket('my-s3-bucket').download_file(key,'edges.png')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "display_images([\"edges.png\"])" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tests/flytekit/common/workflows/raw_container.py b/tests/flytekit/common/workflows/raw_container.py new file mode 100644 index 0000000000..e260a7acc6 --- /dev/null +++ b/tests/flytekit/common/workflows/raw_container.py @@ -0,0 +1,33 @@ +from __future__ import absolute_import, division, print_function + +from flytekit.common.tasks.raw_container import SdkRawContainerTask +from flytekit.sdk.types import Types +from flytekit.sdk.workflow import workflow_class, Input, Output + +square = SdkRawContainerTask( + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs={"val": Types.Integer}, + outputs={"out": Types.Integer}, + image="alpine", + command=["sh", "-c", "echo $(( {{.Inputs.val}} * {{.Inputs.val}} )) | tee /var/outputs/out"], +) + +sum = SdkRawContainerTask( + input_data_dir="/var/flyte/inputs", + output_data_dir="/var/flyte/outputs", + inputs={"x": Types.Integer, "y": Types.Integer}, + outputs={"out": Types.Integer}, + image="alpine", + command=["sh", "-c", "echo $(( {{.Inputs.x}} + {{.Inputs.y}} )) | tee /var/flyte/outputs/out"], +) + + +@workflow_class +class RawContainerWorkflow(object): + val1 = Input(Types.Integer) + val2 = Input(Types.Integer) + sq1 = square(val=val1) + 
sq2 = square(val=val2) + sm = sum(x=sq1.outputs.out, y=sq2.outputs.out) + sum_of_squares = Output(sm.outputs.out, sdk_type=Types.Integer) diff --git a/tests/flytekit/common/workflows/raw_edge_detector.py b/tests/flytekit/common/workflows/raw_edge_detector.py new file mode 100644 index 0000000000..295a846f0f --- /dev/null +++ b/tests/flytekit/common/workflows/raw_edge_detector.py @@ -0,0 +1,20 @@ +from flytekit.common.tasks.raw_container import SdkRawContainerTask +from flytekit.sdk.types import Types +from flytekit.sdk.workflow import workflow_class, Input, Output + +edges = SdkRawContainerTask( + input_data_dir="/inputs", + output_data_dir="/outputs", + inputs={"image": Types.Blob, "script": Types.Blob}, + outputs={"edges": Types.Blob}, + image="jjanzic/docker-python3-opencv", + command=["python", "{{.inputs.script}}", "/inputs/image", "/outputs/edges"], +) + + +@workflow_class +class EdgeDetector(object): + script = Input(Types.Blob) + image = Input(Types.Blob) + edge_task = edges(script=script, image=image) + out = Output(edge_task.outputs.edges, sdk_type=Types.Blob) diff --git a/tests/flytekit/unit/common_tests/tasks/test_raw_container_task.py b/tests/flytekit/unit/common_tests/tasks/test_raw_container_task.py new file mode 100644 index 0000000000..1267ce1c8b --- /dev/null +++ b/tests/flytekit/unit/common_tests/tasks/test_raw_container_task.py @@ -0,0 +1,27 @@ +from flytekit.common.tasks.raw_container import SdkRawContainerTask +from flytekit.sdk.types import Types + + +def test_raw_container_task_definition(): + tk = SdkRawContainerTask( + inputs={"x": Types.Integer}, + outputs={"y": Types.Integer}, + image="my-image", + command=["echo", "hello, world!"], + gpu_limit="1", + gpu_request="1", + ) + assert not tk.serialize() is None + + +def test_raw_container_task_definition_no_outputs(): + tk = SdkRawContainerTask( + inputs={"x": Types.Integer}, + image="my-image", + command=["echo", "hello, world!"], + gpu_limit="1", + gpu_request="1", + ) + assert not 
tk.serialize() is None + task_instance = tk(x=3) + assert task_instance.inputs[0].binding.scalar.primitive.integer == 3 diff --git a/tests/flytekit/unit/models/test_tasks.py b/tests/flytekit/unit/models/test_tasks.py index ff40307167..ac6694c2dd 100644 --- a/tests/flytekit/unit/models/test_tasks.py +++ b/tests/flytekit/unit/models/test_tasks.py @@ -42,6 +42,7 @@ def test_runtime_metadata(): assert obj != task.RuntimeMetadata(task.RuntimeMetadata.RuntimeType.OTHER, "1.0.0", "python") assert obj != task.RuntimeMetadata(task.RuntimeMetadata.RuntimeType.FLYTE_SDK, "1.0.0", "golang") + def test_task_metadata_interruptible_from_flyte_idl(): # Interruptible not set idl = TaskMetadata() @@ -165,3 +166,20 @@ def test_sidecar_task(): obj2 = task.SidecarJob.from_flyte_idl(obj.to_flyte_idl()) assert obj2 == obj + + +def test_dataloadingconfig(): + dlc = task.DataLoadingConfig("s3://input/path", "s3://output/path", True, + task.DataLoadingConfig.LITERALMAP_FORMAT_YAML) + dlc2 = task.DataLoadingConfig.from_flyte_idl(dlc.to_flyte_idl()) + assert dlc2 == dlc + + dlc = task.DataLoadingConfig("s3://input/path", "s3://output/path", True, + task.DataLoadingConfig.LITERALMAP_FORMAT_YAML, io_strategy=task.IOStrategy()) + dlc2 = task.DataLoadingConfig.from_flyte_idl(dlc.to_flyte_idl()) + assert dlc2 == dlc + + +def test_ioconfig(): + io = task.IOStrategy(task.IOStrategy.DOWNLOAD_MODE_NO_DOWNLOAD, task.IOStrategy.UPLOAD_MODE_NO_UPLOAD) + assert io == task.IOStrategy.from_flyte_idl(io.to_flyte_idl())