[Cloud Deployment IV]: Simple neuroconv deployment #393

Closed
wants to merge 64 commits into from
Commits
f96652b
added helper function
CodyCBakerPhD Mar 27, 2023
da3ee44
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 27, 2023
aa3619a
remake dockerfile; add dandi upload to YAML
CodyCBakerPhD Mar 29, 2023
1ae8034
debugged
CodyCBakerPhD Apr 2, 2023
8f50c80
Create aws_batch_deployment.rst
CodyCBakerPhD Apr 2, 2023
901f1e1
Delete dockerfile_neuroconv_with_rclone
CodyCBakerPhD Apr 2, 2023
d4ae252
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 2, 2023
5659b35
Merge branch 'main' into batch_helper
CodyCBakerPhD Apr 2, 2023
95ab319
Merge branch 'batch_helper' into simple_neuroconv_deployment
CodyCBakerPhD Apr 2, 2023
e822bc7
Merge branch 'main' into batch_helper
CodyCBakerPhD Apr 24, 2023
f1f7b9f
typos and formatting
bendichter Feb 18, 2024
53258c4
Merge branch 'batch_helper' into simple_neuroconv_deployment
bendichter Feb 18, 2024
9739320
resolve conflicts
Jul 15, 2024
9213391
add changelog
Jul 15, 2024
a476ba7
correct merge conflict and changelog + imports
Jul 15, 2024
4f6489d
format docstring
Jul 15, 2024
db51921
resolve conflicts
Jul 15, 2024
766185f
add changelog
Jul 15, 2024
9ae7ace
adjust changelog
Jul 15, 2024
c7fb810
split estimator to different PR
Jul 15, 2024
7fedcdd
expose extra options and add tests
Jul 15, 2024
f15cb68
Merge branch 'batch_helper' into simple_neuroconv_deployment
CodyCBakerPhD Jul 15, 2024
935f038
debug import
Jul 15, 2024
7e8ef72
fix bad conflict
Jul 15, 2024
f2be008
add boto3 to requirements
Jul 15, 2024
a4e7bf5
pass AWS credentials in function and actions
Jul 15, 2024
16ef3f6
Merge branch 'main' into batch_helper
CodyCBakerPhD Jul 22, 2024
4939c60
pass secrets
CodyCBakerPhD Jul 22, 2024
7c66c82
correct keyword name
CodyCBakerPhD Jul 22, 2024
b115adb
debug role fetching
CodyCBakerPhD Jul 22, 2024
dfcb148
fix syntax
CodyCBakerPhD Jul 22, 2024
57f65ce
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
38327f7
splinter out aws tests to reduce costs
CodyCBakerPhD Jul 22, 2024
90deef6
splinter out aws tests to reduce costs
CodyCBakerPhD Jul 22, 2024
0b6e429
temporarily disable
CodyCBakerPhD Jul 22, 2024
06e9bdb
fix suffix
CodyCBakerPhD Jul 22, 2024
fe16dde
limit matrix to reduce costs
CodyCBakerPhD Jul 22, 2024
7f40885
cancel previous
CodyCBakerPhD Jul 22, 2024
34328cf
remove iam role stuff; has to be set on user
CodyCBakerPhD Jul 22, 2024
17898f4
fix API call
CodyCBakerPhD Jul 22, 2024
de4e18f
update to modern standard; expose extra options; rename argument
CodyCBakerPhD Jul 22, 2024
47cc917
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
4eea2db
fix keyword argument in tests
CodyCBakerPhD Jul 22, 2024
4b22903
add status helper
CodyCBakerPhD Jul 22, 2024
e16551d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
29aa19b
debug
CodyCBakerPhD Jul 22, 2024
37223c9
enhance doc
CodyCBakerPhD Jul 22, 2024
1b4d88f
try not casting as strings
CodyCBakerPhD Jul 22, 2024
829e5f2
fix deserialization type
CodyCBakerPhD Jul 22, 2024
e76897f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
df8cb10
debug
CodyCBakerPhD Jul 22, 2024
67e8405
expose submission ID
CodyCBakerPhD Jul 22, 2024
297476f
fix datetime typing
CodyCBakerPhD Jul 22, 2024
2cfaf58
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2024
9ad5ef2
update test to new structure
CodyCBakerPhD Jul 22, 2024
4db6141
remove trigger
CodyCBakerPhD Jul 22, 2024
6949be0
restore trigger
CodyCBakerPhD Jul 22, 2024
c193c55
Merge branch 'batch_helper' into simple_neuroconv_deployment
CodyCBakerPhD Jul 22, 2024
c990ddf
Merge remote-tracking branch 'origin/simple_neuroconv_deployment' int…
Jul 22, 2024
26c5f69
resolve conflict
Jul 22, 2024
37d5be4
finish initial structure for deployment helper
CodyCBakerPhD Jul 24, 2024
9af5b99
separate base code; add new entrypoint; adjust dockerfiles; add EFS c…
CodyCBakerPhD Jul 25, 2024
4022b60
fix tests; make deletion safe
CodyCBakerPhD Jul 25, 2024
4491a7f
debugs
CodyCBakerPhD Jul 25, 2024
168 changes: 168 additions & 0 deletions docs/developer_guide/aws_batch_deployment.rst
@@ -0,0 +1,168 @@
One way of deploying jobs on AWS Batch is to manually set up the entire workflow through the AWS web UI and to submit each job by hand.

Deploying hundreds of jobs in this way would be cumbersome.

Here are two other methods that allow simpler deployment using `boto3`.


Semi-automated Deployment of NeuroConv on AWS Batch
---------------------------------------------------

Step 1: Transfer data to Elastic File System (EFS)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The nice thing about using EFS is that we are only ever billed for the actual amount of disk storage used over time, and do not need to specify a fixed allocation or scaling strategy.

It is also relatively easy to mount across multiple AWS Batch jobs simultaneously.

Unfortunately, the one downside is that its pricing per GB-month is significantly higher than either S3 or EBS.

To easily transfer data from a Google Drive (or theoretically any backend supported by `rclone`), set the following environment variables for rclone credentials: `DRIVE_NAME`, `TOKEN`, `REFRESH_TOKEN`, and `EXPIRY`.

.. note::

    I eventually hope to be able to read and pass these directly from a local `rclone.conf` file, but for now they must be set manually as environment variables.
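
In the meantime, here is a minimal sketch of how those values might be extracted from a local `rclone.conf`. The default config location, the remote name `MyDrive`, and the exact mapping of the OAuth token fields onto the `TOKEN`, `REFRESH_TOKEN`, and `EXPIRY` variables are assumptions for illustration only.

.. code:: python

    import configparser
    import json
    import os
    from pathlib import Path

    # Assumed default location of the local rclone configuration file; adjust as needed.
    rclone_config_file_path = Path.home() / ".config" / "rclone" / "rclone.conf"

    config = configparser.ConfigParser()
    config.read(rclone_config_file_path)

    drive_name = "MyDrive"  # assumed name of the rclone remote (the INI section header)
    token_info = json.loads(config[drive_name]["token"])  # rclone stores its OAuth token as a JSON blob

    # Assumed mapping onto the environment variables read by the submission script below.
    os.environ["DRIVE_NAME"] = drive_name
    os.environ["TOKEN"] = token_info["access_token"]
    os.environ["REFRESH_TOKEN"] = token_info["refresh_token"]
    os.environ["EXPIRY"] = token_info["expiry"]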

.. note::

    All path references must point to `/mnt/data/` as the base in order to persist across jobs.

.. code:: python

    import os
    from datetime import datetime

    from neuroconv.tools.data_transfers import submit_aws_batch_job

    job_name = "<unique job name>"
    docker_container = "ghcr.io/catalystneuro/rclone_auto_config:latest"
    efs_name = "<your EFS volume name>"

    log_datetime = str(datetime.now()).replace(" ", ":")  # no spaces in CLI
    RCLONE_COMMAND = f"{os.environ['RCLONE_COMMAND']} -v --config /mnt/data/rclone.conf --log-file /mnt/data/submit-{log_datetime}.txt"

    environment_variables = [
        dict(name="DRIVE_NAME", value=os.environ["DRIVE_NAME"]),
        dict(name="TOKEN", value=os.environ["TOKEN"]),
        dict(name="REFRESH_TOKEN", value=os.environ["REFRESH_TOKEN"]),
        dict(name="EXPIRY", value=os.environ["EXPIRY"]),
        dict(name="RCLONE_COMMAND", value=RCLONE_COMMAND),
    ]

    submit_aws_batch_job(
        job_name=job_name,
        docker_container=docker_container,
        efs_name=efs_name,
        environment_variables=environment_variables,
    )


An example `RCLONE_COMMAND` for a drive named 'MyDrive' and the GIN testing data stored under `/ephy_testing_data/spikeglx/Noise4Sam_g0/` of that drive would be

.. code::

    RCLONE_COMMAND = "sync MyDrive:/ephy_testing_data/spikeglx/Noise4Sam_g0 /mnt/data/Noise4Sam_g0"


Step 2: Run the YAML Conversion Specification
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Continuing the example above, if we have the YAML file `test_batch.yml`

.. code:: yaml

    metadata:
      NWBFile:
        lab: My Lab
        institution: My Institution

    conversion_options:
      stub_test: True

    data_interfaces:
      ap: SpikeGLXRecordingInterface
      lf: SpikeGLXRecordingInterface

    experiments:
      ymaze:
        metadata:
          NWBFile:
            session_description: Testing batch deployment.

        sessions:
          - nwbfile_name: /mnt/data/test_batch_deployment.nwb
            source_data:
              ap:
                file_path: /mnt/data/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.ap.bin
              lf:
                file_path: /mnt/data/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.lf.bin
            metadata:
              NWBFile:
                session_id: test_batch_deployment
              Subject:
                subject_id: "1"
                sex: F
                age: P35D
                species: Mus musculus

then we can run the following stand-alone script to deploy the conversion after confirming Step 1 completed successfully.

.. code:: python

    from neuroconv.tools.data_transfers import submit_aws_batch_job

    job_name = "<unique job name>"
    docker_container = "ghcr.io/catalystneuro/neuroconv:dev_auto_yaml"
    efs_name = "<name of EFS>"

    yaml_file_path = "/path/to/test_batch.yml"

    with open(file=yaml_file_path) as file:
        YAML_STREAM = file.read().replace('"', "'")

    environment_variables = [dict(name="YAML_STREAM", value=YAML_STREAM)]

    submit_aws_batch_job(
        job_name=job_name,
        docker_container=docker_container,
        efs_name=efs_name,
        environment_variables=environment_variables,
    )
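
For reference, the same specification can also be run locally before submitting to AWS. This is a sketch only: it assumes a copy of the YAML edited to point at local file paths, and that the signature of `run_conversion_from_yaml` matches the installed version.

.. code:: python

    from neuroconv.tools.yaml_conversion_specification import run_conversion_from_yaml

    run_conversion_from_yaml(
        specification_file_path="/path/to/local_test_batch.yml",  # hypothetical local copy of the specification
        data_folder_path="/path/to/local/Noise4Sam_g0",  # hypothetical local copy of the GIN testing data
        output_folder_path="/path/to/local/output",
        overwrite=True,
    )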


Step 3: Ensure File Cleanup
~~~~~~~~~~~~~~~~~~~~~~~~~~~

TODO: write a dockerfile to perform this step with the API

It's a good idea to confirm that you have access to your EFS from on-demand resources in case you ever need to go in and perform a manual cleanup operation.

Boot up an EC2 t2.micro instance using the Amazon Linux 2 image with minimal resources.

Create two new security groups: `EFS Target` (no inbound rules set) and `EFS Mount` (inbound rule allowing NFS with `EFS Target` as the source).

On the EC2 instance, change the security group to `EFS Target`. In the EFS Network settings, add the `EFS Mount` group.
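
The same two security groups can also be created programmatically. The following is a rough sketch with `boto3`; the VPC ID placeholder and group descriptions are assumptions for illustration.

.. code:: python

    import boto3

    ec2_client = boto3.client("ec2", region_name="us-east-2")

    vpc_id = "<your VPC ID>"  # the VPC containing both the EC2 instance and the EFS mount targets

    efs_target = ec2_client.create_security_group(
        GroupName="EFS Target", Description="Attached to the EC2 instance; no inbound rules.", VpcId=vpc_id
    )
    efs_mount = ec2_client.create_security_group(
        GroupName="EFS Mount", Description="Attached to the EFS mount targets; allows NFS from EFS Target.", VpcId=vpc_id
    )

    # NFS uses TCP port 2049; allow it only from members of the 'EFS Target' group.
    ec2_client.authorize_security_group_ingress(
        GroupId=efs_mount["GroupId"],
        IpPermissions=[
            dict(
                IpProtocol="tcp",
                FromPort=2049,
                ToPort=2049,
                UserIdGroupPairs=[dict(GroupId=efs_target["GroupId"])],
            )
        ],
    )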

Connect to the EC2 instance and run

.. code:: bash

    mkdir ~/efs-mount-point  # or any other name you want; I do recommend keeping this in the home directory (~) for ease of access though
    sudo mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport fs-<efs number>.efs.us-east-2.amazonaws.com:/ ~/efs-mount-point  # Note that any operations performed on contents of the mounted volume must utilize sudo

and it *should* work, though this step is known to have various issues. If you did everything exactly as illustrated above, it should work; at least it did as of 4/2/2023.

You can now read, write, and, importantly, delete any contents on the EFS.

Until automated DANDI upload is implemented in the YAML functionality, you will need to use this method to manually remove the NWB file.

Even after that, you should double-check that the `cleanup=True` flag of that function executed properly.
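
One quick way to double-check is to inspect the metered size of the file system with `boto3`. This is a sketch; it assumes the EFS volume carries a Name tag matching the `efs_name` used at submission time.

.. code:: python

    import boto3

    efs_client = boto3.client("efs", region_name="us-east-2")

    efs_name = "<your EFS volume name>"  # assumed to match the Name tag of the volume
    file_systems = efs_client.describe_file_systems()["FileSystems"]
    matching = [fs for fs in file_systems if fs.get("Name") == efs_name]

    # Note: SizeInBytes is metered periodically and may lag behind recent deletions.
    for file_system in matching:
        size_in_gib = file_system["SizeInBytes"]["Value"] / 1024**3
        print(f"{file_system['FileSystemId']}: {size_in_gib:.2f} GiB currently stored")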



Fully Automated Deployment of NeuroConv on AWS Batch
----------------------------------------------------

Coming soon...

The approach is essentially the same as the semi-automated one, except that all jobs are submitted at the same time, with each job dependent on the completion of the previous one.
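
For reference, AWS Batch supports this kind of chaining natively through the `dependsOn` field of `submit_job`. The following is a rough sketch using `boto3` directly; the job definition names are hypothetical, and `submit_aws_batch_job` does not currently expose this option.

.. code:: python

    import boto3

    batch_client = boto3.client("batch", region_name="us-east-2")

    # Hypothetical job definitions: one registered for the rclone transfer, one for the YAML conversion.
    transfer_job = batch_client.submit_job(
        jobQueue="neuroconv_batch_queue",
        jobDefinition="example_rclone_transfer",
        jobName="example_rclone_transfer",
    )
    conversion_job = batch_client.submit_job(
        jobQueue="neuroconv_batch_queue",
        jobDefinition="example_yaml_conversion",
        jobName="example_yaml_conversion",
        # The conversion only starts once the transfer job has completed successfully.
        dependsOn=[dict(jobId=transfer_job["jobId"], type="SEQUENTIAL")],
    )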
150 changes: 148 additions & 2 deletions src/neuroconv/tools/data_transfers.py
@@ -2,11 +2,13 @@
import json
import os
import re
from datetime import datetime
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp
from time import sleep, time
from typing import Dict, List, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union
from uuid import uuid4
from warnings import warn

from dandi.download import download as dandi_download
@@ -15,7 +17,7 @@
from pynwb import NWBHDF5IO
from tqdm import tqdm

from .processes import deploy_process
from ..tools import get_package
from ..utils import FolderPathType, OptionalFolderPathType

try: # pragma: no cover
@@ -349,3 +351,147 @@ def automatic_dandi_upload(
        rmtree(path=nwb_folder_path)
    except PermissionError:  # pragma: no cover
        warn("Unable to clean up source files and dandiset! Please manually delete them.")


def submit_aws_batch_job(
    job_name: str,
    docker_container: str,
    efs_name: str,
    environment_variables: List[Dict[str, str]],
    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
    iam_role_name: str = "neuroconv_batch_role",
    compute_environment_name: str = "neuroconv_batch_environment",
    job_queue_name: str = "neuroconv_batch_queue",
) -> None:
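    """
    Submit a job to AWS Batch, creating any missing supporting resources (status tracking table, IAM role,
    compute environment, and job queue) along the way.

    Parameters
    ----------
    job_name : str
        Name of the Batch job and of the job definition registered for it; must be unique within the queue.
    docker_container : str
        Address of the Docker image to run, e.g. "ghcr.io/catalystneuro/neuroconv:dev_auto_yaml".
    efs_name : str
        Name of the EFS volume mounted at /mnt/data inside the container.
    environment_variables : list of dict
        Entries of the form dict(name=..., value=...) passed to the container.
    status_tracker_table_name : str, optional
        Name of the DynamoDB table used to track submissions.
    iam_role_name : str, optional
        Name of the IAM role granting the job access to DynamoDB and CloudWatch.
    compute_environment_name : str, optional
        Name of the Batch compute environment to use, created if it does not exist.
    job_queue_name : str, optional
        Name of the Batch job queue to use, created if it does not exist.
    """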
    import boto3

    region = "us-east-2"  # TODO, maybe control AWS region? Technically not required if user has it set in credentials

    dynamodb_client = boto3.client("dynamodb", region)
    dynamodb_resource = boto3.resource("dynamodb", region)
    iam_client = boto3.client("iam", region)
    batch_client = boto3.client("batch", region)
    efs_client = boto3.client("efs", region)

    # It is extremely useful to have a status tracker that is separate from the job environment
    # Technically detailed logs of inner workings are given in the CloudWatch, but that can only really be
    # analyzed from the AWS web console
    current_tables = dynamodb_client.list_tables()["TableNames"]
    if status_tracker_table_name not in current_tables:
        table = dynamodb_resource.create_table(
            TableName=status_tracker_table_name,
            KeySchema=[dict(AttributeName="id", KeyType="HASH")],
            AttributeDefinitions=[dict(AttributeName="id", AttributeType="S")],
            ProvisionedThroughput=dict(ReadCapacityUnits=1, WriteCapacityUnits=1),
        )
    else:
        table = dynamodb_resource.Table(name=status_tracker_table_name)

    # Ensure role policy is set
    current_roles = [role["RoleName"] for role in iam_client.list_roles()["Roles"]]
    if iam_role_name not in current_roles:
        assume_role_policy = dict(
            Version="2012-10-17",
            Statement=[
                dict(Effect="Allow", Principal=dict(Service="ecs-tasks.amazonaws.com"), Action="sts:AssumeRole")
            ],
        )

        role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy))
        iam_client.attach_role_policy(
            RoleName=role["Role"]["RoleName"], PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
        )
        iam_client.attach_role_policy(
            RoleName=role["Role"]["RoleName"], PolicyArn="arn:aws:iam::aws:policy/CloudWatchFullAccess"
        )
    else:
        role = iam_client.get_role(RoleName=iam_role_name)

    # Ensure compute environment is setup
    # Note that it's easier to do this through the UI
    current_compute_environments = [
        environment["computeEnvironmentName"]
        for environment in batch_client.describe_compute_environments()["computeEnvironments"]
    ]
    if compute_environment_name not in current_compute_environments:
        batch_client.create_compute_environment(
            computeEnvironmentName=compute_environment_name,
            type="MANAGED",
            state="ENABLED",
            computeResources={
                "type": "SPOT",
                "instanceTypes": ["optimal"],
                "allocationStrategy": "BEST_FIT_PROGRESSIVE",
                "minvCpus": 0,
                "maxvCpus": 256,
                "instanceRole": "ecsInstanceRole",
                "subnets": ["subnet-0890a93aedb42e73e", "subnet-0680e07980538b786", "subnet-0e20bbcfb951b5387"],
                "securityGroupIds": [
                    # "sg-0ad453a713c5b5580",
                    "sg-001699e5b7496b226",
                ],
            },
        )

    # Ensure job queue exists
    current_job_queues = [queue["jobQueueName"] for queue in batch_client.describe_job_queues()["jobQueues"]]
    if job_queue_name not in current_job_queues:
        batch_client.create_job_queue(
            jobQueueName=job_queue_name,
            state="ENABLED",
            priority=1,
            computeEnvironmentOrder=[
                dict(order=100, computeEnvironment=compute_environment_name),
            ],
        )

    # Create unique EFS volume - having some trouble with automatically doing this ATM
    # efs_client.create_file_system(name=job_name)  # TODO, decide how best to perform cleanup
    # efs_name = "test_efs"

    # Always re-register the job; if the name already exists, it will count as a 'revision'
    batch_client.register_job_definition(
        jobDefinitionName=job_name,
        type="container",
        containerProperties=dict(
            image=docker_container,
            # instanceType="t2.small",
            memory=16 * 1024,  # TODO, also conflicting info on if its MiB or GiB; confirmed, via boto3 it's in MiB
            vcpus=4,
            jobRoleArn=role["Role"]["Arn"],
            executionRoleArn=role["Role"]["Arn"],
            environment=[
                dict(
                    name="AWS_DEFAULT_REGION",
                    value=region,
                )
            ],
            volumes=[
                # {"name": efs_name, "host": {"sourcePath": "/mnt/data"}},
                {
                    "name": efs_name,
                    # "host": {"sourcePath": "/mnt/data"},
                    "efsVolumeConfiguration": {"fileSystemId": "fs-0ab4d92c222097625"},  # "rootDirectory": ""},
                }
            ],
            mountPoints=[{"sourceVolume": efs_name, "containerPath": "/mnt/data", "readOnly": False}],
        ),
    )

    # Submit job and update status tracker
    currently_running_jobs = [
        job["jobName"] for job in batch_client.list_jobs(jobQueue=job_queue_name)["jobSummaryList"]
    ]
    if job_name not in currently_running_jobs:
        batch_client.submit_job(
            jobQueue=job_queue_name,
            jobDefinition=job_name,
            jobName=job_name,
            containerOverrides=dict(environment=environment_variables),  # TODO - auto inject status tracker?
        )
        table.put_item(
            Item=dict(id=str(uuid4()), job_name=job_name, submitted_on=str(datetime.now()), status="submitted")
        )
    else:
        raise ValueError(
            f"There is already a job named '{job_name}' running in the queue! "
            "If you are submitting multiple jobs, each will need a unique name."
        )
@@ -1 +1,4 @@
from .yaml_conversion_specification import run_conversion_from_yaml
from .yaml_conversion_specification import (
    deploy_conversion_from_yaml_and_upload_to_dandi,
    run_conversion_from_yaml,
)