
B/add different job managers #674

Closed · wants to merge 8 commits
150 changes: 30 additions & 120 deletions hyrisecockpit/database_manager/database.py
@@ -3,12 +3,11 @@
from multiprocessing import Value
from typing import Dict, List, Optional, Tuple, TypedDict

from psycopg2 import DatabaseError, Error, InterfaceError

from .background_scheduler import BackgroundJobManager
from .cursor import ConnectionFactory, StorageConnectionFactory
from .interfaces import SqlResultInterface
from .table_names import table_names as _table_names
from .job_manager.asynchronous import AsynchronousJobManager
from .job_manager.background import BackgroundJobManager
from .job_manager.synchronous import SynchronousJobManager
from .worker_pool import WorkerPool

PluginSetting = TypedDict(
@@ -66,17 +65,24 @@ def __init__(
workload_publisher_url,
self._database_blocked,
)
self._background_scheduler: BackgroundJobManager = BackgroundJobManager(
self._id,
self._background_job_manager: BackgroundJobManager = BackgroundJobManager(
self._database_blocked,
self._connection_factory,
self._hyrise_active,
self._worker_pool,
self._storage_connection_factory,
)
self._asynchronous_job_manager = AsynchronousJobManager(
self._background_job_manager,
self._database_blocked,
self._connection_factory,
)
self._synchronous_job_manager = SynchronousJobManager(
self._id, self._database_blocked, self._connection_factory,
)
self._initialize_influx()
self._background_scheduler.start()
self._background_scheduler.load_tables(self._default_tables)
self._background_job_manager.start()
self._background_job_manager.load_tables(self._default_tables)

def _initialize_influx(self) -> None:
"""Initialize Influx database."""
@@ -114,24 +120,24 @@ def load_data(self, folder_name: str) -> bool:
return (
False
if self._worker_pool.get_status() != "closed"
else self._background_scheduler.load_tables(folder_name)
else self._asynchronous_job_manager.load_tables(folder_name)
)

def delete_data(self, folder_name: str) -> bool:
"""Delete tables."""
return (
False
if self._worker_pool.get_status() != "closed"
else self._background_scheduler.delete_tables(folder_name)
else self._asynchronous_job_manager.delete_tables(folder_name)
)

def activate_plugin(self, plugin: str) -> bool:
"""Activate plugin."""
return self._background_scheduler.activate_plugin(plugin)
return self._asynchronous_job_manager.activate_plugin(plugin)

def deactivate_plugin(self, plugin: str) -> bool:
"""Deactivate plugin."""
return self._background_scheduler.deactivate_plugin(plugin)
return self._asynchronous_job_manager.deactivate_plugin(plugin)

def get_database_blocked(self) -> bool:
"""Return tables loading flag."""
@@ -147,43 +153,15 @@ def get_hyrise_active(self) -> bool:

def get_loaded_tables(self) -> List[Dict[str, str]]:
"""Return already loaded tables."""
try:
with self._connection_factory.create_cursor() as cur:
cur.execute("select * from meta_tables;", None)
rows = cur.fetchall()
except (DatabaseError, InterfaceError):
return []
else:
return [row[0] for row in rows] if rows else []
return self._synchronous_job_manager.get_loaded_tables()

def get_loaded_benchmarks(self, loaded_tables) -> List[str]:
"""Get list of all benchmarks which are completely loaded."""
loaded_benchmarks: List = []
benchmark_names = _table_names.keys()
scale_factors = ["0_1", "1"]

for benchmark_name in benchmark_names:
for scale_factor in scale_factors:
required_tables = _table_names[benchmark_name]
loaded = True
for table_name in required_tables:
loaded = loaded and (
f"{table_name}_{benchmark_name}_{scale_factor}" in loaded_tables
)

if loaded:
loaded_benchmarks.append(f"{benchmark_name}_{scale_factor}")

return loaded_benchmarks
return self._synchronous_job_manager.get_loaded_benchmarks()

def get_loaded_benchmark_data(self) -> Tuple:
"""Get loaded benchmark data."""
loaded_tables: List = self.get_loaded_tables()
loaded_benchmarks: List = self.get_loaded_benchmarks(loaded_tables)
return (
loaded_tables,
loaded_benchmarks,
)
return self._synchronous_job_manager.get_loaded_benchmark_data()

def start_worker(self) -> bool:
"""Start worker."""
@@ -195,97 +173,29 @@ def close_worker(self) -> bool:

def _get_plugins(self) -> Optional[List[str]]:
"""Return all currently activated plugins."""
try:
with self._connection_factory.create_cursor() as cur:
cur.execute(("SELECT name FROM meta_plugins;"), None)
rows = cur.fetchall()
except (DatabaseError, InterfaceError):
return None
else:
return [row[0].split("Plugin")[0] for row in rows]
return self._synchronous_job_manager.get_plugins()

def _get_plugin_setting(self) -> Plugins:
"""Return currently set plugin settings."""
try:
with self._connection_factory.create_cursor() as cur:
cur.execute(
"SELECT name, value, description FROM meta_settings WHERE name LIKE 'Plugin::%';",
None,
)
rows = cur.fetchall()
except (DatabaseError, InterfaceError):
return None
else:
plugins: Dict[str, List[PluginSetting]] = {}
for row in rows:
plugin_name, setting_name = row[0].split("::")[1:]
value, description = row[1], row[2]
if plugins.get(plugin_name) is None:
plugins[plugin_name] = []
plugins[plugin_name].append(
PluginSetting(
name=setting_name, value=value, description=description
)
)
return plugins
return self._synchronous_job_manager.get_plugin_setting()

def get_detailed_plugins(self) -> Plugins:
"""Get all activated plugins with their settings."""
if (plugins := self._get_plugins()) is None:
return None
if (settings := self._get_plugin_setting()) is None:
return None
return {
plugin_name: (
settings[plugin_name] if plugin_name in settings.keys() else []
)
for plugin_name in plugins
}
return self._synchronous_job_manager.get_detailed_plugins()

def set_plugin_setting(
self, plugin_name: str, setting_name: str, setting_value: str
) -> bool:
"""Adjust setting for given plugin."""
if not self._database_blocked.value:
try:
with self._connection_factory.create_cursor() as cur:
cur.execute(
"UPDATE meta_settings SET value=%s WHERE name=%s;",
(
setting_value,
"::".join(["Plugin", plugin_name, setting_name]),
),
)
return True
except (DatabaseError, InterfaceError):
return False
return False
"""Set plug-in settings."""
return self._synchronous_job_manager.set_plugin_setting(
plugin_name, setting_name, setting_value
)

def execute_sql_query(self, query) -> Optional[SqlResultInterface]:
"""Execute sql query on database."""
if not self._database_blocked.value:
try:
with self._connection_factory.create_cursor() as cur:
cur.execute(query, None)
col_names = cur.fetch_column_names()
return SqlResultInterface(
id=self._id,
successful=True,
results=[[str(col) for col in row] for row in cur.fetchall()],
col_names=col_names,
error_message="",
)
except Error as e:
return SqlResultInterface(
id=self._id,
successful=False,
results=[],
col_names=[],
error_message=str(e),
)
return None
return self._synchronous_job_manager.execute_sql_query(query)

def close(self) -> None:
"""Close the database."""
self._worker_pool.terminate()
self._background_scheduler.close()
self._background_job_manager.close()
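
Note: the new job_manager/synchronous.py is not included in this excerpt. Below is a minimal sketch of what it presumably contains, reconstructed from the query bodies removed from database.py above and from the constructor call SynchronousJobManager(self._id, self._database_blocked, self._connection_factory); the relative import path is an assumption.

"""Synchronous job manager (sketch; this file is not part of the excerpt above)."""

from multiprocessing import Value
from typing import Dict, List

from psycopg2 import DatabaseError, InterfaceError

from ..cursor import ConnectionFactory  # assumed import path


class SynchronousJobManager(object):
    """Run jobs that answer immediately from the database."""

    def __init__(
        self,
        database_id: str,
        database_blocked: Value,
        connection_factory: ConnectionFactory,
    ):
        """Initialize SynchronousJobManager object."""
        self._id: str = database_id
        self._database_blocked: Value = database_blocked
        self._connection_factory: ConnectionFactory = connection_factory

    def get_loaded_tables(self) -> List[Dict[str, str]]:
        """Return already loaded tables (body moved here from database.py)."""
        try:
            with self._connection_factory.create_cursor() as cur:
                cur.execute("select * from meta_tables;", None)
                rows = cur.fetchall()
        except (DatabaseError, InterfaceError):
            return []
        else:
            return [row[0] for row in rows] if rows else []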
20 changes: 0 additions & 20 deletions hyrisecockpit/database_manager/job/activate_plugin.py

This file was deleted.

17 changes: 0 additions & 17 deletions hyrisecockpit/database_manager/job/deactivate_plugin.py

This file was deleted.
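
The two deleted modules appear to be merged into a single job/plugin.py, judging by the imports in job_manager/asynchronous.py below (from .job.plugin import activate_plugin / deactivate_plugin). Their contents are not shown in this excerpt, so the SQL in the sketch below is only an assumption; the function signatures follow the arguments passed by the asynchronous job manager.

"""Plugin jobs (sketch; the deleted modules are presumably merged into job/plugin.py)."""

from psycopg2 import DatabaseError, InterfaceError

from ..cursor import ConnectionFactory  # assumed import path


def activate_plugin(connection_factory: ConnectionFactory, plugin: str) -> bool:
    """Activate a plugin (the exact SQL is an assumption, not taken from the diff)."""
    try:
        with connection_factory.create_cursor() as cur:
            cur.execute("INSERT INTO meta_plugins(name) VALUES (%s);", (plugin,))
        return True
    except (DatabaseError, InterfaceError):
        return False


def deactivate_plugin(connection_factory: ConnectionFactory, plugin: str) -> bool:
    """Deactivate a plugin (the exact SQL is an assumption, not taken from the diff)."""
    try:
        with connection_factory.create_cursor() as cur:
            cur.execute("DELETE FROM meta_plugins WHERE name=%s;", (plugin,))
        return True
    except (DatabaseError, InterfaceError):
        return False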

84 changes: 84 additions & 0 deletions hyrisecockpit/database_manager/job_manager/asynchronous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Asynchronous job manager."""

from multiprocessing import Value
from threading import Thread

from .background import BackgroundJobManager
from .cursor import ConnectionFactory
from .job.delete_tables import delete_tables as delete_tables_job
from .job.load_tables import load_tables as load_tables_job
from .job.plugin import activate_plugin as activate_plugin_job
from .job.plugin import deactivate_plugin as deactivate_plugin_job


class AsynchronousJobManager(object):
"""Manage asynchronous jobs."""

def __init__(
self,
background_job_manager: BackgroundJobManager,
database_blocked: Value,
connection_factory: ConnectionFactory,
):
"""Initialize AsynchronousJobManager object."""
self._background_job_manager: BackgroundJobManager = background_job_manager
self._database_blocked: Value = database_blocked
self._connection_factory: ConnectionFactory = connection_factory

def load_tables(self, folder_name: str) -> bool:
"""Load tables."""
if not self._database_blocked.value:
self._database_blocked.value = True
job_thread = Thread(
target=load_tables_job,
args=(
self._database_blocked,
folder_name,
self._connection_factory,
self._background_job_manager,
),
)
job_thread.start()
return True
else:
return False

def delete_tables(self, folder_name: str) -> bool:
"""Delete tables."""
if not self._database_blocked.value:
self._database_blocked.value = True
job_thread = Thread(
target=delete_tables_job,
args=(
self._database_blocked,
folder_name,
self._connection_factory,
self._background_job_manager,
),
)
job_thread.start()
return True
else:
return False

def activate_plugin(self, plugin: str) -> bool:
"""Activate plugin."""
if not self._database_blocked.value:
job_thread = Thread(
target=activate_plugin_job, args=(self._connection_factory, plugin,)
)
job_thread.start()
return True
else:
return False

def deactivate_plugin(self, plugin: str) -> bool:
"""Dectivate plugin."""
if not self._database_blocked.value:
job_thread = Thread(
target=deactivate_plugin_job, args=(self._connection_factory, plugin,)
)
job_thread.start()
return True
else:
return False
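
A short usage sketch of the class added above, as called from database.py: load_tables() and delete_tables() flip the shared _database_blocked flag and return immediately while the job runs in a thread, whereas plugin (de)activation does not block the database. The variable names mirror the fields Database already holds, and the folder name is hypothetical.

# Usage sketch, assuming the same objects Database constructs
# (self._background_job_manager, self._database_blocked, self._connection_factory);
# the folder name "tpch_0_1" is hypothetical.
manager = AsynchronousJobManager(
    background_job_manager, database_blocked, connection_factory
)
if not manager.load_tables("tpch_0_1"):
    # While _database_blocked is set, a second request is rejected
    # instead of queueing another load job.
    print("Database is blocked; wait for the running job to finish.")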