diff --git a/lib/galaxy/objectstore/__init__.py b/lib/galaxy/objectstore/__init__.py index d0dfad17dd51..efd4565f2412 100644 --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -1505,6 +1505,10 @@ def type_to_object_store_class( objectstore_constructor_kwds: Dict[str, Any] = {} if store == "disk": objectstore_class = DiskObjectStore + elif store == "boto3": + from .s3_boto3 import S3ObjectStore + + objectstore_class = S3ObjectStore elif store in ["s3", "aws_s3"]: from .s3 import S3ObjectStore diff --git a/lib/galaxy/objectstore/s3_boto3.py b/lib/galaxy/objectstore/s3_boto3.py new file mode 100644 index 000000000000..564227703ac3 --- /dev/null +++ b/lib/galaxy/objectstore/s3_boto3.py @@ -0,0 +1,390 @@ +"""A more modern version of the S3 object store based on boto3 instead of boto. +""" + +import logging +import os +from datetime import datetime +from typing import ( + Optional, + TYPE_CHECKING, +) + +if TYPE_CHECKING: + from mypy_boto3_c3.client import S3Client + +try: + # Imports are done this way to allow objectstore code to be used outside of Galaxy. + import boto3 + from botocore.client import ClientError +except ImportError: + boto3 = None # type: ignore[assignment] + +from . import ConcreteObjectStore +from .caching import ( + CacheTarget, + enable_cache_monitor, + InProcessCacheMonitor, + parse_caching_config_dict_from_xml, + UsesCache, +) + +NO_BOTO_ERROR_MESSAGE = ( + "S3/Swift object store configured, but no boto3 dependency available." + "Please install and properly configure boto or modify object store configuration." +) + +log = logging.getLogger(__name__) +logging.getLogger("boto").setLevel(logging.INFO) # Otherwise boto is quite noisy + + +def download_directory(bucket, remote_folder, local_path): + # List objects in the specified S3 folder + objects = bucket.list(prefix=remote_folder) + + for obj in objects: + remote_file_path = obj.key + local_file_path = os.path.join(local_path, os.path.relpath(remote_file_path, remote_folder)) + + # Create directories if they don't exist + os.makedirs(os.path.dirname(local_file_path), exist_ok=True) + + # Download the file + obj.get_contents_to_filename(local_file_path) + + +def parse_config_xml(config_xml): + try: + a_xml = config_xml.findall("auth")[0] + access_key = a_xml.get("access_key") + secret_key = a_xml.get("secret_key") + + b_xml = config_xml.findall("bucket")[0] + bucket_name = b_xml.get("name") + + cn_xml = config_xml.findall("connection") + if not cn_xml: + cn_xml = {} + else: + cn_xml = cn_xml[0] + endpoint_url = cn_xml.get("endpoint_url") + region = cn_xml.get("region") + cache_dict = parse_caching_config_dict_from_xml(config_xml) + + tag, attrs = "extra_dir", ("type", "path") + extra_dirs = config_xml.findall(tag) + if not extra_dirs: + msg = f"No {tag} element in XML tree" + log.error(msg) + raise Exception(msg) + extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs] + + config_dict = { + "auth": { + "access_key": access_key, + "secret_key": secret_key, + }, + "bucket": { + "name": bucket_name, + }, + "connection": { + "endpoint_url": endpoint_url, + "region": region, + }, + "cache": cache_dict, + "extra_dirs": extra_dirs, + "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml), + } + name = config_xml.attrib.get("name", None) + if name is not None: + config_dict["name"] = name + device = config_xml.attrib.get("device", None) + config_dict["device"] = device + return config_dict + except Exception: + # Toss it back up after logging, we can't continue loading at this point. + log.exception("Malformed ObjectStore Configuration XML -- unable to continue") + raise + + +class CloudConfigMixin: + def _config_to_dict(self): + return { + "auth": { + "access_key": self.access_key, + "secret_key": self.secret_key, + }, + "bucket": { + "name": self.bucket, + }, + "connection": { + "endpoint_url": self.endpoint_url, + "region": self.region, + }, + "cache": { + "size": self.cache_size, + "path": self.staging_path, + "cache_updated_data": self.cache_updated_data, + }, + } + + +class S3ObjectStore(ConcreteObjectStore, CloudConfigMixin, UsesCache): + """ + Object store that stores objects as items in an AWS S3 bucket. A local + cache exists that is used as an intermediate location for files between + Galaxy and S3. + """ + + _client: "S3Client" + cache_monitor: Optional[InProcessCacheMonitor] = None + store_type = "aws_s3_boto3" + cloud = True + + def __init__(self, config, config_dict): + super().__init__(config, config_dict) + self.cache_monitor = None + + auth_dict = config_dict["auth"] + bucket_dict = config_dict["bucket"] + connection_dict = config_dict.get("connection", {}) + cache_dict = config_dict.get("cache") or {} + self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict) + + self.access_key = auth_dict.get("access_key") + self.secret_key = auth_dict.get("secret_key") + + self.bucket = bucket_dict.get("name") + + self.endpoint_url = connection_dict.get("endpoint_url") + self.region = connection_dict.get("region") + + self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size + self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path + self.cache_updated_data = cache_dict.get("cache_updated_data", True) + + extra_dirs = {e["type"]: e["path"] for e in config_dict.get("extra_dirs", [])} + self.extra_dirs.update(extra_dirs) + + self._initialize() + + def _initialize(self): + if boto3 is None: + raise Exception(NO_BOTO_ERROR_MESSAGE) + + self._configure_connection() + self.start_cache_monitor() + + def start_cache_monitor(self): + if self.enable_cache_monitor: + self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) + + def _configure_connection(self): + log.debug("Configuring S3 Connection") + self._init_client() + if not self._bucket_exists: + self._create_bucket() + + # get_object_url only works on AWS if client is set, so if it wasn't + # fetch it and reset the client now. Skip this logic entirely for other + # non-AWS services by ensuring endpoint_url is not set. + if not self.endpoint_url and not self.region: + response = self._client.get_bucket_location( + Bucket=self.bucket, + ) + if "LocationConstraint" in response: + region = response["LocationConstraint"] + self.region = region + self._init_client() + + def _init_client(self): + # set _client based on current args. + # If access_key is empty use default credential chain + kwds = { + "service_name": "s3", + } + if self.endpoint_url: + kwds["endpoint_url"] = self.endpoint_url + if self.region: + kwds["region_name"] = self.region + if self.access_key: + kwds["aws_access_key_id"] = self.access_key + kwds["aws_secret_access_key"] = self.secret_key + self._client = boto3.client(**kwds) + + @property + def _bucket_exists(self) -> bool: + try: + self._client.head_bucket(Bucket=self.bucket) + return True + except ClientError as err: + if err.response["ResponseMetadata"]["HTTPStatusCode"] == 404: + return False + raise + + def _create_bucket(self): + kwds = {} + if self.region: + kwds["CreateBucketConfiguration"] = dict(LocationConstraint=self.region) + self._client.create_bucket(Bucket=self.bucket, **kwds) + + @classmethod + def parse_xml(clazz, config_xml): + return parse_config_xml(config_xml) + + def to_dict(self): + as_dict = super().to_dict() + as_dict.update(self._config_to_dict()) + return as_dict + + @property + def cache_target(self) -> CacheTarget: + return CacheTarget( + self.staging_path, + self.cache_size, + 0.9, + ) + + def _get_remote_size(self, rel_path) -> int: + response = self._client.head_object(Bucket=self.bucket, Key=rel_path) + return response["ContentLength"] + + def _exists_remotely(self, rel_path: str) -> bool: + try: + self._client.head_object(Bucket=self.bucket, Key=rel_path) + return True + except ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + raise + + def _download(self, rel_path: str) -> bool: + local_destination = self._get_cache_path(rel_path) + try: + log.debug("Pulling key '%s' into cache to %s", rel_path, local_destination) + size = self._get_remote_size(rel_path) + if not self.cache_target.fits_in_cache(size): + log.critical( + "File %s is larger (%s) than the configured cache allows (%s). Cannot download.", + rel_path, + size, + self.cache_target.log_description, + ) + return False + self._client.download_file(self.bucket, rel_path, local_destination) + return True + except ClientError: + log.exception("Failed to download file from S3") + return False + + def _push_to_os(self, rel_path, source_file=None, from_string=None): + """ + Push the file pointed to by ``rel_path`` to the object store naming the key + ``rel_path``. If ``source_file`` is provided, push that file instead while + still using ``rel_path`` as the key name. + If ``from_string`` is provided, set contents of the file to the value of + the string. + """ + try: + source_file = source_file if source_file else self._get_cache_path(rel_path) + if os.path.exists(source_file): + exists_remotely = self._exists_remotely(rel_path) + local_size = os.path.getsize(source_file) + if local_size == 0 and exists_remotely: + log.debug( + "Wanted to push file '%s' to S3 key '%s' but its size is 0; skipping.", source_file, rel_path + ) + return True + if from_string is not None: + from io import StringIO + + # self._client.upload_file(self.bucket, rel_path, StringIO(from_string)) + self._client.put_object(Body=from_string.encode("utf-8"), Bucket=self.bucket, Key=rel_path) + log.debug("Pushed data from string '%s' to key '%s'", from_string, rel_path) + else: + start_time = datetime.now() + log.debug( + "Pushing cache file '%s' of size %s bytes to key '%s'", + source_file, + local_size, + rel_path, + ) + self._client.upload_file(source_file, self.bucket, rel_path) + end_time = datetime.now() + log.debug( + "Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)", + source_file, + rel_path, + local_size, + end_time - start_time, + ) + return True + else: + log.error( + "Tried updating key '%s' from source file '%s', but source file does not exist.", + rel_path, + source_file, + ) + except ClientError: + log.exception("Trouble pushing S3 key '%s' from file '%s'", rel_path, source_file) + raise + return False + + def _delete_remote_all(self, rel_path: str) -> bool: + try: + for key in self._keys(rel_path): + self._client.delete_object(Bucket=self.bucket, Key=rel_path) + return True + except ClientError: + log.exception("Could not delete blob '%s' from S3", rel_path) + return False + + def _delete_existing_remote(self, rel_path: str) -> bool: + try: + self._client.delete_object(Bucket=self.bucket, Key=rel_path) + return True + except ClientError: + log.exception("Could not delete blob '%s' from S3", rel_path) + return False + + # https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3 + def _keys(self, prefix="/", delimiter="/", start_after=""): + s3_paginator = self._client.get_paginator("list_objects_v2") + prefix = prefix.lstrip(delimiter) + start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after + for page in s3_paginator.paginate(Bucket=self.bucket, Prefix=prefix, StartAfter=start_after): + for content in page.get("Contents", ()): + yield content["Key"] + + def _download_directory_into_cache(self, rel_path, cache_path): + for key in self._keys(rel_path): + local_file_path = os.path.join(cache_path, os.path.relpath(key, rel_path)) + + # Create directories if they don't exist + os.makedirs(os.path.dirname(local_file_path), exist_ok=True) + + # Download the file + self._client.download_file(self.bucket, rel_path, local_file_path) + + def _get_object_url(self, obj, **kwargs): + try: + if self._exists(obj, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + url = self._client.generate_presigned_url( + ClientMethod="get_object", + Params={ + "Bucket": self.bucket, + "Key": rel_path, + }, + ExpiresIn=3600, + HttpMethod="GET", + ) + return url + except ClientError: + log.exception("Failed to generate URL for dataset.") + return None + + def _get_store_usage_percent(self, obj): + return 0.0 + + def shutdown(self): + self.cache_monitor and self.cache_monitor.shutdown() diff --git a/pyproject.toml b/pyproject.toml index 7e18ef256bde..50c5fda83932 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -183,6 +183,7 @@ types-python-dateutil = "*" types-PyYAML = "*" types-requests = "*" types-six = "*" +"boto3-stubs[s3]" = "*" [tool.ruff] target-version = "py38" diff --git a/test/unit/objectstore/test_objectstore.py b/test/unit/objectstore/test_objectstore.py index 1229a01e0a74..b76669d8db4d 100644 --- a/test/unit/objectstore/test_objectstore.py +++ b/test/unit/objectstore/test_objectstore.py @@ -1544,6 +1544,51 @@ def test_real_aws_s3_store(tmp_path): verify_caching_object_store_functionality(tmp_path, object_store) +AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML = """ +type: boto3 +store_by: uuid +auth: + access_key: ${access_key} + secret_key: ${secret_key} + +bucket: + name: ${bucket} + +extra_dirs: +- type: job_work + path: database/job_working_directory_azure +- type: temp + path: database/tmp_azure +""" + + +@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY") +@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY") +@skip_unless_environ("GALAXY_TEST_AWS_BUCKET") +def test_real_aws_s3_store_boto3(tmp_path): + template_vars = { + "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"], + "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"], + "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"], + } + with TestConfig(AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store): + verify_caching_object_store_functionality(tmp_path, object_store) + + +@skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY") +@skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY") +def test_real_aws_s3_store_boto3_new_bucket(tmp_path): + import random + rand_int = random.randint(100000, 999999) + template_vars = { + "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"], + "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"], + "bucket": f"mynewbucket{rand_int}", + } + with TestConfig(AMAZON_BOTO3_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store): + verify_caching_object_store_functionality(tmp_path, object_store) + + AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML = """ type: cloud store_by: uuid @@ -1655,6 +1700,43 @@ def test_gcp_via_s3_interop(tmp_path): verify_caching_object_store_functionality(tmp_path, object_store) +GOOGLE_INTEROP_VIA_BOTO3_TEMPLATE_TEST_CONFIG_YAML = """ +type: boto3 +store_by: uuid +auth: + access_key: ${access_key} + secret_key: ${secret_key} + +bucket: + name: ${bucket} + +connection: + endpoint_url: https://storage.googleapis.com + +extra_dirs: +- type: job_work + path: database/job_working_directory_azure +- type: temp + path: database/tmp_azure +""" + + +@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY") +@skip_unless_environ("GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY") +@skip_unless_environ("GALAXY_TEST_GOOGLE_BUCKET") +def test_gcp_via_s3_interop_and_boto3(tmp_path): + template_vars = { + "access_key": os.environ["GALAXY_TEST_GOOGLE_INTEROP_ACCESS_KEY"], + "secret_key": os.environ["GALAXY_TEST_GOOGLE_INTEROP_SECRET_KEY"], + "bucket": os.environ["GALAXY_TEST_GOOGLE_BUCKET"], + } + with TestConfig(GOOGLE_INTEROP_VIA_BOTO3_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as ( + _, + object_store, + ): + verify_caching_object_store_functionality(tmp_path, object_store) + + class MockDataset: def __init__(self, id): self.id = id