diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index 9abc772e564..fdae0b8a248 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -33,9 +33,9 @@ from metaflow.tagging_util import validate_tags from metaflow.util import get_username, to_bytes, to_unicode, version_parse -from .argo_workflows import ArgoWorkflows +from .argo_workflows import ArgoWorkflows, ArgoWorkflowsException -VALID_NAME = re.compile(r"^[a-z0-9]([a-z0-9\.\-]*[a-z0-9])?$") +VALID_NAME = re.compile(r"^[a-z]([a-z0-9\.\-]*[a-z0-9])?$") unsupported_decorators = { "snowpark": "Step *%s* is marked for execution on Snowpark with Argo Workflows which isn't currently supported.", @@ -85,7 +85,15 @@ def argo_workflows(obj, name=None): obj.workflow_name, obj.token_prefix, obj.is_project, + obj._is_workflow_name_modified, ) = resolve_workflow_name(obj, name) + # Backward compatibility for Metaflow versions <=2.12 because of + # change in name length restrictions in Argo Workflows from 253 to 52 + # characters. + ( + obj._v1_workflow_name, + obj._v1_is_workflow_name_modified, + ) = resolve_workflow_name_v1(obj, name) @argo_workflows.command(help="Deploy a new version of this workflow to Argo Workflows.") @@ -218,6 +226,7 @@ def create( deployer_attribute_file=None, enable_error_msg_capture=False, ): + # TODO: Remove this once we have a proper validator system in place for node in obj.graph: for decorator, error_message in unsupported_decorators.items(): if any([d.name == decorator for d in node.decorators]): @@ -236,7 +245,7 @@ def create( f, ) - obj.echo("Deploying *%s* to Argo Workflows..." % obj.workflow_name, bold=True) + obj.echo("Deploying *%s* to Argo Workflows..." % obj.flow.name, bold=True) if SERVICE_VERSION_CHECK: # TODO: Consider dispelling with this check since it's been 2 years since the @@ -280,7 +289,7 @@ def create( flow.deploy() obj.echo( "Workflow *{workflow_name}* " - "for flow *{name}* pushed to " + "for flow *{name}* deployed to " "Argo Workflows successfully.\n".format( workflow_name=obj.workflow_name, name=current.flow_name ), @@ -290,8 +299,41 @@ def create( obj.echo( "Note that the flow was deployed with a modified name " "due to Kubernetes naming conventions\non Argo Workflows. The " - "original flow name is stored in the workflow annotation.\n" + "original flow name is stored in the workflow annotations.\n" + ) + + if obj.workflow_name != obj._v1_workflow_name: + # Delete the old workflow if it exists + try: + ArgoWorkflows.delete(obj._v1_workflow_name) + obj.echo("Important!", bold=True, nl=False) + obj.echo( + " To comply with new naming restrictions on Argo " + "Workflows, this deployment replaced the\npreviously " + "deployed workflow {v1_workflow_name}.\n".format( + v1_workflow_name=obj._v1_workflow_name + ) + ) + except ArgoWorkflowsException as e: + # TODO: Catch a more specific exception + pass + + obj.echo("Warning! ", bold=True, nl=False) + obj.echo( + "Due to new naming restrictions on Argo Workflows, " + "re-deploying this flow with older\nversions of Metaflow (<2.13) " + "will result in the flow being deployed with a different name:" + ) + obj.echo( + "{v1_workflow_name}\n".format(v1_workflow_name=obj._v1_workflow_name), + indent=True, ) + obj.echo( + "without replacing the existing deployment. This may result in " + "duplicate executions of this flow.\nTo avoid this issue, deploy " + "this flow using Metaflow ≥2.13 or specify the flow name with --name.\n" + ) + # TODO: Add proper usage message for --name if ARGO_WORKFLOWS_UI_URL: obj.echo("See the deployed workflow here:", bold=True) @@ -370,9 +412,93 @@ def check_metadata_service_version(obj): ) +# Argo Workflows has a few restrictions on workflow names: +# - Argo Workflow Template names can't be longer than 253 characters since +# they follow DNS Subdomain name restrictions. +# - Argo Workflows stores workflow template names as a label in the workflow +# template metadata - workflows.argoproj.io/workflow-template, which follows +# RFC 1123, which is a strict subset of DNS Subdomain names and allows for +# 63 characters. +# - Argo Workflows appends a unix timestamp to the workflow name when the workflow +# is created (-1243856725) from a workflow template deployed as a cron workflow template +# reducing the number of characters available to 52. +# - TODO: Check naming restrictions for Argo Events. + +# In summary - +# - We truncate the workflow name to 45 characters to leave enough room for future +# enhancements to the Argo Workflows integration. +# - We remove any underscores since Argo Workflows doesn't allow them. +# - We convert the name to lower case. +# - We remove + and @ as not allowed characters, which can be part of the +# project branch due to using email addresses as user names. +# - We append a hash of the workflow name to the end to make it unique. + +# A complication here is that in previous versions of Metaflow (=<2.12), the limit was a +# rather lax 253 characters - so we have two issues to contend with: +# 1. Replacing any equivalent flows deployed using previous versions of Metaflow which +# adds a bit of complexity to the business logic. +# 2. Breaking Metaflow users who have multiple versions of Metaflow floating in their +# organization. Imagine a scenario, where metaflow-v1 (253 chars) deploys the same +# flow which was previously deployed using the new metaflow-v2 (45 chars) - the user +# will end up with two workflows templates instead of one since metaflow-v1 has no +# awareness of the new name truncation logic introduced by metaflow-v2. Unfortunately, +# there is no way to avoid this scenario - so we will do our best to message to the +# user to not use an older version of Metaflow to redeploy affected flows. +# ------------------------------------------------------------------------------------------ +# | metaflow-v1 (253 chars) | metaflow-v2 (45 chars) | Result | +# ------------------------------------------------------------------------------------------ +# | workflow_name_modified = True | workflow_name_modified = False | Not possible | +# ------------------------------------------------------------------------------------------ +# | workflow_name_modified = False | workflow_name_modified = True | Messaging needed | +# ------------------------------------------------------------------------------------------ +# | workflow_name_modified = False | workflow_name_modified = False | No message needed | +# ------------------------------------------------------------------------------------------ +# | workflow_name_modified = True | workflow_name_modified = True | Messaging needed | +# ------------------------------------------------------------------------------------------ + +# TODO: Verify that all the CLI methods work as before if the user passes a metaflow-v1 name +# using --name argument (except for create) + + +def resolve_workflow_name_v1(obj, name): + # models the workflow_name calculation logic in Metaflow versions =<2.12 + project = current.get("project_name") + is_workflow_name_modified = False + if project: + if name: + return None, False # not possible in versions =<2.12 + workflow_name = current.project_flow_name + if len(workflow_name) > 253: + name_hash = to_unicode( + base64.b32encode(sha1(to_bytes(workflow_name)).digest()) + )[:8].lower() + workflow_name = "%s-%s" % (workflow_name[:242], name_hash) + is_workflow_name_modified = True + if not VALID_NAME.search(workflow_name): + workflow_name = sanitize_for_argo(workflow_name) + is_workflow_name_modified = True + else: + if name and not VALID_NAME.search(name): + return None, False # not possible in versions =<2.12 + workflow_name = name if name else current.flow_name + if len(workflow_name) > 253: + return None, False # not possible in versions =<2.12 + if not VALID_NAME.search(workflow_name): + # Note - since sanitize_for_argo() is a surjective mapping, + # using it here is a bug, but we leave this in place + # since the usage of v1_workflow_name is to generate + # historical workflow names, so we need to replicate + # all the bugs too :'( + workflow_name = sanitize_for_argo(workflow_name) + is_workflow_name_modified = True + return workflow_name, is_workflow_name_modified + + def resolve_workflow_name(obj, name): + # current logic for imputing workflow_name + limit = 45 project = current.get("project_name") - obj._is_workflow_name_modified = False + is_workflow_name_modified = False if project: if name: raise MetaflowException( @@ -385,48 +511,46 @@ def resolve_workflow_name(obj, name): % to_unicode(base64.b32encode(sha1(project_branch).digest()))[:16] ) is_project = True - # Argo Workflow names can't be longer than 253 characters, so we truncate - # by default. Also, while project and branch allow for underscores, Argo - # Workflows doesn't (DNS Subdomain names as defined in RFC 1123) - so we will - # remove any underscores as well as convert the name to lower case. - # Also remove + and @ as not allowed characters, which can be part of the - # project branch due to using email addresses as user names. - if len(workflow_name) > 253: + if len(workflow_name) > limit: name_hash = to_unicode( base64.b32encode(sha1(to_bytes(workflow_name)).digest()) - )[:8].lower() - workflow_name = "%s-%s" % (workflow_name[:242], name_hash) - obj._is_workflow_name_modified = True + )[:5].lower() + # TODO: We can create better names that try to preserve the flow name + workflow_name = "%s-%s" % (workflow_name[: limit - 6], name_hash) + is_workflow_name_modified = True if not VALID_NAME.search(workflow_name): + # TODO: create a new sanitize_for_argo_v2() function that is not surjective + # and use it here. Might not be straight forward since it is also used + # in validate_run_id() :( workflow_name = sanitize_for_argo(workflow_name) - obj._is_workflow_name_modified = True + is_workflow_name_modified = True else: if name and not VALID_NAME.search(name): raise MetaflowException( "Name '%s' contains invalid characters. The " "name must consist of lower case alphanumeric characters, '-' or '.'" - ", and must start and end with an alphanumeric character." % name + ", and must start with an alphabetic character, " + "and end with an alphanumeric character." % name ) workflow_name = name if name else current.flow_name token_prefix = workflow_name is_project = False - if len(workflow_name) > 253: + if len(workflow_name) > limit: msg = ( - "The full name of the workflow:\n*%s*\nis longer than 253 " + "The full name of the workflow:\n*%s*\nis longer than %s " "characters.\n\n" "To deploy this workflow to Argo Workflows, please " "assign a shorter name\nusing the option\n" - "*argo-workflows --name create*." % workflow_name + "*argo-workflows --name create*." % (workflow_name, limit) ) raise ArgoWorkflowsNameTooLong(msg) if not VALID_NAME.search(workflow_name): workflow_name = sanitize_for_argo(workflow_name) - obj._is_workflow_name_modified = True - - return workflow_name, token_prefix.lower(), is_project + is_workflow_name_modified = True + return workflow_name, token_prefix.lower(), is_project, is_workflow_name_modified def make_flow(