Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cloud Deployment IVa] EFS creation and mounting #1018

Merged
merged 37 commits into from
Sep 12, 2024
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cf7407a
adding EFS support; adjusting tests
Aug 19, 2024
3d4e62b
comment out efs test temporarily
Aug 19, 2024
664cf97
disable main tests
Aug 19, 2024
84c320b
try suppressing reason
Aug 19, 2024
f5ad841
fix reason
Aug 19, 2024
570b1cc
fix default volume
Aug 19, 2024
51d2403
fix default volume
Aug 19, 2024
03ac300
Merge branch 'main' into efs_mounting
CodyCBakerPhD Aug 29, 2024
93ddecc
Merge branch 'main' into efs_mounting
CodyCBakerPhD Aug 29, 2024
a0c3e47
finally figured it out
CodyCBakerPhD Sep 3, 2024
1160584
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 3, 2024
a2c2453
fix import
CodyCBakerPhD Sep 3, 2024
997adf7
Merge branch 'efs_mounting' of https://github.com/catalystneuro/neuro…
CodyCBakerPhD Sep 3, 2024
68b91a1
enhance test assertions
CodyCBakerPhD Sep 3, 2024
dc384fe
debug retry delays
CodyCBakerPhD Sep 3, 2024
6f93e81
test debugs
CodyCBakerPhD Sep 3, 2024
b072225
debug
CodyCBakerPhD Sep 3, 2024
b4a5972
remove size assertion
CodyCBakerPhD Sep 3, 2024
ecc0f81
debug definition name assertion
CodyCBakerPhD Sep 3, 2024
acd0a70
relax one zone
CodyCBakerPhD Sep 3, 2024
d8dc4f0
final debugs
Sep 4, 2024
a0d23d4
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 5, 2024
fd29b2f
try to suppress automatic job definition reuse
Sep 5, 2024
372bec5
Merge branch 'efs_mounting' of https://github.com/catalystneuro/neuro…
Sep 5, 2024
e991109
scope in private; add fs ID to definition name
Sep 5, 2024
4b7a1e7
debug
Sep 5, 2024
cf22cfd
let new definition name decide
Sep 5, 2024
1ef23b9
add TODO note
Sep 6, 2024
635d2d4
restore tests
Sep 6, 2024
3b847be
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 10, 2024
78f5bb1
Update CHANGELOG.md
CodyCBakerPhD Sep 10, 2024
ed975c8
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 10, 2024
ddc54ac
Update CHANGELOG.md
CodyCBakerPhD Sep 10, 2024
71f162c
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 10, 2024
6424f69
Update src/neuroconv/tools/aws/_submit_aws_batch_job.py
CodyCBakerPhD Sep 10, 2024
94eb0fd
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 11, 2024
b175c60
Merge branch 'main' into efs_mounting
CodyCBakerPhD Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ def read_requirements(file):


