Orchestration oop #55

Closed · wants to merge 13 commits

Changes from 8 commits
88 changes: 61 additions & 27 deletions google_cloud_automlops/AutoMLOps.py
@@ -18,6 +18,8 @@
# pylint: disable=C0103
# pylint: disable=line-too-long
# pylint: disable=unused-import
# pylint: disable=logging-fstring-interpolation
# pylint: disable=global-at-module-level

import functools
import logging
@@ -47,6 +49,7 @@
from google_cloud_automlops.utils.utils import (
account_permissions_warning,
check_installation_versions,
coalesce,
create_default_config,
execute_process,
make_dirs,
@@ -58,15 +61,18 @@
write_file
)
# Orchestration imports
from google_cloud_automlops.orchestration.kfp import builder as KfpBuilder
from google_cloud_automlops.orchestration.kfp import scaffold as KfpScaffold
from google_cloud_automlops.orchestration.enums import (
Orchestrator,
PipelineJobSubmitter
)
from google_cloud_automlops.orchestration.configs import (
KfpConfig
)

from google_cloud_automlops.orchestration.Component import Component
from google_cloud_automlops.orchestration.Pipeline import Pipeline
from google_cloud_automlops.orchestration.Services import Services
from google_cloud_automlops.orchestration.kfp.KFPComponent import KFPComponent
from google_cloud_automlops.orchestration.kfp.KFPPipeline import KFPPipeline
from google_cloud_automlops.orchestration.kfp.KFPServices import KFPServices

# Provisioning imports
from google_cloud_automlops.provisioning.pulumi import builder as PulumiBuilder
from google_cloud_automlops.provisioning.terraform import builder as TerraformBuilder
@@ -91,13 +97,19 @@
)
from google_cloud_automlops.deployments.gitops.git_utils import git_workflow

# Set up logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(message)s')
logging.getLogger('googleapiclient').setLevel(logging.WARNING)
logger = logging.getLogger()

# Create output directory
make_dirs([OUTPUT_DIR])

# Set up a global dictionary to hold the user-defined components
global components_dict
components_dict = {}

def launchAll(
project_id: str,
pipeline_params: Dict,
@@ -270,8 +282,10 @@ def generate(
raise ValueError(f'Unsupported deployment framework: {deployment_framework}')

logging.info(f'Writing directories under {BASE_DIR}')

# Make standard directories
make_dirs(GENERATED_DIRS)

# Make optional directories
if use_ci:
make_dirs(GENERATED_SERVICES_DIRS)
@@ -281,15 +295,15 @@ def generate(
make_dirs(GENERATED_GITHUB_DIRS)

# Set derived vars if none were given for certain variables
derived_artifact_repo_name = f'{naming_prefix}-artifact-registry' if artifact_repo_name is None else artifact_repo_name
derived_build_trigger_name = f'{naming_prefix}-build-trigger' if build_trigger_name is None else build_trigger_name
derived_custom_training_job_specs = stringify_job_spec_list(custom_training_job_specs) if custom_training_job_specs is not None else custom_training_job_specs
derived_pipeline_job_runner_service_account = f'vertex-pipelines@{project_id}.iam.gserviceaccount.com' if pipeline_job_runner_service_account is None else pipeline_job_runner_service_account
derived_pipeline_job_submission_service_name = f'{naming_prefix}-job-submission-svc' if pipeline_job_submission_service_name is None else pipeline_job_submission_service_name
derived_pubsub_topic_name = f'{naming_prefix}-queueing-svc' if pubsub_topic_name is None else pubsub_topic_name
derived_schedule_name = f'{naming_prefix}-schedule' if schedule_name is None else schedule_name
derived_source_repo_name = f'{naming_prefix}-repository' if source_repo_name is None else source_repo_name
derived_storage_bucket_name = f'{project_id}-{naming_prefix}-bucket' if storage_bucket_name is None else storage_bucket_name
derived_artifact_repo_name = coalesce(artifact_repo_name, f'{naming_prefix}-artifact-registry')
derived_build_trigger_name = coalesce(build_trigger_name, f'{naming_prefix}-build-trigger')
derived_custom_training_job_specs = stringify_job_spec_list(custom_training_job_specs)
derived_pipeline_job_runner_service_account = coalesce(pipeline_job_runner_service_account, f'vertex-pipelines@{project_id}.iam.gserviceaccount.com')
derived_pipeline_job_submission_service_name = coalesce(pipeline_job_submission_service_name, f'{naming_prefix}-job-submission-svc')
derived_pubsub_topic_name = coalesce(pubsub_topic_name, f'{naming_prefix}-queueing-svc')
derived_schedule_name = coalesce(schedule_name, f'{naming_prefix}-schedule')
derived_source_repo_name = coalesce(source_repo_name, f'{naming_prefix}-repository')
derived_storage_bucket_name = coalesce(storage_bucket_name, f'{project_id}-{naming_prefix}-bucket')
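The derived defaults above rely on the new coalesce helper imported from utils; presumably it returns the first argument that is not None. A minimal sketch under that assumption (not necessarily the repository's actual implementation):

def coalesce(*values):
    # Assumed behavior: return the first value that is not None, or None if all values are None.
    return next((value for value in values if value is not None), None)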

# Write defaults.yaml
defaults = create_default_config(
@@ -324,17 +338,33 @@ def generate(

# Generate files required to run a Kubeflow pipeline
if orchestration_framework == Orchestrator.KFP.value:

# Log what files will be created
logging.info(f'Writing README.md to {BASE_DIR}README.md')
logging.info(f'Writing kubeflow pipelines code to {BASE_DIR}pipelines, {BASE_DIR}components')
logging.info(f'Writing scripts to {BASE_DIR}scripts')

# Write kubeflow pipeline code
logging.info(f'Writing kubeflow pipelines code to {BASE_DIR}pipelines')
kfppipe = KFPPipeline(func=pipeline_glob.func,
name=pipeline_glob.name,
description=pipeline_glob.description,
comps_dict=components_dict)
kfppipe.build(base_image,
custom_training_job_specs,
pipeline_params,
pubsub_topic_name,
use_ci)

# Write kubeflow components code
logging.info(f'Writing kubeflow components code to {BASE_DIR}components')
for comp in kfppipe.comps:
logging.info(f' -- Writing {comp.name}')
KFPComponent(func=comp.func, packages_to_install=comp.packages_to_install).build()

# If the user specified services, write the services scripts
if use_ci:
logging.info(f'Writing submission service code to {BASE_DIR}services')
KfpBuilder.build(KfpConfig(
base_image=base_image,
custom_training_job_specs=derived_custom_training_job_specs,
pipeline_params=pipeline_params,
pubsub_topic_name=derived_pubsub_topic_name,
use_ci=use_ci))
KFPServices().build()

# Generate files required to provision resources
if provisioning_framework == Provisioner.GCLOUD.value:
@@ -532,9 +562,11 @@ def my_function_one(input: str, output: Output[Model]):
component,
packages_to_install=packages_to_install)
else:
return KfpScaffold.create_component_scaffold(
components_dict[func.__name__] = Component(
func=func,
packages_to_install=packages_to_install)
packages_to_install=packages_to_install
)
return


def pipeline(func: Optional[Callable] = None,
@@ -570,10 +602,12 @@ def pipeline(bq_table: str,
name=name,
description=description)
else:
return KfpScaffold.create_pipeline_scaffold(
func=func,
name=name,
description=description)
global pipeline_glob
pipeline_glob = Pipeline(func=func,
name=name,
description=description,
comps_dict=components_dict)
return


def clear_cache():
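To make the new object-oriented flow concrete, here is a minimal usage sketch of the decorator API defined above: component() registers each function as a Component in components_dict, pipeline() stores a Pipeline in pipeline_glob, and generate() then builds the KFP-specific objects (KFPPipeline, one KFPComponent per registered component, and KFPServices when use_ci is True). The import path follows the package's usual convention; the function bodies and parameter values are hypothetical illustrations, not code from this PR.

from google_cloud_automlops import AutoMLOps

@AutoMLOps.component(packages_to_install=['pandas'])
def create_dataset(bq_table: str, data_path: str):
    """Exports a BigQuery table to a CSV file.

    Args:
        bq_table: Fully qualified BigQuery table name.
        data_path: Output path for the exported CSV.
    """
    import pandas as pd  # installed at component runtime via packages_to_install
    pd.DataFrame({'source_table': [bq_table]}).to_csv(data_path, index=False)

@AutoMLOps.pipeline(name='training-pipeline', description='Example training pipeline')
def pipeline(bq_table: str, data_path: str):
    create_dataset(bq_table=bq_table, data_path=data_path)

# Builds README, pipelines/, components/, scripts/, and (with use_ci=True) services/.
AutoMLOps.generate(project_id='my-project',
                   pipeline_params={'bq_table': 'my-project.my_dataset.my_table',
                                    'data_path': 'gs://my-bucket/data.csv'},
                   use_ci=True)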
169 changes: 169 additions & 0 deletions google_cloud_automlops/orchestration/Component.py
@@ -0,0 +1,169 @@
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Creates a generic component object."""

# pylint: disable=anomalous-backslash-in-string
# pylint: disable=C0103
# pylint: disable=line-too-long

import inspect
from typing import Callable, List, Optional, TypeVar, Union

import docstring_parser

from google_cloud_automlops.utils.constants import GENERATED_DEFAULTS_FILE
from google_cloud_automlops.utils.utils import (
get_function_source_definition,
read_yaml_file
)

T = TypeVar('T')


class Component():
"""The Component object represents a component defined by the user.

Serves as a base class for orchestrator-specific components (e.g. KFPComponent).
"""

def __init__(self,
func: Optional[Callable] = None,
packages_to_install: Optional[List[str]] = None):
"""Initiates a generic Component object created out of a function holding
all necessary code.

Args:
func: The python function to create a component from. The function
should have type annotations for all its arguments, indicating how
it is intended to be used (e.g. as an input/output Artifact object,
a plain parameter, or a path to a file).
packages_to_install: An optional list of packages to install before
executing func. These will always be installed at component runtime.

Raises:
ValueError: If func is not an existing function.
"""

# Confirm the input is an existing function
if not inspect.isfunction(func):
raise ValueError(f"{func} must be of type function.")

# Set simple attributes of the component function
self.func = func
self.name = func.__name__
self.packages_to_install = [] if not packages_to_install else packages_to_install

# Parse the docstring for description
self.parsed_docstring = docstring_parser.parse(inspect.getdoc(func))
self.description = self.parsed_docstring.short_description

# Process and extract details from passed function
self.parameters = self._get_function_parameters()
self.return_types = self._get_function_return_types()
self.src_code = get_function_source_definition(self.func)

# Instantiate attributes to be set during build
self.artifact_repo_location = None
self.artifact_repo_name = None
self.project_id = None
self.naming_prefix = None

def build(self):
"""Instantiates an abstract built method to create and write task files. Also
reads in defaults file to save default arguments to attributes.
"""
defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
self.artifact_repo_location = defaults['gcp']['artifact_repo_location']
self.artifact_repo_name = defaults['gcp']['artifact_repo_name']
self.project_id = defaults['gcp']['project_id']
self.naming_prefix = defaults['gcp']['naming_prefix']
raise NotImplementedError("Subclass needs to define this.")

def _get_function_return_types(self) -> list:
"""Returns a formatted list of function return types.

Returns:
list: Return value metadata (name, type, description), or None if the function has no return annotation.
Raises:
TypeError: If the return type is Optional, or is provided and is not a NamedTuple.
"""
# Extract the return annotation; Optional return types are not supported
annotation = inspect.signature(self.func).return_annotation
if maybe_strip_optional_from_annotation(annotation) is not annotation:
raise TypeError('Return type cannot be Optional.')

# No annotations provided
# pylint: disable=protected-access
if annotation == inspect._empty:
return None

if not (hasattr(annotation, '__annotations__') and isinstance(annotation.__annotations__, dict)):
raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''')

# Convert each NamedTuple field into an output metadata dict (name, type, description)
outputs = []
for name, type_ in annotation.__annotations__.items():
metadata = {}
metadata['name'] = name
metadata['type'] = type_
metadata['description'] = None
outputs.append(metadata)
return outputs

def _get_function_parameters(self) -> list:
"""Returns a formatted list of parameters.

Returns:
list: Parameter metadata (name, description, type) for each function parameter.
Raises:
TypeError: If parameter type hints are not provided.
"""
# Parse the signature and docstring to pair each parameter with its description
signature = inspect.signature(self.func)
parameters = list(signature.parameters.values())
parsed_docstring = docstring_parser.parse(inspect.getdoc(self.func))
doc_dict = {p.arg_name: p.description for p in parsed_docstring.params}

# Extract parameter metadata
parameter_holder = []
for param in parameters:
metadata = {}
metadata['name'] = param.name
metadata['description'] = doc_dict.get(param.name)
metadata['type'] = maybe_strip_optional_from_annotation(
param.annotation)
parameter_holder.append(metadata)
# pylint: disable=protected-access
if metadata['type'] == inspect._empty:
raise TypeError(
f'''Missing type hint for parameter "{metadata['name']}". '''
f'''Please specify the type for this parameter.''')
return parameter_holder

def maybe_strip_optional_from_annotation(annotation: T) -> T:
"""Strips 'Optional' from 'Optional[<type>]' if applicable.
For example::
Optional[str] -> str
str -> str
List[int] -> List[int]
Args:
annotation: The original type annotation, which may or may not include `Optional`.
Returns:
The type inside Optional[] if Optional exists, otherwise the original type.
"""
if getattr(annotation, '__origin__', None) is Union and annotation.__args__[1] is type(None):
return annotation.__args__[0]
else:
return annotation
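As an illustration of what this introspection collects, a hypothetical function with a NamedTuple return annotation would yield metadata roughly as sketched in the comments below (expected behavior, not a test included in this PR):

from typing import NamedTuple

def add(a: int, b: int) -> NamedTuple('Outputs', [('total', int)]):
    """Adds two integers.

    Args:
        a: First addend.
        b: Second addend.
    """
    return (a + b,)

comp = Component(func=add)
# comp.name         -> 'add'
# comp.description  -> 'Adds two integers.'
# comp.parameters   -> [{'name': 'a', 'description': 'First addend.', 'type': int},
#                       {'name': 'b', 'description': 'Second addend.', 'type': int}]
# comp.return_types -> [{'name': 'total', 'type': int, 'description': None}]
# comp.build() reads defaults.yaml and then raises NotImplementedError here;
# subclasses such as KFPComponent override it to write the actual task files.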