diff --git a/.github/workflows/aws_tests.yml b/.github/workflows/aws_tests.yml new file mode 100644 index 000000000..94abcb46b --- /dev/null +++ b/.github/workflows/aws_tests.yml @@ -0,0 +1,42 @@ +name: AWS Tests +on: + schedule: + - cron: "0 16 * * 1" # Weekly at noon on Monday + +concurrency: # Cancel previous workflows on the same pull request + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }} + +jobs: + run: + name: ${{ matrix.os }} Python ${{ matrix.python-version }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + python-version: ["3.12"] + os: [ubuntu-latest] + steps: + - uses: actions/checkout@v4 + - run: git fetch --prune --unshallow --tags + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Global Setup + run: | + python -m pip install -U pip # Official recommended way + git config --global user.email "CI@example.com" + git config --global user.name "CI Almighty" + + - name: Install full requirements + run: pip install .[aws,test] + + - name: Run subset of tests that use AWS live services + run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools.py diff --git a/.github/workflows/build_and_upload_docker_image_yaml_variable.yml b/.github/workflows/build_and_upload_docker_image_dev_for_ec2_deployment.yml similarity index 60% rename from .github/workflows/build_and_upload_docker_image_yaml_variable.yml rename to .github/workflows/build_and_upload_docker_image_dev_for_ec2_deployment.yml index 7ff2dc63c..b24d5e5ed 100644 --- a/.github/workflows/build_and_upload_docker_image_yaml_variable.yml +++ b/.github/workflows/build_and_upload_docker_image_dev_for_ec2_deployment.yml @@ -1,9 +1,8 @@ -name: Build and Upload Docker Image of latest with YAML variable to GHCR +name: Build and Upload Docker Image of Current Dev Branch to GHCR on: - workflow_run: - workflows: [build_and_upload_docker_image_latest_release] - types: [completed] + schedule: + - cron: "0 16 * * 1" # Weekly at noon EST on Monday workflow_dispatch: concurrency: # Cancel previous workflows on the same pull request @@ -12,7 +11,7 @@ concurrency: # Cancel previous workflows on the same pull request jobs: release-image: - name: Build and Upload Docker Image of latest with YAML variable to GHCR + name: Build and Upload Docker Image of Current Dev Branch to GHCR runs-on: ubuntu-latest steps: - name: Checkout @@ -27,11 +26,16 @@ jobs: registry: ghcr.io username: ${{ secrets.DOCKER_UPLOADER_USERNAME }} password: ${{ secrets.DOCKER_UPLOADER_PASSWORD }} - - name: Build and push YAML variable image based on latest + - name: Get current date + id: date + run: | + date_tag="$(date +'%Y-%m-%d')" + echo "date_tag=$date_tag" >> $GITHUB_OUTPUT + - name: Build and push uses: docker/build-push-action@v5 with: push: true # Push is a shorthand for --output=type=registry - tags: ghcr.io/catalystneuro/neuroconv:yaml_variable + tags: ghcr.io/catalystneuro/neuroconv:dev,ghcr.io/catalystneuro/neuroconv:${{ steps.date.outputs.date_tag }} context: . 
- file: dockerfiles/neuroconv_latest_yaml_variable + file: dockerfiles/neuroconv_dev_for_ec2_deployment provenance: false diff --git a/.github/workflows/build_and_upload_docker_image_latest_release.yml b/.github/workflows/build_and_upload_docker_image_latest_release.yml index 423686ec9..947369787 100644 --- a/.github/workflows/build_and_upload_docker_image_latest_release.yml +++ b/.github/workflows/build_and_upload_docker_image_latest_release.yml @@ -17,6 +17,7 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + - name: Parse the version from the GitHub latest release tag id: parsed_version run: | @@ -26,6 +27,7 @@ jobs: echo "version_tag=$version_tag" >> $GITHUB_OUTPUT - name: Printout parsed version for GitHub Action log run: echo ${{ steps.parsed_version.outputs.version_tag }} + - name: Set up QEMU uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx @@ -36,11 +38,12 @@ jobs: registry: ghcr.io username: ${{ secrets.DOCKER_UPLOADER_USERNAME }} password: ${{ secrets.DOCKER_UPLOADER_PASSWORD }} + - name: Build and push uses: docker/build-push-action@v5 with: push: true # Push is a shorthand for --output=type=registry tags: ghcr.io/catalystneuro/neuroconv:latest,ghcr.io/catalystneuro/neuroconv:${{ steps.parsed_version.outputs.version_tag }} context: . - file: dockerfiles/neuroconv_latest_release_dockerfile + file: dockerfiles/neuroconv_release_dockerfile provenance: false diff --git a/.github/workflows/build_and_upload_docker_image_latest_release_for_ec2_deployment.yml b/.github/workflows/build_and_upload_docker_image_latest_release_for_ec2_deployment.yml new file mode 100644 index 000000000..b7f62d980 --- /dev/null +++ b/.github/workflows/build_and_upload_docker_image_latest_release_for_ec2_deployment.yml @@ -0,0 +1,48 @@ +name: Build and Upload Docker Image of Latest Release for EC2 Deployment to GHCR + +on: + schedule: + - cron: "0 16 * * 1" # Weekly at noon EST on Monday + workflow_dispatch: + +concurrency: # Cancel previous workflows on the same pull request + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + release-image: + name: Build and Upload Docker Image + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Parse the version from the GitHub latest release tag + id: parsed_version + run: | + git fetch --prune --unshallow --tags + tags="$(git tag --list)" + version_tag=${tags: -6 : 6} + echo "version_tag=$version_tag" >> $GITHUB_OUTPUT + - name: Printout parsed version for GitHub Action log + run: echo ${{ steps.parsed_version.outputs.version_tag }} + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ secrets.DOCKER_UPLOADER_USERNAME }} + password: ${{ secrets.DOCKER_UPLOADER_PASSWORD }} + + - name: Build and push + uses: docker/build-push-action@v5 + with: + push: true # Push is a shorthand for --output=type=registry + tags: ghcr.io/catalystneuro/neuroconv_for_ec2_deployment:dev + context: . 
+ file: dockerfiles/neuroconv_release_for_ec2_deployment + provenance: false diff --git a/.github/workflows/build_and_upload_docker_image_rclone_with_config.yml b/.github/workflows/build_and_upload_docker_image_rclone_with_config.yml index 7ff197bdc..73ab0e5f7 100644 --- a/.github/workflows/build_and_upload_docker_image_rclone_with_config.yml +++ b/.github/workflows/build_and_upload_docker_image_rclone_with_config.yml @@ -26,6 +26,7 @@ jobs: registry: ghcr.io username: ${{ secrets.DOCKER_UPLOADER_USERNAME }} password: ${{ secrets.DOCKER_UPLOADER_PASSWORD }} + - name: Build and push uses: docker/build-push-action@v5 with: diff --git a/.github/workflows/live-service-testing.yml b/.github/workflows/live-service-testing.yml index 1f7cda4da..28be2d94a 100644 --- a/.github/workflows/live-service-testing.yml +++ b/.github/workflows/live-service-testing.yml @@ -36,8 +36,6 @@ jobs: - name: Install full requirements run: pip install .[test,full] - - name: Run subset of tests that use S3 live services - run: pytest -rsx -n auto tests/test_minimal/test_tools/s3_tools.py - name: Run subset of tests that use DANDI live services run: pytest -rsx -n auto tests/test_minimal/test_tools/dandi_transfer_tools.py - name: Run subset of tests that use Globus live services diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc6e93c8..d4698495e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Upcoming +### Features +* Added MedPCInterface for operant behavioral output files. [PR #883](https://github.com/catalystneuro/neuroconv/pull/883) +* Added helper function `neuroconv.tools.data_transfers.submit_aws_batch_job` for basic automated submission of AWS batch jobs. [PR #384](https://github.com/catalystneuro/neuroconv/pull/384) + + ## v0.5.0 (July 17, 2024) @@ -12,7 +17,6 @@ ### Features * Added docker image and tests for an automated Rclone configuration (with file stream passed via an environment variable). [PR #902](https://github.com/catalystneuro/neuroconv/pull/902) -* Added MedPCInterface for operant behavioral output files. [PR #883](https://github.com/catalystneuro/neuroconv/pull/883) ### Bug fixes * Fixed the conversion option schema of a `SpikeGLXConverter` when used inside another `NWBConverter`. [PR #922](https://github.com/catalystneuro/neuroconv/pull/922) diff --git a/dockerfiles/neuroconv_dev_for_ec2_deployment b/dockerfiles/neuroconv_dev_for_ec2_deployment new file mode 100644 index 000000000..64f37037a --- /dev/null +++ b/dockerfiles/neuroconv_dev_for_ec2_deployment @@ -0,0 +1,6 @@ +FROM python:3.11.7-slim +LABEL org.opencontainers.image.source=https://github.com/catalystneuro/neuroconv +LABEL org.opencontainers.image.description="A docker image extending the dev branch of the NeuroConv package with modifications related to deployment on EC2 Batch." 
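+# Install the current checkout (dev branch) of NeuroConv with the full set of extras.
+# At runtime, the CMD below writes the YAML specification passed in via $NEUROCONV_YAML to disk and invokes the
+# `neuroconv_ec2` entry point, reading the data/output paths, Dandiset ID, tracking-table details, and EFS volume
+# name from environment variables supplied by the Batch job submission.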
+ADD ./ neuroconv
+RUN cd neuroconv && pip install .[full]
+CMD printf "$NEUROCONV_YAML" > run.yml && python -m neuroconv_ec2 run.yml --data-folder-path "$NEUROCONV_DATA_PATH" --output-folder-path "$NEUROCONV_OUTPUT_PATH" --overwrite --upload-to-dandiset-id "$DANDISET_ID" --update-tracking-table "$TRACKING_TABLE" --tracking-table-submission-id "$SUBMISSION_ID" --efs-volume-name-to-cleanup "$EFS_VOLUME"
diff --git a/dockerfiles/neuroconv_latest_yaml_variable b/dockerfiles/neuroconv_latest_yaml_variable
deleted file mode 100644
index ea411ee44..000000000
--- a/dockerfiles/neuroconv_latest_yaml_variable
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM ghcr.io/catalystneuro/neuroconv:latest
-LABEL org.opencontainers.image.source=https://github.com/catalystneuro/neuroconv
-LABEL org.opencontainers.image.description="A docker image for the most recent official release of the NeuroConv package. Modified to take in environment variables for the YAML conversion specification and other command line arguments."
-CMD echo "$NEUROCONV_YAML" > run.yml && python -m neuroconv run.yml --data-folder-path "$NEUROCONV_DATA_PATH" --output-folder-path "$NEUROCONV_OUTPUT_PATH" --overwrite
diff --git a/dockerfiles/neuroconv_latest_release_dockerfile b/dockerfiles/neuroconv_release_dockerfile
similarity index 63%
rename from dockerfiles/neuroconv_latest_release_dockerfile
rename to dockerfiles/neuroconv_release_dockerfile
index a4a532b58..a2364d57e 100644
--- a/dockerfiles/neuroconv_latest_release_dockerfile
+++ b/dockerfiles/neuroconv_release_dockerfile
@@ -1,6 +1,6 @@
 FROM python:3.11.7-slim
 LABEL org.opencontainers.image.source=https://github.com/catalystneuro/neuroconv
-LABEL org.opencontainers.image.description="A docker image for the most recent official release of the NeuroConv package."
+LABEL org.opencontainers.image.description="A docker image for an official release of the full NeuroConv package."
 RUN apt update && apt install musl-dev python3-dev -y
 RUN pip install "neuroconv[full]"
 CMD ["python -m"]
diff --git a/dockerfiles/neuroconv_release_for_ec2_deployment b/dockerfiles/neuroconv_release_for_ec2_deployment
new file mode 100644
index 000000000..723e4e631
--- /dev/null
+++ b/dockerfiles/neuroconv_release_for_ec2_deployment
@@ -0,0 +1,4 @@
+FROM ghcr.io/catalystneuro/neuroconv:latest
+LABEL org.opencontainers.image.source=https://github.com/catalystneuro/neuroconv
+LABEL org.opencontainers.image.description="A docker image extending the official release of the NeuroConv package with modifications related to deployment on EC2 Batch."
+CMD printf "$NEUROCONV_YAML" > run.yml && python -m neuroconv_ec2 run.yml --data-folder-path "$NEUROCONV_DATA_PATH" --output-folder-path "$NEUROCONV_OUTPUT_PATH" --overwrite --upload-to-dandiset-id "$DANDISET_ID" --update-tracking-table "$TRACKING_TABLE" --tracking-table-submission-id "$SUBMISSION_ID" --efs-volume-name-to-cleanup "$EFS_VOLUME"
diff --git a/docs/developer_guide/aws_batch_deployment.rst b/docs/developer_guide/aws_batch_deployment.rst
new file mode 100644
index 000000000..5c2ad68de
--- /dev/null
+++ b/docs/developer_guide/aws_batch_deployment.rst
@@ -0,0 +1,168 @@
+One way of deploying items on AWS Batch is to manually set up the entire workflow through the AWS web UI and to manually submit each job in that manner.
+
+Deploying hundreds of jobs in this way would be cumbersome.
+
+Here are two other methods that allow simpler deployment by using `boto3`.
+
+
+Semi-automated Deployment of NeuroConv on AWS Batch
+---------------------------------------------------
+
+Step 1: Transfer data to Elastic File System (EFS)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The main advantage of EFS is that you are only ever billed for the literal amount of disk storage used over time, and do not need to specify a fixed allocation or scaling strategy.
+
+It is also relatively easy to mount across multiple AWS Batch jobs simultaneously.
+
+The one downside is that its pricing per GB-month is significantly higher than either S3 or EBS.
+
+To transfer data from a Google Drive (or, in principle, any backend supported by `rclone`), set the following environment variables with the rclone credentials: `DRIVE_NAME`, `TOKEN`, `REFRESH_TOKEN`, and `EXPIRY`.
+
+.. note::
+
+    A future version may read and pass these values directly from a local `rclone.conf` file, but for now they must be set manually.
+
+.. note::
+
+    All path references must point to `/mnt/data/` as the base in order to persist across jobs.
+
+.. code-block:: python
+
+    import os
+    from datetime import datetime
+
+    from neuroconv.tools.data_transfers import submit_aws_batch_job
+
+    job_name = ""
+    docker_image = "ghcr.io/catalystneuro/rclone_auto_config:latest"
+    efs_volume_name = ""
+
+    log_datetime = str(datetime.now()).replace(" ", ":")  # no spaces in CLI
+    RCLONE_COMMAND = f"{os.environ['RCLONE_COMMAND']} -v --config /mnt/data/rclone.conf --log-file /mnt/data/submit-{log_datetime}.txt"
+
+    environment_variables = [
+        dict(name="DRIVE_NAME", value=os.environ["DRIVE_NAME"]),
+        dict(name="TOKEN", value=os.environ["TOKEN"]),
+        dict(name="REFRESH_TOKEN", value=os.environ["REFRESH_TOKEN"]),
+        dict(name="EXPIRY", value=os.environ["EXPIRY"]),
+        dict(name="RCLONE_COMMAND", value=RCLONE_COMMAND),
+    ]
+
+    submit_aws_batch_job(
+        job_name=job_name,
+        docker_image=docker_image,
+        efs_volume_name=efs_volume_name,
+        environment_variables=environment_variables,
+    )
+
+
+An example `RCLONE_COMMAND` for a drive named 'MyDrive' and the GIN testing data stored under `/ephy_testing_data/spikeglx/Noise4Sam_g0/` of that drive would be
+
+.. code-block:: python
+
+    RCLONE_COMMAND = "sync MyDrive:/ephy_testing_data/spikeglx/Noise4Sam_g0 /mnt/data/Noise4Sam_g0"
+
+
+Step 2: Run the YAML Conversion Specification
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Continuing the example above, if we have the following YAML file, `test_batch.yml`
+
+.. code-block:: yaml
+
+    metadata:
+      NWBFile:
+        lab: My Lab
+        institution: My Institution
+
+    conversion_options:
+      stub_test: True
+
+    data_interfaces:
+      ap: SpikeGLXRecordingInterface
+      lf: SpikeGLXRecordingInterface
+
+    experiments:
+      ymaze:
+        metadata:
+          NWBFile:
+            session_description: Testing batch deployment.
+
+        sessions:
+          - nwbfile_name: /mnt/data/test_batch_deployment.nwb
+            source_data:
+              ap:
+                file_path: /mnt/data/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.ap.bin
+              lf:
+                file_path: /mnt/data/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.lf.bin
+            metadata:
+              NWBFile:
+                session_id: test_batch_deployment
+              Subject:
+                subject_id: "1"
+                sex: F
+                age: P35D
+                species: Mus musculus
+
+then we can run the following stand-alone script to deploy the conversion after confirming Step 1 completed successfully.
+
+.. code-block:: python
+
+    from neuroconv.tools.data_transfers import submit_aws_batch_job
+
+    job_name = ""
+    docker_image = "ghcr.io/catalystneuro/neuroconv:dev"
+    efs_volume_name = ""
+
+    yaml_file_path = "/path/to/test_batch.yml"
+
+    with open(file=yaml_file_path) as file:
+        yaml_stream = "".join(file.readlines()).replace('"', "'")
+
+    environment_variables = [dict(name="NEUROCONV_YAML", value=yaml_stream)]
+
+    submit_aws_batch_job(
+        job_name=job_name,
+        docker_image=docker_image,
+        efs_volume_name=efs_volume_name,
+        environment_variables=environment_variables,
+    )
+
+
+Step 3: Ensure File Cleanup
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+TODO: write a dockerfile to perform this step with the API
+
+It is a good idea to confirm that you have access to your EFS volume from on-demand resources in case you ever need to perform a manual cleanup operation.
+
+Boot up an EC2 t2.micro instance using the Amazon Linux 2 image with minimal resources.
+
+Create two new security groups, `EFS Target` (no policies set) and `EFS Mount` (set the inbound policy to NFS with `EFS Target` as the source).
+
+On the EC2 instance, change the security group to `EFS Target`. On the EFS Network settings, add the `EFS Mount` group.
+
+Connect to the EC2 instance and run
+
+.. code-block:: bash
+
+    mkdir ~/efs-mount-point  # or any other name you want; keeping this in the home directory (~) is recommended for ease of access
+    sudo mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport fs-.efs.us-east-2.amazonaws.com:/ ~/efs-mount-point  # note that any operations performed on contents of the mounted volume must use sudo
+
+and the mount *should* succeed, though this step is known to have various issues; following the steps exactly as illustrated above worked as of 4/2/2023.
+
+You can now read, write, and, importantly, delete any contents on the EFS.
+
+Until automated DANDI upload is implemented in the YAML functionality, you will need to use this method to manually remove the NWB files.
+
+Even after it is, you should double-check that the `cleanup=True` flag to that function executed properly.
+
+
+
+Fully Automated Deployment of NeuroConv on AWS Batch
+----------------------------------------------------
+
+Coming soon...
+
+The approach is essentially the same as the semi-automated one, except that all jobs are submitted at the same time, with each job depending on the completion of the previous one (see the sketch below).
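+
+The sketch below is illustrative only (the job names, volume name, and environment variable values are placeholders); it chains two `submit_aws_batch_job` calls so that the conversion job waits for the transfer job, mirroring what `deploy_conversion_on_ec2` automates.
+
+.. code-block:: python
+
+    from uuid import uuid4
+
+    from neuroconv.tools.data_transfers import submit_aws_batch_job
+
+    efs_volume_name = "my_efs_volume"  # shared volume so both jobs see /mnt/data/
+    unique_reference = str(uuid4())[:8]  # keep job names unique across submissions
+
+    # Job 1: transfer the source data onto the EFS volume
+    transfer_info = submit_aws_batch_job(
+        job_name=f"example_transfer_{unique_reference}",
+        docker_image="ghcr.io/catalystneuro/rclone_with_config:latest",
+        efs_volume_name=efs_volume_name,
+        environment_variables=[
+            dict(name="RCLONE_CONFIG", value="<contents of rclone.conf>"),
+            dict(name="RCLONE_COMMANDS", value="rclone copy my_remote:my_source /mnt/data/my_source --verbose"),
+        ],
+    )
+    transfer_job_id = transfer_info["job_submission_info"]["jobId"]
+
+    # Job 2: run the YAML conversion only after the transfer job finishes
+    submit_aws_batch_job(
+        job_name=f"example_conversion_{unique_reference}",
+        docker_image="ghcr.io/catalystneuro/neuroconv_for_ec2_deployment:dev",
+        job_dependencies=[{"jobId": transfer_job_id, "type": "SEQUENTIAL"}],
+        efs_volume_name=efs_volume_name,
+        environment_variables=[
+            dict(name="NEUROCONV_YAML", value="<contents of the YAML specification file>"),
+            # ...plus the DANDI, tracking-table, and EFS cleanup variables set by `deploy_conversion_on_ec2`
+        ],
+    )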
diff --git a/setup.py b/setup.py index 0c89dd340..5ba404ccc 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,9 @@ extras_require = defaultdict(list) +extras_require["aws"].append("boto3") +extras_require["full"].extend(extras_require["aws"]) + extras_require["dandi"].append("dandi>=0.58.1") extras_require["full"].extend(extras_require["dandi"]) @@ -82,7 +85,8 @@ extras_require=extras_require, entry_points={ "console_scripts": [ - "neuroconv = neuroconv.tools.yaml_conversion_specification._yaml_conversion_specification:run_conversion_from_yaml_cli", + "neuroconv = neuroconv.tools.yaml_conversion_specification._yaml_conversion_specification_cli:run_conversion_from_yaml_cli", + "neuroconv_ec2 = neuroconv.tools.yaml_conversion_specification._yaml_conversion_specification_cli:run_ec2_conversion_from_yaml_cli", ], }, license="BSD-3-Clause", diff --git a/src/neuroconv/tools/data_transfers/__init__.py b/src/neuroconv/tools/data_transfers/__init__.py index 565dee884..a6a90b848 100644 --- a/src/neuroconv/tools/data_transfers/__init__.py +++ b/src/neuroconv/tools/data_transfers/__init__.py @@ -1,14 +1,24 @@ """Collection of helper functions for assessing and performing automated data transfers.""" -from ._aws import estimate_s3_conversion_cost +from ._aws import ( + estimate_s3_conversion_cost, + submit_aws_batch_job, + update_table_status, + deploy_conversion_on_ec2, + delete_efs_volume, +) from ._dandi import automatic_dandi_upload from ._globus import get_globus_dataset_content_sizes, transfer_globus_content from ._helpers import estimate_total_conversion_runtime __all__ = [ + "submit_aws_batch_job", + "delete_efs_volume", + "deploy_conversion_on_ec2", "estimate_s3_conversion_cost", "automatic_dandi_upload", "get_globus_dataset_content_sizes", "transfer_globus_content", "estimate_total_conversion_runtime", + "update_table_status", ] diff --git a/src/neuroconv/tools/data_transfers/_aws.py b/src/neuroconv/tools/data_transfers/_aws.py index 554e64a40..3339b6dcd 100644 --- a/src/neuroconv/tools/data_transfers/_aws.py +++ b/src/neuroconv/tools/data_transfers/_aws.py @@ -1,5 +1,15 @@ """Collection of helper functions for assessing and performing automated data transfers related to AWS.""" +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Literal, Optional +from uuid import uuid4 +from warnings import warn + +from pydantic import FilePath + def estimate_s3_conversion_cost( total_mb: float, @@ -29,4 +39,497 @@ def estimate_s3_conversion_cost( total_mb_s = total_mb**2 / 2 * (1 / transfer_rate_mb + (2 * c + 1) / conversion_rate_mb + 2 * c**2 / upload_rate_mb) cost_gb_m = 0.08 / 1e3 # $0.08 / GB Month cost_mb_s = cost_gb_m / (1e3 * 2.628e6) # assuming 30 day month; unsure how amazon weights shorter months? 
+ return cost_mb_s * total_mb_s + + +def submit_aws_batch_job( + *, + region: str = "us-east-2", + job_name: str, + docker_image: str, + command: Optional[str] = None, + job_dependencies: Optional[List[Dict[str, str]]] = None, + iam_role_name: str = "neuroconv_batch_role", + compute_environment_name: str = "neuroconv_batch_environment", + job_queue_name: str = "neuroconv_batch_queue", + job_definition_name: Optional[str] = None, + minimum_worker_ram_in_gb: float = 4.0, + minimum_worker_cpus: int = 4, + efs_volume_name: Optional[str] = None, + environment_variables: Optional[Dict[str, str]] = None, + status_tracker_table_name: str = "neuroconv_batch_status_tracker", + submission_id: Optional[str] = None, +) -> Dict[str, str]: + """ + Submit a job to AWS Batch for processing. + + Parameters + ---------- + region : str, default: "us-east-2" + The AWS region to use for the job. + us-east-2 (Ohio) is the location of the DANDI Archive, and we recommend all operations be run in that region to + remove cross-region transfer costs. + job_name : str + The name of the job to submit. + docker_image : str + The name of the Docker image to use for the job. + command : str, optional + The command to run in the Docker container. + Current syntax only supports a single line; consecutive actions should be chained with the '&&' operator. + job_dependencies : list of dict + A list of job dependencies for this job to trigger. Structured as follows: + [ + {"jobId": "job_id_1", "type": "N_TO_N"}, + {"jobId": "job_id_2", "type": "SEQUENTIAL"}, + ... + ] + iam_role_name : str, default: "neuroconv_batch_role" + The name of the IAM role to use for the job. + compute_environment_name : str, default: "neuroconv_batch_environment" + The name of the compute environment to use for the job. + job_queue_name : str, default: "neuroconv_batch_queue" + The name of the job queue to use for the job. + job_definition_name : str, optional + The name of the job definition to use for the job. + Defaults to f"neuroconv_batch_{ name of docker image }", + but replaces any colons from tags in the docker image name with underscores. + minimum_worker_ram_in_gb : int, default: 4.0 + The minimum amount of base worker memory required to run this job. + Determines the EC2 instance type selected by the automatic 'best fit' selector. + Recommended to be several GB to allow comfortable buffer space for data chunk iterators. + minimum_worker_cpus : int, default: 4 + The minimum number of CPUs required to run this job. + A minimum of 4 is required, even if only one will be used in the actual process. + efs_volume_name : str, optional + The name of the EFS volume to attach to the jobs used by the operation. + environment_variables : dict of str, optional + A dictionary of key-value pairs to pass to the docker image. + status_tracker_table_name : str, default: "neuroconv_batch_status_tracker" + The name of the DynamoDB table to use for tracking job status. + submission_id : str, optional + The unique ID to pair with this job submission when tracking the status via DynamoDB. + Defaults to a sampled UUID4. + + Returns + ------- + info : dict + A dictionary containing information about this AWS Batch job. + + info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID. + info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table. 
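+
+    Examples
+    --------
+    A minimal, illustrative call (the job name, image, and command here are placeholders)::
+
+        info = submit_aws_batch_job(
+            job_name="example_job",
+            docker_image="ubuntu:latest",
+            command="echo 'hello from AWS Batch'",
+        )
+        job_id = info["job_submission_info"]["jobId"]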
+ """ + import boto3 + + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + if aws_access_key_id is None or aws_secret_access_key is None: + raise EnvironmentError( + "'AWS_ACCESS_KEY_ID' and 'AWS_SECRET_ACCESS_KEY' must both be set in the environment to use this function." + ) + + dynamodb_client = boto3.client( + service_name="dynamodb", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + dynamodb_resource = boto3.resource( + service_name="dynamodb", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + iam_client = boto3.client( + service_name="iam", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + batch_client = boto3.client( + service_name="batch", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + + # It is very useful to have a status tracker that is separate from the job environment + # Technically detailed logs of inner workings are given in the CloudWatch, but that can only really be + # analyzed from the AWS web console + current_tables = dynamodb_client.list_tables()["TableNames"] + if status_tracker_table_name not in current_tables: + table = dynamodb_resource.create_table( + TableName=status_tracker_table_name, + KeySchema=[dict(AttributeName="id", KeyType="HASH")], + AttributeDefinitions=[dict(AttributeName="id", AttributeType="S")], + ProvisionedThroughput=dict(ReadCapacityUnits=1, WriteCapacityUnits=1), + ) + else: + table = dynamodb_resource.Table(name=status_tracker_table_name) + + # Ensure role policy is set + current_roles = [role["RoleName"] for role in iam_client.list_roles()["Roles"]] + if iam_role_name not in current_roles: + assume_role_policy = dict( + Version="2012-10-17", + Statement=[ + dict(Effect="Allow", Principal=dict(Service="ecs-tasks.amazonaws.com"), Action="sts:AssumeRole"), + ], + ) + + role = iam_client.create_role(RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(assume_role_policy)) + iam_client.attach_role_policy( + RoleName=role["Role"]["RoleName"], PolicyArn="arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess" + ) + iam_client.attach_role_policy( + RoleName=role["Role"]["RoleName"], PolicyArn="arn:aws:iam::aws:policy/CloudWatchFullAccess" + ) + else: + role = iam_client.get_role(RoleName=iam_role_name) + + # Ensure compute environment is setup + current_compute_environments = [ + environment["computeEnvironmentName"] + for environment in batch_client.describe_compute_environments()["computeEnvironments"] + ] + if compute_environment_name not in current_compute_environments: + batch_client.create_compute_environment( + computeEnvironmentName=compute_environment_name, + type="MANAGED", + state="ENABLED", + computeResources={ + "type": "EC2", + "allocationStrategy": "BEST_FIT", + "minvCpus": 0, + "maxvCpus": 256, + "subnets": ["subnet-0be50d51", "subnet-3fd16f77", "subnet-0092132b"], + "instanceRole": "ecsInstanceRole", + "securityGroupIds": ["sg-851667c7"], + "instanceTypes": ["optimal"], + }, + ) + + # Ensure job queue exists + current_job_queues = [queue["jobQueueName"] for queue in batch_client.describe_job_queues()["jobQueues"]] + if job_queue_name not in current_job_queues: + batch_client.create_job_queue( + jobQueueName=job_queue_name, + state="ENABLED", + priority=1, + computeEnvironmentOrder=[ + dict(order=100, 
computeEnvironment="dynamodb_import_environment"), + ], + ) + + # Ensure job definition exists + # By default, keep name unique by incorporating the name of the container + job_definition_docker_name = docker_image.replace(":", "_") + job_definition_name = job_definition_name or f"neuroconv_batch_{job_definition_docker_name}" + + resource_requirements = [ + { + "value": str(int(minimum_worker_ram_in_gb * 1e3 / 1.024**2)), # boto3 expects memory in round MiB + "type": "MEMORY", + }, + {"value": str(minimum_worker_cpus), "type": "VCPU"}, + ] + + container_properties = dict( + image=docker_image, + resourceRequirements=resource_requirements, + jobRoleArn=role["Role"]["Arn"], + executionRoleArn=role["Role"]["Arn"], + # environment=[ + # dict( + # name="AWS_DEFAULT_REGION", + # value=region, + # ) + # ], + ) + + if efs_volume_name is not None: + # Connect the job definition reference to the EFS mount + job_definition_name += f"_{efs_volume_name}" + + efs_client = boto3.client( + service_name="efs", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + all_efs_volumes = efs_client.describe_file_systems() + all_efs_volumes_by_name = {efs_volume["Name"]: efs_volume for efs_volume in all_efs_volumes["FileSystems"]} + efs_volume = all_efs_volumes_by_name[efs_volume_name] + + volumes = [ + dict( + name=efs_volume_name, + efsVolumeConfiguration=dict(fileSystemId=efs_volume["FileSystemId"]), + ) + ] + + container_properties.update(volumes) + + current_job_definitions = [ + job_definition["jobDefinitionName"] + for job_definition in batch_client.describe_job_definitions()["jobDefinitions"] + ] + if job_definition_name not in current_job_definitions: + batch_client.register_job_definition( + jobDefinitionName=job_definition_name, type="container", containerProperties=container_properties + ) + + # Submit job and update status tracker + currently_running_jobs = batch_client.list_jobs(jobQueue=job_queue_name) + if job_name in currently_running_jobs: + raise ValueError( + f"There is already a job named '{job_name}' running in the queue! " + "If you are submitting multiple jobs, each will need a unique name." 
+ ) + + # Set environment variables to the docker container as well as optional commands to run + job_dependencies = job_dependencies or [] + + environment_variables_per_job = [ + dict( # The burden is on the calling script to update the table status to finished + name="STATUS_TRACKER_TABLE_NAME", + value=status_tracker_table_name, + ), + ] + if environment_variables is not None: + environment_variables_per_job.extend([{key: value for key, value in environment_variables.items()}]) + + container_overrides = dict(environment=environment_variables_per_job) + if command is not None: + container_overrides["command"] = [command] + + job_submission_info = batch_client.submit_job( + jobQueue=job_queue_name, + dependsOn=job_dependencies, + jobDefinition=job_definition_name, + jobName=job_name, + containerOverrides=container_overrides, + ) + + # Update DynamoDB status tracking table + submission_id = submission_id or str(uuid4()) + table_submission_info = dict( + id=submission_id, job_name=job_name, submitted_on=datetime.now().isoformat(), status="submitted" + ) + table.put_item(Item=table_submission_info) + + info = dict(job_submission_info=job_submission_info, table_submission_info=table_submission_info) + return info + + +def update_table_status( + *, + status_tracker_table_name: str, + submission_id: str, + status: str, + region: str = "us-east-2", +) -> None: + """ + Helper function for updating a status value on a DynamoDB table tracking the status of EC2 jobs. + + Intended for use by the running job to indicate its completion. + + Parameters + ---------- + status_tracker_table_name : str, default: "neuroconv_batch_status_tracker" + The name of the DynamoDB table to use for tracking job status. + submission_id : str + The random hash that was assigned on submission of this job to the status tracker table. + status : str + The new status value to update. + region : str, default: "us-east-2" + The AWS region to use for the job. + us-east-2 (Ohio) is the location of the DANDI Archive, and we recommend all operations be run in that region to + remove cross-region transfer costs. + """ + import boto3 + + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + + dynamodb_resource = boto3.resource( + service_name="dynamodb", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + table = dynamodb_resource.Table(name=status_tracker_table_name) + + table.update_item(Key={"id": submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": status}}) + + return None + + +def delete_efs_volume(efs_volume_name: str) -> None: + """ + Delete an EFS volume of a particular name. + + Parameters + ---------- + efs_volume_name : str + The name of the EFS volume to delete. + """ + import boto3 + + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + + efs_client = boto3.client( + service_name="efs", + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + + all_efs_volumes = efs_client.describe_file_systems() + all_efs_volumes_by_name = {efs_volume["Name"]: efs_volume for efs_volume in all_efs_volumes["FileSystems"]} + + efs_volume = all_efs_volumes_by_name.get(efs_volume_name, None) + if efs_volume is None: + warn( + message=( + f"The specified EFS volume '{efs_volume_name}' was not found, and so there is nothing to delete. 
" + "Please manually check your current EFS volumes to ensure none are hanging." + ), + stacklevel=2, + ) + + file_system_id = efs_volume["FileSystemId"] + efs_client.delete_file_system(FileSystemId=file_system_id) + + return None + + +def deploy_conversion_on_ec2( + specification_file_path: FilePath, + transfer_commands: str, + efs_volume_name: str, + dandiset_id: str, + region: str = "us-east-2", + transfer_method: Literal["rclone"] = "rclone", + transfer_config_file_path: Optional[FilePath] = None, + efs_volume_creation_options: Optional[dict] = None, + status_tracker_table_name: str = "neuroconv_batch_status_tracker", + cleanup_efs_volume: bool = True, +) -> None: + """ + Helper function for deploying a YAML-based NeuroConv data conversion in the cloud on AWS EC2 Batch. + + Parameters + ---------- + specification_file_path : FilePathType + File path leading to .yml specification file for NWB conversion. + transfer_commands : str + The syntax command to send to the transfer method. + E.g., `transfer_command="rclone copy YOUR_REMOTE:YOUR_SOURCE"` + efs_volume_name : str + The name of the EFS volume to attach to the jobs used by the operation. + dandiset_id : str + The six-digit Dandiset ID to use when uploading the data. + region : str, default: "us-east-2" + The AWS region to use for the job. + us-east-2 (Ohio) is the location of the DANDI Archive, and we recommend all operations be run in that region to + remove cross-region transfer costs. + transfer_method : Literal["rclone"] + The type of transfer used to move the data from the cloud source to the EFS volume. + Currently only supports Rclone. + transfer_config_file_path : FilePath, optional + Explicit path to the config file used by the transfer method. + When using `transfer_method = "rclone"`, this defaults to `~/.config/rclone/rclone.conf`. + efs_volume_creation_options : dict, optional + The dictionary of keyword arguments to pass to `boto3.client.EFS.create_file_system` when the volmume does not + already exist. + These are ignored if the volume already exists. + status_tracker_table_name : str, default: "neuroconv_batch_status_tracker" + The name of the DynamoDB table to use for tracking job status. + cleanup_efs_volume : bool, default: True + Whether or not to schedule the deletion of the associated `efs_volume_name` when the deployment is complete. + This is recommended to avoid unnecessary costs from leaving unused resources hanging indefinitely. + It is also recommended to manually ensure periodically that this cleanup was successful. + """ + import boto3 + + dandi_api_token = os.getenv("DANDI_API_KEY") + assert dandi_api_token is not None, ( + "Unable to find environment variable 'DANDI_API_KEY'. " + "Please retrieve your token from DANDI and set this environment variable." 
+ ) + + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + + transfer_config_file_path = transfer_config_file_path or Path().home() / ".config" / "rclone" / "rclone.conf" + efs_volume_creation_options = efs_volume_creation_options or dict() + + if transfer_method != "rclone": + raise NotImplementedError(f"The transfer method '{transfer_method}' is not yet supported!") + if not transfer_config_file_path.exists(): + raise ValueError(f"The `transfer_config_file_path` located at '{transfer_config_file_path}' does not exist!") + + # Make EFS volume if it doesn't already exist + efs_client = boto3.client( + service_name="efs", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + all_efs_volumes = efs_client.describe_file_systems() + all_efs_volumes_by_name = {efs_volume["Name"]: efs_volume for efs_volume in all_efs_volumes["FileSystems"]} + + efs_volume = all_efs_volumes_by_name.get(efs_volume_name, None) + if efs_volume is None: + efs_volume_default_creation_kwargs = dict( + PerformanceMode="generalPurpose", + # Setting AvailabilityZoneName sets the volume as 'One-Zone', which is cheaper + AvailabilityZoneName="us-east-2b", + Tags=[dict(Key="Name", Value=efs_volume_name)], + ) + efs_volume_creation_kwargs = dict(efs_volume_default_creation_kwargs) + efs_volume_creation_kwargs.update(**efs_volume_creation_options) + + efs_volume = efs_client.create_file_system(**efs_volume_creation_kwargs) + + # To avoid errors related to name collisions, append all job names with a small unique reference + unique_job_reference = str(uuid4())[:8] + + # Job 1: Transfer data from source to EFS + with open(file=transfer_config_file_path) as io: + rclone_config_file_content = io.read() + + transfer_job_submission_info = submit_aws_batch_job( + transfer_job_name=Path(specification_file_path).stem + "_transfer_" + unique_job_reference, + docker_container="ghcr.io/catalystneuro/rclone_with_config:latest", + efs_volume_name=efs_volume_name, + environment_variables=[ + dict(name="RCLONE_CONFIG", value=rclone_config_file_content), + dict(name="RCLONE_COMMANDS", value=transfer_commands), + ], + status_tracker_table_name=status_tracker_table_name, + ) + + # Job 2: Run YAML specification on transferred data and upload to DANDI + with open(file=specification_file_path) as io: + specification_file_content = io.read() + + submit_aws_batch_job( + conversion_job_name=Path(specification_file_path).stem + "_conversion_" + unique_job_reference, + job_dependencies=[{"jobId": transfer_job_submission_info["jobId"], "type": "SEQUENTIAL"}], + docker_container="ghcr.io/catalystneuro/neuroconv_for_ec2_deployment:dev", + efs_volume_name=efs_volume_name, + environment_variables=[ + dict(name="NEUROCONV_YAML", value=specification_file_content), + dict(name="DANDI_API_KEY", value=dandi_api_token), + dict(name="DANDISET_ID", value=dandiset_id), + dict(name="AWS_ACCESS_KEY_ID", value=aws_access_key_id), + dict(name="AWS_SECRET_ACCESS_KEY", value=aws_secret_access_key), + dict(name="TRACKING_TABLE", value=status_tracker_table_name), + dict(name="SUBMISSION_ID", value=transfer_job_submission_info["table_submission_info"]["id"]), + dict(name="EFS_VOLUME", value=efs_volume_name), + ], + ) + + return None diff --git a/src/neuroconv/tools/yaml_conversion_specification/__init__.py b/src/neuroconv/tools/yaml_conversion_specification/__init__.py index f2adb6948..ef43b5306 100644 --- 
a/src/neuroconv/tools/yaml_conversion_specification/__init__.py +++ b/src/neuroconv/tools/yaml_conversion_specification/__init__.py @@ -1,3 +1,3 @@ -from ._yaml_conversion_specification import run_conversion_from_yaml +from ._yaml_conversion_specification import run_conversion_from_yaml, run_ec2_conversion_from_yaml -__all__ = ["run_conversion_from_yaml"] +__all__ = ["run_conversion_from_yaml", "run_ec2_conversion_from_yaml"] diff --git a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py index 2c7e5b25c..c0a958e4e 100644 --- a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py +++ b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py @@ -1,49 +1,21 @@ +import os import sys from importlib import import_module from pathlib import Path from typing import Optional -import click from jsonschema import RefResolver, validate +from ..data_transfers import ( + automatic_dandi_upload, + delete_efs_volume, + update_table_status, +) +from ..importing import get_package from ...nwbconverter import NWBConverter from ...utils import FilePathType, FolderPathType, dict_deep_update, load_dict_from_file -@click.command() -@click.argument("specification-file-path") -@click.option( - "--data-folder-path", - help="Path to folder where the source data may be found.", - type=click.Path(writable=True), -) -@click.option( - "--output-folder-path", - default=None, - help="Path to folder where you want to save the output NWBFile.", - type=click.Path(writable=True), -) -@click.option("--overwrite", help="Overwrite an existing NWBFile at the location.", is_flag=True) -def run_conversion_from_yaml_cli( - specification_file_path: str, - data_folder_path: Optional[str] = None, - output_folder_path: Optional[str] = None, - overwrite: bool = False, -): - """ - Run the tool function 'run_conversion_from_yaml' via the command line. - - specification-file-path : - Path to the .yml specification file. - """ - run_conversion_from_yaml( - specification_file_path=specification_file_path, - data_folder_path=data_folder_path, - output_folder_path=output_folder_path, - overwrite=overwrite, - ) - - def run_conversion_from_yaml( specification_file_path: FilePathType, data_folder_path: Optional[FolderPathType] = None, @@ -64,8 +36,8 @@ def run_conversion_from_yaml( Folder path leading to the desired output location of the .nwb files. The default is the parent directory of the specification_file_path. overwrite : bool, default: False - If True, replaces any existing NWBFile at the nwbfile_path location, if save_to_file is True. - If False, appends the existing NWBFile at the nwbfile_path location, if save_to_file is True. + If True, replaces the existing corresponding NWBFile at the `output_folder_path`. + If False, appends the existing corresponding NWBFile at the `output_folder_path`. """ from dandi.organize import create_unique_filenames_from_metadata from dandi.pynwb_utils import _get_pynwb_metadata @@ -147,3 +119,79 @@ def run_conversion_from_yaml( # Rename file on system nwbfile_path_to_set.rename(str(output_folder_path / dandi_filename)) + + +def run_ec2_conversion_from_yaml( + specification_file_path: FilePathType, + upload_to_dandiset: str, + update_tracking_table: str, + tracking_table_submission_id: str, + efs_volume_name_to_cleanup: str, +): + """ + Run conversion to NWB given a yaml specification file. 
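+
+    This EC2 Batch variant (invoked through the `neuroconv_ec2` console script) additionally uploads the results
+    to the specified Dandiset, marks the submission as complete in the DynamoDB tracking table, and deletes the
+    associated EFS volume once the conversion finishes.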
+ + Parameters + ---------- + specification_file_path : FilePathType + File path leading to .yml specification file for NWB conversion. + upload_to_dandiset : str + If you wish to upload the resulting NWB file to a particular Dandiset, specify the six-digit ID here. + When using this feature, the `DANDI_API_KEY` environment variable must be set. + update_tracking_table : str + The name of the DynamoDB status tracking table to send a completion update to when the conversion is finished. + tracking_table_submission_id : str + The unique submission ID specifying the row (job) of the DynamoDB status tracking table to update the status of. + efs_volume_name_to_cleanup : str + The name of any associated EFS volume to cleanup upon successful conversion or upload. + This is only intended for use when running in EC2 Batch, but is necessary to include here in order to ensure + synchronicity. + """ + # Ensure boto3 is installed before beginning procedure + get_package(package_name="boto3") + + # This check is technically a part of the automatic dandi upload, but good to check as early as possible + # to avoid wasting time. + dandi_api_token = os.getenv("DANDI_API_KEY") + assert dandi_api_token is not None and dandi_api_token != "", ( + "Unable to read environment variable 'DANDI_API_KEY'. " + "Please retrieve your token from DANDI and set this environment variable." + ) + + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + assert aws_access_key_id is not None and aws_access_key_id != "", ( + "Unable to read environment variable 'AWS_ACCESS_KEY_ID'. " + "Please create and set AWS credentials if you wish to update a tracking table." + ) + assert aws_secret_access_key is not None and aws_secret_access_key != "", ( + "Unable to read environment variable 'AWS_SECRET_ACCESS_KEY'. " + "Please create and set AWS credentials if you wish to update a tracking table." + ) + + if update_tracking_table is not None and tracking_table_submission_id is None: + raise ValueError( + f"The table '{update_tracking_table}' was specified to be updated but no submission ID was specified! " + "Please specify the `tracking_table_submission_id` keyword argument." + ) + if update_tracking_table is None and tracking_table_submission_id is not None: + raise ValueError( + f"The submission ID '{tracking_table_submission_id}' was specified to be updated but no table name was " + "specified! Please specify the `update_tracking_table` keyword argument." 
+ ) + + # Convert + run_conversion_from_yaml(specification_file_path=specification_file_path) + + # Upload + output_folder_path = Path(specification_file_path).parent + staging = int(upload_to_dandiset) >= 200_000 + automatic_dandi_upload(dandiset_id=upload_to_dandiset, nwb_folder_path=output_folder_path, staging=staging) + + # Update tracker + update_table_status( + status_tracker_table_name=update_tracking_table, submission_id=tracking_table_submission_id, status="Uploaded" + ) + + # Cleanup + delete_efs_volume(efs_volume_name=efs_volume_name_to_cleanup) diff --git a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification_cli.py b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification_cli.py new file mode 100644 index 000000000..bd1d50f63 --- /dev/null +++ b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification_cli.py @@ -0,0 +1,93 @@ +from typing import Optional + +import click + +from ._yaml_conversion_specification import ( + run_conversion_from_yaml, + run_ec2_conversion_from_yaml, +) + + +@click.command() +@click.argument("specification-file-path") +@click.option( + "--data-folder-path", + help="Path to folder where the source data may be found.", + type=click.Path(writable=True), +) +@click.option( + "--output-folder-path", + default=None, + help="Path to folder where you want to save the output NWBFile.", + type=click.Path(writable=True), +) +@click.option("--overwrite", help="Overwrite an existing NWBFile at the location.", is_flag=True) +def run_conversion_from_yaml_cli( + specification_file_path: str, + data_folder_path: Optional[str] = None, + output_folder_path: Optional[str] = None, + overwrite: bool = False, +): + """ + Run the tool function 'run_conversion_from_yaml' via the command line. + + specification-file-path : + Path to the .yml specification file. + """ + run_conversion_from_yaml( + specification_file_path=specification_file_path, + data_folder_path=data_folder_path, + output_folder_path=output_folder_path, + overwrite=overwrite, + ) + + +@click.command() +@click.argument("specification-file-path") +@click.option( + "--upload-to-dandiset-id", + help=( + "Do you want to upload the result to DANDI? If so, specify the six-digit Dandiset ID. " + "Also ensure you have your DANDI_API_KEY set as an environment variable." + ), + type=str, + required=False, +) +@click.option( + "--update-tracking-table", + help=( + "The name of the DynamoDB status tracking table to send a completion update to when the conversion is finished." + ), + type=str, + required=False, +) +@click.option( + "--tracking-table-submission-id", + help=( + "The unique submission ID specifying the row (job) of the DynamoDB status tracking table " + "to update the status of." 
+ ), + type=str, + required=False, +) +@click.option( + "--efs-volume-name-to-cleanup", + help="The name of any associated EFS volume to cleanup upon successful conversion or upload.", + type=str, + required=False, +) +def run_ec2_conversion_from_yaml_cli( + specification_file_path: str, + upload_to_dandiset: Optional[str] = None, + update_tracking_table: Optional[str] = None, + tracking_table_submission_id: Optional[str] = None, + efs_volume_name_to_cleanup: Optional[str] = None, +): + """Run the tool function `run_ec2_conversion_from_yaml` via the command line.""" + run_ec2_conversion_from_yaml( + specification_file_path=specification_file_path, + upload_to_dandiset=upload_to_dandiset, + update_tracking_table=update_tracking_table, + tracking_table_submission_id=tracking_table_submission_id, + efs_volume_name_to_cleanup=efs_volume_name_to_cleanup, + ) diff --git a/tests/imports.py b/tests/imports.py index 5f8b65e72..4ad279153 100644 --- a/tests/imports.py +++ b/tests/imports.py @@ -54,9 +54,9 @@ def test_tools(self): current_structure = _strip_magic_module_attributes(ls=tools.__dict__) expected_structure = [ - # Sub-Packages - "yaml_conversion_specification", # Attached to namespace by top __init__ call of NWBConverter # Sub-modules + "yaml_conversion_specification", # Attached to namespace by top __init__ call of NWBConverter + "data_transfers", # Attached indirectly by 'yaml_conversion_specification' "importing", # Attached to namespace by importing get_package "hdmf", "nwb_helpers", # Attached to namespace by top __init__ call of NWBConverter diff --git a/tests/test_minimal/test_tools/aws_tools.py b/tests/test_minimal/test_tools/aws_tools.py new file mode 100644 index 000000000..8d88f1df2 --- /dev/null +++ b/tests/test_minimal/test_tools/aws_tools.py @@ -0,0 +1,158 @@ +import os +from datetime import datetime +from pathlib import Path +from unittest import TestCase + +from neuroconv.tools.data_transfers import ( + deploy_conversion_on_ec2, + estimate_s3_conversion_cost, + estimate_total_conversion_runtime, + submit_aws_batch_job, +) + +from .test_on_data.setup_paths import OUTPUT_PATH + +RCLONE_DRIVE_ACCESS_TOKEN = os.environ["RCLONE_DRIVE_ACCESS_TOKEN"] +RCLONE_DRIVE_REFRESH_TOKEN = os.environ["RCLONE_DRIVE_REFRESH_TOKEN"] +RCLONE_EXPIRY_TOKEN = os.environ["RCLONE_EXPIRY_TOKEN"] + + +def test_estimate_s3_conversion_cost_standard(): + test_sizes = [ + 1, + 100, + 1e3, # 1 GB + 1e5, # 100 GB + 1e6, # 1 TB + 1e7, # 10 TB + 1e8, # 100 TB + ] + results = [estimate_s3_conversion_cost(total_mb=total_mb) for total_mb in test_sizes] + assert results == [ + 2.9730398740210563e-15, # 1 MB + 2.973039874021056e-11, # 100 MB + 2.9730398740210564e-09, # 1 GB + 2.9730398740210563e-05, # 100 GB + 0.002973039874021056, # 1 TB + 0.2973039874021056, # 10 TB + 29.73039874021056, # 100 TB + ] + + +def test_estimate_total_conversion_runtime(): + test_sizes = [ + 1, + 100, + 1e3, # 1 GB + 1e5, # 100 GB + 1e6, # 1 TB + 1e7, # 10 TB + 1e8, # 100 TB + ] + results = [estimate_total_conversion_runtime(total_mb=total_mb) for total_mb in test_sizes] + assert results == [ + 0.12352941176470589, + 12.352941176470589, + 123.52941176470588, + 12352.94117647059, + 123529.41176470589, + 1235294.1176470588, + 12352941.176470589, + ] + + +def test_submit_aws_batch_job(): + job_name = "test_submit_aws_batch_job" + docker_image = "ubuntu:latest" + command = "echo 'Testing NeuroConv AWS Batch submission." 
+ + submit_aws_batch_job( + job_name=job_name, + docker_image=docker_image, + command=command, + ) + + +def test_submit_aws_batch_job_with_dependencies(): + job_name_1 = "test_submit_aws_batch_job_with_dependencies_1" + docker_image = "ubuntu:latest" + command_1 = "echo 'Testing NeuroConv AWS Batch submission." + + info = submit_aws_batch_job( + job_name=job_name_1, + docker_image=docker_image, + command=command_1, + ) + job_submission_info = info["job_submission_info"] + + job_name_2 = "test_submit_aws_batch_job_with_dependencies_1" + command_2 = "echo 'Testing NeuroConv AWS Batch submission with dependencies." + job_dependencies = [{"jobId": job_submission_info["jobId"], "type": "SEQUENTIAL"}] + submit_aws_batch_job( + job_name=job_name_2, + docker_image=docker_image, + command=command_2, + job_dependencies=job_dependencies, + ) + + +class TestDeployConversionOnEC2(TestCase): + """ + In order to run this test in CI successfully, whoever sets the Rclone credentials must use the following setup. + + 1) On your Google Drive, create a folder named 'test_neuroconv_ec2_batch_deployment' + 2) Create a subfolder there named 'test_rclone_source_data' + 3) Copy the 'spikelgx/Noise4Sam' and 'phy/phy_example_0' folders from the 'ephy_testing_data' into that subfolder + 4) Locally, run `rclone config`, then copy the relevant token values into GitHub Action secrets + """ + + test_folder = OUTPUT_PATH / "deploy_conversion_on_ec2_tests" + + # Save the .conf file in a separate folder to avoid the potential of the container using the locally mounted file + adjacent_folder = OUTPUT_PATH / "rclone_conf" + test_config_file = adjacent_folder / "rclone.conf" + + def setUp(self): + self.test_folder.mkdir(exist_ok=True) + self.adjacent_folder.mkdir(exist_ok=True) + + # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command) + token_dictionary = dict( + access_token=RCLONE_DRIVE_ACCESS_TOKEN, + token_type="Bearer", + refresh_token=RCLONE_DRIVE_REFRESH_TOKEN, + expiry=RCLONE_EXPIRY_TOKEN, + ) + token_string = str(token_dictionary).replace("'", '"').replace(" ", "") + rclone_config_contents = [ + "[test_google_drive_remote]\n", + "type = drive\n", + "scope = drive\n", + f"token = {token_string}\n", + "team_drive = \n", + "\n", + ] + with open(file=self.test_config_file, mode="w") as io: + io.writelines(rclone_config_contents) + + def test_deploy_conversion_on_ec2(self): + path_to_test_yml_files = Path(__file__).parent.parent / "test_on_data" / "conversion_specifications" + yaml_file_path = path_to_test_yml_files / "GIN_conversion_specification.yml" + + transfer_commands = ( + "rclone copy test_google_drive_remote:test_neuroconv_ec2_batch_deployment {self.test_folder} " + "--verbose --progress --config ./rclone.conf" + ) + + date_tag = datetime.now().strftime("%y%m%d") + efs_volume_name = f"neuroconv_ci_tests_{date_tag}" + + deploy_conversion_on_ec2( + specification_file_path=yaml_file_path, + transfer_commands=transfer_commands, + transfer_config_file_path=self.test_config_file, + efs_volume_name=efs_volume_name, + dandiset_id="200560", + ) + + # assert that EFS volume was cleaned up after some extended wait time diff --git a/tests/test_minimal/test_tools/s3_tools.py b/tests/test_minimal/test_tools/s3_tools.py deleted file mode 100644 index 4998cc0c9..000000000 --- a/tests/test_minimal/test_tools/s3_tools.py +++ /dev/null @@ -1,48 +0,0 @@ -from neuroconv.tools.data_transfers import ( - estimate_s3_conversion_cost, - estimate_total_conversion_runtime, -) - - -def 
test_estimate_s3_conversion_cost_standard(): - test_sizes = [ - 1, - 100, - 1e3, # 1 GB - 1e5, # 100 GB - 1e6, # 1 TB - 1e7, # 10 TB - 1e8, # 100 TB - ] - results = [estimate_s3_conversion_cost(total_mb=total_mb) for total_mb in test_sizes] - assert results == [ - 2.9730398740210563e-15, # 1 MB - 2.973039874021056e-11, # 100 MB - 2.9730398740210564e-09, # 1 GB - 2.9730398740210563e-05, # 100 GB - 0.002973039874021056, # 1 TB - 0.2973039874021056, # 10 TB - 29.73039874021056, # 100 TB - ] - - -def test_estimate_total_conversion_runtime(): - test_sizes = [ - 1, - 100, - 1e3, # 1 GB - 1e5, # 100 GB - 1e6, # 1 TB - 1e7, # 10 TB - 1e8, # 100 TB - ] - results = [estimate_total_conversion_runtime(total_mb=total_mb) for total_mb in test_sizes] - assert results == [ - 0.12352941176470589, - 12.352941176470589, - 123.52941176470588, - 12352.94117647059, - 123529.41176470589, - 1235294.1176470588, - 12352941.176470589, - ]