Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

address name contraction issues in argo-workflows create #2082

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 148 additions & 24 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
from metaflow.tagging_util import validate_tags
from metaflow.util import get_username, to_bytes, to_unicode, version_parse

from .argo_workflows import ArgoWorkflows
from .argo_workflows import ArgoWorkflows, ArgoWorkflowsException

VALID_NAME = re.compile(r"^[a-z0-9]([a-z0-9\.\-]*[a-z0-9])?$")
VALID_NAME = re.compile(r"^[a-z]([a-z0-9\.\-]*[a-z0-9])?$")

unsupported_decorators = {
"snowpark": "Step *%s* is marked for execution on Snowpark with Argo Workflows which isn't currently supported.",
Expand Down Expand Up @@ -85,7 +85,15 @@ def argo_workflows(obj, name=None):
obj.workflow_name,
obj.token_prefix,
obj.is_project,
obj._is_workflow_name_modified,
) = resolve_workflow_name(obj, name)
# Backward compatibility for Metaflow versions <=2.12 because of
# change in name length restrictions in Argo Workflows from 253 to 52
# characters.
(
obj._v1_workflow_name,
obj._v1_is_workflow_name_modified,
) = resolve_workflow_name_v1(obj, name)


@argo_workflows.command(help="Deploy a new version of this workflow to Argo Workflows.")
Expand Down Expand Up @@ -218,6 +226,7 @@ def create(
deployer_attribute_file=None,
enable_error_msg_capture=False,
):
# TODO: Remove this once we have a proper validator system in place
for node in obj.graph:
for decorator, error_message in unsupported_decorators.items():
if any([d.name == decorator for d in node.decorators]):
Expand All @@ -236,7 +245,7 @@ def create(
f,
)

obj.echo("Deploying *%s* to Argo Workflows..." % obj.workflow_name, bold=True)
obj.echo("Deploying *%s* to Argo Workflows..." % obj.flow.name, bold=True)

