diff --git a/hyrisecockpit/database_manager/database.py b/hyrisecockpit/database_manager/database.py index aafadb007..16983b42f 100644 --- a/hyrisecockpit/database_manager/database.py +++ b/hyrisecockpit/database_manager/database.py @@ -3,12 +3,11 @@ from multiprocessing import Value from typing import Dict, List, Optional, Tuple, TypedDict -from psycopg2 import DatabaseError, Error, InterfaceError - -from .background_scheduler import BackgroundJobManager from .cursor import ConnectionFactory, StorageConnectionFactory from .interfaces import SqlResultInterface -from .table_names import table_names as _table_names +from .job_manager.asynchronous import AsynchronousJobManager +from .job_manager.background import BackgroundJobManager +from .job_manager.synchronous import SynchronousJobManager from .worker_pool import WorkerPool PluginSetting = TypedDict( @@ -66,17 +65,24 @@ def __init__( workload_publisher_url, self._database_blocked, ) - self._background_scheduler: BackgroundJobManager = BackgroundJobManager( - self._id, + self._background_job_manager: BackgroundJobManager = BackgroundJobManager( self._database_blocked, self._connection_factory, self._hyrise_active, self._worker_pool, self._storage_connection_factory, ) + self._asynchronous_job_manager = AsynchronousJobManager( + self._background_job_manager, + self._database_blocked, + self._connection_factory, + ) + self._synchronous_job_manager = SynchronousJobManager( + self._id, self._database_blocked, self._connection_factory, + ) self._initialize_influx() - self._background_scheduler.start() - self._background_scheduler.load_tables(self._default_tables) + self._background_job_manager.start() + self._asynchronous_job_manager.load_tables(self._default_tables) def _initialize_influx(self) -> None: """Initialize Influx database.""" @@ -114,7 +120,7 @@ def load_data(self, folder_name: str) -> bool: return ( False if self._worker_pool.get_status() != "closed" - else self._background_scheduler.load_tables(folder_name) + else self._asynchronous_job_manager.load_tables(folder_name) ) def delete_data(self, folder_name: str) -> bool: @@ -122,7 +128,7 @@ def delete_data(self, folder_name: str) -> bool: return ( False if self._worker_pool.get_status() != "closed" - else self._background_scheduler.delete_tables(folder_name) + else self._asynchronous_job_manager.delete_tables(folder_name) ) def activate_plugin(self, plugin: str) -> bool: @@ -130,11 +136,11 @@ def activate_plugin(self, plugin: str) -> bool: active_plugins = self._get_plugins() if active_plugins is None or plugin in active_plugins: return False - return self._background_scheduler.activate_plugin(plugin) + return self._asynchronous_job_manager.activate_plugin(plugin) def deactivate_plugin(self, plugin: str) -> bool: """Deactivate plugin.""" - return self._background_scheduler.deactivate_plugin(plugin) + return self._asynchronous_job_manager.deactivate_plugin(plugin) def get_database_blocked(self) -> bool: """Return tables loading flag.""" @@ -150,43 +156,15 @@ def get_hyrise_active(self) -> bool: def get_loaded_tables(self) -> List[Dict[str, str]]: """Return already loaded tables.""" - try: - with self._connection_factory.create_cursor() as cur: - cur.execute("select * from meta_tables;", None) - rows = cur.fetchall() - except (DatabaseError, InterfaceError): - return [] - else: - return [row[0] for row in rows] if rows else [] + return self._synchronous_job_manager.get_loaded_tables() def get_loaded_benchmarks(self, loaded_tables) -> List[str]: """Get list of all benchmarks which are 
completely loaded.""" - loaded_benchmarks: List = [] - benchmark_names = _table_names.keys() - scale_factors = ["0_1", "1"] - - for benchmark_name in benchmark_names: - for scale_factor in scale_factors: - required_tables = _table_names[benchmark_name] - loaded = True - for table_name in required_tables: - loaded = loaded and ( - f"{table_name}_{benchmark_name}_{scale_factor}" in loaded_tables - ) - - if loaded: - loaded_benchmarks.append(f"{benchmark_name}_{scale_factor}") - - return loaded_benchmarks + return self._synchronous_job_manager.get_loaded_benchmarks() def get_loaded_benchmark_data(self) -> Tuple: """Get loaded benchmark data.""" - loaded_tables: List = self.get_loaded_tables() - loaded_benchmarks: List = self.get_loaded_benchmarks(loaded_tables) - return ( - loaded_tables, - loaded_benchmarks, - ) + return self._synchronous_job_manager.get_loaded_benchmark_data() def start_worker(self) -> bool: """Start worker.""" @@ -198,97 +176,29 @@ def close_worker(self) -> bool: def _get_plugins(self) -> Optional[List[str]]: """Return all currently activated plugins.""" - try: - with self._connection_factory.create_cursor() as cur: - cur.execute(("SELECT name FROM meta_plugins;"), None) - rows = cur.fetchall() - except (DatabaseError, InterfaceError): - return None - else: - return [row[0].split("Plugin")[0] for row in rows] + return self._synchronous_job_manager.get_plugins() def _get_plugin_setting(self) -> Plugins: """Return currently set plugin settings.""" - try: - with self._connection_factory.create_cursor() as cur: - cur.execute( - "SELECT name, value, description FROM meta_settings WHERE name LIKE 'Plugin::%';", - None, - ) - rows = cur.fetchall() - except (DatabaseError, InterfaceError): - return None - else: - plugins: Dict[str, List[PluginSetting]] = {} - for row in rows: - plugin_name, setting_name = row[0].split("::")[1:] - value, description = row[1], row[2] - if plugins.get(plugin_name) is None: - plugins[plugin_name] = [] - plugins[plugin_name].append( - PluginSetting( - name=setting_name, value=value, description=description - ) - ) - return plugins + return self._synchronous_job_manager.get_plugin_setting() def get_detailed_plugins(self) -> Plugins: """Get all activated plugins with their settings.""" - if (plugins := self._get_plugins()) is None: - return None - if (settings := self._get_plugin_setting()) is None: - return None - return { - plugin_name: ( - settings[plugin_name] if plugin_name in settings.keys() else [] - ) - for plugin_name in plugins - } + return self._synchronous_job_manager.get_detailed_plugins() def set_plugin_setting( self, plugin_name: str, setting_name: str, setting_value: str ) -> bool: - """Adjust setting for given plugin.""" - if not self._database_blocked.value: - try: - with self._connection_factory.create_cursor() as cur: - cur.execute( - "UPDATE meta_settings SET value=%s WHERE name=%s;", - ( - setting_value, - "::".join(["Plugin", plugin_name, setting_name]), - ), - ) - return True - except (DatabaseError, InterfaceError): - return False - return False + """Set plug-in settings.""" + return self._synchronous_job_manager.set_plugin_setting( + plugin_name, setting_name, setting_value + ) def execute_sql_query(self, query) -> Optional[SqlResultInterface]: """Execute sql query on database.""" - if not self._database_blocked.value: - try: - with self._connection_factory.create_cursor() as cur: - cur.execute(query, None) - col_names = cur.fetch_column_names() - return SqlResultInterface( - id=self._id, - successful=True, - results=[[str(col) 
for col in row] for row in cur.fetchall()], - col_names=col_names, - error_message="", - ) - except Error as e: - return SqlResultInterface( - id=self._id, - successful=False, - results=[], - col_names=[], - error_message=str(e), - ) - return None + return self._synchronous_job_manager.execute_sql_query(query) def close(self) -> None: """Close the database.""" self._worker_pool.terminate() - self._background_scheduler.close() + self._background_job_manager.close() diff --git a/hyrisecockpit/database_manager/job/activate_plugin.py b/hyrisecockpit/database_manager/job/activate_plugin.py deleted file mode 100644 index 521e941aa..000000000 --- a/hyrisecockpit/database_manager/job/activate_plugin.py +++ /dev/null @@ -1,20 +0,0 @@ -"""This job activates a plug-in.""" - -from psycopg2 import DatabaseError, InterfaceError -from psycopg2.extensions import AsIs - -from hyrisecockpit.database_manager.cursor import ConnectionFactory - - -def activate_plugin(connection_factory: ConnectionFactory, plugin: str) -> None: - """Activate plug-in in database.""" - try: - with connection_factory.create_cursor() as cur: - cur.execute( - ( - "INSERT INTO meta_plugins(name) VALUES ('/usr/local/hyrise/lib/lib%sPlugin.so');" - ), - (AsIs(plugin),), - ) - except (DatabaseError, InterfaceError): - return None # TODO: log that activate plug-in failed diff --git a/hyrisecockpit/database_manager/job/deactivate_plugin.py b/hyrisecockpit/database_manager/job/deactivate_plugin.py deleted file mode 100644 index 0b8c20515..000000000 --- a/hyrisecockpit/database_manager/job/deactivate_plugin.py +++ /dev/null @@ -1,17 +0,0 @@ -"""This job deactivates a plug-in.""" - -from psycopg2 import DatabaseError, InterfaceError -from psycopg2.extensions import AsIs - -from hyrisecockpit.database_manager.cursor import ConnectionFactory - - -def deactivate_plugin(connection_factory: ConnectionFactory, plugin: str) -> None: - """Deactivate plug-in in database.""" - try: - with connection_factory.create_cursor() as cur: - cur.execute( - ("DELETE FROM meta_plugins WHERE name='%sPlugin';"), (AsIs(plugin),) - ) - except (DatabaseError, InterfaceError): - return None # TODO: log that deactivate plug-in failed diff --git a/hyrisecockpit/database_manager/job_manager/asynchronous.py b/hyrisecockpit/database_manager/job_manager/asynchronous.py new file mode 100644 index 000000000..d768c0a33 --- /dev/null +++ b/hyrisecockpit/database_manager/job_manager/asynchronous.py @@ -0,0 +1,85 @@ +"""Asynchronous job manager.""" + +from multiprocessing import Value +from threading import Thread + +from hyrisecockpit.database_manager.cursor import ConnectionFactory + +from .background import BackgroundJobManager +from .job.delete_tables import delete_tables as delete_tables_job +from .job.load_tables import load_tables as load_tables_job +from .job.plugin import activate_plugin as activate_plugin_job +from .job.plugin import deactivate_plugin as deactivate_plugin_job + + +class AsynchronousJobManager(object): + """Manage asynchronous jobs.""" + + def __init__( + self, + background_job_manager: BackgroundJobManager, + database_blocked: Value, + connection_factory: ConnectionFactory, + ): + """Initialize AsynchronousJobManager object.""" + self._background_job_manager: BackgroundJobManager = background_job_manager + self._database_blocked: Value = database_blocked + self._connection_factory: ConnectionFactory = connection_factory + + def load_tables(self, folder_name: str) -> bool: + """Load tables.""" + if not self._database_blocked.value: + 
self._database_blocked.value = True
+            job_thread = Thread(
+                target=load_tables_job,
+                args=(
+                    self._database_blocked,
+                    folder_name,
+                    self._connection_factory,
+                    self._background_job_manager,
+                ),
+            )
+            job_thread.start()
+            return True
+        else:
+            return False
+
+    def delete_tables(self, folder_name: str) -> bool:
+        """Delete tables."""
+        if not self._database_blocked.value:
+            self._database_blocked.value = True
+            job_thread = Thread(
+                target=delete_tables_job,
+                args=(
+                    self._database_blocked,
+                    folder_name,
+                    self._connection_factory,
+                    self._background_job_manager,
+                ),
+            )
+            job_thread.start()
+            return True
+        else:
+            return False
+
+    def activate_plugin(self, plugin: str) -> bool:
+        """Activate plugin."""
+        if not self._database_blocked.value:
+            job_thread = Thread(
+                target=activate_plugin_job, args=(self._connection_factory, plugin,)
+            )
+            job_thread.start()
+            return True
+        else:
+            return False
+
+    def deactivate_plugin(self, plugin: str) -> bool:
+        """Deactivate plugin."""
+        if not self._database_blocked.value:
+            job_thread = Thread(
+                target=deactivate_plugin_job, args=(self._connection_factory, plugin,)
+            )
+            job_thread.start()
+            return True
+        else:
+            return False
diff --git a/hyrisecockpit/database_manager/job_manager/background.py b/hyrisecockpit/database_manager/job_manager/background.py
new file mode 100644
index 000000000..77fd0da3a
--- /dev/null
+++ b/hyrisecockpit/database_manager/job_manager/background.py
@@ -0,0 +1,136 @@
+"""The BackgroundJobManager manages the background jobs for the apscheduler."""
+
+from multiprocessing import Value
+
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from hyrisecockpit.database_manager.cursor import (
+    ConnectionFactory,
+    StorageConnectionFactory,
+)
+from hyrisecockpit.database_manager.worker_pool import WorkerPool
+
+from .job.ping_hyrise import ping_hyrise
+from .job.update_chunks_data import update_chunks_data
+from .job.update_plugin_log import update_plugin_log
+from .job.update_queue_length import update_queue_length
+from .job.update_storage_data import update_storage_data
+from .job.update_system_data import update_system_data
+from .job.update_workload_operator_information import (
+    update_workload_operator_information,
+)
+from .job.update_workload_statement_information import (
+    update_workload_statement_information,
+)
+
+
+class BackgroundJobManager(object):
+    """Manage background scheduling jobs."""
+
+    def __init__(
+        self,
+        database_blocked: Value,
+        connection_factory: ConnectionFactory,
+        hyrise_active: Value,
+        worker_pool: WorkerPool,
+        storage_connection_factory: StorageConnectionFactory,
+    ):
+        """Initialize BackgroundJobManager object."""
+        self._database_blocked: Value = database_blocked
+        self._connection_factory: ConnectionFactory = connection_factory
+        self._storage_connection_factory: StorageConnectionFactory = storage_connection_factory
+        self._worker_pool: WorkerPool = worker_pool
+        self._scheduler: BackgroundScheduler = BackgroundScheduler()
+        self._hyrise_active: Value = hyrise_active
+        self._init_jobs()
+
+    def _init_jobs(self) -> None:
+        """Initialize basic background jobs."""
+        self._ping_hyrise_job = self._scheduler.add_job(
+            func=ping_hyrise,
+            trigger="interval",
+            seconds=0.5,
+            args=(self._connection_factory, self._hyrise_active),
+        )
+        self._update_queue_length_job = self._scheduler.add_job(
+            func=update_queue_length,
+            trigger="interval",
+            seconds=1,
+            args=(self._worker_pool, self._storage_connection_factory),
+        )
+        self._update_system_data_job = 
self._scheduler.add_job( + func=update_system_data, + trigger="interval", + seconds=1, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + self._update_storage_data_job = self._scheduler.add_job( + func=update_storage_data, + trigger="interval", + seconds=5, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + self._update_plugin_log_job = self._scheduler.add_job( + func=update_plugin_log, + trigger="interval", + seconds=1, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + self._update_chunks_data_job = self._scheduler.add_job( + func=update_chunks_data, + trigger="interval", + seconds=5, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + self._update_workload_statement_information_job = self._scheduler.add_job( + func=update_workload_statement_information, + trigger="interval", + seconds=5, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + self._update_workload_operator_information_job = self._scheduler.add_job( + func=update_workload_operator_information, + trigger="interval", + seconds=5, + args=( + self._database_blocked, + self._connection_factory, + self._storage_connection_factory, + ), + ) + + def start(self) -> None: + """Start background scheduler.""" + self._scheduler.start() + + def close(self) -> None: + """Close background scheduler.""" + self._scheduler.shutdown() + + def pause(self) -> None: + """Pause background job execution.""" + self._scheduler.pause() + + def resume(self) -> None: + """Resume background job execution.""" + self._scheduler.resume() diff --git a/hyrisecockpit/database_manager/job/__init__.py b/hyrisecockpit/database_manager/job_manager/job/__init__.py similarity index 100% rename from hyrisecockpit/database_manager/job/__init__.py rename to hyrisecockpit/database_manager/job_manager/job/__init__.py diff --git a/hyrisecockpit/database_manager/job/delete_tables.py b/hyrisecockpit/database_manager/job_manager/job/delete_tables.py similarity index 77% rename from hyrisecockpit/database_manager/job/delete_tables.py rename to hyrisecockpit/database_manager/job_manager/job/delete_tables.py index 01ea0c9ce..5d2662272 100644 --- a/hyrisecockpit/database_manager/job/delete_tables.py +++ b/hyrisecockpit/database_manager/job_manager/job/delete_tables.py @@ -2,10 +2,10 @@ from multiprocessing import Value from typing import List, Tuple -from hyrisecockpit.database_manager.job.execute_queries_parallel import ( +from hyrisecockpit.database_manager.job_manager.job.execute_queries_parallel import ( execute_queries_parallel, ) -from hyrisecockpit.database_manager.job.get_loaded_tables import ( +from hyrisecockpit.database_manager.job_manager.job.get_loaded_tables import ( get_loaded_tables_for_scale_factor, ) from hyrisecockpit.database_manager.table_names import table_names as _table_names @@ -25,7 +25,10 @@ def _generate_table_drop_queries( def delete_tables( - database_blocked: Value, folder_name: str, connection_factory + database_blocked: Value, + folder_name: str, + connection_factory, + background_job_manager, ) -> None: """Delete tables.""" benchmark, scale_factor = folder_name.split("_", maxsplit=1) @@ -36,7 +39,7 @@ def delete_tables( tables_delete_queries = _generate_table_drop_queries( loaded_tables, benchmark, scale_factor ) - + background_job_manager.pause() 
execute_queries_parallel(tables_delete_queries, connection_factory) - + background_job_manager.resume() database_blocked.value = False diff --git a/hyrisecockpit/database_manager/job/execute_queries_parallel.py b/hyrisecockpit/database_manager/job_manager/job/execute_queries_parallel.py similarity index 82% rename from hyrisecockpit/database_manager/job/execute_queries_parallel.py rename to hyrisecockpit/database_manager/job_manager/job/execute_queries_parallel.py index a9edd6177..308a0e441 100644 --- a/hyrisecockpit/database_manager/job/execute_queries_parallel.py +++ b/hyrisecockpit/database_manager/job_manager/job/execute_queries_parallel.py @@ -1,5 +1,5 @@ """This job execute queries parallel in separate processes.""" -from multiprocessing import Process +from threading import Thread from typing import Any, List, Optional, Tuple from psycopg2 import DatabaseError, InterfaceError, ProgrammingError @@ -36,12 +36,11 @@ def _execute_table_query( def execute_queries_parallel(queries, connection_factory: ConnectionFactory) -> None: """Start processes for query execution.""" - processes: List[Process] = [ - Process(target=_execute_table_query, args=(query, connection_factory),) + threads: List[Thread] = [ + Thread(target=_execute_table_query, args=(query, connection_factory),) for query in queries ] - for process in processes: - process.start() - for process in processes: - process.join() - process.terminate() + for thread in threads: + thread.start() + for thread in threads: + thread.join() diff --git a/hyrisecockpit/database_manager/job/get_loaded_tables.py b/hyrisecockpit/database_manager/job_manager/job/get_loaded_tables.py similarity index 100% rename from hyrisecockpit/database_manager/job/get_loaded_tables.py rename to hyrisecockpit/database_manager/job_manager/job/get_loaded_tables.py diff --git a/hyrisecockpit/database_manager/job/interfaces.py b/hyrisecockpit/database_manager/job_manager/job/interfaces.py similarity index 100% rename from hyrisecockpit/database_manager/job/interfaces.py rename to hyrisecockpit/database_manager/job_manager/job/interfaces.py diff --git a/hyrisecockpit/database_manager/job/load_tables.py b/hyrisecockpit/database_manager/job_manager/job/load_tables.py similarity index 80% rename from hyrisecockpit/database_manager/job/load_tables.py rename to hyrisecockpit/database_manager/job_manager/job/load_tables.py index 7ca0e05aa..2b1ef19f3 100644 --- a/hyrisecockpit/database_manager/job/load_tables.py +++ b/hyrisecockpit/database_manager/job_manager/job/load_tables.py @@ -2,10 +2,10 @@ from multiprocessing import Value from typing import List, Optional, Tuple, Union -from hyrisecockpit.database_manager.job.execute_queries_parallel import ( +from hyrisecockpit.database_manager.job_manager.job.execute_queries_parallel import ( execute_queries_parallel, ) -from hyrisecockpit.database_manager.job.get_loaded_tables import ( +from hyrisecockpit.database_manager.job_manager.job.get_loaded_tables import ( get_loaded_tables_for_scale_factor, ) from hyrisecockpit.database_manager.table_names import table_names as _table_names @@ -31,7 +31,12 @@ def _generate_table_loading_queries( ] -def load_tables(database_blocked: Value, folder_name: str, connection_factory) -> None: +def load_tables( + database_blocked: Value, + folder_name: str, + connection_factory, + background_job_manager, +) -> None: """Load tables.""" benchmark, scale_factor = folder_name.split("_", maxsplit=1) table_names: List[str] = _table_names[benchmark] @@ -44,7 +49,7 @@ def load_tables(database_blocked: 
Value, folder_name: str, connection_factory) - tables_loading_queries = _generate_table_loading_queries( tables_to_load, benchmark, scale_factor ) - + background_job_manager.pause() execute_queries_parallel(tables_loading_queries, connection_factory) - + background_job_manager.resume() database_blocked.value = False diff --git a/hyrisecockpit/database_manager/job/ping_hyrise.py b/hyrisecockpit/database_manager/job_manager/job/ping_hyrise.py similarity index 100% rename from hyrisecockpit/database_manager/job/ping_hyrise.py rename to hyrisecockpit/database_manager/job_manager/job/ping_hyrise.py diff --git a/hyrisecockpit/database_manager/job_manager/job/plugin.py b/hyrisecockpit/database_manager/job_manager/job/plugin.py new file mode 100644 index 000000000..92e8d5dfe --- /dev/null +++ b/hyrisecockpit/database_manager/job_manager/job/plugin.py @@ -0,0 +1,106 @@ +"""Jobs that are interacting with plug-ins.""" +from typing import Dict, List, Optional, TypedDict + +from psycopg2 import DatabaseError, InterfaceError +from psycopg2.extensions import AsIs + +from hyrisecockpit.database_manager.cursor import ConnectionFactory + +PluginSetting = TypedDict( + "PluginSetting", {"name": str, "value": str, "description": str} +) +Plugins = Optional[Dict[str, List[PluginSetting]]] + + +def activate_plugin(connection_factory: ConnectionFactory, plugin: str) -> None: + """Activate plug-in in database.""" + try: + with connection_factory.create_cursor() as cur: + cur.execute( + ( + "INSERT INTO meta_plugins(name) VALUES ('/usr/local/hyrise/lib/lib%sPlugin.so');" + ), + (AsIs(plugin),), + ) + except (DatabaseError, InterfaceError): + return None # TODO: log that activate plug-in failed + + +def deactivate_plugin(connection_factory: ConnectionFactory, plugin: str) -> None: + """Deactivate plug-in in database.""" + try: + with connection_factory.create_cursor() as cur: + cur.execute( + ("DELETE FROM meta_plugins WHERE name='%sPlugin';"), (AsIs(plugin),) + ) + except (DatabaseError, InterfaceError): + return None # TODO: log that deactivate plug-in failed + + +def get_plugins(connection_factory: ConnectionFactory) -> Optional[List[str]]: + """Return all currently activated plugins.""" + try: + with connection_factory.create_cursor() as cur: + cur.execute(("SELECT name FROM meta_plugins;"), None) + rows = cur.fetchall() + except (DatabaseError, InterfaceError): + return None + else: + return [row[0].split("Plugin")[0] for row in rows] + + +def get_plugin_setting(connection_factory: ConnectionFactory) -> Plugins: + """Return currently set plugin settings.""" + try: + with connection_factory.create_cursor() as cur: + cur.execute( + "SELECT name, value, description FROM meta_settings WHERE name LIKE 'Plugin::%';", + None, + ) + rows = cur.fetchall() + except (DatabaseError, InterfaceError): + return None + else: + plugins: Dict[str, List[PluginSetting]] = {} + for row in rows: + plugin_name, setting_name = row[0].split("::")[1:] + value, description = row[1], row[2] + if plugins.get(plugin_name) is None: + plugins[plugin_name] = [] + plugins[plugin_name].append( + PluginSetting(name=setting_name, value=value, description=description) + ) + return plugins + + +def get_detailed_plugins(connection_factory: ConnectionFactory) -> Plugins: + """Get all activated plugins with their settings.""" + if (plugins := get_plugins(connection_factory)) is None: + return None + if (settings := get_plugin_setting(connection_factory)) is None: + return None + return { + plugin_name: (settings[plugin_name] if plugin_name in 
settings.keys() else []) + for plugin_name in plugins + } + + +def set_plugin_setting( + connection_factory: ConnectionFactory, + database_blocked, + plugin_name: str, + setting_name: str, + setting_value: str, +) -> bool: + """Adjust setting for given plugin.""" + if not database_blocked.value: + try: + with connection_factory.create_cursor() as cur: + cur.execute( + "UPDATE meta_settings SET value=%s WHERE name=%s;", + (setting_value, "::".join(["Plugin", plugin_name, setting_name]),), + ) + return True + except (DatabaseError, InterfaceError): + return False + return False diff --git a/hyrisecockpit/database_manager/job_manager/job/sql_interface.py b/hyrisecockpit/database_manager/job_manager/job/sql_interface.py new file mode 100644 index 000000000..f6eb0c920 --- /dev/null +++ b/hyrisecockpit/database_manager/job_manager/job/sql_interface.py @@ -0,0 +1,34 @@ +"""This job sends a sql query to the database and returns the result.""" + +from typing import Optional + +from psycopg2 import Error + +from hyrisecockpit.database_manager.interfaces import SqlResultInterface + + +def execute_sql_query( + connection_factory, database_blocked, database_id, query +) -> Optional[SqlResultInterface]: + """Execute sql query on database.""" + if not database_blocked.value: + try: + with connection_factory.create_cursor() as cur: + cur.execute(query, None) + col_names = cur.fetch_column_names() + return SqlResultInterface( + id=database_id, + successful=True, + results=[[str(col) for col in row] for row in cur.fetchall()], + col_names=col_names, + error_message="", + ) + except Error as e: + return SqlResultInterface( + id=database_id, + successful=False, + results=[], + col_names=[], + error_message=str(e), + ) + return None diff --git a/hyrisecockpit/database_manager/job/sql_to_data_frame.py b/hyrisecockpit/database_manager/job_manager/job/sql_to_data_frame.py similarity index 100% rename from hyrisecockpit/database_manager/job/sql_to_data_frame.py rename to hyrisecockpit/database_manager/job_manager/job/sql_to_data_frame.py diff --git a/hyrisecockpit/database_manager/job/update_chunks_data.py b/hyrisecockpit/database_manager/job_manager/job/update_chunks_data.py similarity index 96% rename from hyrisecockpit/database_manager/job/update_chunks_data.py rename to hyrisecockpit/database_manager/job_manager/job/update_chunks_data.py index 2dd8920ad..3231f7d1d 100644 --- a/hyrisecockpit/database_manager/job/update_chunks_data.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_chunks_data.py @@ -7,7 +7,9 @@ from pandas import DataFrame from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) previous_chunks_data: Dict = {} diff --git a/hyrisecockpit/database_manager/job/update_plugin_log.py b/hyrisecockpit/database_manager/job_manager/job/update_plugin_log.py similarity index 90% rename from hyrisecockpit/database_manager/job/update_plugin_log.py rename to hyrisecockpit/database_manager/job_manager/job/update_plugin_log.py index 26d420d64..cbfe15947 100644 --- a/hyrisecockpit/database_manager/job/update_plugin_log.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_plugin_log.py @@ -3,7 +3,9 @@ from time import time_ns from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from 
hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) def update_plugin_log( diff --git a/hyrisecockpit/database_manager/job/update_queue_length.py b/hyrisecockpit/database_manager/job_manager/job/update_queue_length.py similarity index 100% rename from hyrisecockpit/database_manager/job/update_queue_length.py rename to hyrisecockpit/database_manager/job_manager/job/update_queue_length.py diff --git a/hyrisecockpit/database_manager/job/update_storage_data.py b/hyrisecockpit/database_manager/job_manager/job/update_storage_data.py similarity index 93% rename from hyrisecockpit/database_manager/job/update_storage_data.py rename to hyrisecockpit/database_manager/job_manager/job/update_storage_data.py index 5a6252b34..0ba6382f2 100644 --- a/hyrisecockpit/database_manager/job/update_storage_data.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_storage_data.py @@ -6,8 +6,10 @@ from pandas import DataFrame from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.interfaces import StorageDataType -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from hyrisecockpit.database_manager.job_manager.job.interfaces import StorageDataType +from hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) def _create_storage_data_dataframe(meta_segments: DataFrame) -> DataFrame: diff --git a/hyrisecockpit/database_manager/job/update_system_data.py b/hyrisecockpit/database_manager/job_manager/job/update_system_data.py similarity index 95% rename from hyrisecockpit/database_manager/job/update_system_data.py rename to hyrisecockpit/database_manager/job_manager/job/update_system_data.py index e88bfa08e..64e41c84b 100644 --- a/hyrisecockpit/database_manager/job/update_system_data.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_system_data.py @@ -4,7 +4,9 @@ from typing import Dict, Union from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) previous_system_usage = None previous_process_usage = None diff --git a/hyrisecockpit/database_manager/job/update_workload_operator_information.py b/hyrisecockpit/database_manager/job_manager/job/update_workload_operator_information.py similarity index 92% rename from hyrisecockpit/database_manager/job/update_workload_operator_information.py rename to hyrisecockpit/database_manager/job_manager/job/update_workload_operator_information.py index 9f87a4e07..6b50a424c 100644 --- a/hyrisecockpit/database_manager/job/update_workload_operator_information.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_workload_operator_information.py @@ -4,7 +4,9 @@ from time import time_ns from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) def update_workload_operator_information( diff --git a/hyrisecockpit/database_manager/job/update_workload_statement_information.py b/hyrisecockpit/database_manager/job_manager/job/update_workload_statement_information.py similarity index 95% rename from hyrisecockpit/database_manager/job/update_workload_statement_information.py rename to 
hyrisecockpit/database_manager/job_manager/job/update_workload_statement_information.py index 409b6c424..d3010fe59 100644 --- a/hyrisecockpit/database_manager/job/update_workload_statement_information.py +++ b/hyrisecockpit/database_manager/job_manager/job/update_workload_statement_information.py @@ -5,7 +5,9 @@ from typing import Dict, List from hyrisecockpit.database_manager.cursor import StorageConnectionFactory -from hyrisecockpit.database_manager.job.sql_to_data_frame import sql_to_data_frame +from hyrisecockpit.database_manager.job_manager.job.sql_to_data_frame import ( + sql_to_data_frame, +) def update_workload_statement_information( diff --git a/hyrisecockpit/database_manager/job_manager/synchronous.py b/hyrisecockpit/database_manager/job_manager/synchronous.py new file mode 100644 index 000000000..9cab5c8a6 --- /dev/null +++ b/hyrisecockpit/database_manager/job_manager/synchronous.py @@ -0,0 +1,107 @@ +"""Synchronous job manager.""" + +from multiprocessing import Value +from typing import Dict, List, Optional, Tuple, TypedDict + +from psycopg2 import DatabaseError, InterfaceError + +from hyrisecockpit.database_manager.cursor import ConnectionFactory +from hyrisecockpit.database_manager.interfaces import SqlResultInterface +from hyrisecockpit.database_manager.table_names import table_names as _table_names + +from .job.plugin import get_detailed_plugins as get_detailed_plugins_job +from .job.plugin import get_plugin_setting as get_plugin_setting_job +from .job.plugin import get_plugins as get_plugins_job +from .job.plugin import set_plugin_setting as set_plugin_setting_job +from .job.sql_interface import execute_sql_query as execute_sql_query_job + +PluginSetting = TypedDict( + "PluginSetting", {"name": str, "value": str, "description": str} +) +Plugins = Optional[Dict[str, List[PluginSetting]]] + + +class SynchronousJobManager(object): + """Manage synchronous jobs.""" + + def __init__( + self, + database_id: str, + database_blocked: Value, + connection_factory: ConnectionFactory, + ): + """Initialize SynchronousJobManager object.""" + self._database_id = database_id + self._database_blocked: Value = database_blocked + self._connection_factory: ConnectionFactory = connection_factory + + def get_plugins(self) -> Optional[List[str]]: + """Return all currently activated plugins.""" + return get_plugins_job(self._connection_factory) + + def get_plugin_setting(self) -> Plugins: + """Return currently set plugin settings.""" + return get_plugin_setting_job(self._connection_factory) + + def get_detailed_plugins(self) -> Plugins: + """Get all activated plugins with their settings.""" + return get_detailed_plugins_job(self._connection_factory) + + def set_plugin_setting( + self, plugin_name: str, setting_name: str, setting_value: str + ) -> bool: + """Adjust setting for given plugin.""" + return set_plugin_setting_job( + self._connection_factory, + self._database_blocked, + plugin_name, + setting_name, + setting_value, + ) + + def execute_sql_query(self, query) -> Optional[SqlResultInterface]: + """Execute sql query on database.""" + return execute_sql_query_job( + self._connection_factory, self._database_blocked, self._database_id, query + ) + + # TODO: Move to jobs + def get_loaded_tables(self) -> List[Dict[str, str]]: + """Return already loaded tables.""" + try: + with self._connection_factory.create_cursor() as cur: + cur.execute("select * from meta_tables;", None) + rows = cur.fetchall() + except (DatabaseError, InterfaceError): + return [] + else: + return [row[0] for row in rows] if 
rows else [] + + def get_loaded_benchmarks(self, loaded_tables) -> List[str]: + """Get list of all benchmarks which are completely loaded.""" + loaded_benchmarks: List = [] + benchmark_names = _table_names.keys() + scale_factors = ["0_1", "1"] + + for benchmark_name in benchmark_names: + for scale_factor in scale_factors: + required_tables = _table_names[benchmark_name] + loaded = True + for table_name in required_tables: + loaded = loaded and ( + f"{table_name}_{benchmark_name}_{scale_factor}" in loaded_tables + ) + + if loaded: + loaded_benchmarks.append(f"{benchmark_name}_{scale_factor}") + + return loaded_benchmarks + + def get_loaded_benchmark_data(self) -> Tuple: + """Get loaded benchmark data.""" + loaded_tables: List = self.get_loaded_tables() + loaded_benchmarks: List = self.get_loaded_benchmarks(loaded_tables) + return ( + loaded_tables, + loaded_benchmarks, + )