From 8ff1ca6a7e45ca6ec661146724a12560c44fb7c6 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 7 May 2024 19:57:45 -0400
Subject: [PATCH] Implement a boto3 object store.

---
 lib/galaxy/dependencies/__init__.py       |   3 +
 lib/galaxy/objectstore/__init__.py        |   4 +
 lib/galaxy/objectstore/s3_boto3.py        | 340 ++++++++++++++++++++++
 pyproject.toml                            |   1 +
 test/unit/objectstore/test_objectstore.py |  84 +++++-
 5 files changed, 431 insertions(+), 1 deletion(-)
 create mode 100644 lib/galaxy/objectstore/s3_boto3.py

diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py
index 0bb785aa136a..44322353329a 100644
--- a/lib/galaxy/dependencies/__init__.py
+++ b/lib/galaxy/dependencies/__init__.py
@@ -234,6 +234,9 @@ def check_python_pam(self):
     def check_azure_storage(self):
         return "azure_blob" in self.object_stores
 
+    def check_boto3(self):
+        return "boto3" in self.object_stores
+
     def check_kamaki(self):
         return "pithos" in self.object_stores
 
diff --git a/lib/galaxy/objectstore/__init__.py b/lib/galaxy/objectstore/__init__.py
index b954ea910f65..a09bcf4adb31 100644
--- a/lib/galaxy/objectstore/__init__.py
+++ b/lib/galaxy/objectstore/__init__.py
@@ -1386,6 +1386,10 @@ def type_to_object_store_class(store: str, fsmon: bool = False) -> Tuple[Type[Ba
     objectstore_constructor_kwds = {}
     if store == "disk":
         objectstore_class = DiskObjectStore
+    elif store == "boto3":
+        from .s3_boto3 import S3ObjectStore as Boto3ObjectStore
+
+        objectstore_class = Boto3ObjectStore
     elif store in ["s3", "aws_s3"]:
         from .s3 import S3ObjectStore
 
diff --git a/lib/galaxy/objectstore/s3_boto3.py b/lib/galaxy/objectstore/s3_boto3.py
new file mode 100644
index 000000000000..149bdaf5c815
--- /dev/null
+++ b/lib/galaxy/objectstore/s3_boto3.py
@@ -0,0 +1,340 @@
+"""A more modern version of the S3 object store based on boto3 instead of boto.
+"""
+
+import logging
+import os
+from typing import (
+    TYPE_CHECKING,
+    TypedDict,
+)
+
+from typing_extensions import (
+    Literal,
+    NotRequired,
+)
+
+if TYPE_CHECKING:
+    from mypy_boto3_s3.client import S3Client
+
+try:
+    # Imports are done this way to allow objectstore code to be used outside of Galaxy.
+    import boto3
+    from botocore.client import ClientError
+except ImportError:
+    boto3 = None  # type: ignore[assignment,unused-ignore]
+
+from ._caching_base import CachingConcreteObjectStore
+from .caching import (
+    enable_cache_monitor,
+    parse_caching_config_dict_from_xml,
+)
+
+NO_BOTO_ERROR_MESSAGE = (
+    "S3/Swift object store configured, but no boto3 dependency available. "
+    "Please install and properly configure boto3 or modify object store configuration."
+)
+
+log = logging.getLogger(__name__)
+logging.getLogger("boto").setLevel(logging.INFO)  # Otherwise boto is quite noisy
+
+
+def download_directory(bucket, remote_folder, local_path):
+    # List objects in the specified S3 folder (NOTE(review): boto2-style bucket API - confirm still needed)
+    objects = bucket.list(prefix=remote_folder)
+
+    for obj in objects:
+        remote_file_path = obj.key
+        local_file_path = os.path.join(local_path, os.path.relpath(remote_file_path, remote_folder))
+
+        # Create directories if they don't exist
+        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
+
+        # Download the file
+        obj.get_contents_to_filename(local_file_path)
+
+
+def parse_config_xml(config_xml):
+    try:
+        a_xml = config_xml.findall("auth")[0]
+        access_key = a_xml.get("access_key")
+        secret_key = a_xml.get("secret_key")
+
+        b_xml = config_xml.findall("bucket")[0]
+        bucket_name = b_xml.get("name")
+
+        cn_xml = config_xml.findall("connection")
+        if not cn_xml:
+            cn_xml = {}
+        else:
+            cn_xml = cn_xml[0]
+        endpoint_url = cn_xml.get("endpoint_url")
+        region = cn_xml.get("region")
+        cache_dict = parse_caching_config_dict_from_xml(config_xml)
+
+        tag, attrs = "extra_dir", ("type", "path")
+        extra_dirs = config_xml.findall(tag)
+        if not extra_dirs:
+            msg = f"No {tag} element in XML tree"
+            log.error(msg)
+            raise Exception(msg)
+        extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs]
+
+        config_dict = {
+            "auth": {
+                "access_key": access_key,
+                "secret_key": secret_key,
+            },
+            "bucket": {
+                "name": bucket_name,
+            },
+            "connection": {
+                "endpoint_url": endpoint_url,
+                "region": region,
+            },
+            "cache": cache_dict,
+            "extra_dirs": extra_dirs,
+            "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml),
+        }
+        name = config_xml.attrib.get("name", None)
+        if name is not None:
+            config_dict["name"] = name
+        device = config_xml.attrib.get("device", None)
+        config_dict["device"] = device
+        return config_dict
+    except Exception:
+        # Toss it back up after logging, we can't continue loading at this point.
+        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
+        raise
+
+
+class S3ClientConstructorKwds(TypedDict):
+    service_name: Literal["s3"]
+    endpoint_url: NotRequired[str]
+    region_name: NotRequired[str]
+    aws_access_key_id: NotRequired[str]
+    aws_secret_access_key: NotRequired[str]
+
+
+class S3ObjectStore(CachingConcreteObjectStore):
+    """
+    Object store that stores objects as items in an AWS S3 bucket. A local
+    cache exists that is used as an intermediate location for files between
+    Galaxy and S3.
+    """
+
+    _client: "S3Client"
+    store_type = "aws_s3_boto3"
+    cloud = True
+
+    def __init__(self, config, config_dict):
+        super().__init__(config, config_dict)
+        self.cache_monitor = None
+
+        auth_dict = config_dict["auth"]
+        bucket_dict = config_dict["bucket"]
+        connection_dict = config_dict.get("connection", {})
+        cache_dict = config_dict.get("cache") or {}
+        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)
+
+        self.access_key = auth_dict.get("access_key")
+        self.secret_key = auth_dict.get("secret_key")
+
+        self.bucket = bucket_dict.get("name")
+
+        self.endpoint_url = connection_dict.get("endpoint_url")
+        self.region = connection_dict.get("region")
+
+        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
+        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
+        self.cache_updated_data = cache_dict.get("cache_updated_data", True)
+
+        extra_dirs = {e["type"]: e["path"] for e in config_dict.get("extra_dirs", [])}
+        self.extra_dirs.update(extra_dirs)
+
+        self._initialize()
+
+    def _initialize(self):
+        if boto3 is None:
+            raise Exception(NO_BOTO_ERROR_MESSAGE)
+
+        self._configure_connection()
+        self._start_cache_monitor_if_needed()
+
+    def _configure_connection(self):
+        log.debug("Configuring S3 Connection")
+        self._init_client()
+        if not self._bucket_exists:
+            self._create_bucket()
+
+        # get_object_url only works on AWS if the client knows the bucket's region,
+        # so if no region was configured look it up and rebuild the client now.
+        # Non-AWS services (endpoint_url set) skip this lookup entirely.
+        if not self.endpoint_url and not self.region:
+            response = self._client.get_bucket_location(
+                Bucket=self.bucket,
+            )
+            if "LocationConstraint" in response:
+                region = response["LocationConstraint"]
+                self.region = region
+                self._init_client()
+
+    def _init_client(self):
+        # (Re)build _client from the current endpoint_url/region/credentials.
+        # If access_key is empty no credentials are passed (default credential chain).
+        kwds: S3ClientConstructorKwds = {
+            "service_name": "s3",
+        }
+        if self.endpoint_url:
+            kwds["endpoint_url"] = self.endpoint_url
+        if self.region:
+            kwds["region_name"] = self.region
+        if self.access_key:
+            kwds["aws_access_key_id"] = self.access_key
+            kwds["aws_secret_access_key"] = self.secret_key
+        self._client = boto3.client(**kwds)
+
+    @property
+    def _bucket_exists(self) -> bool:
+        try:
+            self._client.head_bucket(Bucket=self.bucket)
+            return True
+        except ClientError as err:
+            if err.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
+                return False
+            raise
+
+    def _create_bucket(self):
+        kwds = {}
+        if self.region and self.region != "us-east-1":  # S3 rejects us-east-1 as a constraint
+            kwds["CreateBucketConfiguration"] = dict(LocationConstraint=self.region)
+        self._client.create_bucket(Bucket=self.bucket, **kwds)
+
+    @classmethod
+    def parse_xml(clazz, config_xml):
+        return parse_config_xml(config_xml)
+
+    def _config_to_dict(self):
+        return {
+            "auth": {
+                "access_key": self.access_key,
+                "secret_key": self.secret_key,
+            },
+            "bucket": {
+                "name": self.bucket,
+            },
+            "connection": {
+                "endpoint_url": self.endpoint_url,
+                "region": self.region,
+            },
+            "cache": {
+                "size": self.cache_size,
+                "path": self.staging_path,
+                "cache_updated_data": self.cache_updated_data,
+            },
+        }
+
+    def to_dict(self):
+        as_dict = super().to_dict()
+        as_dict.update(self._config_to_dict())
+        return as_dict
+
+    def _get_remote_size(self, rel_path) -> int:
+        response = self._client.head_object(Bucket=self.bucket, Key=rel_path)
+        return response["ContentLength"]
+
+    def _exists_remotely(self, rel_path: str) -> bool:
+        try:
+            self._client.head_object(Bucket=self.bucket, Key=rel_path)
+            return True
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "404":
+                return False
+            raise
+
+    def _download(self, rel_path: str) -> bool:
+        local_destination = self._get_cache_path(rel_path)
+        try:
+            log.debug("Pulling key '%s' into cache to %s", rel_path, local_destination)
+            if not self._caching_allowed(rel_path):
+                return False
+            self._client.download_file(self.bucket, rel_path, local_destination)
+            return True
+        except ClientError:
+            log.exception("Failed to download file from S3")
+        return False
+
+    def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
+        try:
+            self._client.put_object(Body=from_string.encode("utf-8"), Bucket=self.bucket, Key=rel_path)
+            return True
+        except ClientError:
+            log.exception("Trouble pushing to S3 '%s' from string", rel_path)
+            return False
+
+    def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
+        try:
+            self._client.upload_file(source_file, self.bucket, rel_path)
+            return True
+        except ClientError:
+            log.exception("Trouble pushing to S3 '%s' from file '%s'", rel_path, source_file)
+            return False
+
+    def _delete_remote_all(self, rel_path: str) -> bool:
+        try:
+            for key in self._keys(rel_path):
+                self._client.delete_object(Bucket=self.bucket, Key=key)
+            return True
+        except ClientError:
+            log.exception("Could not delete blob '%s' from S3", rel_path)
+            return False
+
+    def _delete_existing_remote(self, rel_path: str) -> bool:
+        try:
+            self._client.delete_object(Bucket=self.bucket, Key=rel_path)
+            return True
+        except ClientError:
+            log.exception("Could not delete blob '%s' from S3", rel_path)
+            return False
+
+    # https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
+    def _keys(self, prefix="/", delimiter="/", start_after=""):
+        s3_paginator = self._client.get_paginator("list_objects_v2")
+        prefix = prefix.lstrip(delimiter)
+        start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
+        for page in s3_paginator.paginate(Bucket=self.bucket, Prefix=prefix, StartAfter=start_after):
+            for content in page.get("Contents", ()):
+                yield content["Key"]
+
+    def _download_directory_into_cache(self, rel_path, cache_path):
+        for key in self._keys(rel_path):
+            local_file_path = os.path.join(cache_path, os.path.relpath(key, rel_path))
+
+            # Create directories if they don't exist
+            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
+
+            # Download this object by its own key (rel_path is only the directory prefix)
+            self._client.download_file(self.bucket, key, local_file_path)
+
+    def _get_object_url(self, obj, **kwargs):
+        try:
+            if self._exists(obj, **kwargs):
+                rel_path = self._construct_path(obj, **kwargs)
+                url = self._client.generate_presigned_url(
+                    ClientMethod="get_object",
+                    Params={
+                        "Bucket": self.bucket,
+                        "Key": rel_path,
+                    },
+                    ExpiresIn=3600,
+                    HttpMethod="GET",
+                )
+                return url
+        except ClientError:
+            log.exception("Failed to generate URL for dataset.")
+        return None
+
+    def _get_store_usage_percent(self, obj):
+        return 0.0
+
+    def shutdown(self):
+        self._shutdown_cache_monitor()
diff --git a/pyproject.toml b/pyproject.toml
index 7e18ef256bde..50c5fda83932 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -183,6 +183,7 @@ types-python-dateutil = "*"
 types-PyYAML = "*"
 types-requests = "*"
 types-six = "*"
