From 3398c83773af6a84b745114b7fb7609b880c32d5 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 28 Nov 2023 17:09:24 +0100 Subject: [PATCH] Cache requests across trans session --- .../tools/parameters/cancelable_request.py | 73 +++++++++++++++++++ .../tools/parameters/dynamic_options.py | 13 ++-- lib/galaxy/work/context.py | 9 +++ 3 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 lib/galaxy/tools/parameters/cancelable_request.py diff --git a/lib/galaxy/tools/parameters/cancelable_request.py b/lib/galaxy/tools/parameters/cancelable_request.py new file mode 100644 index 000000000000..794854a3ae62 --- /dev/null +++ b/lib/galaxy/tools/parameters/cancelable_request.py @@ -0,0 +1,73 @@ +import asyncio +import logging +from typing import ( + Any, + Dict, + Optional, +) + +import aiohttp +from typing_extensions import Literal + +log = logging.getLogger() + +REQUEST_METHOD = Literal["GET", "POST", "HEAD"] + + +async def fetch_url( + session: aiohttp.ClientSession, + url: str, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + method: REQUEST_METHOD = "GET", +): + async with session.request(method=method, url=url, params=params, data=data, headers=headers) as response: + return await response.json() + + +async def async_request_with_timeout( + url: str, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + method: REQUEST_METHOD = "GET", + timeout: float = 1.0, +): + async with aiohttp.ClientSession() as session: + try: + # Wait for the async request, with a user-defined timeout + result = await asyncio.wait_for( + fetch_url(session=session, url=url, params=params, data=data, headers=headers, method=method), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + log.debug("Request timed out after %s second", timeout) + return None + + +def request( + url: str, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + method: REQUEST_METHOD = "GET", + timeout: float = 1.0, +): + loop = asyncio.new_event_loop() + + # Run the event loop until the future is done or cancelled + try: + result = loop.run_until_complete( + async_request_with_timeout( + url=url, params=params, data=data, headers=headers, method=method, timeout=timeout + ) + ) + except asyncio.CancelledError: + log.debug("Request cancelled") + result = None + + loop.close() + + return result diff --git a/lib/galaxy/tools/parameters/dynamic_options.py b/lib/galaxy/tools/parameters/dynamic_options.py index 1b54546f984a..1959f5e6f429 100644 --- a/lib/galaxy/tools/parameters/dynamic_options.py +++ b/lib/galaxy/tools/parameters/dynamic_options.py @@ -8,8 +8,6 @@ import re from io import StringIO -import requests - from galaxy.model import ( DatasetCollectionElement, HistoryDatasetAssociation, @@ -21,6 +19,7 @@ from galaxy.util import string_as_bool from galaxy.util.template import fill_template from . import validation +from .cancelable_request import request log = logging.getLogger(__name__) @@ -785,9 +784,13 @@ def to_triple(values): context = User.user_template_environment(trans.user) url = fill_template(self.from_url, context) try: - response = requests.get(url) - response.raise_for_status() - data = response.json() + unset_value = object() + cached_value = trans.get_cache_value(url, unset_value) + if cached_value is unset_value: + data = request(url, timeout=10) + trans.set_cache_value(url, data) + else: + data = cached_value except Exception as e: log.warning("Fetching from url '%s' failed: %s", url, str(e)) data = None diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index 8a1206018c0a..a5bb476e0d19 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -1,5 +1,7 @@ import abc from typing import ( + Any, + Dict, List, Optional, ) @@ -42,9 +44,16 @@ def __init__( self.__user_current_roles: Optional[List[Role]] = None self.__history = history self._url_builder = url_builder + self._short_term_cache: Dict[str, Any] = {} self.workflow_building_mode = workflow_building_mode self.galaxy_session = galaxy_session + def set_cache_value(self, key: str, value: Any): + self._short_term_cache[key] = value + + def get_cache_value(self, key: str, default: Any = None) -> Any: + return self._short_term_cache.get(key, default) + @property def app(self): return self._app