[Cloud Deployment IVb] Rclone in AWS on EFS #1085
Changes from 29 commits
@@ -11,7 +11,6 @@ concurrency: # Cancel previous workflows on the same pull request
 env:
   AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
   AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}

 jobs:
   run:
@@ -36,8 +35,8 @@ jobs:
         git config --global user.email "[email protected]"
         git config --global user.name "CI Almighty"

-      - name: Install full requirements
+      - name: Install AWS requirements
         run: pip install .[aws,test]

-      - name: Run subset of tests that use S3 live services
-        run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools.py
+      - name: Run generic AWS tests
+        run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools_tests.py
@@ -0,0 +1,46 @@
+name: Rclone AWS Tests
+on:
+  schedule:
+    - cron: "0 16 * * 2"  # Weekly at noon on Tuesday
+  workflow_dispatch:
+
+concurrency:  # Cancel previous workflows on the same pull request
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }}
+  RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }}
+  RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }}
+  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}
+
+jobs:
+  run:
+    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.12"]
+        os: [ubuntu-latest]
+    steps:
+      - uses: actions/checkout@v4
+      - run: git fetch --prune --unshallow --tags
+      - name: Setup Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Global Setup
+        run: |
+          python -m pip install -U pip  # Official recommended way
+          git config --global user.email "[email protected]"
+          git config --global user.name "CI Almighty"
+
+      - name: Install AWS requirements
+        run: pip install .[aws,test]
+
+      - name: Run RClone on AWS tests
+        run: pytest -rsx -n auto tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
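For anyone debugging these secrets locally: the three `RCLONE_*` values are the pieces of a Google Drive remote's OAuth token. Below is a minimal sketch of how they could be assembled into an `rclone.conf`; the remote name `test_google_drive_remote` and the exact field layout are illustrative assumptions, not necessarily what the test suite does.

```python
import json
import os
from pathlib import Path

# Hypothetical sketch: assemble a Google Drive remote for rclone from the
# same environment variables the workflow injects as secrets.
token = json.dumps(
    {
        "access_token": os.environ["RCLONE_DRIVE_ACCESS_TOKEN"],
        "token_type": "Bearer",
        "refresh_token": os.environ["RCLONE_DRIVE_REFRESH_TOKEN"],
        "expiry": os.environ["RCLONE_EXPIRY_TOKEN"],
    }
)
config_text = (
    "[test_google_drive_remote]\n"  # Remote name is an illustrative placeholder
    "type = drive\n"
    "scope = drive\n"
    f"token = {token}\n"
)

# Write to the default location the new helper looks in
rclone_config_path = Path.home() / ".rclone" / "rclone.conf"
rclone_config_path.parent.mkdir(parents=True, exist_ok=True)
rclone_config_path.write_text(config_text)
```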
@@ -1,3 +1,4 @@
 from ._submit_aws_batch_job import submit_aws_batch_job
+from ._rclone_transfer_batch_job import rclone_transfer_batch_job

-__all__ = ["submit_aws_batch_job"]
+__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
@@ -0,0 +1,113 @@
+"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""
+
+import pathlib
+import warnings
+from typing import Optional
+
+from pydantic import FilePath, validate_call
+
+from ._submit_aws_batch_job import submit_aws_batch_job
+
+
+@validate_call
+def rclone_transfer_batch_job(
+    *,
+    rclone_command: str,
+    job_name: str,
+    efs_volume_name: str,
+    rclone_config_file_path: Optional[FilePath] = None,
+    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
+    compute_environment_name: str = "neuroconv_batch_environment",
+    job_queue_name: str = "neuroconv_batch_queue",
+    job_definition_name: Optional[str] = None,
+    minimum_worker_ram_in_gib: int = 4,
+    minimum_worker_cpus: int = 4,
+    submission_id: Optional[str] = None,
+    region: Optional[str] = None,
+) -> dict[str, str]:
+    """
+    Submit an Rclone data transfer job to AWS Batch.
+
+    Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.
+
+    Parameters
+    ----------
+    rclone_command : str
+        The command to pass directly to Rclone running on the EC2 instance.
+        E.g.: "rclone copy my_drive:testing_rclone /mnt/efs"
+        Must move data from or to '/mnt/efs'.
+    job_name : str
+        The name of the job to submit.
+    efs_volume_name : str
+        The name of an EFS volume to be created and attached to the job.
+        The path exposed to the container will always be `/mnt/efs`.
+    rclone_config_file_path : FilePath, optional
+        The path to the Rclone configuration file to use for the job.
+        If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
+    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
+        The name of the DynamoDB table to use for tracking job status.
+    compute_environment_name : str, default: "neuroconv_batch_environment"
+        The name of the compute environment to use for the job.
+    job_queue_name : str, default: "neuroconv_batch_queue"
+        The name of the job queue to use for the job.
+    job_definition_name : str, optional
+        The name of the job definition to use for the job.
+        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
+    minimum_worker_ram_in_gib : int, default: 4
+        The minimum amount of base worker memory required to run this job.
+        Determines the EC2 instance type selected by the automatic 'best fit' selector.
+        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
+    minimum_worker_cpus : int, default: 4
+        The minimum number of CPUs required to run this job.
+        A minimum of 4 is required, even if only one will be used in the actual process.
+    submission_id : str, optional
+        The unique ID to pair with this job submission when tracking the status via DynamoDB.
+        Defaults to a random UUID4.
+    region : str, optional
+        The AWS region to use for the job.
+        If not provided, we will attempt to load the region from your local AWS configuration.
+        If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.
+
+    Returns
+    -------
+    info : dict
+        A dictionary containing information about this AWS Batch job.
+
+        info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID.
+        info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table.
+    """
+    docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"
+
+    if "/mnt/efs" not in rclone_command:
+        message = (
+            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs'. "
+            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space."
+        )
+        warnings.warn(message=message, stacklevel=2)
+
+    rclone_config_file_path = rclone_config_file_path or pathlib.Path.home() / ".rclone" / "rclone.conf"
+    if not rclone_config_file_path.exists():
+        raise FileNotFoundError(
+            f"Rclone configuration file not found at: {rclone_config_file_path}! "
+            "Please check that `rclone config` successfully created the file."
+        )
+    with open(file=rclone_config_file_path, mode="r") as io:
+        rclone_config_file_stream = io.read()
+
+    region = region or "us-east-2"
+
+    info = submit_aws_batch_job(
+        job_name=job_name,
+        docker_image=docker_image,
+        environment_variables={"RCLONE_CONFIG": rclone_config_file_stream, "RCLONE_COMMAND": rclone_command},
+        efs_volume_name=efs_volume_name,
+        status_tracker_table_name=status_tracker_table_name,
+        compute_environment_name=compute_environment_name,
+        job_queue_name=job_queue_name,
+        job_definition_name=job_definition_name,
+        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
+        minimum_worker_cpus=minimum_worker_cpus,
+        submission_id=submission_id,
+        region=region,
+    )
+
+    return info
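As a usage sketch, the new helper could be invoked as below. The import path, the remote name `my_drive`, and the job and volume names are placeholders; actually submitting requires AWS credentials plus a completed `rclone config`.

```python
from neuroconv.tools.aws import rclone_transfer_batch_job  # Import path assumed for illustration

# "my_drive" must match a remote defined in ~/.rclone/rclone.conf;
# the job and volume names here are placeholders.
info = rclone_transfer_batch_job(
    rclone_command="rclone copy my_drive:testing_rclone /mnt/efs --verbose --progress",
    job_name="example-rclone-transfer-job",
    efs_volume_name="example-efs-volume",
)

# The boto3 submit_job response includes the job ID, which can be used
# to poll AWS Batch (or the DynamoDB status table) for completion.
job_id = info["job_submission_info"]["jobId"]
print(f"Submitted AWS Batch job: {job_id}")
```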
@@ -171,7 +171,9 @@ def submit_aws_batch_job(
     job_dependencies = job_dependencies or []
     container_overrides = dict()
     if environment_variables is not None:
-        container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
+        container_overrides["environment"] = [
+            {"name": key, "value": value} for key, value in environment_variables.items()
+        ]
     if commands is not None:
         container_overrides["command"] = commands
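This fix matters because the AWS Batch `containerOverrides` schema expects each environment entry as an explicit name/value object; a single-key mapping like `{key: value}` is not accepted. A standalone illustration of the corrected shape:

```python
environment_variables = {"RCLONE_COMMAND": "rclone copy my_drive:data /mnt/efs"}

# Old shape (rejected): [{"RCLONE_COMMAND": "..."}] has no "name"/"value" keys.
# New shape, matching the AWS Batch ContainerOverrides API:
environment = [{"name": key, "value": value} for key, value in environment_variables.items()]

assert environment == [{"name": "RCLONE_COMMAND", "value": "rclone copy my_drive:data /mnt/efs"}]
```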
@@ -294,7 +296,7 @@ def _ensure_compute_environment_exists(
         The AWS Batch client to use for the job.
     max_retries : int, default: 12
         If the compute environment does not already exist, then this is the maximum number of times to synchronously
-        check for its successful creation before erroring.
+        check for its successful creation before raising an error.
         This is essential for a clean setup of the entire pipeline, or else later steps might error because they tried
         to launch before the compute environment was ready.
     """
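The synchronous check described here amounts to polling AWS Batch until the environment reports a healthy status. The sketch below illustrates that pattern with `describe_compute_environments`; it is a simplified stand-in for the module's actual implementation, and the sleep interval is an assumption.

```python
import time

import boto3


def wait_for_compute_environment(name: str, max_retries: int = 12) -> None:
    """Poll AWS Batch until the named compute environment reports a VALID status."""
    batch_client = boto3.client("batch")
    for _ in range(max_retries):
        response = batch_client.describe_compute_environments(computeEnvironments=[name])
        environments = response["computeEnvironments"]
        if environments and environments[0]["status"] == "VALID":
            return
        time.sleep(10)  # Give AWS time to finish provisioning before checking again
    raise RuntimeError(f"Compute environment '{name}' was not ready after {max_retries} checks.")
```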
@@ -530,7 +532,11 @@ def _generate_job_definition_name(
     """
     docker_tags = docker_image.split(":")[1:]
     docker_tag = docker_tags[0] if len(docker_tags) > 1 else None
-    parsed_docker_image_name = docker_image.replace(":", "-")  # AWS Batch does not allow colons in job definition names
+
+    # AWS Batch does not allow colons, slashes, or periods in job definition names
+    parsed_docker_image_name = str(docker_image)
+    for disallowed_character in [":", r"/", "."]:
+        parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-")

     job_definition_name = f"neuroconv_batch"
     job_definition_name += f"_{parsed_docker_image_name}-image"

Review comment on lines +535 to +539: Turns out these are really restrictive; had to fix this using a GHCR image source.
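Applied to the GHCR image used above, the sanitization loop collapses the registry path and tag separators into hyphens. A standalone check of that behavior:

```python
docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"

# Replicate the sanitization loop: AWS Batch job definition names only
# allow letters, numbers, hyphens, and underscores.
parsed_docker_image_name = str(docker_image)
for disallowed_character in [":", "/", "."]:
    parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-")

assert parsed_docker_image_name == "ghcr-io-catalystneuro-rclone_with_config-latest"
```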
@@ -540,7 +546,6 @@ def _generate_job_definition_name(
         job_definition_name += f"_{efs_id}"
     if docker_tag is None or docker_tag == "latest":
         date = datetime.now().strftime("%Y-%m-%d")
-        job_definition_name += f"_created-on-{date}"

     return job_definition_name

Review comment: After internal debate, I decided to roll this back, since even 'latest' images will be pulled not at the time of job definition declaration but rather at the time of the instance run (which could be well after). This should reduce the amount of spam from the normal AWS tests.
@@ -641,7 +646,7 @@ def _ensure_job_definition_exists_and_get_arn(
     ]
     mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]

-    # batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards
+    # batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards
     batch_client.register_job_definition(
         jobDefinitionName=job_definition_name,
         type="container",

Review comment: Had to fix this; it was not tested before now.
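One deterministic way to "wait a bit" after registration is to poll `describe_job_definitions` until the new revision reports ACTIVE. The sketch below shows that pattern; it is an illustration under assumed retry/sleep values, not the module's actual code.

```python
import time

import boto3


def wait_for_job_definition(job_definition_name: str, max_retries: int = 12) -> str:
    """Poll AWS Batch until the job definition is ACTIVE, then return its ARN."""
    batch_client = boto3.client("batch")
    for _ in range(max_retries):
        response = batch_client.describe_job_definitions(
            jobDefinitionName=job_definition_name, status="ACTIVE"
        )
        job_definitions = response["jobDefinitions"]
        if job_definitions:
            # All revisions are returned together; take the most recent one.
            latest = max(job_definitions, key=lambda definition: definition["revision"])
            return latest["jobDefinitionArn"]
        time.sleep(5)  # register_job_definition is asynchronous; give it a moment
    raise RuntimeError(f"Job definition '{job_definition_name}' never became ACTIVE.")
```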