-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Cloud Deployment IVa] EFS creation and mounting #1018
Changes from 34 commits
cf7407a
3d4e62b
664cf97
84c320b
f5ad841
570b1cc
51d2403
03ac300
93ddecc
a0c3e47
1160584
a2c2453
997adf7
68b91a1
dc384fe
6f93e81
b072225
b4a5972
ecc0f81
acd0a70
d8dc4f0
a0d23d4
fd29b2f
372bec5
e991109
4b7a1e7
cf22cfd
1ef23b9
635d2d4
3b847be
78f5bb1
ed975c8
ddc54ac
71f162c
6424f69
94eb0fd
b175c60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ def submit_aws_batch_job( | |
docker_image: str, | ||
commands: Optional[list[str]] = None, | ||
environment_variables: Optional[dict[str, str]] = None, | ||
efs_volume_name: Optional[str] = None, | ||
job_dependencies: Optional[list[dict[str, str]]] = None, | ||
status_tracker_table_name: str = "neuroconv_batch_status_tracker", | ||
iam_role_name: str = "neuroconv_batch_role", | ||
|
@@ -42,6 +43,9 @@ def submit_aws_batch_job( | |
E.g., `commands=["echo", "'Hello, World!'"]`. | ||
environment_variables : dict, optional | ||
A dictionary of environment variables to pass to the Docker container. | ||
efs_volume_name : str | ||
CodyCBakerPhD marked this conversation as resolved.
Show resolved
Hide resolved
CodyCBakerPhD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
The name of an EFS volume to be created and attached to the job. | ||
The path exposed to the container will always be `/mnt/efs`. | ||
job_dependencies : list of dict | ||
A list of job dependencies for this job to trigger. Structured as follows: | ||
[ | ||
|
@@ -88,6 +92,7 @@ def submit_aws_batch_job( | |
import boto3 | ||
|
||
region = region or "us-east-2" | ||
subregion = region + "a" # For anything that requires subregion, always default to "a" | ||
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None) | ||
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None) | ||
|
||
|
@@ -116,6 +121,12 @@ def submit_aws_batch_job( | |
aws_access_key_id=aws_access_key_id, | ||
aws_secret_access_key=aws_secret_access_key, | ||
) | ||
efs_client = boto3.client( | ||
service_name="efs", | ||
region_name=region, | ||
aws_access_key_id=aws_access_key_id, | ||
aws_secret_access_key=aws_secret_access_key, | ||
) | ||
|
||
# Get the tracking table and IAM role | ||
table = _create_or_get_status_tracker_table( | ||
|
@@ -131,10 +142,12 @@ def submit_aws_batch_job( | |
job_queue_name=job_queue_name, compute_environment_name=compute_environment_name, batch_client=batch_client | ||
) | ||
|
||
efs_id = _create_or_get_efs_id(efs_volume_name=efs_volume_name, efs_client=efs_client, region=region) | ||
job_definition_name = job_definition_name or _generate_job_definition_name( | ||
docker_image=docker_image, | ||
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib, | ||
minimum_worker_cpus=minimum_worker_cpus, | ||
efs_id=efs_id, | ||
) | ||
job_definition_arn = _ensure_job_definition_exists_and_get_arn( | ||
job_definition_name=job_definition_name, | ||
|
@@ -143,6 +156,7 @@ def submit_aws_batch_job( | |
minimum_worker_cpus=minimum_worker_cpus, | ||
role_info=iam_role_info, | ||
batch_client=batch_client, | ||
efs_id=efs_id, | ||
) | ||
|
||
# Submit job and update status tracker | ||
|
@@ -160,6 +174,7 @@ def submit_aws_batch_job( | |
container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()] | ||
if commands is not None: | ||
container_overrides["command"] = commands | ||
|
||
job_submission_info = batch_client.submit_job( | ||
jobName=job_name, | ||
dependsOn=job_dependencies, | ||
|
@@ -180,6 +195,7 @@ def submit_aws_batch_job( | |
table.put_item(Item=table_submission_info) | ||
|
||
info = dict(job_submission_info=job_submission_info, table_submission_info=table_submission_info) | ||
|
||
return info | ||
|
||
|
||
|
@@ -305,8 +321,8 @@ def _ensure_compute_environment_exists( | |
"type": "EC2", | ||
"allocationStrategy": "BEST_FIT", # Note: not currently supporting spot due to interruptibility | ||
"instanceTypes": ["optimal"], | ||
"minvCpus": 1, | ||
"maxvCpus": 8, # Not: not currently exposing control over this since these are mostly I/O intensive | ||
"minvCpus": 0, # Note: if not zero, will always keep an instance running in active state on standby | ||
"maxvCpus": 8, # Note: not currently exposing control over this since these are mostly I/O intensive | ||
Comment on lines
+324
to
+325
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @h-mayorquin This is a very important detail to be aware of. |
||
"instanceRole": "ecsInstanceRole", | ||
# Security groups and subnets last updated on 8/4/2024 | ||
"securityGroupIds": ["sg-001699e5b7496b226"], | ||
|
@@ -391,9 +407,20 @@ def _ensure_job_queue_exists( | |
computeEnvironmentOrder=[ | ||
dict(order=1, computeEnvironment=compute_environment_name), | ||
], | ||
# Note: boto3 annotates the reason as a generic string | ||
# But really it is Literal[ | ||
# "MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE", "MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT" | ||
# ] | ||
# And we should have limits on both | ||
jobStateTimeLimitActions=[ | ||
dict( | ||
reason="Avoid zombie jobs.", | ||
reason="MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE", | ||
state="RUNNABLE", | ||
maxTimeSeconds=minimum_time_to_kill_in_seconds, | ||
action="CANCEL", | ||
), | ||
dict( | ||
reason="MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT", | ||
Comment on lines
+417
to
+423
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Another bug fix I'm sneaking in here - previous testing suites ran under an existing compute environment that did not have this, but now it is actually set up properly (and tested in practice, though I have no clue how to make a proper test for it) and will end any zombie jobs. |
||
state="RUNNABLE", | ||
maxTimeSeconds=minimum_time_to_kill_in_seconds, | ||
action="CANCEL", | ||
|
@@ -418,11 +445,71 @@ def _ensure_job_queue_exists( | |
return None | ||
|
||
|
||
def _create_or_get_efs_id(
    efs_volume_name: Optional[str], efs_client: "boto3.client.efs", region: str = "us-east-2"
) -> Optional[str]:  # pragma: no cover
    """
    Get the ID of the EFS volume tagged with the given name, creating the volume if none exists.

    Parameters
    ----------
    efs_volume_name : str, optional
        The value of the "Name" tag that identifies the EFS volume.
        If None, nothing is looked up or created.
    efs_client : boto3.client.efs
        The EFS client used to list and create file systems and to create mount targets.
    region : str, default: "us-east-2"
        The AWS region of the volume.
        Only "us-east-2" is supported for now since the subnet and security group IDs used for the
        mount targets are hard-coded below.

    Returns
    -------
    efs_id : str or None
        The `FileSystemId` of the existing or newly created volume.
        Is None only when `efs_volume_name` is None.

    Raises
    ------
    NotImplementedError
        If a volume name is given and `region` is anything other than "us-east-2".
    """
    if efs_volume_name is None:
        return None

    if region != "us-east-2":
        raise NotImplementedError("EFS volumes are only supported in us-east-2 for now.")

    # Reuse the first existing volume whose "Name" tag matches.
    # A single match must be enough to reuse it; otherwise every call would create a duplicate volume.
    # The listing is paginated, so follow `NextMarker` until every page has been inspected.
    request_kwargs = dict()
    while True:
        response = efs_client.describe_file_systems(**request_kwargs)
        for file_system in response["FileSystems"]:
            for tag in file_system.get("Tags", []):
                if tag["Key"] == "Name" and tag["Value"] == efs_volume_name:
                    return file_system["FileSystemId"]

        next_marker = response.get("NextMarker")
        if not next_marker:
            break
        request_kwargs = dict(Marker=next_marker)

    # Existing volume not found - must create a fresh one and set mount targets on it
    efs_volume = efs_client.create_file_system(
        PerformanceMode="generalPurpose",  # Only type supported in one-zone
        Encrypted=False,
        ThroughputMode="elastic",
        # TODO: figure out how to make job spawn only on subregion for OneZone discount
        # AvailabilityZoneName=subregion,
        Backup=False,
        Tags=[{"Key": "Name", "Value": efs_volume_name}],
    )
    efs_id = efs_volume["FileSystemId"]

    # Takes a while to spin up - cannot assign mount targets until it is ready
    # TODO: in a follow-up replace with more robust checking mechanism
    time.sleep(60)

    # TODO: in follow-up, figure out how to fetch this automatically and from any region
    # (might even resolve those previous OneZone issues)
    region_to_subnet_id = {
        "us-east-2a": "subnet-0890a93aedb42e73e",
        "us-east-2b": "subnet-0e20bbcfb951b5387",
        "us-east-2c": "subnet-0680e07980538b786",
    }
    for subnet_id in region_to_subnet_id.values():
        efs_client.create_mount_target(
            FileSystemId=efs_id,
            SubnetId=subnet_id,
            SecurityGroups=[
                "sg-001699e5b7496b226",
            ],
        )
    time.sleep(60)  # Also takes a while to create the mount targets so add some buffer time

    return efs_id
|
||
|
||
def _generate_job_definition_name( | ||
*, | ||
docker_image: str, | ||
minimum_worker_ram_in_gib: int, | ||
minimum_worker_cpus: int, | ||
efs_id: Optional[str] = None, | ||
) -> str: # pragma: no cover | ||
""" | ||
Generate a job definition name for the AWS Batch job. | ||
|
@@ -449,6 +536,8 @@ def _generate_job_definition_name( | |
job_definition_name += f"_{parsed_docker_image_name}-image" | ||
job_definition_name += f"_{minimum_worker_ram_in_gib}-GiB-RAM" | ||
job_definition_name += f"_{minimum_worker_cpus}-CPU" | ||
if efs_id is not None: | ||
job_definition_name += f"_{efs_id}" | ||
Comment on lines
+539
to
+540
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Turned out to be quite important to include the filesystem ID in the definition name to avoid reusing a previous definition created without the EFS mount configured. I have toyed with the idea of making the job definition name a hash of all the 'unique' configuration aspects, but it is also important for it to be readable - a follow-up towards the end might apply a bunch of human-readable tags to this effect. |
||
if docker_tag is None or docker_tag == "latest": | ||
date = datetime.now().strftime("%Y-%m-%d") | ||
job_definition_name += f"_created-on-{date}" | ||
|
@@ -464,6 +553,7 @@ def _ensure_job_definition_exists_and_get_arn( | |
minimum_worker_cpus: int, | ||
role_info: dict, | ||
batch_client: "boto3.client.Batch", | ||
efs_id: Optional[str] = None, | ||
max_retries: int = 12, | ||
) -> str: # pragma: no cover | ||
""" | ||
|
@@ -494,6 +584,9 @@ def _ensure_job_definition_exists_and_get_arn( | |
The IAM role information for the job. | ||
batch_client : boto3.client.Batch | ||
The AWS Batch client to use for the job. | ||
efs_id : str, optional | ||
The EFS volume information for the job. | ||
The path exposed to the container will always be `/mnt/efs`. | ||
max_retries : int, default: 12 | ||
If the job definition does not already exist, then this is the maximum number of times to synchronously | ||
check for its successful creation before erroring. | ||
|
@@ -534,6 +627,20 @@ def _ensure_job_definition_exists_and_get_arn( | |
minimum_time_to_kill_in_days = 1 # Note: eventually consider exposing this for very long jobs? | ||
minimum_time_to_kill_in_seconds = minimum_time_to_kill_in_days * 24 * 60 * 60 | ||
|
||
volumes = [] | ||
mountPoints = [] | ||
if efs_id is not None: | ||
volumes = [ | ||
{ | ||
"name": "neuroconv_batch_efs_mounted", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The name identifier here is entirely local (does not need to match the 'name' tagged on the actual EFS volume) |
||
"efsVolumeConfiguration": { | ||
"fileSystemId": efs_id, | ||
"transitEncryption": "DISABLED", | ||
}, | ||
}, | ||
] | ||
mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] | ||
|
||
# batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards | ||
batch_client.register_job_definition( | ||
jobDefinitionName=job_definition_name, | ||
|
@@ -542,9 +649,13 @@ def _ensure_job_definition_exists_and_get_arn( | |
containerProperties=dict( | ||
image=docker_image, | ||
resourceRequirements=resource_requirements, | ||
jobRoleArn=role_info["Role"]["Arn"], | ||
executionRoleArn=role_info["Role"]["Arn"], | ||
# TODO: investigate if any IAM role is explicitly needed in conjunction with the credentials | ||
# jobRoleArn=role_info["Role"]["Arn"], | ||
# executionRoleArn=role_info["Role"]["Arn"], | ||
volumes=volumes, | ||
mountPoints=mountPoints, | ||
), | ||
platformCapabilities=["EC2"], | ||
) | ||
|
||
job_definition_request = batch_client.describe_job_definitions(jobDefinitions=[job_definition_with_revision]) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this got missed in the pyproject.toml refactor