diff --git a/google_cloud_automlops/AutoMLOps.py b/google_cloud_automlops/AutoMLOps.py index b7d2124..e71ac42 100644 --- a/google_cloud_automlops/AutoMLOps.py +++ b/google_cloud_automlops/AutoMLOps.py @@ -18,6 +18,8 @@ # pylint: disable=C0103 # pylint: disable=line-too-long # pylint: disable=unused-import +# pylint: disable=logging-fstring-interpolation +# pylint: disable=global-at-module-level import functools import json @@ -50,6 +52,7 @@ from google_cloud_automlops.utils.utils import ( account_permissions_warning, check_installation_versions, + coalesce, create_default_config, execute_process, make_dirs, @@ -62,15 +65,13 @@ write_yaml_file ) # Orchestration imports -from google_cloud_automlops.orchestration.kfp import builder as KfpBuilder -from google_cloud_automlops.orchestration.kfp import scaffold as KfpScaffold -from google_cloud_automlops.orchestration.enums import ( +from google_cloud_automlops.utils.enums import ( Orchestrator, PipelineJobSubmitter ) -from google_cloud_automlops.orchestration.configs import ( - KfpConfig -) +from google_cloud_automlops.orchestration.base import BaseComponent, BasePipeline, BaseServices +from google_cloud_automlops.orchestration.kfp import KFPComponent, KFPPipeline, KFPServices + # Provisioning imports from google_cloud_automlops.provisioning.pulumi import builder as PulumiBuilder from google_cloud_automlops.provisioning.terraform import builder as TerraformBuilder @@ -95,13 +96,19 @@ ) from google_cloud_automlops.deployments.gitops.git_utils import git_workflow +# Set up logging logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s') logging.getLogger('googleapiclient').setLevel(logging.WARNING) logger = logging.getLogger() +# Create output directory make_dirs([OUTPUT_DIR]) +# Set up global dictionaries to hold pipeline and components +global components_dict +components_dict = {} + def launchAll( project_id: str, pipeline_params: Dict, @@ -278,8 +285,10 @@ def generate( raise ValueError(f'Unsupported deployment framework: {deployment_framework}') logging.info(f'Writing directories under {BASE_DIR}') + # Make standard directories make_dirs(GENERATED_DIRS) + # Make optional directories if use_ci: make_dirs(GENERATED_SERVICES_DIRS) @@ -291,15 +300,15 @@ def generate( make_dirs(GENERATED_MODEL_MONITORING_DIRS) # Set derived vars if none were given for certain variables - derived_artifact_repo_name = f'{naming_prefix}-artifact-registry' if artifact_repo_name is None else artifact_repo_name - derived_build_trigger_name = f'{naming_prefix}-build-trigger' if build_trigger_name is None else build_trigger_name - derived_custom_training_job_specs = stringify_job_spec_list(custom_training_job_specs) if custom_training_job_specs is not None else custom_training_job_specs - derived_pipeline_job_runner_service_account = f'vertex-pipelines@{project_id}.iam.gserviceaccount.com' if pipeline_job_runner_service_account is None else pipeline_job_runner_service_account - derived_pipeline_job_submission_service_name = f'{naming_prefix}-job-submission-svc' if pipeline_job_submission_service_name is None else pipeline_job_submission_service_name - derived_pubsub_topic_name = f'{naming_prefix}-queueing-svc' if pubsub_topic_name is None else pubsub_topic_name - derived_schedule_name = f'{naming_prefix}-schedule' if schedule_name is None else schedule_name - derived_source_repo_name = f'{naming_prefix}-repository' if source_repo_name is None else source_repo_name - derived_storage_bucket_name = f'{project_id}-{naming_prefix}-bucket' if 
storage_bucket_name is None else storage_bucket_name + derived_artifact_repo_name = coalesce(artifact_repo_name, f'{naming_prefix}-artifact-registry') + derived_build_trigger_name = coalesce(build_trigger_name, f'{naming_prefix}-build-trigger') + derived_custom_training_job_specs = stringify_job_spec_list(custom_training_job_specs) + derived_pipeline_job_runner_service_account = coalesce(pipeline_job_runner_service_account, f'vertex-pipelines@{project_id}.iam.gserviceaccount.com') + derived_pipeline_job_submission_service_name = coalesce(pipeline_job_submission_service_name, f'{naming_prefix}-job-submission-svc') + derived_pubsub_topic_name = coalesce(pubsub_topic_name, f'{naming_prefix}-queueing-svc') + derived_schedule_name = coalesce(schedule_name, f'{naming_prefix}-schedule') + derived_source_repo_name = coalesce(source_repo_name, f'{naming_prefix}-repository') + derived_storage_bucket_name = coalesce(storage_bucket_name, f'{project_id}-{naming_prefix}-bucket') # Write defaults.yaml defaults = create_default_config( @@ -337,20 +346,33 @@ def generate( # Generate files required to run a Kubeflow pipeline if orchestration_framework == Orchestrator.KFP.value: + + # Log what files will be created logging.info(f'Writing README.md to {BASE_DIR}README.md') - logging.info(f'Writing kubeflow pipelines code to {BASE_DIR}pipelines, {BASE_DIR}components') logging.info(f'Writing scripts to {BASE_DIR}scripts') - if use_ci: - logging.info(f'Writing submission service code to {BASE_DIR}services') + + # Write kubeflow pipeline code + logging.info(f'Writing kubeflow pipelines code to {BASE_DIR}pipelines') + kfppipe = KFPPipeline(func=pipeline_glob.func, + name=pipeline_glob.name, + description=pipeline_glob.description, + comps_dict=components_dict) + kfppipe.build(pipeline_params, derived_custom_training_job_specs) + + # Write kubeflow components code + logging.info(f'Writing kubeflow components code to {BASE_DIR}components') + for comp in kfppipe.comps: + logging.info(f' -- Writing {comp.name}') + KFPComponent(func=comp.func, packages_to_install=comp.packages_to_install).build() + if setup_model_monitoring: logging.info(f'Writing model monitoring code to {BASE_DIR}model_monitoring') - KfpBuilder.build(KfpConfig( - base_image=base_image, - custom_training_job_specs=derived_custom_training_job_specs, - setup_model_monitoring=setup_model_monitoring, - pipeline_params=pipeline_params, - pubsub_topic_name=derived_pubsub_topic_name, - use_ci=use_ci)) + + # If user specified services, write services scripts + if use_ci: + logging.info(f'Writing submission service code to {BASE_DIR}services') + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + KFPServices().build() # Generate files required to provision resources if provisioning_framework == Provisioner.GCLOUD.value: @@ -626,15 +648,17 @@ def my_function_one(input: str, output: Output[Model]): a plain parameter, or a path to a file). packages_to_install: A list of optional packages to install before executing func. These will always be installed at component runtime. 
- """ + """ if func is None: return functools.partial( component, packages_to_install=packages_to_install) else: - return KfpScaffold.create_component_scaffold( + components_dict[func.__name__] = BaseComponent( func=func, - packages_to_install=packages_to_install) + packages_to_install=packages_to_install + ) + return def pipeline(func: Optional[Callable] = None, @@ -670,10 +694,12 @@ def pipeline(bq_table: str, name=name, description=description) else: - return KfpScaffold.create_pipeline_scaffold( - func=func, - name=name, - description=description) + global pipeline_glob + pipeline_glob = BasePipeline(func=func, + name=name, + description=description, + comps_dict=components_dict) + return def clear_cache(): diff --git a/google_cloud_automlops/orchestration/airflow/.gitkeep b/google_cloud_automlops/orchestration/airflow/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/google_cloud_automlops/orchestration/argo/.gitkeep b/google_cloud_automlops/orchestration/argo/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py new file mode 100644 index 0000000..a8c9986 --- /dev/null +++ b/google_cloud_automlops/orchestration/base.py @@ -0,0 +1,359 @@ +# Copyright 2023 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Creates generic component, pipeline, and services objects.""" + +# pylint: disable=anomalous-backslash-in-string +# pylint: disable=C0103 +# pylint: disable=line-too-long + +import ast +import inspect +from typing import Callable, List, Optional, TypeVar, Union + +import docstring_parser + +from google_cloud_automlops.utils.utils import ( + get_function_source_definition, + read_yaml_file +) +from google_cloud_automlops.utils.constants import ( + BASE_DIR, + DEFAULT_PIPELINE_NAME, + GENERATED_DEFAULTS_FILE +) + +T = TypeVar('T') + + +class BaseComponent(): + """The Component object represents a component defined by the user. + """ + def __init__(self, + func: Optional[Callable] = None, + packages_to_install: Optional[List[str]] = None): + """Initiates a generic Component object created out of a function holding + all necessary code. + + Args: + func (Optional[Callable]): The python function to create a component from. The function + should have type annotations for all its arguments, indicating how it is intended to + be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a + file). Defaults to None. + packages_to_install (Optional[List[str]]): A list of optional packages to install before + executing func. These will always be installed at component runtime. Defaults to None. + + Raises: + ValueError: The parameter `func` is not an existing function. 
+ """ + + # Confirm the input is an existing function + if not inspect.isfunction(func): + raise ValueError(f"{func} must be of type function.") + + # Set simple attributes of the component function + self.func = func + self.name = func.__name__ + self.packages_to_install = [] if not packages_to_install else packages_to_install + + # Parse the docstring for description + self.parsed_docstring = docstring_parser.parse(inspect.getdoc(func)) + self.description = self.parsed_docstring.short_description + + # Process and extract details from passed function + self.parameters = self._get_function_parameters() + self.return_types = self._get_function_return_types() + self.src_code = get_function_source_definition(self.func) + + # Instantiate attributes to be set during build + self.artifact_repo_location = None + self.artifact_repo_name = None + self.project_id = None + self.naming_prefix = None + + def build(self): + """Instantiates an abstract built method to create and write task files. Also reads in + defaults file to save default arguments to attributes. + + Raises: + NotImplementedError: The subclass has not defined the `build` method. + """ + + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + self.artifact_repo_location = defaults['gcp']['artifact_repo_location'] + self.artifact_repo_name = defaults['gcp']['artifact_repo_name'] + self.project_id = defaults['gcp']['project_id'] + self.naming_prefix = defaults['gcp']['naming_prefix'] + + raise NotImplementedError('Subclass needs to define this.') + + def _get_function_return_types(self) -> list: + """Returns a formatted list of function return types. + + Returns: + list: return value list with types converted to kubeflow spec. + + Raises: + Exception: If return type is provided and not a NamedTuple. + """ + # Extract return type annotation of function + annotation = inspect.signature(self.func).return_annotation + + # Ensures return type is not optional + if self.maybe_strip_optional_from_annotation(annotation) is not annotation: + raise TypeError('Return type cannot be Optional.') + + # No annotations provided, return none + # pylint: disable=protected-access + if annotation == inspect._empty: + return None + + # Checks if the function's return type annotation is a valid NamedTuple + if not (hasattr(annotation,'__annotations__') and isinstance(annotation.__annotations__, dict)): + raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''') + + # Creates a dictionary of metadata for each object returned by component + outputs = [] + for name, type_ in annotation.__annotations__.items(): + metadata = {} + metadata['name'] = name + metadata['type'] = type_ + metadata['description'] = None + outputs.append(metadata) + return outputs + + def _get_function_parameters(self) -> list: + """Returns a formatted list of parameters. + + Returns: + list: Params list with types converted to kubeflow spec. + + Raises: + Exception: Parameter type hints are not provided. 
+ """ + # Extract function parameter names and their descriptions from the function's docstring + signature = inspect.signature(self.func) + parameters = list(signature.parameters.values()) + parsed_docstring = docstring_parser.parse(inspect.getdoc(self.func)) + doc_dict = {p.arg_name: p.description for p in parsed_docstring.params} + + # Extract parameter metadata + parameter_holder = [] + for param in parameters: + metadata = {} + metadata['name'] = param.name + metadata['description'] = doc_dict.get(param.name) + metadata['type'] = self.maybe_strip_optional_from_annotation( + param.annotation) + parameter_holder.append(metadata) + # pylint: disable=protected-access + if metadata['type'] == inspect._empty: + raise TypeError( + f'''Missing type hint for parameter "{metadata['name']}". ''' + f'''Please specify the type for this parameter.''') + return parameter_holder + + def maybe_strip_optional_from_annotation(self, annotation: T) -> T: + """Strips 'Optional' from 'Optional[]' if applicable. + + For example:: + Optional[str] -> str + str -> str + List[int] -> List[int] + + Args: + annotation: The original type annotation which may or may not has `Optional`. + + Returns: + The type inside Optional[] if Optional exists, otherwise the original type. + """ + if getattr(annotation, '__origin__', None) is Union and annotation.__args__[1] is type(None): + return annotation.__args__[0] + else: + return annotation + + +class BasePipeline(): + """The Pipeline object represents a component defined by the user. + """ + + def __init__(self, + func: Optional[Callable] = None, + name: Optional[str] = None, + description: Optional[str] = None, + comps_dict: dict = None): + """Initiates a pipeline object created out of a function holding + all necessary code. + + Args: + func (Optional[Callable]): The python function to create a pipeline from. The function + should have type annotations for all its arguments, indicating how it is intended to + be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a + file). Defaults to None. + name (Optional[str]): The name of the pipeline. Defaults to None. + description (Optional[str]): Short description of what the pipeline does. Defaults to None. + comps_list (dict): Dictionary of potential components for pipeline to utilize imported + as the global held in AutoMLOps.py. Defaults to None. + """ + # Instantiate and set key pipeline attributes + self.func = func + self.func_name = func.__name__ + self.name = DEFAULT_PIPELINE_NAME if not name else name + self.description = description + self.src_code = get_function_source_definition(self.func) + self.comps = self.get_pipeline_components(func, comps_dict) + + # Instantiate attributes to be set at build process + self.base_image = None + self.custom_training_job_specs = None + self.pipeline_params = None + self.pubsub_topic_name = None + self.use_ci = None + self.project_id = None + self.gs_pipeline_job_spec_path = None + self.setup_model_monitoring = None + + def build(self, + pipeline_params: dict, + custom_training_job_specs: Optional[List] = None): + """Instantiates an abstract built method to create and write pipeline files. Also reads in + defaults file to save default arguments to attributes. + + Files created must include: + 1. README.md + 2. Dockerfile + 3. Requirements.txt + + Args: + custom_training_job_specs (dict): Specifies the specs to run the training job with. + pipeline_params (Optional[List]): Dictionary containing runtime pipeline parameters. + Defaults to None. 
+ + Raises: + NotImplementedError: The subclass has not defined the `build` method. + """ + # Save parameters as attributes + self.custom_training_job_specs = custom_training_job_specs + self.pipeline_params = pipeline_params + + # Extract additional attributes from defaults file + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + self.project_id = defaults['gcp']['project_id'] + self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'] + self.base_image = defaults['gcp']['base_image'] + self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] + self.use_ci = defaults['tooling']['use_ci'] + self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] + + raise NotImplementedError('Subclass needs to define this.') + + def get_pipeline_components(self, + pipeline_func: Callable, + comps_dict: dict) -> list: + """Returns a list of components used within a given pipeline. + + Args: + pipeline_func (Callable): Pipeline function. + comps_dict (dict): Dictionary of potential components to use within the pipeline. + + Returns: + List: Components from comps_dict used within the pipeline_func. + """ + # Retrieves pipeline source code and parses it into an Abstract Syntax Tree (AST) + code = inspect.getsource(pipeline_func) + ast_tree = ast.parse(code) + + # Iterates through AST, finds function calls to components that are in comps_dict + comps_list = [] + for node in ast.walk(ast_tree): + try: + if isinstance(node, ast.Call) and node.func.id in comps_dict.keys(): + comps_list.append(comps_dict[node.func.id]) + except Exception: + pass + return comps_list + + +class BaseFuturePipeline(): + """Placeholder for future pipeline object that will be created out of a list of components. + """ + def __init__(self, comps: list) -> None: + self.comps = comps + self.names = [comp.name for comp in self.comps] + + +class BaseServices(): + """The Services object manages the creation of the submission service and model monitoring files. + """ + + def __init__(self) -> None: + """Instantiates a generic Services object. + """ + self.pipeline_storage_path = None + self.pipeline_job_runner_service_account = None + self.pipeline_job_submission_service_type = None + self.project_id = None + self.setup_model_monitoring = None + + # Set directory for files to be written to + self.submission_service_base_dir = BASE_DIR + 'services/submission_service' + + def build(self): + """Constructs and writes files related to submission services and model monitoring. 
+ + Files created under AutoMLOps/: + services/ + submission_service/ + Dockerfile + main.py + requirements.txt + model_monitoring/ (if requested) + monitor.py + requirements.txt + """ + # Extract additional attributes from defaults file + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + self.pipeline_storage_path = defaults['pipelines']['pipeline_storage_path'] + self.pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account'] + self.pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type'] + self.project_id = defaults['gcp']['project_id'] + self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] + + # Set directory for files to be written to + self.submission_service_base_dir = BASE_DIR + 'services/submission_service' + + # Build services files + self._build_submission_services() + + # Setup model monitoring + if self.setup_model_monitoring: + self._build_monitoring() + + def _build_monitoring(self): + """Abstract method to create the model monitoring files. + + Raises: + NotImplementedError: The subclass has not defined the `_build_monitoring` method. + """ + raise NotImplementedError('Subclass needs to define this') + + def _build_submission_services(self): + """Abstract method to create the Dockerfile, requirements.txt, and main.py files of the + services/submission_service directory. + + Raises: + NotImplementedError: The subclass has not defined the `_build_submission_services` method. + """ + raise NotImplementedError('Subclass needs to define this.') diff --git a/google_cloud_automlops/orchestration/configs.py b/google_cloud_automlops/orchestration/configs.py deleted file mode 100644 index 9474b07..0000000 --- a/google_cloud_automlops/orchestration/configs.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2024 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Model classes for AutoMLOps Orchestration Frameworks.""" - -# pylint: disable=C0103 -# pylint: disable=line-too-long - -from typing import Dict, List, Optional - -from pydantic import BaseModel - - -class KfpConfig(BaseModel): - """Model representing the KFP config. - - Args: - base_image: The image to use in the component base dockerfile. - custom_training_job_specs: Specifies the specs to run the training job with. - pipeline_params: Dictionary containing runtime pipeline parameters. - pubsub_topic_name: The name of the pubsub topic to publish to. - setup_model_monitoring: Boolean parameter which specifies whether to set up a Vertex AI Model Monitoring Job. - use_ci: Flag that determines whether to use Cloud Run CI/CD. 
- """ - base_image: str - custom_training_job_specs: Optional[List] - pipeline_params: Dict - pubsub_topic_name: str - setup_model_monitoring: bool - use_ci: bool diff --git a/google_cloud_automlops/orchestration/kfp.py b/google_cloud_automlops/orchestration/kfp.py new file mode 100644 index 0000000..75f3f01 --- /dev/null +++ b/google_cloud_automlops/orchestration/kfp.py @@ -0,0 +1,610 @@ +# Copyright 2023 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Creates a KFP component, pipeline, and services subclass.""" + +# pylint: disable=anomalous-backslash-in-string +# pylint: disable=C0103 +# pylint: disable=line-too-long + +import json +import re +import textwrap +from typing import Callable, List, Optional + +try: + from importlib.resources import files as import_files +except ImportError: + # Try backported to PY<37 `importlib_resources` + from importlib_resources import files as import_files + +from google_cloud_automlops.orchestration.base import BaseComponent, BasePipeline, BaseServices +from google_cloud_automlops.utils.utils import ( + execute_process, + get_components_list, + make_dirs, + read_file, + read_yaml_file, + render_jinja, + write_and_chmod, + write_file, + write_yaml_file +) +from google_cloud_automlops.utils.constants import ( + BASE_DIR, + GENERATED_BUILD_COMPONENTS_SH_FILE, + GENERATED_COMPONENT_BASE, + GENERATED_DEFAULTS_FILE, + GENERATED_LICENSE, + GENERATED_MODEL_MONITORING_MONITOR_PY_FILE, + GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE, + GENERATED_MODEL_MONITORING_SH_FILE, + GENERATED_PARAMETER_VALUES_PATH, + GENERATED_PIPELINE_FILE, + GENERATED_PIPELINE_REQUIREMENTS_FILE, + GENERATED_PIPELINE_RUNNER_FILE, + GENERATED_PIPELINE_SPEC_SH_FILE, + GENERATED_PUBLISH_TO_TOPIC_FILE, + GENERATED_RUN_PIPELINE_SH_FILE, + GENERATED_RUN_ALL_SH_FILE, + KFP_TEMPLATES_PATH, + PINNED_KFP_VERSION, + PLACEHOLDER_IMAGE +) + + +class KFPComponent(BaseComponent): + """Creates a KFP specific Component object for #TODO: add more + + Args: + BaseComponent (object): Generic Component object. + """ + + def __init__(self, + func: Optional[Callable] = None, + packages_to_install: Optional[List[str]] = None): + """Initiates a KFP Component object created out of a function holding all necessary code. + + Args: + func (Optional[Callable]): The python function to create a component from. The function + should have type annotations for all its arguments, indicating how + it is intended to be used (e.g. as an input/output Artifact object, + a plain parameter, or a path to a file). Defaults to None. + packages_to_install (Optional[List[str]]): A list of optional packages to install before + executing func. These will always be installed at component runtime. Defaults to None. 
+ """ + super().__init__(func, packages_to_install) + + # Update parameters and return types to reflect KFP data types + if self.parameters: + self.parameters = self._update_params(self.parameters) + if self.return_types: + self.return_types = self._update_params(self.return_types) + + # Set packages to install and component spec attributes + self.packages_to_install_command = self._get_packages_to_install_command() + self.component_spec = self._create_component_spec() + + def build(self): + """Constructs files for running and managing Kubeflow pipelines. + """ + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + self.artifact_repo_location = defaults['gcp']['artifact_repo_location'] + self.artifact_repo_name = defaults['gcp']['artifact_repo_name'] + self.project_id = defaults['gcp']['project_id'] + self.naming_prefix = defaults['gcp']['naming_prefix'] + + # Set and create directory for components if it does not already exist + component_dir = BASE_DIR + 'components/' + self.component_spec['name'] + + # Build necessary folders + # TODO: make this only happen for the first component? or pull into automlops.py + make_dirs([ + component_dir, + BASE_DIR + 'components/component_base/src/']) + + # TODO: can this be removed? + kfp_spec_bool = self.component_spec['implementation']['container']['image'] != PLACEHOLDER_IMAGE + + # Read in component specs + custom_code_contents = self.component_spec['implementation']['container']['command'][-1] + compspec_image = ( + f'''{self.artifact_repo_location}-docker.pkg.dev/''' + f'''{self.project_id}/''' + f'''{self.artifact_repo_name}/''' + f'''{self.naming_prefix}/''' + f'''components/component_base:latest''') + + # If using kfp, remove spaces in name and convert to lowercase + if kfp_spec_bool: + self.component_spec['name'] = self.component_spec['name'].replace(' ', '_').lower() + + # Write task script to component base + write_file( + filepath=BASE_DIR + 'components/component_base/src/' + self.component_spec['name'] + '.py', + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base.src') / 'task.py.j2', + generated_license=GENERATED_LICENSE, + kfp_spec_bool=kfp_spec_bool, + custom_code_contents=custom_code_contents), + mode='w') + + # Update component_spec to include correct image and startup command + self.component_spec['implementation']['container']['image'] = compspec_image + self.component_spec['implementation']['container']['command'] = [ + 'python3', + f'''/pipelines/component/src/{self.component_spec['name']+'.py'}'''] + + # Write license and component spec to the appropriate component.yaml file + comp_yaml_path = component_dir + '/component.yaml' + write_file( + filepath=comp_yaml_path, + text=GENERATED_LICENSE, + mode='w') + write_yaml_file( + filepath=comp_yaml_path, + contents=self.component_spec, + mode='a') + + def _get_packages_to_install_command(self) -> list: + """Creates a list of formatted list of commands, including code for tmp storage. + + Returns: + list: Formatted commands to install necessary packages. #TODO: add more, where is this used + """ + newline = '\n' + concat_package_list = ' '.join([repr(str(package)) for package in self.packages_to_install]) + install_python_packages_script = ( + f'''if ! 
[ -x "$(command -v pip)" ]; then{newline}''' + f''' python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip{newline}''' + f'''fi{newline}''' + f'''PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet \{newline}''' + f''' --no-warn-script-location {concat_package_list} && "$0" "$@"{newline}''' + f'''{newline}''') + return ['sh', '-c', install_python_packages_script, self.src_code] + + def _create_component_spec(self) -> dict: + """Creates a tmp component scaffold which will be used by the formalize function. Code is + temporarily stored in component_spec['implementation']['container']['command']. + + Returns: + dict: _description_ #TODO: FILL OUT + """ + # Instantiate component yaml attributes + component_spec = {} + + # Save component name, description, outputs, and parameters + component_spec['name'] = self.name + if self.description: + component_spec['description'] = self.description + outputs = self.return_types + if outputs: + component_spec['outputs'] = outputs + component_spec['inputs'] = self.parameters + + # TODO: comment + component_spec['implementation'] = {} + component_spec['implementation']['container'] = {} + component_spec['implementation']['container']['image'] = PLACEHOLDER_IMAGE + component_spec['implementation']['container']['command'] = self.packages_to_install_command + component_spec['implementation']['container']['args'] = ['--executor_input', + {'executorInput': None}, + '--function_to_execute', + self.name] + return component_spec + + def _update_params(self, params: list) -> list: + """Converts the parameter types from Python types to Kubeflow types. Currently only supports + Python primitive types. + + Args: + params: Pipeline parameters. A list of dictionaries, Each param is a dict containing keys: + 'name': required, str param name. + 'type': required, python primitive type. + 'description': optional, str param desc. + + Returns: + list: Params list with converted types. + + Raises: + ValueError: If an inputted type is not a primitive. + """ + python_kfp_types_mapper = { + int: 'Integer', + str: 'String', + float: 'Float', + bool: 'Bool', + list: 'JsonArray', + dict: 'JsonObject' + } + for param in params: + try: + param['type'] = python_kfp_types_mapper[param['type']] + except KeyError as err: + raise ValueError(f'Unsupported python type - we only support ' + f'primitive types at this time. {err}') from err + return params + + +class KFPPipeline(BasePipeline): + """Creates a KFP specific Pipeline object for #TODO: add more + + Args: + BasePipeline (object): Generic Pipeline object. + """ + def __init__(self, + func: Optional[Callable] = None, + name: Optional[str] = None, + description: Optional[str] = None, + comps_dict: dict = None) -> None: + """Initiates a KFP pipeline object created out of a function holding all necessary code. + + Args: + func (Optional[Callable]): The python function to create a pipeline from. The functio + should have type annotations for all its arguments, indicating how it is intended + to be used (e.g. as an input/output Artifact object, a plain parameter, or a path + to a file). Defaults to None. + name (Optional[str]): The name of the pipeline. Defaults to None. + description (Optional[str]): Short description of what the pipeline does. Defaults to None. + comps_list (dict): Dictionary of potential components for pipeline to utilize imported + as the global held in AutoMLOps.py. Defaults to None. 
+ """ + super().__init__( + func=func, + name=name, + description=description, + comps_dict=comps_dict) + + # Create pipeline scaffold attribute # TODO: more descriptive + self.pipeline_scaffold = ( + self._get_pipeline_decorator() + + self.src_code + + self._get_compile_step()) + + def build(self, + pipeline_params: dict, + custom_training_job_specs: Optional[List] = None): + """Constructs files for running and managing Kubeflow pipelines. + + Files created under AutoMLOps/: + README.md + scripts/ + pipeline_spec/.gitkeep + build_components.sh + build_pipeline_spec.sh + run_pipeline.sh + publish_to_topic.sh + run_all.sh + components/ + component_base/Dockerfile + component_base/requirements.txt + pipelines/ + pipeline.py + pipeline_runner.py + requirements.txt + runtime_parameters/pipeline_parameter_values.json + + Args: + custom_training_job_specs (dict): Specifies the specs to run the training job with. + pipeline_params (Optional[List]): Dictionary containing runtime pipeline parameters. Defaults + to None. + + """ + # Save parameters as attributes + self.custom_training_job_specs = custom_training_job_specs + self.pipeline_params = pipeline_params + + # Extract additional attributes from defaults file + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + self.project_id = defaults['gcp']['project_id'] + self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'] + self.base_image = defaults['gcp']['base_image'] + self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] + self.use_ci = defaults['tooling']['use_ci'] + self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] + + # Build necessary folders + make_dirs([ + f'{BASE_DIR}scripts/pipeline_spec/', + f'{BASE_DIR}pipelines', + f'{BASE_DIR}pipelines/runtime_parameters/' + ]) + + # README.md: Write description of the contents of the directory + write_file( + filepath=f'{BASE_DIR}README.md', + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2', + setup_model_monitoring=self.setup_model_monitoring, + use_ci=self.use_ci), + mode='w') + + # components/component_base/dockerfile: Write the component base Dockerfile + write_file( + filepath=f'{GENERATED_COMPONENT_BASE}/Dockerfile', + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2', + base_image=self.base_image, + generated_license=GENERATED_LICENSE), + mode='w') + + # components/component_base/requirements.txt: Write the component base requirements file + write_file( + filepath=f'{GENERATED_COMPONENT_BASE}/requirements.txt', + text=self._create_component_base_requirements(), + mode='w') + + # Save scripts template path + scripts_template_path = import_files(KFP_TEMPLATES_PATH + '.scripts') + + # scripts/pipeline_spec/.gitkeep: Write gitkeep to pipeline_spec directory + write_file( + filepath=f'{BASE_DIR}scripts/pipeline_spec/.gitkeep', + text='', + mode='w') + + # scripts/build_components.sh: Write script for building components + write_and_chmod( + filepath=GENERATED_BUILD_COMPONENTS_SH_FILE, + text=render_jinja( + template_path=scripts_template_path / 'build_components.sh.j2', + generated_license=GENERATED_LICENSE, + base_dir=BASE_DIR)) + + # scripts/build_pipeline_spec.sh: Write script for building pipeline specs + write_and_chmod( + filepath=GENERATED_PIPELINE_SPEC_SH_FILE, + text=render_jinja( + template_path=scripts_template_path / 'build_pipeline_spec.sh.j2', + generated_license=GENERATED_LICENSE, + base_dir=BASE_DIR)) + + # 
scripts/run_pipeline.sh: Write script for running pipeline + write_and_chmod( + filepath=GENERATED_RUN_PIPELINE_SH_FILE, + text=render_jinja( + template_path=scripts_template_path / 'run_pipeline.sh.j2', + generated_license=GENERATED_LICENSE, + base_dir=BASE_DIR)) + + # scripts/run_all.sh: Write script for running all files + write_and_chmod( + filepath=GENERATED_RUN_ALL_SH_FILE, + text=render_jinja( + template_path=scripts_template_path / 'run_all.sh.j2', + generated_license=GENERATED_LICENSE, + base_dir=BASE_DIR)) + + # scripts/publish_to_topic.sh: If using CI, write script for publishing to pubsub topic + if self.use_ci: + write_and_chmod( + filepath=GENERATED_PUBLISH_TO_TOPIC_FILE, + text=render_jinja( + template_path=scripts_template_path / 'publish_to_topic.sh.j2', + base_dir=BASE_DIR, + generated_license=GENERATED_LICENSE, + generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH, + pubsub_topic_name=self.pubsub_topic_name)) + + # pipelines/pipeline.py: Generates a Kubeflow pipeline spec from custom components. + components_list = get_components_list(full_path=False) + pipeline_scaffold_contents = textwrap.indent(self.pipeline_scaffold, 4 * ' ') + write_file( + filepath=GENERATED_PIPELINE_FILE, + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline.py.j2', + components_list=components_list, + custom_training_job_specs=self.custom_training_job_specs, + generated_license=GENERATED_LICENSE, + pipeline_scaffold_contents=pipeline_scaffold_contents, + project_id=self.project_id), + mode='w') + + # pipelines/pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec. + write_file( + filepath=GENERATED_PIPELINE_RUNNER_FILE, + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline_runner.py.j2', + generated_license=GENERATED_LICENSE), + mode='w') + + # pipelines/requirements.txt + write_file( + filepath=GENERATED_PIPELINE_REQUIREMENTS_FILE, + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'requirements.txt.j2', + pinned_kfp_version=PINNED_KFP_VERSION), + mode='w') + + # pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. + self.pipeline_params['gs_pipeline_spec_path'] = self.gs_pipeline_job_spec_path + serialized_params = json.dumps(self.pipeline_params, indent=4) + write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w') + + def _get_pipeline_decorator(self): + """Constructs the kfp pipeline decorator. + + Returns: + str: KFP pipeline decorator. + """ + name_str = f'''(\n name='{self.name}',\n''' + desc_str = f''' description='{self.description}',\n''' if self.description else '' + ending_str = ')\n' + return '@dsl.pipeline' + name_str + desc_str + ending_str + + def _get_compile_step(self): + """Constructs the compile function call. + + Returns: + str: Compile function call. + """ + return ( + f'\n' + f'compiler.Compiler().compile(\n' + f' pipeline_func={self.func_name},\n' + f' package_path=pipeline_job_spec_path)\n' + f'\n' + ) + + def _create_component_base_requirements(self) -> str: + """Creates the contents of a requirements.txt for the component_base directory. Infers pip + requirements from the python srcfiles using pipreqs. Takes user-inputted requirements, and adds + some default gcp packages as well as packages that are often missing in setup.py files (e.g. + db_dtypes, pyarrow, gcsfs, fsspec). 
+ + Returns: + str: TODO + """ + reqs_filename = f'{GENERATED_COMPONENT_BASE}/requirements.txt' + default_gcp_reqs = [ + 'google-cloud-aiplatform', + 'google-cloud-appengine-logging', + 'google-cloud-audit-log', + 'google-cloud-bigquery', + 'google-cloud-bigquery-storage', + 'google-cloud-bigtable', + 'google-cloud-core', + 'google-cloud-dataproc', + 'google-cloud-datastore', + 'google-cloud-dlp', + 'google-cloud-firestore', + 'google-cloud-kms', + 'google-cloud-language', + 'google-cloud-logging', + 'google-cloud-monitoring', + 'google-cloud-notebooks', + 'google-cloud-pipeline-components', + 'google-cloud-pubsub', + 'google-cloud-pubsublite', + 'google-cloud-recommendations-ai', + 'google-cloud-resource-manager', + 'google-cloud-scheduler', + 'google-cloud-spanner', + 'google-cloud-speech', + 'google-cloud-storage', + 'google-cloud-tasks', + 'google-cloud-translate', + 'google-cloud-videointelligence', + 'google-cloud-vision', + 'db_dtypes', + 'pyarrow', + 'gcsfs', + 'fsspec'] + + # Get user-inputted requirements from the cache dir + user_inp_reqs = [] + components_path_list = get_components_list() + for component_path in components_path_list: + component_spec = read_yaml_file(component_path) + reqs = component_spec['implementation']['container']['command'][2] + formatted_reqs = re.findall('\'([^\']*)\'', reqs) + user_inp_reqs.extend(formatted_reqs) + + # Check if user inputted requirements + if user_inp_reqs: + # Remove duplicates + set_of_requirements = set(user_inp_reqs) + else: + # If user did not input requirements, then infer reqs using pipreqs + execute_process(f'python3 -m pipreqs.pipreqs {GENERATED_COMPONENT_BASE} --mode no-pin --force', to_null=True) + pipreqs = read_file(reqs_filename).splitlines() + set_of_requirements = set(pipreqs + default_gcp_reqs) + + # Remove empty string + if '' in set_of_requirements: + set_of_requirements.remove('') + + # Pin kfp version + if 'kfp' in set_of_requirements: + set_of_requirements.remove('kfp') + set_of_requirements.add(PINNED_KFP_VERSION) + + # Stringify and sort + reqs_str = ''.join(r+'\n' for r in sorted(set_of_requirements)) + return reqs_str + + +class KFPServices(BaseServices): + """Creates a KFP specific Services object for #TODO: add more + + Args: + BaseServices (object): Generic Services object. + """ + def _build_monitoring(self): + """Writes files necessary for implementing model monitoring. 
Files created are: + scripts/ + create_model_monitoring_job.sh + model_monitoring/ + monitor.py + requirements.txt + """ + # Writes script create_model_monitoring_job.sh which creates a Vertex AI model monitoring job + write_and_chmod( + filepath=GENERATED_MODEL_MONITORING_SH_FILE, + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.scripts') / 'create_model_monitoring_job.sh.j2', + generated_license=GENERATED_LICENSE, + base_dir=BASE_DIR + )) + + # Writes monitor.py to create or update a model monitoring job in Vertex AI for a deployed model endpoint + write_file( + filepath=GENERATED_MODEL_MONITORING_MONITOR_PY_FILE, + text=render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'monitor.py.j2', + generated_license=GENERATED_LICENSE + ), + mode='w') + + # Writes a requirements.txt to the model_monitoring directory + write_file( + filepath=GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE, + text=render_jinja(template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'requirements.txt.j2'), + mode='w') + + def _build_submission_services(self): + """Writes the files necessary for utilizing submission services. Files written are: + services/ + submission_service/ + Dockerfile + main.py + requirements.txt + """ + write_file( + f'{self.submission_service_base_dir}/requirements.txt', + render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'requirements.txt.j2', + pinned_kfp_version=PINNED_KFP_VERSION, + pipeline_job_submission_service_type=self.pipeline_job_submission_service_type), + 'w') + + write_file( + f'{self.submission_service_base_dir}/main.py', + render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2', + generated_license=GENERATED_LICENSE, + pipeline_root=self.pipeline_storage_path, + pipeline_job_runner_service_account=self.pipeline_job_runner_service_account, + pipeline_job_submission_service_type=self.pipeline_job_submission_service_type, + project_id=self.project_id, + setup_model_monitoring=self.setup_model_monitoring), + 'w') + + write_file( + f'{self.submission_service_base_dir}/Dockerfile', + render_jinja( + template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'Dockerfile.j2', + base_dir=BASE_DIR, + generated_license=GENERATED_LICENSE), + 'w') diff --git a/google_cloud_automlops/orchestration/kfp/builder.py b/google_cloud_automlops/orchestration/kfp/builder.py deleted file mode 100644 index 662e313..0000000 --- a/google_cloud_automlops/orchestration/kfp/builder.py +++ /dev/null @@ -1,409 +0,0 @@ -# Copyright 2024 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Builds KFP components and pipeline.""" - -# pylint: disable=line-too-long - -import json -try: - from importlib.resources import files as import_files -except ImportError: - # Try backported to PY<37 `importlib_resources` - from importlib_resources import files as import_files -import re -import textwrap - -from google_cloud_automlops.utils.utils import ( - execute_process, - get_components_list, - make_dirs, - read_file, - read_yaml_file, - render_jinja, - is_using_kfp_spec, - write_and_chmod, - write_file, - write_yaml_file -) -from google_cloud_automlops.utils.constants import ( - BASE_DIR, - GENERATED_BUILD_COMPONENTS_SH_FILE, - GENERATED_DEFAULTS_FILE, - GENERATED_COMPONENT_BASE, - GENERATED_LICENSE, - GENERATED_MODEL_MONITORING_SH_FILE, - GENERATED_MODEL_MONITORING_MONITOR_PY_FILE, - GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE, - GENERATED_PARAMETER_VALUES_PATH, - GENERATED_PIPELINE_FILE, - GENERATED_PIPELINE_REQUIREMENTS_FILE, - GENERATED_PIPELINE_RUNNER_FILE, - GENERATED_PIPELINE_SPEC_SH_FILE, - GENERATED_PUBLISH_TO_TOPIC_FILE, - GENERATED_RUN_PIPELINE_SH_FILE, - GENERATED_RUN_ALL_SH_FILE, - KFP_TEMPLATES_PATH, - PINNED_KFP_VERSION, - PIPELINE_CACHE_FILE -) -from google_cloud_automlops.orchestration.configs import KfpConfig - -def build(config: KfpConfig): - """Constructs files for running and managing Kubeflow pipelines. - - Args: - config.base_image: The image to use in the component base dockerfile. - config.custom_training_job_specs: Specifies the specs to run the training job with. - config.pipeline_params: Dictionary containing runtime pipeline parameters. - config.pubsub_topic_name: The name of the pubsub topic to publish to. - config.setup_model_monitoring: Boolean parameter which specifies whether to set up a Vertex AI Model Monitoring Job. - config.use_ci: Flag that determines whether to use Cloud Run CI/CD. 
- """ - - # Write scripts for building pipeline, building components, running pipeline, and running all files - scripts_path = import_files(KFP_TEMPLATES_PATH + '.scripts') - - # Write script for building pipeline - write_and_chmod( - GENERATED_PIPELINE_SPEC_SH_FILE, - render_jinja( - template_path=scripts_path / 'build_pipeline_spec.sh.j2', - generated_license=GENERATED_LICENSE, - base_dir=BASE_DIR)) - - # Write script for building components - write_and_chmod( - GENERATED_BUILD_COMPONENTS_SH_FILE, - render_jinja( - template_path=scripts_path / 'build_components.sh.j2', - generated_license=GENERATED_LICENSE, - base_dir=BASE_DIR)) - - # Write script for running pipeline - write_and_chmod( - GENERATED_RUN_PIPELINE_SH_FILE, - render_jinja( - template_path=scripts_path / 'run_pipeline.sh.j2', - generated_license=GENERATED_LICENSE, - base_dir=BASE_DIR)) - - # Write script for running all files - write_and_chmod( - GENERATED_RUN_ALL_SH_FILE, - render_jinja( - template_path=scripts_path / 'run_all.sh.j2', - generated_license=GENERATED_LICENSE, - base_dir=BASE_DIR)) - - # If using CI, write script for publishing to pubsub topic - if config.use_ci: - write_and_chmod( - GENERATED_PUBLISH_TO_TOPIC_FILE, - render_jinja( - template_path=scripts_path / 'publish_to_topic.sh.j2', - base_dir=BASE_DIR, - generated_license=GENERATED_LICENSE, - generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH, - pubsub_topic_name=config.pubsub_topic_name)) - - # If using model monitoring, write correspointing scripts to model_monitoring directory - if config.setup_model_monitoring: - # Writes script create_model_monitoring_job.sh which creates a Vertex AI model monitoring job - write_and_chmod( - filepath=GENERATED_MODEL_MONITORING_SH_FILE, - text=render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.scripts') / 'create_model_monitoring_job.sh.j2', - generated_license=GENERATED_LICENSE, - base_dir=BASE_DIR - )) - - # Writes monitor.py to create or update a model monitoring job in Vertex AI for a deployed model endpoint - write_file( - filepath=GENERATED_MODEL_MONITORING_MONITOR_PY_FILE, - text=render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'monitor.py.j2', - generated_license=GENERATED_LICENSE - ), - mode='w') - - # Writes a requirements.txt to the model_monitoring directory - write_file( - filepath=GENERATED_MODEL_MONITORING_REQUIREMENTS_FILE, - text=render_jinja(template_path=import_files(KFP_TEMPLATES_PATH + '.model_monitoring') / 'requirements.txt.j2'), - mode='w') - - # Create components and pipelines - components_path_list = get_components_list(full_path=True) - for path in components_path_list: - build_component(path) - build_pipeline(config.custom_training_job_specs, config.pipeline_params) - - # Write empty .gitkeep to pipeline_spec directory - write_file(f'{BASE_DIR}scripts/pipeline_spec/.gitkeep', '', 'w') - - # Write readme.md to description the contents of the directory - write_file( - f'{BASE_DIR}README.md', - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2', - setup_model_monitoring=config.setup_model_monitoring, - use_ci=config.use_ci), - 'w') - - # Write dockerfile to the component base directory - write_file( - f'{GENERATED_COMPONENT_BASE}/Dockerfile', - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2', - base_image=config.base_image, - generated_license=GENERATED_LICENSE), - 'w') - - # Write requirements.txt to the component base directory - 
write_file(f'{GENERATED_COMPONENT_BASE}/requirements.txt', create_component_base_requirements(), 'w') - - # Build the submission service files - if config.use_ci: - build_services() - - -def build_component(component_path: str): - """Constructs and writes component.yaml and {component_name}.py files. - component.yaml: Contains the Kubeflow custom component definition. - {component_name}.py: Contains the python code from the Jupyter cell. - - Args: - component_path: Path to the temporary component yaml. This file - is used to create the permanent component.yaml, and deleted - after calling AutoMLOps.generate(). - """ - # Retrieve defaults vars - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - - # Read in component specs - component_spec = read_yaml_file(component_path) - kfp_spec_bool = is_using_kfp_spec(component_spec['implementation']['container']['image']) - custom_code_contents = component_spec['implementation']['container']['command'][-1] - compspec_image = ( - f'''{defaults['gcp']['artifact_repo_location']}-docker.pkg.dev/''' - f'''{defaults['gcp']['project_id']}/''' - f'''{defaults['gcp']['artifact_repo_name']}/''' - f'''{defaults['gcp']['naming_prefix']}/''' - f'''components/component_base:latest''') - - # If using kfp, remove spaces in name and convert to lowercase - if kfp_spec_bool: - component_spec['name'] = component_spec['name'].replace(' ', '_').lower() - - # Set and create directory for component, and set directory for task - component_dir = BASE_DIR + 'components/' + component_spec['name'] - make_dirs([component_dir]) - task_filepath = (BASE_DIR - + 'components/component_base/src/' - + component_spec['name'] - + '.py') - - # Write task script to component base - write_file( - task_filepath, - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base.src') / 'task.py.j2', - custom_code_contents=custom_code_contents, - generated_license=GENERATED_LICENSE, - kfp_spec_bool=kfp_spec_bool), - 'w') - - # Update component_spec to include correct image and startup command - component_spec['implementation']['container']['image'] = compspec_image - component_spec['implementation']['container']['command'] = [ - 'python3', - f'''/pipelines/component/src/{component_spec['name']+'.py'}'''] - - # Write license and component spec to the appropriate component.yaml file - filename = component_dir + '/component.yaml' - write_file(filename, GENERATED_LICENSE, 'w') - write_yaml_file(filename, component_spec, 'a') - - -def build_pipeline(custom_training_job_specs: list, - pipeline_parameter_values: dict): - """Constructs and writes pipeline.py, pipeline_runner.py, and pipeline_parameter_values.json files. - pipeline.py: Generates a Kubeflow pipeline spec from custom components. - pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec. - pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. - - Args: - custom_training_job_specs: Specifies the specs to run the training job with. - pipeline_parameter_values: Dictionary of runtime parameters for the PipelineJob. 
- """ - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - - # Get the names of the components - components_list = get_components_list(full_path=False) - - # Read pipeline definition - pipeline_scaffold_contents = read_file(PIPELINE_CACHE_FILE) - - # Add indentation - pipeline_scaffold_contents = textwrap.indent(pipeline_scaffold_contents, 4 * ' ') - - # Construct pipeline.py - project_id = defaults['gcp']['project_id'] - write_file( - GENERATED_PIPELINE_FILE, - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline.py.j2', - components_list=components_list, - custom_training_job_specs=custom_training_job_specs, - generated_license=GENERATED_LICENSE, - pipeline_scaffold_contents=pipeline_scaffold_contents, - project_id=project_id), - 'w') - - # Construct pipeline_runner.py - write_file( - GENERATED_PIPELINE_RUNNER_FILE, - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'pipeline_runner.py.j2', - generated_license=GENERATED_LICENSE), - 'w') - - # Construct requirements.txt - write_file( - GENERATED_PIPELINE_REQUIREMENTS_FILE, - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.pipelines') / 'requirements.txt.j2', - pinned_kfp_version=PINNED_KFP_VERSION), - 'w') - - # Add pipeline_spec_path to dict - pipeline_parameter_values['gs_pipeline_spec_path'] = defaults['pipelines']['gs_pipeline_job_spec_path'] - - # Construct pipeline_parameter_values.json - serialized_params = json.dumps(pipeline_parameter_values, indent=4) - write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w') - - -def build_services(): - """Constructs and writes a Dockerfile, requirements.txt, and - main.py to the services/submission_service directory. - """ - # Retrieve defaults vars - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - - # Set new folders as variables - submission_service_base = BASE_DIR + 'services/submission_service' - - # Write cloud run dockerfile - write_file( - f'{submission_service_base}/Dockerfile', - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'Dockerfile.j2', - base_dir=BASE_DIR, - generated_license=GENERATED_LICENSE), - 'w') - - # Write requirements files for cloud run base and queueing svc - write_file( - f'{submission_service_base}/requirements.txt', - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'requirements.txt.j2', - pinned_kfp_version=PINNED_KFP_VERSION, - pipeline_job_submission_service_type=defaults['gcp']['pipeline_job_submission_service_type']), - 'w') - - # Write main code files for cloud run base and queueing svc - write_file( - f'{submission_service_base}/main.py', - render_jinja( - template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2', - generated_license=GENERATED_LICENSE, - pipeline_root=defaults['pipelines']['pipeline_storage_path'], - pipeline_job_runner_service_account=defaults['gcp']['pipeline_job_runner_service_account'], - pipeline_job_submission_service_type=defaults['gcp']['pipeline_job_submission_service_type'], - project_id=defaults['gcp']['project_id'], - setup_model_monitoring=defaults['gcp']['setup_model_monitoring']), - 'w') - - -def create_component_base_requirements(): - """Writes a requirements.txt to the component_base directory. - Infers pip requirements from the python srcfiles using - pipreqs. 
Takes user-inputted requirements, and addes some - default gcp packages as well as packages that are often missing - in setup.py files (e.g db_types, pyarrow, gcsfs, fsspec). - """ - reqs_filename = f'{GENERATED_COMPONENT_BASE}/requirements.txt' - default_gcp_reqs = [ - 'google-cloud-aiplatform', - 'google-cloud-appengine-logging', - 'google-cloud-audit-log', - 'google-cloud-bigquery', - 'google-cloud-bigquery-storage', - 'google-cloud-bigtable', - 'google-cloud-core', - 'google-cloud-dataproc', - 'google-cloud-datastore', - 'google-cloud-dlp', - 'google-cloud-firestore', - 'google-cloud-kms', - 'google-cloud-language', - 'google-cloud-logging', - 'google-cloud-monitoring', - 'google-cloud-notebooks', - 'google-cloud-pipeline-components', - 'google-cloud-pubsub', - 'google-cloud-pubsublite', - 'google-cloud-recommendations-ai', - 'google-cloud-resource-manager', - 'google-cloud-scheduler', - 'google-cloud-spanner', - 'google-cloud-speech', - 'google-cloud-storage', - 'google-cloud-tasks', - 'google-cloud-translate', - 'google-cloud-videointelligence', - 'google-cloud-vision', - 'db_dtypes', - 'pyarrow', - 'gcsfs', - 'fsspec'] - # Get user-inputted requirements from the cache dir - user_inp_reqs = [] - components_path_list = get_components_list() - for component_path in components_path_list: - component_spec = read_yaml_file(component_path) - reqs = component_spec['implementation']['container']['command'][2] - formatted_reqs = re.findall('\'([^\']*)\'', reqs) - user_inp_reqs.extend(formatted_reqs) - # Check if user inputted requirements - if user_inp_reqs: - # Remove duplicates - set_of_requirements = set(user_inp_reqs) - else: - # If user did not input requirements, then infer reqs using pipreqs - execute_process(f'python3 -m pipreqs.pipreqs {GENERATED_COMPONENT_BASE} --mode no-pin --force', to_null=True) - pipreqs = read_file(reqs_filename).splitlines() - set_of_requirements = set(pipreqs + default_gcp_reqs) - # Remove empty string - if '' in set_of_requirements: - set_of_requirements.remove('') - # Pin kfp version - if 'kfp' in set_of_requirements: - set_of_requirements.remove('kfp') - set_of_requirements.add(PINNED_KFP_VERSION) - # Stringify and sort - reqs_str = ''.join(r+'\n' for r in sorted(set_of_requirements)) - return reqs_str diff --git a/google_cloud_automlops/orchestration/kfp/scaffold.py b/google_cloud_automlops/orchestration/kfp/scaffold.py deleted file mode 100644 index 71a93e1..0000000 --- a/google_cloud_automlops/orchestration/kfp/scaffold.py +++ /dev/null @@ -1,253 +0,0 @@ -# Copyright 2024 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Builds temporary component scaffold yaml files.""" - -# pylint: disable=anomalous-backslash-in-string -# pylint: disable=C0103 -# pylint: disable=line-too-long - -import inspect -from typing import Callable, List, Optional, TypeVar, Union - -import docstring_parser - -from google_cloud_automlops.utils.constants import ( - DEFAULT_PIPELINE_NAME, - PLACEHOLDER_IMAGE, - PIPELINE_CACHE_FILE, - CACHE_DIR -) -from google_cloud_automlops.utils.utils import ( - get_function_source_definition, - make_dirs, - update_params, - write_file, - write_yaml_file -) - -T = TypeVar('T') - - -def create_component_scaffold(func: Optional[Callable] = None, - *, - packages_to_install: Optional[List[str]] = None): - """Creates a tmp component scaffold which will be used by the formalize function. - Code is temporarily stored in component_spec['implementation']['container']['command']. - - Args: - func: The python function to create a component from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). - packages_to_install: A list of optional packages to install before - executing func. These will always be installed at component runtime. - """ - # Extract name, docstring, and component description - name = func.__name__ - parsed_docstring = docstring_parser.parse(inspect.getdoc(func)) - description = parsed_docstring.short_description - - # Instantiate component yaml attributes - component_spec = {} - component_spec['name'] = name - if description: - component_spec['description'] = description - outputs = get_function_return_types(func) - if outputs: - component_spec['outputs'] = outputs - component_spec['inputs'] = get_function_parameters(func) - component_spec['implementation'] = {} - component_spec['implementation']['container'] = {} - component_spec['implementation']['container']['image'] = PLACEHOLDER_IMAGE - component_spec['implementation']['container']['command'] = get_packages_to_install_command(func, packages_to_install) - component_spec['implementation']['container']['args'] = ['--executor_input', - {'executorInput': None}, - '--function_to_execute', - name] - # Write component yaml - filename = CACHE_DIR + f'/{name}.yaml' - make_dirs([CACHE_DIR]) - write_yaml_file(filename, component_spec, 'w') - - -def get_packages_to_install_command(func: Optional[Callable] = None, - packages_to_install: Optional[List[str]] = None): - """Returns a list of formatted list of commands, including code for tmp storage. - - Args: - func: The python function to create a component from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). - packages_to_install: A list of optional packages to install before - executing func. These will always be installed at component runtime. - """ - newline = '\n' - if not packages_to_install: - packages_to_install = [] - concat_package_list = ' '.join([repr(str(package)) for package in packages_to_install]) - install_python_packages_script = ( - f'''if ! 
[ -x "$(command -v pip)" ]; then{newline}''' - f''' python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip{newline}''' - f'''fi{newline}''' - f'''PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet \{newline}''' - f''' --no-warn-script-location {concat_package_list} && "$0" "$@"{newline}''' - f'''{newline}''') - src_code = get_function_source_definition(func) - return ['sh', '-c', install_python_packages_script, src_code] - - -def get_function_return_types(func: Callable) -> list: - """Returns a formatted list of function return types. - - Args: - func: The python function to create a component from. The function - can optionally have type annotations for its return values. - Returns: - list: return value list with types converted to kubeflow spec. - Raises: - Exception: If return type is provided and not a NamedTuple. - """ - annotation = inspect.signature(func).return_annotation - if maybe_strip_optional_from_annotation(annotation) is not annotation: - raise TypeError('Return type cannot be Optional.') - - # No annotations provided - # pylint: disable=protected-access - if annotation == inspect._empty: - return None - - if not (hasattr(annotation,'__annotations__') and isinstance(annotation.__annotations__, dict)): - raise TypeError(f'''Return type hint for function "{func.__name__}" must be a NamedTuple.''') - - outputs = [] - for name, type_ in annotation.__annotations__.items(): - metadata = {} - metadata['name'] = name - metadata['type'] = type_ - metadata['description'] = None - outputs.append(metadata) - return update_params(outputs) - - -def get_function_parameters(func: Callable) -> list: - """Returns a formatted list of parameters. - - Args: - func: The python function to create a component from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). - Returns: - list: Params list with types converted to kubeflow spec. - Raises: - Exception: If parameter type hints are not provided. - """ - signature = inspect.signature(func) - parameters = list(signature.parameters.values()) - parsed_docstring = docstring_parser.parse(inspect.getdoc(func)) - doc_dict = {p.arg_name: p.description for p in parsed_docstring.params} - - # Extract parameter metadata - parameter_holder = [] - for param in parameters: - metadata = {} - metadata['name'] = param.name - metadata['description'] = doc_dict.get(param.name) - metadata['type'] = maybe_strip_optional_from_annotation( - param.annotation) - parameter_holder.append(metadata) - # pylint: disable=protected-access - if metadata['type'] == inspect._empty: - raise TypeError( - f'''Missing type hint for parameter "{metadata['name']}". ''' - f'''Please specify the type for this parameter.''') - return update_params(parameter_holder) - - -def maybe_strip_optional_from_annotation(annotation: T) -> T: - """Strips 'Optional' from 'Optional[]' if applicable. - For example:: - Optional[str] -> str - str -> str - List[int] -> List[int] - Args: - annotation: The original type annotation which may or may not has `Optional`. - Returns: - The type inside Optional[] if Optional exists, otherwise the original type. 
- """ - if getattr(annotation, '__origin__', None) is Union and annotation.__args__[1] is type(None): - return annotation.__args__[0] - else: - return annotation - - -def create_pipeline_scaffold(func: Optional[Callable] = None, - *, - name: Optional[str] = None, - description: Optional[str] = None): - """Creates a temporary pipeline scaffold which will - be used by the formalize function. - - Args: - func: The python function to create a pipeline from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). - name: The name of the pipeline. - description: Short description of what the pipeline does. - """ - pipeline_scaffold = (get_pipeline_decorator(name, description) + - get_function_source_definition(func) + - get_compile_step(func.__name__)) - make_dirs([CACHE_DIR]) # if it doesn't already exist - write_file(PIPELINE_CACHE_FILE, pipeline_scaffold, 'w') - - -def get_pipeline_decorator(name: Optional[str] = None, - description: Optional[str] = None): - """Creates the kfp pipeline decorator. - - Args: - name: The name of the pipeline. - description: Short description of what the pipeline does. - - Returns: - str: Python compile function call. - """ - default_name = DEFAULT_PIPELINE_NAME if not name else name - name_str = f'''(\n name='{default_name}',\n''' - desc_str = f''' description='{description}',\n''' if description else '' - ending_str = ')\n' - return '@dsl.pipeline' + name_str + desc_str + ending_str - - -def get_compile_step(func_name: str): - """Creates the compile function call. - - Args: - func_name: The name of the pipeline function. - - Returns: - str: Python compile function call. - """ - return ( - f'\n' - f'compiler.Compiler().compile(\n' - f' pipeline_func={func_name},\n' - f' package_path=pipeline_job_spec_path)\n' - f'\n' - ) - diff --git a/google_cloud_automlops/orchestration/ray/.gitkeep b/google_cloud_automlops/orchestration/ray/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/google_cloud_automlops/orchestration/kfp/__init__.py b/google_cloud_automlops/orchestration/templates/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/__init__.py rename to google_cloud_automlops/orchestration/templates/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/README.md.j2 b/google_cloud_automlops/orchestration/templates/kfp/README.md.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/README.md.j2 rename to google_cloud_automlops/orchestration/templates/kfp/README.md.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/components/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/components/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/components/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/components/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/components/component_base/Dockerfile.j2 b/google_cloud_automlops/orchestration/templates/kfp/components/component_base/Dockerfile.j2 similarity 
index 100% rename from google_cloud_automlops/orchestration/kfp/templates/components/component_base/Dockerfile.j2 rename to google_cloud_automlops/orchestration/templates/kfp/components/component_base/Dockerfile.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/components/component_base/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/components/component_base/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/components/component_base/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/components/component_base/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/components/component_base/src/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/components/component_base/src/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/components/component_base/src/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/components/component_base/src/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/components/component_base/src/task.py.j2 b/google_cloud_automlops/orchestration/templates/kfp/components/component_base/src/task.py.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/components/component_base/src/task.py.j2 rename to google_cloud_automlops/orchestration/templates/kfp/components/component_base/src/task.py.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/model_monitoring/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/model_monitoring/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/model_monitoring/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/model_monitoring/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/model_monitoring/monitor.py.j2 b/google_cloud_automlops/orchestration/templates/kfp/model_monitoring/monitor.py.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/model_monitoring/monitor.py.j2 rename to google_cloud_automlops/orchestration/templates/kfp/model_monitoring/monitor.py.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/model_monitoring/requirements.txt.j2 b/google_cloud_automlops/orchestration/templates/kfp/model_monitoring/requirements.txt.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/model_monitoring/requirements.txt.j2 rename to google_cloud_automlops/orchestration/templates/kfp/model_monitoring/requirements.txt.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/pipelines/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/pipelines/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/pipelines/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/pipelines/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/pipelines/pipeline.py.j2 b/google_cloud_automlops/orchestration/templates/kfp/pipelines/pipeline.py.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/pipelines/pipeline.py.j2 rename to google_cloud_automlops/orchestration/templates/kfp/pipelines/pipeline.py.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/pipelines/pipeline_runner.py.j2 
b/google_cloud_automlops/orchestration/templates/kfp/pipelines/pipeline_runner.py.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/pipelines/pipeline_runner.py.j2 rename to google_cloud_automlops/orchestration/templates/kfp/pipelines/pipeline_runner.py.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/pipelines/requirements.txt.j2 b/google_cloud_automlops/orchestration/templates/kfp/pipelines/requirements.txt.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/pipelines/requirements.txt.j2 rename to google_cloud_automlops/orchestration/templates/kfp/pipelines/requirements.txt.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/scripts/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/scripts/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/build_components.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/build_components.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/build_components.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/build_components.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/build_pipeline_spec.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/build_pipeline_spec.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/build_pipeline_spec.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/build_pipeline_spec.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/create_model_monitoring_job.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/create_model_monitoring_job.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/create_model_monitoring_job.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/create_model_monitoring_job.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/publish_to_topic.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/publish_to_topic.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/publish_to_topic.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/publish_to_topic.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/run_all.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/run_all.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/run_all.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/run_all.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/scripts/run_pipeline.sh.j2 b/google_cloud_automlops/orchestration/templates/kfp/scripts/run_pipeline.sh.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/scripts/run_pipeline.sh.j2 rename to google_cloud_automlops/orchestration/templates/kfp/scripts/run_pipeline.sh.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/services/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/services/__init__.py similarity index 100% rename from 
google_cloud_automlops/orchestration/kfp/templates/services/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/services/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/services/submission_service/Dockerfile.j2 b/google_cloud_automlops/orchestration/templates/kfp/services/submission_service/Dockerfile.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/services/submission_service/Dockerfile.j2 rename to google_cloud_automlops/orchestration/templates/kfp/services/submission_service/Dockerfile.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/services/submission_service/__init__.py b/google_cloud_automlops/orchestration/templates/kfp/services/submission_service/__init__.py similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/services/submission_service/__init__.py rename to google_cloud_automlops/orchestration/templates/kfp/services/submission_service/__init__.py diff --git a/google_cloud_automlops/orchestration/kfp/templates/services/submission_service/main.py.j2 b/google_cloud_automlops/orchestration/templates/kfp/services/submission_service/main.py.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/services/submission_service/main.py.j2 rename to google_cloud_automlops/orchestration/templates/kfp/services/submission_service/main.py.j2 diff --git a/google_cloud_automlops/orchestration/kfp/templates/services/submission_service/requirements.txt.j2 b/google_cloud_automlops/orchestration/templates/kfp/services/submission_service/requirements.txt.j2 similarity index 100% rename from google_cloud_automlops/orchestration/kfp/templates/services/submission_service/requirements.txt.j2 rename to google_cloud_automlops/orchestration/templates/kfp/services/submission_service/requirements.txt.j2 diff --git a/google_cloud_automlops/orchestration/tfx/.gitkeep b/google_cloud_automlops/orchestration/tfx/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/google_cloud_automlops/utils/constants.py b/google_cloud_automlops/utils/constants.py index 2425be2..68f9149 100644 --- a/google_cloud_automlops/utils/constants.py +++ b/google_cloud_automlops/utils/constants.py @@ -133,7 +133,7 @@ TERRAFORM_TEMPLATES_PATH = 'google_cloud_automlops.provisioning.terraform.templates' PULUMI_TEMPLATES_PATH = 'google_cloud_automlops.provisioning.pulumi.templates' GCLOUD_TEMPLATES_PATH = 'google_cloud_automlops.provisioning.gcloud.templates' -KFP_TEMPLATES_PATH = 'google_cloud_automlops.orchestration.kfp.templates' +KFP_TEMPLATES_PATH = 'google_cloud_automlops.orchestration.templates.kfp' CLOUDBUILD_TEMPLATES_PATH = 'google_cloud_automlops.deployments.cloudbuild.templates' GITHUB_ACTIONS_TEMPLATES_PATH = 'google_cloud_automlops.deployments.github_actions.templates' GITOPS_TEMPLATES_PATH = 'google_cloud_automlops.deployments.gitops.templates' diff --git a/google_cloud_automlops/orchestration/enums.py b/google_cloud_automlops/utils/enums.py similarity index 82% rename from google_cloud_automlops/orchestration/enums.py rename to google_cloud_automlops/utils/enums.py index da09732..ac93951 100644 --- a/google_cloud_automlops/orchestration/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC. All Rights Reserved. +# Copyright 2023 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Sets global enums.""" +"""Creates enums for orchestrator and submission service options as well as generic component, +pipeline, and services objects.""" +# pylint: disable=anomalous-backslash-in-string # pylint: disable=C0103 # pylint: disable=line-too-long @@ -29,6 +31,7 @@ class Orchestrator(Enum): # AIRFLOW = 'airflow' # roadmap item # RAY = 'ray' # roadmap item + class PipelineJobSubmitter(Enum): """Enum representing the available options for the Pipeline Job submission service.""" diff --git a/google_cloud_automlops/utils/utils.py b/google_cloud_automlops/utils/utils.py index 28f4a7b..0f5688b 100644 --- a/google_cloud_automlops/utils/utils.py +++ b/google_cloud_automlops/utils/utils.py @@ -47,22 +47,24 @@ PLACEHOLDER_IMAGE ) +from google_cloud_automlops.utils.enums import ( + Orchestrator, + PipelineJobSubmitter +) + from google_cloud_automlops.deployments.enums import ( ArtifactRepository, CodeRepository, Deployer ) from google_cloud_automlops.provisioning.enums import Provisioner -from google_cloud_automlops.orchestration.enums import ( - Orchestrator, - PipelineJobSubmitter -) + def make_dirs(directories: list): """Makes directories with the specified names. Args: - directories: Path of the directories to make. + directories (list): Path of the directories to make. """ for d in directories: try: @@ -72,13 +74,14 @@ def make_dirs(directories: list): def read_yaml_file(filepath: str) -> dict: - """Reads a yaml and returns file contents as a dict. - Defaults to utf-8 encoding. + """Reads a yaml and returns file contents as a dict. Defaults to utf-8 encoding. Args: - filepath: Path to the yaml. + filepath (str): Path to the yaml. + Returns: dict: Contents of the yaml. + Raises: Exception: If an error is encountered reading the file. """ @@ -95,11 +98,12 @@ def write_yaml_file(filepath: str, contents: dict, mode: str): """Writes a dictionary to yaml. Defaults to utf-8 encoding. Args: - filepath: Path to the file. - contents: Dictionary to be written to yaml. - mode: Read/write mode to be used. + filepath (str): Path to the file. + contents (dict): Dictionary to be written to yaml. + mode (str): Read/write mode to be used. + Raises: - Exception: If an error is encountered writing the file. + Exception: An error is encountered while writing the file. """ try: with open(filepath, mode, encoding='utf-8') as file: @@ -110,15 +114,16 @@ def write_yaml_file(filepath: str, contents: dict, mode: str): def read_file(filepath: str) -> str: - """Reads a file and returns contents as a string. - Defaults to utf-8 encoding. + """Reads a file and returns contents as a string. Defaults to utf-8 encoding. Args: - filepath: Path to the file. + filepath (str): Path to the file. + Returns: str: Contents of the file. + Raises: - Exception: If an error is encountered reading the file. + Exception: An error is encountered while reading the file. """ try: with open(filepath, 'r', encoding='utf-8') as file: @@ -133,11 +138,12 @@ def write_file(filepath: str, text: str, mode: str): """Writes a file at the specified path. Defaults to utf-8 encoding. Args: - filepath: Path to the file. - text: Text to be written to file. - mode: Read/write mode to be used. + filepath (str): Path to the file. + text (str): Text to be written to file. + mode (str): Read/write mode to be used. + Raises: - Exception: If an error is encountered writing the file. 
+ Exception: An error is encountered while writing the file. """ try: with open(filepath, mode, encoding='utf-8') as file: @@ -148,14 +154,14 @@ def write_and_chmod(filepath: str, text: str): - """Writes a file at the specified path and chmods the file - to allow for execution. + """Writes a file at the specified path and chmods the file to allow for execution. Args: - filepath: Path to the file. - text: Text to be written to file. + filepath (str): Path to the file. + text (str): Text to be written to file. + Raises: - Exception: If an error is encountered chmod-ing the file. + Exception: An error is encountered while chmod-ing the file. """ write_file(filepath, text, 'w') try: @@ -166,11 +172,10 @@ def delete_file(filepath: str): - """Deletes a file at the specified path. - If it does not exist, pass. + """Deletes a file at the specified path. If it does not exist, pass. Args: - filepath: Path to the file. + filepath (str): Path to the file. """ try: os.remove(filepath) @@ -179,11 +184,12 @@ def get_components_list(full_path: bool = True) -> list: - """Reads yamls in the cache directory, verifies they are component - yamls, and returns the name of the files. + """Reads yamls in the cache directory, verifies they are component yamls, and returns the name + of the files. Args: - full_path: Boolean; if false, stores only the filename w/o extension. + full_path (bool): If false, stores only the filename w/o extension. + Returns: list: Contains the names or paths of all component yamls in the dir. """ @@ -203,7 +209,8 @@ def is_component_config(filepath: str) -> bool: """Checks to see if the given file is a component yaml. Args: - filepath: Path to a yaml file. + filepath (str): Path to a yaml file. + Returns: bool: Whether the given file is a component yaml. """ @@ -216,10 +223,11 @@ def execute_process(command: str, to_null: bool): """Executes an external shell process. Args: - command: The string of the command to execute. - to_null: Determines where to send output. + command (str): Command to execute. + to_null (bool): Determines where to send output. + Raises: - Exception: If an error occurs in executing the script. + Exception: An error occurred while executing the script. """ stdout = subprocess.DEVNULL if to_null else None try: @@ -234,16 +242,18 @@ def validate_use_ci(setup_model_monitoring: bool, schedule_pattern: str, use_ci: str): """Validates that the inputted schedule parameter and model_monitoring parameter align with the - use_ci configuration. + use_ci configuration. + Note: this function does not validate that schedule_pattern is a properly formatted cron value. Cron format validation is done in the backend by GCP. Args: - setup_model_monitoring: Boolean parameter which specifies whether to set up a Vertex AI Model Monitoring Job. - schedule_pattern: Cron formatted value used to create a Scheduled retrain job. - use_ci: Flag that determines whether to use Cloud CI/CD. + setup_model_monitoring (bool): Specifies whether to set up a Vertex AI Model Monitoring Job. + schedule_pattern (str): Cron formatted value used to create a Scheduled retrain job. + use_ci (bool): Specifies whether to use Cloud CI/CD. + Raises: - Exception: If use_ci validation fails. + Exception: use_ci validation failed.
""" if setup_model_monitoring and not use_ci: raise ValueError('use_ci must be set to True to use Model Monitoring.') @@ -251,51 +261,19 @@ def validate_use_ci(setup_model_monitoring: bool, schedule_pattern: str, use_ci: raise ValueError('use_ci must be set to True to use Cloud Scheduler.') -def update_params(params: list) -> list: - """Converts the parameter types from Python types - to Kubeflow types. Currently only supports - Python primitive types. - - Args: - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - Returns: - list: Params list with converted types. - Raises: - Exception: If an inputted type is not a primitive. - """ - python_kfp_types_mapper = { - int: 'Integer', - str: 'String', - float: 'Float', - bool: 'Boolean', - list: 'JsonArray', - dict: 'JsonObject' - } - for param in params: - try: - param['type'] = python_kfp_types_mapper[param['type']] - except KeyError as err: - raise ValueError(f'Unsupported python type - we only support ' - f'primitive types at this time. {err}') from err - return params - - def get_function_source_definition(func: Callable) -> str: """Returns a formatted string of the source code. Args: - func: The python function to create a component from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). + func (Callable): The python function to create a component from. The function should have + type annotations for all its arguments, indicating how it is intended to be used (e.g. + as an input/output Artifact object, a plain parameter, or a path to a file). + Returns: str: The source code from the inputted function. + Raises: - Exception: If the preprocess operates failed. + Exception: The preprocess operations failed. """ source_code = inspect.getsource(func) source_code = textwrap.dedent(source_code) @@ -314,7 +292,7 @@ def stringify_job_spec_list(job_spec_list: list) -> list: """Takes in a list of job spec dictionaries and turns them into strings. Args: - job_spec: Dictionary with job spec info. e.g. + job_spec (list): Dictionary with job spec info. e.g. custom_training_job_specs = [{ 'component_spec': 'train_model', 'display_name': 'train-model-accelerated', @@ -326,6 +304,8 @@ def stringify_job_spec_list(job_spec_list: list) -> list: Returns: list[str]: Python formatted dictionary code. """ + if not job_spec_list: + return None output = [] for spec in job_spec_list: mapping = {} @@ -343,10 +323,10 @@ def is_using_kfp_spec(image: str) -> bool: """Takes in an image string from a component yaml and determines if it came from kfp or not. Args: - image: image string. + image (str): Image string. #TODO: make this more informative Returns: - bool: is the component using kfp spec. + bool: Whether the component using kfp spec. """ return image != PLACEHOLDER_IMAGE @@ -378,41 +358,43 @@ def create_default_config(artifact_repo_location: str, storage_bucket_name: str, use_ci: bool, vpc_connector: str) -> dict: - """Creates defaults.yaml file contents as a dict. This defaults - file is used by subsequent functions and by the pipeline - files themselves. + """Creates defaults.yaml file contents as a dict. This defaults file is used by subsequent + functions and by the pipeline files themselves. 
Args: - artifact_repo_location: Region of the artifact repo (default use with Artifact Registry). - artifact_repo_name: Artifact repo name where components are stored (default use with Artifact Registry). - artifact_repo_type: The type of artifact repository to use (e.g. Artifact Registry, JFrog, etc.) - base_image: The image to use in the component base dockerfile. - build_trigger_location: The location of the build trigger (for cloud build). - build_trigger_name: The name of the build trigger (for cloud build). - deployment_framework: The CI tool to use (e.g. cloud build, github actions, etc.) - naming_prefix: Unique value used to differentiate pipelines and services across AutoMLOps runs. - orchestration_framework: The orchestration framework to use (e.g. kfp, tfx, etc.) - pipeline_job_runner_service_account: Service Account to run PipelineJobs. - pipeline_job_submission_service_location: The location of the cloud submission service. - pipeline_job_submission_service_name: The name of the cloud submission service. - pipeline_job_submission_service_type: The tool to host for the cloud submission service (e.g. cloud run, cloud functions). - project_id: The project ID. - provisioning_framework: The IaC tool to use (e.g. Terraform, Pulumi, etc.) - pubsub_topic_name: The name of the pubsub topic to publish to. - schedule_location: The location of the scheduler resource. - schedule_name: The name of the scheduler resource. - schedule_pattern: Cron formatted value used to create a Scheduled retrain job. - setup_model_monitoring: Boolean parameter which specifies whether to set up a Vertex AI Model Monitoring Job. - source_repo_branch: The branch to use in the source repository. - source_repo_name: The name of the source repository to use. - source_repo_type: The type of source repository to use (e.g. gitlab, github, etc.) - storage_bucket_location: Region of the GS bucket. - storage_bucket_name: GS bucket name where pipeline run metadata is stored. - use_ci: Flag that determines whether to use Cloud CI/CD. - vpc_connector: The name of the vpc connector to use. + artifact_repo_location (str): Region of the artifact repo (default use with Artifact Registry). + artifact_repo_name (str): Artifact repo name where components are stored (default use with + Artifact Registry). + artifact_repo_type (str): Type of artifact repository to use (e.g. Artifact Registry, JFrog, etc.) + base_image (str): Image to use in the component base dockerfile. + build_trigger_location (str): Location of the build trigger (for cloud build). + build_trigger_name (str): Name of the build trigger (for cloud build). + deployment_framework (str): Name of CI tool to use (e.g. cloud build, github actions, etc.) + naming_prefix (str): Unique value used to differentiate pipelines and services across + AutoMLOps runs. + orchestration_framework (str): Orchestration framework to use (e.g. kfp, tfx, etc.) + pipeline_job_runner_service_account (str): Service Account to run PipelineJobs. + pipeline_job_submission_service_location (str): Location of the cloud submission service. + pipeline_job_submission_service_name (str): Name of the cloud submission service. + pipeline_job_submission_service_type (str): Tool to host for the cloud submission service + (e.g. cloud run, cloud functions). + project_id (str): The project ID. + provisioning_framework (str): IaC tool to use (e.g. Terraform, Pulumi, etc.) + pubsub_topic_name (str): Name of the pubsub topic to publish to. + schedule_location (str): Location of the scheduler resource. 
+ schedule_name (str): Name of the scheduler resource. + schedule_pattern (str): Cron formatted value used to create a Scheduled retrain job. + setup_model_monitoring (bool): Specifies whether to set up a Vertex AI Model Monitoring Job. + source_repo_branch (str): Branch to use in the source repository. + source_repo_name (str): Name of the source repository to use. + source_repo_type (str): Type of source repository to use (e.g. gitlab, github, etc.) + storage_bucket_location (str): Region of the GS bucket. + storage_bucket_name (str): GS bucket name where pipeline run metadata is stored. + use_ci (bool): Specifies whether to use Cloud CI/CD. + vpc_connector (str): Name of the vpc connector to use. Returns: - dict: Defaults yaml file content + dict: Defaults yaml file content. """ defaults = {} defaults['gcp'] = {} @@ -459,7 +441,7 @@ def create_default_config(artifact_repo_location: str, defaults['tooling']['use_ci'] = use_ci if setup_model_monitoring: - # These fields to be set when AutoMLOps.monitor() is called + # These fields will be set up if and when AutoMLOps.monitor() is called defaults['monitoring'] = {} defaults['monitoring']['target_field'] = None defaults['monitoring']['model_endpoint'] = None @@ -479,14 +461,14 @@ def create_default_config(artifact_repo_location: str, def get_required_apis(defaults: dict) -> list: - """Returns the list of required APIs based on the user tooling selection - determined during the generate() step. + """Returns the list of required APIs based on the user tooling selection determined during + the generate() step. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of required APIs + list: Required APIs. """ required_apis = [ 'cloudbuild.googleapis.com', @@ -517,15 +499,14 @@ def get_required_apis(defaults: dict) -> list: def get_provision_min_permissions(defaults: dict) -> list: - """Returns the list of minimum required permissions to run - the provision() step based on the user tooling selection - determined during the generate() step. + """Returns the list of minimum required permissions to run the provision() step based on the + user tooling selection determined during the generate() step. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of required permissions + list: Required permissions. """ required_permissions = [ 'serviceusage.services.enable', @@ -555,16 +536,15 @@ def get_provision_min_permissions(defaults: dict) -> list: def get_provision_recommended_roles(defaults: dict) -> list: - """Returns the list of recommended roles to run - the provision() step based on the user tooling selection - determined during the generate() step. These roles have - the minimum permissions required for provision. + """Creates the list of recommended roles to run the provision() step based on the user tooling + selection determined during the generate() step. These roles have the minimum permissions + required for provision. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of recommended roles + list: Recommended provision roles. 
""" recommended_roles = [ 'roles/serviceusage.serviceUsageAdmin', @@ -590,17 +570,16 @@ def get_provision_recommended_roles(defaults: dict) -> list: def get_deploy_with_precheck_min_permissions(defaults: dict) -> list: - """Returns the list of minimum required permissions to run - the deploy() step based on the user tooling selection - determined during the generate() step. This function is called - when precheck=True, which makes several API calls to determine if the infra - exists to run deploy() and increases the required list of permissions. + """Creates the list of minimum required permissions to run the deploy() step based on the user + tooling selection, determined during the generate() step. This function is called when + precheck=True, which makes several API calls to determine if the infra exists to run deploy() + and increases the required list of permissions. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of minimum permissions to deploy with precheck=True + list: Minimum permissions to deploy with precheck=True. """ recommended_permissions = [ 'serviceusage.services.get', @@ -625,17 +604,16 @@ def get_deploy_with_precheck_min_permissions(defaults: dict) -> list: def get_deploy_with_precheck_recommended_roles(defaults: dict) -> list: - """Returns the list of recommended roles to run - the deploy() step based on the user tooling selection - determined during the generate() step. This function is called - when precheck=True, which makes several API calls to determine if the infra - exists to run deploy() and increases the required list of permissions. + """Returns the list of recommended roles to run the deploy() step based on the user tooling + selection determined during the generate() step. This function is called when precheck=True, + which makes several API calls to determine if the infra exists to run deploy() and increases the + required list of permissions. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of recommended roles to deploy with precheck=True + list: Recommended roles to deploy with precheck=True. """ recommended_roles = [ 'roles/serviceusage.serviceUsageViewer', @@ -660,16 +638,15 @@ def get_deploy_with_precheck_recommended_roles(defaults: dict) -> list: def get_deploy_without_precheck_min_permissions(defaults: dict) -> list: - """Returns the list of minimum required permissions to run - the deploy() step based on the user tooling selection - determined during the generate() step. This function is called - when precheck=False, which decreases the required list of permissions. + """Creates the list of minimum required permissions to run the deploy() step based on the user + tooling selection determined during the generate() step. This function is called when + precheck=False, which decreases the required list of permissions. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of minimum permissions to deploy with precheck=False + list: Minimum permissions to deploy with precheck=False. 
""" recommended_permissions = [] if defaults['tooling']['use_ci']: @@ -681,16 +658,15 @@ def get_deploy_without_precheck_min_permissions(defaults: dict) -> list: def get_deploy_without_precheck_recommended_roles(defaults: dict) -> list: - """Returns the list of recommended roles to run - the deploy() step based on the user tooling selection - determined during the generate() step. This function is called - when precheck=False, which decreases the required list of permissions. + """Creates the list of recommended roles to run the deploy() step based on the user tooling + selection determined during the generate() step. This function is called when precheck=False, + which decreases the required list of permissions. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of recommended roles to deploy with precheck=False + list: Recommended roles to deploy with precheck=False. """ recommended_roles = [] if defaults['tooling']['use_ci']: @@ -702,15 +678,14 @@ def get_deploy_without_precheck_recommended_roles(defaults: dict) -> list: def get_model_monitoring_min_permissions(defaults: dict) -> list: - """Returns the list of minimum required permissions to run - the monitor() step based on the user tooling selection - determined during the generate() step. + """Creates the list of minimum required permissions to run the monitor() step based on the user + tooling selection determined during the generate() step. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of minimum permissions to create a monitoring job. + list: Minimum permissions to create a monitoring job. """ recommended_permissions = [ 'aiplatform.endpoints.list', @@ -724,15 +699,14 @@ def get_model_monitoring_min_permissions(defaults: dict) -> list: def get_model_monitoring_recommended_roles(defaults: dict) -> list: - """Returns the list of recommended roles to run - the monitor() step based on the user tooling selection - determined during the generate() step. + """Creates the list of recommended roles to run the monitor() step based on the user tooling + selection determined during the generate() step. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). Returns: - list: The list of recommended roles to create a monitoring job. + list: Recommended roles to create a monitoring job. """ recommended_roles = ['roles/aiplatform.user'] if defaults['monitoring']['auto_retraining_params']: @@ -744,8 +718,9 @@ def account_permissions_warning(operation: str, defaults: dict): """Logs the current gcloud account and generates warnings based on the operation being performed. Args: - operation: Specifies which operation is being performed. Available options {provision, deploy_with_precheck, deploy_without_precheck, model_monitoring} - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + operation (str): Specifies which operation is being performed. Available options {provision, + deploy_with_precheck, deploy_without_precheck, model_monitoring}. + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). 
""" bullet_nl = '\n-' gcp_account = subprocess.check_output( @@ -773,10 +748,10 @@ def account_permissions_warning(operation: str, defaults: dict): def check_installation_versions(provisioning_framework: str): """Checks the version of the provisioning tool (e.g. terraform, gcloud) and generates warning if - either the tool is not installed, or if it below the recommended version. + either the tool is not installed, or if it below the recommended version. Args: - provisioning_framework: The IaC tool to use (e.g. Terraform, Pulumi, etc.) + provisioning_framework (str): The IaC tool to use (e.g. Terraform, Pulumi, etc.). """ if provisioning_framework == Provisioner.GCLOUD.value: try: @@ -812,12 +787,11 @@ def check_installation_versions(provisioning_framework: str): def precheck_deployment_requirements(defaults: dict): - """Checks to see if the necessary MLOps infra exists to run - the deploy() step based on the user tooling selection - determined during the generate() step. + """Checks to see if the necessary MLOps infra exists to run the deploy() step based on the user + tooling selection determined during the generate() step. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults: Contents of the Defaults yaml file (config/defaults.yaml). """ use_ci = defaults['tooling']['use_ci'] artifact_repo_location = defaults['gcp']['artifact_repo_location'] @@ -970,7 +944,7 @@ def resources_generation_manifest(defaults: dict): """Logs urls of generated resources. Args: - defaults: Dictionary contents of the Defaults yaml file (config/defaults.yaml) + defaults (dict): Contents of the Defaults yaml file (config/defaults.yaml). """ logging.info('Please wait for this build job to complete.') logging.info('\n' @@ -1021,8 +995,7 @@ def render_jinja(template_path, **template_vars): Args: template_path (str): The path to the Jinja2 template file. - **template_vars: Keyword arguments representing variables to substitute - in the template. + **template_vars: Keyword arguments representing variables to substitute in the template. Returns: str: The rendered template as a string. @@ -1030,3 +1003,14 @@ def render_jinja(template_path, **template_vars): with open(template_path, 'r', encoding='utf-8') as f: template = Template(f.read()) return template.render(**template_vars) + +def coalesce(*arg): + """Creates the first non-None value from a sequence of arguments. + + Returns: + The first non-None argument, or None if all arguments are None. + """ + for el in arg: + if el is not None: + return el + return None