+"boto3-stubs[s3]" = "*"
 
 [tool.ruff]
 target-version = "py38"
diff --git a/test/unit/objectstore/test_objectstore.py b/test/unit/objectstore/test_objectstore.py
index 1229a01e0a74..5f60c007973c 100644
--- a/test/unit/objectstore/test_objectstore.py
+++ b/test/unit/objectstore/test_objectstore.py
@@ -1,4 +1,5 @@
 import os
+import random
 import time
 from tempfile import (
     mkdtemp,
@@ -1544,6 +1545,50 @@ def test_real_aws_s3_store(tmp_path):
         verify_caching_object_store_functionality(tmp_path, object_store)
 
 
+AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML = """
+type: boto3
+store_by: uuid
+auth:
+  access_key: ${access_key}
+  secret_key: ${secret_key}
+
+bucket:
+  name: ${bucket}
+
+extra_dirs:
+- type: job_work
+  path: database/job_working_directory_azure
+- type: temp
+  path: database/tmp_azure
+"""
+
+
+@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+def test_real_aws_s3_store_boto3(tmp_path):
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+        "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+    }
+    with TestConfig(AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+        verify_caching_object_store_functionality(tmp_path, object_store)
+
+
+@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+def test_real_aws_s3_store_boto3_new_bucket(tmp_path):
+    rand_int = random.randint(100000, 999999)
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+        "bucket": f"mynewbucket{rand_int}",
+    }
+    with TestConfig(AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+        verify_caching_object_store_functionality(tmp_path, object_store)
+
+
 AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML = """
 type: cloud
 store_by: uuid
@@ -1617,7 +1662,6 @@ def test_aws_via_cloudbridge_store_with_region(tmp_path):
         verify_caching_object_store_functionality(tmp_path, object_store)
 
 
-
 GOOGLE_S3_INTEROP_TEMPLATE_TEST_CONFIG_YAML = """
 type: generic_s3
 store_by: uuid
@@ -1639,6 +1683,7 @@ def test_aws_via_cloudbridge_store_with_region(tmp_path):
   path: database/tmp_azure
 """
 
+
 @skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY")
 @skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY")
 @skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET")
@@ -1655,6 +1700,43 @@ def test_gcp_via_s3_interop(tmp_path):
         verify_caching_object_store_functionality(tmp_path, object_store)
 
 
+GOOGLE_INTEROP_VIA_BOTO3_TEMPLATE_TEST_CONFIG_YAML = """
+type: boto3
+store_by: uuid
+auth:
+  access_key: ${access_key}
+  secret_key: ${secret_key}
+
+bucket:
+  name: ${bucket}
+
+connection:
+  endpoint_url: https://storage.googleapis.com
+
+extra_dirs:
+- type: job_work
+  path: database/job_working_directory_azure
+- type: temp
+  path: database/tmp_azure
+"""
+
+
+@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY")
+@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY")
+@skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET")
+def test_gcp_via_s3_interop_and_boto3(tmp_path):
+    template_vars = {
+        "access_key": os.environ["GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY"],
+        "secret_key": os.environ["GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY"],
+        "bucket": os.environ["GALAXY_TEST_GOOGLE_BUCKET"],
+    }
+    with TestConfig(GOOGLE_INTEROP_VIA_BOTO3_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (
+        _,
+        object_store,
+    ):
+        verify_caching_object_store_functionality(tmp_path, object_store)
+
+
 class MockDataset:
     def __init__(self, id):
         self.id = id