extras_require = defaultdict(list)
extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin"]
extras_require["full"] = ["dandi>=0.58.1", "hdf5plugin", "boto3"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this got missed in the pyproject.toml refactor


for modality in ["ophys", "ecephys", "icephys", "behavior", "text"]:
modality_path = root / "src" / "neuroconv" / "datainterfaces" / modality
58 changes: 56 additions & 2 deletions src/neuroconv/tools/aws/_submit_aws_batch_job.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ def submit_aws_batch_job(
docker_image: str,
commands: Optional[List[str]] = None,
environment_variables: Optional[Dict[str, str]] = None,
efs_volume_name: Optional[str] = None,
job_dependencies: Optional[List[Dict[str, str]]] = None,
status_tracker_table_name: str = "neuroconv_batch_status_tracker",
iam_role_name: str = "neuroconv_batch_role",
@@ -42,6 +43,8 @@ def submit_aws_batch_job(
E.g., `commands=["echo", "'Hello, World!'"]`.
environment_variables : dict, optional
A dictionary of environment variables to pass to the Docker container.
efs_volume_name : str
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved
The name of an EFS volume to be created and attached to the job.
job_dependencies : list of dict
A list of job dependencies for this job to trigger. Structured as follows:
[
@@ -88,6 +91,7 @@ def submit_aws_batch_job(
import boto3

region = region or "us-east-2"
subregion = region + "a" # For anything that requires subregion, always default to "a"
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

@@ -116,6 +120,12 @@ def submit_aws_batch_job(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
efs_client = boto3.client(
service_name="efs",
region_name=region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)

# Get the tracking table and IAM role
table = _create_or_get_status_tracker_table(
@@ -131,6 +141,29 @@ def submit_aws_batch_job(
job_queue_name=job_queue_name, compute_environment_name=compute_environment_name, batch_client=batch_client
)

# Create or fetch EFS volume and attach it to the job
efs_id = None
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved
if efs_volume_name is not None:
efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]
if len(matching_efs_volumes) == 0:
efs_volume = efs_client.create_file_system(
PerformanceMode="generalPurpose", # Only type supported in one-zone
Encrypted=False,
ThroughputMode="elastic",
AvailabilityZoneName=subregion, # Enables one-zone for cheaper pricing
Backup=False,
Tags=[{"Key": "Name", "Value": efs_volume_name}],
)
else:
efs_volume = matching_efs_volumes[0]
efs_id = efs_volume["FileSystemId"]

job_definition_name = job_definition_name or _generate_job_definition_name(
docker_image=docker_image,
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
@@ -143,6 +176,7 @@ def submit_aws_batch_job(
minimum_worker_cpus=minimum_worker_cpus,
role_info=iam_role_info,
batch_client=batch_client,
efs_id=efs_id,
)

# Submit job and update status tracker
@@ -160,6 +194,7 @@ def submit_aws_batch_job(
container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
if commands is not None:
container_overrides["command"] = commands

job_submission_info = batch_client.submit_job(
jobName=job_name,
dependsOn=job_dependencies,
@@ -180,6 +215,10 @@ def submit_aws_batch_job(
table.put_item(Item=table_submission_info)

info = dict(job_submission_info=job_submission_info, table_submission_info=table_submission_info)

if efs_volume_name is not None:
info["efs_volume"] = efs_volume

return info


@@ -305,8 +344,8 @@ def _ensure_compute_environment_exists(
"type": "EC2",
"allocationStrategy": "BEST_FIT", # Note: not currently supporting spot due to interruptibility
"instanceTypes": ["optimal"],
"minvCpus": 1,
"maxvCpus": 8, # Not: not currently exposing control over this since these are mostly I/O intensive
"minvCpus": 0, # Note: if not zero, will always keep an instance running in active state on standby
"maxvCpus": 8, # Note: not currently exposing control over this since these are mostly I/O intensive
Comment on lines +324 to +325
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h-mayorquin This is a very important detail to be aware of

"instanceRole": "ecsInstanceRole",
# Security groups and subnets last updated on 8/4/2024
"securityGroupIds": ["sg-001699e5b7496b226"],
@@ -464,6 +503,7 @@ def _ensure_job_definition_exists_and_get_arn(
minimum_worker_cpus: int,
role_info: dict,
batch_client: "boto3.client.Batch",
efs_id: Optional[str] = None,
max_retries: int = 12,
) -> str: # pragma: no cover
"""
@@ -494,6 +534,8 @@ def _ensure_job_definition_exists_and_get_arn(
The IAM role information for the job.
batch_client : boto3.client.Batch
The AWS Batch client to use for the job.
efs_id : str, optional
The EFS volume information for the job.
max_retries : int, default: 12
If the job definition does not already exist, then this is the maximum number of times to synchronously
check for its successful creation before erroring.
@@ -534,6 +576,17 @@ def _ensure_job_definition_exists_and_get_arn(
minimum_time_to_kill_in_days = 1 # Note: eventually consider exposing this for very long jobs?
minimum_time_to_kill_in_seconds = minimum_time_to_kill_in_days * 24 * 60 * 60

if efs_id is not None:
volumes = [
{
"name": "neuroconv_batch_efs_mounted",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name identifier here is entirely local (does not need to match the 'name' tagged on the actual EFS volume)

"efsVolumeConfiguration": {
"fileSystemId": efs_id,
"transitEncryption": "DISABLED",
},
},
]

# batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
@@ -544,6 +597,7 @@ def _ensure_job_definition_exists_and_get_arn(
resourceRequirements=resource_requirements,
jobRoleArn=role_info["Role"]["Arn"],
executionRoleArn=role_info["Role"]["Arn"],
volumes=volumes,
),
)

59 changes: 59 additions & 0 deletions tests/test_minimal/test_tools/aws_tools.py
Original file line number Diff line number Diff line change
@@ -156,3 +156,62 @@ def test_submit_aws_batch_job_with_dependencies():
table.update_item(
Key={"id": table_submission_id_2}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
)


def test_submit_aws_batch_job_with_efs_mount():
    """Round-trip test of an AWS Batch job submission with an EFS volume attached.

    Submits a trivial ``echo`` job with ``efs_volume_name`` set, waits for AWS to
    process it, then verifies:
      - the Batch job reaches SUCCEEDED with the expected queue/definition names,
      - the returned ``info`` includes the ``efs_volume`` entry added for EFS runs,
      - the DynamoDB status-tracker table recorded the submission.

    NOTE(review): requires live AWS credentials in the environment
    (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY) and incurs real AWS costs.
    """
    region = "us-east-2"
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

    dynamodb_resource = boto3.resource(
        service_name="dynamodb",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    batch_client = boto3.client(
        service_name="batch",
        region_name=region,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )

    job_name = "test_submit_aws_batch_job"
    docker_image = "ubuntu:latest"
    commands = ["echo", "'Testing NeuroConv AWS Batch submission.'"]

    # TODO: to reduce costs even more, find a good combinations of memory/CPU to minimize size of instance
    # Bug fix: the keyword is `efs_volume_name` (not `efs_volume`) and it takes the
    # EFS volume's Name tag, not a mount path such as "/mnt/data".
    info = submit_aws_batch_job(
        job_name=job_name,
        docker_image=docker_image,
        commands=commands,
        efs_volume_name="test_neuroconv_batch_efs",
    )

    # When an EFS volume is requested, submit_aws_batch_job includes its description in the info dict.
    assert "efs_volume" in info

    # Wait for AWS to process the job
    time.sleep(60)

    job_id = info["job_submission_info"]["jobId"]

    all_jobs_response = batch_client.describe_jobs(jobs=[job_id])
    assert all_jobs_response["ResponseMetadata"]["HTTPStatusCode"] == 200

    jobs = all_jobs_response["jobs"]
    assert len(jobs) == 1

    job = jobs[0]
    assert job["jobName"] == job_name
    assert "neuroconv_batch_queue" in job["jobQueue"]
    assert "neuroconv_batch_ubuntu-latest-image_4-GiB-RAM_4-CPU" in job["jobDefinition"]
    assert job["status"] == "SUCCEEDED"

    status_tracker_table_name = "neuroconv_batch_status_tracker"
    table = dynamodb_resource.Table(name=status_tracker_table_name)
    table_submission_id = info["table_submission_info"]["id"]

    table_item_response = table.get_item(Key={"id": table_submission_id})
    assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200

    table_item = table_item_response["Item"]
    assert table_item["job_name"] == job_name
    assert table_item["job_id"] == job_id
    assert table_item["status"] == "Job submitted..."

    # Mark the tracker row so a human reviewing the table can see the test completed.
    table.update_item(
        Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
    )