Skip to content

Commit

Permalink
[WIP] device source stuff...
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 18, 2024
1 parent 1e3051d commit e6166e5
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 4 deletions.
27 changes: 27 additions & 0 deletions lib/galaxy/managers/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,33 @@ def has_access_permission(self, dataset, user):
roles = user.all_roles_exploiting_cache() if user else []
return self.app.security_agent.can_access_dataset(roles, dataset)

def update_object_store_id(self, trans, dataset, object_store_id: str):
    """Re-label ``dataset`` as living in another object store on the same device.

    Only the ``object_store_id`` recorded on the dataset is changed, so the
    swap is only allowed between object stores that declare the same device
    ID.

    :param trans: request context; ``trans.user`` is the user asking for the change.
    :param dataset: dataset whose ``object_store_id`` should be updated.
    :param object_store_id: ID of the object store the dataset should point at.
    :returns: ``None``.
    :raises RequestParameterInvalidException: the two object stores do not
        share a (configured) device ID.
    :raises InsufficientPermissionsException: the security agent reports the
        dataset is shared beyond ``trans.user``.
    """
    old_object_store_id = dataset.object_store_id
    new_object_store_id = object_store_id
    if old_object_store_id == new_object_store_id:
        return None

    # Keep the map itself: both the old and the new ID are resolved through it.
    device_source_map = self.app.object_store.get_device_source_map()
    old_device_id = device_source_map.get_device_id(old_object_store_id)
    new_device_id = device_source_map.get_device_id(new_object_store_id)
    # A missing device ID means "unknown device"; two unknowns must not be
    # treated as the same device.
    if old_device_id is None or old_device_id != new_device_id:
        raise exceptions.RequestParameterInvalidException(
            "Cannot swap object store IDs for object stores that don't share a device ID."
        )

    # prevent update if dataset shared with anyone but the current user
    # private object stores would prevent this but if something has been
    # kept private in a sharable object store still allow the swap
    if not self.app.security_agent.can_change_object_store_id(trans.user, dataset):
        raise exceptions.InsufficientPermissionsException(
            "Cannot change the object store of a dataset that is shared with other users."
        )

    quota_source_map = self.app.object_store.get_quota_source_map()
    if quota_source_map:
        # NOTE(review): both labels are looked up but the results are not used
        # yet; presumably usage should be moved between them through the quota
        # agent's relabel_quota_for_dataset - confirm before shipping.
        quota_source_map.get_quota_source_label(old_object_store_id)
        quota_source_map.get_quota_source_label(new_object_store_id)

    dataset.object_store_id = new_object_store_id





def compute_hash(self, request: ComputeDatasetHashTaskRequest):
# For files in extra_files_path
dataset = self.by_id(request.dataset_id)
Expand Down
10 changes: 10 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4024,6 +4024,16 @@ def quota_source_info(self):
quota_source_map = self.object_store.get_quota_source_map()
return quota_source_map.get_quota_source_info(object_store_id)

@property
def device_source_label(self):
    """Return the ``label`` attribute of this object's ``device_source_info``."""
    info = self.device_source_info
    return info.label

@property
def device_source_info(self):
    """Return device source info for the object store holding this object.

    The lookup goes through the object store's *device* source map (the
    previous version fetched the quota source map by mistake) using this
    object's ``object_store_id``.
    """
    # NOTE(review): DeviceSourceMap as defined in galaxy.objectstore only
    # exposes get_device_id(); confirm get_device_source_info() exists (or is
    # added) before relying on this property.
    device_source_map = self.object_store.get_device_source_map()
    return device_source_map.get_device_source_info(self.object_store_id)

def set_file_name(self, filename):
if not filename:
self.external_filename = None
Expand Down
17 changes: 17 additions & 0 deletions lib/galaxy/model/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
select,
)
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import text

