Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable or Disable cache for the Compiler #11209

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion sdk/python/kfp/cli/compile_.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import click
from kfp import compiler
from kfp.cli.utils import parsing
from kfp.dsl import base_component
from kfp.dsl import graph_component

Expand Down Expand Up @@ -133,12 +134,19 @@ def parse_parameters(parameters: Optional[str]) -> Dict:
is_flag=True,
default=False,
help='Whether to disable type checking.')
@click.option(
'--enable-caching/--disable-caching',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to --disable-default-caching to maintain precedence with other flags in this file. (like the type_check one)

type=bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to use is_flag functionality here, to maintain precedence (just like type_check for instance).

default=None,
help=parsing.get_param_descr(compiler.Compiler.compile, 'enable_caching'),
)
def compile_(
py: str,
output: str,
function_name: Optional[str] = None,
pipeline_parameters: Optional[str] = None,
disable_type_check: bool = False,
enable_caching: Optional[bool] = None,
) -> None:
"""Compiles a pipeline or component written in a .py file."""
pipeline_func = collect_pipeline_or_component_func(
Expand All @@ -149,7 +157,8 @@ def compile_(
pipeline_func=pipeline_func,
pipeline_parameters=parsed_parameters,
package_path=package_path,
type_check=not disable_type_check)
type_check=not disable_type_check,
enable_caching=enable_caching)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once you update as per https://github.com/kubeflow/pipelines/pull/11209/files#r1759241709,
you would need to change this as well to something like:
enable_caching=not disable_caching_default


click.echo(package_path)

Expand Down
19 changes: 3 additions & 16 deletions sdk/python/kfp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from kfp.client import auth
from kfp.client import set_volume_credentials
from kfp.client.token_credentials_base import TokenCredentialsBase
from kfp.compiler.compiler import override_caching_options
from kfp.dsl import base_component
from kfp.pipeline_spec import pipeline_spec_pb2
import kfp_server_api
Expand Down Expand Up @@ -955,8 +956,8 @@ def _create_job_config(
# Caching option set at submission time overrides the compile time
# settings.
if enable_caching is not None:
_override_caching_options(pipeline_doc.pipeline_spec,
enable_caching)
override_caching_options(pipeline_doc.pipeline_spec,
enable_caching)
pipeline_spec = pipeline_doc.to_dict()

pipeline_version_reference = None
Expand Down Expand Up @@ -1676,17 +1677,3 @@ def _safe_load_yaml(stream: TextIO) -> _PipelineDoc:
raise ValueError(
f'The package_file {package_file} should end with one of the '
'following formats: [.tar.gz, .tgz, .zip, .yaml, .yml].')


def _override_caching_options(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
enable_caching: bool,
) -> None:
"""Overrides caching options.

Args:
pipeline_spec: The PipelineSpec object to update in-place.
enable_caching: Overrides options, one of True, False.
"""
for _, task_spec in pipeline_spec.root.dag.tasks.items():
task_spec.caching_options.enable_cache = enable_caching
5 changes: 3 additions & 2 deletions sdk/python/kfp/client/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from google.protobuf import json_format
from kfp.client import auth
from kfp.client import client
from kfp.compiler import Compiler
from kfp.compiler.compiler import Compiler
from kfp.compiler.compiler import override_caching_options
from kfp.dsl import component
from kfp.dsl import pipeline
from kfp.pipeline_spec import pipeline_spec_pb2
Expand Down Expand Up @@ -88,7 +89,7 @@ def pipeline_with_two_component(text: str = 'hi there'):
pipeline_obj = yaml.safe_load(f)
pipeline_spec = json_format.ParseDict(
pipeline_obj, pipeline_spec_pb2.PipelineSpec())
client._override_caching_options(pipeline_spec, True)
override_caching_options(pipeline_spec, True)
pipeline_obj = json_format.MessageToDict(pipeline_spec)
self.assertTrue(pipeline_obj['root']['dag']['tasks']
['hello-word']['cachingOptions']['enableCache'])
Expand Down
25 changes: 25 additions & 0 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from kfp.compiler import pipeline_spec_builder as builder
from kfp.dsl import base_component
from kfp.dsl.types import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2


class Compiler:
Expand Down Expand Up @@ -53,6 +54,7 @@ def compile(
pipeline_name: Optional[str] = None,
pipeline_parameters: Optional[Dict[str, Any]] = None,
type_check: bool = True,
enable_caching: Optional[bool] = None,
) -> None:
"""Compiles the pipeline or component function into IR YAML.

Expand All @@ -62,6 +64,12 @@ def compile(
pipeline_name: Name of the pipeline.
pipeline_parameters: Map of parameter names to argument values.
type_check: Whether to enable type checking of component interfaces during compilation.
enable_caching: Whether or not to enable caching for the
run. If not set, defaults to the compile time settings, which
is ``True`` for all tasks by default, while users may specify
different caching options for individual tasks. If set, the
setting applies to all tasks in the pipeline (overrides the
Copy link
Contributor

@DharmitD DharmitD Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we want to override compile time settings, we just want to have an option to change the caching default. When we add the flag, this needs to happen for instance:

Assume that we set our flag such that default caching is disabled.
When we are done with this feature, this is expected:
@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline():
   task_1 = create_dataset()
   task_2 = create_dataset()
   task_3 = create_dataset()
   task_3.set_caching_options(True)
   # tasks 1 and 2 don’t try to use the cache. Task 3 does try to use the cache.

However, with your update of overriding in the client directly, task 3's caching might also get set to False. (Not entirely sure, something you would have to test and find out @diegolovison )

@gregsheremeta could elaborate

compile time settings).
"""

with type_utils.TypeCheckManager(enable=type_check):
Expand All @@ -78,9 +86,26 @@ def compile(
pipeline_parameters=pipeline_parameters,
)

if enable_caching is not None:
override_caching_options(pipeline_spec, enable_caching)

builder.write_pipeline_spec_to_file(
pipeline_spec=pipeline_spec,
pipeline_description=pipeline_func.description,
platform_spec=pipeline_func.platform_spec,
package_path=package_path,
)


def override_caching_options(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
enable_caching: bool,
) -> None:
"""Overrides caching options.

Args:
pipeline_spec: The PipelineSpec object to update in-place.
enable_caching: Overrides options, one of True, False.
"""
for _, task_spec in pipeline_spec.root.dag.tasks.items():
task_spec.caching_options.enable_cache = enable_caching
Loading