Skip to content

Commit

Permalink
Add support for execution name prefixes (#1803)
Browse files Browse the repository at this point in the history
Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Aug 21, 2023
1 parent b49867a commit 22d1f30
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 9 deletions.
34 changes: 27 additions & 7 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,8 @@ def _execute(
inputs: typing.Dict[str, typing.Any],
project: str = None,
domain: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
Expand All @@ -978,7 +979,10 @@ def _execute(
:param tags: Tags to set for the execution.
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`
"""
execution_name = execution_name or "f" + uuid.uuid4().hex[:19]
if execution_name is not None and execution_name_prefix is not None:
raise ValueError("Only one of execution_name and execution_name_prefix can be set, but got both set")
execution_name_prefix = execution_name_prefix + "-" if execution_name_prefix is not None else None
execution_name = execution_name or (execution_name_prefix or "f") + uuid.uuid4().hex[:19]
if not options:
options = Options()
if options.disable_notifications is not None:
Expand Down Expand Up @@ -1092,7 +1096,8 @@ def execute(
domain: str = None,
name: str = None,
version: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
image_config: typing.Optional[ImageConfig] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
Expand Down Expand Up @@ -1153,6 +1158,7 @@ def execute(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
options=options,
wait=wait,
type_hints=type_hints,
Expand All @@ -1167,6 +1173,7 @@ def execute(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
options=options,
wait=wait,
type_hints=type_hints,
Expand All @@ -1183,6 +1190,7 @@ def execute(
name=name,
version=version,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
image_config=image_config,
wait=wait,
overwrite_cache=overwrite_cache,
Expand All @@ -1198,6 +1206,7 @@ def execute(
name=name,
version=version,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
image_config=image_config,
options=options,
wait=wait,
Expand All @@ -1213,6 +1222,7 @@ def execute(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
Expand All @@ -1230,7 +1240,8 @@ def execute_remote_task_lp(
inputs: typing.Dict[str, typing.Any],
project: str = None,
domain: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
Expand All @@ -1248,6 +1259,7 @@ def execute_remote_task_lp(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
wait=wait,
options=options,
type_hints=type_hints,
Expand All @@ -1262,7 +1274,8 @@ def execute_remote_wf(
inputs: typing.Dict[str, typing.Any],
project: str = None,
domain: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
Expand All @@ -1281,6 +1294,7 @@ def execute_remote_wf(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
options=options,
wait=wait,
type_hints=type_hints,
Expand All @@ -1300,7 +1314,8 @@ def execute_local_task(
domain: str = None,
name: str = None,
version: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
image_config: typing.Optional[ImageConfig] = None,
wait: bool = False,
overwrite_cache: typing.Optional[bool] = None,
Expand Down Expand Up @@ -1346,6 +1361,7 @@ def execute_local_task(
project=resolved_identifiers.project,
domain=resolved_identifiers.domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
Expand All @@ -1361,7 +1377,8 @@ def execute_local_workflow(
domain: str = None,
name: str = None,
version: str = None,
execution_name: str = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
image_config: typing.Optional[ImageConfig] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
Expand Down Expand Up @@ -1425,6 +1442,7 @@ def execute_local_workflow(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
wait=wait,
options=options,
type_hints=entity.python_interface.inputs,
Expand All @@ -1441,6 +1459,7 @@ def execute_local_launch_plan(
project: typing.Optional[str] = None,
domain: typing.Optional[str] = None,
execution_name: typing.Optional[str] = None,
execution_name_prefix: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: typing.Optional[bool] = None,
Expand Down Expand Up @@ -1482,6 +1501,7 @@ def execute_local_launch_plan(
project=project,
domain=domain,
execution_name=execution_name,
execution_name_prefix=execution_name_prefix,
options=options,
wait=wait,
type_hints=entity.python_interface.inputs,
Expand Down
56 changes: 54 additions & 2 deletions tests/flytekit/unit/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
import pathlib
import tempfile
import typing
import uuid
from collections import OrderedDict
from datetime import datetime, timedelta

import mock
import pytest
from flyteidl.core import compiler_pb2 as _compiler_pb2
from flyteidl.service import dataproxy_pb2
from mock import MagicMock, patch
from mock import ANY, MagicMock, patch

import flytekit.configuration
from flytekit import CronSchedule, LaunchPlan, task, workflow
from flytekit.configuration import Config, DefaultImages, ImageConfig
from flytekit.configuration import Config, DefaultImages, Image, ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
Expand All @@ -25,6 +26,7 @@
from flytekit.models.core.identifier import Identifier, ResourceType, WorkflowExecutionIdentifier
from flytekit.models.execution import Execution
from flytekit.models.task import Task
from flytekit.remote import FlyteTask
from flytekit.remote.lazy_entity import LazyEntity
from flytekit.remote.remote import FlyteRemote
from flytekit.tools.translator import Options, get_serializable, get_serializable_launch_plan
Expand Down Expand Up @@ -373,3 +375,53 @@ def test_local_server(mock_client):
)
lr = rr.get("flyte://v1/flytesnacks/development/f6988c7bdad554a4da7a/n0/o")
assert lr.get("hello", int) == 55


@mock.patch("flytekit.remote.remote.uuid")
@mock.patch("flytekit.remote.remote.FlyteRemote.client")
def test_execution_name(mock_client, mock_uuid):
test_uuid = uuid.UUID("16fd2706-8baf-433b-82eb-8c7fada847da")
mock_uuid.uuid4.return_value = test_uuid
remote = FlyteRemote(config=Config.auto(), default_project="project", default_domain="domain")

default_img = Image(name="default", fqn="test", tag="tag")
serialization_settings = SerializationSettings(
project="project",
domain="domain",
version="version",
env=None,
image_config=ImageConfig(default_image=default_img, images=[default_img]),
)
tk_spec = get_serializable(OrderedDict(), serialization_settings, tk)
ft = FlyteTask.promote_from_model(tk_spec.template)

remote._execute(
entity=ft,
inputs={"t": datetime.now(), "v": 0},
execution_name="execution-test",
)
remote._execute(
entity=ft,
inputs={"t": datetime.now(), "v": 0},
execution_name_prefix="execution-test",
)
remote._execute(
entity=ft,
inputs={"t": datetime.now(), "v": 0},
)
mock_client.create_execution.assert_has_calls(
[
mock.call(ANY, ANY, "execution-test", ANY, ANY),
mock.call(ANY, ANY, "execution-test-" + test_uuid.hex[:19], ANY, ANY),
mock.call(ANY, ANY, "f" + test_uuid.hex[:19], ANY, ANY),
]
)
with pytest.raises(
ValueError, match="Only one of execution_name and execution_name_prefix can be set, but got both set"
):
remote._execute(
entity=ft,
inputs={"t": datetime.now(), "v": 0},
execution_name="execution-test",
execution_name_prefix="execution-test",
)

0 comments on commit 22d1f30

Please sign in to comment.