diff --git a/.github/workflows/aws_tests.yml b/.github/workflows/generic_aws_tests.yml similarity index 88% rename from .github/workflows/aws_tests.yml rename to .github/workflows/generic_aws_tests.yml index 0ecbb4d7b..20886a178 100644 --- a/.github/workflows/aws_tests.yml +++ b/.github/workflows/generic_aws_tests.yml @@ -11,7 +11,6 @@ concurrency: # Cancel previous workflows on the same pull request env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }} jobs: run: @@ -36,8 +35,8 @@ jobs: git config --global user.email "CI@example.com" git config --global user.name "CI Almighty" - - name: Install full requirements + - name: Install AWS requirements run: pip install .[aws,test] - - name: Run subset of tests that use S3 live services - run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools.py + - name: Run generic AWS tests + run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools_tests.py diff --git a/.github/workflows/neuroconv_deployment_aws_tests.yml b/.github/workflows/neuroconv_deployment_aws_tests.yml new file mode 100644 index 000000000..64aae5ec9 --- /dev/null +++ b/.github/workflows/neuroconv_deployment_aws_tests.yml @@ -0,0 +1,46 @@ +name: NeuroConv Deployment AWS Tests +on: + schedule: + - cron: "0 16 * * 3" # Weekly at noon on Wednesday + workflow_dispatch: + +concurrency: # Cancel previous workflows on the same pull request + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }} + RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }} + RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }} + DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }} + +jobs: + run: + name: ${{ matrix.os }} Python ${{ matrix.python-version }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + python-version: ["3.12"] + os: [ubuntu-latest] + steps: + - uses: actions/checkout@v4 + - run: git fetch --prune --unshallow --tags + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Global Setup + run: | + python -m pip install -U pip # Official recommended way + git config --global user.email "CI@example.com" + git config --global user.name "CI Almighty" + + - name: Install AWS requirements + run: pip install .[aws,test] + + - name: Run NeuroConv Deployment on AWS tests + run: pytest -rsx -n auto tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py diff --git a/.github/workflows/rclone_aws_tests.yml b/.github/workflows/rclone_aws_tests.yml new file mode 100644 index 000000000..bcfbeb5c7 --- /dev/null +++ b/.github/workflows/rclone_aws_tests.yml @@ -0,0 +1,46 @@ +name: Rclone AWS Tests +on: + schedule: + - cron: "0 16 * * 2" # Weekly at noon on Tuesday + workflow_dispatch: + +concurrency: # Cancel previous workflows on the same pull request + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }} + RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }} + RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }} + 
DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}
+
+jobs:
+  run:
+    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.12"]
+        os: [ubuntu-latest]
+    steps:
+      - uses: actions/checkout@v4
+      - run: git fetch --prune --unshallow --tags
+      - name: Setup Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Global Setup
+        run: |
+          python -m pip install -U pip  # Official recommended way
+          git config --global user.email "CI@example.com"
+          git config --global user.name "CI Almighty"
+
+      - name: Install AWS requirements
+        run: pip install .[aws,test]
+
+      - name: Run Rclone on AWS tests
+        run: pytest -rsx -n auto tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 543da42bd..a14185e1e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-# Upcoming
+# v0.6.6 (Upcoming)

 ## Deprecations
 * Completely removed compression settings from most places [PR #1126](https://github.com/catalystneuro/neuroconv/pull/1126)
@@ -12,9 +12,13 @@
 * Propagate the `unit_electrode_indices` argument from the spikeinterface tools to `BaseSortingExtractorInterface`. This allows users to map units to the electrode table when adding sorting data [PR #1124](https://github.com/catalystneuro/neuroconv/pull/1124)
 * Imaging interfaces have a new conversion option `always_write_timestamps` that can be used to force writing timestamps even if neuroconv's heuristics indicates regular sampling rate [PR #1125](https://github.com/catalystneuro/neuroconv/pull/1125)
 * Added .csv support to DeepLabCutInterface [PR #1140](https://github.com/catalystneuro/neuroconv/pull/1140)
+* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)
+* Added the `deploy_neuroconv_batch_job` helper function for deploying NeuroConv AWS Batch jobs. [PR #1086](https://github.com/catalystneuro/neuroconv/pull/1086)
+
 ## Improvements
 * Use mixing tests for ecephy's mocks [PR #1136](https://github.com/catalystneuro/neuroconv/pull/1136)
+* Use pytest format for DANDI tests to avoid a Windows permission error on teardown [PR #1151](https://github.com/catalystneuro/neuroconv/pull/1151)

 # v0.6.5 (November 1, 2024)
@@ -38,6 +42,8 @@
 * Avoid running link test when the PR is on draft [PR #1093](https://github.com/catalystneuro/neuroconv/pull/1093)
 * Centralize gin data preparation in a github action [PR #1095](https://github.com/catalystneuro/neuroconv/pull/1095)

+
+
 # v0.6.4 (September 17, 2024)

 ## Bug Fixes
diff --git a/docs/api/tools.aws.rst b/docs/api/tools.aws.rst
new file mode 100644
index 000000000..6f3ed0f86
--- /dev/null
+++ b/docs/api/tools.aws.rst
@@ -0,0 +1,5 @@
+.. _api_docs_aws_tools:
+
+AWS Tools
+---------
+.. automodule:: neuroconv.tools.aws
diff --git a/docs/api/tools.rst b/docs/api/tools.rst
index 41515facd..793e0dbeb 100644
--- a/docs/api/tools.rst
+++ b/docs/api/tools.rst
@@ -13,3 +13,4 @@ Tools
     tools.signal_processing
     tools.data_transfers
     tools.nwb_helpers
+    tools.aws
diff --git a/docs/user_guide/aws_demo.rst b/docs/user_guide/aws_demo.rst
new file mode 100644
index 000000000..7002b7057
--- /dev/null
+++ b/docs/user_guide/aws_demo.rst
@@ -0,0 +1,136 @@
+NeuroConv AWS Demo
+------------------
+
+The :ref:`neuroconv.tools.aws <api_docs_aws_tools>` submodule provides a number of tools for deploying NWB conversions
+within AWS cloud services. These tools primarily facilitate source data transfers from cloud storage
+sources to AWS, where the NWB conversion takes place, followed by immediate direct upload to the `DANDI Archive <https://dandiarchive.org/>`_.
+
+The following is an explicit demonstration of how to use these tools to create a pipeline that runs a remote data conversion.
+
+This tutorial relies on setting up several cloud-based aspects ahead of time:
+
+a. Download some of the GIN data from the main testing suite, see :ref:`example_data` for more
+details. Specifically, you will need the ``spikeglx`` and ``phy`` folders.
+
+b. Have access to a `Google Drive <https://drive.google.com/>`_ folder to mimic a typical remote storage
+location. The example data from (a) only takes up about 20 MB of space, so ensure you have that available. In
+practice, any `cloud storage provider that can be accessed via Rclone <https://rclone.org/>`_ can be used.
+
+c. Install `Rclone <https://rclone.org/downloads/>`_, run ``rclone config``, and follow all instructions while giving your
+remote the name ``test_google_drive_remote``. This step is necessary to provide the credentials needed to access
+the Google Drive folder from other locations by creating a file called ``rclone.conf``. You can find the path to
+this file, which you will need for a later step, by running ``rclone config file``.
+
+d. Have access to an `AWS account <https://aws.amazon.com/>`_. Then, from
+the `AWS console <https://aws.amazon.com/console/>`_, sign in and navigate to the "IAM" page. Here, you will
+generate some credentials by creating a new user with programmatic access. Save your access key and secret key
+somewhere safe (such as installing the `AWS CLI <https://aws.amazon.com/cli/>`_ and running ``aws configure``
+to store the values on your local device).
+
+e. Have access to an account on the `staging/testing server <https://gui-staging.dandiarchive.org/>`_ (you
+will probably want one on the main archive as well, but please do not upload demonstration data to the primary
+server). This request can take a few days for the admin team to process. Once you have access, you will need
+to create a new Dandiset on the staging server and record the six-digit Dandiset ID.
+
+.. warning::
+
+    *Cloud costs*. While the operations deployed on your behalf by NeuroConv are optimized to the best extent we
+    can, cloud services can still become expensive. Please be aware of the costs associated with running these
+    services and ensure you have the necessary permissions and budget to run these operations. While NeuroConv
+    makes every effort to ensure there are no stalled resources, it is ultimately your responsibility to monitor
+    and manage these resources. We recommend checking the AWS dashboards regularly while running these operations,
+    manually removing any spurious resources, and setting up billing alerts to ensure you do not exceed your budget.
+
+Then, to set up the remaining steps of the tutorial:
+
+1. In your Google Drive, make a new folder for this demo conversion named ``demo_neuroconv_aws`` at the outermost
+level (not nested in any other folders). You can confirm that the remote from step (c) sees this new folder
+using the quick check shown below.
+
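+A minimal sketch of such a check (``rclone lsd`` lists the directories of a remote; the remote name here is an
+assumption carried over from step (c) and must match whatever you chose during ``rclone config``):
+
+.. code-block:: bash
+
+    # The new, empty 'demo_neuroconv_aws' folder should appear in this listing
+    rclone lsd test_google_drive_remote:
+
+2. 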
Create a file on your local device named ``demo_neuroconv_aws.yml`` with the following content:
+
+.. code-block:: yaml
+
+    metadata:
+      NWBFile:
+        lab: My Lab
+        institution: My Institution
+
+    data_interfaces:
+      ap: SpikeGLXRecordingInterface
+      phy: PhySortingInterface
+
+    upload_to_dandiset: "< enter your six-digit Dandiset ID here >"
+
+    experiments:
+      my_experiment:
+        metadata:
+          NWBFile:
+            session_description: My session.
+
+        sessions:
+          - source_data:
+              ap:
+                file_path: spikeglx/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.ap.bin
+            metadata:
+              NWBFile:
+                session_start_time: "2020-10-10T21:19:09+00:00"
+              Subject:
+                subject_id: "1"
+                sex: F
+                age: P35D
+                species: Mus musculus
+          - metadata:
+              NWBFile:
+                session_start_time: "2020-10-10T21:19:09+00:00"
+              Subject:
+                subject_id: "002"
+                sex: F
+                age: P35D
+                species: Mus musculus
+            source_data:
+              phy:
+                folder_path: phy/phy_example_0/
+
+
+3. Copy the ``Noise4Sam_g0`` and ``phy_example_0`` folders from the :ref:`example_data` into this demo
+folder so that you have the following structure:
+
+.. code::
+
+    demo_neuroconv_aws/
+    ¦   demo_output/
+    ¦   spikeglx/
+    ¦   +-- Noise4Sam_g0/
+    ¦   +-- ...  # .nidq streams
+    ¦   ¦   +-- Noise4Sam_g0_imec0/
+    ¦   ¦       +-- Noise4Sam_g0_t0.imec0.ap.bin
+    ¦   ¦       +-- Noise4Sam_g0_t0.imec0.ap.meta
+    ¦   ¦       +-- ...  # .lf streams
+    ¦   phy/
+    ¦   +-- phy_example_0/
+    ¦   ¦   +-- ...  # The various file contents from the example Phy folder
+
+4. Now run the following Python code to deploy the AWS Batch job:
+
+.. code:: python
+
+    from neuroconv.tools.aws import deploy_neuroconv_batch_job
+
+    rclone_command = (
+        "rclone copy test_google_drive_remote:demo_neuroconv_aws /mnt/efs/source "
+        "--verbose --progress --config ./rclone.conf"
+    )
+
+    # Remember - you can find this via `rclone config file`
+    rclone_config_file_path = "/path/to/rclone.conf"
+
+    yaml_specification_file_path = "/path/to/demo_neuroconv_aws.yml"
+
+    job_name = "demo_deploy_neuroconv_batch_job"
+    efs_volume_name = "demo_deploy_neuroconv_batch_job"
+    deploy_neuroconv_batch_job(
+        rclone_command=rclone_command,
+        yaml_specification_file_path=yaml_specification_file_path,
+        job_name=job_name,
+        efs_volume_name=efs_volume_name,
+        rclone_config_file_path=rclone_config_file_path,
+    )
+
+Voilà! If everything occurred successfully, you should eventually (~2-10 minutes) see the files uploaded to your
+Dandiset on the staging server. You should also be able to monitor the resources running in the AWS Batch dashboard
+as well as the entries in the DynamoDB status tracking table.
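+
+If you prefer to confirm the job outcome programmatically rather than through the dashboards, the same ``boto3``
+calls used throughout NeuroConv's AWS tooling can be reused. The following is a minimal sketch, not part of the
+demo proper: it assumes the AWS credentials from step (d) are configured locally and that you captured the return
+value above as ``all_info = deploy_neuroconv_batch_job(...)``:
+
+.. code:: python
+
+    import boto3
+
+    batch_client = boto3.client(service_name="batch", region_name="us-east-2")
+
+    # The ID of the NeuroConv conversion job is nested inside the returned submission info
+    neuroconv_job_id = all_info["neuroconv_job_submission_info"]["job_submission_info"]["jobId"]
+    job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id])
+
+    # A status of "SUCCEEDED" means the conversion and upload completed
+    print(job_description_response["jobs"][0]["status"])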
diff --git a/docs/user_guide/index.rst b/docs/user_guide/index.rst
index 4077f49be..bf9aaf253 100644
--- a/docs/user_guide/index.rst
+++ b/docs/user_guide/index.rst
@@ -27,3 +27,4 @@ and synchronize data across multiple sources.
    backend_configuration
    yaml
    docker_demo
+   aws_demo
diff --git a/src/neuroconv/tools/aws/__init__.py b/src/neuroconv/tools/aws/__init__.py
index d40ddb2dd..70a42cbf5 100644
--- a/src/neuroconv/tools/aws/__init__.py
+++ b/src/neuroconv/tools/aws/__init__.py
@@ -1,3 +1,9 @@
 from ._submit_aws_batch_job import submit_aws_batch_job
+from ._rclone_transfer_batch_job import rclone_transfer_batch_job
+from ._deploy_neuroconv_batch_job import deploy_neuroconv_batch_job

-__all__ = ["submit_aws_batch_job"]
+__all__ = [
+    "submit_aws_batch_job",
+    "rclone_transfer_batch_job",
+    "deploy_neuroconv_batch_job",
+]
diff --git a/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py b/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py
new file mode 100644
index 000000000..1df86d957
--- /dev/null
+++ b/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py
@@ -0,0 +1,241 @@
+"""Collection of helper functions for deploying NeuroConv in EC2 Batch jobs on AWS."""
+
+import os
+import time
+import uuid
+import warnings
+from typing import Optional
+
+import boto3
+from pydantic import FilePath, validate_call
+
+from ._rclone_transfer_batch_job import rclone_transfer_batch_job
+from ._submit_aws_batch_job import submit_aws_batch_job
+
+_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
+
+
+@validate_call
+def deploy_neuroconv_batch_job(
+    *,
+    rclone_command: str,
+    yaml_specification_file_path: FilePath,
+    job_name: str,
+    efs_volume_name: str,
+    rclone_config_file_path: Optional[FilePath] = None,
+    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
+    compute_environment_name: str = "neuroconv_batch_environment",
+    job_queue_name: str = "neuroconv_batch_queue",
+    job_definition_name: Optional[str] = None,
+    minimum_worker_ram_in_gib: int = 16,  # Higher than previous recommendations for safer buffering room
+    minimum_worker_cpus: int = 4,
+    region: Optional[str] = None,
+) -> dict[str, str]:
+    """
+    Deploy a NeuroConv data conversion on AWS Batch as a pair of dependent jobs: an Rclone transfer of the
+    source data, followed by the NWB conversion itself.
+
+    Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.
+
+    Parameters
+    ----------
+    rclone_command : str
+        The command to pass directly to Rclone running on the EC2 instance.
+        E.g.: "rclone copy my_drive:testing_rclone /mnt/efs/source"
+        Must move data from or to '/mnt/efs/source'.
+    yaml_specification_file_path : FilePath
+        The path to the YAML file containing the NeuroConv specification.
+    job_name : str
+        The name of the job to submit.
+    efs_volume_name : str
+        The name of an EFS volume to be created and attached to the job.
+        The path exposed to the container will always be `/mnt/efs`.
+    rclone_config_file_path : FilePath, optional
+        The path to the Rclone configuration file to use for the job.
+        If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
+    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
+        The name of the DynamoDB table to use for tracking job status.
+    compute_environment_name : str, default: "neuroconv_batch_environment"
+        The name of the compute environment to use for the job.
+    job_queue_name : str, default: "neuroconv_batch_queue"
+        The name of the job queue to use for the job.
+    job_definition_name : str, optional
+        The name of the job definition to use for the job.
+        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
+    minimum_worker_ram_in_gib : int, default: 16
+        The minimum amount of base worker memory required to run this job.
+        Determines the EC2 instance type selected by the automatic 'best fit' selector.
+        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
+    minimum_worker_cpus : int, default: 4
+        The minimum number of CPUs required to run this job.
+        A minimum of 4 is required, even if only one will be used in the actual process.
+    region : str, optional
+        The AWS region to use for the job.
+        If not provided, we will attempt to load the region from your local AWS configuration.
+        If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.
+
+    Returns
+    -------
+    info : dict
+        A dictionary containing information about this AWS Batch job.
+
+        info["rclone_job_submission_info"] is the return value of `neuroconv.tools.aws.rclone_transfer_batch_job`.
+        info["neuroconv_job_submission_info"] is the return value of `neuroconv.tools.aws.submit_aws_batch_job`.
+    """
+    efs_volume_name = efs_volume_name or f"neuroconv_batch_efs_volume_{uuid.uuid4().hex[:4]}"
+    region = region or "us-east-2"
+
+    if "/mnt/efs/source" not in rclone_command:
+        message = (
+            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs/source'. "
+            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space. "
+            "The subfolder 'source' is also required to eliminate ambiguity in the transfer process."
+        )
+        raise ValueError(message)
+
+    rclone_job_name = f"{job_name}_rclone_transfer"
+    rclone_job_submission_info = rclone_transfer_batch_job(
+        rclone_command=rclone_command,
+        job_name=rclone_job_name,
+        efs_volume_name=efs_volume_name,
+        rclone_config_file_path=rclone_config_file_path,
+        region=region,
+    )
+    rclone_job_id = rclone_job_submission_info["job_submission_info"]["jobId"]
+
+    # Give the EFS and other aspects time to spin up before submitting the next dependent job
+    # (otherwise, there is a good chance that a duplicate EFS will be created)
+    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+
+    batch_client = boto3.client(
+        service_name="batch",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+    efs_client = boto3.client(
+        service_name="efs",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+
+    available_efs_volumes = efs_client.describe_file_systems()
+    matching_efs_volumes = [
+        file_system
+        for file_system in available_efs_volumes["FileSystems"]
+        for tag in file_system["Tags"]
+        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+    ]
+    max_iterations = 10
+    iteration = 0
+    while len(matching_efs_volumes) == 0 and iteration < max_iterations:
+        iteration += 1
+        time.sleep(30)
+
+        # Refresh the volume list on each pass; otherwise the loop would keep re-scanning the same stale response
+        available_efs_volumes = efs_client.describe_file_systems()
+        matching_efs_volumes = [
+            file_system
+            for file_system in available_efs_volumes["FileSystems"]
+            for tag in file_system["Tags"]
+            if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+        ]
+
+    if len(matching_efs_volumes) == 0:
+        message = f"Unable to locate the EFS volume '{efs_volume_name}' after {max_iterations} attempts!"
+        raise ValueError(message)
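+
+    # The NWB conversion job below is linked to the Rclone transfer job via a SEQUENTIAL dependency,
+    # so it only starts once the transfer has finished populating '/mnt/efs/source'.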
+    docker_image = "ghcr.io/catalystneuro/neuroconv_yaml_variable:latest"
+
+    with open(file=yaml_specification_file_path, mode="r") as io:
+        yaml_specification_file_stream = io.read()
+
+    neuroconv_job_name = f"{job_name}_neuroconv_deployment"
+    job_dependencies = [{"jobId": rclone_job_id, "type": "SEQUENTIAL"}]
+    neuroconv_job_submission_info = submit_aws_batch_job(
+        job_name=neuroconv_job_name,
+        docker_image=docker_image,
+        environment_variables={
+            "NEUROCONV_YAML": yaml_specification_file_stream,
+            "NEUROCONV_DATA_PATH": "/mnt/efs/source",
+            # TODO: would prefer this to use subfolders for source and output, but need logic for YAML
+            # related code to create them if missing (hard to send EFS this command directly)
+            # (the code was included in this PR, but a release cycle needs to complete for the docker images before
+            # it can be used here)
+            # "NEUROCONV_OUTPUT_PATH": "/mnt/efs/output",
+            "NEUROCONV_OUTPUT_PATH": "/mnt/efs",
+        },
+        efs_volume_name=efs_volume_name,
+        job_dependencies=job_dependencies,
+        status_tracker_table_name=status_tracker_table_name,
+        compute_environment_name=compute_environment_name,
+        job_queue_name=job_queue_name,
+        job_definition_name=job_definition_name,
+        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
+        minimum_worker_cpus=minimum_worker_cpus,
+        region=region,
+    )
+
+    info = {
+        "rclone_job_submission_info": rclone_job_submission_info,
+        "neuroconv_job_submission_info": neuroconv_job_submission_info,
+    }
+
+    # TODO: would be better to spin up a third dependent job to clean up the EFS volume after the NeuroConv job completes
+    neuroconv_job_id = neuroconv_job_submission_info["job_submission_info"]["jobId"]
+    job = None
+    max_retries = 60 * 12  # roughly 12 hours of maximum runtime (aside from internet loss) for checking cleanup
+    sleep_time = 60  # 1 minute
+    retry = 0.0
+    time.sleep(sleep_time)
+    while retry < max_retries:
+        job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id])
+        if job_description_response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            # Sleep, but only increment the retry counter by a small amount
+            # (this should only occur if the internet connection is temporarily lost)
+            retry += 0.1
+            time.sleep(sleep_time)
+            continue
+
+        job = job_description_response["jobs"][0]
+        if job["status"] in _RETRY_STATES:
+            retry += 1.0
+            time.sleep(sleep_time)
+        else:
+            # Any terminal state (SUCCEEDED, FAILED, etc.) ends the polling so that cleanup can proceed
+            break
+
+    if retry >= max_retries:
+        message = (
+            "Maximum retries reached for checking job completion for automatic EFS cleanup! "
+            "Please delete the EFS volume manually."
+        )
+        warnings.warn(message=message, stacklevel=2)
+
+        return info
+
+    # Cleanup EFS after the job is complete - must clear the mount targets first, then wait before deleting the volume
+    efs_volumes = efs_client.describe_file_systems()
+    matching_efs_volumes = [
+        file_system
+        for file_system in efs_volumes["FileSystems"]
+        for tag in file_system["Tags"]
+        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+    ]
+    if len(matching_efs_volumes) != 1:
+        message = (
+            f"Expected to find exactly one EFS volume with the name '{efs_volume_name}', "
+            f"but found {len(matching_efs_volumes)}!\n\n{matching_efs_volumes=}\n\n"
+            "You will have to delete these manually."
+        )
+        warnings.warn(message=message, stacklevel=2)
+
+        return info
+
+    efs_volume = matching_efs_volumes[0]
+    efs_id = efs_volume["FileSystemId"]
+    mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
+    for mount_target in mount_targets["MountTargets"]:
+        efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])
+
+    time.sleep(sleep_time)
+    efs_client.delete_file_system(FileSystemId=efs_id)
+
+    return info
diff --git a/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py b/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
new file mode 100644
index 000000000..65bef7824
--- /dev/null
+++ b/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
@@ -0,0 +1,113 @@
+"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""
+
+import pathlib
+import warnings
+from typing import Optional
+
+from pydantic import FilePath, validate_call
+
+from ._submit_aws_batch_job import submit_aws_batch_job
+
+
+@validate_call
+def rclone_transfer_batch_job(
+    *,
+    rclone_command: str,
+    job_name: str,
+    efs_volume_name: str,
+    rclone_config_file_path: Optional[FilePath] = None,
+    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
+    compute_environment_name: str = "neuroconv_batch_environment",
+    job_queue_name: str = "neuroconv_batch_queue",
+    job_definition_name: Optional[str] = None,
+    minimum_worker_ram_in_gib: int = 4,
+    minimum_worker_cpus: int = 4,
+    submission_id: Optional[str] = None,
+    region: Optional[str] = None,
+) -> dict[str, str]:
+    """
+    Submit an Rclone data transfer job to AWS Batch for processing.
+
+    Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.
+
+    Parameters
+    ----------
+    rclone_command : str
+        The command to pass directly to Rclone running on the EC2 instance.
+        E.g.: "rclone copy my_drive:testing_rclone /mnt/efs"
+        Must move data from or to '/mnt/efs'.
+    job_name : str
+        The name of the job to submit.
+    efs_volume_name : str
+        The name of an EFS volume to be created and attached to the job.
+        The path exposed to the container will always be `/mnt/efs`.
+    rclone_config_file_path : FilePath, optional
+        The path to the Rclone configuration file to use for the job.
+        If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
+    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
+        The name of the DynamoDB table to use for tracking job status.
+    compute_environment_name : str, default: "neuroconv_batch_environment"
+        The name of the compute environment to use for the job.
+    job_queue_name : str, default: "neuroconv_batch_queue"
+        The name of the job queue to use for the job.
+    job_definition_name : str, optional
+        The name of the job definition to use for the job.
+        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
+    minimum_worker_ram_in_gib : int, default: 4
+        The minimum amount of base worker memory required to run this job.
+        Determines the EC2 instance type selected by the automatic 'best fit' selector.
+        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
+    minimum_worker_cpus : int, default: 4
+        The minimum number of CPUs required to run this job.
+        A minimum of 4 is required, even if only one will be used in the actual process.
+    submission_id : str, optional
+        The unique ID to pair with this job submission when tracking the status via DynamoDB.
+        Defaults to a random UUID4.
+    region : str, optional
+        The AWS region to use for the job.
+ If not provided, we will attempt to load the region from your local AWS configuration. + If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive. + + Returns + ------- + info : dict + A dictionary containing information about this AWS Batch job. + + info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID. + info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table. + """ + docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest" + + if "/mnt/efs" not in rclone_command: + message = ( + f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs'. " + "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space." + ) + warnings.warn(message=message, stacklevel=2) + + rclone_config_file_path = rclone_config_file_path or pathlib.Path.home() / ".rclone" / "rclone.conf" + if not rclone_config_file_path.exists(): + raise FileNotFoundError( + f"Rclone configuration file not found at: {rclone_config_file_path}! " + "Please check that `rclone config` successfully created the file." + ) + with open(file=rclone_config_file_path, mode="r") as io: + rclone_config_file_stream = io.read() + + region = region or "us-east-2" + + info = submit_aws_batch_job( + job_name=job_name, + docker_image=docker_image, + environment_variables={"RCLONE_CONFIG": rclone_config_file_stream, "RCLONE_COMMAND": rclone_command}, + efs_volume_name=efs_volume_name, + status_tracker_table_name=status_tracker_table_name, + compute_environment_name=compute_environment_name, + job_queue_name=job_queue_name, + job_definition_name=job_definition_name, + minimum_worker_ram_in_gib=minimum_worker_ram_in_gib, + minimum_worker_cpus=minimum_worker_cpus, + submission_id=submission_id, + region=region, + ) + + return info diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py index 9e3ba0488..cae25f3ce 100644 --- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py +++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py @@ -171,7 +171,9 @@ def submit_aws_batch_job( job_dependencies = job_dependencies or [] container_overrides = dict() if environment_variables is not None: - container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()] + container_overrides["environment"] = [ + {"name": key, "value": value} for key, value in environment_variables.items() + ] if commands is not None: container_overrides["command"] = commands @@ -294,7 +296,7 @@ def _ensure_compute_environment_exists( The AWS Batch client to use for the job. max_retries : int, default: 12 If the compute environment does not already exist, then this is the maximum number of times to synchronously - check for its successful creation before erroring. + check for its successful creation before raising an error. This is essential for a clean setup of the entire pipeline, or else later steps might error because they tried to launch before the compute environment was ready. 
""" @@ -462,11 +464,14 @@ def _create_or_get_efs_id( if tag["Key"] == "Name" and tag["Value"] == efs_volume_name ] - if len(matching_efs_volumes) > 1: + if len(matching_efs_volumes) == 1: efs_volume = matching_efs_volumes[0] efs_id = efs_volume["FileSystemId"] return efs_id + elif len(matching_efs_volumes) > 1: + message = f"Multiple EFS volumes with the name '{efs_volume_name}' were found!\n\n{matching_efs_volumes=}\n" + raise ValueError(message) # Existing volume not found - must create a fresh one and set mount targets on it efs_volume = efs_client.create_file_system( @@ -504,7 +509,7 @@ def _create_or_get_efs_id( return efs_id -def _generate_job_definition_name( +def generate_job_definition_name( *, docker_image: str, minimum_worker_ram_in_gib: int, @@ -513,9 +518,7 @@ def _generate_job_definition_name( ) -> str: # pragma: no cover """ Generate a job definition name for the AWS Batch job. - Note that Docker images don't strictly require a tag to be pulled or used - 'latest' is always used by default. - Parameters ---------- docker_image : str @@ -527,11 +530,13 @@ def _generate_job_definition_name( minimum_worker_cpus : int The minimum number of CPUs required to run this job. A minimum of 4 is required, even if only one will be used in the actual process. + efs_id : Optional[str] + The ID of the EFS filesystem to mount, if any. """ - docker_tags = docker_image.split(":")[1:] - docker_tag = docker_tags[0] if len(docker_tags) > 1 else None - parsed_docker_image_name = docker_image.replace(":", "-") # AWS Batch does not allow colons in job definition names - + # AWS Batch does not allow colons, slashes, or periods in job definition names + parsed_docker_image_name = str(docker_image) + for disallowed_character in [":", "/", r"/", "."]: + parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-") job_definition_name = f"neuroconv_batch" job_definition_name += f"_{parsed_docker_image_name}-image" job_definition_name += f"_{minimum_worker_ram_in_gib}-GiB-RAM" @@ -540,8 +545,6 @@ def _generate_job_definition_name( job_definition_name += f"_{efs_id}" if docker_tag is None or docker_tag == "latest": date = datetime.now().strftime("%Y-%m-%d") - job_definition_name += f"_created-on-{date}" - return job_definition_name @@ -639,9 +642,9 @@ def _ensure_job_definition_exists_and_get_arn( }, }, ] - mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] + mountPoints = [{"containerPath": "/mnt/efs", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] - # batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards + # batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards batch_client.register_job_definition( jobDefinitionName=job_definition_name, type="container", diff --git a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py index 10e33cbc8..0e2f05f74 100644 --- a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py +++ b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py @@ -73,10 +73,16 @@ def run_conversion_from_yaml( if data_folder_path is None: data_folder_path = Path(specification_file_path).parent + else: + data_folder_path = Path(data_folder_path) + data_folder_path.mkdir(exist_ok=True) + if output_folder_path is None: - 
output_folder_path = Path(specification_file_path).parent + output_folder_path = specification_file_path.parent else: output_folder_path = Path(output_folder_path) + output_folder_path.mkdir(exist_ok=True) + specification = load_dict_from_file(file_path=specification_file_path) schema_folder = Path(__file__).parent.parent.parent / "schemas" specification_schema = load_dict_from_file(file_path=schema_folder / "yaml_conversion_specification_schema.json") diff --git a/tests/docker_rclone_with_config_cli.py b/tests/docker_rclone_with_config_cli.py index ed472bdf2..9b1e265dd 100644 --- a/tests/docker_rclone_with_config_cli.py +++ b/tests/docker_rclone_with_config_cli.py @@ -61,7 +61,8 @@ def test_direct_usage_of_rclone_with_config(self): os.environ["RCLONE_CONFIG"] = rclone_config_file_stream os.environ["RCLONE_COMMAND"] = ( - f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} --verbose --progress --config ./rclone.conf" + f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} " + "--verbose --progress --config ./rclone.conf" ) command = ( diff --git a/tests/test_minimal/test_tools/aws_tools.py b/tests/test_minimal/test_tools/aws_tools_tests.py similarity index 100% rename from tests/test_minimal/test_tools/aws_tools.py rename to tests/test_minimal/test_tools/aws_tools_tests.py diff --git a/tests/test_minimal/test_tools/dandi_transfer_tools.py b/tests/test_minimal/test_tools/dandi_transfer_tools.py index df4226d10..da35725a0 100644 --- a/tests/test_minimal/test_tools/dandi_transfer_tools.py +++ b/tests/test_minimal/test_tools/dandi_transfer_tools.py @@ -1,13 +1,9 @@ import os import sys from datetime import datetime -from pathlib import Path from platform import python_version as get_python_version -from shutil import rmtree -from tempfile import mkdtemp import pytest -from hdmf.testing import TestCase from pynwb import NWBHDF5IO from neuroconv.tools.data_transfers import automatic_dandi_upload @@ -24,80 +20,63 @@ not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUpload(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() - metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_1.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) +def test_automatic_dandi_upload(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}", + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_1.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def tearDown(self): - rmtree(self.tmpdir) - - def test_automatic_dandi_upload(self): - automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True) + automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=nwb_folder_path, 
staging=True) @pytest.mark.skipif( not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUploadNonParallel(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() - metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_2.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) - - def tearDown(self): - rmtree(self.tmpdir) +def test_automatic_dandi_upload_non_parallel(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=(f"test-automatic-upload-{sys.platform}-" f"{get_python_version().replace('.', '-')}-non-parallel"), + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_2.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def test_automatic_dandi_upload_non_parallel(self): - automatic_dandi_upload( - dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True, number_of_jobs=1 - ) + automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=nwb_folder_path, staging=True, number_of_jobs=1) @pytest.mark.skipif( not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUploadNonParallelNonThreaded(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() - metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel-non-threaded", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_3.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) - - def tearDown(self): - rmtree(self.tmpdir) +def test_automatic_dandi_upload_non_parallel_non_threaded(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=( + f"test-automatic-upload-{sys.platform}-" + f"{get_python_version().replace('.', '-')}-non-parallel-non-threaded" + ), + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_3.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def test_automatic_dandi_upload_non_parallel_non_threaded(self): - automatic_dandi_upload( - dandiset_id="200560", - nwb_folder_path=self.nwb_folder_path, - staging=True, - number_of_jobs=1, - number_of_threads=1, - ) + automatic_dandi_upload( + dandiset_id="200560", + nwb_folder_path=nwb_folder_path, + staging=True, + number_of_jobs=1, + number_of_threads=1, + ) diff --git 
a/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py b/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
new file mode 100644
index 000000000..f58865d26
--- /dev/null
+++ b/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
@@ -0,0 +1,167 @@
+import os
+import pathlib
+import time
+import unittest
+
+import boto3
+
+from neuroconv.tools.aws import deploy_neuroconv_batch_job
+
+from ..setup_paths import OUTPUT_PATH
+
+_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
+
+
+class TestNeuroConvDeploymentBatchJob(unittest.TestCase):
+    """
+    To allow this test to work, the developer must create a folder on the outer level of their personal Google Drive
+    called 'testing_rclone_spikeglx_and_phy' with the following structure:
+
+    testing_rclone_spikeglx_and_phy
+    ├── ci_tests
+    ├──── spikeglx
+    ├────── Noise4Sam_g0
+    ├──── phy
+    ├────── phy_example_0
+
+    Where 'Noise4Sam' is from the 'spikeglx/Noise4Sam_g0' GIN ephys dataset and 'phy_example_0' is likewise from the
+    'phy' folder of the same dataset.
+
+    Then the developer must install Rclone and call `rclone config` to generate tokens in their own `rclone.conf` file.
+    The developer can easily find the location of the config file on their system using `rclone config file`.
+    """
+
+    test_folder = OUTPUT_PATH / "aws_rclone_tests"
+    test_config_file_path = test_folder / "rclone.conf"
+    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+    region = "us-east-2"
+
+    def setUp(self):
+        self.test_folder.mkdir(exist_ok=True)
+
+        # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command)
+        token_dictionary = dict(
+            access_token=os.environ["RCLONE_DRIVE_ACCESS_TOKEN"],
+            token_type="Bearer",
+            refresh_token=os.environ["RCLONE_DRIVE_REFRESH_TOKEN"],
+            expiry=os.environ["RCLONE_EXPIRY_TOKEN"],
+        )
+        token_string = str(token_dictionary).replace("'", '"').replace(" ", "")
+        rclone_config_contents = [
+            "[test_google_drive_remote]\n",
+            "type = drive\n",
+            "scope = drive\n",
+            f"token = {token_string}\n",
+            "team_drive = \n",
+            "\n",
+        ]
+        with open(file=self.test_config_file_path, mode="w") as io:
+            io.writelines(rclone_config_contents)
+
+    def test_deploy_neuroconv_batch_job(self):
+        region = "us-east-2"
+        aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+        aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+
+        dynamodb_resource = boto3.resource(
+            service_name="dynamodb",
+            region_name=region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+        batch_client = boto3.client(
+            service_name="batch",
+            region_name=region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+        efs_client = boto3.client(
+            service_name="efs",
+            region_name=region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+
+        # Assume no other tests of EFS volumes are fluctuating at the same time, otherwise make this more specific
+        efs_volumes_before = efs_client.describe_file_systems()
+
+        rclone_command = (
+            "rclone copy test_google_drive_remote:testing_rclone_spikeglx_and_phy/ci_tests /mnt/efs/source "
+            "--verbose --progress --config ./rclone.conf"  # TODO: should just include this in helper function?
+        )
+
+        testing_base_folder_path = pathlib.Path(__file__).parent.parent.parent
+        yaml_specification_file_path = (
+            testing_base_folder_path
+            / "test_on_data"
+            / "test_yaml"
+            / "conversion_specifications"
+            / "GIN_conversion_specification.yml"
+        )
+
+        rclone_config_file_path = self.test_config_file_path
+
+        job_name = "test_deploy_neuroconv_batch_job"
+        efs_volume_name = "test_deploy_neuroconv_batch_job"
+        all_info = deploy_neuroconv_batch_job(
+            rclone_command=rclone_command,
+            yaml_specification_file_path=yaml_specification_file_path,
+            job_name=job_name,
+            efs_volume_name=efs_volume_name,
+            rclone_config_file_path=rclone_config_file_path,
+        )
+
+        # Wait additional time for AWS to clean up resources
+        time.sleep(120)
+
+        info = all_info["neuroconv_job_submission_info"]
+        job_id = info["job_submission_info"]["jobId"]
+        job = None
+        max_retries = 10
+        retry = 0
+        while retry < max_retries:
+            job_description_response = batch_client.describe_jobs(jobs=[job_id])
+            assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+            jobs = job_description_response["jobs"]
+            assert len(jobs) == 1
+
+            job = jobs[0]
+
+            if job["status"] in _RETRY_STATES:
+                retry += 1
+                time.sleep(60)
+            else:
+                break
+
+        # Check EFS cleaned up automatically
+        efs_volumes_after = efs_client.describe_file_systems()
+        assert len(efs_volumes_after["FileSystems"]) == len(efs_volumes_before["FileSystems"])
+
+        # Check normal job completion
+        expected_job_name = f"{job_name}_neuroconv_deployment"
+        assert job["jobName"] == expected_job_name
+        assert "neuroconv_batch_queue" in job["jobQueue"]
+        assert "fs-" in job["jobDefinition"]
+        assert job["status"] == "SUCCEEDED"
+
+        status_tracker_table_name = "neuroconv_batch_status_tracker"
+        table = dynamodb_resource.Table(name=status_tracker_table_name)
+        table_submission_id = info["table_submission_info"]["id"]
+
+        table_item_response = table.get_item(Key={"id": table_submission_id})
+        assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+        table_item = table_item_response["Item"]
+        assert table_item["job_name"] == expected_job_name
+        assert table_item["job_id"] == job_id
+        assert table_item["status"] == "Job submitted..."
+
+        table.update_item(
+            Key={"id": table_submission_id},
+            AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}},
+        )
+
+        table.update_item(
+            Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}}
+        )
diff --git a/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
new file mode 100644
index 000000000..e767e516b
--- /dev/null
+++ b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
@@ -0,0 +1,179 @@
+import datetime
+import os
+import time
+import unittest
+
+import boto3
+
+from neuroconv.tools.aws import rclone_transfer_batch_job
+
+from ..setup_paths import OUTPUT_PATH
+
+_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
+
+
+class TestRcloneTransferBatchJob(unittest.TestCase):
+    """
+    To allow this test to work, the developer must create a folder on the outer level of their personal Google Drive
+    called 'testing_rclone_spikeglx_and_phy' with the following structure:
+
+    testing_rclone_spikeglx_and_phy
+    ├── ci_tests
+    ├──── spikeglx
+    ├────── Noise4Sam_g0
+    ├──── phy
+    ├────── phy_example_0
+
+    Where 'Noise4Sam' is from the 'spikeglx/Noise4Sam_g0' GIN ephys dataset and 'phy_example_0' is likewise from the
+    'phy' folder of the same dataset.
+ + Then the developer must install Rclone and call `rclone config` to generate tokens in their own `rclone.conf` file. + The developer can easily find the location of the config file on their system using `rclone config file`. + """ + + test_folder = OUTPUT_PATH / "aws_rclone_tests" + test_config_file_path = test_folder / "rclone.conf" + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None) + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None) + region = "us-east-2" + efs_id = None + + def setUp(self): + self.test_folder.mkdir(exist_ok=True) + + # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command) + token_dictionary = dict( + access_token=os.environ["RCLONE_DRIVE_ACCESS_TOKEN"], + token_type="Bearer", + refresh_token=os.environ["RCLONE_DRIVE_REFRESH_TOKEN"], + expiry=os.environ["RCLONE_EXPIRY_TOKEN"], + ) + token_string = str(token_dictionary).replace("'", '"').replace(" ", "") + rclone_config_contents = [ + "[test_google_drive_remote]\n", + "type = drive\n", + "scope = drive\n", + f"token = {token_string}\n", + "team_drive = \n", + "\n", + ] + with open(file=self.test_config_file_path, mode="w") as io: + io.writelines(rclone_config_contents) + + self.efs_client = boto3.client( + service_name="efs", + region_name=self.region, + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + ) + + def tearDown(self) -> None: + if self.efs_id is None: + return None + efs_client = self.efs_client + + # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume + # TODO: cleanup job definitions? (since built daily) + mount_targets = efs_client.describe_mount_targets(FileSystemId=self.efs_id) + for mount_target in mount_targets["MountTargets"]: + efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"]) + + time.sleep(60) + efs_client.delete_file_system(FileSystemId=self.efs_id) + + def test_rclone_transfer_batch_job(self): + region = self.region + aws_access_key_id = self.aws_access_key_id + aws_secret_access_key = self.aws_secret_access_key + + dynamodb_resource = boto3.resource( + service_name="dynamodb", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + batch_client = boto3.client( + service_name="batch", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + efs_client = self.efs_client + + rclone_command = ( + "rclone copy test_google_drive_remote:testing_rclone_spikeglx_and_phy /mnt/efs " + "--verbose --progress --config ./rclone.conf" # TODO: should just include this in helper function? 
+ ) + rclone_config_file_path = self.test_config_file_path + + today = datetime.datetime.now().date().isoformat() + job_name = f"test_rclone_transfer_batch_job_{today}" + efs_volume_name = "test_rclone_transfer_batch_efs" + + info = rclone_transfer_batch_job( + rclone_command=rclone_command, + job_name=job_name, + efs_volume_name=efs_volume_name, + rclone_config_file_path=rclone_config_file_path, + ) + + # Wait for AWS to process the job + time.sleep(60) + + job_id = info["job_submission_info"]["jobId"] + job = None + max_retries = 10 + retry = 0 + while retry < max_retries: + job_description_response = batch_client.describe_jobs(jobs=[job_id]) + assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + jobs = job_description_response["jobs"] + assert len(jobs) == 1 + + job = jobs[0] + + if job["status"] in _RETRY_STATES: + retry += 1 + time.sleep(60) + else: + break + + # Check EFS specific details + efs_volumes = efs_client.describe_file_systems() + matching_efs_volumes = [ + file_system + for file_system in efs_volumes["FileSystems"] + for tag in file_system["Tags"] + if tag["Key"] == "Name" and tag["Value"] == efs_volume_name + ] + assert len(matching_efs_volumes) == 1 + efs_volume = matching_efs_volumes[0] + self.efs_id = efs_volume["FileSystemId"] + + # Check normal job completion + assert job["jobName"] == job_name + assert "neuroconv_batch_queue" in job["jobQueue"] + assert "fs-" in job["jobDefinition"] + assert job["status"] == "SUCCEEDED" + + status_tracker_table_name = "neuroconv_batch_status_tracker" + table = dynamodb_resource.Table(name=status_tracker_table_name) + table_submission_id = info["table_submission_info"]["id"] + + table_item_response = table.get_item(Key={"id": table_submission_id}) + assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + table_item = table_item_response["Item"] + assert table_item["job_name"] == job_name + assert table_item["job_id"] == job_id + assert table_item["status"] == "Job submitted..." + + table.update_item( + Key={"id": table_submission_id}, + AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}}, + ) + + table.update_item( + Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}} + )