Import executor migration to cloud run #1156

Open · wants to merge 1 commit into master
3 changes: 2 additions & 1 deletion import-automation/executor/Dockerfile
@@ -24,5 +24,6 @@ ADD requirements.txt /workspace/requirements.txt
RUN pip install -r /workspace/requirements.txt

COPY app/. /workspace/app/
+COPY main.py /workspace/

-CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP
+ENTRYPOINT ["python","./main.py"]
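Note: with the gunicorn Flask server replaced by a plain Python ENTRYPOINT, the container now runs as a one-shot Cloud Run job rather than a long-lived service. Args set on the job's container are appended to the ENTRYPOINT; a minimal sketch of that contract, with a hypothetical image path and import name:

from google.cloud import run_v2

# Args on the job's container are appended to the Dockerfile ENTRYPOINT, so
# this effectively runs: python ./main.py --import_name=... --import_config=...
# The image path and import name below are hypothetical.
container = run_v2.Container(
    image='us-west1-docker.pkg.dev/my-project/dc-images/dc-import:latest',
    args=[
        '--import_name=scripts/us_fed:treasury',
        '--import_config={}',
    ])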
6 changes: 6 additions & 0 deletions import-automation/executor/app/configs.py
@@ -81,6 +81,12 @@ class ExecutorConfig:
dashboard_oauth_client_id: str = ''
# Oauth Client ID used to authenticate with the proxy.
importer_oauth_client_id: str = ''
+    # Artifact registry for docker images.
+    artifact_registry: str = ''
+    # Name for the import executor container image.
+    docker_image_name: str = 'dc-import'
+    # Service account for the Cloud Run job.
+    cloud_run_service_account: str = ''
# Access token of the account used to authenticate with GitHub. This is not
# the account password. See
# https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token.
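For reference, the new fields compose into the image path used when the Cloud Run job is created (see scheduler_job_manager.py below). A small sketch with hypothetical values, assuming ExecutorConfig accepts these fields as dataclass keyword arguments:

from app import configs

# Hypothetical project and repository; docker_image_name keeps its default.
config = configs.ExecutorConfig(
    artifact_registry='us-west1-docker.pkg.dev/my-project/dc-images',
    cloud_run_service_account='import-executor@my-project.iam.gserviceaccount.com')
image = f'{config.artifact_registry}/{config.docker_image_name}:latest'
# -> 'us-west1-docker.pkg.dev/my-project/dc-images/dc-import:latest'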
23 changes: 11 additions & 12 deletions import-automation/executor/app/executor/cloud_run.py
@@ -26,13 +26,9 @@
from google.cloud import run_v2


-def create_or_update_cloud_run_job(
-    project_id: str,
-    location: str,
-    job_id: str,
-    image: str,
-    env_vars: dict,
-) -> run_v2.Job:
+def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,
+                                   image: str, env_vars: dict, args: list,
+                                   resources: dict) -> run_v2.Job:
"""Creates a new cloud run job or updates an existing one.

    If the job exists, the container is updated with the new image and environment
@@ -45,6 +41,8 @@ def create_or_update_cloud_run_job(
job_id: Name of the job
image: Container image URL such as 'gcr.io/your-project/your-image:latest'
env_vars: dict of environment variables as {'VAR': '<value>'}
+      args: list of command-line arguments for the container
+      resources: CPU/memory resource limits, e.g. {'cpu': '2', 'memory': '2G'}

Returns:
Job created as a dict.
@@ -59,17 +57,18 @@ def create_or_update_cloud_run_job(
for var, value in env_vars.items():
env.append(run_v2.EnvVar(name=var, value=value))

-    container = run_v2.Container(image=image, env=env)
+    res = run_v2.types.ResourceRequirements(limits=resources)
+    container = run_v2.Container(image=image, env=env, resources=res, args=args)
exe_template = run_v2.ExecutionTemplate(template=run_v2.TaskTemplate(
containers=[container]))
new_job = run_v2.Job(template=exe_template)
-    logging.info(f"Creating job {job_name}: {new_job}")
+    logging.info(f"Creating job: {job_name}")

# Look for existing job to update
job = None
try:
job = client.get_job(request=run_v2.GetJobRequest(name=job_name))
-        logging.info(f"Found existing job {job_name}: {job}")
+        logging.info(f"Found existing job: {job_name}")
except NotFound:
logging.info(f"No existing job, creating new job: {job_name}")

@@ -85,11 +84,11 @@ def create_or_update_cloud_run_job(
# Update existing Cloud Run job
# Overrides container settings including image, env
job.template.template.containers = new_job.template.template.containers
-    logging.info(f"Updating job {job_name}: {job}")
+    logging.info(f"Updating job: {job_name}")
update_request = run_v2.UpdateJobRequest(job=job)
update_operation = client.update_job(request=update_request)
result = update_operation.result() # Blocks until update completes
-    logging.info(f"Job updated {job_name}: {result}")
+    logging.info(f"Job updated: {job_name}")
return result


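A usage sketch for the widened signature, mirroring the call added in scheduler_job_manager.py below; every identifier here is a hypothetical placeholder:

from app.executor import cloud_run

job = cloud_run.create_or_update_cloud_run_job(
    project_id='my-project',
    location='us-west1',
    job_id='treasury',
    image='us-west1-docker.pkg.dev/my-project/dc-images/dc-import:latest',
    env_vars={},
    args=[
        '--import_name=scripts/us_fed:treasury',
        '--import_config={}',
    ],
    resources={'cpu': '2', 'memory': '2G'})
print(job.name)  # projects/my-project/locations/us-west1/jobs/treasury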
@@ -188,8 +188,11 @@ def cloud_run_simple_import_job(
logging.info(
f'Setting up simple import cloud run {project_id}:{job_id} for'
f' {config_file} with output: {gcs_output_dir}, env: {env_vars}')
+    resources = {}
+    args = []
    job = cloud_run.create_or_update_cloud_run_job(project_id, location, job_id,
-                                                   image, env_vars)
+                                                   image, env_vars, args,
+                                                   resources)
if not job:
logging.error(
f'Failed to setup cloud run job {job_id} for {config_file}')
47 changes: 41 additions & 6 deletions import-automation/executor/app/executor/cloud_scheduler.py
@@ -26,6 +26,7 @@
from google.protobuf import json_format
from google.api_core.exceptions import AlreadyExists, NotFound

+CLOUD_RUN_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN',
'importautomation.datacommons.org')
GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
@@ -50,15 +51,49 @@ def _base_job_request(absolute_import_name, schedule: str):
# 30m is the max allowed deadline
'seconds': 60 * 30
}
-        # <'http_request'|'appengine_job_request'>: {...}
+        # <'gke_job_request'|'appengine_job_request'|'cloud_run_job_request'>: {...}
}


-def http_job_request(absolute_import_name,
-                     schedule,
-                     json_encoded_job_body: str,
-                     gke_caller_service_account: str = "",
-                     gke_oauth_audience: str = "") -> Dict:
+def cloud_run_job_request(absolute_import_name, schedule,
+                          json_encoded_config: str, cloud_run_job_url: str,
+                          cloud_run_service_account: str) -> Dict:
+    """Cloud Scheduler request that targets jobs launched in CLOUD_RUN."""
+    json_encoded_job_body = json.dumps({}).encode("utf-8")
+    # json.dumps({
+    #     'overrides': {
+    #         "containerOverrides": [{
+    #             'args': [
+    #                 f'--import_name={absolute_import_name}',
+    #                 f'--import_config={json_encoded_config}'
+    #             ]
+    #         }]
+    #     }
+    # }).encode("utf-8")
+
+    job = _base_job_request(absolute_import_name, schedule)
+    job_name = absolute_import_name.split(':')[1]
+    job['name'] = f'{job_name}'
+    job['http_target'] = {
+        'uri': f'https://{cloud_run_job_url}',
+        'http_method': 'POST',
+        'headers': {
+            'Content-Type': 'application/json',
+        },
+        'body': json_encoded_job_body,
+        'oauth_token': {
+            'service_account_email': f'{cloud_run_service_account}',
+            'scope': 'https://www.googleapis.com/auth/cloud-platform'
+        }
+    }
+    return job
+
+
+def gke_job_request(absolute_import_name,
+                    schedule,
+                    json_encoded_job_body: str,
+                    gke_caller_service_account: str = "",
+                    gke_oauth_audience: str = "") -> Dict:
    """Cloud Scheduler request that targets executors launched in GKE."""

# If the service account and oauth audience are provided as
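To illustrate, the scheduler entry produced by cloud_run_job_request posts to the Cloud Run jobs :run endpoint with an OAuth token; a sketch of the resulting request dict, with hypothetical values throughout:

req = cloud_run_job_request(
    absolute_import_name='scripts/us_fed:treasury',
    schedule='0 5 * * *',
    json_encoded_config='{}',
    cloud_run_job_url=('us-west1-run.googleapis.com/apis/run.googleapis.com'
                       '/v1/namespaces/my-project/jobs/treasury:run'),
    cloud_run_service_account='scheduler@my-project.iam.gserviceaccount.com')
assert req['name'] == 'treasury'
assert req['http_target']['uri'].startswith('https://us-west1-run.googleapis.com')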
72 changes: 55 additions & 17 deletions import-automation/executor/app/executor/scheduler_job_manager.py
@@ -33,13 +33,17 @@
import traceback
import tempfile
from typing import Dict
+from app.executor import cloud_run

from app import configs
from app.service import github_api
from app.executor import import_target
from app.executor import import_executor
from app.executor import cloud_scheduler

+_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account'
+_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience'


def schedule_on_commit(github: github_api.GitHubRepoAPI,
config: configs.ExecutorConfig, commit_sha: str):
@@ -76,8 +80,8 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
relative_dir, spec['import_name'])
logging.info('Scheduling a data update job for %s',
absolute_import_name)
-                job = _create_or_update_import_schedule(absolute_import_name,
-                                                        schedule, config)
+                job = create_or_update_import_schedule(absolute_import_name,
+                                                       schedule, config, {})
scheduled.append(job)
except Exception:
raise import_executor.ExecutionError(
@@ -87,27 +91,61 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
'No issues')


-def _create_or_update_import_schedule(absolute_import_name, schedule: str,
-                                      config: configs.ExecutorConfig):
+def create_or_update_import_schedule(absolute_import_name: str, schedule: str,
+                                     config: configs.ExecutorConfig,
+                                     scheduler_config_dict: Dict):
"""Create/Update the import schedule for 1 import."""
-    # Note: this is the content of what is passed to /update API
-    # inside each cronjob http calls.
-    json_encoded_job_body = json.dumps({
-        'absolute_import_name': absolute_import_name,
-        'configs': config.get_data_refresh_config()
-    }).encode()

    if config.executor_type == "GKE":
-        req = cloud_scheduler.http_job_request(absolute_import_name, schedule,
-                                               json_encoded_job_body)
+        # Note: this is the content passed to the /update API
+        # inside each cron job's HTTP call.
+        json_encoded_job_body = json.dumps({
+            'absolute_import_name': absolute_import_name,
+            'configs': config.get_data_refresh_config()
+        }).encode('utf-8')
+        # Before proceeding, ensure that the configs read from GCS have the expected fields.
+        assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict
+        assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict
+        service_account_key = scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY]
+        oauth_audience_key = scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY]
+        req = cloud_scheduler.gke_job_request(absolute_import_name, schedule,
+                                              json_encoded_job_body,
+                                              service_account_key,
+                                              oauth_audience_key)
elif config.executor_type == "GAE":
+        json_encoded_job_body = json.dumps({
+            'absolute_import_name': absolute_import_name,
+            'configs': config.get_data_refresh_config()
+        }).encode('utf-8')
req = cloud_scheduler.appengine_job_request(absolute_import_name,
schedule,
json_encoded_job_body)
+    elif config.executor_type == "CLOUD_RUN":
+        image_version = 'latest'
+        docker_image = f'{config.artifact_registry}/{config.docker_image_name}:{image_version}'
+        job_name = absolute_import_name.split(':')[1]
+
+        json_encoded_config = json.dumps(config.get_data_refresh_config())
+        args = [
+            f'--import_name={absolute_import_name}',
+            f'--import_config={json_encoded_config}'
+        ]
+        resources = {"cpu": "2", "memory": "2G"}
+        env_vars = {}
+        job = cloud_run.create_or_update_cloud_run_job(
+            config.gcp_project_id, config.scheduler_location, job_name,
+            docker_image, env_vars, args, resources)
+        if not job:
+            logging.error(
+                f'Failed to setup cloud run job for {absolute_import_name}')
+        job_id = job.name.rsplit('/', 1)[1]
+        cloud_run_job_url = f'{config.scheduler_location}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/{config.gcp_project_id}/jobs/{job_id}:run'
+        req = cloud_scheduler.cloud_run_job_request(
+            absolute_import_name, schedule, json_encoded_config,
+            cloud_run_job_url, scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY])
+        return cloud_scheduler.create_or_update_job(config.gcp_project_id,
+                                                    config.scheduler_location,
+                                                    req)
else:
raise Exception(
"Invalid executor_type %s, expects one of ('GKE', 'GAE')",
"Invalid executor_type %s, expects one of ('GKE', 'GAE', 'CLOUD_RUN')",
config.executor_type)

return cloud_scheduler.create_or_update_job(config.gcp_project_id,
config.scheduler_location, req)
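End to end, the CLOUD_RUN path first materializes the job, then points a Cloud Scheduler entry at it. A hedged sketch of the call, assuming executor_type and the other fields are plain ExecutorConfig dataclass fields; all concrete values are hypothetical, and note the scheduler config dict still uses the GKE key names:

from app import configs
from app.executor import scheduler_job_manager

# Hypothetical config; executor_type selects the CLOUD_RUN branch above.
config = configs.ExecutorConfig(
    executor_type='CLOUD_RUN',
    gcp_project_id='my-project',
    scheduler_location='us-west1',
    artifact_registry='us-west1-docker.pkg.dev/my-project/dc-images')
scheduler_config = {
    # Key names follow _GKE_SERVICE_ACCOUNT_KEY / _GKE_OAUTH_AUDIENCE_KEY.
    'gke_service_account': 'scheduler@my-project.iam.gserviceaccount.com',
    'gke_oauth_audience': '',  # unused on the CLOUD_RUN path
}
scheduler_job_manager.create_or_update_import_schedule(
    'scripts/us_fed:treasury', '0 5 * * *', config, scheduler_config)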
30 changes: 30 additions & 0 deletions import-automation/executor/cloudbuild.yaml
@@ -0,0 +1,30 @@
# Builds the Docker image of the import executor, pushes it to Artifact Registry,
# and creates a Cloud Run job from that image.
#
# Run it using:
# gcloud builds submit --region=us-west1 --config=cloudbuild.yaml --substitutions=_ARTIFACT_REGISTRY_REPO="dc-images",_IMAGE_NAME="dc-import",_REGION="us-west1",_JOB_NAME="dc-import" .

steps:
# Install dependencies
- name: python
entrypoint: pip
args: ["install", "-r", "requirements.txt", "--user"]

# Docker Build
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t',
'${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest', '.']

# Docker push to Google Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest']

# Deploy to Cloud Run
- name: google/cloud-sdk
args: ['gcloud', 'run', 'jobs', 'deploy', '${_JOB_NAME}',
'--image=${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest',
'--region', '${_REGION}']

# Store images in Google Artifact Registry
images:
- ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest
43 changes: 43 additions & 0 deletions import-automation/executor/main.py
@@ -0,0 +1,43 @@
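"""Entrypoint for the import executor Cloud Run job.

Parses the --import_name and --import_config flags and runs the
corresponding scheduled data update.
"""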
from app import configs
from app.executor import import_executor
from app.service import file_uploader
from app.service import github_api
from app.service import email_notifier

from absl import flags
from absl import app
import logging
import json

FLAGS = flags.FLAGS
flags.DEFINE_string('import_name', '', 'Absolute import name.')
flags.DEFINE_string('import_config', '', 'Import executor configuration.')


def scheduled_updates(absolute_import_name: str, import_config: str):
logging.info(absolute_import_name)
cfg = json.loads(import_config)
config = configs.ExecutorConfig(**cfg)
logging.info(config)
executor = import_executor.ImportExecutor(
uploader=file_uploader.GCSFileUploader(
project_id=config.gcs_project_id,
bucket_name=config.storage_prod_bucket_name),
github=github_api.GitHubRepoAPI(
repo_owner_username=config.github_repo_owner_username,
repo_name=config.github_repo_name,
auth_username=config.github_auth_username,
auth_access_token=config.github_auth_access_token),
config=config,
notifier=email_notifier.EmailNotifier(config.email_account,
config.email_token))
result = executor.execute_imports_on_update(absolute_import_name)
logging.info(result)


def main(_):
scheduled_updates(FLAGS.import_name, FLAGS.import_config)


if __name__ == '__main__':
app.run(main)
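A local smoke-test sketch (hypothetical values) that exercises the same path the Cloud Run job takes, bypassing absl flag parsing:

import json
import main

# Hypothetical minimal config; in production the job receives the full
# serialized ExecutorConfig via --import_config.
cfg = json.dumps({
    'gcs_project_id': 'my-project',
    'storage_prod_bucket_name': 'my-bucket',
})
main.scheduled_updates('scripts/us_fed:treasury', cfg)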