Implement a boto3 object store.
jmchilton committed May 9, 2024
1 parent 7fd767e commit 8ff1ca6
Showing 5 changed files with 431 additions and 1 deletion.
3 changes: 3 additions & 0 deletions lib/galaxy/dependencies/__init__.py
@@ -234,6 +234,9 @@ def check_python_pam(self):
def check_azure_storage(self):
return "azure_blob" in self.object_stores

def check_boto3(self):
return "boto3" in self.object_stores

def check_kamaki(self):
return "pithos" in self.object_stores

4 changes: 4 additions & 0 deletions lib/galaxy/objectstore/__init__.py
@@ -1386,6 +1386,10 @@ def type_to_object_store_class(store: str, fsmon: bool = False) -> Tuple[Type[Ba
objectstore_constructor_kwds = {}
if store == "disk":
objectstore_class = DiskObjectStore
elif store == "boto3":
from .s3_boto3 import S3ObjectStore as Boto3ObjectStore

objectstore_class = Boto3ObjectStore
elif store in ["s3", "aws_s3"]:
from .s3 import S3ObjectStore

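A minimal usage sketch (an assumption, not part of this commit) of how the new "boto3" store type would be resolved through the factory above, assuming the function returns the object store class together with its extra constructor kwds, as its signature suggests; the call site and variable names are illustrative.

    from galaxy.objectstore import type_to_object_store_class

    # "boto3" should now resolve to the boto3-backed S3ObjectStore from .s3_boto3.
    objectstore_class, objectstore_constructor_kwds = type_to_object_store_class("boto3")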
340 changes: 340 additions & 0 deletions lib/galaxy/objectstore/s3_boto3.py
@@ -0,0 +1,340 @@
"""A more modern version of the S3 object store based on boto3 instead of boto.
"""

import logging
import os
from typing import (
TYPE_CHECKING,
TypedDict,
)

from typing_extensions import (
Literal,
NotRequired,
)

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client

try:
# Imports are done this way to allow objectstore code to be used outside of Galaxy.
import boto3
from botocore.client import ClientError
except ImportError:
boto3 = None # type: ignore[assignment,unused-ignore]

from ._caching_base import CachingConcreteObjectStore
from .caching import (
enable_cache_monitor,
parse_caching_config_dict_from_xml,
)

NO_BOTO_ERROR_MESSAGE = (
    "S3/Swift object store configured, but no boto3 dependency available. "
    "Please install and properly configure boto3 or modify object store configuration."
)

log = logging.getLogger(__name__)
logging.getLogger("boto").setLevel(logging.INFO) # Otherwise boto is quite noisy


def download_directory(bucket, remote_folder, local_path):
    # `bucket` is expected to be a boto3 ``s3.Bucket`` resource; list the
    # objects under the specified S3 prefix.
    for obj in bucket.objects.filter(Prefix=remote_folder):
        remote_file_path = obj.key
        local_file_path = os.path.join(local_path, os.path.relpath(remote_file_path, remote_folder))

        # Create directories if they don't exist
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

        # Download the file
        bucket.download_file(remote_file_path, local_file_path)


def parse_config_xml(config_xml):
try:
a_xml = config_xml.findall("auth")[0]
access_key = a_xml.get("access_key")
secret_key = a_xml.get("secret_key")

b_xml = config_xml.findall("bucket")[0]
bucket_name = b_xml.get("name")

cn_xml = config_xml.findall("connection")
if not cn_xml:
cn_xml = {}
else:
cn_xml = cn_xml[0]
endpoint_url = cn_xml.get("endpoint_url")
region = cn_xml.get("region")
cache_dict = parse_caching_config_dict_from_xml(config_xml)

tag, attrs = "extra_dir", ("type", "path")
extra_dirs = config_xml.findall(tag)
if not extra_dirs:
msg = f"No {tag} element in XML tree"
log.error(msg)
raise Exception(msg)
extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs]

config_dict = {
"auth": {
"access_key": access_key,
"secret_key": secret_key,
},
"bucket": {
"name": bucket_name,
},
"connection": {
"endpoint_url": endpoint_url,
"region": region,
},
"cache": cache_dict,
"extra_dirs": extra_dirs,
"private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml),
}
name = config_xml.attrib.get("name", None)
if name is not None:
config_dict["name"] = name
device = config_xml.attrib.get("device", None)
config_dict["device"] = device
return config_dict
except Exception:
# Toss it back up after logging, we can't continue loading at this point.
log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
raise


class S3ClientConstructorKwds(TypedDict):
service_name: Literal["s3"]
endpoint_url: NotRequired[str]
region_name: NotRequired[str]
aws_access_key_id: NotRequired[str]
aws_secret_access_key: NotRequired[str]


class S3ObjectStore(CachingConcreteObjectStore):
"""
Object store that stores objects as items in an AWS S3 bucket. A local
cache exists that is used as an intermediate location for files between
Galaxy and S3.
"""

_client: "S3Client"
store_type = "aws_s3_boto3"
cloud = True

def __init__(self, config, config_dict):
super().__init__(config, config_dict)
self.cache_monitor = None

auth_dict = config_dict["auth"]
bucket_dict = config_dict["bucket"]
connection_dict = config_dict.get("connection", {})
cache_dict = config_dict.get("cache") or {}
self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

self.access_key = auth_dict.get("access_key")
self.secret_key = auth_dict.get("secret_key")

self.bucket = bucket_dict.get("name")

self.endpoint_url = connection_dict.get("endpoint_url")
self.region = connection_dict.get("region")

self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
self.cache_updated_data = cache_dict.get("cache_updated_data", True)

extra_dirs = {e["type"]: e["path"] for e in config_dict.get("extra_dirs", [])}
self.extra_dirs.update(extra_dirs)

self._initialize()

def _initialize(self):
if boto3 is None:
raise Exception(NO_BOTO_ERROR_MESSAGE)

self._configure_connection()
self._start_cache_monitor_if_needed()

def _configure_connection(self):
log.debug("Configuring S3 Connection")
self._init_client()
if not self._bucket_exists:
self._create_bucket()

        # get_object_url only works on AWS if the region is set, so if it wasn't,
        # fetch it and re-initialize the client now. Skip this logic entirely for
        # other, non-AWS services by ensuring endpoint_url is not set.
if not self.endpoint_url and not self.region:
response = self._client.get_bucket_location(
Bucket=self.bucket,
)
if "LocationConstraint" in response:
region = response["LocationConstraint"]
self.region = region
self._init_client()

def _init_client(self):
# set _client based on current args.
# If access_key is empty use default credential chain
kwds: S3ClientConstructorKwds = {
"service_name": "s3",
}
if self.endpoint_url:
kwds["endpoint_url"] = self.endpoint_url
if self.region:
kwds["region_name"] = self.region
if self.access_key:
kwds["aws_access_key_id"] = self.access_key
kwds["aws_secret_access_key"] = self.secret_key
self._client = boto3.client(**kwds)

@property
def _bucket_exists(self) -> bool:
try:
self._client.head_bucket(Bucket=self.bucket)
return True
except ClientError as err:
if err.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
return False
raise

def _create_bucket(self):
kwds = {}
if self.region:
kwds["CreateBucketConfiguration"] = dict(LocationConstraint=self.region)
self._client.create_bucket(Bucket=self.bucket, **kwds)

@classmethod
def parse_xml(clazz, config_xml):
return parse_config_xml(config_xml)

def _config_to_dict(self):
return {
"auth": {
"access_key": self.access_key,
"secret_key": self.secret_key,
},
"bucket": {
"name": self.bucket,
},
"connection": {
"endpoint_url": self.endpoint_url,
"region": self.region,
},
"cache": {
"size": self.cache_size,
"path": self.staging_path,
"cache_updated_data": self.cache_updated_data,
},
}

def to_dict(self):
as_dict = super().to_dict()
as_dict.update(self._config_to_dict())
return as_dict

def _get_remote_size(self, rel_path) -> int:
response = self._client.head_object(Bucket=self.bucket, Key=rel_path)
return response["ContentLength"]

def _exists_remotely(self, rel_path: str) -> bool:
try:
self._client.head_object(Bucket=self.bucket, Key=rel_path)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
raise

def _download(self, rel_path: str) -> bool:
local_destination = self._get_cache_path(rel_path)
try:
log.debug("Pulling key '%s' into cache to %s", rel_path, local_destination)
if not self._caching_allowed(rel_path):
return False
self._client.download_file(self.bucket, rel_path, local_destination)
return True
except ClientError:
log.exception("Failed to download file from S3")
return False

def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
try:
self._client.put_object(Body=from_string.encode("utf-8"), Bucket=self.bucket, Key=rel_path)
return True
except ClientError:
log.exception("Trouble pushing to S3 '%s' from string", rel_path)
return False

def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
try:
self._client.upload_file(source_file, self.bucket, rel_path)
return True
except ClientError:
log.exception("Trouble pushing to S3 '%s' from file '%s'", rel_path, source_file)
return False

def _delete_remote_all(self, rel_path: str) -> bool:
try:
for key in self._keys(rel_path):
self._client.delete_object(Bucket=self.bucket, Key=key)
return True
except ClientError:
log.exception("Could not delete blob '%s' from S3", rel_path)
return False

def _delete_existing_remote(self, rel_path: str) -> bool:
try:
self._client.delete_object(Bucket=self.bucket, Key=rel_path)
return True
except ClientError:
log.exception("Could not delete blob '%s' from S3", rel_path)
return False

# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
def _keys(self, prefix="/", delimiter="/", start_after=""):
s3_paginator = self._client.get_paginator("list_objects_v2")
prefix = prefix.lstrip(delimiter)
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=self.bucket, Prefix=prefix, StartAfter=start_after):
for content in page.get("Contents", ()):
yield content["Key"]

def _download_directory_into_cache(self, rel_path, cache_path):
for key in self._keys(rel_path):
local_file_path = os.path.join(cache_path, os.path.relpath(key, rel_path))

# Create directories if they don't exist
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

# Download the file
            self._client.download_file(self.bucket, key, local_file_path)

def _get_object_url(self, obj, **kwargs):
try:
if self._exists(obj, **kwargs):
rel_path = self._construct_path(obj, **kwargs)
url = self._client.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": self.bucket,
"Key": rel_path,
},
ExpiresIn=3600,
HttpMethod="GET",
)
return url
except ClientError:
log.exception("Failed to generate URL for dataset.")
return None

def _get_store_usage_percent(self, obj):
return 0.0

def shutdown(self):
self._shutdown_cache_monitor()
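
For reference, a minimal sketch (not part of this commit) of the XML fragment that parse_config_xml above appears to expect. Element and attribute names follow the parser; the concrete values, the enclosing element name, and the cache/extra_dir paths are illustrative assumptions.

    from xml.etree import ElementTree

    from galaxy.objectstore.s3_boto3 import parse_config_xml

    # Hypothetical configuration: <auth>, <bucket> and at least one <extra_dir>
    # are required by the parser; <connection> and <cache> appear to be optional.
    example_xml = ElementTree.fromstring(
        '<object_store type="boto3" name="example">'
        '  <auth access_key="..." secret_key="..." />'
        '  <bucket name="example-bucket" />'
        '  <connection endpoint_url="https://s3.example.org" region="us-east-1" />'
        '  <cache path="database/object_store_cache" size="100" />'
        '  <extra_dir type="job_work" path="database/job_working_directory_s3" />'
        '  <extra_dir type="temp" path="database/tmp_s3" />'
        "</object_store>"
    )

    config_dict = parse_config_xml(example_xml)
    # config_dict now carries "auth", "bucket", "connection", "cache",
    # "extra_dirs", "private", "name" and "device" keys.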
1 change: 1 addition & 0 deletions pyproject.toml
@@ -183,6 +183,7 @@ types-python-dateutil = "*"
types-PyYAML = "*"
types-requests = "*"
types-six = "*"
"boto3-stubs[s3]" = "*"

[tool.ruff]
target-version = "py38"
