From 89becf66e4c5c2c0081b6450ae3fad5690ccdc6f Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sun, 19 May 2024 16:56:12 -0400 Subject: [PATCH] De-duplication of onedata objectstore code. Use caching abstractions and bring in line with other object stores. --- lib/galaxy/objectstore/_caching_base.py | 5 +- lib/galaxy/objectstore/onedata.py | 346 +++--------------------- 2 files changed, 37 insertions(+), 314 deletions(-) diff --git a/lib/galaxy/objectstore/_caching_base.py b/lib/galaxy/objectstore/_caching_base.py index b63593ec7c50..066fc9077e34 100644 --- a/lib/galaxy/objectstore/_caching_base.py +++ b/lib/galaxy/objectstore/_caching_base.py @@ -254,11 +254,14 @@ def _empty(self, obj, **kwargs): else: raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") + def _get_size_in_cache(self, rel_path): + return os.path.getsize(self._get_cache_path(rel_path)) + def _size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) if self._in_cache(rel_path): try: - return os.path.getsize(self._get_cache_path(rel_path)) + return self._get_size_in_cache(rel_path) except OSError as ex: log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex) elif self._exists_remotely(rel_path): diff --git a/lib/galaxy/objectstore/onedata.py b/lib/galaxy/objectstore/onedata.py index f21236cdc8be..8e497297aabc 100644 --- a/lib/galaxy/objectstore/onedata.py +++ b/lib/galaxy/objectstore/onedata.py @@ -4,9 +4,7 @@ import logging import os -import shutil from datetime import datetime -from typing import Optional try: from onedatafilerestclient import ( @@ -16,25 +14,14 @@ except ImportError: OnedataFileRESTClient = None -from galaxy.exceptions import ( - ObjectInvalid, - ObjectNotFound, -) -from galaxy.util import ( - directory_hash_id, - string_as_bool, - umask_fix_perms, - unlink, -) -from galaxy.util.path import safe_relpath -from . import ConcreteObjectStore +from galaxy.util import string_as_bool +from ._caching_base import CachingConcreteObjectStore from .caching import ( - CacheTarget, enable_cache_monitor, - InProcessCacheMonitor, parse_caching_config_dict_from_xml, ) + NO_ONEDATA_ERROR_MESSAGE = ( "ObjectStore configured to use Onedata, but no OnedataFileRESTClient dependency " "available. Please install and properly configure Onedata or modify Object " @@ -75,7 +62,7 @@ def _parse_config_xml(config_xml): "space": {"name": space_name, "galaxy_root_dir": galaxy_root_dir}, "cache": cache_dict, "extra_dirs": extra_dirs, - "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml), + "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml), } except Exception: # Toss it back up after logging, we can't continue loading at this point. @@ -94,14 +81,13 @@ def _get_config_xml_elements(config_xml, tag): return elements -class OnedataObjectStore(ConcreteObjectStore): +class OnedataObjectStore(CachingConcreteObjectStore): """ Object store that stores objects as items in an Onedata. A local cache exists that is used as an intermediate location for files between Galaxy and Onedata. """ - cache_monitor: Optional[InProcessCacheMonitor] = None store_type = "onedata" def __init__(self, config, config_dict): @@ -142,8 +128,8 @@ def _initialize(self): verify_ssl = not self.disable_tls_certificate_validation self._client = OnedataFileRESTClient(self.onezone_domain, self.access_token, verify_ssl=verify_ssl) - if self.enable_cache_monitor: - self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval) + self._ensure_staging_path_writable() + self._start_cache_monitor_if_needed() @classmethod def parse_xml(clazz, config_xml): @@ -170,72 +156,10 @@ def to_dict(self): ) return as_dict - @property - def cache_target(self) -> CacheTarget: - return CacheTarget(self.staging_path, self.cache_size, 0.9) - - def _fix_permissions(self, rel_path): - """Set permissions on rel_path""" - for basedir, _, files in os.walk(rel_path): - umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid) - for filename in files: - path = os.path.join(basedir, filename) - # Ignore symlinks - if os.path.islink(path): - continue - umask_fix_perms(path, self.config.umask, 0o666, self.config.gid) - - def _construct_path( - self, - obj, - base_dir=None, - dir_only=None, - extra_dir=None, - extra_dir_at_root=False, - alt_name=None, - obj_dir=False, - in_cache=False, - **kwargs, - ): - # extra_dir should never be constructed from provided data but just - # make sure there are no shenanigans afoot - if extra_dir and extra_dir != os.path.normpath(extra_dir): - log.warning("extra_dir is not normalized: %s", extra_dir) - raise ObjectInvalid("The requested object is invalid") - # ensure that any parent directory references in alt_name would not - # result in a path not contained in the directory path constructed here - if alt_name: - if not safe_relpath(alt_name): - log.warning("alt_name would locate path outside dir: %s", alt_name) - raise ObjectInvalid("The requested object is invalid") - # alt_name can contain parent directory references, but Onedata will not - # follow them, so if they are valid we normalize them out - alt_name = os.path.normpath(alt_name) - rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - - # for JOB_WORK directory - if obj_dir: - rel_path = os.path.join(rel_path, str(self._get_object_id(obj))) - if base_dir: - base = self.extra_dirs.get(base_dir) - return os.path.join(str(base), rel_path) - - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") - if in_cache: - return self._get_cache_path(rel_path) - - return rel_path - def _construct_onedata_path(self, rel_path): return os.path.join(self.galaxy_root_dir, rel_path) - def _get_size_in_onedata(self, rel_path): + def _get_remote_size(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) return self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] @@ -243,7 +167,7 @@ def _get_size_in_onedata(self, rel_path): log.exception("Could not get '%s' size from Onedata", rel_path) return -1 - def _exists_in_onedata(self, rel_path): + def _exists_remotely(self, rel_path): try: onedata_path = self._construct_onedata_path(rel_path) self._client.get_attributes(self.space_name, file_path=onedata_path) @@ -257,27 +181,6 @@ def _exists_in_onedata(self, rel_path): log.exception("Trouble checking '%s' existence in Onedata", rel_path) return False - def _in_cache(self, rel_path): - """Check if the given dataset is in the local cache and return True if so.""" - cache_path = self._get_cache_path(rel_path) - return os.path.exists(cache_path) - - def _get_cache_path(self, rel_path): - return os.path.abspath(os.path.join(self.staging_path, rel_path)) - - def _get_size_in_cache(self, rel_path): - return os.path.getsize(self._get_cache_path(rel_path)) - - def _pull_into_cache(self, rel_path): - # Ensure the cache directory structure exists (e.g., dataset_#_files/) - rel_path_dir = os.path.dirname(rel_path) - if not os.path.exists(self._get_cache_path(rel_path_dir)): - os.makedirs(self._get_cache_path(rel_path_dir), exist_ok=True) - # Now pull in the file - file_ok = self._download(rel_path) - self._fix_permissions(self._get_cache_path(rel_path_dir)) - return file_ok - def _download(self, rel_path): try: dst_path = self._get_cache_path(rel_path) @@ -288,13 +191,7 @@ def _download(self, rel_path): file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)["size"] # Test if cache is large enough to hold the new file - if not self.cache_target.fits_in_cache(file_size): - log.critical( - "File %s is larger (%s) than the configured cache allows (%s). Cannot download.", - rel_path, - file_size, - self.cache_target.log_description, - ) + if not self._caching_allowed(rel_path, file_size): return False with open(dst_path, "wb") as dst: @@ -308,7 +205,7 @@ def _download(self, rel_path): log.exception("Problem downloading file '%s'", rel_path) return False - def _push_to_os(self, rel_path, source_file=None): + def _push_to_storage(self, rel_path, source_file=None, from_string=None): """ Push the file pointed to by ``rel_path`` to the object store under ``rel_path``. If ``source_file`` is provided, push that file instead while still using @@ -317,7 +214,7 @@ def _push_to_os(self, rel_path, source_file=None): try: source_file = source_file if source_file else self._get_cache_path(rel_path) if os.path.exists(source_file): - if os.path.getsize(source_file) == 0 and self._exists_in_onedata(rel_path): + if os.path.getsize(source_file) == 0 and self._exists_remotely(rel_path): log.debug( "Wanted to push file '%s' to Onedata '%s' but its size is 0; skipping.", source_file, rel_path ) @@ -331,18 +228,24 @@ def _push_to_os(self, rel_path, source_file=None): rel_path, ) + if not self._exists_remotely(rel_path): + self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True) + onedata_path = self._construct_onedata_path(rel_path) file_id = self._client.get_file_id(self.space_name, onedata_path) - with open(source_file, "rb") as src: - offset = 0 - while True: - chunk = src.read(STREAM_CHUNK_SIZE) - if not chunk: - break + if source_file: + with open(source_file, "rb") as src: + offset = 0 + while True: + chunk = src.read(STREAM_CHUNK_SIZE) + if not chunk: + break - self._client.put_file_content(self.space_name, file_id, offset, chunk) - offset += len(chunk) + self._client.put_file_content(self.space_name, file_id, offset, chunk) + offset += len(chunk) + else: + self._client.put_file_content(self.space_name, file_id, 0, from_string.encode("utf-8")) end_time = datetime.now() log.debug( @@ -360,203 +263,20 @@ def _push_to_os(self, rel_path, source_file=None): raise return False - def file_ready(self, obj, **kwargs): - """ - A helper method that checks if a file corresponding to a dataset is - ready and available to be used. Return ``True`` if so, ``False`` otherwise. - """ - rel_path = self._construct_path(obj, **kwargs) - # Make sure the size in cache is available in its entirety - if self._in_cache(rel_path): - if self._get_size_in_cache(rel_path) == self._get_size_in_onedata(rel_path): - return True - log.debug( - "Waiting for dataset %s to transfer from OS: %s/%s", - rel_path, - self._get_size_in_cache(rel_path), - self._get_size_in_onedata(rel_path), - ) - return False - - def _exists(self, obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - in_cache = self._in_cache(rel_path) - in_onedata = self._exists_in_onedata(rel_path) - - # dir_only does not get synced so shortcut the decision - dir_only = kwargs.get("dir_only", False) - base_dir = kwargs.get("base_dir", None) - if dir_only: - if in_cache or in_onedata: - return True - # for JOB_WORK directory - elif base_dir: - if not os.path.exists(rel_path): - os.makedirs(rel_path, exist_ok=True) - return True - else: - return False - - if in_cache and not in_onedata: - self._push_to_os(rel_path) - return True - elif in_onedata: - return True - else: - return False - - def _create(self, obj, **kwargs): - if not self._exists(obj, **kwargs): - # Pull out locally used fields - extra_dir = kwargs.get("extra_dir", None) - extra_dir_at_root = kwargs.get("extra_dir_at_root", False) - dir_only = kwargs.get("dir_only", False) - alt_name = kwargs.get("alt_name", None) - - # Construct hashed path - rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) - - # Optionally append extra_dir - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - - # Create given directory in cache - cache_dir = os.path.join(self.staging_path, rel_path) - if not os.path.exists(cache_dir): - os.makedirs(cache_dir, exist_ok=True) - - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") - # need this line to set the dataset filename, not sure how this is done - filesystem is monitored? - open(os.path.join(self.staging_path, rel_path), "w").close() - onedata_path = self._construct_onedata_path(rel_path) - self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True) - return self - - def _empty(self, obj, **kwargs): - if self._exists(obj, **kwargs): - return bool(self._size(obj, **kwargs) > 0) - else: - raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") - - def _size(self, obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - if self._in_cache(rel_path): - try: - return self._get_size_in_cache(rel_path) - except OSError as ex: - log.info("Could not get size of file '%s' in local cache, will try Onedata. Error: %s", rel_path, ex) - elif self._exists(obj, **kwargs): - return self._get_size_in_onedata(rel_path) - log.warning("Did not find dataset '%s', returning 0 for size", rel_path) - return 0 - - def _delete(self, obj, entire_dir=False, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - extra_dir = kwargs.get("extra_dir", None) - base_dir = kwargs.get("base_dir", None) - dir_only = kwargs.get("dir_only", False) - obj_dir = kwargs.get("obj_dir", False) - + def _delete_existing_remote(self, rel_path) -> bool: try: - # Remove temporary data in JOB_WORK directory - if base_dir and dir_only and obj_dir: - shutil.rmtree(os.path.abspath(rel_path)) - return True - - # Delete from cache first - if entire_dir and extra_dir: - shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) - else: - unlink(self._get_cache_path(rel_path), ignore_errors=True) - - # Delete from Onedata as well - if self._exists_in_onedata(rel_path): - onedata_path = self._construct_onedata_path(rel_path) - self._client.remove(self.space_name, onedata_path) - return True + onedata_path = self._construct_onedata_path(rel_path) + self._client.remove(self.space_name, onedata_path) + return True except OnedataRESTError: log.exception("Could not delete '%s' from Onedata", rel_path) - except OSError: - log.exception("%s delete error", self._get_filename(obj, **kwargs)) - return False - - def _get_data(self, obj, start=0, count=-1, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Check cache first and get file if not there - if not self._in_cache(rel_path) or self._get_size_in_cache(rel_path) == 0: - self._pull_into_cache(rel_path) - # Read the file content from cache - with open(self._get_cache_path(rel_path)) as data_file: - data_file.seek(start) - content = data_file.read(count) - return content - - def _get_filename(self, obj, **kwargs): - base_dir = kwargs.get("base_dir", None) - dir_only = kwargs.get("dir_only", False) - obj_dir = kwargs.get("obj_dir", False) - sync_cache = kwargs.get("sync_cache", True) - - rel_path = self._construct_path(obj, **kwargs) - - # for JOB_WORK directory - if base_dir and dir_only and obj_dir: - return os.path.abspath(rel_path) - - cache_path = self._get_cache_path(rel_path) - if not sync_cache: - return cache_path - - # return path if we do not need to update cache - if self._in_cache(rel_path) and (dir_only or self._get_size_in_cache(rel_path) > 0): - return cache_path - # something is already in cache - elif self._exists(obj, **kwargs): - if dir_only: # Directories do not get pulled into cache - return cache_path - else: - if self._pull_into_cache(rel_path): - return cache_path - - raise ObjectNotFound(f"objectstore.get_filename, no cache_path: {obj}, kwargs: {kwargs}") - - def _update_from_file(self, obj, file_name=None, create=False, **kwargs): - if create: - self._create(obj, **kwargs) - if self._exists(obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Chose whether to use the dataset file itself or an alternate file - if file_name: - source_file = os.path.abspath(file_name) - # Copy into cache - cache_file = self._get_cache_path(rel_path) - try: - if source_file != cache_file and self.cache_updated_data: - # FIXME? Should this be a `move`? - try: - shutil.copy2(source_file, cache_file) - except OSError: - os.makedirs(os.path.dirname(cache_file)) - shutil.copy2(source_file, cache_file) - self._fix_permissions(cache_file) - except OSError: - log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file) - else: - source_file = self._get_cache_path(rel_path) - # Update the file on Onedata - self._push_to_os(rel_path, source_file) - else: - raise ObjectNotFound(f"objectstore.update_from_file, object does not exist: {obj}, kwargs: {kwargs}") + return False def _get_object_url(self, obj, **kwargs): return None - def _get_store_usage_percent(self, **kwargs): + def _get_store_usage_percent(self, obj): return 0.0 def shutdown(self): - self.cache_monitor and self.cache_monitor.shutdown() + self._shutdown_cache_monitor()