Merge pull request #18372 from bwalkowi/test-onedata-objectstore-with-new-caching

[24.1] Onedata object store and files source stability fixes
mvdbeek authored Jun 13, 2024
2 parents 5c68108 + 396c3e4 commit d8d6eb4
Showing 8 changed files with 62 additions and 59 deletions.
4 changes: 2 additions & 2 deletions lib/galaxy/config/sample/object_store_conf.sample.yml
@@ -223,9 +223,9 @@ auth:
# an access token suitable for data access (allowing calls to the Oneprovider REST API).
access_token: ...
connection:
# the domain of the Onezone service (e.g. "demo.onedata.org"), or its IP address for
# the domain of the Onezone service (e.g. datahub.egi.eu), or its IP address for
# devel instances (see above).
onezone_domain: demo.onedata.org
onezone_domain: datahub.egi.eu
# Allows connection to Onedata servers that do not present trusted SSL certificates.
# SHOULD NOT be used unless you really know what you are doing.
disable_tls_certificate_validation: false
4 changes: 2 additions & 2 deletions lib/galaxy/config/sample/object_store_conf.xml.sample
@@ -165,7 +165,7 @@
an access token suitable for data access (allowing calls to the Oneprovider REST API).
//connection/@onezone_domain -
the domain of the Onezone service (e.g. "demo.onedata.org"), or its IP address for
the domain of the Onezone service (e.g. datahub.egi.eu), or its IP address for
devel instances (see above).
//connection/@disable_tls_certificate_validation -
@@ -182,7 +182,7 @@
<!--
<object_store type="onedata">
<auth access_token="..." />
<connection onezone_domain="demo.onedata.org" disable_tls_certificate_validation="False"/>
<connection onezone_domain="datahub.egi.eu" disable_tls_certificate_validation="False"/>
<space name="demo-space" path="galaxy-data" />
<cache path="database/object_store_cache" size="1000" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_onedata"/>
@@ -141,7 +141,7 @@ preferences:
description: Your Onedata account
inputs:
- name: onezone_domain
label: Domain of the Onezone service (e.g. "demo.onedata.org")
label: Domain of the Onezone service (e.g. datahub.egi.eu)
type: text
required: False
- name: access_token
2 changes: 1 addition & 1 deletion lib/galaxy/dependencies/conditional-requirements.txt
@@ -24,7 +24,7 @@ fs.googledrivefs # type: googledrive
fs-gcsfs # type: googlecloudstorage
# fs-gcsfs doesn't pin google-cloud-storage, and old versions log noisy exceptions and break test discovery
google-cloud-storage>=2.8.0 # type: googlecloudstorage
fs.onedatarestfs # type: onedata, depends on onedatafilerestclient
fs.onedatarestfs==21.2.5.1 # type: onedata, depends on onedatafilerestclient
fs-basespace # type: basespace
fs-azureblob # type: azure

2 changes: 1 addition & 1 deletion lib/galaxy/dependencies/dev-requirements.txt
@@ -79,7 +79,7 @@ myst-parser==3.0.1 ; python_version >= "3.8" and python_version < "3.13"
nh3==0.2.17 ; python_version >= "3.8" and python_version < "3.13"
numpy==1.24.4 ; python_version >= "3.8" and python_version < "3.9"
numpy==1.26.4 ; python_version >= "3.9" and python_version < "3.13"
onedatafilerestclient==21.2.5rc1 ; python_version >= "3.8" and python_version < "3.13"
onedatafilerestclient==21.2.5.1 ; python_version >= "3.8" and python_version < "3.13"
outcome==1.3.0.post0 ; python_version >= "3.8" and python_version < "3.13"
packaging==24.0 ; python_version >= "3.8" and python_version < "3.13"
pathspec==0.12.1 ; python_version >= "3.8" and python_version < "3.13"
17 changes: 1 addition & 16 deletions lib/galaxy/files/sources/_pyfilesystem2.py
@@ -85,22 +85,7 @@ def _list(
raise MessageException(f"Problem listing file source path {path}. Reason: {e}") from e

def _get_total_matches_count(self, fs: FS, path: str, filter: Optional[List[str]] = None) -> int:
# For some reason, using "*" as glob does not return all files and directories, only files.
# So we need to count files and directories "*/" separately.
# Also, some filesystems do not properly support directories count (like Google Cloud Storage),
# so we need to catch TypeError exceptions and fallback to 0.
files_glob_pattern = f"{path}/{filter[0] if filter else '*'}"
try:
files_count = fs.glob(files_glob_pattern).count().files
except TypeError:
files_count = 0

directory_glob_pattern = f"{files_glob_pattern}/"
try:
directories_count = fs.glob(directory_glob_pattern).count().directories
except TypeError:
directories_count = 0
return files_count + directories_count
return sum(1 for _ in fs.filterdir(path, namespaces=["basic"], files=filter, dirs=filter))

def _to_page(self, limit: Optional[int] = None, offset: Optional[int] = None) -> Optional[Tuple[int, int]]:
if limit is None and offset is None:
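
For orientation, the replacement above leans on PyFilesystem2's filterdir, which walks a directory once and applies the same wildcard patterns to both files and directories. Below is a minimal sketch of that counting approach; the in-memory filesystem and paths are illustrative assumptions, not part of the commit.

from fs.memoryfs import MemoryFS


def count_matches(fs, path, filter=None):
    # Mirror of the new _get_total_matches_count logic: a single filterdir pass
    # counts both files and directories that match the optional wildcard filter.
    return sum(1 for _ in fs.filterdir(path, namespaces=["basic"], files=filter, dirs=filter))


mem_fs = MemoryFS()
mem_fs.makedirs("/data/subdir")
mem_fs.writetext("/data/a.txt", "a")
mem_fs.writetext("/data/b.csv", "b")

print(count_matches(mem_fs, "/data"))             # 3: two files and one directory
print(count_matches(mem_fs, "/data", ["*.txt"]))  # 1: only a.txt matches the filter

Unlike the removed glob-based version, this needs no separate "*/" pass for directories and no TypeError fallback for backends that cannot count directories.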
88 changes: 53 additions & 35 deletions lib/galaxy/objectstore/onedata.py
@@ -7,8 +7,9 @@
from datetime import datetime

try:
from onedatafilerestclient import (
OnedataFileRESTClient,
from onedatafilerestclient import OnedataFileRESTClient
from onedatafilerestclient.errors import (
OnedataError,
OnedataRESTError,
)
except ImportError:
Expand Down Expand Up @@ -131,7 +132,7 @@ def _initialize(self):
self._start_cache_monitor_if_needed()

@classmethod
def parse_xml(clazz, config_xml):
def parse_xml(cls, config_xml):
return _parse_config_xml(config_xml)

def to_dict(self):
@@ -155,54 +156,54 @@ def to_dict(self):
)
return as_dict

def _construct_onedata_path(self, rel_path):
def _build_remote_path(self, rel_path):
return os.path.join(self.galaxy_root_dir, rel_path)

def _get_remote_size(self, rel_path):
try:
onedata_path = self._construct_onedata_path(rel_path)
return self._client.get_attributes(self.space_name, file_path=onedata_path)["size"]
except OnedataRESTError:
remote_path = self._build_remote_path(rel_path)
return self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"]
except OnedataError:
log.exception("Could not get '%s' size from Onedata", rel_path)
return -1

def _exists_remotely(self, rel_path):
try:
onedata_path = self._construct_onedata_path(rel_path)
self._client.get_attributes(self.space_name, file_path=onedata_path)
remote_path = self._build_remote_path(rel_path)
self._client.get_attributes(self.space_name, attributes=["type", "size"], file_path=remote_path)
return True
except OnedataRESTError as ex:
if ex.http_code == 404:
return False
elif ex.http_code == 400 and ex.error_details.get("errno", None) == "enoent":
return False
else:
log.exception("Trouble checking '%s' existence in Onedata", rel_path)
except OnedataError as ex:
if _is_not_found_onedata_rest_error(ex):
return False

log.exception("Trouble checking '%s' existence in Onedata", rel_path)
return False

def _download(self, rel_path):
try:
dst_path = self._get_cache_path(rel_path)

log.debug("Pulling file '%s' into cache to %s", rel_path, dst_path)

onedata_path = self._construct_onedata_path(rel_path)
file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)["size"]
remote_path = self._build_remote_path(rel_path)
file_size = self._client.get_attributes(self.space_name, attributes=["size"], file_path=remote_path)["size"]

# Test if cache is large enough to hold the new file
if not self._caching_allowed(rel_path, file_size):
return False

with open(dst_path, "wb") as dst:
for chunk in self._client.iter_file_content(self.space_name, STREAM_CHUNK_SIZE, file_path=onedata_path):
for chunk in self._client.iter_file_content(
self.space_name, chunk_size=STREAM_CHUNK_SIZE, file_path=remote_path
):
dst.write(chunk)

log.debug("Pulled '%s' into cache to %s", rel_path, dst_path)

return True
except OnedataRESTError:
except OnedataError:
log.exception("Problem downloading file '%s'", rel_path)
return False
return False

def _push_to_storage(self, rel_path, source_file=None, from_string=None):
"""
@@ -211,7 +212,7 @@ def _push_to_storage(self, rel_path, source_file=None, from_string=None):
``rel_path`` as the path.
"""
try:
source_file = source_file if source_file else self._get_cache_path(rel_path)
source_file = source_file or self._get_cache_path(rel_path)
if os.path.exists(source_file):
if os.path.getsize(source_file) == 0 and self._exists_remotely(rel_path):
log.debug(
@@ -226,12 +227,14 @@ def _push_to_storage(self, rel_path, source_file=None, from_string=None):
os.path.getsize(source_file),
rel_path,
)
onedata_path = self._construct_onedata_path(rel_path)
remote_path = self._build_remote_path(rel_path)

if not self._exists_remotely(rel_path):
self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True)

file_id = self._client.get_file_id(self.space_name, onedata_path)
file_id = self._client.create_file(
self.space_name, file_path=remote_path, file_type="REG", create_parents=True
)
else:
file_id = self._client.get_file_id(self.space_name, file_path=remote_path)

if source_file:
with open(source_file, "rb") as src:
@@ -241,14 +244,18 @@ def _push_to_storage(self, rel_path, source_file=None, from_string=None):
if not chunk:
break

self._client.put_file_content(self.space_name, file_id, offset, chunk)
self._client.put_file_content(
self.space_name, data=chunk, offset=offset, file_id=file_id
)
offset += len(chunk)
else:
self._client.put_file_content(self.space_name, file_id, 0, from_string.encode("utf-8"))
self._client.put_file_content(
self.space_name, data=from_string.encode("utf-8"), file_id=file_id
)

end_time = datetime.now()
log.debug(
"Pushed cache file '%s' under '%s' (%s bytes transfered in %s sec)",
"Pushed cache file '%s' under '%s' (%s bytes transferred in %s sec)",
source_file,
rel_path,
os.path.getsize(source_file),
@@ -257,25 +264,36 @@ def _push_to_storage(self, rel_path, source_file=None, from_string=None):
return True
else:
log.error("Source file does not exist.", rel_path, source_file)
except OnedataRESTError:
except OnedataError:
log.exception("Trouble pushing Onedata key '%s' from file '%s'", rel_path, source_file)
raise
return False

def _delete_existing_remote(self, rel_path) -> bool:
try:
onedata_path = self._construct_onedata_path(rel_path)
self._client.remove(self.space_name, onedata_path)
onedata_path = self._build_remote_path(rel_path)
self._client.remove(self.space_name, file_path=onedata_path)
return True
except OnedataRESTError:
except OnedataError:
log.exception("Could not delete '%s' from Onedata", rel_path)
return False

def _get_object_url(self, obj, **kwargs):
def _get_object_url(self, _obj, **_kwargs):
return None

def _get_store_usage_percent(self, obj):
def _get_store_usage_percent(self, _obj):
return 0.0

def shutdown(self):
self._shutdown_cache_monitor()


def _is_not_found_onedata_rest_error(ex):
if isinstance(ex, OnedataRESTError):
if ex.http_code == 404:
return True

if ex.http_code == 400 and ex.category == "posix":
return ex.details["errno"] == "enoent"

return False
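
As a reading aid, here is a hedged sketch of how the new error handling composes: an existence check built only from calls that appear in the new code above (get_attributes with explicit attributes= and file_path= keywords, the broad OnedataError catch, and the not-found classification). The client construction, space name, and path are placeholders, not values taken from the commit.

import logging

from onedatafilerestclient import OnedataFileRESTClient
from onedatafilerestclient.errors import (
    OnedataError,
    OnedataRESTError,
)

log = logging.getLogger(__name__)


def is_not_found(ex: OnedataError) -> bool:
    # Same classification as the helper added in this commit: a plain 404, or a
    # POSIX "enoent" error wrapped in a 400 response, both mean the path is missing.
    if isinstance(ex, OnedataRESTError):
        if ex.http_code == 404:
            return True
        if ex.http_code == 400 and ex.category == "posix":
            return ex.details["errno"] == "enoent"
    return False


def exists_remotely(client: OnedataFileRESTClient, space_name: str, remote_path: str) -> bool:
    try:
        # Requesting only the attributes that are needed keeps the REST call cheap.
        client.get_attributes(space_name, attributes=["type", "size"], file_path=remote_path)
        return True
    except OnedataError as ex:
        if is_not_found(ex):
            return False
        log.exception("Trouble checking '%s' existence in Onedata", remote_path)
        return False


# Placeholder usage; the client would be built from the Onezone domain and access
# token configured in the object store settings (construction details are not part
# of this diff, so they are elided here).
# client = ...
# print(exists_remotely(client, "demo-space", "galaxy-data/dataset_1.dat"))

Catching the broader OnedataError rather than only OnedataRESTError means client-side failures raised by onedatafilerestclient are handled by the same branch as REST errors, which is the stability fix these hunks are after.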
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -143,7 +143,7 @@ isort = "*"
lxml = "!=4.2.2"
markdown-it-reporter = "*"
myst-parser = "*"
onedatafilerestclient = "==21.2.5rc1"
onedatafilerestclient = "==21.2.5.1"
pkce = "*"
playwright = "*"
pytest = "*"