Skip to content

Commit

Permalink
Import executor migration to cloud run
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs committed Jan 2, 2025
1 parent b4ebd73 commit 37f56cb
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 80 deletions.
3 changes: 2 additions & 1 deletion import-automation/executor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ RUN pip install -r /workspace/requirements.txt
RUN git clone https://github.com/datacommonsorg/data.git
RUN wget https://github.com/datacommonsorg/import/releases/download/0.1-alpha.1k/datacommons-import-tool-0.1-alpha.1-jar-with-dependencies.jar
COPY app/. /workspace/app/
COPY main.py /workspace/

CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP
ENTRYPOINT ["python","./main.py"]
2 changes: 2 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class ExecutorConfig:
dashboard_oauth_client_id: str = ''
# Oauth Client ID used to authenticate with the proxy.
importer_oauth_client_id: str = ''
# URL for the import executor container image.
executor_docker_image: str = 'gcr.io/datcom-ci/dc-import-executor:latest'
# Access token of the account used to authenticate with GitHub. This is not
# the account password. See
# https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token.
Expand Down
23 changes: 11 additions & 12 deletions import-automation/executor/app/executor/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@
from google.cloud import run_v2


def create_or_update_cloud_run_job(
project_id: str,
location: str,
job_id: str,
image: str,
env_vars: dict,
) -> run_v2.Job:
def create_or_update_cloud_run_job(project_id: str, location: str, job_id: str,
image: str, env_vars: dict, args: list,
resources: dict) -> run_v2.Job:
"""Creates a new cloud run job or updates an existing one.
If the jobs exists, the container is updated with new image and environment
Expand All @@ -45,6 +41,8 @@ def create_or_update_cloud_run_job(
job_id: Name of the job
image: Container image URL such as 'gcr.io/your-project/your-image:latest'
env_vars: dict of environment variables as {'VAR': '<value>'}
args: list of command line arguments
resources: cpu/memory resources
Returns:
Job created as a dict.
Expand All @@ -59,17 +57,18 @@ def create_or_update_cloud_run_job(
for var, value in env_vars.items():
env.append(run_v2.EnvVar(name=var, value=value))

container = run_v2.Container(image=image, env=env)
res = run_v2.types.ResourceRequirements(limits=resources)
container = run_v2.Container(image=image, env=env, resources=res, args=args)
exe_template = run_v2.ExecutionTemplate(template=run_v2.TaskTemplate(
containers=[container]))
new_job = run_v2.Job(template=exe_template)
logging.info(f"Creating job {job_name}: {new_job}")
logging.info(f"Creating job: {job_name}")

# Look for existing job to update
job = None
try:
job = client.get_job(request=run_v2.GetJobRequest(name=job_name))
logging.info(f"Found existing job {job_name}: {job}")
logging.info(f"Found existing job: {job_name}")
except NotFound:
logging.info(f"No existing job, creating new job: {job_name}")

Expand All @@ -85,11 +84,11 @@ def create_or_update_cloud_run_job(
# Update existing Cloud Run job
# Overrides container settings including image, env
job.template.template.containers = new_job.template.template.containers
logging.info(f"Updating job {job_name}: {job}")
logging.info(f"Updating job: {job_name}")
update_request = run_v2.UpdateJobRequest(job=job)
update_operation = client.update_job(request=update_request)
result = update_operation.result() # Blocks until update completes
logging.info(f"Job updated {job_name}: {result}")
logging.info(f"Job updated: {job_name}")
return result


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,11 @@ def cloud_run_simple_import_job(
logging.info(
f'Setting up simple import cloud run {project_id}:{job_id} for'
f' {config_file} with output: {gcs_output_dir}, env: {env_vars}')
resources = {}
args = []
job = cloud_run.create_or_update_cloud_run_job(project_id, location, job_id,
image, env_vars)
image, env_vars, args,
resources)
if not job:
logging.error(
f'Failed to setup cloud run job {job_id} for {config_file}')
Expand Down
47 changes: 41 additions & 6 deletions import-automation/executor/app/executor/cloud_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.protobuf import json_format
from google.api_core.exceptions import AlreadyExists, NotFound

CLOUD_RUN_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN',
'importautomation.datacommons.org')
GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
Expand All @@ -50,15 +51,49 @@ def _base_job_request(absolute_import_name, schedule: str):
# 30m is the max allowed deadline
'seconds': 60 * 30
}
# <'http_request'|'appengine_job_request'>: {...}
# <'gke_job_request'|'appengine_job_request'|'cloud_run_job_request'>: {...}
}


def http_job_request(absolute_import_name,
schedule,
json_encoded_job_body: str,
gke_caller_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
def cloud_run_job_request(absolute_import_name, schedule,
json_encoded_config: str, cloud_run_job_url: str,
cloud_run_service_account: str) -> Dict:
"""Cloud Scheduler request that targets jossZ [;bs in CLOUD_RUN."""
json_encoded_job_body = json.dumps({}).encode("utf-8")
# json.dumps({
# 'overrides': {
# "containerOverrides": [{
# 'args': [
# f'--import_name={absolute_import_name}',
# f'--import_config={json_encoded_config}'
# ]
# }]
# }
# }).encode("utf-8")

job = _base_job_request(absolute_import_name, schedule)
job_name = absolute_import_name.split(':')[1]
job['name'] = f'{job_name}'
job['http_target'] = {
'uri': f'https://{cloud_run_job_url}',
'http_method': 'POST',
'headers': {
'Content-Type': 'application/json',
},
'body': json_encoded_job_body,
'oauth_token': {
'service_account_email': f'{cloud_run_service_account}',
'scope': 'https://www.googleapis.com/auth/cloud-platform'
}
}
return job


def gke_job_request(absolute_import_name,
schedule,
json_encoded_job_body: str,
gke_caller_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
"""Cloud Scheduler request that targets executors launched in GKE."""

# If the service account and oauth audience are provided as
Expand Down
71 changes: 54 additions & 17 deletions import-automation/executor/app/executor/scheduler_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import traceback
import tempfile
from typing import Dict
import cloud_run

from app import configs
from app.service import github_api
from app.executor import import_target
from app.executor import import_executor
from app.executor import cloud_scheduler

_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account'
_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience'


def schedule_on_commit(github: github_api.GitHubRepoAPI,
config: configs.ExecutorConfig, commit_sha: str):
Expand Down Expand Up @@ -76,8 +80,8 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
relative_dir, spec['import_name'])
logging.info('Scheduling a data update job for %s',
absolute_import_name)
job = _create_or_update_import_schedule(absolute_import_name,
schedule, config)
job = create_or_update_import_schedule(absolute_import_name,
schedule, config, {})
scheduled.append(job)
except Exception:
raise import_executor.ExecutionError(
Expand All @@ -87,27 +91,60 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
'No issues')


def _create_or_update_import_schedule(absolute_import_name, schedule: str,
config: configs.ExecutorConfig):
def create_or_update_import_schedule(absolute_import_name: str, schedule: str,
config: configs.ExecutorConfig,
scheduler_config_dict: Dict):
"""Create/Update the import schedule for 1 import."""
# Note: this is the content of what is passed to /update API
# inside each cronjob http calls.
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode()

if config.executor_type == "GKE":
req = cloud_scheduler.http_job_request(absolute_import_name, schedule,
json_encoded_job_body)
# Note: this is the content of what is passed to /update API
# inside each cronjob http calls.
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode('utf-8')
# Before proceeding, ensure that the configs read from GCS have the expected fields.
assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict
assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict
service_account_key = scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY]
oauth_audience_key = scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY]
req = cloud_scheduler.gke_job_request(absolute_import_name, schedule,
json_encoded_job_body,
service_account_key,
oauth_audience_key)
elif config.executor_type == "GAE":
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode('utf-8')
req = cloud_scheduler.appengine_job_request(absolute_import_name,
schedule,
json_encoded_job_body)
elif config.executor_type == "CLOUD_RUN":
docker_image = config.executor_docker_image
job_name = absolute_import_name.split(':')[1]

json_encoded_config = json.dumps(config.get_data_refresh_config())
args = [
f'--import_name={absolute_import_name}',
f'--import_config={json_encoded_config}'
]
resources = {"cpu": "2", "memory": "2G"}
env_vars = {}
job = cloud_run.create_or_update_cloud_run_job(
config.gcp_project_id, config.scheduler_location, job_name,
docker_image, env_vars, args, resources)
job_id = job.name.rsplit('/', 1)[1]
if not job:
logging.error(
f'Failed to setup cloud run job for {absolute_import_name}')
cloud_run_job_url = f'{config.scheduler_location}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/{config.gcp_project_id}/jobs/{job_id}:run'
req = cloud_scheduler.cloud_run_job_request(
absolute_import_name, schedule, json_encoded_config,
cloud_run_job_url, scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY])
return cloud_scheduler.create_or_update_job(config.gcp_project_id,
config.scheduler_location,
req)
else:
raise Exception(
"Invalid executor_type %s, expects one of ('GKE', 'GAE')",
"Invalid executor_type %s, expects one of ('GKE', 'GAE', 'CLOUD_RUN')",
config.executor_type)

return cloud_scheduler.create_or_update_job(config.gcp_project_id,
config.scheduler_location, req)
28 changes: 28 additions & 0 deletions import-automation/executor/cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Builds the docker image of import executor, pushes it to artifact registry,
# and creates a cloud run job using this images.
#
# Run it using:
# gcloud builds submit --config=cloudbuild.yaml --substitutions=_DOCKER_IMAGE="us-docker.pkg.dev/datcom-ci/gcr.io/dc-import-executor:latest" .

steps:
# Install dependencies
- name: python
entrypoint: pip
args: ["install", "-r", "requirements.txt", "--user"]

# Docker Build
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '${_DOCKER_IMAGE}', '.']

# Docker push to Google Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_DOCKER_IMAGE}']

# Deploy to Cloud Run
#- name: google/cloud-sdk
# args: ['gcloud', 'run', 'jobs', 'deploy', '${_JOB_NAME}',
# '--image=${_DOCKER_IMAGE}', '--region', '${_REGION}']

# Store images in Google Artifact Registry
images:
- ${_DOCKER_IMAGE}
48 changes: 48 additions & 0 deletions import-automation/executor/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
import json

from absl import flags
from absl import app

from app import configs
from app.executor import import_executor
from app.service import file_uploader
from app.service import github_api
from app.service import email_notifier


FLAGS = flags.FLAGS
flags.DEFINE_string('import_name', '', 'Absoluate import name.')
flags.DEFINE_string('import_config', '', 'Import executor configuration.')


def scheduled_updates(absolute_import_name: str, import_config: str):
logging.info(absolute_import_name)
cfg = json.loads(import_config)
config = configs.ExecutorConfig(**cfg)
logging.info(config)
executor = import_executor.ImportExecutor(
uploader=file_uploader.GCSFileUploader(
project_id=config.gcs_project_id,
bucket_name=config.storage_prod_bucket_name),
github=github_api.GitHubRepoAPI(
repo_owner_username=config.github_repo_owner_username,
repo_name=config.github_repo_name,
auth_username=config.github_auth_username,
auth_access_token=config.github_auth_access_token),
config=config,
notifier=email_notifier.EmailNotifier(config.email_account,
config.email_token))
result = executor.execute_imports_on_update(absolute_import_name)
logging.info(result)
if result.status == 'failed':
return 1
return 0


def main(_):
return scheduled_updates(FLAGS.import_name, FLAGS.import_config)


if __name__ == '__main__':
app.run(main)
Loading

0 comments on commit 37f56cb

Please sign in to comment.