diff --git a/import-automation/executor/Dockerfile b/import-automation/executor/Dockerfile index 9778c20824..0fa23b4d08 100644 --- a/import-automation/executor/Dockerfile +++ b/import-automation/executor/Dockerfile @@ -24,5 +24,6 @@ ADD requirements.txt /workspace/requirements.txt RUN pip install -r /workspace/requirements.txt COPY app/. /workspace/app/ +COPY main.py /workspace/ -CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP +ENTRYPOINT ["python","./main.py"] diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py index 0df79f0c66..f0cd0aa7c5 100644 --- a/import-automation/executor/app/configs.py +++ b/import-automation/executor/app/configs.py @@ -81,6 +81,12 @@ class ExecutorConfig: dashboard_oauth_client_id: str = '' # Oauth Client ID used to authenticate with the proxy. importer_oauth_client_id: str = '' + # Artifact registry for docker images. + artifact_registry: str = '' + # Name for the import executor container image. + docker_image_name: str = 'dc-import' + # Service account for the Cloud Run job. + cloud_run_service_account: str = '' # Access token of the account used to authenticate with GitHub. This is not # the account password. See # https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token. diff --git a/import-automation/executor/app/executor/cloud_run.py b/import-automation/executor/app/executor/cloud_run.py index 3895a4ace2..1ca66a0fb4 100644 --- a/import-automation/executor/app/executor/cloud_run.py +++ b/import-automation/executor/app/executor/cloud_run.py @@ -26,13 +26,9 @@ from google.cloud import run_v2 -def create_or_update_cloud_run_job( - project_id: str, - location: str, - job_id: str, - image: str, - env_vars: dict, -) -> run_v2.Job: +def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str, + image: str, env_vars: dict, args: list, + resources: dict) -> run_v2.Job: """Creates a new cloud run job or updates an existing one. If the jobs exists, the container is updated with new image and environment @@ -45,6 +41,8 @@ def create_or_update_cloud_run_job( job_id: Name of the job image: Container image URL such as 'gcr.io/your-project/your-image:latest' env_vars: dict of environment variables as {'VAR': ''} + args: list of command list arguments + resources: cpu/memory resources Returns: Job created as a dict. @@ -59,17 +57,18 @@ def create_or_update_cloud_run_job( for var, value in env_vars.items(): env.append(run_v2.EnvVar(name=var, value=value)) - container = run_v2.Container(image=image, env=env) + res = run_v2.types.ResourceRequirements(limits=resources) + container = run_v2.Container(image=image, env=env, resources=res, args=args) exe_template = run_v2.ExecutionTemplate(template=run_v2.TaskTemplate( containers=[container])) new_job = run_v2.Job(template=exe_template) - logging.info(f"Creating job {job_name}: {new_job}") + logging.info(f"Creating job: {job_name}") # Look for existing job to update job = None try: job = client.get_job(request=run_v2.GetJobRequest(name=job_name)) - logging.info(f"Found existing job {job_name}: {job}") + logging.info(f"Found existing job: {job_name}") except NotFound: logging.info(f"No existing job, creating new job: {job_name}") @@ -85,11 +84,11 @@ def create_or_update_cloud_run_job( # Update existing Cloud Run job # Overrides container settings including image, env job.template.template.containers = new_job.template.template.containers - logging.info(f"Updating job {job_name}: {job}") + logging.info(f"Updating job: {job_name}") update_request = run_v2.UpdateJobRequest(job=job) update_operation = client.update_job(request=update_request) result = update_operation.result() # Blocks until update completes - logging.info(f"Job updated {job_name}: {result}") + logging.info(f"Job updated: {job_name}") return result diff --git a/import-automation/executor/app/executor/cloud_run_simple_import.py b/import-automation/executor/app/executor/cloud_run_simple_import.py index 380e3a3b19..51736d4f24 100644 --- a/import-automation/executor/app/executor/cloud_run_simple_import.py +++ b/import-automation/executor/app/executor/cloud_run_simple_import.py @@ -188,8 +188,11 @@ def cloud_run_simple_import_job( logging.info( f'Setting up simple import cloud run {project_id}:{job_id} for' f' {config_file} with output: {gcs_output_dir}, env: {env_vars}') + resources = {} + args = [] job = cloud_run.create_or_update_cloud_run_job(project_id, location, job_id, - image, env_vars) + image, env_vars, args, + resources) if not job: logging.error( f'Failed to setup cloud run job {job_id} for {config_file}') diff --git a/import-automation/executor/app/executor/cloud_scheduler.py b/import-automation/executor/app/executor/cloud_scheduler.py index e89ec592b8..22a71a59f2 100644 --- a/import-automation/executor/app/executor/cloud_scheduler.py +++ b/import-automation/executor/app/executor/cloud_scheduler.py @@ -26,6 +26,7 @@ from google.protobuf import json_format from google.api_core.exceptions import AlreadyExists, NotFound +CLOUD_RUN_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA') GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN', 'importautomation.datacommons.org') GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA') @@ -50,15 +51,49 @@ def _base_job_request(absolute_import_name, schedule: str): # 30m is the max allowed deadline 'seconds': 60 * 30 } - # <'http_request'|'appengine_job_request'>: {...} + # <'gke_job_request'|'appengine_job_request'|'cloud_run_job_request'>: {...} } -def http_job_request(absolute_import_name, - schedule, - json_encoded_job_body: str, - gke_caller_service_account: str = "", - gke_oauth_audience: str = "") -> Dict: +def cloud_run_job_request(absolute_import_name, schedule, + json_encoded_config: str, cloud_run_job_url: str, + cloud_run_service_account: str) -> Dict: + """Cloud Scheduler request that targets jossZ [;bs in CLOUD_RUN.""" + json_encoded_job_body = json.dumps({}).encode("utf-8") + # json.dumps({ + # 'overrides': { + # "containerOverrides": [{ + # 'args': [ + # f'--import_name={absolute_import_name}', + # f'--import_config={json_encoded_config}' + # ] + # }] + # } + # }).encode("utf-8") + + job = _base_job_request(absolute_import_name, schedule) + job_name = absolute_import_name.split(':')[1] + job['name'] = f'{job_name}' + job['http_target'] = { + 'uri': f'https://{cloud_run_job_url}', + 'http_method': 'POST', + 'headers': { + 'Content-Type': 'application/json', + }, + 'body': json_encoded_job_body, + 'oauth_token': { + 'service_account_email': f'{cloud_run_service_account}', + 'scope': 'https://www.googleapis.com/auth/cloud-platform' + } + } + return job + + +def gke_job_request(absolute_import_name, + schedule, + json_encoded_job_body: str, + gke_caller_service_account: str = "", + gke_oauth_audience: str = "") -> Dict: """Cloud Scheduler request that targets executors launched in GKE.""" # If the service account and oauth audience are provided as diff --git a/import-automation/executor/app/executor/scheduler_job_manager.py b/import-automation/executor/app/executor/scheduler_job_manager.py index e1f7e775e7..81d1ddb306 100644 --- a/import-automation/executor/app/executor/scheduler_job_manager.py +++ b/import-automation/executor/app/executor/scheduler_job_manager.py @@ -33,6 +33,7 @@ import traceback import tempfile from typing import Dict +import cloud_run from app import configs from app.service import github_api @@ -40,6 +41,9 @@ from app.executor import import_executor from app.executor import cloud_scheduler +_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account' +_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience' + def schedule_on_commit(github: github_api.GitHubRepoAPI, config: configs.ExecutorConfig, commit_sha: str): @@ -76,8 +80,8 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI, relative_dir, spec['import_name']) logging.info('Scheduling a data update job for %s', absolute_import_name) - job = _create_or_update_import_schedule(absolute_import_name, - schedule, config) + job = create_or_update_import_schedule(absolute_import_name, + schedule, config, {}) scheduled.append(job) except Exception: raise import_executor.ExecutionError( @@ -87,27 +91,61 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI, 'No issues') -def _create_or_update_import_schedule(absolute_import_name, schedule: str, - config: configs.ExecutorConfig): +def create_or_update_import_schedule(absolute_import_name: str, schedule: str, + config: configs.ExecutorConfig, + scheduler_config_dict: Dict): """Create/Update the import schedule for 1 import.""" - # Note: this is the content of what is passed to /update API - # inside each cronjob http calls. - json_encoded_job_body = json.dumps({ - 'absolute_import_name': absolute_import_name, - 'configs': config.get_data_refresh_config() - }).encode() - if config.executor_type == "GKE": - req = cloud_scheduler.http_job_request(absolute_import_name, schedule, - json_encoded_job_body) + # Note: this is the content of what is passed to /update API + # inside each cronjob http calls. + json_encoded_job_body = json.dumps({ + 'absolute_import_name': absolute_import_name, + 'configs': config.get_data_refresh_config() + }).encode('utf-8') + # Before proceeding, ensure that the configs read from GCS have the expected fields. + assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict + assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict + service_account_key = scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY] + oauth_audience_key = scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY] + req = cloud_scheduler.gke_job_request(absolute_import_name, schedule, + json_encoded_job_body, + service_account_key, + oauth_audience_key) elif config.executor_type == "GAE": + json_encoded_job_body = json.dumps({ + 'absolute_import_name': absolute_import_name, + 'configs': config.get_data_refresh_config() + }).encode('utf-8') req = cloud_scheduler.appengine_job_request(absolute_import_name, schedule, json_encoded_job_body) + elif config.executor_type == "CLOUD_RUN": + image_version = 'latest' + docker_image = f'{config.artifact_registry}/{config.docker_image_name}:{image_version}' + job_name = absolute_import_name.split(':')[1] + + json_encoded_config = json.dumps(config.get_data_refresh_config()) + args = [ + f'--import_name={absolute_import_name}', + f'--import_config={json_encoded_config}' + ] + resources = {"cpu": "2", "memory": "2G"} + env_vars = {} + job = cloud_run.create_or_update_cloud_run_job( + config.gcp_project_id, config.scheduler_location, job_name, + docker_image, env_vars, args, resources) + job_id = job.name.rsplit('/', 1)[1] + if not job: + logging.error( + f'Failed to setup cloud run job for {absolute_import_name}') + cloud_run_job_url = f'{config.scheduler_location}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/{config.gcp_project_id}/jobs/{job_id}:run' + req = cloud_scheduler.cloud_run_job_request( + absolute_import_name, schedule, json_encoded_config, + cloud_run_job_url, scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY]) + return cloud_scheduler.create_or_update_job(config.gcp_project_id, + config.scheduler_location, + req) else: raise Exception( - "Invalid executor_type %s, expects one of ('GKE', 'GAE')", + "Invalid executor_type %s, expects one of ('GKE', 'GAE', 'CLOUD_RUN')", config.executor_type) - - return cloud_scheduler.create_or_update_job(config.gcp_project_id, - config.scheduler_location, req) diff --git a/import-automation/executor/cloudbuild.yaml b/import-automation/executor/cloudbuild.yaml new file mode 100644 index 0000000000..f9e45962f2 --- /dev/null +++ b/import-automation/executor/cloudbuild.yaml @@ -0,0 +1,30 @@ +# Builds the docker image of import executor, pushes it to artificat registry, +# and creates a cloud run job using this images. +# +# Run it using: +# gcloud builds submit --region=us-west1 --config=cloudbuild.yaml --substitutions=_ARTIFACT_REGISTRY_REPO="dc-images",_IMAGE_NAME="dc-import",_REGION="us-west1",_JOB_NAME="dc-import" . + +steps: + # Install dependencies + - name: python + entrypoint: pip + args: ["install", "-r", "requirements.txt", "--user"] + + # Docker Build + - name: 'gcr.io/cloud-builders/docker' + args: ['build', '-t', + '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest', '.'] + + # Docker push to Google Artifact Registry + - name: 'gcr.io/cloud-builders/docker' + args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest'] + + # Deploy to Cloud Run + - name: google/cloud-sdk + args: ['gcloud', 'run', 'jobs', 'deploy', '${_JOB_NAME}', + '--image=${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest', + '--region', '${_REGION}'] + +# Store images in Google Artifact Registry +images: + - ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest diff --git a/import-automation/executor/main.py b/import-automation/executor/main.py new file mode 100644 index 0000000000..5c2a492934 --- /dev/null +++ b/import-automation/executor/main.py @@ -0,0 +1,43 @@ +from app import configs +from app.executor import import_executor +from app.service import file_uploader +from app.service import github_api +from app.service import email_notifier + +from absl import flags +from absl import app +import logging +import json + +FLAGS = flags.FLAGS +flags.DEFINE_string('import_name', '', 'Absoluate import name.') +flags.DEFINE_string('import_config', '', 'Import executor configuration.') + + +def scheduled_updates(absolute_import_name: str, import_config: str): + logging.info(absolute_import_name) + cfg = json.loads(import_config) + config = configs.ExecutorConfig(**cfg) + logging.info(config) + executor = import_executor.ImportExecutor( + uploader=file_uploader.GCSFileUploader( + project_id=config.gcs_project_id, + bucket_name=config.storage_prod_bucket_name), + github=github_api.GitHubRepoAPI( + repo_owner_username=config.github_repo_owner_username, + repo_name=config.github_repo_name, + auth_username=config.github_auth_username, + auth_access_token=config.github_auth_access_token), + config=config, + notifier=email_notifier.EmailNotifier(config.email_account, + config.email_token)) + result = executor.execute_imports_on_update(absolute_import_name) + logging.info(result) + + +def main(_): + scheduled_updates(FLAGS.import_name, FLAGS.import_config) + + +if __name__ == '__main__': + app.run(main) diff --git a/import-automation/executor/schedule_update_import.py b/import-automation/executor/schedule_update_import.py index ee90e053e8..99ea996d67 100644 --- a/import-automation/executor/schedule_update_import.py +++ b/import-automation/executor/schedule_update_import.py @@ -25,6 +25,7 @@ from app.executor import import_target from app.executor import import_executor from app.executor import cloud_scheduler +from app.executor import scheduler_job_manager from app.executor import validation from app.service import email_notifier from app.service import file_uploader @@ -32,8 +33,6 @@ from google.cloud import storage _CONFIG_OVERRIDE_FILE: str = 'config_override.json' -_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account' -_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience' _FLAGS = flags.FLAGS @@ -238,34 +237,6 @@ def update(cfg: configs.ExecutorConfig, return executor.execute_imports_on_update(absolute_import_path) -def schedule(cfg: configs.ExecutorConfig, - absolute_import_name: str, - repo_dir: str, - gke_service_account: str = "", - gke_oauth_audience: str = "") -> Dict: - # This is the content of what is passed to /update API - # inside each cronjob http calls from Cloud Scheduler. - json_encoded_job_body = json.dumps({ - 'absolute_import_name': absolute_import_name, - 'configs': cfg.get_data_refresh_config() - }).encode("utf-8") - - # Retrieve the cron schedule. - cron_schedule = _get_cron_schedule(repo_dir, absolute_import_name, - cfg.manifest_filename) - - # Create an HTTP Job Request. - req = cloud_scheduler.http_job_request( - absolute_import_name, - cron_schedule, - json_encoded_job_body, - gke_caller_service_account=gke_service_account, - gke_oauth_audience=gke_oauth_audience) - - return cloud_scheduler.create_or_update_job(cfg.gcp_project_id, - cfg.scheduler_location, req) - - def main(_): mode = _FLAGS.mode absolute_import_path = _FLAGS.absolute_import_path @@ -332,19 +303,14 @@ def main(_): _print_fileupload_results(cfg, absolute_import_path) elif mode == 'schedule': - # Before proceeding, ensure that the configs read from GCS have the expected fields. - assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict - assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict - logging.info("*************************************************") logging.info("***** Beginning Schedule Operation **************") logging.info("*************************************************") - res = schedule( - cfg, - absolute_import_path, - repo_dir, - gke_service_account=scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY], - gke_oauth_audience=scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY]) + # Retrieve the cron schedule. + cron_schedule = _get_cron_schedule(repo_dir, absolute_import_path, + cfg.manifest_filename) + res = scheduler_job_manager.create_or_update_import_schedule( + absolute_import_path, cron_schedule, cfg, scheduler_config_dict) logging.info("*************************************************") logging.info("*********** Schedule Operation Complete. ********") logging.info("*************************************************") diff --git a/import-automation/executor/test/cloud_scheduler_test.py b/import-automation/executor/test/cloud_scheduler_test.py index 468249bedb..08480dd79d 100644 --- a/import-automation/executor/test/cloud_scheduler_test.py +++ b/import-automation/executor/test/cloud_scheduler_test.py @@ -60,15 +60,15 @@ def test_appengine_job_request(self): } assert DeepDiff(got, want) == {} - def test_http_job_request(self): + def test_gke_job_request(self): absolute_import_name = "scripts/preprocess:A" schedule = "0 5 * * *" json_encoded_job_body = '{"k":"v"}' cloud_scheduler.GKE_CALLER_SERVICE_ACCOUNT = 'account' cloud_scheduler.GKE_OAUTH_AUDIENCE = 'audience' - got = cloud_scheduler.http_job_request(absolute_import_name, schedule, - json_encoded_job_body) + got = cloud_scheduler.gke_job_request(absolute_import_name, schedule, + json_encoded_job_body) want = { 'name': 'scripts_preprocess_A_GKE', 'description': 'scripts/preprocess:A', @@ -97,3 +97,47 @@ def test_http_job_request(self): } } assert DeepDiff(got, want) == {} + + def test_cloud_run_job_request(self): + absolute_import_name = "scripts/preprocess:A" + schedule = "0 5 * * *" + json_encoded_config = '{"k":"v"}' + + cloud_run_service_account = 'service_account' + cloud_run_job_url = 'run.googleapis.com/run' + got = cloud_scheduler.cloud_run_job_request(absolute_import_name, + schedule, + json_encoded_config, + cloud_run_job_url, + cloud_run_service_account) + want = { + 'name': 'scripts_preprocess_A', + 'description': 'scripts/preprocess:A', + 'schedule': "0 5 * * *", + 'time_zone': 'Etc/UTC', + 'retry_config': { + 'retry_count': 2, + 'min_backoff_duration': { + 'seconds': 60 * 60 + } + }, + 'attempt_deadline': { + 'seconds': 60 * 30 + }, + 'http_target': { + 'uri': + 'https://run.googleapis.com/run', + 'http_method': + 'POST', + 'headers': { + 'Content-Type': 'application/json', + }, + 'body': + b'{"overrides": {"containerOverrides": [{"args": ["--import_name=scripts/preprocess:A", "--import_config={\\"k\\":\\"v\\"}"]}]}}', + 'oauth_token': { + 'service_account_email': 'service_account', + 'scope': 'https://www.googleapis.com/auth/cloud-platform' + } + } + } + assert DeepDiff(got, want) == {}