diff --git a/lib/galaxy/objectstore/__init__.py b/lib/galaxy/objectstore/__init__.py index efd4565f2412..0a94bf9810f3 100644 --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -1506,9 +1506,9 @@ def type_to_object_store_class( if store == "disk": objectstore_class = DiskObjectStore elif store == "boto3": - from .s3_boto3 import S3ObjectStore + from .s3_boto3 import S3ObjectStore as Boto3ObjectStore - objectstore_class = S3ObjectStore + objectstore_class = Boto3ObjectStore elif store in ["s3", "aws_s3"]: from .s3 import S3ObjectStore diff --git a/lib/galaxy/objectstore/_caching_base.py b/lib/galaxy/objectstore/_caching_base.py new file mode 100644 index 000000000000..6f8af4590ce9 --- /dev/null +++ b/lib/galaxy/objectstore/_caching_base.py @@ -0,0 +1,334 @@ +import logging +import os +import shutil +from math import inf +from typing import ( + Any, + Dict, + Optional, +) + +from galaxy.exceptions import ( + ObjectInvalid, + ObjectNotFound, +) +from galaxy.objectstore import ConcreteObjectStore +from galaxy.util import ( + directory_hash_id, + ExecutionTimer, + unlink, +) +from galaxy.util.path import safe_relpath +from ._util import fix_permissions +from .caching import ( + CacheTarget, + InProcessCacheMonitor, +) + +log = logging.getLogger(__name__) + + +class CachingConcreteObjectStore(ConcreteObjectStore): + staging_path: str + extra_dirs: Dict[str, str] + config: Any + cache_updated_data: bool + enable_cache_monitor: bool + cache_size: int + cache_monitor: Optional[InProcessCacheMonitor] = None + cache_monitor_interval: int + + def _construct_path( + self, + obj, + base_dir=None, + dir_only=None, + extra_dir=None, + extra_dir_at_root=False, + alt_name=None, + obj_dir=False, + in_cache=False, + **kwargs, + ): + # extra_dir should never be constructed from provided data but just + # make sure there are no shenannigans afoot + if extra_dir and extra_dir != os.path.normpath(extra_dir): + log.warning("extra_dir is not normalized: %s", extra_dir) + raise ObjectInvalid("The requested object is invalid") + # ensure that any parent directory references in alt_name would not + # result in a path not contained in the directory path constructed here + if alt_name: + if not safe_relpath(alt_name): + log.warning("alt_name would locate path outside dir: %s", alt_name) + raise ObjectInvalid("The requested object is invalid") + # alt_name can contain parent directory references, but S3 will not + # follow them, so if they are valid we normalize them out + alt_name = os.path.normpath(alt_name) + + object_id = self._get_object_id(obj) + rel_path = os.path.join(*directory_hash_id(object_id)) + + if extra_dir is not None: + if extra_dir_at_root: + rel_path = os.path.join(extra_dir, rel_path) + else: + rel_path = os.path.join(rel_path, extra_dir) + + # for JOB_WORK directory + if obj_dir: + rel_path = os.path.join(rel_path, str(object_id)) + if base_dir: + base = self.extra_dirs.get(base_dir) + assert base + return os.path.join(base, rel_path) + + if not dir_only: + rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{object_id}.dat") + + if in_cache: + return self._get_cache_path(rel_path) + + return rel_path + + def _get_cache_path(self, rel_path: str) -> str: + return os.path.abspath(os.path.join(self.staging_path, rel_path)) + + def _in_cache(self, rel_path: str) -> bool: + """Check if the given dataset is in the local cache and return True if so.""" + cache_path = self._get_cache_path(rel_path) + return os.path.exists(cache_path) + + def _pull_into_cache(self, rel_path) -> bool: + ipt_timer = ExecutionTimer() + # Ensure the cache directory structure exists (e.g., dataset_#_files/) + rel_path_dir = os.path.dirname(rel_path) + if not os.path.exists(self._get_cache_path(rel_path_dir)): + os.makedirs(self._get_cache_path(rel_path_dir), exist_ok=True) + # Now pull in the file + file_ok = self._download(rel_path) + fix_permissions(self.config, self._get_cache_path(rel_path_dir)) + log.debug("_pull_into_cache: %s\n\n\n\n\n\n", ipt_timer) + return file_ok + + def _get_data(self, obj, start=0, count=-1, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + # Check cache first and get file if not there + if not self._in_cache(rel_path): + self._pull_into_cache(rel_path) + # Read the file content from cache + data_file = open(self._get_cache_path(rel_path)) + data_file.seek(start) + content = data_file.read(count) + data_file.close() + return content + + def _exists(self, obj, **kwargs): + in_cache = exists_remotely = False + rel_path = self._construct_path(obj, **kwargs) + dir_only = kwargs.get("dir_only", False) + base_dir = kwargs.get("base_dir", None) + + # check job work directory stuff early to skip API hits. + if dir_only and base_dir: + if not os.path.exists(rel_path): + os.makedirs(rel_path, exist_ok=True) + return True + + in_cache = self._in_cache(rel_path) + exists_remotely = self._exists_remotely(rel_path) + dir_only = kwargs.get("dir_only", False) + base_dir = kwargs.get("base_dir", None) + if dir_only: + if in_cache or exists_remotely: + return True + else: + return False + + # TODO: Sync should probably not be done here. Add this to an async upload stack? + if in_cache and not exists_remotely: + self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path)) + return True + elif exists_remotely: + return True + else: + return False + + def _create(self, obj, **kwargs): + if not self._exists(obj, **kwargs): + # Pull out locally used fields + extra_dir = kwargs.get("extra_dir", None) + extra_dir_at_root = kwargs.get("extra_dir_at_root", False) + dir_only = kwargs.get("dir_only", False) + alt_name = kwargs.get("alt_name", None) + + # Construct hashed path + rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) + + # Optionally append extra_dir + if extra_dir is not None: + if extra_dir_at_root: + rel_path = os.path.join(extra_dir, rel_path) + else: + rel_path = os.path.join(rel_path, extra_dir) + + # Create given directory in cache + cache_dir = os.path.join(self.staging_path, rel_path) + if not os.path.exists(cache_dir): + os.makedirs(cache_dir, exist_ok=True) + + # If instructed, create the dataset in cache & in S3 + if not dir_only: + rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") + open(os.path.join(self.staging_path, rel_path), "w").close() + self._push_to_os(rel_path, from_string="") + return self + + def _empty(self, obj, **kwargs): + if self._exists(obj, **kwargs): + return self._size(obj, **kwargs) == 0 + else: + raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") + + def _size(self, obj, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + if self._in_cache(rel_path): + try: + return os.path.getsize(self._get_cache_path(rel_path)) + except OSError as ex: + log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex) + elif self._exists_remotely(rel_path): + return self._get_remote_size(rel_path) + log.warning("Did not find dataset '%s', returning 0 for size", rel_path) + return 0 + + def _get_filename(self, obj, **kwargs): + base_dir = kwargs.get("base_dir", None) + dir_only = kwargs.get("dir_only", False) + obj_dir = kwargs.get("obj_dir", False) + sync_cache = kwargs.get("sync_cache", True) + + rel_path = self._construct_path(obj, **kwargs) + + # for JOB_WORK directory + if base_dir and dir_only and obj_dir: + return os.path.abspath(rel_path) + + cache_path = self._get_cache_path(rel_path) + if not sync_cache: + return cache_path + + # Check if the file exists in the cache first, always pull if file size in cache is zero + if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0): + return cache_path + + # Check if the file exists in persistent storage and, if it does, pull it into cache + elif self._exists(obj, **kwargs): + if dir_only: + self._download_directory_into_cache(rel_path, cache_path) + return cache_path + else: + if self._pull_into_cache(rel_path): + return cache_path + raise ObjectNotFound(f"objectstore.get_filename, no cache_path: {obj}, kwargs: {kwargs}") + + def _download_directory_into_cache(self, rel_path, cache_path): + # azure, pithos, irod, and cloud did not do this prior to refactoring so I am assuming + # there is just operations that fail with these object stores, + # I'm placing a no-op here to match their behavior but we should + # maybe implement this for those object stores. + pass + + def _delete(self, obj, entire_dir=False, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + extra_dir = kwargs.get("extra_dir", None) + base_dir = kwargs.get("base_dir", None) + dir_only = kwargs.get("dir_only", False) + obj_dir = kwargs.get("obj_dir", False) + try: + # Remove temporary data in JOB_WORK directory + if base_dir and dir_only and obj_dir: + shutil.rmtree(os.path.abspath(rel_path)) + return True + + # For the case of extra_files, because we don't have a reference to + # individual files/keys we need to remove the entire directory structure + # with all the files in it. This is easy for the local file system, + # but requires iterating through each individual key in S3 and deleing it. + if entire_dir and extra_dir: + shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) + return self._delete_remote_all(rel_path) + else: + # Delete from cache first + unlink(self._get_cache_path(rel_path), ignore_errors=True) + # Delete from S3 as well + if self._exists_remotely(rel_path): + return self._delete_existing_remote(rel_path) + except OSError: + log.exception("%s delete error", self._get_filename(obj, **kwargs)) + return False + + def _update_from_file(self, obj, file_name=None, create=False, **kwargs): + if create: + self._create(obj, **kwargs) + + if self._exists(obj, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + # Chose whether to use the dataset file itself or an alternate file + if file_name: + source_file = os.path.abspath(file_name) + # Copy into cache + cache_file = self._get_cache_path(rel_path) + try: + if source_file != cache_file and self.cache_updated_data: + # FIXME? Should this be a `move`? + shutil.copy2(source_file, cache_file) + fix_permissions(self.config, cache_file) + except OSError: + log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file) + else: + source_file = self._get_cache_path(rel_path) + + self._push_to_os(rel_path, source_file) + + else: + raise ObjectNotFound( + f"objectstore.update_from_file, object does not exist: {str(obj)}, kwargs: {str(kwargs)}" + ) + + @property + def cache_target(self) -> CacheTarget: + print("GRABBING CACHE_TARGET>...") + return CacheTarget( + self.staging_path, + self.cache_size, + 0.9, + ) + + def _shutdown_cache_monitor(self) -> None: + self.cache_monitor and self.cache_monitor.shutdown() + + def _start_cache_monitor_if_needed(self): + if self.enable_cache_monitor: + self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) + + def _get_remote_size(self, rel_path: str) -> int: + raise NotImplementedError() + + def _exists_remotely(self, rel_path: str) -> bool: + raise NotImplementedError() + + def _push_to_os(self, rel_path, source_file: Optional[str] = None, from_string: Optional[str] = None) -> None: + raise NotImplementedError() + + # def _get_object_id(self, obj: Any) -> str: + # raise NotImplementedError() + + def _download(self, rel_path: str) -> bool: + raise NotImplementedError() + + # Do not need to override these if instead replacing _delete + def _delete_existing_remote(self, rel_path) -> bool: + raise NotImplementedError() + + def _delete_remote_all(self, rel_path) -> bool: + raise NotImplementedError() diff --git a/lib/galaxy/objectstore/azure_blob.py b/lib/galaxy/objectstore/azure_blob.py index 4d25d1d5df05..be6446de9a21 100644 --- a/lib/galaxy/objectstore/azure_blob.py +++ b/lib/galaxy/objectstore/azure_blob.py @@ -8,7 +8,6 @@ datetime, timedelta, ) -from typing import Optional try: from azure.common import AzureHttpError @@ -20,12 +19,10 @@ except ImportError: BlobServiceClient = None -from . import ConcreteObjectStore +from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, - InProcessCacheMonitor, parse_caching_config_dict_from_xml, - UsesCache, ) NO_BLOBSERVICE_ERROR_MESSAGE = ( @@ -72,7 +69,7 @@ def parse_config_xml(config_xml): }, "cache": cache_dict, "extra_dirs": extra_dirs, - "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml), + "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml), } name = config_xml.attrib.get("name", None) if name is not None: @@ -86,14 +83,13 @@ def parse_config_xml(config_xml): raise -class AzureBlobObjectStore(ConcreteObjectStore, UsesCache): +class AzureBlobObjectStore(CachingConcreteObjectStore): """ Object store that stores objects as blobs in an Azure Blob Container. A local cache exists that is used as an intermediate location for files between Galaxy and Azure. """ - cache_monitor: Optional[InProcessCacheMonitor] = None store_type = "azure_blob" cloud = True diff --git a/lib/galaxy/objectstore/caching.py b/lib/galaxy/objectstore/caching.py index 18bc159651a4..fed5582197d1 100644 --- a/lib/galaxy/objectstore/caching.py +++ b/lib/galaxy/objectstore/caching.py @@ -3,7 +3,6 @@ import logging import os -import shutil import threading import time from math import inf @@ -17,10 +16,6 @@ from typing_extensions import NamedTuple -from galaxy.exceptions import ( - ObjectInvalid, - ObjectNotFound, -) from galaxy.util import ( directory_hash_id, ExecutionTimer, @@ -225,305 +220,3 @@ def shutdown(self): # Wait for the cache monitor thread to join before ending self.cache_monitor_thread.join(5) - - -# mixin for object stores using a cache directory -class UsesCache: - staging_path: str - extra_dirs: Dict[str, str] - config: Any - cache_updated_data: bool - - def _construct_path( - self, - obj, - base_dir=None, - dir_only=None, - extra_dir=None, - extra_dir_at_root=False, - alt_name=None, - obj_dir=False, - in_cache=False, - **kwargs, - ): - # extra_dir should never be constructed from provided data but just - # make sure there are no shenannigans afoot - if extra_dir and extra_dir != os.path.normpath(extra_dir): - log.warning("extra_dir is not normalized: %s", extra_dir) - raise ObjectInvalid("The requested object is invalid") - # ensure that any parent directory references in alt_name would not - # result in a path not contained in the directory path constructed here - if alt_name: - if not safe_relpath(alt_name): - log.warning("alt_name would locate path outside dir: %s", alt_name) - raise ObjectInvalid("The requested object is invalid") - # alt_name can contain parent directory references, but S3 will not - # follow them, so if they are valid we normalize them out - alt_name = os.path.normpath(alt_name) - - object_id = self._get_object_id(obj) - rel_path = os.path.join(*directory_hash_id(object_id)) - - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - - # for JOB_WORK directory - if obj_dir: - rel_path = os.path.join(rel_path, str(object_id)) - if base_dir: - base = self.extra_dirs.get(base_dir) - assert base - return os.path.join(base, rel_path) - - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{object_id}.dat") - - if in_cache: - return self._get_cache_path(rel_path) - - return rel_path - - def _get_cache_path(self, rel_path: str) -> str: - return os.path.abspath(os.path.join(self.staging_path, rel_path)) - - def _in_cache(self, rel_path: str) -> bool: - """Check if the given dataset is in the local cache and return True if so.""" - cache_path = self._get_cache_path(rel_path) - return os.path.exists(cache_path) - - def _pull_into_cache(self, rel_path) -> bool: - ipt_timer = ExecutionTimer() - # Ensure the cache directory structure exists (e.g., dataset_#_files/) - rel_path_dir = os.path.dirname(rel_path) - if not os.path.exists(self._get_cache_path(rel_path_dir)): - os.makedirs(self._get_cache_path(rel_path_dir), exist_ok=True) - # Now pull in the file - file_ok = self._download(rel_path) - fix_permissions(self.config, self._get_cache_path(rel_path_dir)) - log.debug("_pull_into_cache: %s\n\n\n\n\n\n", ipt_timer) - return file_ok - - def _get_data(self, obj, start=0, count=-1, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Check cache first and get file if not there - if not self._in_cache(rel_path): - self._pull_into_cache(rel_path) - # Read the file content from cache - data_file = open(self._get_cache_path(rel_path)) - data_file.seek(start) - content = data_file.read(count) - data_file.close() - return content - - def _exists(self, obj, **kwargs): - in_cache = exists_remotely = False - rel_path = self._construct_path(obj, **kwargs) - dir_only = kwargs.get("dir_only", False) - base_dir = kwargs.get("base_dir", None) - - # check job work directory stuff early to skip API hits. - if dir_only and base_dir: - if not os.path.exists(rel_path): - os.makedirs(rel_path, exist_ok=True) - return True - - in_cache = self._in_cache(rel_path) - exists_remotely = self._exists_remotely(rel_path) - dir_only = kwargs.get("dir_only", False) - base_dir = kwargs.get("base_dir", None) - if dir_only: - if in_cache or exists_remotely: - return True - else: - return False - - # TODO: Sync should probably not be done here. Add this to an async upload stack? - if in_cache and not exists_remotely: - self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path)) - return True - elif exists_remotely: - return True - else: - return False - - def _create(self, obj, **kwargs): - if not self._exists(obj, **kwargs): - # Pull out locally used fields - extra_dir = kwargs.get("extra_dir", None) - extra_dir_at_root = kwargs.get("extra_dir_at_root", False) - dir_only = kwargs.get("dir_only", False) - alt_name = kwargs.get("alt_name", None) - - # Construct hashed path - rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) - - # Optionally append extra_dir - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - - # Create given directory in cache - cache_dir = os.path.join(self.staging_path, rel_path) - if not os.path.exists(cache_dir): - os.makedirs(cache_dir, exist_ok=True) - - # If instructed, create the dataset in cache & in S3 - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") - open(os.path.join(self.staging_path, rel_path), "w").close() - self._push_to_os(rel_path, from_string="") - return self - - def _empty(self, obj, **kwargs): - if self._exists(obj, **kwargs): - return self._size(obj, **kwargs) == 0 - else: - raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") - - def _size(self, obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - if self._in_cache(rel_path): - try: - return os.path.getsize(self._get_cache_path(rel_path)) - except OSError as ex: - log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex) - elif self._exists_remotely(rel_path): - return self._get_remote_size(rel_path) - log.warning("Did not find dataset '%s', returning 0 for size", rel_path) - return 0 - - def _get_filename(self, obj, **kwargs): - base_dir = kwargs.get("base_dir", None) - dir_only = kwargs.get("dir_only", False) - obj_dir = kwargs.get("obj_dir", False) - sync_cache = kwargs.get("sync_cache", True) - - rel_path = self._construct_path(obj, **kwargs) - - # for JOB_WORK directory - if base_dir and dir_only and obj_dir: - return os.path.abspath(rel_path) - - cache_path = self._get_cache_path(rel_path) - if not sync_cache: - return cache_path - - # Check if the file exists in the cache first, always pull if file size in cache is zero - if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0): - return cache_path - - # Check if the file exists in persistent storage and, if it does, pull it into cache - elif self._exists(obj, **kwargs): - if dir_only: - self._download_directory_into_cache(rel_path, cache_path) - return cache_path - else: - if self._pull_into_cache(rel_path): - return cache_path - raise ObjectNotFound(f"objectstore.get_filename, no cache_path: {obj}, kwargs: {kwargs}") - - def _download_directory_into_cache(self, rel_path, cache_path): - # azure, pithos, irod, and cloud did not do this prior to refactoring so I am assuming - # there is just operations that fail with these object stores, - # I'm placing a no-op here to match their behavior but we should - # maybe implement this for those object stores. - pass - - def _delete(self, obj, entire_dir=False, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - extra_dir = kwargs.get("extra_dir", None) - base_dir = kwargs.get("base_dir", None) - dir_only = kwargs.get("dir_only", False) - obj_dir = kwargs.get("obj_dir", False) - try: - # Remove temporary data in JOB_WORK directory - if base_dir and dir_only and obj_dir: - shutil.rmtree(os.path.abspath(rel_path)) - return True - - # For the case of extra_files, because we don't have a reference to - # individual files/keys we need to remove the entire directory structure - # with all the files in it. This is easy for the local file system, - # but requires iterating through each individual key in S3 and deleing it. - if entire_dir and extra_dir: - shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) - return self._delete_remote_all(rel_path) - else: - # Delete from cache first - unlink(self._get_cache_path(rel_path), ignore_errors=True) - # Delete from S3 as well - if self._exists_remotely(rel_path): - return self._delete_existing_remote(rel_path) - except OSError: - log.exception("%s delete error", self._get_filename(obj, **kwargs)) - return False - - def _update_from_file(self, obj, file_name=None, create=False, **kwargs): - if create: - self._create(obj, **kwargs) - - if self._exists(obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Chose whether to use the dataset file itself or an alternate file - if file_name: - source_file = os.path.abspath(file_name) - # Copy into cache - cache_file = self._get_cache_path(rel_path) - try: - if source_file != cache_file and self.cache_updated_data: - # FIXME? Should this be a `move`? - shutil.copy2(source_file, cache_file) - fix_permissions(self.config, cache_file) - except OSError: - log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file) - else: - source_file = self._get_cache_path(rel_path) - - self._push_to_os(rel_path, source_file) - - else: - raise ObjectNotFound( - f"objectstore.update_from_file, object does not exist: {str(obj)}, kwargs: {str(kwargs)}" - ) - - @property - def cache_target(self) -> CacheTarget: - return CacheTarget( - self.staging_path, - self.cache_size, - 0.9, - ) - - def _shutdown_cache_monitor(self) -> None: - self.cache_monitor and self.cache_monitor.shutdown() - - def _start_cache_monitor_if_needed(self): - if self.enable_cache_monitor: - self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) - - def _get_remote_size(self, rel_path: str) -> int: - raise NotImplementedError() - - def _exists_remotely(self, rel_path: str) -> bool: - raise NotImplementedError() - - def _push_to_os(self, rel_path, source_file: Optional[str] = None, from_string: Optional[str] = None) -> None: - raise NotImplementedError() - - # def _get_object_id(self, obj: Any) -> str: - # raise NotImplementedError() - - def _download(self, rel_path: str) -> bool: - raise NotImplementedError() - - # Do not need to override these if instead replacing _delete - def _delete_existing_remote(self, rel_path) -> bool: - raise NotImplementedError() - - def _delete_remote_all(self, rel_path) -> bool: - raise NotImplementedError() diff --git a/lib/galaxy/objectstore/cloud.py b/lib/galaxy/objectstore/cloud.py index ddf0a2659063..c78992dd6d94 100644 --- a/lib/galaxy/objectstore/cloud.py +++ b/lib/galaxy/objectstore/cloud.py @@ -8,14 +8,9 @@ import os.path import subprocess from datetime import datetime -from typing import Optional -from . import ConcreteObjectStore -from .caching import ( - enable_cache_monitor, - InProcessCacheMonitor, - UsesCache, -) +from ._caching_base import CachingConcreteObjectStore +from .caching import enable_cache_monitor from .s3 import parse_config_xml try: @@ -53,14 +48,13 @@ def _config_to_dict(self): } -class Cloud(ConcreteObjectStore, CloudConfigMixin, UsesCache): +class Cloud(CachingConcreteObjectStore, CloudConfigMixin): """ Object store that stores objects as items in an cloud storage. A local cache exists that is used as an intermediate location for files between Galaxy and the cloud storage. """ - cache_monitor: Optional[InProcessCacheMonitor] = None store_type = "cloud" def __init__(self, config, config_dict): diff --git a/lib/galaxy/objectstore/irods.py b/lib/galaxy/objectstore/irods.py index 88146aa6076f..d3a989739667 100644 --- a/lib/galaxy/objectstore/irods.py +++ b/lib/galaxy/objectstore/irods.py @@ -25,8 +25,7 @@ string_as_bool, unlink, ) -from . import DiskObjectStore -from .caching import UsesCache +from ._caching_base import CachingConcreteObjectStore IRODS_IMPORT_MESSAGE = "The Python irods package is required to use this feature, please install it" # 1 MB @@ -109,7 +108,7 @@ def parse_config_xml(config_xml): "cache_updated_data": cache_updated_data, }, "extra_dirs": extra_dirs, - "private": DiskObjectStore.parse_private_from_config_xml(config_xml), + "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml), } except Exception: # Toss it back up after logging, we can't continue loading at this point. @@ -145,7 +144,7 @@ def _config_to_dict(self): } -class IRODSObjectStore(DiskObjectStore, CloudConfigMixin, UsesCache): +class IRODSObjectStore(CachingConcreteObjectStore, CloudConfigMixin): """ Object store that stores files as data objects in an iRODS Zone. A local cache exists that is used as an intermediate location for files between Galaxy and iRODS. diff --git a/lib/galaxy/objectstore/pithos.py b/lib/galaxy/objectstore/pithos.py index 2ba74636ac2b..89cfb13dfe59 100644 --- a/lib/galaxy/objectstore/pithos.py +++ b/lib/galaxy/objectstore/pithos.py @@ -17,8 +17,7 @@ KamakiClient = None from galaxy.util import directory_hash_id -from . import ConcreteObjectStore -from .caching import UsesCache +from ._caching_base import CachingConcreteObjectStore NO_KAMAKI_ERROR_MESSAGE = ( "ObjectStore configured, but no kamaki.clients dependency available." @@ -69,7 +68,7 @@ def parse_config_xml(config_xml): log.error(msg) raise Exception(msg) r["extra_dirs"] = [{k: e.get(k) for k in attrs} for e in extra_dirs] - r["private"] = ConcreteObjectStore.parse_private_from_config_xml(config_xml) + r["private"] = CachingConcreteObjectStore.parse_private_from_config_xml(config_xml) if "job_work" not in (d["type"] for d in r["extra_dirs"]): msg = f'No value for {tag}:type="job_work" in XML tree' log.error(msg) @@ -80,7 +79,7 @@ def parse_config_xml(config_xml): return r -class PithosObjectStore(ConcreteObjectStore, UsesCache): +class PithosObjectStore(CachingConcreteObjectStore): """ Object store that stores objects as items in a Pithos+ container. Cache is ignored for the time being. diff --git a/lib/galaxy/objectstore/rucio.py b/lib/galaxy/objectstore/rucio.py index dc3435a84c52..2274946e375a 100644 --- a/lib/galaxy/objectstore/rucio.py +++ b/lib/galaxy/objectstore/rucio.py @@ -2,7 +2,6 @@ import logging import os import shutil -from typing import Optional try: import rucio.common @@ -32,12 +31,10 @@ umask_fix_perms, unlink, ) -from . import ConcreteObjectStore +from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, - InProcessCacheMonitor, parse_caching_config_dict_from_xml, - UsesCache, ) log = logging.getLogger(__name__) @@ -272,7 +269,7 @@ def delete(self, key, auth_token): return True -class RucioObjectStore(ConcreteObjectStore, UsesCache): +class RucioObjectStore(CachingConcreteObjectStore): """ Object store implementation that uses ORNL remote data broker. @@ -280,8 +277,6 @@ class RucioObjectStore(ConcreteObjectStore, UsesCache): Galaxy at some future point or significantly modified. """ - cache_monitor: Optional[InProcessCacheMonitor] = None - store_type = "rucio" def to_dict(self): diff --git a/lib/galaxy/objectstore/s3.py b/lib/galaxy/objectstore/s3.py index f9561707ff63..63884c1721d0 100644 --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -23,11 +23,10 @@ string_as_bool, which, ) -from . import ConcreteObjectStore +from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, parse_caching_config_dict_from_xml, - UsesCache, ) from .s3_multipart_upload import multipart_upload @@ -109,7 +108,7 @@ def parse_config_xml(config_xml): }, "cache": cache_dict, "extra_dirs": extra_dirs, - "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml), + "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml), } name = config_xml.attrib.get("name", None) if name is not None: @@ -150,14 +149,13 @@ def _config_to_dict(self): } -class S3ObjectStore(ConcreteObjectStore, CloudConfigMixin, UsesCache): +class S3ObjectStore(CachingConcreteObjectStore, CloudConfigMixin): """ Object store that stores objects as items in an AWS S3 bucket. A local cache exists that is used as an intermediate location for files between Galaxy and S3. """ - cache_monitor: Optional[InProcessCacheMonitor] = None store_type = "aws_s3" cloud = True diff --git a/lib/galaxy/objectstore/s3_boto3.py b/lib/galaxy/objectstore/s3_boto3.py index 8aef501aa88b..5af4c478e20b 100644 --- a/lib/galaxy/objectstore/s3_boto3.py +++ b/lib/galaxy/objectstore/s3_boto3.py @@ -5,8 +5,13 @@ import os from datetime import datetime from typing import ( - Optional, TYPE_CHECKING, + TypedDict, +) + +from typing_extensions import ( + Literal, + NotRequired, ) if TYPE_CHECKING: @@ -19,12 +24,10 @@ except ImportError: boto3 = None # type: ignore[assignment] -from . import ConcreteObjectStore +from ._caching_base import CachingConcreteObjectStore from .caching import ( enable_cache_monitor, - InProcessCacheMonitor, parse_caching_config_dict_from_xml, - UsesCache, ) NO_BOTO_ERROR_MESSAGE = ( @@ -91,7 +94,7 @@ def parse_config_xml(config_xml): }, "cache": cache_dict, "extra_dirs": extra_dirs, - "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml), + "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml), } name = config_xml.attrib.get("name", None) if name is not None: @@ -105,29 +108,15 @@ def parse_config_xml(config_xml): raise -class CloudConfigMixin: - def _config_to_dict(self): - return { - "auth": { - "access_key": self.access_key, - "secret_key": self.secret_key, - }, - "bucket": { - "name": self.bucket, - }, - "connection": { - "endpoint_url": self.endpoint_url, - "region": self.region, - }, - "cache": { - "size": self.cache_size, - "path": self.staging_path, - "cache_updated_data": self.cache_updated_data, - }, - } +class S3ClientConstructorKwds(TypedDict): + service_name: Literal["s3"] + endpoint_url: NotRequired[str] + region_name: NotRequired[str] + aws_access_key_id: NotRequired[str] + aws_secret_access_key: NotRequired[str] -class S3ObjectStore(ConcreteObjectStore, CloudConfigMixin, UsesCache): +class S3ObjectStore(CachingConcreteObjectStore): """ Object store that stores objects as items in an AWS S3 bucket. A local cache exists that is used as an intermediate location for files between @@ -135,7 +124,6 @@ class S3ObjectStore(ConcreteObjectStore, CloudConfigMixin, UsesCache): """ _client: "S3Client" - cache_monitor: Optional[InProcessCacheMonitor] = None store_type = "aws_s3_boto3" cloud = True @@ -194,7 +182,7 @@ def _configure_connection(self): def _init_client(self): # set _client based on current args. # If access_key is empty use default credential chain - kwds = { + kwds: S3ClientConstructorKwds = { "service_name": "s3", } if self.endpoint_url: @@ -226,6 +214,26 @@ def _create_bucket(self): def parse_xml(clazz, config_xml): return parse_config_xml(config_xml) + def _config_to_dict(self): + return { + "auth": { + "access_key": self.access_key, + "secret_key": self.secret_key, + }, + "bucket": { + "name": self.bucket, + }, + "connection": { + "endpoint_url": self.endpoint_url, + "region": self.region, + }, + "cache": { + "size": self.cache_size, + "path": self.staging_path, + "cache_updated_data": self.cache_updated_data, + }, + } + def to_dict(self): as_dict = super().to_dict() as_dict.update(self._config_to_dict())