Skip to content

Commit

Permalink
Format new onedata related code
Browse files Browse the repository at this point in the history
  • Loading branch information
bwalkowi committed May 6, 2024
1 parent 1f6b6a0 commit 2d995bb
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 78 deletions.
7 changes: 2 additions & 5 deletions lib/galaxy/files/sources/onedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ def _open_fs(self, user_context=None, opts: Optional[FilesSourceOptions] = None)
onezone_domain = props.pop("onezoneDomain", "") or ""
onezone_domain = remove_prefix("http://", remove_prefix("https://", onezone_domain))
access_token = props.pop("accessToken", "") or ""
disable_tls_certificate_validation = (
props.pop("disableTlsCertificateValidation", False) or False)
handle = OnedataRESTFS(onezone_domain,
access_token,
verify_ssl=not disable_tls_certificate_validation)
disable_tls_certificate_validation = props.pop("disableTlsCertificateValidation", False) or False
handle = OnedataRESTFS(onezone_domain, access_token, verify_ssl=not disable_tls_certificate_validation)
return handle


Expand Down
98 changes: 43 additions & 55 deletions lib/galaxy/objectstore/onedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from typing import Optional

try:
from onedatafilerestclient import OnedataFileRESTClient
from onedatafilerestclient import OnedataRESTError
from onedatafilerestclient import (
OnedataFileRESTClient,
OnedataRESTError,
)
except ImportError:
OnedataFileRESTClient = None

Expand Down Expand Up @@ -51,30 +53,26 @@ def _parse_config_xml(config_xml):

conn_xml = _get_config_xml_elements(config_xml, "connection")[0]
onezone_domain = conn_xml.get("onezone_domain")
disable_tls_certificate_validation = string_as_bool(
conn_xml.get("disable_tls_certificate_validation", "False"))
disable_tls_certificate_validation = string_as_bool(conn_xml.get("disable_tls_certificate_validation", "False"))

space_xml = _get_config_xml_elements(config_xml, "space")[0]
space_name = space_xml.get("name")
galaxy_root_dir = space_xml.get("path", "")

cache_dict = parse_caching_config_dict_from_xml(config_xml)

extra_dirs = [{attr: elem.get(attr) for attr in ("type", "path")}
for elem in _get_config_xml_elements(config_xml, "extra_dir")]
extra_dirs = [
{attr: elem.get(attr) for attr in ("type", "path")}
for elem in _get_config_xml_elements(config_xml, "extra_dir")
]

return {
"auth": {
"access_token": access_token
},
"auth": {"access_token": access_token},
"connection": {
"onezone_domain": onezone_domain,
"disable_tls_certificate_validation": disable_tls_certificate_validation,
},
"space": {
"name": space_name,
"galaxy_root_dir": galaxy_root_dir
},
"space": {"name": space_name, "galaxy_root_dir": galaxy_root_dir},
"cache": cache_dict,
"extra_dirs": extra_dirs,
"private": ConcreteObjectStore.parse_private_from_config_xml(config_xml),
Expand Down Expand Up @@ -115,8 +113,7 @@ def __init__(self, config, config_dict):

connection_dict = config_dict["connection"]
self.onezone_domain = connection_dict["onezone_domain"]
self.disable_tls_certificate_validation = connection_dict.get(
"disable_tls_certificate_validation", False)
self.disable_tls_certificate_validation = connection_dict.get("disable_tls_certificate_validation", False)

space_dict = config_dict["space"]
self.space_name = space_dict["name"]
Expand All @@ -137,42 +134,40 @@ def _initialize(self):
if OnedataFileRESTClient is None:
raise Exception(NO_ONEDATA_ERROR_MESSAGE)

log.debug(f"Configuring Onedata connection to {self.onezone_domain} "
f"(disable_tls_certificate_validation={self.disable_tls_certificate_validation})")
log.debug(
f"Configuring Onedata connection to {self.onezone_domain} "
f"(disable_tls_certificate_validation={self.disable_tls_certificate_validation})"

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
)

verify_ssl = not self.disable_tls_certificate_validation
self._client = OnedataFileRESTClient(self.onezone_domain,
self.access_token,
verify_ssl=verify_ssl)
self._client = OnedataFileRESTClient(self.onezone_domain, self.access_token, verify_ssl=verify_ssl)

if self.enable_cache_monitor:
self.cache_monitor = InProcessCacheMonitor(self.cache_target,
self.cache_monitor_interval)
self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval)

@classmethod
def parse_xml(cls, config_xml):
    """Parse an object store XML configuration element into a config dict.

    :param config_xml: parsed XML element describing this object store.
    :returns: dict suitable for passing as ``config_dict`` to ``__init__``.

    Delegates to the module-level ``_parse_config_xml`` helper.
    """
    # Renamed the first argument from the non-idiomatic ``clazz`` to the
    # conventional ``cls`` (PEP 8); callers are unaffected.
    return _parse_config_xml(config_xml)

def to_dict(self):
    """Serialize this Onedata object store's configuration to a dict.

    Extends the base-class dict with the Onedata-specific ``auth``,
    ``connection``, ``space`` and ``cache`` sections so that an equivalent
    store can be reconstructed from the returned dict.

    :returns: dict of configuration values.
    """
    as_dict = super().to_dict()
    # Single update call; the previous text carried a redundant duplicate
    # of this update (a merge/diff artifact) that is collapsed here.
    as_dict.update(
        {
            "auth": {
                "access_token": self.access_token,
            },
            "connection": {
                "onezone_domain": self.onezone_domain,
                "disable_tls_certificate_validation": self.disable_tls_certificate_validation,
            },
            "space": {"name": self.space_name, "galaxy_root_dir": self.galaxy_root_dir},
            "cache": {
                "size": self.cache_size,
                "path": self.staging_path,
                "cache_updated_data": self.cache_updated_data,
            },
        }
    )
    return as_dict

