diff --git a/client/src/components/Dataset/DatasetStorage/DatasetStorage.test.js b/client/src/components/Dataset/DatasetStorage/DatasetStorage.test.js index 229c7e170c0c..fefe272a5540 100644 --- a/client/src/components/Dataset/DatasetStorage/DatasetStorage.test.js +++ b/client/src/components/Dataset/DatasetStorage/DatasetStorage.test.js @@ -4,7 +4,6 @@ import { getLocalVue } from "jest/helpers"; import flushPromises from "flush-promises"; import MockAdapter from "axios-mock-adapter"; import axios from "axios"; -import MarkdownIt from "markdown-it"; const localVue = getLocalVue(); @@ -12,32 +11,11 @@ const TEST_STORAGE_API_RESPONSE_WITHOUT_ID = { object_store_id: null, private: false, }; -const TEST_STORAGE_API_RESPONSE_WITH_ID = { - object_store_id: "foobar", - private: false, -}; -const TEST_STORAGE_API_RESPONSE_WITH_NAME = { - object_store_id: "foobar", - name: "my cool storage", - description: "My cool **markdown**", - private: true, -}; const TEST_DATASET_ID = "1"; const TEST_STORAGE_URL = `/api/datasets/${TEST_DATASET_ID}/storage`; -const TEST_RENDERED_MARKDOWN_AS_HTML = "

