diff --git a/.github/workflows/aws_tests.yml b/.github/workflows/generic_aws_tests.yml
similarity index 88%
rename from .github/workflows/aws_tests.yml
rename to .github/workflows/generic_aws_tests.yml
index 0ecbb4d7b..20886a178 100644
--- a/.github/workflows/aws_tests.yml
+++ b/.github/workflows/generic_aws_tests.yml
@@ -11,7 +11,6 @@ concurrency:  # Cancel previous workflows on the same pull request
 env:
   AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
   AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}
 
 jobs:
   run:
@@ -36,8 +35,8 @@ jobs:
           git config --global user.email "CI@example.com"
           git config --global user.name "CI Almighty"
 
-      - name: Install full requirements
+      - name: Install AWS requirements
         run: pip install .[aws,test]
 
-      - name: Run subset of tests that use S3 live services
-        run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools.py
+      - name: Run generic AWS tests
+        run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools_tests.py
diff --git a/.github/workflows/rclone_aws_tests.yml b/.github/workflows/rclone_aws_tests.yml
new file mode 100644
index 000000000..bcfbeb5c7
--- /dev/null
+++ b/.github/workflows/rclone_aws_tests.yml
@@ -0,0 +1,46 @@
+name: Rclone AWS Tests
+on:
+  schedule:
+    - cron: "0 16 * * 2"  # Weekly at noon on Tuesday
+  workflow_dispatch:
+
+concurrency:  # Cancel previous workflows on the same pull request
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }}
+  RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }}
+  RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }}
+  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}
+
+jobs:
+  run:
+    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.12"]
+        os: [ubuntu-latest]
+    steps:
+      - uses: actions/checkout@v4
+      - run: git fetch --prune --unshallow --tags
+      - name: Setup Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Global Setup
+        run: |
+          python -m pip install -U pip  # Official recommended way
+          git config --global user.email "CI@example.com"
+          git config --global user.name "CI Almighty"
+
+      - name: Install AWS requirements
+        run: pip install .[aws,test]
+
+      - name: Run RClone on AWS tests
+        run: pytest -rsx -n auto tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0c3bd729..a37093e39 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # Upcoming
 