import galaxy.model
from galaxy.model import (
Expand Down Expand Up @@ -634,6 +635,22 @@ def can_modify_library_item(self, roles, item):
def can_manage_library_item(self, roles, item):
    """Return ``allow_action``'s verdict for ``LIBRARY_MANAGE`` on ``item`` given ``roles``."""
    manage_action = self.permitted_actions.LIBRARY_MANAGE
    return self.allow_action(roles, manage_action, item)

def can_change_object_store_id(self, user, dataset):
    """Return whether ``user`` may re-point ``dataset`` at another object store.

    The change is refused when there is no user, when the dataset has any
    library associations, or when the dataset appears in a history that does
    not belong to ``user``.

    :param user: user requesting the change, or ``None`` for an anonymous request.
    :param dataset: dataset to check; ``id`` and ``library_associations`` are read.
    :returns: ``True`` only if no other history references the dataset.
    """
    # An anonymous request has no user ID to compare history owners against.
    if user is None:
        return False
    # prevent update if dataset shared with anyone but the current user
    # private object stores would prevent this but if something has been
    # kept private in a sharable object store still allow the swap
    if dataset.library_associations:
        return False
    # ``user_id != :user_id`` is never true for NULL owners, so histories
    # without a user are matched explicitly and count as "someone else".
    query = text(
        """
SELECT COUNT(*)
FROM history
INNER JOIN
history_dataset_association on history_dataset_association.history_id = history.id
WHERE (history.user_id IS NULL OR history.user_id != :user_id)
AND history_dataset_association.dataset_id = :dataset_id
"""
    ).bindparams(dataset_id=dataset.id, user_id=user.id)
    return self.sa_session.scalars(query).first() == 0

def get_item_actions(self, action, item):
# item must be one of: Dataset, Library, LibraryFolder, LibraryDataset, LibraryDatasetDatasetAssociation
# SM: Accessing item.actions emits a query to Library_Dataset_Permissions
Expand Down
46 changes: 44 additions & 2 deletions lib/galaxy/objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
DEFAULT_PRIVATE = False
DEFAULT_QUOTA_SOURCE = None # Just track quota right on user object in Galaxy.
DEFAULT_QUOTA_ENABLED = True # enable quota tracking in object stores by default
# Device ID a DeviceSourceMap falls back to when it is built without one.
DEFAULT_DEVICE_ID = None

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -329,6 +329,10 @@ def to_dict(self) -> Dict[str, Any]:
def get_quota_source_map(self):
"""Return QuotaSourceMap describing mapping of object store IDs to quota sources."""

@abc.abstractmethod
def get_device_source_map(self) -> "DeviceSourceMap":
    """Return DeviceSourceMap describing mapping of object store IDs to device sources.

    The returned map's ``get_device_id(object_store_id)`` resolves an object
    store ID to a device ID, or ``None`` when no device is known for it.
    """


class BaseObjectStore(ObjectStore):
store_by: str
Expand Down Expand Up @@ -491,6 +495,9 @@ def get_quota_source_map(self):
# I'd rather keep this abstract... but register_singleton wants it to be instantiable...
raise NotImplementedError()

def get_device_source_map(self) -> "DeviceSourceMap":
    """Return a fresh DeviceSourceMap with no backends and the default device ID."""
    return DeviceSourceMap()


class ConcreteObjectStore(BaseObjectStore):
"""Subclass of ObjectStore for stores that don't delegate (non-nested).
Expand All @@ -501,6 +508,7 @@ class ConcreteObjectStore(BaseObjectStore):
"""

badges: List[StoredBadgeDict]
device_id: Optional[str] = None

def __init__(self, config, config_dict=None, **kwargs):
"""
Expand Down Expand Up @@ -528,6 +536,7 @@ def __init__(self, config, config_dict=None, **kwargs):
quota_config = config_dict.get("quota", {})
self.quota_source = quota_config.get("source", DEFAULT_QUOTA_SOURCE)
self.quota_enabled = quota_config.get("enabled", DEFAULT_QUOTA_ENABLED)
self.device_id = config_dict.get("device", None)
self.badges = read_badges(config_dict)

def to_dict(self):
Expand All @@ -541,6 +550,7 @@ def to_dict(self):
"enabled": self.quota_enabled,
}
rval["badges"] = self._get_concrete_store_badges(None)
rval["device"] = self.device_id
return rval

def to_model(self, object_store_id: str) -> "ConcreteObjectStoreModel":
Expand All @@ -551,6 +561,7 @@ def to_model(self, object_store_id: str) -> "ConcreteObjectStoreModel":
description=self.description,
quota=QuotaModel(source=self.quota_source, enabled=self.quota_enabled),
badges=self._get_concrete_store_badges(None),
device=self.device_id,
)

def _get_concrete_store_badges(self, obj) -> List[BadgeDict]:
Expand Down Expand Up @@ -587,6 +598,9 @@ def get_quota_source_map(self):
)
return quota_source_map

def get_device_source_map(self) -> "DeviceSourceMap":
    """Return a DeviceSourceMap whose default device ID is this store's ``device_id``."""
    own_device_id = self.device_id
    return DeviceSourceMap(own_device_id)


class DiskObjectStore(ConcreteObjectStore):
"""
Expand Down Expand Up @@ -1036,7 +1050,7 @@ def __init__(self, config, config_dict, fsmon=False):
"""
super().__init__(config, config_dict)
self._quota_source_map = None

self._device_source_map = None
self.backends = {}
self.weighted_backend_ids = []
self.original_weighted_backend_ids = []
Expand Down Expand Up @@ -1208,6 +1222,13 @@ def get_quota_source_map(self):
self._quota_source_map = quota_source_map
return self._quota_source_map

def get_device_source_map(self) -> "DeviceSourceMap":
    """Return the device source map for this store's backends, building it once.

    The first call merges the backends into a new DeviceSourceMap and stores it
    on ``self._device_source_map``; later calls return that stored map.
    """
    cached = self._device_source_map
    if cached is not None:
        return cached
    merged = DeviceSourceMap()
    self._merge_device_source_map(merged, self)
    self._device_source_map = merged
    return merged

@classmethod
def _merge_quota_source_map(clz, quota_source_map, object_store):
for backend_id, backend in object_store.backends.items():
Expand All @@ -1216,6 +1237,14 @@ def _merge_quota_source_map(clz, quota_source_map, object_store):
else:
quota_source_map.backends[backend_id] = backend.get_quota_source_map()

@classmethod
def _merge_device_source_map(clz, device_source_map: "DeviceSourceMap", object_store):
    """Fill ``device_source_map.backends`` from ``object_store.backends``.

    Backends that are themselves DistributedObjectStore instances are walked
    recursively into the same map; every other backend contributes its own
    device source map under its backend ID.
    """
    for backend_id, backend in object_store.backends.items():
        if not isinstance(backend, DistributedObjectStore):
            device_source_map.backends[backend_id] = backend.get_device_source_map()
            continue
        clz._merge_device_source_map(device_source_map, backend)

def __get_store_id_for(self, obj, **kwargs):
if obj.object_store_id is not None:
if obj.object_store_id in self.backends:
Expand Down Expand Up @@ -1364,6 +1393,7 @@ class ConcreteObjectStoreModel(BaseModel):
description: Optional[str] = None
quota: QuotaModel
badges: List[BadgeDict]
device: Optional[str] = None


def type_to_object_store_class(store: str, fsmon: bool = False) -> Tuple[Type[BaseObjectStore], Dict[str, Any]]:
Expand Down Expand Up @@ -1506,6 +1536,18 @@ class QuotaSourceInfo(NamedTuple):
use: bool


class DeviceSourceMap:
    """Resolve object store IDs to device IDs.

    ``backends`` maps an object store ID to the DeviceSourceMap of that
    backend; IDs without an entry resolve to ``default_device_id``.
    """

    def __init__(self, device_id=DEFAULT_DEVICE_ID):
        self.default_device_id = device_id
        # Populated by the owning object store; values are DeviceSourceMap instances.
        self.backends = {}

    def get_device_id(self, object_store_id: str) -> Optional[str]:
        """Return the device ID for ``object_store_id``.

        A registered backend answers through its own map; anything else gets
        this map's default device ID.
        """
        if object_store_id in self.backends:
            # The stored value is already a DeviceSourceMap, so ask it directly
            # instead of calling get_device_source_map() on it.
            return self.backends[object_store_id].get_device_id(object_store_id)
        return self.default_device_id


class QuotaSourceMap:
def __init__(self, source=DEFAULT_QUOTA_SOURCE, enabled=DEFAULT_QUOTA_ENABLED):
self.default_quota_source = source
Expand Down
102 changes: 102 additions & 0 deletions lib/galaxy/quota/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional

from sqlalchemy import select
from sqlalchemy.orm import object_session
from sqlalchemy.sql import text

import galaxy.util
Expand All @@ -25,6 +26,13 @@ class QuotaAgent: # metaclass=abc.ABCMeta
the quota in other apps (LDAP maybe?) or via configuration files.
"""

def relabel_quota_for_dataset(self, dataset, from_label: Optional[str], to_label: Optional[str]):
    """Update the quota source label for dataset and adjust relevant quotas.

    Subtract the dataset's usage from ``from_label`` and add it to ``to_label``
    for the users affected by the dataset. The signature matches the
    database-backed implementation so every agent accepts the same call.
    """

# TODO: make abstractmethod after they work better with mypy
def get_quota(self, user, quota_source_label=None) -> Optional[int]:
"""Return quota in bytes or None if no quota is set."""
Expand Down Expand Up @@ -81,6 +89,9 @@ def __init__(self):
def get_quota(self, user, quota_source_label=None) -> Optional[int]:
    """Return ``None`` for every user and quota source label."""
    return None

def relabel_quota_for_dataset(self, dataset, from_label: Optional[str], to_label: Optional[str]):
    """Do nothing and return ``None``.

    Takes the same ``(dataset, from_label, to_label)`` arguments as the
    database-backed agent so the same call works against either agent.
    """
    return None

@property
def default_quota(self):
    """Always ``None``."""
    return None
Expand Down Expand Up @@ -173,6 +184,97 @@ def get_quota(self, user, quota_source_label=None) -> Optional[int]:
else:
return None

def relabel_quota_for_dataset(self, dataset, from_label, to_label):
    """Move ``dataset``'s size from one quota source label's usage to another.

    The affected users are the distinct ``user_id`` values of histories that
    contain ``dataset``. For each of them the dataset's total size is
    subtracted from the usage tracked under ``from_label`` and added to the
    usage tracked under ``to_label``. A label of ``None`` means
    ``galaxy_user.disk_usage``; any other label means the matching
    ``user_quota_source_usage`` row.

    :param dataset: dataset being relabeled; ``id`` and ``get_total_size()`` are used.
    :param from_label: quota source label currently charged, or ``None``.
    :param to_label: quota source label to charge instead, or ``None``.
    :returns: ``None``.
    """
    if to_label == from_label:
        # Nothing to move, so skip the size lookup and the database entirely.
        return None

    adjust = dataset.get_total_size()
    with_quota_affected_users = """WITH quota_affected_users AS
(
SELECT DISTINCT user_id
FROM history
INNER JOIN
history_dataset_association on history_dataset_association.history_id = history.id
INNER JOIN
dataset on history_dataset_association.dataset_id = dataset.id
WHERE
dataset_id = :dataset_id
)"""
    # Use one bind both to pick the SQL dialect and to run the statements.
    engine = self.sa_session.get_bind()

    # Hack for older sqlite, would work on newer sqlite - 3.24.0
    for_sqlite = "sqlite" in engine.dialect.name

    if to_label is None:
        # ``IN <cte name>`` is a SQLite-only shorthand; the explicit subquery
        # is accepted by both SQLite and PostgreSQL.
        to_statement = f"""
{with_quota_affected_users}
UPDATE galaxy_user
SET disk_usage = coalesce(disk_usage, 0) + :adjust
WHERE id in (SELECT user_id FROM quota_affected_users)
"""
    elif for_sqlite:
        to_statement = f"""
{with_quota_affected_users},
new_quota_sources (user_id, disk_usage, quota_source_label) AS (
SELECT user_id, :adjust as disk_usage, :to_label as quota_source_label
FROM quota_affected_users
)
INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage)
SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage + :adjust, :adjust)
FROM new_quota_sources as new LEFT JOIN user_quota_source_usage AS old ON new.user_id = old.user_id AND NEW.quota_source_label = old.quota_source_label"""
    else:
        to_statement = f"""
{with_quota_affected_users},
new_quota_sources (user_id, disk_usage, quota_source_label) AS (
SELECT user_id, :adjust as disk_usage, :to_label as quota_source_label
FROM quota_affected_users
)
INSERT INTO user_quota_source_usage(user_id, disk_usage, quota_source_label)
SELECT * FROM new_quota_sources
ON CONFLICT
ON constraint uqsu_unique_label_per_user
DO UPDATE SET disk_usage = user_quota_source_usage.disk_usage + :adjust
"""

    if from_label is None:
        from_statement = f"""
{with_quota_affected_users}
UPDATE galaxy_user
SET disk_usage = coalesce(disk_usage - :adjust, 0)
WHERE id in (SELECT user_id FROM quota_affected_users)
"""
    elif for_sqlite:
        from_statement = f"""
{with_quota_affected_users},
new_quota_sources (user_id, disk_usage, quota_source_label) AS (
SELECT user_id, :adjust as disk_usage, :from_label as quota_source_label
FROM quota_affected_users
)
INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage)
SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage - :adjust, 0)
FROM new_quota_sources as new LEFT JOIN user_quota_source_usage AS old ON new.user_id = old.user_id AND NEW.quota_source_label = old.quota_source_label"""
    else:
        from_statement = f"""
{with_quota_affected_users},
new_quota_sources (user_id, disk_usage, quota_source_label) AS (
SELECT user_id, 0 as disk_usage, :from_label as quota_source_label
FROM quota_affected_users
)
INSERT INTO user_quota_source_usage(user_id, disk_usage, quota_source_label)
SELECT * FROM new_quota_sources
ON CONFLICT
ON constraint uqsu_unique_label_per_user
DO UPDATE SET disk_usage = user_quota_source_usage.disk_usage - :adjust
"""

    bind = {"dataset_id": dataset.id, "adjust": int(adjust), "to_label": to_label, "from_label": from_label}
    # begin() commits when the block exits cleanly and rolls back on error, so
    # the subtraction and the addition are applied together or not at all.
    with engine.begin() as conn:
        conn.execute(text(from_statement), bind)
        conn.execute(text(to_statement), bind)
    return None

def _default_unregistered_quota(self, quota_source_label):
    """Return ``_default_quota`` for the UNREGISTERED default type and ``quota_source_label``."""
    unregistered_type = self.model.DefaultQuotaAssociation.types.UNREGISTERED
    return self._default_quota(unregistered_type, quota_source_label)

Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/security/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def can_add_library_item(self, roles, item):
def can_modify_library_item(self, roles, item):
    """Placeholder that always raises ``Exception("Unimplemented Method")``.

    Presumably overridden by concrete security agents - confirm against subclasses.
    """
    raise Exception("Unimplemented Method")

def can_change_object_store_id(self, user, dataset):
    """Placeholder that always raises ``Exception("Unimplemented Method")``.

    Presumably overridden by concrete security agents - confirm against subclasses.
    """
    raise Exception("Unimplemented Method")

def can_manage_library_item(self, roles, item):
    """Placeholder that always raises ``Exception("Unimplemented Method")``.

    Presumably overridden by concrete security agents - confirm against subclasses.
    """
    raise Exception("Unimplemented Method")

Expand Down
14 changes: 14 additions & 0 deletions lib/galaxy/webapps/galaxy/api/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
DeleteDatasetBatchPayload,
DeleteDatasetBatchResult,
RequestDataType,
UpdateObjectStoreIdPayload,
)

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -484,3 +485,16 @@ def compute_hash(
payload: ComputeDatasetHashPayload = Body(...),
) -> AsyncTaskResultSummary:
return self.service.compute_hash(trans, dataset_id, payload, hda_ldda=hda_ldda)

# PUT /api/datasets/{dataset_id}/object_store_id: hands the path ID and the
# request body to the datasets service and returns whatever it returns.
# NOTE(review): the AsyncTaskResultSummary return annotation is the same one
# used by the compute_hash endpoint; confirm the service call really returns
# that type, since the manager-level update returns None.
@router.put(
    "/api/datasets/{dataset_id}/object_store_id",
    summary="Update an object store ID for a dataset you own.",
    operation_id="datasets__update_object_store_id",
)
def update_object_store_id(
    self,
    dataset_id: HistoryDatasetIDPathParam,
    trans=DependsOnTrans,
    payload: UpdateObjectStoreIdPayload = Body(...),
) -> AsyncTaskResultSummary:
    return self.service.update_object_store_id(trans, dataset_id, payload)
Loading

0 comments on commit e6166e5

Please sign in to comment.