My cool markdown\n"; const TEST_ERROR_MESSAGE = "Opps all errors."; -// works fine without mocking but I guess it is more JS unit-y with the mock? -jest.mock("markdown-it"); -MarkdownIt.mockImplementation(() => { - return { - render(markdown) { - return TEST_RENDERED_MARKDOWN_AS_HTML; - }, - }; -}); - -describe("Dataset Storage", () => { +describe("DatasetStorage.vue", () => { let axiosMock; let wrapper; @@ -62,6 +40,7 @@ describe("Dataset Storage", () => { mount(); await wrapper.vm.$nextTick(); expect(wrapper.findAll("loading-span-stub").length).toBe(1); + expect(wrapper.findAll("describe-object-store-stub").length).toBe(0); }); it("test error rendering...", async () => { @@ -78,46 +57,8 @@ describe("Dataset Storage", () => { it("test dataset storage with object store without id", async () => { await mountWithResponse(TEST_STORAGE_API_RESPONSE_WITHOUT_ID); expect(wrapper.findAll("loading-span-stub").length).toBe(0); - expect(wrapper.vm.descriptionRendered).toBeNull(); - const header = wrapper.findAll("h3"); - expect(header.length).toBe(1); - expect(header.at(0).text()).toBe("Dataset Storage"); - const byIdSpan = wrapper.findAll(".display-os-by-id"); - expect(byIdSpan.length).toBe(0); - const byNameSpan = wrapper.findAll(".display-os-by-name"); - expect(byNameSpan.length).toBe(0); - const byDefaultSpan = wrapper.findAll(".display-os-default"); - expect(byDefaultSpan.length).toBe(1); - }); - - it("test dataset storage with object store id", async () => { - await mountWithResponse(TEST_STORAGE_API_RESPONSE_WITH_ID); - expect(wrapper.findAll("loading-span-stub").length).toBe(0); - expect(wrapper.vm.storageInfo.object_store_id).toBe("foobar"); - expect(wrapper.vm.descriptionRendered).toBeNull(); - const header = wrapper.findAll("h3"); - expect(header.length).toBe(1); - expect(header.at(0).text()).toBe("Dataset Storage"); - const byIdSpan = wrapper.findAll(".display-os-by-id"); - expect(byIdSpan.length).toBe(1); - const byNameSpan = 
wrapper.findAll(".display-os-by-name"); - expect(byNameSpan.length).toBe(0); - expect(wrapper.find("object-store-restriction-span-stub").props("isPrivate")).toBeFalsy(); - }); - - it("test dataset storage with object store name", async () => { - await mountWithResponse(TEST_STORAGE_API_RESPONSE_WITH_NAME); - expect(wrapper.findAll("loading-span-stub").length).toBe(0); - expect(wrapper.vm.storageInfo.object_store_id).toBe("foobar"); - expect(wrapper.vm.descriptionRendered).toBe(TEST_RENDERED_MARKDOWN_AS_HTML); - const header = wrapper.findAll("h3"); - expect(header.length).toBe(1); - expect(header.at(0).text()).toBe("Dataset Storage"); - const byIdSpan = wrapper.findAll(".display-os-by-id"); - expect(byIdSpan.length).toBe(0); - const byNameSpan = wrapper.findAll(".display-os-by-name"); - expect(byNameSpan.length).toBe(1); - expect(wrapper.find("object-store-restriction-span-stub").props("isPrivate")).toBeTruthy(); + expect(wrapper.findAll("describe-object-store-stub").length).toBe(1); + expect(wrapper.vm.storageInfo.private).toEqual(false); }); afterEach(() => { diff --git a/client/src/components/Dataset/DatasetStorage/DatasetStorage.vue b/client/src/components/Dataset/DatasetStorage/DatasetStorage.vue index 4b8b6de0523e..437194f4b218 100644 --- a/client/src/components/Dataset/DatasetStorage/DatasetStorage.vue +++ b/client/src/components/Dataset/DatasetStorage/DatasetStorage.vue @@ -16,21 +16,7 @@

-

- This dataset is stored in - - a Galaxy object store named - {{ storageInfo.name }} - - - a Galaxy object store with id - {{ storageInfo.object_store_id }} - - - the default configured Galaxy object store . -

-
+
@@ -38,15 +24,14 @@ diff --git a/client/src/components/Dataset/DatasetStorage/ObjectStoreRestrictionSpan.test.js b/client/src/components/ObjectStore/ObjectStoreRestrictionSpan.test.js similarity index 100% rename from client/src/components/Dataset/DatasetStorage/ObjectStoreRestrictionSpan.test.js rename to client/src/components/ObjectStore/ObjectStoreRestrictionSpan.test.js diff --git a/client/src/components/Dataset/DatasetStorage/ObjectStoreRestrictionSpan.vue b/client/src/components/ObjectStore/ObjectStoreRestrictionSpan.vue similarity index 100% rename from client/src/components/Dataset/DatasetStorage/ObjectStoreRestrictionSpan.vue rename to client/src/components/ObjectStore/ObjectStoreRestrictionSpan.vue diff --git a/client/src/components/User/DiskUsage/Quota/QuotaUsageBar.vue b/client/src/components/User/DiskUsage/Quota/QuotaUsageBar.vue index 08a18c4d2e1d..2218657a8049 100644 --- a/client/src/components/User/DiskUsage/Quota/QuotaUsageBar.vue +++ b/client/src/components/User/DiskUsage/Quota/QuotaUsageBar.vue @@ -1,19 +1,23 @@ @@ -27,6 +31,12 @@ export default { type: Object, required: true, }, + // If this is embedded in DatasetStorage or more intricate components like + // that - shrink everything and avoid h2/h3 (component already has those). + embedded: { + type: Boolean, + default: false, + }, }, data() { return { @@ -55,6 +65,13 @@ export default { } return "danger"; }, + /** @returns {String} */ + sourceTag() { + return this.embedded ? "div" : "h2"; + }, + usageTag() { + return this.embedded ? 
"div" : "h3"; + }, }, }; diff --git a/client/src/components/User/DiskUsage/Quota/QuotaUsageProvider.js b/client/src/components/User/DiskUsage/Quota/QuotaUsageProvider.js index a794b00291aa..7fca9a08660b 100644 --- a/client/src/components/User/DiskUsage/Quota/QuotaUsageProvider.js +++ b/client/src/components/User/DiskUsage/Quota/QuotaUsageProvider.js @@ -8,17 +8,39 @@ import { QuotaUsage } from "./model"; // https://github.com/galaxyproject/galaxy/pull/10977 is available /** - * Fetches the disk usage by the user across all ObjectStores. + * Fetches the disk usage by the user across all ObjectStores quota + * sources. * @returns {Array} */ async function fetchQuotaUsage() { - const url = `${getAppRoot()}api/users/current`; + const url = `${getAppRoot()}api/users/current/usage`; try { const { data } = await axios.get(url); - return [new QuotaUsage(data)]; + const result = data.map((usage) => new QuotaUsage(usage)); + return result; } catch (e) { rethrowSimple(e); } } export const QuotaUsageProvider = SingleQueryProvider(fetchQuotaUsage); + +/** + * Fetches the disk usage corresponding to one quota source label - + * or the default quota sources if the supplied label is null. 
+ * @returns {} + */ +async function fetchQuotaSourceUsage({ quotaSourceLabel = null }) { + if (quotaSourceLabel == null) { + quotaSourceLabel = "__null__"; + } + const url = `${getAppRoot()}api/users/current/usage/${quotaSourceLabel}`; + try { + const { data } = await axios.get(url); + return new QuotaUsage(data); + } catch (e) { + rethrowSimple(e); + } +} + +export const QuotaSourceUsageProvider = SingleQueryProvider(fetchQuotaSourceUsage); diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 3ad25cf2c154..f288574b7db2 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1885,13 +1885,17 @@ def fail(message=job.info, exception=None): # custom post process setup collected_bytes = 0 + quota_source_info = None # Once datasets are collected, set the total dataset size (includes extra files) for dataset_assoc in job.output_datasets: if not dataset_assoc.dataset.dataset.purged: + # assume all datasets in a job get written to the same objectstore + quota_source_info = dataset_assoc.dataset.dataset.quota_source_info collected_bytes += dataset_assoc.dataset.set_total_size() - if job.user: - job.user.adjust_total_disk_usage(collected_bytes) + user = job.user + if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use: + user.adjust_total_disk_usage(collected_bytes, quota_source_info.label) # Certain tools require tasks to be completed after job execution # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). 
diff --git a/lib/galaxy/managers/configuration.py b/lib/galaxy/managers/configuration.py index f1bc26aab754..9b3c8da9efe1 100644 --- a/lib/galaxy/managers/configuration.py +++ b/lib/galaxy/managers/configuration.py @@ -208,6 +208,9 @@ def _config_is_truthy(item, key, **context): "expose_user_email": _use_config, "enable_tool_source_display": _use_config, "enable_celery_tasks": _use_config, + "quota_source_labels": lambda item, key, **context: list( + self.app.object_store.get_quota_source_map().get_quota_source_labels() + ), "user_library_import_dir_available": lambda item, key, **context: bool(item.get("user_library_import_dir")), "welcome_directory": _use_config, } diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py index 8ec470efa8b4..89969d542b8b 100644 --- a/lib/galaxy/managers/hdas.py +++ b/lib/galaxy/managers/hdas.py @@ -205,8 +205,9 @@ def _purge(self, hda, flush=True): quota_amount_reduction = hda.quota_amount(user) super().purge(hda, flush=flush) # decrease the user's space used - if quota_amount_reduction: - user.adjust_total_disk_usage(-quota_amount_reduction) + quota_source_info = hda.dataset.quota_source_info + if quota_amount_reduction and quota_source_info.use: + user.adjust_total_disk_usage(-quota_amount_reduction, quota_source_info.label) # .... 
states def error_if_uploading(self, hda): diff --git a/lib/galaxy/managers/quotas.py b/lib/galaxy/managers/quotas.py index 3ad48f095450..125be4e3f77a 100644 --- a/lib/galaxy/managers/quotas.py +++ b/lib/galaxy/managers/quotas.py @@ -60,7 +60,11 @@ def create_quota(self, payload: dict, decode_id=None) -> Tuple[model.Quota, str] raise ActionInputError("Operation for an unlimited quota must be '='.") # Create the quota quota = model.Quota( - name=params.name, description=params.description, amount=create_amount, operation=params.operation + name=params.name, + description=params.description, + amount=create_amount, + operation=params.operation, + quota_source_label=params.quota_source_label, ) self.sa_session.add(quota) # If this is a default quota, create the DefaultQuotaAssociation diff --git a/lib/galaxy/managers/users.py b/lib/galaxy/managers/users.py index 3f9a6e12eade..81191083584b 100644 --- a/lib/galaxy/managers/users.py +++ b/lib/galaxy/managers/users.py @@ -7,6 +7,12 @@ import socket import time from datetime import datetime +from typing import ( + Any, + Dict, + List, + Optional, +) from markupsafe import escape from sqlalchemy import ( @@ -411,13 +417,13 @@ def sharing_roles(self, user): def default_permissions(self, user): return self.app.security_agent.user_get_default_permissions(user) - def quota(self, user, total=False): + def quota(self, user, total=False, quota_source_label=None): if total: - return self.app.quota_agent.get_quota_nice_size(user) - return self.app.quota_agent.get_percent(user=user) + return self.app.quota_agent.get_quota_nice_size(user, quota_source_label=quota_source_label) + return self.app.quota_agent.get_percent(user=user, quota_source_label=quota_source_label) - def quota_bytes(self, user): - return self.app.quota_agent.get_quota(user=user) + def quota_bytes(self, user, quota_source_label: Optional[str] = None): + return self.app.quota_agent.get_quota(user=user, quota_source_label=quota_source_label) def tags_used(self, user, 
tag_models=None): """ @@ -699,6 +705,25 @@ def add_serializers(self): } ) + def serialize_disk_usage(self, user: model.User) -> List[Dict[str, Any]]: + rval = user.dictify_usage(self.app.object_store) + for usage in rval: + quota_source_label = usage["quota_source_label"] + usage["quota_percent"] = self.user_manager.quota(user, quota_source_label=quota_source_label) + usage["quota"] = self.user_manager.quota(user, total=True, quota_source_label=quota_source_label) + usage["quota_bytes"] = self.user_manager.quota_bytes(user, quota_source_label=quota_source_label) + usage["nice_total_disk_usage"] = util.nice_size(usage["total_disk_usage"]) + return rval + + def serialize_disk_usage_for(self, user: model.User, label: Optional[str]) -> Dict[str, Any]: + usage = user.dictify_usage_for(label) + quota_source_label = usage["quota_source_label"] + usage["quota_percent"] = self.user_manager.quota(user, quota_source_label=quota_source_label) + usage["quota"] = self.user_manager.quota(user, total=True, quota_source_label=quota_source_label) + usage["quota_bytes"] = self.user_manager.quota_bytes(user, quota_source_label=quota_source_label) + usage["nice_total_disk_usage"] = util.nice_size(usage["total_disk_usage"]) + return usage + class UserDeserializer(base.ModelDeserializer): """ diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 3de41642f26c..a0bb4533e2ae 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -30,6 +30,7 @@ List, NamedTuple, Optional, + Set, Tuple, Type, TYPE_CHECKING, @@ -54,6 +55,7 @@ and_, asc, BigInteger, + bindparam, Boolean, Column, DateTime, @@ -514,6 +516,109 @@ def stderr(self, stderr): raise NotImplementedError("Attempt to set stdout, must set tool_stderr or job_stderr") +UNIQUE_DATASET_USER_USAGE = """ +WITH per_user_histories AS +( + SELECT id + FROM history + WHERE user_id = :id + AND NOT purged +), +per_hist_hdas AS ( + SELECT DISTINCT dataset_id + FROM history_dataset_association + WHERE 
NOT purged + AND history_id IN (SELECT id FROM per_user_histories) +) +SELECT COALESCE(SUM(COALESCE(dataset.total_size, dataset.file_size, 0)), 0) +FROM dataset +LEFT OUTER JOIN library_dataset_dataset_association ON dataset.id = library_dataset_dataset_association.dataset_id +WHERE dataset.id IN (SELECT dataset_id FROM per_hist_hdas) + AND library_dataset_dataset_association.id IS NULL + AND ( + {dataset_condition} + ) +""" + + +def calculate_user_disk_usage_statements(user_id, quota_source_map, for_sqlite=False): + """Standalone function so can be reused for postgres directly in pgcleanup.py.""" + statements = [] + default_quota_enabled = quota_source_map.default_quota_enabled + default_exclude_ids = quota_source_map.default_usage_excluded_ids() + default_cond = "dataset.object_store_id IS NULL" if default_quota_enabled else "" + exclude_cond = "dataset.object_store_id NOT IN :exclude_object_store_ids" if default_exclude_ids else "" + use_or = " OR " if (default_cond != "" and exclude_cond != "") else "" + default_usage_dataset_condition = "{default_cond} {use_or} {exclude_cond}".format( + default_cond=default_cond, + exclude_cond=exclude_cond, + use_or=use_or, + ) + default_usage = UNIQUE_DATASET_USER_USAGE.format(dataset_condition=default_usage_dataset_condition) + default_usage = ( + """ +UPDATE galaxy_user SET disk_usage = (%s) +WHERE id = :id +""" + % default_usage + ) + params = {"id": user_id} + if default_exclude_ids: + params["exclude_object_store_ids"] = default_exclude_ids + statements.append((default_usage, params)) + source = quota_source_map.ids_per_quota_source() + # TODO: Merge a lot of these settings together by generating a temp table for + # the object_store_id to quota_source_label into a temp table of values + for (quota_source_label, object_store_ids) in source.items(): + label_usage = UNIQUE_DATASET_USER_USAGE.format( + dataset_condition="dataset.object_store_id IN :include_object_store_ids" + ) + if for_sqlite: + # hacky alternative for 
older sqlite + statement = """ +WITH new (user_id, quota_source_label, disk_usage) AS ( + VALUES(:id, :label, ({label_usage})) +) +INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage) +SELECT old.id, new.user_id, new.quota_source_label, new.disk_usage +FROM new + LEFT JOIN user_quota_source_usage AS old + ON new.user_id = old.user_id + AND new.quota_source_label = old.quota_source_label +""".format( + label_usage=label_usage + ) + else: + statement = """ +INSERT INTO user_quota_source_usage(user_id, quota_source_label, disk_usage) +VALUES(:user_id, :label, ({label_usage})) +ON CONFLICT +ON constraint uqsu_unique_label_per_user +DO UPDATE SET disk_usage = excluded.disk_usage +""".format( + label_usage=label_usage + ) + statements.append( + (statement, {"id": user_id, "label": quota_source_label, "include_object_store_ids": object_store_ids}) + ) + + params = {"id": user_id} + source_labels = list(source.keys()) + if len(source_labels) > 0: + clean_old_statement = """ +DELETE FROM user_quota_source_usage +WHERE user_id = :id AND quota_source_label NOT IN :labels +""" + params["labels"] = source_labels + else: + clean_old_statement = """ +DELETE FROM user_quota_source_usage +WHERE user_id = :id AND quota_source_label IS NOT NULL +""" + statements.append((clean_old_statement, params)) + return statements + + class User(Base, Dictifiable, RepresentById): """ Data for a Galaxy user or admin and relations to their @@ -560,6 +665,7 @@ class User(Base, Dictifiable, RepresentById): "GalaxySession", back_populates="user", order_by=lambda: desc(GalaxySession.update_time) # type: ignore[has-type] ) quotas = relationship("UserQuotaAssociation", back_populates="user") + quota_source_usages = relationship("UserQuotaSourceUsage", back_populates="user") social_auth = relationship("UserAuthnzToken", back_populates="user") stored_workflow_menu_entries = relationship( "StoredWorkflowMenuEntry", @@ -716,14 +822,31 @@ def 
all_roles_exploiting_cache(self): roles.append(role) return roles - def get_disk_usage(self, nice_size=False): + def get_disk_usage(self, nice_size=False, quota_source_label=None): """ Return byte count of disk space used by user or a human-readable string if `nice_size` is `True`. """ - rval = 0 - if self.disk_usage is not None: - rval = self.disk_usage + if quota_source_label is None: + rval = 0 + if self.disk_usage is not None: + rval = self.disk_usage + else: + statement = """ +SELECT DISK_USAGE +FROM user_quota_source_usage +WHERE user_id = :user_id and quota_source_label = :label +""" + sa_session = object_session(self) + params = { + "user_id": self.id, + "label": quota_source_label, + } + row = sa_session.execute(statement, params).fetchone() + if row is not None: + rval = row[0] + else: + rval = 0 if nice_size: rval = galaxy.util.nice_size(rval) return rval @@ -736,9 +859,36 @@ def set_disk_usage(self, bytes): total_disk_usage = property(get_disk_usage, set_disk_usage) - def adjust_total_disk_usage(self, amount): + def adjust_total_disk_usage(self, amount, quota_source_label): + assert amount is not None if amount != 0: - self.disk_usage = func.coalesce(self.table.c.disk_usage, 0) + amount + if quota_source_label is None: + self.disk_usage = func.coalesce(self.table.c.disk_usage, 0) + amount + else: + # else would work on newer sqlite - 3.24.0 + sa_session = object_session(self) + if "sqlite" in sa_session.bind.dialect.name: + # hacky alternative for older sqlite + statement = """ +WITH new (user_id, quota_source_label) AS ( VALUES(:user_id, :label) ) +INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage) +SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage + :amount, :amount) +FROM new LEFT JOIN user_quota_source_usage AS old ON new.user_id = old.user_id AND NEW.quota_source_label = old.quota_source_label; +""" + else: + statement = """ +INSERT INTO user_quota_source_usage(user_id, disk_usage, 
quota_source_label) +VALUES(:user_id, :amount, :label) +ON CONFLICT + ON constraint uqsu_unique_label_per_user + DO UPDATE SET disk_usage = user_quota_source_usage.disk_usage + :amount +""" + params = { + "user_id": self.id, + "amount": int(amount), + "label": quota_source_label, + } + sa_session.execute(statement, params) @property def nice_total_disk_usage(self): @@ -747,53 +897,54 @@ def nice_total_disk_usage(self): """ return self.get_disk_usage(nice_size=True) - def calculate_disk_usage(self): + def calculate_disk_usage_default_source(self, object_store): """ Return byte count total of disk space used by all non-purged, non-library - HDAs in non-purged histories. + HDAs in non-purged histories assigned to default quota source. """ - # maintain a list so that we don't double count - return self._calculate_or_set_disk_usage(dryrun=True) + # only used in set_user_disk_usage.py + assert object_store is not None + quota_source_map = object_store.get_quota_source_map() + default_quota_enabled = quota_source_map.default_quota_enabled + default_cond = "dataset.object_store_id IS NULL OR" if default_quota_enabled else "" + default_usage_dataset_condition = ( + "{default_cond} dataset.object_store_id NOT IN :exclude_object_store_ids".format( + default_cond=default_cond, + ) + ) + default_usage = UNIQUE_DATASET_USER_USAGE.format(dataset_condition=default_usage_dataset_condition) + sql_calc = text(default_usage) + sql_calc = sql_calc.bindparams(bindparam("id"), bindparam("exclude_object_store_ids", expanding=True)) + params = {"id": self.id, "exclude_object_store_ids": quota_source_map.default_usage_excluded_ids()} + sa_session = object_session(self) + usage = sa_session.scalar(sql_calc, params) + return usage - def calculate_and_set_disk_usage(self): + def calculate_and_set_disk_usage(self, object_store): """ Calculates and sets user disk usage. 
""" - self._calculate_or_set_disk_usage(dryrun=False) + self._calculate_or_set_disk_usage(object_store=object_store) - def _calculate_or_set_disk_usage(self, dryrun=True): + def _calculate_or_set_disk_usage(self, object_store): """ Utility to calculate and return the disk usage. If dryrun is False, the new value is set immediately. """ - sql_calc = text( - """ - WITH per_user_histories AS - ( - SELECT id - FROM history - WHERE user_id = :id - AND NOT purged - ), - per_hist_hdas AS ( - SELECT DISTINCT dataset_id - FROM history_dataset_association - WHERE NOT purged - AND history_id IN (SELECT id FROM per_user_histories) - ) - SELECT SUM(COALESCE(dataset.total_size, dataset.file_size, 0)) - FROM dataset - LEFT OUTER JOIN library_dataset_dataset_association ON dataset.id = library_dataset_dataset_association.dataset_id - WHERE dataset.id IN (SELECT dataset_id FROM per_hist_hdas) - AND library_dataset_dataset_association.id IS NULL - """ - ) + assert object_store is not None + quota_source_map = object_store.get_quota_source_map() sa_session = object_session(self) - usage = sa_session.scalar(sql_calc, {"id": self.id}) - if not dryrun: - self.set_disk_usage(usage) + for_sqlite = "sqlite" in sa_session.bind.dialect.name + statements = calculate_user_disk_usage_statements(self.id, quota_source_map, for_sqlite) + for (sql, args) in statements: + statement = text(sql) + binds = [] + for key, _ in args.items(): + expand_binding = key.endswith("s") + binds.append(bindparam(key, expanding=expand_binding)) + statement = statement.bindparams(*binds) + sa_session.execute(statement, args) sa_session.flush() - return usage @staticmethod def user_template_environment(user): @@ -857,6 +1008,66 @@ def attempt_create_private_role(self): session.add(assoc) session.flush() + def dictify_usage(self, object_store=None) -> List[Dict[str, Any]]: + """Include object_store to include empty/unused usage info.""" + used_labels: Set[Union[str, None]] = set() + rval: List[Dict[str, Any]] = [ + { + 
"quota_source_label": None, + "total_disk_usage": float(self.disk_usage or 0), + } + ] + used_labels.add(None) + for quota_source_usage in self.quota_source_usages: + label = quota_source_usage.quota_source_label + rval.append( + { + "quota_source_label": label, + "total_disk_usage": float(quota_source_usage.disk_usage), + } + ) + used_labels.add(label) + + if object_store is not None: + for label in object_store.get_quota_source_map().ids_per_quota_source().keys(): + if label not in used_labels: + rval.append( + { + "quota_source_label": label, + "total_disk_usage": 0.0, + } + ) + + return rval + + def dictify_usage_for(self, quota_source_label: Optional[str]) -> Dict[str, Any]: + rval: Dict[str, Any] + if quota_source_label is None: + rval = { + "quota_source_label": None, + "total_disk_usage": float(self.disk_usage or 0), + } + else: + quota_source_usage = self.quota_source_usage_for(quota_source_label) + if quota_source_usage is None: + rval = { + "quota_source_label": quota_source_label, + "total_disk_usage": 0.0, + } + else: + rval = { + "quota_source_label": quota_source_label, + "total_disk_usage": float(quota_source_usage.disk_usage), + } + + return rval + + def quota_source_usage_for(self, quota_source_label: Optional[str]) -> Optional["UserQuotaSourceUsage"]: + for quota_source_usage in self.quota_source_usages: + if quota_source_usage.quota_source_label == quota_source_label: + return quota_source_usage + return None + class PasswordResetToken(Base, _HasTable): __tablename__ = "password_reset_token" @@ -2676,7 +2887,9 @@ def add_dataset(self, dataset, parent_id=None, genome_build=None, set_hid=True, dataset.hid = self._next_hid() add_object_to_object_session(dataset, self) if quota and is_dataset and self.user: - self.user.adjust_total_disk_usage(dataset.quota_amount(self.user)) + quota_source_info = dataset.dataset.quota_source_info + if quota_source_info.use: + self.user.adjust_total_disk_usage(dataset.quota_amount(self.user), quota_source_info.label) 
dataset.history = self if is_dataset and genome_build not in [None, "?"]: self.genome_build = genome_build @@ -2694,7 +2907,10 @@ def add_datasets( self.__add_datasets_optimized(datasets, genome_build=genome_build) if quota and self.user: disk_usage = sum(d.get_total_size() for d in datasets if is_hda(d)) - self.user.adjust_total_disk_usage(disk_usage) + if disk_usage: + quota_source_info = datasets[0].dataset.quota_source_info + if quota_source_info.use: + self.user.adjust_total_disk_usage(disk_usage, quota_source_info.label) sa_session.add_all(datasets) if flush: sa_session.flush() @@ -3093,6 +3309,20 @@ def __init__(self, name=None, description=None, type=types.SYSTEM, deleted=False self.deleted = deleted +class UserQuotaSourceUsage(Base, Dictifiable, RepresentById): + __tablename__ = "user_quota_source_usage" + __table_args__ = (UniqueConstraint("user_id", "quota_source_label", name="uqsu_unique_label_per_user"),) + + dict_element_visible_keys = ["disk_usage", "quota_source_label"] + + id = Column(Integer, primary_key=True) + user_id = Column(Integer, ForeignKey("galaxy_user.id"), index=True) + quota_source_label = Column(String(32), index=True) + # user had an index on disk_usage - does that make any sense? 
-John + disk_usage = Column(Numeric(15, 0), default=0, nullable=False) + user = relationship("User", back_populates="quota_source_usages") + + class UserQuotaAssociation(Base, Dictifiable, RepresentById): __tablename__ = "user_quota_association" @@ -3133,6 +3363,7 @@ def __init__(self, group, quota): class Quota(Base, Dictifiable, RepresentById): __tablename__ = "quota" + __table_args__ = (Index("ix_quota_quota_source_label", "quota_source_label"),) id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) @@ -3142,11 +3373,12 @@ class Quota(Base, Dictifiable, RepresentById): bytes = Column(BigInteger) operation = Column(String(8)) deleted = Column(Boolean, index=True, default=False) + quota_source_label = Column(String(32), default=None) default = relationship("DefaultQuotaAssociation", back_populates="quota") groups = relationship("GroupQuotaAssociation", back_populates="quota") users = relationship("UserQuotaAssociation", back_populates="quota") - dict_collection_visible_keys = ["id", "name"] + dict_collection_visible_keys = ["id", "name", "quota_source_label"] dict_element_visible_keys = [ "id", "name", @@ -3157,10 +3389,11 @@ class Quota(Base, Dictifiable, RepresentById): "default", "users", "groups", + "quota_source_label", ] valid_operations = ("+", "-", "=") - def __init__(self, name=None, description=None, amount=0, operation="="): + def __init__(self, name=None, description=None, amount=0, operation="=", quota_source_label=None): self.name = name self.description = description if amount is None: @@ -3168,6 +3401,7 @@ def __init__(self, name=None, description=None, amount=0, operation="="): else: self.bytes = amount self.operation = operation + self.quota_source_label = quota_source_label def get_amount(self): if self.bytes == -1: @@ -3196,7 +3430,7 @@ class DefaultQuotaAssociation(Base, Dictifiable, RepresentById): id = Column(Integer, primary_key=True) create_time = Column(DateTime, default=now) update_time = Column(DateTime, 
default=now, onupdate=now) - type = Column(String(32), index=True, unique=True) + type = Column(String(32), index=True) quota_id = Column(Integer, ForeignKey("quota.id"), index=True) quota = relationship("Quota", back_populates="default") @@ -3539,6 +3773,16 @@ def get_file_name(self): # Make filename absolute return os.path.abspath(filename) + @property + def quota_source_label(self): + return self.quota_source_info.label + + @property + def quota_source_info(self): + object_store_id = self.object_store_id + quota_source_map = self.object_store.get_quota_source_map() + return quota_source_map.get_quota_source_info(object_store_id) + def set_file_name(self, filename): if not filename: self.external_filename = None @@ -4654,10 +4898,10 @@ def get_access_roles(self, security_agent): """ return self.dataset.get_access_roles(security_agent) - def purge_usage_from_quota(self, user): + def purge_usage_from_quota(self, user, quota_source_info): """Remove this HDA's quota_amount from user's quota.""" - if user: - user.adjust_total_disk_usage(-self.quota_amount(user)) + if user and quota_source_info.use: + user.adjust_total_disk_usage(-self.quota_amount(user), quota_source_info.label) def quota_amount(self, user): """ diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/d0583094c8cd_add_quota_source_labels.py b/lib/galaxy/model/migrations/alembic/versions_gxy/d0583094c8cd_add_quota_source_labels.py new file mode 100644 index 000000000000..712bec7b7a35 --- /dev/null +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/d0583094c8cd_add_quota_source_labels.py @@ -0,0 +1,50 @@ +"""add quota source labels + +Revision ID: d0583094c8cd +Revises: 186d4835587b +Create Date: 2022-06-09 12:24:44.329038 + +""" +from alembic import op +from sqlalchemy import ( + Column, + ForeignKey, + Integer, + Numeric, + String, +) + +from galaxy.model.migrations.util import ( + add_unique_constraint, + drop_column, + drop_unique_constraint, +) + +# revision identifiers, used by Alembic. 
+revision = "d0583094c8cd" +down_revision = "186d4835587b" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("quota", Column("quota_source_label", String(32), default=None)) + + op.create_table( + "user_quota_source_usage", + Column("id", Integer, primary_key=True), + Column("user_id", Integer, ForeignKey("galaxy_user.id"), index=True), + Column("quota_source_label", String(32), index=True), + # user had an index on disk_usage - does that make any sense? -John + Column("disk_usage", Numeric(15, 0)), + ) + add_unique_constraint("uqsu_unique_label_per_user", "user_quota_source_usage", ["user_id", "quota_source_label"]) + drop_unique_constraint("ix_default_quota_association_type", "default_quota_association") + op.create_index("ix_quota_quota_source_label", "quota", ["quota_source_label"]) + + +def downgrade(): + add_unique_constraint("ix_default_quota_association_type", "default_quota_association", ["type"]) + op.drop_table("user_quota_source_usage") + op.drop_index("ix_quota_quota_source_label", "quota") + drop_column("quota", "quota_source_label") diff --git a/lib/galaxy/model/migrations/util.py b/lib/galaxy/model/migrations/util.py index 5163b2ff1c22..7a358bb41b7a 100644 --- a/lib/galaxy/model/migrations/util.py +++ b/lib/galaxy/model/migrations/util.py @@ -1,4 +1,5 @@ import logging +from typing import List from alembic import op from sqlalchemy import inspect @@ -11,6 +12,24 @@ def drop_column(table_name, column_name): batch_op.drop_column(column_name) +def add_unique_constraint(index_name: str, table_name: str, columns: List[str]): + bind = op.get_context().bind + if bind.engine.name == "sqlite": + with op.batch_alter_table(table_name) as batch_op: + batch_op.create_unique_constraint(index_name, columns) + else: + op.create_unique_constraint(index_name, table_name, columns) + + +def drop_unique_constraint(index_name: str, table_name: str): + bind = op.get_context().bind + if bind.engine.name == "sqlite": + with 
op.batch_alter_table(table_name) as batch_op:
+            batch_op.drop_constraint(index_name)
+    else:
+        op.drop_constraint(index_name, table_name)
+
+
 def column_exists(table_name, column_name):
     bind = op.get_context().bind
     insp = inspect(bind)
diff --git a/lib/galaxy/objectstore/__init__.py b/lib/galaxy/objectstore/__init__.py
index 15148faa818e..448e026f85d2 100644
--- a/lib/galaxy/objectstore/__init__.py
+++ b/lib/galaxy/objectstore/__init__.py
@@ -16,6 +16,8 @@
     Any,
     Dict,
     List,
+    NamedTuple,
+    Optional,
     Type,
 )
 
@@ -43,6 +45,8 @@
     "Attempted to 'create' object store entity in configuration with no database session present."
)
 DEFAULT_PRIVATE = False
+DEFAULT_QUOTA_SOURCE = None  # Just track quota right on user object in Galaxy.
+DEFAULT_QUOTA_ENABLED = True  # enable quota tracking in object stores by default
 
 log = logging.getLogger(__name__)
 
@@ -273,6 +277,10 @@ def get_store_by(self, obj):
         """
         raise NotImplementedError()
 