+## Features
+* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)
+
+
+
+## v0.6.4
+
 ## Deprecations
 * Completely removed compression settings from most places [PR #1126](https://github.com/catalystneuro/neuroconv/pull/1126)
 
@@ -37,6 +44,8 @@
 * Avoid running link test when the PR is on draft [PR #1093](https://github.com/catalystneuro/neuroconv/pull/1093)
 * Centralize gin data preparation in a github action [PR #1095](https://github.com/catalystneuro/neuroconv/pull/1095)
 
+
+
 # v0.6.4 (September 17, 2024)
 
 ## Bug Fixes
diff --git a/src/neuroconv/tools/aws/__init__.py b/src/neuroconv/tools/aws/__init__.py
index d40ddb2dd..88144fb01 100644
--- a/src/neuroconv/tools/aws/__init__.py
+++ b/src/neuroconv/tools/aws/__init__.py
@@ -1,3 +1,4 @@
 from ._submit_aws_batch_job import submit_aws_batch_job
+from ._rclone_transfer_batch_job import rclone_transfer_batch_job
 
-__all__ = ["submit_aws_batch_job"]
+__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
diff --git a/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py b/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
new file mode 100644
index 000000000..65bef7824
--- /dev/null
+++ b/src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
@@ -0,0 +1,114 @@
+"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""
+
+import pathlib
+import warnings
+from typing import Optional
+
+from pydantic import FilePath, validate_call
+
+from ._submit_aws_batch_job import submit_aws_batch_job
+
+
+@validate_call
+def rclone_transfer_batch_job(
+    *,
+    rclone_command: str,
+    job_name: str,
+    efs_volume_name: str,
+    rclone_config_file_path: Optional[FilePath] = None,
+    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
+    compute_environment_name: str = "neuroconv_batch_environment",
+    job_queue_name: str = "neuroconv_batch_queue",
+    job_definition_name: Optional[str] = None,
+    minimum_worker_ram_in_gib: int = 4,
+    minimum_worker_cpus: int = 4,
+    submission_id: Optional[str] = None,
+    region: Optional[str] = None,
+) -> dict[str, str]:
+    """
+    Submit an Rclone data transfer job to AWS Batch for processing.
+
+    Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.
+
+    Parameters
+    ----------
+    rclone_command : str
+        The command to pass directly to Rclone running on the EC2 instance.
+        E.g.: "rclone copy my_drive:testing_rclone /mnt/efs"
+        Must move data from or to '/mnt/efs'.
+    job_name : str
+        The name of the job to submit.
+    efs_volume_name : str
+        The name of an EFS volume to be created and attached to the job.
+        The path exposed to the container will always be `/mnt/efs`.
+    rclone_config_file_path : FilePath, optional
+        The path to the Rclone configuration file to use for the job.
+        If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
+    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
+        The name of the DynamoDB table to use for tracking job status.
+    compute_environment_name : str, default: "neuroconv_batch_environment"
+        The name of the compute environment to use for the job.
+    job_queue_name : str, default: "neuroconv_batch_queue"
+        The name of the job queue to use for the job.
+    job_definition_name : str, optional
+        The name of the job definition to use for the job.
+        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
+    minimum_worker_ram_in_gib : int, default: 4
+        The minimum amount of base worker memory required to run this job.
+        Determines the EC2 instance type selected by the automatic 'best fit' selector.
+        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
+    minimum_worker_cpus : int, default: 4
+        The minimum number of CPUs required to run this job.
+        A minimum of 4 is required, even if only one will be used in the actual process.
+    submission_id : str, optional
+        The unique ID to pair with this job submission when tracking the status via DynamoDB.
+        Defaults to a random UUID4.
+    region : str, optional
+        The AWS region to use for the job.
+        If not provided, we will attempt to load the region from your local AWS configuration.
+        If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.
+
+    Returns
+    -------
+    info : dict
+        A dictionary containing information about this AWS Batch job.
+
+        info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID.
+        info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table.
+    """
+    docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"
+
+    if "/mnt/efs" not in rclone_command:
+        message = (
+            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs'. "
+            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space."
+        )
+        warnings.warn(message=message, stacklevel=2)
+
+    rclone_config_file_path = rclone_config_file_path or pathlib.Path.home() / ".rclone" / "rclone.conf"
+    if not rclone_config_file_path.exists():
+        raise FileNotFoundError(
+            f"Rclone configuration file not found at: {rclone_config_file_path}! "
+            "Please check that `rclone config` successfully created the file."
+        )
+    with open(file=rclone_config_file_path, mode="r") as io:
+        rclone_config_file_stream = io.read()
+
+    region = region or "us-east-2"
+
+    info = submit_aws_batch_job(
+        job_name=job_name,
+        docker_image=docker_image,
+        environment_variables={"RCLONE_CONFIG": rclone_config_file_stream, "RCLONE_COMMAND": rclone_command},
+        efs_volume_name=efs_volume_name,
+        status_tracker_table_name=status_tracker_table_name,
+        compute_environment_name=compute_environment_name,
+        job_queue_name=job_queue_name,
+        job_definition_name=job_definition_name,
+        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
+        minimum_worker_cpus=minimum_worker_cpus,
+        submission_id=submission_id,
+        region=region,
+    )
+
+    return info
diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py
index 9e3ba0488..748f25399 100644
--- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py
+++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py
@@ -171,7 +171,9 @@ def submit_aws_batch_job(
     job_dependencies = job_dependencies or []
     container_overrides = dict()
     if environment_variables is not None:
-        container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
+        container_overrides["environment"] = [
+            {"name": key, "value": value} for key, value in environment_variables.items()
+        ]
     if commands is not None:
         container_overrides["command"] = commands
 
@@ -294,7 +296,7 @@
         The AWS Batch client to use for the job.
     max_retries : int, default: 12
         If the compute environment does not already exist, then this is the maximum number of times to synchronously
-        check for its successful creation before erroring.
+        check for its successful creation before raising an error.
This is essential for a clean setup of the entire pipeline, or else later steps might error because they tried to launch before the compute environment was ready. """ @@ -530,7 +532,11 @@ def _generate_job_definition_name( """ docker_tags = docker_image.split(":")[1:] docker_tag = docker_tags[0] if len(docker_tags) > 1 else None - parsed_docker_image_name = docker_image.replace(":", "-") # AWS Batch does not allow colons in job definition names + + # AWS Batch does not allow colons, slashes, or periods in job definition names + parsed_docker_image_name = str(docker_image) + for disallowed_character in [":", r"/", "."]: + parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-") job_definition_name = f"neuroconv_batch" job_definition_name += f"_{parsed_docker_image_name}-image" @@ -540,7 +546,6 @@ def _generate_job_definition_name( job_definition_name += f"_{efs_id}" if docker_tag is None or docker_tag == "latest": date = datetime.now().strftime("%Y-%m-%d") - job_definition_name += f"_created-on-{date}" return job_definition_name @@ -641,7 +646,7 @@ def _ensure_job_definition_exists_and_get_arn( ] mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] - # batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards + # batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards batch_client.register_job_definition( jobDefinitionName=job_definition_name, type="container", diff --git a/tests/docker_rclone_with_config_cli.py b/tests/docker_rclone_with_config_cli.py index ed472bdf2..9b1e265dd 100644 --- a/tests/docker_rclone_with_config_cli.py +++ b/tests/docker_rclone_with_config_cli.py @@ -61,7 +61,8 @@ def test_direct_usage_of_rclone_with_config(self): os.environ["RCLONE_CONFIG"] = rclone_config_file_stream os.environ["RCLONE_COMMAND"] = ( - f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} --verbose --progress --config ./rclone.conf" + f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} " + "--verbose --progress --config ./rclone.conf" ) command = ( diff --git a/tests/test_minimal/test_tools/aws_tools.py b/tests/test_minimal/test_tools/aws_tools_tests.py similarity index 100% rename from tests/test_minimal/test_tools/aws_tools.py rename to tests/test_minimal/test_tools/aws_tools_tests.py diff --git a/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py new file mode 100644 index 000000000..7ea49e644 --- /dev/null +++ b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py @@ -0,0 +1,176 @@ +import datetime +import os +import time +import unittest + +import boto3 + +from neuroconv.tools.aws import rclone_transfer_batch_job + +from ..setup_paths import OUTPUT_PATH + +_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"] + + +class TestRcloneTransferBatchJob(unittest.TestCase): + """ + To allow this test to work, the developer must create a folder on the outer level of their personal Google Drive + called 'testing_rclone_spikegl_and_phy' with the following structure: + + testing_rclone_spikeglx_and_phy + ├── ci_tests + ├──── spikeglx + ├────── Noise4Sam_g0 + ├──── phy + ├────── phy_example_0 + + Where 'Noise4Sam' is from the 'spikeglx/Noise4Sam_g0' GIN ephys dataset and 'phy_example_0' is likewise from the + 'phy' folder of the same dataset. 
+
+    Then the developer must install Rclone and call `rclone config` to generate tokens in their own `rclone.conf` file.
+    The developer can easily find the location of the config file on their system using `rclone config file`.
+    """
+
+    test_folder = OUTPUT_PATH / "aws_rclone_tests"
+    test_config_file_path = test_folder / "rclone.conf"
+    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+    region = "us-east-2"
+
+    def setUp(self):
+        self.test_folder.mkdir(exist_ok=True)
+
+        # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command)
+        token_dictionary = dict(
+            access_token=os.environ["RCLONE_DRIVE_ACCESS_TOKEN"],
+            token_type="Bearer",
+            refresh_token=os.environ["RCLONE_DRIVE_REFRESH_TOKEN"],
+            expiry=os.environ["RCLONE_EXPIRY_TOKEN"],
+        )
+        token_string = str(token_dictionary).replace("'", '"').replace(" ", "")
+        rclone_config_contents = [
+            "[test_google_drive_remote]\n",
+            "type = drive\n",
+            "scope = drive\n",
+            f"token = {token_string}\n",
+            "team_drive = \n",
+            "\n",
+        ]
+        with open(file=self.test_config_file_path, mode="w") as io:
+            io.writelines(rclone_config_contents)
+
+        self.efs_client = boto3.client(
+            service_name="efs",
+            region_name=self.region,
+            aws_access_key_id=self.aws_access_key_id,
+            aws_secret_access_key=self.aws_secret_access_key,
+        )
+
+    def tearDown(self):
+        efs_client = self.efs_client
+
+        # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume
+        # TODO: cleanup job definitions? (since built daily)
+        mount_targets = efs_client.describe_mount_targets(FileSystemId=self.efs_id)
+        for mount_target in mount_targets["MountTargets"]:
+            efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])
+
+        time.sleep(60)
+        efs_client.delete_file_system(FileSystemId=self.efs_id)
+
+    def test_rclone_transfer_batch_job(self):
+        region = self.region
+        aws_access_key_id = self.aws_access_key_id
+        aws_secret_access_key = self.aws_secret_access_key
+
+        dynamodb_resource = boto3.resource(
+            service_name="dynamodb",
+            region_name=region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+        batch_client = boto3.client(
+            service_name="batch",
+            region_name=region,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+        )
+        efs_client = self.efs_client
+
+        rclone_command = (
+            "rclone copy test_google_drive_remote:testing_rclone_spikeglx_and_phy /mnt/efs "
+            "--verbose --progress --config ./rclone.conf"  # TODO: should just include this in helper function?
+        )
+ ) + rclone_config_file_path = self.test_config_file_path + + today = datetime.datetime.now().date().isoformat() + job_name = f"test_rclone_transfer_batch_job_{today}" + efs_volume_name = "test_rclone_transfer_batch_efs" + + info = rclone_transfer_batch_job( + rclone_command=rclone_command, + job_name=job_name, + efs_volume_name=efs_volume_name, + rclone_config_file_path=rclone_config_file_path, + ) + + # Wait for AWS to process the job + time.sleep(60) + + job_id = info["job_submission_info"]["jobId"] + job = None + max_retries = 10 + retry = 0 + while retry < max_retries: + job_description_response = batch_client.describe_jobs(jobs=[job_id]) + assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + jobs = job_description_response["jobs"] + assert len(jobs) == 1 + + job = jobs[0] + + if job["status"] in _RETRY_STATES: + retry += 1 + time.sleep(60) + else: + break + + # Check EFS specific details + efs_volumes = efs_client.describe_file_systems() + matching_efs_volumes = [ + file_system + for file_system in efs_volumes["FileSystems"] + for tag in file_system["Tags"] + if tag["Key"] == "Name" and tag["Value"] == efs_volume_name + ] + assert len(matching_efs_volumes) == 1 + efs_volume = matching_efs_volumes[0] + self.efs_id = efs_volume["FileSystemId"] + + # Check normal job completion + assert job["jobName"] == job_name + assert "neuroconv_batch_queue" in job["jobQueue"] + assert "fs-" in job["jobDefinition"] + assert job["status"] == "SUCCEEDED" + + status_tracker_table_name = "neuroconv_batch_status_tracker" + table = dynamodb_resource.Table(name=status_tracker_table_name) + table_submission_id = info["table_submission_info"]["id"] + + table_item_response = table.get_item(Key={"id": table_submission_id}) + assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + table_item = table_item_response["Item"] + assert table_item["job_name"] == job_name + assert table_item["job_id"] == job_id + assert table_item["status"] == "Job submitted..." + + table.update_item( + Key={"id": table_submission_id}, + AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}}, + ) + + table.update_item( + Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}} + )