diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt index 2c9efa7eab45..bc25967a1475 100644 --- a/lib/galaxy/dependencies/conditional-requirements.txt +++ b/lib/galaxy/dependencies/conditional-requirements.txt @@ -6,7 +6,7 @@ sentry-sdk[fastapi] pbs_python drmaa statsd -azure-storage==0.32.0 +azure-storage-blob==12.19.1 python-irodsclient==2.0.0 python-ldap==3.4.0 ldap3==2.9.1 @@ -26,6 +26,7 @@ fs-gcsfs # type: googlecloudstorage google-cloud-storage>=2.8.0 # type: googlecloudstorage fs.onedatarestfs # type: onedata fs-basespace # type: basespace +fs-azureblob # type: azure # Vault backend hvac diff --git a/lib/galaxy/files/sources/azure.py b/lib/galaxy/files/sources/azure.py new file mode 100644 index 000000000000..519c3354ed5f --- /dev/null +++ b/lib/galaxy/files/sources/azure.py @@ -0,0 +1,43 @@ +from typing import Union + +try: + from fs.azblob import ( + BlobFS, + BlobFSV2, + ) +except ImportError: + BlobFS = None + +from typing import Optional + +from . import ( + FilesSourceOptions, + FilesSourceProperties, +) +from ._pyfilesystem2 import PyFilesystem2FilesSource + + +class AzureFileSource(PyFilesystem2FilesSource): + plugin_type = "azure" + required_module = BlobFS + required_package = "fs-azureblob" + + def _open_fs(self, user_context=None, opts: Optional[FilesSourceOptions] = None): + props = self._serialization_props(user_context) + extra_props: Union[FilesSourceProperties, dict] = opts.extra_props or {} if opts else {} + all_props = {**props, **extra_props} + namespace_type = all_props.get("namespace_type", "hierarchical") + if namespace_type not in ["hierarchical", "flat"]: + raise Exception("Misconfigured azure file source") + account_name = all_props["account_name"] + account_key = all_props["account_key"] + container = all_props["container_name"] + if namespace_type == "flat": + handle = BlobFS(account_name, container, account_key) + else: + handle = BlobFSV2(account_name, container, account_key) + + return handle + + +__all__ = ("AzureFileSource",) diff --git a/lib/galaxy/objectstore/azure_blob.py b/lib/galaxy/objectstore/azure_blob.py index 83c1d700e195..7c9002a9e6ee 100644 --- a/lib/galaxy/objectstore/azure_blob.py +++ b/lib/galaxy/objectstore/azure_blob.py @@ -5,16 +5,21 @@ import logging import os import shutil -from datetime import datetime +from datetime import ( + datetime, + timedelta, +) from typing import Optional try: from azure.common import AzureHttpError - from azure.storage import CloudStorageAccount - from azure.storage.blob import BlockBlobService - from azure.storage.blob.models import Blob + from azure.storage.blob import ( + BlobSasPermissions, + BlobServiceClient, + generate_blob_sas, + ) except ImportError: - BlockBlobService = None + BlobServiceClient = None from galaxy.exceptions import ( ObjectInvalid, @@ -48,6 +53,7 @@ def parse_config_xml(config_xml): account_name = auth_xml.get("account_name") account_key = auth_xml.get("account_key") + account_url = auth_xml.get("account_url") container_xml = config_xml.find("container") container_name = container_xml.get("name") @@ -62,11 +68,15 @@ def parse_config_xml(config_xml): log.error(msg) raise Exception(msg) extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs] + auth = { + "account_name": account_name, + "account_key": account_key, + } + if account_url: + auth["account_url"] = account_url + return { - "auth": { - "account_name": account_name, - "account_key": account_key, - }, + "auth": auth, "container": { "name": container_name, "max_chunk_size": max_chunk_size, @@ -94,14 +104,13 @@ class AzureBlobObjectStore(ConcreteObjectStore): def __init__(self, config, config_dict): super().__init__(config, config_dict) - self.transfer_progress = 0 - auth_dict = config_dict["auth"] container_dict = config_dict["container"] cache_dict = config_dict.get("cache") or {} self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict) self.account_name = auth_dict.get("account_name") + self.account_url = auth_dict.get("account_url") self.account_key = auth_dict.get("account_key") self.container_name = container_dict.get("name") @@ -114,7 +123,7 @@ def __init__(self, config, config_dict): self._initialize() def _initialize(self): - if BlockBlobService is None: + if BlobServiceClient is None: raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE) self._configure_connection() @@ -124,12 +133,15 @@ def _initialize(self): def to_dict(self): as_dict = super().to_dict() + auth = { + "account_name": self.account_name, + "account_key": self.account_key, + } + if self.account_url: + auth["account_url"] = self.account_url as_dict.update( { - "auth": { - "account_name": self.account_name, - "account_key": self.account_key, - }, + "auth": auth, "container": { "name": self.container_name, "max_chunk_size": self.max_chunk_size, @@ -154,8 +166,18 @@ def parse_xml(clazz, config_xml): def _configure_connection(self): log.debug("Configuring Connection") - self.account = CloudStorageAccount(self.account_name, self.account_key) - self.service = self.account.create_block_blob_service() + if self.account_url: + # https://pypi.org/project/azure-storage-blob/ + service = BlobServiceClient( + account_url=self.account_url, + credential={"account_name": self.account_name, "account_key": self.account_key}, + ) + else: + service = BlobServiceClient( + account_url=f"https://{self.account_name}.blob.core.windows.net", + credential=self.account_key, + ) + self.service = service def _construct_path( self, @@ -224,32 +246,26 @@ def _fix_permissions(self, rel_path): def _get_cache_path(self, rel_path): return os.path.abspath(os.path.join(self.staging_path, rel_path)) - def _get_transfer_progress(self): - return self.transfer_progress - def _get_size_in_azure(self, rel_path): try: - properties = self.service.get_blob_properties(self.container_name, rel_path) - # Currently this returns a blob and not a BlobProperties object - # Similar issue for the ruby https://github.com/Azure/azure-storage-ruby/issues/13 - # The typecheck is an attempt at future-proofing this when/if the bug is fixed. - if type(properties) is Blob: - properties = properties.properties - if properties: - size_in_bytes = properties.content_length - return size_in_bytes + properties = self._blob_client(rel_path).get_blob_properties() + size_in_bytes = properties.size + return size_in_bytes except AzureHttpError: log.exception("Could not get size of blob '%s' from Azure", rel_path) return -1 def _in_azure(self, rel_path): try: - exists = self.service.exists(self.container_name, rel_path) + exists = self._blob_client(rel_path).exists() except AzureHttpError: log.exception("Trouble checking existence of Azure blob '%s'", rel_path) return False return exists + def _blob_client(self, rel_path: str): + return self.service.get_blob_client(self.container_name, rel_path) + def _in_cache(self, rel_path): """Check if the given dataset is in the local cache.""" cache_path = self._get_cache_path(rel_path) @@ -265,9 +281,6 @@ def _pull_into_cache(self, rel_path): self._fix_permissions(self._get_cache_path(rel_path_dir)) return file_ok - def _transfer_cb(self, complete, total): - self.transfer_progress = float(complete) / float(total) * 100 # in percent - def _download(self, rel_path): local_destination = self._get_cache_path(rel_path) try: @@ -281,10 +294,8 @@ def _download(self, rel_path): ) return False else: - self.transfer_progress = 0 # Reset transfer progress counter - self.service.get_blob_to_path( - self.container_name, rel_path, local_destination, progress_callback=self._transfer_cb - ) + with open(local_destination, "wb") as f: + self._blob_client(rel_path).download_blob().download_to_stream(f) return True except AzureHttpError: log.exception("Problem downloading '%s' from Azure", rel_path) @@ -301,7 +312,7 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None): try: source_file = source_file or self._get_cache_path(rel_path) - if not os.path.exists(source_file): + if from_string is None and not os.path.exists(source_file): log.error( "Tried updating blob '%s' from source file '%s', but source file does not exist.", rel_path, @@ -309,16 +320,14 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None): ) return False - if os.path.getsize(source_file) == 0: + if from_string is None and os.path.getsize(source_file) == 0: log.debug( "Wanted to push file '%s' to azure blob '%s' but its size is 0; skipping.", source_file, rel_path ) return True - if from_string: - self.service.create_blob_from_text( - self.container_name, rel_path, from_string, progress_callback=self._transfer_cb - ) + if from_string is not None: + self._blob_client(rel_path).upload_blob(from_string, overwrite=True) log.debug("Pushed data from string '%s' to blob '%s'", from_string, rel_path) else: start_time = datetime.now() @@ -328,13 +337,11 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None): os.path.getsize(source_file), rel_path, ) - self.transfer_progress = 0 # Reset transfer progress counter - self.service.create_blob_from_path( - self.container_name, rel_path, source_file, progress_callback=self._transfer_cb - ) + with open(source_file, "rb") as f: + self._blob_client(rel_path).upload_blob(f, overwrite=True) end_time = datetime.now() log.debug( - "Pushed cache file '%s' to blob '%s' (%s bytes transfered in %s sec)", + "Pushed cache file '%s' to blob '%s' (%s bytes transferred in %s sec)", source_file, rel_path, os.path.getsize(source_file), @@ -353,6 +360,14 @@ def _push_to_os(self, rel_path, source_file=None, from_string=None): def _exists(self, obj, **kwargs): in_cache = in_azure = False rel_path = self._construct_path(obj, **kwargs) + dir_only = kwargs.get("dir_only", False) + base_dir = kwargs.get("base_dir", None) + + # check job work directory stuff early to skip API hits. + if dir_only and base_dir: + if not os.path.exists(rel_path): + os.makedirs(rel_path, exist_ok=True) + return True in_cache = self._in_cache(rel_path) in_azure = self._in_azure(rel_path) @@ -363,11 +378,6 @@ def _exists(self, obj, **kwargs): if dir_only: if in_cache or in_azure: return True - # for JOB_WORK directory - elif base_dir: - if not os.path.exists(rel_path): - os.makedirs(rel_path, exist_ok=True) - return True else: return False @@ -430,10 +440,13 @@ def _create(self, obj, **kwargs): rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") open(os.path.join(self.staging_path, rel_path), "w").close() self._push_to_os(rel_path, from_string="") + return self def _empty(self, obj, **kwargs): if self._exists(obj, **kwargs): - return bool(self._size(obj, **kwargs) > 0) + size = self._size(obj, **kwargs) + is_empty = bool(size == 0) + return is_empty else: raise ObjectNotFound(f"objectstore.empty, object does not exist: {str(obj)}, kwargs: {str(kwargs)}") @@ -467,10 +480,10 @@ def _delete(self, obj, entire_dir=False, **kwargs): # but requires iterating through each individual blob in Azure and deleing it. if entire_dir and extra_dir: shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True) - blobs = self.service.list_blobs(self.container_name, prefix=rel_path) + blobs = self.service.get_container_client(self.container_name).list_blobs() for blob in blobs: log.debug("Deleting from Azure: %s", blob) - self.service.delete_blob(self.container_name, blob.name) + self._blob_client(blob.name).delete_blob() return True else: # Delete from cache first @@ -478,7 +491,7 @@ def _delete(self, obj, entire_dir=False, **kwargs): # Delete from S3 as well if self._in_azure(rel_path): log.debug("Deleting from Azure: %s", rel_path) - self.service.delete_blob(self.container_name, rel_path) + self._blob_client(rel_path).delete_blob() return True except AzureHttpError: log.exception("Could not delete blob '%s' from Azure", rel_path) @@ -512,14 +525,6 @@ def _get_filename(self, obj, **kwargs): cache_path = self._get_cache_path(rel_path) if not sync_cache: return cache_path - # S3 does not recognize directories as files so cannot check if those exist. - # So, if checking dir only, ensure given dir exists in cache and return - # the expected cache path. - # dir_only = kwargs.get('dir_only', False) - # if dir_only: - # if not os.path.exists(cache_path): - # os.makedirs(cache_path) - # return cache_path # Check if the file exists in the cache first, always pull if file size in cache is zero if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0): return cache_path @@ -539,7 +544,8 @@ def _get_filename(self, obj, **kwargs): def _update_from_file(self, obj, file_name=None, create=False, **kwargs): if create is True: self._create(obj, **kwargs) - elif self._exists(obj, **kwargs): + + if self._exists(obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) # Chose whether to use the dataset file itself or an alternate file if file_name: @@ -567,13 +573,24 @@ def _get_object_url(self, obj, **kwargs): if self._exists(obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) try: - url = self.service.make_blob_url(container_name=self.container_name, blob_name=rel_path) - return url + url = self._blob_client(rel_path).url + # https://learn.microsoft.com/en-us/azure/storage/blobs/sas-service-create-python + token = generate_blob_sas( + account_name=self.account_name, + account_key=self.account_key, + container_name=self.container_name, + blob_name=rel_path, + permission=BlobSasPermissions(read=True), + expiry=datetime.utcnow() + timedelta(hours=1), + ) + return f"{url}?{token}" except AzureHttpError: log.exception("Trouble generating URL for dataset '%s'", rel_path) return None - def _get_store_usage_percent(self): + def _get_store_usage_percent(self, obj): + # Percent used for Azure blob containers is effectively zero realistically. + # https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets return 0.0 @property diff --git a/lib/galaxy/objectstore/caching.py b/lib/galaxy/objectstore/caching.py index f923a33d3fb9..502d55b5a75c 100644 --- a/lib/galaxy/objectstore/caching.py +++ b/lib/galaxy/objectstore/caching.py @@ -5,6 +5,7 @@ import os import threading import time +from math import inf from typing import ( List, Optional, @@ -76,6 +77,11 @@ def check_cache(cache_target: CacheTarget): _clean_cache(file_list, delete_this_much) +def reset_cache(cache_target: CacheTarget): + _, file_list = _get_cache_size_files(cache_target.path) + _clean_cache(file_list, inf) + + def _clean_cache(file_list: FileListT, delete_this_much: float) -> None: """Keep deleting files from the file_list until the size of the deleted files is greater than the value in delete_this_much parameter. diff --git a/lib/galaxy/objectstore/s3.py b/lib/galaxy/objectstore/s3.py index 7e17c34d844e..9635d4386de2 100644 --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -513,6 +513,14 @@ def file_ready(self, obj, **kwargs): def _exists(self, obj, **kwargs): in_cache = in_s3 = False rel_path = self._construct_path(obj, **kwargs) + dir_only = kwargs.get("dir_only", False) + base_dir = kwargs.get("base_dir", None) + + # check job work directory stuff early to skip API hits. + if dir_only and base_dir: + if not os.path.exists(rel_path): + os.makedirs(rel_path, exist_ok=True) + return True # Check cache if self._in_cache(rel_path): @@ -521,16 +529,9 @@ def _exists(self, obj, **kwargs): in_s3 = self._key_exists(rel_path) # log.debug("~~~~~~ File '%s' exists in cache: %s; in s3: %s" % (rel_path, in_cache, in_s3)) # dir_only does not get synced so shortcut the decision - dir_only = kwargs.get("dir_only", False) - base_dir = kwargs.get("base_dir", None) if dir_only: if in_cache or in_s3: return True - # for JOB_WORK directory - elif base_dir: - if not os.path.exists(rel_path): - os.makedirs(rel_path, exist_ok=True) - return True else: return False @@ -721,7 +722,7 @@ def _get_object_url(self, obj, **kwargs): log.exception("Trouble generating URL for dataset '%s'", rel_path) return None - def _get_store_usage_percent(self): + def _get_store_usage_percent(self, obj): return 0.0 def shutdown(self): diff --git a/lib/galaxy/objectstore/unittest_utils/__init__.py b/lib/galaxy/objectstore/unittest_utils/__init__.py index f96b12d91a0c..8807159b2437 100644 --- a/lib/galaxy/objectstore/unittest_utils/__init__.py +++ b/lib/galaxy/objectstore/unittest_utils/__init__.py @@ -32,8 +32,11 @@ class Config: - def __init__(self, config_str=DISK_TEST_CONFIG, clazz=None, store_by="id"): + def __init__(self, config_str=DISK_TEST_CONFIG, clazz=None, store_by="id", template_vars=None): self.temp_directory = mkdtemp() + template_vars = template_vars or {} + template_vars["temp_directory"] = self.temp_directory + self.template_vars = template_vars if config_str.startswith("<"): config_file = "store.xml" else: @@ -60,7 +63,7 @@ def write(self, contents, name): if not os.path.exists(directory): os.makedirs(directory, exist_ok=True) contents_template = Template(contents) - expanded_contents = contents_template.safe_substitute(temp_directory=self.temp_directory) + expanded_contents = contents_template.safe_substitute(**self.template_vars) open(path, "w").write(expanded_contents) return path diff --git a/lib/galaxy/util/unittest_utils/__init__.py b/lib/galaxy/util/unittest_utils/__init__.py index 1a3d229eae21..c6832eae4546 100644 --- a/lib/galaxy/util/unittest_utils/__init__.py +++ b/lib/galaxy/util/unittest_utils/__init__.py @@ -1,3 +1,4 @@ +import os from functools import wraps from typing import ( Any, @@ -43,3 +44,10 @@ def skip_unless_executable(executable): if which(executable): return _identity return pytest.mark.skip(f"PATH doesn't contain executable {executable}") + + +def skip_unless_environ(env_var): + if os.environ.get(env_var): + return _identity + + return pytest.mark.skip(f"{env_var} must be set for this test") diff --git a/lib/galaxy_test/driver/integration_util.py b/lib/galaxy_test/driver/integration_util.py index 65e561187c0e..4a2d574f3430 100644 --- a/lib/galaxy_test/driver/integration_util.py +++ b/lib/galaxy_test/driver/integration_util.py @@ -253,3 +253,28 @@ class ConfiguresDatabaseVault: @classmethod def _configure_database_vault(cls, config): config["vault_config_file"] = VAULT_CONF + + +class ConfiguresWorkflowScheduling: + _test_driver: GalaxyTestDriver + + @classmethod + def _configure_workflow_schedulers(cls, schedulers_conf: str, config): + temp_directory = cls._test_driver.mkdtemp() + template_config_path = os.path.join(temp_directory, "workflow_schedulers.xml") + with open(template_config_path, "w") as f: + f.write(schedulers_conf) + + config["workflow_schedulers_config_file"] = template_config_path + + @classmethod + def _disable_workflow_scheduling(cls, config): + noop_schedulers_conf = """ + + + + + + +""" + cls._configure_workflow_schedulers(noop_schedulers_conf, config) diff --git a/test/integration/objectstore/_base.py b/test/integration/objectstore/_base.py index 243446c87c38..42107d5fd353 100644 --- a/test/integration/objectstore/_base.py +++ b/test/integration/objectstore/_base.py @@ -5,6 +5,7 @@ from galaxy_test.base.populators import DatasetPopulator from galaxy_test.driver import integration_util +from galaxy.util.unittest_utils import skip_unless_environ OBJECT_STORE_HOST = os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_HOST", "127.0.0.1") OBJECT_STORE_PORT = int(os.environ.get("GALAXY_INTEGRATION_OBJECT_STORE_PORT", 9000)) @@ -44,6 +45,46 @@ """ ) +AZURE_OBJECT_STORE_CONFIG = string.Template( + """ +type: distributed +backends: +- type: azure_blob + id: azure1 + name: Azure Store 1 + allow_selection: true + weight: 1 + auth: + account_name: ${account_name} + account_key: ${account_key} + + container: + name: ${container_name} + + extra_dirs: + - type: job_work + path: "${temp_directory}/database/job_working_directory_azure_1" + - type: temp + path: "${temp_directory}/database/tmp_azure_1" +- type: azure_blob + id: azure2 + name: Azure Store 2 + allow_selection: true + weight: 1 + auth: + account_name: ${account_name} + account_key: ${account_key} + + container: + name: ${container_name} + + extra_dirs: + - type: job_work + path: "${temp_directory}/database/job_working_directory_azure_2" + - type: temp + path: "${temp_directory}/database/tmp_azure_2" +""" +) def start_minio(container_name): @@ -175,6 +216,50 @@ def updateCacheData(cls): return True +@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME") +class BaseAzureObjectStoreIntegrationTestCase( + BaseObjectStoreIntegrationTestCase, integration_util.ConfiguresWorkflowScheduling +): + object_store_cache_path: str + + @classmethod + def handle_galaxy_config_kwds(cls, config): + super().handle_galaxy_config_kwds(config) + # disabling workflow scheduling to limit database locking when + # testing without postgres. + cls._disable_workflow_scheduling(config) + temp_directory = cls._test_driver.mkdtemp() + cls.object_stores_parent = temp_directory + cls.object_store_cache_path = f"{temp_directory}/object_store_cache" + config_path = os.path.join(temp_directory, "object_store_conf.yml") + config["object_store_store_by"] = "uuid" + config["metadata_strategy"] = "extended" + config["outputs_to_working_directory"] = True + config["retry_metadata_internally"] = False + with open(config_path, "w") as f: + f.write( + AZURE_OBJECT_STORE_CONFIG.safe_substitute( + { + "temp_directory": temp_directory, + "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"], + "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"], + "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"], + } + ) + ) + config["object_store_config_file"] = config_path + + def setUp(self): + super().setUp() + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + + @classmethod + def updateCacheData(cls): + return True + + @integration_util.skip_unless_docker() class BaseRucioObjectStoreIntegrationTestCase(BaseObjectStoreIntegrationTestCase): object_store_cache_path: str diff --git a/test/integration/objectstore/test_azure.py b/test/integration/objectstore/test_azure.py new file mode 100644 index 000000000000..904380ea3a89 --- /dev/null +++ b/test/integration/objectstore/test_azure.py @@ -0,0 +1,35 @@ +from galaxy_test.driver import integration_util +from ._base import BaseAzureObjectStoreIntegrationTestCase + +TEST_TOOL_IDS = [ + "multi_output", + "multi_output_configured", + "multi_output_assign_primary", + "multi_output_recurse", + "tool_provided_metadata_1", + "tool_provided_metadata_2", + "tool_provided_metadata_3", + "tool_provided_metadata_4", + "tool_provided_metadata_5", + "tool_provided_metadata_6", + "tool_provided_metadata_7", + "tool_provided_metadata_8", + "tool_provided_metadata_9", + "tool_provided_metadata_10", + "tool_provided_metadata_11", + "tool_provided_metadata_12", + "composite_output", + "composite_output_tests", + "metadata", + "metadata_bam", + "output_format", + "output_auto_format", +] + + +class TestAzureObjectStoreIntegration(BaseAzureObjectStoreIntegrationTestCase): + pass + + +instance = integration_util.integration_module_instance(TestAzureObjectStoreIntegration) +test_tools = integration_util.integration_tool_runner(TEST_TOOL_IDS) diff --git a/test/unit/files/_util.py b/test/unit/files/_util.py index 9b23a7005f48..17e87d5173f3 100644 --- a/test/unit/files/_util.py +++ b/test/unit/files/_util.py @@ -157,7 +157,11 @@ def write_from( def configured_file_sources(conf_file): file_sources_config = FileSourcePluginsConfig() - return ConfiguredFileSources(file_sources_config, conf_file=conf_file) + if isinstance(conf_file, str): + conf = ConfiguredFileSources(file_sources_config, conf_file=conf_file) + else: + conf = ConfiguredFileSources(file_sources_config, conf_dict=conf_file) + return conf def assert_simple_file_realize(conf_file, recursive=False, filename="a", contents="a\n", contains=False): diff --git a/test/unit/files/test_azure.py b/test/unit/files/test_azure.py new file mode 100644 index 000000000000..3e69636df087 --- /dev/null +++ b/test/unit/files/test_azure.py @@ -0,0 +1,41 @@ +import os + +import pytest + +from galaxy.util.unittest_utils import skip_unless_environ +from ._util import ( + assert_realizes_contains, + configured_file_sources, + write_from, +) + +pytest.importorskip("fs.azblob") + + +@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME") +def test_azure(): + conf = { + "type": "azure", + "id": "azure_test", + "doc": "Test an Azure Blob Store thing.", + "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"], + "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"], + "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"], + "namespace_type": os.environ.get("GALAXY_TEST_AZURE_NAMESPACE_TYPE", "flat"), + "writable": True, + } + file_sources = configured_file_sources([conf]) + test_uri = "gxfiles://azure_test/moo" + test_contents = "Hello World from Files Testing!" + write_from( + file_sources, + test_uri, + test_contents, + ) + assert_realizes_contains( + file_sources, + test_uri, + test_contents, + ) diff --git a/test/unit/objectstore/test_objectstore.py b/test/unit/objectstore/test_objectstore.py index abcb6985a16c..f37f4f40cf5a 100644 --- a/test/unit/objectstore/test_objectstore.py +++ b/test/unit/objectstore/test_objectstore.py @@ -15,6 +15,7 @@ CacheTarget, check_cache, InProcessCacheMonitor, + reset_cache, ) from galaxy.objectstore.cloud import Cloud from galaxy.objectstore.pithos import PithosObjectStore @@ -28,6 +29,7 @@ directory_hash_id, unlink, ) +from galaxy.util.unittest_utils import skip_unless_environ # Unit testing the cloud and advanced infrastructure object stores is difficult, but @@ -1264,6 +1266,273 @@ def test_config_parse_azure_no_cache(): assert object_store.staging_path == directory.global_config.object_store_cache_path +def verify_caching_object_store_functionality(tmp_path, object_store): + # Test no dataset with id 1 exists. + absent_dataset = MockDataset(1) + assert not object_store.exists(absent_dataset) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + empty_dataset = MockDataset(2) + object_store.create(empty_dataset) + assert object_store.exists(empty_dataset) + assert object_store.empty(empty_dataset) + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. + # with cache... + hello_world_dataset = MockDataset(3) + hello_path = tmp_path / "hello.txt" + hello_path.write_text("Hello World!") + object_store.update_from_file(hello_world_dataset, file_name=hello_path, create=True) + assert object_store.exists(hello_world_dataset) + assert not object_store.empty(hello_world_dataset) + + # Test get_data + data = object_store.get_data(hello_world_dataset) + assert data == "Hello World!" + + data = object_store.get_data(hello_world_dataset, start=1, count=6) + assert data == "ello W" + path = object_store.get_filename(hello_world_dataset) + assert open(path).read() == "Hello World!" + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. + # without cache... + hello_world_dataset_2 = MockDataset(10) + object_store.update_from_file(hello_world_dataset_2, file_name=hello_path, create=True) + reset_cache(object_store.cache_target) + assert object_store.exists(hello_world_dataset_2) + reset_cache(object_store.cache_target) + assert not object_store.empty(hello_world_dataset_2) + reset_cache(object_store.cache_target) + + data = object_store.get_data(hello_world_dataset_2) + assert data == "Hello World!" + reset_cache(object_store.cache_target) + data = object_store.get_data(hello_world_dataset_2, start=1, count=6) + assert data == "ello W" + reset_cache(object_store.cache_target) + path = object_store.get_filename(hello_world_dataset_2) + assert open(path).read() == "Hello World!" + + # Test Size + + # Test absent and empty datasets yield size of 0. + assert object_store.size(absent_dataset) == 0 + assert object_store.size(empty_dataset) == 0 + # Elsewise + assert object_store.size(hello_world_dataset) == 12 + + # Test percent used (to some degree) + percent_store_used = object_store.get_store_usage_percent() + assert percent_store_used >= 0.0 + assert percent_store_used < 100.0 + + # Test delete + to_delete_dataset = MockDataset(5) + object_store.create(to_delete_dataset) + assert object_store.exists(to_delete_dataset) + assert object_store.delete(to_delete_dataset) + assert not object_store.exists(to_delete_dataset) + + # Test delete no cache + to_delete_dataset = MockDataset(5) + object_store.create(to_delete_dataset) + assert object_store.exists(to_delete_dataset) + reset_cache(object_store.cache_target) + assert object_store.delete(to_delete_dataset) + reset_cache(object_store.cache_target) + assert not object_store.exists(to_delete_dataset) + + # Test get_object_url returns a read-only URL + url = object_store.get_object_url(hello_world_dataset) + from requests import get + + response = get(url) + response.raise_for_status() + assert response.text == "Hello World!" + + +def verify_object_store_functionality(tmp_path, object_store): + # Test no dataset with id 1 exists. + absent_dataset = MockDataset(1) + assert not object_store.exists(absent_dataset) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + empty_dataset = MockDataset(2) + object_store.create(empty_dataset) + assert object_store.exists(empty_dataset) + assert object_store.empty(empty_dataset) + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. + # with cache... + hello_world_dataset = MockDataset(3) + hello_path = tmp_path / "hello.txt" + hello_path.write_text("Hello World!") + object_store.update_from_file(hello_world_dataset, file_name=hello_path, create=True) + assert object_store.exists(hello_world_dataset) + assert not object_store.empty(hello_world_dataset) + + # Test get_data + data = object_store.get_data(hello_world_dataset) + assert data == "Hello World!" + + data = object_store.get_data(hello_world_dataset, start=1, count=6) + assert data == "ello W" + path = object_store.get_filename(hello_world_dataset) + assert open(path).read() == "Hello World!" + + # Test Size + + # Test absent and empty datasets yield size of 0. + assert object_store.size(absent_dataset) == 0 + assert object_store.size(empty_dataset) == 0 + # Elsewise + assert object_store.size(hello_world_dataset) == 12 + + # Test delete + to_delete_dataset = MockDataset(5) + object_store.create(to_delete_dataset) + assert object_store.exists(to_delete_dataset) + assert object_store.delete(to_delete_dataset) + assert not object_store.exists(to_delete_dataset) + + # Test get_object_url returns a read-only URL + url = object_store.get_object_url(hello_world_dataset) + from requests import get + + response = get(url) + response.raise_for_status() + assert response.text == "Hello World!" + + +AZURE_BLOB_TEMPLATE_TEST_CONFIG_YAML = """ +type: azure_blob +store_by: uuid +auth: + account_name: ${account_name} + account_key: ${account_key} + +container: + name: ${container_name} + +extra_dirs: +- type: job_work + path: database/job_working_directory_azure +- type: temp + path: database/tmp_azure +""" + + +@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME") +def test_real_azure_blob_store(tmp_path): + template_vars = { + "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"], + "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"], + "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"], + } + with TestConfig(AZURE_BLOB_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store): + verify_caching_object_store_functionality(tmp_path, object_store) + + +AZURE_BLOB_TEMPLATE_WITH_ACCOUNT_URL_TEST_CONFIG_YAML = """ +type: azure_blob +store_by: uuid +auth: + account_name: ${account_name} + account_key: ${account_key} + account_url: ${account_url} + +container: + name: ${container_name} + +extra_dirs: +- type: job_work + path: database/job_working_directory_azure +- type: temp + path: database/tmp_azure +""" + + +@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_URL") +def test_real_azure_blob_store_with_account_url(tmp_path): + template_vars = { + "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"], + "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"], + "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"], + "account_url": os.environ["GALAXY_TEST_AZURE_ACCOUNT_URL"], + } + with TestConfig(AZURE_BLOB_TEMPLATE_WITH_ACCOUNT_URL_TEST_CONFIG_YAML, template_vars=template_vars) as ( + _, + object_store, + ): + verify_caching_object_store_functionality(tmp_path, object_store) + + +AZURE_BLOB_IN_HIERARCHICAL_TEMPLATE_TEST_CONFIG_YAML = """ +type: distributed +backends: +- type: azure_blob + id: azure1 + store_by: uuid + name: Azure Store 1 + allow_selection: true + weight: 1 + auth: + account_name: ${account_name} + account_key: ${account_key} + + container: + name: ${container_name} + + extra_dirs: + - type: job_work + path: database/job_working_directory_azure_1 + - type: temp + path: database/tmp_azure_1 +- type: azure_blob + id: azure2 + store_by: uuid + name: Azure Store 2 + allow_selection: true + weight: 1 + auth: + account_name: ${account_name} + account_key: ${account_key} + + container: + name: ${container_name} + + extra_dirs: + - type: job_work + path: database/job_working_directory_azure_2 + - type: temp + path: database/tmp_azure_2 +""" + + +@skip_unless_environ("GALAXY_TEST_AZURE_CONTAINER_NAME") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_KEY") +@skip_unless_environ("GALAXY_TEST_AZURE_ACCOUNT_NAME") +def test_real_azure_blob_store_in_hierarchical(tmp_path): + template_vars = { + "container_name": os.environ["GALAXY_TEST_AZURE_CONTAINER_NAME"], + "account_key": os.environ["GALAXY_TEST_AZURE_ACCOUNT_KEY"], + "account_name": os.environ["GALAXY_TEST_AZURE_ACCOUNT_NAME"], + } + with TestConfig(AZURE_BLOB_IN_HIERARCHICAL_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as ( + _, + object_store, + ): + verify_object_store_functionality(tmp_path, object_store) + + class MockDataset: def __init__(self, id): self.id = id