@property
Expand Down Expand Up @@ -243,7 +238,7 @@ def _construct_onedata_path(self, rel_path):
def _get_size_in_onedata(self, rel_path):
    """Return the size in bytes of ``rel_path`` in Onedata, or ``-1`` if the
    lookup fails for any Onedata REST reason.
    """
    try:
        path_in_space = self._construct_onedata_path(rel_path)
        attributes = self._client.get_attributes(self.space_name, file_path=path_in_space)
        return attributes["size"]
    except OnedataRESTError:
        # Best-effort: report -1 rather than propagate the REST failure.
        log.exception("Could not get '%s' size from Onedata", rel_path)
        return -1
Expand Down Expand Up @@ -290,7 +285,7 @@ def _download(self, rel_path):
log.debug("Pulling file '%s' into cache to %s", rel_path, dst_path)

onedata_path = self._construct_onedata_path(rel_path)
file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)['size']
file_size = self._client.get_attributes(self.space_name, file_path=onedata_path)["size"]

# Test if cache is large enough to hold the new file
if not self.cache_target.fits_in_cache(file_size):
Expand All @@ -302,10 +297,8 @@ def _download(self, rel_path):
)
return False

with open(dst_path, 'wb') as dst:
for chunk in self._client.iter_file_content(self.space_name,
STREAM_CHUNK_SIZE,
file_path=onedata_path):
with open(dst_path, "wb") as dst:
for chunk in self._client.iter_file_content(self.space_name, STREAM_CHUNK_SIZE, file_path=onedata_path):
dst.write(chunk)

log.debug("Pulled '%s' into cache to %s", rel_path, dst_path)
Expand All @@ -326,9 +319,7 @@ def _push_to_os(self, rel_path, source_file=None):
if os.path.exists(source_file):
if os.path.getsize(source_file) == 0 and self._exists_in_onedata(rel_path):
log.debug(
"Wanted to push file '%s' to Onedata '%s' but its size is 0; skipping.",
source_file,
rel_path
"Wanted to push file '%s' to Onedata '%s' but its size is 0; skipping.", source_file, rel_path
)
return True
else:
Expand All @@ -343,17 +334,14 @@ def _push_to_os(self, rel_path, source_file=None):
onedata_path = self._construct_onedata_path(rel_path)
file_id = self._client.get_file_id(self.space_name, onedata_path)

with open(source_file, 'rb') as src:
with open(source_file, "rb") as src:
offset = 0
while True:
chunk = src.read(STREAM_CHUNK_SIZE)
if not chunk:
break

self._client.put_file_content(self.space_name,
file_id,
offset,
chunk)
self._client.put_file_content(self.space_name, file_id, offset, chunk)
offset += len(chunk)

end_time = datetime.now()
Expand Down Expand Up @@ -445,7 +433,7 @@ def _create(self, obj, **kwargs):
# need this line to set the dataset filename, not sure how this is done - filesystem is monitored?
open(os.path.join(self.staging_path, rel_path), "w").close()
onedata_path = self._construct_onedata_path(rel_path)
self._client.create_file(self.space_name, onedata_path, 'REG', create_parents=True)
self._client.create_file(self.space_name, onedata_path, "REG", create_parents=True)
return self

def _empty(self, obj, **kwargs):
Expand Down
34 changes: 16 additions & 18 deletions test/integration/objectstore/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,20 +259,18 @@ def handle_galaxy_config_kwds(cls, config):
config["outputs_to_working_directory"] = True
config["retry_metadata_internally"] = False
with open(config_path, "w") as f:
f.write(ONEDATA_OBJECT_STORE_CONFIG.safe_substitute(
{
"temp_directory": temp_directory,
"access_token": get_onedata_access_token(cls.oz_container_name),
"onezone_domain": docker_ip_address(cls.oz_container_name),
"space_name": ONEDATA_DEMO_SPACE_NAME,
"optional_space_params": random.choice([
'',
'path=""',
'path="a/b/c/d"'
]),
"cache_updated_data": cls.updateCacheData(),
}
))
f.write(
ONEDATA_OBJECT_STORE_CONFIG.safe_substitute(
{
"temp_directory": temp_directory,
"access_token": get_onedata_access_token(cls.oz_container_name),
"onezone_domain": docker_ip_address(cls.oz_container_name),
"space_name": ONEDATA_DEMO_SPACE_NAME,
"optional_space_params": random.choice(["", 'path=""', 'path="a/b/c/d"']),
"cache_updated_data": cls.updateCacheData(),
}
)
)

config["object_store_config_file"] = config_path

Expand Down Expand Up @@ -310,7 +308,7 @@ def docker_run(image, name, *args, detach=True, remove=True, ports=None):
cmd = ["docker", "run"]

if ports:
for (container_port, host_port) in ports:
for container_port, host_port in ports:
cmd.extend(["-p", f"{container_port}:{host_port}"])

if detach:
Expand Down Expand Up @@ -341,11 +339,11 @@ def docker_ip_address(container_name):
cmd = [
"docker",
"inspect",
'-f',
'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
"-f",
"{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
container_name,
]
return subprocess.check_output(cmd).decode('utf-8').strip()
return subprocess.check_output(cmd).decode("utf-8").strip()


def docker_rm(container_name):
Expand Down

0 comments on commit 2d995bb

Please sign in to comment.