if SERVICE_VERSION_CHECK:
# TODO: Consider dispelling with this check since it's been 2 years since the
Expand Down Expand Up @@ -280,7 +289,7 @@ def create(
flow.deploy()
obj.echo(
"Workflow *{workflow_name}* "
"for flow *{name}* pushed to "
"for flow *{name}* deployed to "
"Argo Workflows successfully.\n".format(
workflow_name=obj.workflow_name, name=current.flow_name
),
Expand All @@ -290,8 +299,41 @@ def create(
obj.echo(
"Note that the flow was deployed with a modified name "
"due to Kubernetes naming conventions\non Argo Workflows. The "
"original flow name is stored in the workflow annotation.\n"
"original flow name is stored in the workflow annotations.\n"
)

if obj.workflow_name != obj._v1_workflow_name:
# Delete the old workflow if it exists
try:
ArgoWorkflows.delete(obj._v1_workflow_name)
obj.echo("Important!", bold=True, nl=False)
obj.echo(
" To comply with new naming restrictions on Argo "
"Workflows, this deployment replaced the\npreviously "
"deployed workflow {v1_workflow_name}.\n".format(
v1_workflow_name=obj._v1_workflow_name
Comment on lines +311 to +314
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also check whether v1_workflow_name exists or not, and cut this alert out if we did not replace an old deployment

)
)
except ArgoWorkflowsException as e:
# TODO: Catch a more specific exception
pass

obj.echo("Warning! ", bold=True, nl=False)
obj.echo(
"Due to new naming restrictions on Argo Workflows, "
"re-deploying this flow with older\nversions of Metaflow (<2.13) "
"will result in the flow being deployed with a different name:"
)
obj.echo(
"{v1_workflow_name}\n".format(v1_workflow_name=obj._v1_workflow_name),
indent=True,
)
obj.echo(
"without replacing the existing deployment. This may result in "
"duplicate executions of this flow.\nTo avoid this issue, deploy "
"this flow using Metaflow ≥2.13 or specify the flow name with --name.\n"
)
# TODO: Add proper usage message for --name

if ARGO_WORKFLOWS_UI_URL:
obj.echo("See the deployed workflow here:", bold=True)
Expand Down Expand Up @@ -370,9 +412,93 @@ def check_metadata_service_version(obj):
)


# Argo Workflows has a few restrictions on workflow names:
# - Argo Workflow Template names can't be longer than 253 characters since
# they follow DNS Subdomain name restrictions.
# - Argo Workflows stores workflow template names as a label in the workflow
# template metadata - workflows.argoproj.io/workflow-template, which follows
# RFC 1123, which is a strict subset of DNS Subdomain names and allows for
# 63 characters.
# - Argo Workflows appends a unix timestamp to the workflow name when the workflow
# is created (-1243856725) from a workflow template deployed as a cron workflow template
# reducing the number of characters available to 52.
# - TODO: Check naming restrictions for Argo Events.

# In summary -
# - We truncate the workflow name to 45 characters to leave enough room for future
# enhancements to the Argo Workflows integration.
# - We remove any underscores since Argo Workflows doesn't allow them.
# - We convert the name to lower case.
# - We remove + and @ as not allowed characters, which can be part of the
# project branch due to using email addresses as user names.
# - We append a hash of the workflow name to the end to make it unique.

# A complication here is that in previous versions of Metaflow (=<2.12), the limit was a
# rather lax 253 characters - so we have two issues to contend with:
# 1. Replacing any equivalent flows deployed using previous versions of Metaflow which
# adds a bit of complexity to the business logic.
# 2. Breaking Metaflow users who have multiple versions of Metaflow floating in their
# organization. Imagine a scenario, where metaflow-v1 (253 chars) deploys the same
# flow which was previously deployed using the new metaflow-v2 (45 chars) - the user
# will end up with two workflows templates instead of one since metaflow-v1 has no
# awareness of the new name truncation logic introduced by metaflow-v2. Unfortunately,
# there is no way to avoid this scenario - so we will do our best to message to the
# user to not use an older version of Metaflow to redeploy affected flows.
# ------------------------------------------------------------------------------------------
# | metaflow-v1 (253 chars) | metaflow-v2 (45 chars) | Result |
# ------------------------------------------------------------------------------------------
# | workflow_name_modified = True | workflow_name_modified = False | Not possible |
# ------------------------------------------------------------------------------------------
# | workflow_name_modified = False | workflow_name_modified = True | Messaging needed |
# ------------------------------------------------------------------------------------------
# | workflow_name_modified = False | workflow_name_modified = False | No message needed |
# ------------------------------------------------------------------------------------------
# | workflow_name_modified = True | workflow_name_modified = True | Messaging needed |
# ------------------------------------------------------------------------------------------

# TODO: Verify that all the CLI methods work as before if the user passes a metaflow-v1 name
# using --name argument (except for create)


def resolve_workflow_name_v1(obj, name):
# models the workflow_name calculation logic in Metaflow versions =<2.12
project = current.get("project_name")
is_workflow_name_modified = False
if project:
if name:
return None, False # not possible in versions =<2.12
workflow_name = current.project_flow_name
if len(workflow_name) > 253:
name_hash = to_unicode(
base64.b32encode(sha1(to_bytes(workflow_name)).digest())
)[:8].lower()
workflow_name = "%s-%s" % (workflow_name[:242], name_hash)
is_workflow_name_modified = True
if not VALID_NAME.search(workflow_name):
workflow_name = sanitize_for_argo(workflow_name)
is_workflow_name_modified = True
else:
if name and not VALID_NAME.search(name):
return None, False # not possible in versions =<2.12
workflow_name = name if name else current.flow_name
if len(workflow_name) > 253:
return None, False # not possible in versions =<2.12
if not VALID_NAME.search(workflow_name):
# Note - since sanitize_for_argo() is a surjective mapping,
# using it here is a bug, but we leave this in place
# since the usage of v1_workflow_name is to generate
# historical workflow names, so we need to replicate
# all the bugs too :'(
workflow_name = sanitize_for_argo(workflow_name)
is_workflow_name_modified = True
return workflow_name, is_workflow_name_modified


def resolve_workflow_name(obj, name):
# current logic for imputing workflow_name
limit = 45
project = current.get("project_name")
obj._is_workflow_name_modified = False
is_workflow_name_modified = False
if project:
if name:
raise MetaflowException(
Expand All @@ -385,48 +511,46 @@ def resolve_workflow_name(obj, name):
% to_unicode(base64.b32encode(sha1(project_branch).digest()))[:16]
)
is_project = True
# Argo Workflow names can't be longer than 253 characters, so we truncate
# by default. Also, while project and branch allow for underscores, Argo
# Workflows doesn't (DNS Subdomain names as defined in RFC 1123) - so we will
# remove any underscores as well as convert the name to lower case.
# Also remove + and @ as not allowed characters, which can be part of the
# project branch due to using email addresses as user names.
if len(workflow_name) > 253:
if len(workflow_name) > limit:
name_hash = to_unicode(
base64.b32encode(sha1(to_bytes(workflow_name)).digest())
)[:8].lower()
workflow_name = "%s-%s" % (workflow_name[:242], name_hash)
obj._is_workflow_name_modified = True
)[:5].lower()
# TODO: We can create better names that try to preserve the flow name
workflow_name = "%s-%s" % (workflow_name[: limit - 6], name_hash)
Comment on lines +518 to +519
Copy link
Collaborator

@saikonen saikonen Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use current.flow_name as the base and append the hash which comes from the current.project_flow_name if the flow name is the most significant info in the template name

Though one issue with this would be that validate_run_id relies on checking whether project/branch are part of the generated workflow_name. This needs an overhaul in any case though, as it will most likely start breaking due to the new limits

is_workflow_name_modified = True
if not VALID_NAME.search(workflow_name):
# TODO: create a new sanitize_for_argo_v2() function that is not surjective
# and use it here. Might not be straight forward since it is also used
# in validate_run_id() :(
workflow_name = sanitize_for_argo(workflow_name)
obj._is_workflow_name_modified = True
is_workflow_name_modified = True
else:
if name and not VALID_NAME.search(name):
raise MetaflowException(
"Name '%s' contains invalid characters. The "
"name must consist of lower case alphanumeric characters, '-' or '.'"
", and must start and end with an alphanumeric character." % name
", and must start with an alphabetic character, "
"and end with an alphanumeric character." % name
)

workflow_name = name if name else current.flow_name
token_prefix = workflow_name
is_project = False

if len(workflow_name) > 253:
if len(workflow_name) > limit:
msg = (
"The full name of the workflow:\n*%s*\nis longer than 253 "
"The full name of the workflow:\n*%s*\nis longer than %s "
"characters.\n\n"
"To deploy this workflow to Argo Workflows, please "
"assign a shorter name\nusing the option\n"
"*argo-workflows --name <name> create*." % workflow_name
"*argo-workflows --name <name> create*." % (workflow_name, limit)
)
raise ArgoWorkflowsNameTooLong(msg)

if not VALID_NAME.search(workflow_name):
workflow_name = sanitize_for_argo(workflow_name)
obj._is_workflow_name_modified = True

return workflow_name, token_prefix.lower(), is_project
is_workflow_name_modified = True
return workflow_name, token_prefix.lower(), is_project, is_workflow_name_modified


def make_flow(
Expand Down
Loading