+    @abc.abstractmethod
+    def get_quota_source_map(self):
+        """Return QuotaSourceMap describing mapping of object store IDs to quota sources."""
+
 
 class BaseObjectStore(ObjectStore):
     store_by: str
@@ -405,12 +413,17 @@ def parse_private_from_config_xml(clazz, config_xml):
         private = asbool(config_xml.attrib.get("private", DEFAULT_PRIVATE))
         return private
 
+    def get_quota_source_map(self):
+        # I'd rather keep this abstract... but register_singleton wants it to be instantiable...
+        raise NotImplementedError()
+
 
 class ConcreteObjectStore(BaseObjectStore):
     """Subclass of ObjectStore for stores that don't delegate (non-nested).
 
-    Currently only adds store_by functionality. Which doesn't make
-    sense for the delegating object stores.
+    Adds store_by and quota_source functionality. These attributes do not make
+    sense for the delegating object stores, they should describe how files are actually
+    persisted, not how a file is routed to a persistence source.
""" def __init__(self, config, config_dict=None, **kwargs): @@ -434,6 +447,11 @@ def __init__(self, config, config_dict=None, **kwargs): self.description = config_dict.get("description", None) # Annotate this as true to prevent sharing of data. self.private = config_dict.get("private", DEFAULT_PRIVATE) + # short label describing the quota source or null to use default + # quota source right on user object. + quota_config = config_dict.get("quota", {}) + self.quota_source = quota_config.get("source", DEFAULT_QUOTA_SOURCE) + self.quota_enabled = quota_config.get("enabled", DEFAULT_QUOTA_ENABLED) def to_dict(self): rval = super().to_dict() @@ -441,6 +459,10 @@ def to_dict(self): rval["store_by"] = self.store_by rval["name"] = self.name rval["description"] = self.description + rval["quota"] = { + "source": self.quota_source, + "enabled": self.quota_enabled, + } return rval def _get_concrete_store_name(self, obj): @@ -455,6 +477,13 @@ def _get_store_by(self, obj): def _is_private(self, obj): return self.private + def get_quota_source_map(self): + quota_source_map = QuotaSourceMap( + self.quota_source, + self.quota_enabled, + ) + return quota_source_map + class DiskObjectStore(ConcreteObjectStore): """ @@ -506,7 +535,12 @@ def parse_xml(clazz, config_xml): if name is not None: config_dict["name"] = name for e in config_xml: - if e.tag == "files_dir": + if e.tag == "quota": + config_dict["quota"] = { + "source": e.get("source", DEFAULT_QUOTA_SOURCE), + "enabled": asbool(e.get("enabled", DEFAULT_QUOTA_ENABLED)), + } + elif e.tag == "files_dir": config_dict["files_dir"] = e.get("path") elif e.tag == "description": config_dict["description"] = e.text @@ -887,6 +921,7 @@ def __init__(self, config, config_dict, fsmon=False): removing backends when they get too full. 
""" super().__init__(config, config_dict) + self._quota_source_map = None self.backends = {} self.weighted_backend_ids = [] @@ -1043,6 +1078,21 @@ def _call_method(self, method, obj, default, default_is_exception, **kwargs): else: return default + def get_quota_source_map(self): + if self._quota_source_map is None: + quota_source_map = QuotaSourceMap() + self._merge_quota_source_map(quota_source_map, self) + self._quota_source_map = quota_source_map + return self._quota_source_map + + @classmethod + def _merge_quota_source_map(clz, quota_source_map, object_store): + for backend_id, backend in object_store.backends.items(): + if isinstance(backend, DistributedObjectStore): + clz._merge_quota_source_map(quota_source_map, backend) + else: + quota_source_map.backends[backend_id] = backend.get_quota_source_map() + def __get_store_id_for(self, obj, **kwargs): if obj.object_store_id is not None: if obj.object_store_id in self.backends: @@ -1075,7 +1125,6 @@ def object_store_ids(self, private=None): class HierarchicalObjectStore(NestedObjectStore): - """ ObjectStore that defers to a list of backends. @@ -1097,10 +1146,20 @@ def __init__(self, config, config_dict, fsmon=False): assert ( is_private == backend_is_private ), "The private attribute must be defined on the HierarchicalObjectStore and not contained concrete objectstores." + backend_quota = backend_def.get("quota") + if backend_quota is not None: + # Make sure just was using defaults - because cannot override what is + # is setup by the HierarchicalObjectStore. 
+ assert backend_quota.get("source", DEFAULT_QUOTA_SOURCE) == DEFAULT_QUOTA_SOURCE + assert backend_quota.get("enabled", DEFAULT_QUOTA_ENABLED) == DEFAULT_QUOTA_ENABLED + backends[order] = build_object_store_from_config(config, config_dict=backend_def, fsmon=fsmon) self.backends = backends self.private = is_private + quota_config = config_dict.get("quota", {}) + self.quota_source = quota_config.get("source", DEFAULT_QUOTA_SOURCE) + self.quota_enabled = quota_config.get("enabled", DEFAULT_QUOTA_ENABLED) @classmethod def parse_xml(clazz, config_xml): @@ -1145,6 +1204,13 @@ def _is_private(self, obj): # the same way. return self.private + def get_quota_source_map(self): + quota_source_map = QuotaSourceMap( + self.quota_source, + self.quota_enabled, + ) + return quota_source_map + def type_to_object_store_class(store, fsmon=False): objectstore_class: Type[ObjectStore] @@ -1295,6 +1361,66 @@ def config_to_dict(config): } +class QuotaSourceInfo(NamedTuple): + label: Optional[str] + use: bool + + +class QuotaSourceMap: + def __init__(self, source=DEFAULT_QUOTA_SOURCE, enabled=DEFAULT_QUOTA_ENABLED): + self.default_quota_source = source + self.default_quota_enabled = enabled + self.info = QuotaSourceInfo(self.default_quota_source, self.default_quota_enabled) + self.backends = {} + self._labels = None + + def get_quota_source_info(self, object_store_id): + if object_store_id in self.backends: + return self.backends[object_store_id].get_quota_source_info(object_store_id) + else: + return self.info + + def get_quota_source_label(self, object_store_id): + if object_store_id in self.backends: + return self.backends[object_store_id].get_quota_source_label(object_store_id) + else: + return self.default_quota_source + + def get_quota_source_labels(self): + if self._labels is None: + labels = set() + if self.default_quota_source: + labels.add(self.default_quota_source) + for backend in self.backends.values(): + labels = labels.union(backend.get_quota_source_labels()) + self._labels 
= labels + return self._labels + + def default_usage_excluded_ids(self): + exclude_object_store_ids = [] + for backend_id, backend_source_map in self.backends.items(): + if backend_source_map.default_quota_source is not None: + exclude_object_store_ids.append(backend_id) + elif not backend_source_map.default_quota_enabled: + exclude_object_store_ids.append(backend_id) + return exclude_object_store_ids + + def get_id_to_source_pairs(self): + pairs = [] + for backend_id, backend_source_map in self.backends.items(): + if backend_source_map.default_quota_source is not None and backend_source_map.default_quota_enabled: + pairs.append((backend_id, backend_source_map.default_quota_source)) + return pairs + + def ids_per_quota_source(self): + quota_sources: Dict[str, List[str]] = {} + for (object_id, quota_source_label) in self.get_id_to_source_pairs(): + if quota_source_label not in quota_sources: + quota_sources[quota_source_label] = [] + quota_sources[quota_source_label].append(object_id) + return quota_sources + + class ObjectStorePopulator: """Small helper for interacting with the object store and making sure all datasets from a job end up with the same object_store_id. 
diff --git a/lib/galaxy/quota/__init__.py b/lib/galaxy/quota/__init__.py index ae074228f0dc..b01cf47d5ae6 100644 --- a/lib/galaxy/quota/__init__.py +++ b/lib/galaxy/quota/__init__.py @@ -23,12 +23,12 @@ class QuotaAgent: # metaclass=abc.ABCMeta """ # TODO: make abstractmethod after they work better with mypy - def get_quota(self, user): + def get_quota(self, user, quota_source_label=None): """Return quota in bytes or None if no quota is set.""" - def get_quota_nice_size(self, user): + def get_quota_nice_size(self, user, quota_source_label=None): """Return quota as a human-readable string or 'unlimited' if no quota is set.""" - quota_bytes = self.get_quota(user) + quota_bytes = self.get_quota(user, quota_source_label=quota_source_label) if quota_bytes is not None: quota_str = galaxy.util.nice_size(quota_bytes) else: @@ -36,10 +36,10 @@ def get_quota_nice_size(self, user): return quota_str # TODO: make abstractmethod after they work better with mypy - def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False): + def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None): """Return the percentage of any storage quota applicable to the user/transaction.""" - def get_usage(self, trans=None, user=False, history=False): + def get_usage(self, trans=None, user=False, history=False, quota_source_label=None): if trans: user = trans.user history = trans.history @@ -48,7 +48,14 @@ def get_usage(self, trans=None, user=False, history=False): assert history, "Could not determine anonymous user's history." 
usage = history.disk_size else: - usage = user.total_disk_usage + if quota_source_label is None: + usage = user.total_disk_usage + else: + quota_source_usage = user.quota_source_usage_for(quota_source_label) + if not quota_source_usage or quota_source_usage.disk_usage is None: + usage = 0.0 + else: + usage = quota_source_usage.disk_usage return usage def is_over_quota(self, app, job, job_destination): @@ -66,14 +73,14 @@ class NoQuotaAgent(QuotaAgent): def __init__(self): pass - def get_quota(self, user): + def get_quota(self, user, quota_source_label=None): return None @property def default_quota(self): return None - def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False): + def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None): return None def is_over_quota(self, app, job, job_destination): @@ -87,7 +94,7 @@ def __init__(self, model): self.model = model self.sa_session = model.context - def get_quota(self, user): + def get_quota(self, user, quota_source_label=None): """ Calculated like so: @@ -100,7 +107,7 @@ def get_quota(self, user): quotas. 
""" if not user: - return self._default_unregistered_quota + return self._default_unregistered_quota(quota_source_label) query = text( """ SELECT ( @@ -111,8 +118,9 @@ def get_quota(self, user): (SELECT default_quota.bytes FROM quota as default_quota LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id - WHERE default_quota_association.type == 'registered' - AND default_quota.deleted != :is_true)) + WHERE default_quota_association.type = 'registered' + AND default_quota.deleted != :is_true + AND default_quota.quota_source_label {label_cond})) + (CASE WHEN SUM(CASE WHEN union_quota.operation = '=' AND union_quota.bytes = -1 THEN 1 ELSE 0 @@ -129,46 +137,60 @@ def get_quota(self, user): ) FROM ( SELECT user_quota.operation as operation, user_quota.bytes as bytes - FROM galaxy_user as user - LEFT JOIN user_quota_association as uqa on user.id = uqa.user_id + FROM galaxy_user as guser + LEFT JOIN user_quota_association as uqa on guser.id = uqa.user_id LEFT JOIN quota as user_quota on user_quota.id = uqa.quota_id WHERE user_quota.deleted != :is_true - AND user.id = :user_id + AND user_quota.quota_source_label {label_cond} + AND guser.id = :user_id UNION ALL SELECT group_quota.operation as operation, group_quota.bytes as bytes - FROM galaxy_user as user - LEFT JOIN user_group_association as uga on user.id = uga.user_id + FROM galaxy_user as guser + LEFT JOIN user_group_association as uga on guser.id = uga.user_id LEFT JOIN galaxy_group on galaxy_group.id = uga.group_id LEFT JOIN group_quota_association as gqa on galaxy_group.id = gqa.group_id LEFT JOIN quota as group_quota on group_quota.id = gqa.quota_id WHERE group_quota.deleted != :is_true - AND user.id = :user_id + AND group_quota.quota_source_label {label_cond} + AND guser.id = :user_id ) as union_quota -""" +""".format( + label_cond="IS NULL" if quota_source_label is None else " = :label" + ) ) conn = self.sa_session.connection() with conn.begin(): - res = conn.execute(query, 
is_true=True, user_id=user.id).fetchone() + res = conn.execute(query, is_true=True, user_id=user.id, label=quota_source_label).fetchone() if res: - return res[0] + return int(res[0]) if res[0] else None else: return None - @property - def _default_unregistered_quota(self): - return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED) + def _default_unregistered_quota(self, quota_source_label): + return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED, quota_source_label) - def _default_quota(self, default_type): - dqa = ( - self.sa_session.query(self.model.DefaultQuotaAssociation) - .filter(self.model.DefaultQuotaAssociation.type == default_type) - .first() + def _default_quota(self, default_type, quota_source_label): + label_condition = "IS NULL" if quota_source_label is None else "= :label" + query = text( + """ +SELECT bytes +FROM quota as default_quota +LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id +WHERE default_quota_association.type = :default_type + AND default_quota.deleted != :is_true + AND default_quota.quota_source_label {label_condition} +""".format( + label_condition=label_condition + ) ) - if not dqa: - return None - if dqa.quota.bytes < 0: - return None - return dqa.quota.bytes + + conn = self.sa_session.connection() + with conn.begin(): + res = conn.execute(query, is_true=True, label=quota_source_label, default_type=default_type).fetchone() + if res: + return res[0] + else: + return None def set_default_quota(self, default_type, quota): # Unset the current default(s) associated with this quota, if there are any @@ -180,20 +202,25 @@ def set_default_quota(self, default_type, quota): for gqa in quota.groups: self.sa_session.delete(gqa) # Find the old default, assign the new quota if it exists - dqa = ( + label = quota.quota_source_label + dqas = ( self.sa_session.query(self.model.DefaultQuotaAssociation) - .filter(self.model.DefaultQuotaAssociation.type == 
default_type) - .first() + .filter(self.model.DefaultQuotaAssociation.table.c.type == default_type) + .all() ) - if dqa: - dqa.quota = quota + target_default = None + for dqa in dqas: + if dqa.quota.quota_source_label == label and not dqa.quota.deleted: + target_default = dqa + if target_default: + target_default.quota = quota # Or create if necessary else: - dqa = self.model.DefaultQuotaAssociation(default_type, quota) - self.sa_session.add(dqa) + target_default = self.model.DefaultQuotaAssociation(default_type, quota) + self.sa_session.add(target_default) self.sa_session.flush() - def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False): + def get_percent(self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None): """ Return the percentage of any storage quota applicable to the user/transaction. """ @@ -203,13 +230,13 @@ def get_percent(self, trans=None, user=False, history=False, usage=False, quota= history = trans.history # if quota wasn't passed, attempt to get the quota if quota is False: - quota = self.get_quota(user) + quota = self.get_quota(user, quota_source_label=quota_source_label) # return none if no applicable quotas or quotas disabled if quota is None: return None # get the usage, if it wasn't passed if usage is False: - usage = self.get_usage(trans, user, history) + usage = self.get_usage(trans, user, history, quota_source_label=quota_source_label) try: return min((int(float(usage) / quota * 100), 100)) except ZeroDivisionError: @@ -239,10 +266,19 @@ def set_entity_quota_associations(self, quotas=None, users=None, groups=None, de self.sa_session.flush() def is_over_quota(self, app, job, job_destination): - quota = self.get_quota(job.user) + # Doesn't work because job.object_store_id until inside handler :_( + # quota_source_label = job.quota_source_label + if job_destination is not None: + object_store_id = job_destination.params.get("object_store_id", None) + object_store = 
app.object_store
+            quota_source_map = object_store.get_quota_source_map()
+            quota_source_label = quota_source_map.get_quota_source_info(object_store_id).label
+        else:
+            quota_source_label = None
+        quota = self.get_quota(job.user, quota_source_label=quota_source_label)
         if quota is not None:
             try:
-                usage = self.get_usage(user=job.user, history=job.history)
+                usage = self.get_usage(user=job.user, history=job.history, quota_source_label=quota_source_label)
                 if usage > quota:
                     return True
             except AssertionError:
diff --git a/lib/galaxy/quota/_schema.py b/lib/galaxy/quota/_schema.py
index 11d3b29e45b4..7691fa11224d 100644
--- a/lib/galaxy/quota/_schema.py
+++ b/lib/galaxy/quota/_schema.py
@@ -108,6 +108,11 @@ class QuotaBase(BaseModel):
         description="The `encoded identifier` of the quota.",
     )
     name: str = QuotaNameField
+    quota_source_label: Optional[str] = Field(
+        None,
+        title="Quota Source Label",
+        description="Quota source label",
+    )
 
 
 class QuotaSummary(QuotaBase):
@@ -184,6 +189,11 @@ class CreateQuotaParams(BaseModel):
             " equivalent to ``no``."
         ),
     )
+    quota_source_label: Optional[str] = Field(
+        default=None,
+        title="Quota Source Label",
+        description="If set, quota source label to apply this quota operation to. Otherwise, the default quota is used.",
+    )
     in_users: Optional[List[str]] = Field(
         default=[],
         title="Users",
diff --git a/lib/galaxy/webapps/base/webapp.py b/lib/galaxy/webapps/base/webapp.py
index 2ac753025e5c..e93e550a15f4 100644
--- a/lib/galaxy/webapps/base/webapp.py
+++ b/lib/galaxy/webapps/base/webapp.py
@@ -808,7 +808,7 @@ def _associate_user_history(self, user, prev_galaxy_session=None):
             # Increase the user's disk usage by the amount of the previous history's datasets if they didn't already
             # own it.
for hda in history.datasets: - user.adjust_total_disk_usage(hda.quota_amount(user)) + user.adjust_total_disk_usage(hda.quota_amount(user), hda.dataset.quota_source_info.label) # Only set default history permissions if the history is from the previous session and anonymous set_permissions = True elif self.galaxy_session.current_history: diff --git a/lib/galaxy/webapps/galaxy/api/users.py b/lib/galaxy/webapps/galaxy/api/users.py index 4aeef672942d..48505cf57952 100644 --- a/lib/galaxy/webapps/galaxy/api/users.py +++ b/lib/galaxy/webapps/galaxy/api/users.py @@ -5,6 +5,7 @@ import json import logging import re +from typing import Optional from fastapi import ( Response, @@ -222,6 +223,37 @@ def _get_user_full(self, trans, user_id, **kwd): except Exception: raise exceptions.RequestParameterInvalidException("Invalid user id specified", id=user_id) + @expose_api + def usage(self, trans, user_id: str, **kwd): + """ + GET /api/users/{user_id}/usage + + Get user's disk usage broken down by quota source. + """ + user = self._get_user_full(trans, user_id, **kwd) + if user: + rval = self.user_serializer.serialize_disk_usage(user) + return rval + else: + return [] + + @expose_api + def usage_for(self, trans, user_id: str, label: str, **kwd): + """ + GET /api/users/{user_id}/usage/{label} + + Get user's disk usage for supplied quota source label. + """ + user = self._get_user_full(trans, user_id, **kwd) + effective_label: Optional[str] = label + if label == "__null__": + effective_label = None + if user: + rval = self.user_serializer.serialize_disk_usage_for(user, effective_label) + return rval + else: + return None + @expose_api def create(self, trans: GalaxyWebTransaction, payload: dict, **kwd): """ @@ -321,7 +353,7 @@ def anon_user_api_value(self, trans): if not trans.user and not trans.history: # Can't return info about this user, may not have a history yet. 
return {} - usage = trans.app.quota_agent.get_usage(trans) + usage = trans.app.quota_agent.get_usage(trans, history=trans.history) percent = trans.app.quota_agent.get_percent(trans=trans, usage=usage) return { "total_disk_usage": int(usage), diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index 83757e53074c..90d7885da28a 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -570,6 +570,12 @@ def populate_api_routes(webapp, app): conditions=dict(method=["POST"]), ) + webapp.mapper.connect( + "/api/users/{user_id}/usage", action="usage", controller="users", conditions=dict(method=["GET"]) + ) + webapp.mapper.connect( + "/api/users/{user_id}/usage/{label}", action="usage_for", controller="users", conditions=dict(method=["GET"]) + ) webapp.mapper.resource_with_deleted("user", "users", path_prefix="/api") webapp.mapper.resource("visualization", "visualizations", path_prefix="/api") webapp.mapper.resource("plugins", "plugins", path_prefix="/api") diff --git a/lib/galaxy/webapps/galaxy/controllers/admin.py b/lib/galaxy/webapps/galaxy/controllers/admin.py index 1edfd57d9d0f..8b7456f8c92e 100644 --- a/lib/galaxy/webapps/galaxy/controllers/admin.py +++ b/lib/galaxy/webapps/galaxy/controllers/admin.py @@ -710,6 +710,9 @@ def create_quota(self, trans, payload=None, **kwd): if trans.request.method == "GET": all_users = [] all_groups = [] + labels = trans.app.object_store.get_quota_source_map().get_quota_source_labels() + label_options = [("Default Quota", None)] + label_options.extend([(label, label) for label in labels]) for user in ( trans.sa_session.query(trans.app.model.User) .filter(trans.app.model.User.table.c.deleted == false()) @@ -725,7 +728,7 @@ def create_quota(self, trans, payload=None, **kwd): default_options = [("No", "no")] for type_ in trans.app.model.DefaultQuotaAssociation.types: default_options.append((f"Yes, {type_}", type_)) - return { + rval = { "title": "Create Quota", 
"inputs": [ {"name": "name", "label": "Name"}, @@ -742,10 +745,23 @@ def create_quota(self, trans, payload=None, **kwd): "options": default_options, "help": "Warning: Any users or groups associated with this quota will be disassociated.", }, - build_select_input("in_groups", "Groups", all_groups, []), - build_select_input("in_users", "Users", all_users, []), ], } + if len(label_options) > 1: + rval["inputs"].append( + { + "name": "quota_source_label", + "label": "Apply quota to labeled object stores.", + "options": label_options, + } + ) + rval["inputs"].extend( + [ + build_select_input("in_groups", "Groups", all_groups, []), + build_select_input("in_users", "Users", all_users, []), + ] + ) + return rval else: try: quota, message = self.quota_manager.create_quota(payload, decode_id=trans.security.decode_id) diff --git a/lib/galaxy/webapps/galaxy/controllers/dataset.py b/lib/galaxy/webapps/galaxy/controllers/dataset.py index c7de005a16ae..efc8e3ee96ac 100644 --- a/lib/galaxy/webapps/galaxy/controllers/dataset.py +++ b/lib/galaxy/webapps/galaxy/controllers/dataset.py @@ -927,7 +927,7 @@ def _purge(self, trans, dataset_id): hda.deleted = True # HDA is purgeable # Decrease disk usage first - hda.purge_usage_from_quota(user) + hda.purge_usage_from_quota(user, hda.dataset.quota_source_info) # Mark purged hda.purged = True trans.sa_session.add(hda) diff --git a/lib/galaxy/webapps/galaxy/controllers/history.py b/lib/galaxy/webapps/galaxy/controllers/history.py index 825854476b61..3a477ed96edb 100644 --- a/lib/galaxy/webapps/galaxy/controllers/history.py +++ b/lib/galaxy/webapps/galaxy/controllers/history.py @@ -833,7 +833,7 @@ def purge_deleted_datasets(self, trans): for hda in trans.history.datasets: if not hda.deleted or hda.purged: continue - hda.purge_usage_from_quota(trans.user) + hda.purge_usage_from_quota(trans.user, hda.dataset.quota_source_info) hda.purged = True trans.sa_session.add(hda) trans.log_event(f"HDA id {hda.id} has been purged") diff --git 
a/lib/galaxy/webapps/galaxy/services/datasets.py b/lib/galaxy/webapps/galaxy/services/datasets.py index 24604477675e..738029fae9a2 100644 --- a/lib/galaxy/webapps/galaxy/services/datasets.py +++ b/lib/galaxy/webapps/galaxy/services/datasets.py @@ -82,6 +82,15 @@ class RequestDataType(str, Enum): in_use_state = "in_use_state" +class ConcreteObjectStoreQuotaSourceDetails(Model): + source: Optional[str] = Field( + description="The quota source label corresponding to the object store the dataset is stored in (or would be stored in)" + ) + enabled: bool = Field( + description="Whether the object store tracks quota on the data (independent of Galaxy's configuration)" + ) + + class DatasetStorageDetails(Model): object_store_id: Optional[str] = Field( description="The identifier of the destination ObjectStore for this dataset.", @@ -103,6 +112,7 @@ class DatasetStorageDetails(Model): sharable: bool = Field( description="Is this dataset sharable.", ) + quota: dict = Field(description="Information about quota sources around dataset storage.") class DatasetInheritanceChainEntry(Model): @@ -352,6 +362,13 @@ def show_storage( except FileNotFoundError: # uninitalized directory (emtpy) disk object store can cause this... 
percent_used = None + + quota_source = dataset.quota_source_info + quota = ConcreteObjectStoreQuotaSourceDetails( + source=quota_source.label, + enabled=quota_source.use, + ) + dataset_state = dataset.state hashes = [h.to_dict() for h in dataset.hashes] sources = [s.to_dict() for s in dataset.sources] @@ -364,6 +381,7 @@ def show_storage( dataset_state=dataset_state, hashes=hashes, sources=sources, + quota=quota, ) def show_inheritance_chain( diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 9ad74b6d5bf0..1a4e94b92277 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -1060,6 +1060,17 @@ def user_private_role_id(self) -> str: assert "id" in role, role return role["id"] + def get_usage(self) -> List[Dict[str, Any]]: + usage_response = self.galaxy_interactor.get("users/current/usage") + usage_response.raise_for_status() + return usage_response.json() + + def get_usage_for(self, label: Optional[str]) -> Dict[str, Any]: + label_as_str = label if label is not None else "__null__" + usage_response = self.galaxy_interactor.get(f"users/current/usage/{label_as_str}") + usage_response.raise_for_status() + return usage_response.json() + def create_role(self, user_ids: list, description: Optional[str] = None) -> dict: payload = { "name": self.get_random_name(prefix="testpop"), @@ -1071,13 +1082,13 @@ def create_role(self, user_ids: list, description: Optional[str] = None) -> dict return role_response.json() def create_quota(self, quota_payload: dict) -> dict: - quota_response = self._post("quotas", data=quota_payload, admin=True) - quota_response.raise_for_status() + quota_response = self._post("quotas", data=quota_payload, admin=True, json=True) + api_asserts.assert_status_code_is_ok(quota_response) return quota_response.json() def get_quotas(self) -> list: quota_response = self._get("quotas", admin=True) - quota_response.raise_for_status() + api_asserts.assert_status_code_is_ok(quota_response) 
return quota_response.json() def make_private(self, history_id: str, dataset_id: str) -> dict: @@ -1088,14 +1099,14 @@ def make_private(self, history_id: str, dataset_id: str) -> dict: "manage": [role_id], } response = self.update_permissions_raw(history_id, dataset_id, payload) - response.raise_for_status() + api_asserts.assert_status_code_is_ok(response) return response.json() - def make_public_raw(self, history_id: str, dataset_id: str) -> Response: + def make_dataset_public_raw(self, history_id: str, dataset_id: str) -> Response: role_id = self.user_private_role_id() payload = { - "access": json.dumps([]), - "manage": json.dumps([role_id]), + "access": [], + "manage": [role_id], } response = self.update_permissions_raw(history_id, dataset_id, payload) return response @@ -1103,11 +1114,11 @@ def make_public_raw(self, history_id: str, dataset_id: str) -> Response: def update_permissions_raw(self, history_id: str, dataset_id: str, payload: dict) -> Response: url = f"histories/{history_id}/contents/{dataset_id}/permissions" update_response = self._put(url, payload, admin=True, json=True) - update_response.raise_for_status() return update_response def make_public(self, history_id: str) -> dict: sharing_response = self._put(f"histories/{history_id}/publish") + api_asserts.assert_status_code_is_ok(sharing_response) assert sharing_response.status_code == 200 return sharing_response.json() diff --git a/scripts/cleanup_datasets/pgcleanup.py b/scripts/cleanup_datasets/pgcleanup.py index 5b28ecfea5ac..8e7e3eaef1bc 100755 --- a/scripts/cleanup_datasets/pgcleanup.py +++ b/scripts/cleanup_datasets/pgcleanup.py @@ -10,6 +10,7 @@ import inspect import logging import os +import re import string import sys import time @@ -26,6 +27,7 @@ import galaxy.config from galaxy.exceptions import ObjectNotFound +from galaxy.model import calculate_user_disk_usage_statements from galaxy.objectstore import build_object_store_from_config from galaxy.util.script import ( 
app_properties_from_args, @@ -76,6 +78,7 @@ class Action: directly.) """ + requires_objectstore = True update_time_sql = ", update_time = NOW() AT TIME ZONE 'utc'" force_retry_sql = " AND NOT purged" primary_key = None @@ -116,6 +119,9 @@ def __init__(self, app): self.__row_methods = [] self.__post_methods = [] self.__exit_methods = [] + if self.requires_objectstore: + self.object_store = build_object_store_from_config(self._config) + self._register_exit_method(self.object_store.shutdown) self._init() def __enter__(self): @@ -248,13 +254,14 @@ def _init(self): class RemovesObjects: """Base class for mixins that remove objects from object stores.""" + requires_objectstore = True + def _init(self): + super()._init() self.objects_to_remove = set() log.info("Initializing object store for action %s", self.name) - self.object_store = build_object_store_from_config(self._config) self._register_row_method(self.collect_removed_object_info) self._register_post_method(self.remove_objects) - self._register_exit_method(self.object_store.shutdown) def collect_removed_object_info(self, row): object_id = getattr(row, self.id_column, None) @@ -361,7 +368,10 @@ class RequiresDiskUsageRecalculation: To use, ensure your query returns a ``recalculate_disk_usage_user_id`` column. """ + requires_objectstore = True + def _init(self): + super()._init() self.__recalculate_disk_usage_user_ids = set() self._register_row_method(self.collect_recalculate_disk_usage_user_id) self._register_post_method(self.recalculate_disk_usage) @@ -381,30 +391,19 @@ def recalculate_disk_usage(self): """ log.info("Recalculating disk usage for users whose data were purged") for user_id in sorted(self.__recalculate_disk_usage_user_ids): - # TODO: h.purged = false should be unnecessary once all hdas in purged histories are purged. 
- sql = """ - UPDATE galaxy_user - SET disk_usage = ( - SELECT COALESCE(SUM(total_size), 0) - FROM ( SELECT d.total_size - FROM history_dataset_association hda - JOIN history h ON h.id = hda.history_id - JOIN dataset d ON hda.dataset_id = d.id - WHERE h.user_id = %(user_id)s - AND h.purged = false - AND hda.purged = false - AND d.purged = false - AND d.id NOT IN (SELECT dataset_id - FROM library_dataset_dataset_association) - GROUP BY d.id) AS sizes) - WHERE id = %(user_id)s - RETURNING disk_usage; - """ - args = {"user_id": user_id} - cur = self._update(sql, args, add_event=False) - for row in cur: - # disk_usage might be None (e.g. user has purged all data) - self.log.info("recalculate_disk_usage user_id %i to %s bytes" % (user_id, row.disk_usage)) + quota_source_map = self.object_store.get_quota_source_map() + statements = calculate_user_disk_usage_statements(user_id, quota_source_map) + + for (sql, args) in statements: + sql, _ = re.subn(r"\:([\w]+)", r"%(\1)s", sql) + new_args = {} + for key, val in args.items(): + if isinstance(val, list): + val = tuple(val) + new_args[key] = val + self._update(sql, new_args, add_event=False) + + self.log.info("recalculate_disk_usage user_id %i" % user_id) class RemovesMetadataFiles(RemovesObjects): diff --git a/scripts/set_user_disk_usage.py b/scripts/set_user_disk_usage.py index 1aad6d48a161..311e146cf6f0 100755 --- a/scripts/set_user_disk_usage.py +++ b/scripts/set_user_disk_usage.py @@ -45,18 +45,18 @@ def init(): return init_models_from_config(config, object_store=object_store), object_store, engine -def quotacheck(sa_session, users, engine): +def quotacheck(sa_session, users, engine, object_store): sa_session.refresh(user) current = user.get_disk_usage() print(user.username, "<" + user.email + ">:", end=" ") if not args.dryrun: # Apply new disk usage - user.calculate_and_set_disk_usage() + user.calculate_and_set_disk_usage(object_store) # And fetch new = user.get_disk_usage() else: - new = user.calculate_disk_usage() + 
new = user.calculate_disk_usage_default_source(object_store) print("old usage:", nice_size(current), "change:", end=" ") if new in (current, None): @@ -78,7 +78,7 @@ def quotacheck(sa_session, users, engine): print("Processing %i users..." % user_count) for i, user in enumerate(sa_session.query(model.User).enable_eagerloads(False).yield_per(1000)): print("%3i%%" % int(float(i) / user_count * 100), end=" ") - quotacheck(sa_session, user, engine) + quotacheck(sa_session, user, engine, object_store) print("100% complete") object_store.shutdown() sys.exit(0) @@ -89,5 +89,5 @@ def quotacheck(sa_session, users, engine): if not user: print("User not found") sys.exit(1) + quotacheck(sa_session, user, engine, object_store) object_store.shutdown() - quotacheck(sa_session, user, engine) diff --git a/test/integration/objectstore/test_private_handling.py b/test/integration/objectstore/test_private_handling.py index b7914fd6cf7f..acc8d60754d4 100644 --- a/test/integration/objectstore/test_private_handling.py +++ b/test/integration/objectstore/test_private_handling.py @@ -30,8 +30,9 @@ def test_both_types(self): hda = self.dataset_populator.new_dataset(history_id, content=TEST_INPUT_FILES_CONTENT, wait=True) content = self.dataset_populator.get_history_dataset_content(history_id, hda["id"]) assert content.startswith(TEST_INPUT_FILES_CONTENT) - response = self.dataset_populator.make_public_raw(history_id, hda["id"]) - assert response.status_code != 200 + response = self.dataset_populator.make_dataset_public_raw(history_id, hda["id"]) + api_asserts.assert_status_code_is(response, 400) + api_asserts.assert_error_code_is(response, 400008) api_asserts.assert_error_message_contains(response, "Attempting to share a non-sharable dataset.") diff --git a/test/integration/objectstore/test_quota_limit.py b/test/integration/objectstore/test_quota_limit.py new file mode 100644 index 000000000000..781fbbf85d34 --- /dev/null +++ b/test/integration/objectstore/test_quota_limit.py @@ -0,0 +1,72 @@ 
+from ._base import BaseObjectStoreIntegrationTestCase +from .test_selection_with_resource_parameters import ( + DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE, + JOB_CONFIG_FILE, + JOB_RESOURCE_PARAMETERS_CONFIG_FILE, +) + + +class QuotaIntegrationTestCase(BaseObjectStoreIntegrationTestCase): + @classmethod + def handle_galaxy_config_kwds(cls, config): + cls._configure_object_store(DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE, config) + config["job_config_file"] = JOB_CONFIG_FILE + config["job_resource_params_file"] = JOB_RESOURCE_PARAMETERS_CONFIG_FILE + config["enable_quotas"] = True + + def test_selection_limit(self): + with self.dataset_populator.test_history() as history_id: + + hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3\n4 5 6\n7 8 9\n") + self.dataset_populator.wait_for_history(history_id) + hda1_input = {"src": "hda", "id": hda1["id"]} + + quotas = self.dataset_populator.get_quotas() + assert len(quotas) == 0 + + payload = { + "name": "defaultquota1", + "description": "first default quota", + "amount": "1 bytes", + "operation": "=", + "default": "registered", + } + self.dataset_populator.create_quota(payload) + + payload = { + "name": "ebsquota1", + "description": "first ebs quota", + "amount": "100 MB", + "operation": "=", + "default": "registered", + "quota_source_label": "ebs", + } + self.dataset_populator.create_quota(payload) + + quotas = self.dataset_populator.get_quotas() + assert len(quotas) == 2 + + hda2 = self.dataset_populator.new_dataset(history_id, content="1 2 3\n4 5 6\n7 8 9\n") + self.dataset_populator.wait_for_history(history_id) + + hda2_now = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda2, wait=False) + assert hda2_now["state"] == "paused" + + create_10_inputs = { + "input1": hda1_input, + "input2": hda1_input, + "__job_resource|__job_resource__select": "yes", + "__job_resource|how_store": "slow", + } + create10_response = self.dataset_populator.run_tool( + "create_10", + create_10_inputs, + 
history_id, + assert_ok=False, + ) + job_id = create10_response["jobs"][0]["id"] + self.dataset_populator.wait_for_job(job_id) + job_details = self.dataset_populator.get_job_details(job_id).json() + # This job isn't paused, it goes through because we used a different + # objectstore using job parameters. + assert job_details["state"] == "ok" diff --git a/test/integration/objectstore/test_selection_with_resources_parameters.py b/test/integration/objectstore/test_selection_with_resource_parameters.py similarity index 83% rename from test/integration/objectstore/test_selection_with_resources_parameters.py rename to test/integration/objectstore/test_selection_with_resource_parameters.py index 60d9d8f45468..ef5aac71e752 100644 --- a/test/integration/objectstore/test_selection_with_resources_parameters.py +++ b/test/integration/objectstore/test_selection_with_resource_parameters.py @@ -33,11 +33,13 @@ + + @@ -63,7 +65,8 @@ def handle_galaxy_config_kwds(cls, config): config["job_config_file"] = JOB_CONFIG_FILE config["job_resource_params_file"] = JOB_RESOURCE_PARAMETERS_CONFIG_FILE config["object_store_store_by"] = "uuid" - config["metadata_strategy"] = "celery_extended" + # Broken in dev https://github.com/galaxyproject/galaxy/pull/14055 + # config["metadata_strategy"] = "celery_extended" config["outputs_to_working_directory"] = True def _object_store_counts(self): @@ -87,7 +90,7 @@ def _assert_no_external_filename(self): for external_filename_tuple in self._app.model.session.query(Dataset.external_filename).all(): assert external_filename_tuple[0] is None - def test_tool_simple_constructs(self): + def test_objectstore_selection(self): with self.dataset_populator.test_history() as history_id: def _run_tool(tool_id, inputs): @@ -109,11 +112,23 @@ def _run_tool(tool_id, inputs): # One file uploaded, added to default object store ID. 
self._assert_file_counts(1, 0, 0, 0) + usage_list = self.dataset_populator.get_usage() + # assert len(usage_list) == 1 + assert usage_list[0]["quota_source_label"] is None + assert usage_list[0]["total_disk_usage"] == 6 + + usage = self.dataset_populator.get_usage_for(None) + assert usage["quota_source_label"] is None + assert usage["total_disk_usage"] == 6 # should create two files in static object store. _run_tool("multi_data_param", {"f1": hda1_input, "f2": hda1_input}) self._assert_file_counts(1, 2, 0, 0) + usage = self.dataset_populator.get_usage_for(None) + assert usage["quota_source_label"] is None + assert usage["total_disk_usage"] == 18 + # should create two files in ebs object store. create_10_inputs_1 = { "input1": hda1_input, @@ -122,6 +137,18 @@ def _run_tool(tool_id, inputs): _run_tool("create_10", create_10_inputs_1) self._assert_file_counts(1, 2, 10, 0) + usage = self.dataset_populator.get_usage_for("ebs") + assert usage["quota_source_label"] == "ebs" + assert usage["total_disk_usage"] == 21 + + usage_list = self.dataset_populator.get_usage() + # assert len(usage_list) == 2 + assert usage_list[0]["quota_source_label"] is None + assert usage_list[0]["total_disk_usage"] == 18 + ebs_usage = [u for u in usage_list if u["quota_source_label"] == "ebs"][0] + assert ebs_usage["quota_source_label"] == "ebs" + assert ebs_usage["total_disk_usage"] == 21, str(usage_list) + # should create 10 files in S3 object store. 
create_10_inputs_2 = { "__job_resource|__job_resource__select": "yes", diff --git a/test/integration/test_quota.py b/test/integration/test_quota.py index 87be3b2fd26c..8fe07a4aef22 100644 --- a/test/integration/test_quota.py +++ b/test/integration/test_quota.py @@ -148,6 +148,26 @@ def test_400_when_show_unknown_quota(self): show_response = self._get(f"quotas/{quota_id}") self._assert_status_code_is(show_response, 400) + def test_quota_source_label_basics(self): + quotas = self.dataset_populator.get_quotas() + prior_quotas_len = len(quotas) + + payload = { + "name": "defaultmylabeledquota1", + "description": "first default quota that is labeled", + "amount": "120MB", + "operation": "=", + "default": "registered", + "quota_source_label": "mylabel", + } + self.dataset_populator.create_quota(payload) + + quotas = self.dataset_populator.get_quotas() + assert len(quotas) == prior_quotas_len + 1 + + labels = [q["quota_source_label"] for q in quotas] + assert "mylabel" in labels + def _create_quota_with_name(self, quota_name: str, is_default: bool = False): payload = self._build_quota_payload_with_name(quota_name, is_default) create_response = self._post("quotas", data=payload, json=True) diff --git a/test/unit/data/test_galaxy_mapping.py b/test/unit/data/test_galaxy_mapping.py index 1eba8fcabc86..419dd1880f9d 100644 --- a/test/unit/data/test_galaxy_mapping.py +++ b/test/unit/data/test_galaxy_mapping.py @@ -23,6 +23,7 @@ get_object_session, ) from galaxy.model.security import GalaxyRBACAgent +from galaxy.objectstore import QuotaSourceMap datatypes_registry = galaxy.datatypes.registry.Registry() datatypes_registry.load_datatypes() @@ -496,7 +497,7 @@ def test_populated_optimized_list_list_not_populated(self): def test_default_disk_usage(self): u = model.User(email="disk_default@test.com", password="password") self.persist(u) - u.adjust_total_disk_usage(1) + u.adjust_total_disk_usage(1, None) u_id = u.id self.expunge() user_reload = 
self.model.session.query(model.User).get(u_id) @@ -1137,8 +1138,11 @@ def _workflow_from_steps(user, steps): class MockObjectStore: - def __init__(self): - pass + def __init__(self, quota_source_map=None): + self._quota_source_map = quota_source_map or QuotaSourceMap() + + def get_quota_source_map(self): + return self._quota_source_map def size(self, dataset): return 42 diff --git a/test/unit/data/test_quota.py b/test/unit/data/test_quota.py index c36412ee7bb8..0639ffdcc540 100644 --- a/test/unit/data/test_quota.py +++ b/test/unit/data/test_quota.py @@ -1,26 +1,103 @@ +import uuid + from galaxy import model +from galaxy.objectstore import ( + QuotaSourceInfo, + QuotaSourceMap, +) from galaxy.quota import DatabaseQuotaAgent -from .test_galaxy_mapping import BaseModelTestCase +from .test_galaxy_mapping import ( + BaseModelTestCase, + MockObjectStore, +) -class CalculateUsageTestCase(BaseModelTestCase): - def test_calculate_usage(self): - u = model.User(email="calc_usage@example.com", password="password") +class PurgeUsageTestCase(BaseModelTestCase): + def setUp(self): + super().setUp() + model = self.model + u = model.User(email="purge_usage@example.com", password="password") + u.disk_usage = 25 self.persist(u) - h = model.History(name="History for Usage", user=u) + h = model.History(name="History for Purging", user=u) self.persist(h) + self.u = u + self.h = h - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h, create_dataset=True, sa_session=self.model.session + def _setup_dataset(self): + d1 = self.model.HistoryDatasetAssociation( + extension="txt", history=self.h, create_dataset=True, sa_session=self.model.session ) d1.dataset.total_size = 10 self.persist(d1) + return d1 + + def test_calculate_usage(self): + d1 = self._setup_dataset() + quota_source_info = QuotaSourceInfo(None, True) + d1.purge_usage_from_quota(self.u, quota_source_info) + self.persist(self.u) + assert int(self.u.disk_usage) == 15 + + def test_calculate_usage_untracked(self): + # 
test quota tracking off on the objectstore + d1 = self._setup_dataset() + quota_source_info = QuotaSourceInfo(None, False) + d1.purge_usage_from_quota(self.u, quota_source_info) + self.persist(self.u) + assert int(self.u.disk_usage) == 25 + + def test_calculate_usage_per_source(self): + self.u.adjust_total_disk_usage(124, "myquotalabel") + + # test quota tracking with a non-default quota label + d1 = self._setup_dataset() + quota_source_info = QuotaSourceInfo("myquotalabel", True) + d1.purge_usage_from_quota(self.u, quota_source_info) + self.persist(self.u) + assert int(self.u.disk_usage) == 25 - assert u.calculate_disk_usage() == 10 + usages = self.u.dictify_usage() + assert len(usages) == 2 + assert usages[1]["quota_source_label"] == "myquotalabel" + assert usages[1]["total_disk_usage"] == 114 + + +class CalculateUsageTestCase(BaseModelTestCase): + def setUp(self): + model = self.model + u = model.User(email="calc_usage%s@example.com" % str(uuid.uuid1()), password="password") + self.persist(u) + h = model.History(name="History for Calculated Usage", user=u) + self.persist(h) + self.u = u + self.h = h + + def _add_dataset(self, total_size, object_store_id=None): + model = self.model + d1 = model.HistoryDatasetAssociation( + extension="txt", history=self.h, create_dataset=True, sa_session=self.model.session + ) + d1.dataset.total_size = total_size + d1.dataset.object_store_id = object_store_id + self.persist(d1) + return d1 + + def test_calculate_usage(self): + model = self.model + u = self.u + h = self.h + + d1 = self._add_dataset(10) + + object_store = MockObjectStore() + assert u.calculate_disk_usage_default_source(object_store) == 10 assert u.disk_usage is None - u.calculate_and_set_disk_usage() - assert u.disk_usage == 10 + u.calculate_and_set_disk_usage(object_store) + assert u.calculate_disk_usage_default_source(object_store) == 10 + # method no longer updates user object + # assert u.disk_usage == 10 # Test dataset being in another history doesn't duplicate 
usage cost. h2 = model.History(name="Second usage history", user=u) @@ -32,7 +109,138 @@ def test_calculate_usage(self): d3 = model.HistoryDatasetAssociation(extension="txt", history=h, dataset=d1.dataset) self.persist(d3) - assert u.calculate_disk_usage() == 10 + assert u.calculate_disk_usage_default_source(object_store) == 10 + + def test_calculate_usage_disabled_quota(self): + u = self.u + + self._add_dataset(10, "not_tracked") + self._add_dataset(15, "tracked") + + quota_source_map = QuotaSourceMap() + not_tracked = QuotaSourceMap() + not_tracked.default_quota_enabled = False + quota_source_map.backends["not_tracked"] = not_tracked + + object_store = MockObjectStore(quota_source_map) + + assert u.calculate_disk_usage_default_source(object_store) == 15 + + def test_calculate_usage_alt_quota(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap() + alt_source = QuotaSourceMap() + alt_source.default_quota_source = "alt_source" + quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage(object_store) + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["total_disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["total_disk_usage"] == 15 + + usage = u.dictify_usage_for(None) + assert usage["quota_source_label"] is None + assert usage["total_disk_usage"] == 10 + + usage = u.dictify_usage_for("alt_source") + assert usage["quota_source_label"] == "alt_source" + assert usage["total_disk_usage"] == 15 + + usage = u.dictify_usage_for("unused_source") + assert usage["quota_source_label"] == "unused_source" + assert usage["total_disk_usage"] == 0 + + def test_calculate_usage_removes_unused_quota_labels(self): + model = self.model + u = self.u 
+ + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap() + alt_source = QuotaSourceMap() + alt_source.default_quota_source = "alt_source" + quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["total_disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["total_disk_usage"] == 15 + + alt_source.default_quota_source = "new_alt_source" + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["total_disk_usage"] == 10 + + assert usages[1]["quota_source_label"] == "new_alt_source" + assert usages[1]["total_disk_usage"] == 15 + + def test_dictify_usage_unused_quota_labels(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap() + alt_source = QuotaSourceMap() + alt_source.default_quota_source = "alt_source" + quota_source_map.backends["alt_source_store"] = alt_source + + unused_source = QuotaSourceMap() + unused_source.default_quota_source = "unused_source" + quota_source_map.backends["unused_source_store"] = unused_source + + object_store = MockObjectStore(quota_source_map) + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage(object_store) + assert len(usages) == 3 + + def test_calculate_usage_default_storage_disabled(self): + model = self.model + u = self.u + + self._add_dataset(10) + self._add_dataset(15, "alt_source_store") + + quota_source_map = QuotaSourceMap(None, False) + alt_source = QuotaSourceMap("alt_source", True) + 
quota_source_map.backends["alt_source_store"] = alt_source + + object_store = MockObjectStore(quota_source_map) + + u.calculate_and_set_disk_usage(object_store) + model.context.refresh(u) + usages = u.dictify_usage(object_store) + assert len(usages) == 2 + assert usages[0]["quota_source_label"] is None + assert usages[0]["total_disk_usage"] == 0 + + assert usages[1]["quota_source_label"] == "alt_source" + assert usages[1]["total_disk_usage"] == 15 class QuotaTestCase(BaseModelTestCase): @@ -86,6 +294,27 @@ def test_quota(self): self._add_group_quota(u, quota) self._assert_user_quota_is(u, None) + def test_labeled_quota(self): + model = self.model + u = model.User(email="labeled_quota@example.com", password="password") + self.persist(u) + + label1 = "coollabel1" + self._assert_user_quota_is(u, None, label1) + + quota = model.Quota(name="default registered labeled", amount=21, quota_source_label=label1) + self.quota_agent.set_default_quota( + model.DefaultQuotaAssociation.types.REGISTERED, + quota, + ) + + self._assert_user_quota_is(u, 21, label1) + + quota = model.Quota(name="user quota add labeled", amount=31, operation="+", quota_source_label=label1) + self._add_user_quota(u, quota) + + self._assert_user_quota_is(u, 52, label1) + def _add_group_quota(self, user, quota): group = model.Group() uga = model.UserGroupAssociation(user, group) @@ -97,18 +326,56 @@ def _add_user_quota(self, user, quota): user.quotas.append(uqa) self.persist(quota, uqa, user) - def _assert_user_quota_is(self, user, amount): - actual_quota = self.quota_agent.get_quota(user) + def _assert_user_quota_is(self, user, amount, quota_source_label=None): + actual_quota = self.quota_agent.get_quota(user, quota_source_label=quota_source_label) assert amount == actual_quota, "Expected quota [%s], got [%s]" % (amount, actual_quota) - if amount is None: - user.total_disk_usage = 1000 - job = model.Job() - job.user = user - assert not self.quota_agent.is_over_quota(None, job, None) - else: - job = 
model.Job() - job.user = user - user.total_disk_usage = amount - 1 - assert not self.quota_agent.is_over_quota(None, job, None) - user.total_disk_usage = amount + 1 - assert self.quota_agent.is_over_quota(None, job, None) + if quota_source_label is None: + if amount is None: + user.total_disk_usage = 1000 + job = self.model.Job() + job.user = user + assert not self.quota_agent.is_over_quota(None, job, None) + else: + job = self.model.Job() + job.user = user + user.total_disk_usage = amount - 1 + assert not self.quota_agent.is_over_quota(None, job, None) + user.total_disk_usage = amount + 1 + assert self.quota_agent.is_over_quota(None, job, None) + + +class UsageTestCase(BaseModelTestCase): + def test_usage(self): + model = self.model + u = model.User(email="usage@example.com", password="password") + self.persist(u) + + u.adjust_total_disk_usage(123, None) + self.persist(u) + + assert u.get_disk_usage() == 123 + + def test_labeled_usage(self): + model = self.model + u = model.User(email="labeled.usage@example.com", password="password") + self.persist(u) + assert len(u.quota_source_usages) == 0 + + u.adjust_total_disk_usage(123, "foobar") + usages = u.dictify_usage() + assert len(usages) == 1 + + assert u.get_disk_usage() == 0 + assert u.get_disk_usage(quota_source_label="foobar") == 123 + self.model.context.refresh(u) + + usages = u.dictify_usage() + assert len(usages) == 2 + + u.adjust_total_disk_usage(124, "foobar") + self.model.context.refresh(u) + + usages = u.dictify_usage() + assert len(usages) == 2 + assert usages[1]["quota_source_label"] == "foobar" + assert usages[1]["total_disk_usage"] == 247 diff --git a/test/unit/objectstore/test_objectstore.py b/test/unit/objectstore/test_objectstore.py index bf9de843c0c2..0bdfd4d5bd56 100644 --- a/test/unit/objectstore/test_objectstore.py +++ b/test/unit/objectstore/test_objectstore.py @@ -388,11 +388,13 @@ def test_mixed_private(): + + @@ -406,6 +408,8 @@ def test_mixed_private(): type: distributed backends: - id: 
files1 + quota: + source: 1files type: disk weight: 2 files_dir: "${temp_directory}/files1" @@ -415,6 +419,8 @@ def test_mixed_private(): - type: job_work path: "${temp_directory}/job_working_directory1" - id: files2 + quota: + source: 2files type: disk weight: 1 files_dir: "${temp_directory}/files2" @@ -447,10 +453,45 @@ def test_distributed_store(): _assert_has_keys(as_dict, ["backends", "extra_dirs", "type"]) _assert_key_has_value(as_dict, "type", "distributed") + backends = as_dict["backends"] + assert len(backends) + assert backends[0]["quota"]["source"] == "1files" + assert backends[1]["quota"]["source"] == "2files" + extra_dirs = as_dict["extra_dirs"] assert len(extra_dirs) == 2 +HIERARCHICAL_MUST_HAVE_UNIFIED_QUOTA_SOURCE = """ + + + + + + + + + + + + + + + +""" + + +def test_hiercachical_backend_must_share_quota_source(): + the_exception = None + for config_str in [HIERARCHICAL_MUST_HAVE_UNIFIED_QUOTA_SOURCE]: + try: + with TestConfig(config_str) as (directory, object_store): + pass + except Exception as e: + the_exception = e + assert the_exception is not None + + # Unit testing the cloud and advanced infrastructure object stores is difficult, but # we can at least stub out initializing and test the configuration of these things from # XML and dicts.