Skip to content

Commit

Permalink
[Cloud Deployment IVa] EFS creation and mounting (#1018)
Browse files Browse the repository at this point in the history
Co-authored-by: CodyCBakerPhD <[email protected]>
  • Loading branch information
CodyCBakerPhD and CodyCBakerPhD authored Sep 12, 2024
1 parent 2611825 commit 78c1177
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Upcoming

## v.0.6.3

## Bug Fixes
* Fixed a setup bug introduced in `v0.6.2` where installation process created a directory instead of a file for test configuration file [PR #1070](https://github.com/catalystneuro/neuroconv/pull/1070)
* The method `get_extractor` now works for `MockImagingInterface` [PR #1076](https://github.com/catalystneuro/neuroconv/pull/1076)

## Deprecations

## Features
* Added automated EFS volume creation and mounting to the `submit_aws_job` helper function. [PR #1018](https://github.com/catalystneuro/neuroconv/pull/1018)

## Improvements

Expand All @@ -26,6 +29,7 @@
* Added `get_stream_names` to `OpenEphysRecordingInterface`: [PR #1039](https://github.com/catalystneuro/neuroconv/pull/1039)
* Most data interfaces and converters now use Pydantic to validate their inputs, including existence of file and folder paths. [PR #1022](https://github.com/catalystneuro/neuroconv/pull/1022)
* All remaining data interfaces and converters now use Pydantic to validate their inputs, including existence of file and folder paths. [PR #1055](https://github.com/catalystneuro/neuroconv/pull/1055)
* Added automated EFS volume creation and mounting to the `submit_aws_job` helper function. [PR #1018](https://github.com/catalystneuro/neuroconv/pull/1018)


### Improvements
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def read_requirements(file):


extras_require = defaultdict(list)
extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin"]
extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin", "boto3"]

for modality in ["ophys", "ecephys", "icephys", "behavior", "text"]:
modality_path = root / "src" / "neuroconv" / "datainterfaces" / modality
Expand Down
121 changes: 116 additions & 5 deletions src/neuroconv/tools/aws/_submit_aws_batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def submit_aws_batch_job(
docker_image: str,
commands: Optional[list[str]] = None,
environment_variables: Optional[dict[str, str]] = None,
efs_volume_name: Optional[str] = None,
job_dependencies: Optional[list[dict[str, str]]] = None,
status_tracker_table_name: str = "neuroconv_batch_status_tracker",
iam_role_name: str = "neuroconv_batch_role",
Expand Down Expand Up @@ -42,6 +43,9 @@ def submit_aws_batch_job(
E.g., `commands=["echo", "'Hello, World!'"]`.
environment_variables : dict, optional
A dictionary of environment variables to pass to the Docker container.
efs_volume_name : str, optional
The name of an EFS volume to be created and attached to the job.
The path exposed to the container will always be `/mnt/efs`.
job_dependencies : list of dict
A list of job dependencies for this job to trigger. Structured as follows:
[
Expand Down Expand Up @@ -88,6 +92,7 @@ def submit_aws_batch_job(
import boto3

region = region or "us-east-2"
subregion = region + "a" # For anything that requires subregion, always default to "a"
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

Expand Down Expand Up @@ -116,6 +121,12 @@ def submit_aws_batch_job(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
efs_client = boto3.client(
service_name="efs",
region_name=region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)

# Get the tracking table and IAM role
table = _create_or_get_status_tracker_table(
Expand All @@ -131,10 +142,12 @@ def submit_aws_batch_job(
job_queue_name=job_queue_name, compute_environment_name=compute_environment_name, batch_client=batch_client
)

efs_id = _create_or_get_efs_id(efs_volume_name=efs_volume_name, efs_client=efs_client, region=region)
job_definition_name = job_definition_name or _generate_job_definition_name(
docker_image=docker_image,
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
minimum_worker_cpus=minimum_worker_cpus,
efs_id=efs_id,
)
job_definition_arn = _ensure_job_definition_exists_and_get_arn(
job_definition_name=job_definition_name,
Expand All @@ -143,6 +156,7 @@ def submit_aws_batch_job(
minimum_worker_cpus=minimum_worker_cpus,
role_info=iam_role_info,
batch_client=batch_client,
efs_id=efs_id,
)

# Submit job and update status tracker
Expand All @@ -160,6 +174,7 @@ def submit_aws_batch_job(
container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
if commands is not None:
container_overrides["command"] = commands

job_submission_info = batch_client.submit_job(
jobName=job_name,
dependsOn=job_dependencies,
Expand All @@ -180,6 +195,7 @@ def submit_aws_batch_job(
table.put_item(Item=table_submission_info)

info = dict(job_submission_info=job_submission_info, table_submission_info=table_submission_info)

return info


Expand Down Expand Up @@ -305,8 +321,8 @@ def _ensure_compute_environment_exists(
"type": "EC2",
"allocationStrategy": "BEST_FIT", # Note: not currently supporting spot due to interruptibility
"instanceTypes": ["optimal"],
"minvCpus": 1,
"maxvCpus": 8, # Not: not currently exposing control over this since these are mostly I/O intensive
"minvCpus": 0, # Note: if not zero, will always keep an instance running in active state on standby
"maxvCpus": 8, # Note: not currently exposing control over this since these are mostly I/O intensive
"instanceRole": "ecsInstanceRole",
# Security groups and subnets last updated on 8/4/2024
"securityGroupIds": ["sg-001699e5b7496b226"],
Expand Down Expand Up @@ -391,9 +407,20 @@ def _ensure_job_queue_exists(
computeEnvironmentOrder=[
dict(order=1, computeEnvironment=compute_environment_name),
],
# Note: boto3 annotates the reason as a generic string
# But really it is Literal[
# "MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE", "MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT"
# ]
# And we should have limits on both
jobStateTimeLimitActions=[
dict(
reason="Avoid zombie jobs.",
reason="MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE",
state="RUNNABLE",
maxTimeSeconds=minimum_time_to_kill_in_seconds,
action="CANCEL",
),
dict(
reason="MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT",
state="RUNNABLE",
maxTimeSeconds=minimum_time_to_kill_in_seconds,
action="CANCEL",
Expand All @@ -418,11 +445,71 @@ def _ensure_job_queue_exists(
return None


def _create_or_get_efs_id(
efs_volume_name: Optional[str], efs_client: "boto3.client.efs", region: str = "us-east-2"
) -> Optional[str]: # pragma: no cover
if efs_volume_name is None:
return None

if region != "us-east-2":
raise NotImplementedError("EFS volumes are only supported in us-east-2 for now.")

available_efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in available_efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]

if len(matching_efs_volumes) > 1:
efs_volume = matching_efs_volumes[0]
efs_id = efs_volume["FileSystemId"]

return efs_id

# Existing volume not found - must create a fresh one and set mount targets on it
efs_volume = efs_client.create_file_system(
PerformanceMode="generalPurpose", # Only type supported in one-zone
Encrypted=False,
ThroughputMode="elastic",
# TODO: figure out how to make job spawn only on subregion for OneZone discount
# AvailabilityZoneName=subregion,
Backup=False,
Tags=[{"Key": "Name", "Value": efs_volume_name}],
)
efs_id = efs_volume["FileSystemId"]

# Takes a while to spin up - cannot assign mount targets until it is ready
# TODO: in a follow-up replace with more robust checking mechanism
time.sleep(60)

# TODO: in follow-up, figure out how to fetch this automatically and from any region
# (might even resolve those previous OneZone issues)
region_to_subnet_id = {
"us-east-2a": "subnet-0890a93aedb42e73e",
"us-east-2b": "subnet-0e20bbcfb951b5387",
"us-east-2c": "subnet-0680e07980538b786",
}
for subnet_id in region_to_subnet_id.values():
efs_client.create_mount_target(
FileSystemId=efs_id,
SubnetId=subnet_id,
SecurityGroups=[
"sg-001699e5b7496b226",
],
)
time.sleep(60) # Also takes a while to create the mount targets so add some buffer time

return efs_id


def _generate_job_definition_name(
*,
docker_image: str,
minimum_worker_ram_in_gib: int,
minimum_worker_cpus: int,
efs_id: Optional[str] = None,
) -> str: # pragma: no cover
"""
Generate a job definition name for the AWS Batch job.
Expand All @@ -449,6 +536,8 @@ def _generate_job_definition_name(
job_definition_name += f"_{parsed_docker_image_name}-image"
job_definition_name += f"_{minimum_worker_ram_in_gib}-GiB-RAM"
job_definition_name += f"_{minimum_worker_cpus}-CPU"
if efs_id is not None:
job_definition_name += f"_{efs_id}"
if docker_tag is None or docker_tag == "latest":
date = datetime.now().strftime("%Y-%m-%d")
job_definition_name += f"_created-on-{date}"
Expand All @@ -464,6 +553,7 @@ def _ensure_job_definition_exists_and_get_arn(
minimum_worker_cpus: int,
role_info: dict,
batch_client: "boto3.client.Batch",
efs_id: Optional[str] = None,
max_retries: int = 12,
) -> str: # pragma: no cover
"""
Expand Down Expand Up @@ -494,6 +584,9 @@ def _ensure_job_definition_exists_and_get_arn(
The IAM role information for the job.
batch_client : boto3.client.Batch
The AWS Batch client to use for the job.
efs_id : str, optional
The EFS volume information for the job.
The path exposed to the container will always be `/mnt/efs`.
max_retries : int, default: 12
If the job definition does not already exist, then this is the maximum number of times to synchronously
check for its successful creation before erroring.
Expand Down Expand Up @@ -534,6 +627,20 @@ def _ensure_job_definition_exists_and_get_arn(
minimum_time_to_kill_in_days = 1 # Note: eventually consider exposing this for very long jobs?
minimum_time_to_kill_in_seconds = minimum_time_to_kill_in_days * 24 * 60 * 60

volumes = []
mountPoints = []
if efs_id is not None:
volumes = [
{
"name": "neuroconv_batch_efs_mounted",
"efsVolumeConfiguration": {
"fileSystemId": efs_id,
"transitEncryption": "DISABLED",
},
},
]
mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]

# batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
Expand All @@ -542,9 +649,13 @@ def _ensure_job_definition_exists_and_get_arn(
containerProperties=dict(
image=docker_image,
resourceRequirements=resource_requirements,
jobRoleArn=role_info["Role"]["Arn"],
executionRoleArn=role_info["Role"]["Arn"],
# TODO: investigate if any IAM role is explicitly needed in conjunction with the credentials
# jobRoleArn=role_info["Role"]["Arn"],
# executionRoleArn=role_info["Role"]["Arn"],
volumes=volumes,
mountPoints=mountPoints,
),
platformCapabilities=["EC2"],
)

job_definition_request = batch_client.describe_job_definitions(jobDefinitions=[job_definition_with_revision])
Expand Down
Loading

0 comments on commit 78c1177

Please sign in to comment.