Fix AWS object store for us-east-2 #18100

Merged: 4 commits, May 7, 2024
23 changes: 6 additions & 17 deletions lib/galaxy/objectstore/cloud.py
@@ -56,13 +56,6 @@ def _config_to_dict(self):
"name": self.bucket_name,
"use_reduced_redundancy": self.use_rr,
},
"connection": {
"host": self.host,
"port": self.port,
"multipart": self.multipart,
"is_secure": self.is_secure,
"conn_path": self.conn_path,
},
"cache": {
"size": self.cache_size,
"path": self.staging_path,
@@ -86,7 +79,6 @@ def __init__(self, config, config_dict):
self.transfer_progress = 0

bucket_dict = config_dict["bucket"]
- connection_dict = config_dict.get("connection", {})
cache_dict = config_dict.get("cache") or {}
self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

@@ -96,12 +88,6 @@ def __init__(self, config, config_dict):
self.use_rr = bucket_dict.get("use_reduced_redundancy", False)
self.max_chunk_size = bucket_dict.get("max_chunk_size", 250)

- self.host = connection_dict.get("host", None)
- self.port = connection_dict.get("port", 6000)
- self.multipart = connection_dict.get("multipart", True)
- self.is_secure = connection_dict.get("is_secure", True)
- self.conn_path = connection_dict.get("conn_path", "/")

self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
self.cache_updated_data = cache_dict.get("cache_updated_data", True)
@@ -131,6 +117,8 @@ def _get_connection(provider, credentials):
log.debug(f"Configuring `{provider}` Connection")
if provider == "aws":
config = {"aws_access_key": credentials["access_key"], "aws_secret_key": credentials["secret_key"]}
if "region" in credentials:
config["aws_region_name"] = credentials["region"]
connection = CloudProviderFactory().create_provider(ProviderList.AWS, config)
elif provider == "azure":
config = {
@@ -198,8 +186,9 @@ def parse_xml(clazz, config_xml):
if provider == "aws":
akey = auth_element.get("access_key")
skey = auth_element.get("secret_key")

config["auth"] = {"access_key": akey, "secret_key": skey}
if "region" in auth_element:
config["auth"]["region"] = auth_element["region"]
elif provider == "azure":
sid = auth_element.get("subscription_id")
if sid is None:
@@ -553,7 +542,7 @@ def _create(self, obj, **kwargs):

def _empty(self, obj, **kwargs):
if self._exists(obj, **kwargs):
- return bool(self._size(obj, **kwargs) > 0)
+ return bool(self._size(obj, **kwargs) == 0)
else:
raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}")

@@ -692,7 +681,7 @@ def _get_object_url(self, obj, **kwargs):
log.exception("Trouble generating URL for dataset '%s'", rel_path)
return None

- def _get_store_usage_percent(self):
+ def _get_store_usage_percent(self, obj):
return 0.0

def shutdown(self):
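The net effect of the cloud.py changes: the "connection" settings (host, port, multipart, is_secure, conn_path) are removed outright, and an optional "region" key in the auth credentials is forwarded to CloudBridge as aws_region_name. A minimal sketch of the resulting provider wiring, assuming the cloudbridge.factory import path Galaxy uses and with placeholder credential values:

from cloudbridge.factory import CloudProviderFactory, ProviderList

credentials = {
    "access_key": "AKIA...",  # placeholder
    "secret_key": "...",      # placeholder
    "region": "us-east-2",    # optional; the key this PR starts honoring
}

config = {
    "aws_access_key": credentials["access_key"],
    "aws_secret_key": credentials["secret_key"],
}
if "region" in credentials:
    # CloudBridge picks the regional endpoint up from aws_region_name.
    config["aws_region_name"] = credentials["region"]

connection = CloudProviderFactory().create_provider(ProviderList.AWS, config)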
24 changes: 22 additions & 2 deletions lib/galaxy/objectstore/s3.py
@@ -87,6 +87,7 @@ def parse_config_xml(config_xml):
multipart = string_as_bool(cn_xml.get("multipart", "True"))
is_secure = string_as_bool(cn_xml.get("is_secure", "True"))
conn_path = cn_xml.get("conn_path", "/")
+ region = cn_xml.get("region", None)

cache_dict = parse_caching_config_dict_from_xml(config_xml)

@@ -114,6 +115,7 @@ def parse_config_xml(config_xml):
"multipart": multipart,
"is_secure": is_secure,
"conn_path": conn_path,
"region": region,
},
"cache": cache_dict,
"extra_dirs": extra_dirs,
@@ -142,6 +144,7 @@ def _config_to_dict(self):
"multipart": self.multipart,
"is_secure": self.is_secure,
"conn_path": self.conn_path,
"region": self.region,
},
"cache": {
"size": self.cache_size,
@@ -185,6 +188,7 @@ def __init__(self, config, config_dict):
self.multipart = connection_dict.get("multipart", True)
self.is_secure = connection_dict.get("is_secure", True)
self.conn_path = connection_dict.get("conn_path", "/")
+ self.region = connection_dict.get("region", None)

self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
@@ -228,7 +232,23 @@ def _configure_connection(self):
log.debug("Configuring S3 Connection")
# If access_key is empty use default credential chain
if self.access_key:
- self.conn = S3Connection(self.access_key, self.secret_key)
+ if self.region:
+     # If a region is specified, we can infer a host and turn on SIGV4.
+     # https://stackoverflow.com/questions/26744712/s3-using-boto-and-sigv4-missing-host-parameter
+
+     # Turning on SIGV4 is needed for AWS regions created after 2014. From
+     # https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html:
+     #
+     # "Amazon S3 supports Signature Version 4, a protocol for authenticating inbound API requests to AWS services,
+     # in all AWS Regions. At this time, AWS Regions created before January 30, 2014 will continue to support the
+     # previous protocol, Signature Version 2. Any new Regions after January 30, 2014 will support only Signature
+     # Version 4 and therefore all requests to those Regions must be made with Signature Version 4."
+     os.environ["S3_USE_SIGV4"] = "True"
+     self.conn = S3Connection(self.access_key, self.secret_key, host=f"s3.{self.region}.amazonaws.com")
+ else:
+     # See notes above; this path through the code will not work for
+     # newer regions.
+     self.conn = S3Connection(self.access_key, self.secret_key)
else:
self.conn = S3Connection()
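Taken on its own, the new connection path boils down to the boto 2 usage below. This is a sketch, relying on boto's documented S3_USE_SIGV4 environment toggle and host parameter; the key and bucket values are placeholders:

import os

from boto.s3.connection import S3Connection

access_key, secret_key, region = "AKIA...", "...", "us-east-2"  # placeholders

# Regions launched after January 2014 (us-east-2 included) accept only SigV4,
# and boto 2 needs both the environment toggle and an explicit regional endpoint.
os.environ["S3_USE_SIGV4"] = "True"
conn = S3Connection(access_key, secret_key, host=f"s3.{region}.amazonaws.com")

# validate=False avoids an extra round trip when looking up the bucket.
bucket = conn.get_bucket("my-bucket", validate=False)

In XML terms, the same behavior is switched on by the new attribute parsed above, e.g. <connection region="us-east-2" />.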

@@ -581,7 +601,7 @@ def _create(self, obj, **kwargs):

def _empty(self, obj, **kwargs):
if self._exists(obj, **kwargs):
- return bool(self._size(obj, **kwargs) > 0)
+ return bool(self._size(obj, **kwargs) == 0)
@mvdbeek (Member) commented on May 7, 2024:
I think all the other object stores have the same issue. And we could drop the unnecessary bool cast.

else:
raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}")

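The _empty change in both files fixes an inverted predicate: the old expression answered "is the object non-empty?", so _empty() reported the exact opposite of the truth. A self-contained sketch of the before and after, plus the simplification @mvdbeek suggests (not applied in this PR):

size = 100  # size of a non-empty object

old_result = bool(size > 0)   # True: the non-empty object was wrongly reported as empty
new_result = bool(size == 0)  # False: correct
assert old_result != new_result

# The bool() cast is redundant because == already returns a bool,
# so a follow-up could reduce the return expression to:
is_empty = size == 0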
160 changes: 122 additions & 38 deletions test/unit/objectstore/test_objectstore.py
@@ -8,6 +8,7 @@
from uuid import uuid4

import pytest
+ from requests import get

from galaxy.exceptions import ObjectInvalid
from galaxy.objectstore.azure_blob import AzureBlobObjectStore
@@ -991,26 +992,19 @@ def test_config_parse_cloud():
assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
assert object_store.use_rr is False

- assert object_store.host is None
- assert object_store.port == 6000
- assert object_store.multipart is True
- assert object_store.is_secure is True
- assert object_store.conn_path == "/"

cache_target = object_store.cache_target
assert cache_target.size == 1000.0
assert cache_target.path == "database/object_store_cache"
assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
assert object_store.extra_dirs["temp"] == "database/tmp_cloud"

as_dict = object_store.to_dict()
- _assert_has_keys(as_dict, ["provider", "auth", "bucket", "connection", "cache", "extra_dirs", "type"])
+ _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])

_assert_key_has_value(as_dict, "type", "cloud")

auth_dict = as_dict["auth"]
bucket_dict = as_dict["bucket"]
- connection_dict = as_dict["connection"]
cache_dict = as_dict["cache"]

provider = as_dict["provider"]
@@ -1028,11 +1022,6 @@
_assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
_assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)

- _assert_key_has_value(connection_dict, "host", None)
- _assert_key_has_value(connection_dict, "port", 6000)
- _assert_key_has_value(connection_dict, "multipart", True)
- _assert_key_has_value(connection_dict, "is_secure", True)

_assert_key_has_value(cache_dict, "size", 1000.0)
_assert_key_has_value(cache_dict, "path", "database/object_store_cache")

@@ -1056,26 +1045,19 @@ def test_config_parse_cloud_noauth_for_aws():
assert object_store.bucket_name == "unique_bucket_name_all_lowercase"
assert object_store.use_rr is False

- assert object_store.host is None
- assert object_store.port == 6000
- assert object_store.multipart is True
- assert object_store.is_secure is True
- assert object_store.conn_path == "/"

cache_target = object_store.cache_target
assert cache_target.size == 1000.0
assert cache_target.path == "database/object_store_cache"
assert object_store.extra_dirs["job_work"] == "database/job_working_directory_cloud"
assert object_store.extra_dirs["temp"] == "database/tmp_cloud"

as_dict = object_store.to_dict()
- _assert_has_keys(as_dict, ["provider", "auth", "bucket", "connection", "cache", "extra_dirs", "type"])
+ _assert_has_keys(as_dict, ["provider", "auth", "bucket", "cache", "extra_dirs", "type"])

_assert_key_has_value(as_dict, "type", "cloud")

auth_dict = as_dict["auth"]
bucket_dict = as_dict["bucket"]
- connection_dict = as_dict["connection"]
cache_dict = as_dict["cache"]

provider = as_dict["provider"]
@@ -1087,11 +1069,6 @@
_assert_key_has_value(bucket_dict, "name", "unique_bucket_name_all_lowercase")
_assert_key_has_value(bucket_dict, "use_reduced_redundancy", False)

- _assert_key_has_value(connection_dict, "host", None)
- _assert_key_has_value(connection_dict, "port", 6000)
- _assert_key_has_value(connection_dict, "multipart", True)
- _assert_key_has_value(connection_dict, "is_secure", True)

_assert_key_has_value(cache_dict, "size", 1000.0)
_assert_key_has_value(cache_dict, "path", "database/object_store_cache")

@@ -1266,7 +1243,7 @@ def test_config_parse_azure_no_cache():
assert object_store.staging_path == directory.global_config.object_store_cache_path


- def verify_caching_object_store_functionality(tmp_path, object_store):
+ def verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=True):
# Test no dataset with id 1 exists.
absent_dataset = MockDataset(1)
assert not object_store.exists(absent_dataset)
@@ -1346,14 +1323,13 @@ def verify_caching_object_store_functionality(tmp_path, object_store):

# Test get_object_url returns a read-only URL
url = object_store.get_object_url(hello_world_dataset)
- from requests import get

- response = get(url)
- response.raise_for_status()
- assert response.text == "Hello World!"
+ if check_get_url:
+     response = get(url)
+     response.raise_for_status()
+     assert response.text == "Hello World!"


- def verify_object_store_functionality(tmp_path, object_store):
+ def verify_object_store_functionality(tmp_path, object_store, check_get_url=True):
# Test no dataset with id 1 exists.
absent_dataset = MockDataset(1)
assert not object_store.exists(absent_dataset)
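Both verify helpers now gate the presigned-URL round trip behind a check_get_url flag; the same edit appears again inside verify_object_store_functionality in the next hunk. Extracted as a standalone sketch, with check_object_url a hypothetical helper and object_store/dataset standing in for the fixtures above:

from requests import get

def check_object_url(object_store, dataset, expected_text, check_get_url=True):
    # Fetch the store's read-only URL and, when requested, verify the payload.
    url = object_store.get_object_url(dataset)
    if check_get_url:
        response = get(url)
        response.raise_for_status()
        assert response.text == expected_text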
@@ -1400,11 +1376,10 @@ def verify_object_store_functionality(tmp_path, object_store):

# Test get_object_url returns a read-only URL
url = object_store.get_object_url(hello_world_dataset)
- from requests import get

- response = get(url)
- response.raise_for_status()
- assert response.text == "Hello World!"
+ if check_get_url:
+     response = get(url)
+     response.raise_for_status()
+     assert response.text == "Hello World!"


AZURE_BLOB_TEMPLATE_TEST_CONFIG_YAML = """
@@ -1533,6 +1508,115 @@ def test_real_azure_blob_store_in_hierarchical(tmp_path):
verify_object_store_functionality(tmp_path, object_store)


+ AMAZON_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML = """
+ type: aws_s3
+ store_by: uuid
+ auth:
+   access_key: ${access_key}
+   secret_key: ${secret_key}
+
+ bucket:
+   name: ${bucket}
+
+ connection:
+   region: ${region}
+
+ extra_dirs:
+ - type: job_work
+   path: database/job_working_directory_azure
+ - type: temp
+   path: database/tmp_azure
+ """
+
+
+ @skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+ @skip_unless_environ("GALAXY_TEST_AWS_REGION")
+ def test_real_aws_s3_store(tmp_path):
+     template_vars = {
+         "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+         "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+         "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+         "region": os.environ["GALAXY_TEST_AWS_REGION"],
+     }
+     with TestConfig(AMAZON_S3_SIMPLE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+         verify_caching_object_store_functionality(tmp_path, object_store)
+
+
+ AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML = """
+ type: cloud
+ store_by: uuid
+ provider: aws
+ auth:
+   access_key: ${access_key}
+   secret_key: ${secret_key}
+
+ bucket:
+   name: ${bucket}
+
+ extra_dirs:
+ - type: job_work
+   path: database/job_working_directory_azure
+ - type: temp
+   path: database/tmp_azure
+ """
+
+
+ @skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+ def test_aws_via_cloudbridge_store(tmp_path):
+     template_vars = {
+         "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+         "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+         "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+     }
+     with TestConfig(AMAZON_CLOUDBRIDGE_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (_, object_store):
+         # disabling get_object_url check - cloudbridge in this config assumes the region
+         # is us-east-1 and generates a URL for that region. This functionality works and can
+         # be tested if a region is specified in the configuration (see next config and test case).
+         verify_caching_object_store_functionality(tmp_path, object_store, check_get_url=False)
+
+
+ AMAZON_CLOUDBRIDGE_WITH_REGION_TEMPLATE_TEST_CONFIG_YAML = """
+ type: cloud
+ store_by: uuid
+ provider: aws
+ auth:
+   access_key: ${access_key}
+   secret_key: ${secret_key}
+   region: ${region}
+
+ bucket:
+   name: ${bucket}
+
+ extra_dirs:
+ - type: job_work
+   path: database/job_working_directory_azure
+ - type: temp
+   path: database/tmp_azure
+ """
+
+
+ @skip_unless_environ("GALAXY_TEST_AWS_ACCESS_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_SECRET_KEY")
+ @skip_unless_environ("GALAXY_TEST_AWS_BUCKET")
+ @skip_unless_environ("GALAXY_TEST_AWS_REGION")
+ def test_aws_via_cloudbridge_store_with_region(tmp_path):
+     template_vars = {
+         "access_key": os.environ["GALAXY_TEST_AWS_ACCESS_KEY"],
+         "secret_key": os.environ["GALAXY_TEST_AWS_SECRET_KEY"],
+         "bucket": os.environ["GALAXY_TEST_AWS_BUCKET"],
+         "region": os.environ["GALAXY_TEST_AWS_REGION"],
+     }
+     with TestConfig(AMAZON_CLOUDBRIDGE_WITH_REGION_TEMPLATE_TEST_CONFIG_YAML, template_vars=template_vars) as (
+         _,
+         object_store,
+     ):
+         verify_caching_object_store_functionality(tmp_path, object_store)


class MockDataset:
def __init__(self, id):
self.id = id
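The three new real-AWS tests are opt-in: each is skipped unless the gating GALAXY_TEST_AWS_* environment variables are set. A sketch of a local run, assuming pytest is available and with every value a placeholder (use a disposable bucket):

import os

import pytest

os.environ.update(
    {
        "GALAXY_TEST_AWS_ACCESS_KEY": "AKIA...",
        "GALAXY_TEST_AWS_SECRET_KEY": "...",
        "GALAXY_TEST_AWS_BUCKET": "my-disposable-test-bucket",
        "GALAXY_TEST_AWS_REGION": "us-east-2",
    }
)
pytest.main(["test/unit/objectstore/test_objectstore.py", "-k", "aws"])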