From 0ef384411658e29e2a67126996ea50a5d40ce8a5 Mon Sep 17 00:00:00 2001
From: Marwan Zouinkhi
Date: Thu, 18 Apr 2024 16:16:38 -0400
Subject: [PATCH 1/6] s3 weight store

---
 dacapo/options.py                | 15 +++++++++
 dacapo/store/create_store.py     | 12 +++++--
 dacapo/store/s3_weights_store.py | 54 ++++++++++++++++++++++++++++++++
 3 files changed, 78 insertions(+), 3 deletions(-)
 create mode 100644 dacapo/store/s3_weights_store.py

diff --git a/dacapo/options.py b/dacapo/options.py
index 589d3d7cd..52b7b0072 100644
--- a/dacapo/options.py
+++ b/dacapo/options.py
@@ -35,6 +35,21 @@ class DaCapoConfig:
             "Currently, only 'files' and 'mongo' are supported with files being the default."
         },
     )
+
+    store: Optional[str] = attr.ib(
+        default="local",
+        metadata={
+            "help_text": "The type of store to use for storing configurations and statistics. "
+            "Currently, only 'local' and 's3' are supported with local being the default."
+        },
+    )
+    s3_bucket: Optional[str] = attr.ib(
+        default=None,
+        metadata={
+            "help_text": "The S3 bucket to use for storing configurations and statistics."
+        },
+    )
+
     runs_base_dir: Path = attr.ib(
         default=Path(expanduser("~/dacapo")),
         metadata={

diff --git a/dacapo/store/create_store.py b/dacapo/store/create_store.py
index 57eed6d9b..5263fa36c 100644
--- a/dacapo/store/create_store.py
+++ b/dacapo/store/create_store.py
@@ -1,5 +1,6 @@
 from .local_array_store import LocalArrayStore
 from .local_weights_store import LocalWeightsStore
+from .s3_weights_store import S3WeightsStore
 from .mongo_config_store import MongoConfigStore
 from .file_config_store import FileConfigStore
 from .mongo_stats_store import MongoStatsStore
@@ -85,9 +86,14 @@ def create_weights_store():
 
     options = Options.instance()
 
-    # currently, only the LocalWeightsStore is supported
-    base_dir = Path(options.runs_base_dir).expanduser()
-    return LocalWeightsStore(base_dir)
+    if options.store == "s3":
+        s3_bucket = options.s3_bucket
+        return S3WeightsStore(s3_bucket)
+    elif options.store == "local":
+        base_dir = Path(options.runs_base_dir).expanduser()
+        return LocalWeightsStore(base_dir)
+    else:
+        raise ValueError(f"Unknown weights store type {options.store}")
 
 
 def create_array_store():

diff --git a/dacapo/store/s3_weights_store.py b/dacapo/store/s3_weights_store.py
new file mode 100644
index 000000000..9ff636ce1
--- /dev/null
+++ b/dacapo/store/s3_weights_store.py
@@ -0,0 +1,54 @@
+import boto3
+import torch
+import json
+from botocore.exceptions import NoCredentialsError
+from dacapo.experiments.run import Run
+from .weights_store import WeightsStore, Weights
+from typing import Optional
+
+
+class S3WeightsStore(WeightsStore):
+    def __init__(self, s3_path: str):
+        """
+        Initialize the S3 weights store.
+
+        Args:
+            s3_path: The S3 bucket path where the weights are stored.
+        """
+        if s3_path is None:
+            raise ValueError("S3 bucket base path cannot be None")
+        self.s3_client = boto3.client("s3")
+        self.bucket, self.base_path = self.parse_s3_path(s3_path)
+
+    def parse_s3_path(self, s3_path):
+        """Extract bucket and path from the full s3 path."""
+        if not s3_path.startswith("s3://"):
+            raise ValueError("S3 path must start with 's3://'")
+        parts = s3_path[len("s3://") :].split("/", 1)
+        return parts[0], parts[1] if len(parts) > 1 else ""
+
+    def store_weights(self, run: Run, iteration: int):
+        """
+        Store the network weights of the given run on S3.
+        """
+        weights = Weights(run.model.state_dict(), run.optimizer.state_dict())
+        weights_name = f"{self.base_path}/{run.name}/checkpoints/iterations/{iteration}"
+        temp_file = f"/tmp/{weights_name.replace('/', '_')}"
+        torch.save(weights, temp_file)
+        self.s3_client.upload_file(temp_file, self.bucket, weights_name)
+
+    def retrieve_weights(self, run: str, iteration: int) -> Weights:
+        """
+        Retrieve the network weights of the given run from S3.
+        """
+        weights_name = f"{self.base_path}/{run}/checkpoints/iterations/{iteration}"
+        temp_file = f"/tmp/{weights_name.replace('/', '_')}"
+        self.s3_client.download_file(self.bucket, weights_name, temp_file)
+        weights = torch.load(temp_file, map_location="cpu")
+        return weights
+
+    # Implement other methods like latest_iteration, remove, store_best, retrieve_best etc. using S3 operations.
+
+
+# Example usage
+# s3_store = S3WeightsStore("s3://my-bucket/path/to/weights")

From 800747374c0da2388bd2431a598c14bee07ea7de Mon Sep 17 00:00:00 2001
From: Marwan Zouinkhi
Date: Mon, 22 Apr 2024 13:39:17 -0400
Subject: [PATCH 2/6] aws examples

---
 examples/aws/cloud_csv.csv   |  3 +++
 examples/aws/dacapo.yaml     |  6 ++++++
 examples/aws/s3_datasplit.py | 16 ++++++++++++++++
 3 files changed, 25 insertions(+)
 create mode 100644 examples/aws/cloud_csv.csv
 create mode 100644 examples/aws/dacapo.yaml
 create mode 100644 examples/aws/s3_datasplit.py

diff --git a/examples/aws/cloud_csv.csv b/examples/aws/cloud_csv.csv
new file mode 100644
index 000000000..99a407a82
--- /dev/null
+++ b/examples/aws/cloud_csv.csv
@@ -0,0 +1,3 @@
+train,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/em/fibsem-uint8,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/labels/groundtruth/crop155/[nuc]
+train,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/em/fibsem-uint8,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/labels/groundtruth/crop7/[nuc]
+val,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/em/fibsem-uint8,s3://janelia-cosem-datasets/jrc_hela-2/jrc_hela-2.zarr,recon-1/labels/groundtruth/crop6/[nuc]
\ No newline at end of file

diff --git a/examples/aws/dacapo.yaml b/examples/aws/dacapo.yaml
new file mode 100644
index 000000000..b96f7fbe4
--- /dev/null
+++ b/examples/aws/dacapo.yaml
@@ -0,0 +1,6 @@
+
+store: "s3"
+s3_bucket: "s3://dacapo-tests"
+
+type: "files"
+

diff --git a/examples/aws/s3_datasplit.py b/examples/aws/s3_datasplit.py
new file mode 100644
index 000000000..f06eb5f5a
--- /dev/null
+++ b/examples/aws/s3_datasplit.py
@@ -0,0 +1,16 @@
+# %%
+from dacapo.experiments.datasplits import DataSplitGenerator
+from funlib.geometry import Coordinate
+
+input_resolution = Coordinate(8, 8, 8)
+output_resolution = Coordinate(4, 4, 4)
+datasplit_config = DataSplitGenerator.generate_from_csv(
+    "cloud_csv.csv",
+    input_resolution,
+    output_resolution,
+).compute()
+# # %%
+datasplit = datasplit_config.datasplit_type(data)
+# %%
+viewer = datasplit._neuroglancer()
+# %%
\ No newline at end of file
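As a quick sanity check of the `S3WeightsStore` added in patch 1, a sketch like the following could exercise the path parsing and the intended checkpoint round trip (a sketch only: bucket name, prefix, and iteration are placeholders, valid AWS credentials are assumed, and construction of the `run` object is elided):

```python
# Sketch: verifies s3:// path parsing and shows the store/retrieve
# round trip that patch 1 implements. Names here are placeholders.
from dacapo.store.s3_weights_store import S3WeightsStore

store = S3WeightsStore("s3://my-bucket/experiments")
assert store.bucket == "my-bucket"
assert store.base_path == "experiments"

# With a populated `run` object from a training session:
# store.store_weights(run, iteration=1000)
# weights = store.retrieve_weights(run.name, iteration=1000)
```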
From ab5f4d059165c0be763c8726dba9ec1ae0dc2b01 Mon Sep 17 00:00:00 2001
From: rhoadesScholar
Date: Wed, 24 Apr 2024 17:29:22 -0400
Subject: [PATCH 3/6] build: ➕ Add s3 dependencies.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 examples/aws/s3_datasplit.py | 4 ++--
 pyproject.toml               | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/examples/aws/s3_datasplit.py b/examples/aws/s3_datasplit.py
index f06eb5f5a..2cf148acb 100644
--- a/examples/aws/s3_datasplit.py
+++ b/examples/aws/s3_datasplit.py
@@ -9,8 +9,8 @@
     input_resolution,
     output_resolution,
 ).compute()
-# # %%
+# %%
 datasplit = datasplit_config.datasplit_type(data)
 # %%
 viewer = datasplit._neuroglancer()
-# %%
\ No newline at end of file
+# %%

diff --git a/pyproject.toml b/pyproject.toml
index 0ef3a39c2..0ab64cdff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,6 +56,8 @@ dependencies = [
     "click",
     "pyyaml",
     "scipy",
+    "upath",
+    "boto3",
 ]
 
 # extras

From 95d3f79e5cd9098b4e34591c77c31c2b922e788a Mon Sep 17 00:00:00 2001
From: Marwan Zouinkhi
Date: Mon, 29 Apr 2024 15:00:15 -0400
Subject: [PATCH 4/6] fix typo

---
 examples/aws/s3_datasplit.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/aws/s3_datasplit.py b/examples/aws/s3_datasplit.py
index 2cf148acb..f5bb72b79 100644
--- a/examples/aws/s3_datasplit.py
+++ b/examples/aws/s3_datasplit.py
@@ -10,7 +10,7 @@
     output_resolution,
 ).compute()
 # %%
-datasplit = datasplit_config.datasplit_type(data)
+datasplit = datasplit_config.datasplit_type(datasplit_config)
 # %%
 viewer = datasplit._neuroglancer()
 # %%
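The typo fix above follows dacapo's config-to-object convention: a config object exposes the class to instantiate via its `datasplit_type` attribute, and the config itself is passed to that class's constructor. A minimal illustration of the call shape (the class and field names below are hypothetical, not dacapo's actual classes):

```python
# Hypothetical sketch of the config-to-object pattern; only the final
# call shape matches the fixed line in s3_datasplit.py.
class DummyDataSplit:
    def __init__(self, config):
        # A real datasplit would build train/validate datasets here.
        self.train = config.train_configs


class DummyDataSplitConfig:
    datasplit_type = DummyDataSplit  # class to instantiate
    train_configs = ["crop155", "crop7"]  # hypothetical field


config = DummyDataSplitConfig()
datasplit = config.datasplit_type(config)  # same shape as the fix
```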
From 1b414a79efe563c853eeb4e49d24a548b6c80b0c Mon Sep 17 00:00:00 2001
From: Marwan Zouinkhi
Date: Mon, 29 Apr 2024 15:57:23 -0400
Subject: [PATCH 5/6] aws example

---
 dacapo/store/create_store.py     | 15 +++------
 dacapo/store/s3_weights_store.py | 54 --------------------------------
 examples/aws/README.md           | 23 +++++++++++++++
 examples/aws/aws_store_check.py  | 30 ++++++++++++++++++
 examples/aws/dacapo.yaml         |  5 +--
 5 files changed, 58 insertions(+), 69 deletions(-)
 delete mode 100644 dacapo/store/s3_weights_store.py
 create mode 100644 examples/aws/README.md
 create mode 100644 examples/aws/aws_store_check.py

diff --git a/dacapo/store/create_store.py b/dacapo/store/create_store.py
index 5263fa36c..e04060e90 100644
--- a/dacapo/store/create_store.py
+++ b/dacapo/store/create_store.py
@@ -1,6 +1,5 @@
 from .local_array_store import LocalArrayStore
 from .local_weights_store import LocalWeightsStore
-from .s3_weights_store import S3WeightsStore
 from .mongo_config_store import MongoConfigStore
 from .file_config_store import FileConfigStore
 from .mongo_stats_store import MongoStatsStore
@@ -32,7 +31,7 @@ def create_config_store():
         db_name = options.mongo_db_name
         return MongoConfigStore(db_host, db_name)
     elif options.type == "files":
-        store_path = Path(options.runs_base_dir).expanduser()
+        store_path = Path(options.runs_base_dir)
         return FileConfigStore(store_path / "configs")
     else:
         raise ValueError(f"Unknown store type {options.type}")
@@ -63,7 +62,7 @@ def create_stats_store():
         db_name = options.mongo_db_name
         return MongoStatsStore(db_host, db_name)
     elif options.type == "files":
-        store_path = Path(options.runs_base_dir).expanduser()
+        store_path = Path(options.runs_base_dir)
         return FileStatsStore(store_path / "stats")
     else:
         raise ValueError(f"Unknown store type {options.type}")
@@ -86,14 +85,8 @@ def create_weights_store():
 
     options = Options.instance()
 
-    if options.store == "s3":
-        s3_bucket = options.s3_bucket
-        return S3WeightsStore(s3_bucket)
-    elif options.store == "local":
-        base_dir = Path(options.runs_base_dir).expanduser()
-        return LocalWeightsStore(base_dir)
-    else:
-        raise ValueError(f"Unknown weights store type {options.store}")
+    base_dir = Path(options.runs_base_dir)
+    return LocalWeightsStore(base_dir)
 
 
 def create_array_store():

diff --git a/dacapo/store/s3_weights_store.py b/dacapo/store/s3_weights_store.py
deleted file mode 100644
index 9ff636ce1..000000000
--- a/dacapo/store/s3_weights_store.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import boto3
-import torch
-import json
-from botocore.exceptions import NoCredentialsError
-from dacapo.experiments.run import Run
-from .weights_store import WeightsStore, Weights
-from typing import Optional
-
-
-class S3WeightsStore(WeightsStore):
-    def __init__(self, s3_path: str):
-        """
-        Initialize the S3 weights store.
-
-        Args:
-            s3_path: The S3 bucket path where the weights are stored.
-        """
-        if s3_path is None:
-            raise ValueError("S3 bucket base path cannot be None")
-        self.s3_client = boto3.client("s3")
-        self.bucket, self.base_path = self.parse_s3_path(s3_path)
-
-    def parse_s3_path(self, s3_path):
-        """Extract bucket and path from the full s3 path."""
-        if not s3_path.startswith("s3://"):
-            raise ValueError("S3 path must start with 's3://'")
-        parts = s3_path[len("s3://") :].split("/", 1)
-        return parts[0], parts[1] if len(parts) > 1 else ""
-
-    def store_weights(self, run: Run, iteration: int):
-        """
-        Store the network weights of the given run on S3.
-        """
-        weights = Weights(run.model.state_dict(), run.optimizer.state_dict())
-        weights_name = f"{self.base_path}/{run.name}/checkpoints/iterations/{iteration}"
-        temp_file = f"/tmp/{weights_name.replace('/', '_')}"
-        torch.save(weights, temp_file)
-        self.s3_client.upload_file(temp_file, self.bucket, weights_name)
-
-    def retrieve_weights(self, run: str, iteration: int) -> Weights:
-        """
-        Retrieve the network weights of the given run from S3.
-        """
-        weights_name = f"{self.base_path}/{run}/checkpoints/iterations/{iteration}"
-        temp_file = f"/tmp/{weights_name.replace('/', '_')}"
-        self.s3_client.download_file(self.bucket, weights_name, temp_file)
-        weights = torch.load(temp_file, map_location="cpu")
-        return weights
-
-    # Implement other methods like latest_iteration, remove, store_best, retrieve_best etc. using S3 operations.
-
-
-# Example usage
-# s3_store = S3WeightsStore("s3://my-bucket/path/to/weights")

diff --git a/examples/aws/README.md b/examples/aws/README.md
new file mode 100644
index 000000000..96f8c9499
--- /dev/null
+++ b/examples/aws/README.md
@@ -0,0 +1,23 @@
+You can work with S3 data locally by setting the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables. You can also set `AWS_REGION` to select the region and, if you use a named profile, `AWS_PROFILE` to select the profile. Alternatively, configure credentials interactively:
+
+```bash
+aws configure
+```
+
+To store checkpoints and experiment data in S3, point `runs_base_dir` in `dacapo.yaml` at your bucket:
+
+```yaml
+runs_base_dir: "s3://dacapotest"
+```
+
+Configs and stats can be saved locally (or on S3) by setting `type: files`, or in MongoDB by setting `type: mongo`, in the same `dacapo.yaml` file.
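+
+For example, a `dacapo.yaml` that keeps checkpoints on S3 while recording configs and stats in MongoDB might look like the sketch below (the host and database name are placeholders, and the key names are assumed to mirror the `mongo_db_host` / `mongo_db_name` options read in `create_store.py`):
+
+```yaml
+runs_base_dir: "s3://dacapotest"
+type: "mongo"
+mongo_db_host: "mongodb://localhost:27017"
+mongo_db_name: "dacapo"
+```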
+

diff --git a/examples/aws/aws_store_check.py b/examples/aws/aws_store_check.py
new file mode 100644
index 000000000..f44b261ed
--- /dev/null
+++ b/examples/aws/aws_store_check.py
@@ -0,0 +1,30 @@
+# %%
+import dacapo
+
+# from dacapo.store.create_store import create_config_store
+
+config_store = dacapo.store.create_store.create_config_store()
+
+# %%
+from dacapo import Options
+
+options = Options.instance()
+
+# %%
+options
+# %%
+from dacapo.experiments.tasks import DistanceTaskConfig
+
+task_config = DistanceTaskConfig(
+    name="cosem_distance_task_4nm",
+    channels=["mito"],
+    clip_distance=40.0,
+    tol_distance=40.0,
+    scale_factor=80.0,
+)
+
+# %%
+
+config_store.store_task_config(task_config)
+
+# %%

diff --git a/examples/aws/dacapo.yaml b/examples/aws/dacapo.yaml
index b96f7fbe4..960719a6d 100644
--- a/examples/aws/dacapo.yaml
+++ b/examples/aws/dacapo.yaml
@@ -1,6 +1,3 @@
 
-store: "s3"
-s3_bucket: "s3://dacapo-tests"
-
+runs_base_dir: "s3://dacapotest"
 type: "files"
-

From 7b5ab499544cd93f0bfcc5d03709528ab40c7c3d Mon Sep 17 00:00:00 2001
From: Marwan Zouinkhi
Date: Mon, 29 Apr 2024 15:59:15 -0400
Subject: [PATCH 6/6] reset options

---
 dacapo/options.py | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/dacapo/options.py b/dacapo/options.py
index 52b7b0072..589d3d7cd 100644
--- a/dacapo/options.py
+++ b/dacapo/options.py
@@ -35,21 +35,6 @@ class DaCapoConfig:
             "Currently, only 'files' and 'mongo' are supported with files being the default."
         },
     )
-
-    store: Optional[str] = attr.ib(
-        default="local",
-        metadata={
-            "help_text": "The type of store to use for storing configurations and statistics. "
-            "Currently, only 'local' and 's3' are supported with local being the default."
-        },
-    )
-    s3_bucket: Optional[str] = attr.ib(
-        default=None,
-        metadata={
-            "help_text": "The S3 bucket to use for storing configurations and statistics."
-        },
-    )
-
     runs_base_dir: Path = attr.ib(
         default=Path(expanduser("~/dacapo")),
         metadata={