Skip to content

Commit

Permalink
Add additional monitor tasks tests
Browse files Browse the repository at this point in the history
  • Loading branch information
badrogger committed Oct 1, 2024
1 parent f7fa831 commit 812a579
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
1 change: 1 addition & 0 deletions core/schains/monitor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def __init__(
self.node_config = node_config
self.dutils = dutils
self._future = Future()
self._start_ts = 0
self.stream_version = stream_version

@property
Expand Down
2 changes: 1 addition & 1 deletion core/schains/monitor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def execute_tasks(
stucked = []
while True:
for index, task in enumerate(tasks):
if not task.future.running() and task.needed:
if not task.future.running() and task.needed and len(stucked) == 0:
task.start_ts = int(time.time())
logger.info('Starting task %s at %d', task.name, task.start_ts)
pipeline = task.create_pipeline()
Expand Down
60 changes: 55 additions & 5 deletions tests/schains/monitor/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
import time
from concurrent.futures import Future
from typing import Callable
from unittest import mock

import pytest

from core.schains.firewall.types import IpRange
from core.schains.firewall.utils import get_sync_agent_ranges
from core.schains.process import ProcessReport
from core.schains.monitor.main import ConfigTask, SkaledTask
from core.schains.monitor.tasks import execute_tasks, ITask
from tools.configs.schains import SCHAINS_DIR_PATH
from tools.helper import is_node_part_of_chain
from web.models.schain import upsert_schain_record


@pytest.fixture
Expand Down Expand Up @@ -64,6 +67,57 @@ def test_is_node_part_of_chain(skale, schain_on_contracts, node_config):
assert not chain_on_node


def test_config_task(skale, skale_ima, schain_db, schain_on_contracts, node_config):
stream_version = '2.3.0'
config_task = ConfigTask(
schain_name=schain_on_contracts,
skale=skale,
skale_ima=skale_ima,
node_config=node_config,
stream_version=stream_version,
)
assert config_task.needed
skale_ima.linker.has_schain = mock.Mock(return_value=True)

def get_monitor_mock(*args, **kwargs):
result = mock.MagicMock()
result.__name__ = 'TestConfigMonitor'
return result

with mock.patch('core.schains.monitor.main.RegularConfigMonitor', get_monitor_mock):
pipeline = config_task.create_pipeline()
pipeline()


def test_skaled_task(skale, schain_db, schain_on_contracts, node_config, dutils):
record = upsert_schain_record(schain_on_contracts)
stream_version = '2.3.0'
skaled_task = SkaledTask(
schain_name=schain_on_contracts,
skale=skale,
node_config=node_config,
stream_version=stream_version,
dutils=dutils,
)
assert not skaled_task.needed
assert skaled_task.name == 'skaled'
assert skaled_task.start_ts == 0
assert skaled_task.stuck_timeout == 3600

record.set_config_version(stream_version)
assert skaled_task.needed

def get_monitor_mock(*args, **kwargs):
result = mock.MagicMock()
result.__name__ = 'TestSkaledMonitor'
return result

with mock.patch('core.schains.monitor.main.get_skaled_monitor', get_monitor_mock):
with mock.patch('core.schains.monitor.main.notify_checks'):
pipeline = skaled_task.create_pipeline()
pipeline()


def test_execute_tasks(tmp_dir, _schain_name):
def run_stuck_pipeline(index: int) -> None:
logging.info('Running stuck pipeline %d', index)
Expand Down Expand Up @@ -127,11 +181,7 @@ def needed(self) -> bool:

process_report = ProcessReport(name=_schain_name)
tasks = [StuckedTask(0), NotNeededTask(1)]
execute_tasks(
tasks=tasks,
process_report=process_report,
sleep_interval=1
)
execute_tasks(tasks=tasks, process_report=process_report, sleep_interval=1)

print(tasks[0], tasks[1])
assert tasks[0].start_ts == -1
Expand Down
4 changes: 2 additions & 2 deletions web/models/schain.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
logger = logging.getLogger(__name__)

DEFAULT_CONFIG_VERSION = '0.0.0'
RETRY_ATTEMPTS = 10
RETRY_ATTEMPTS = 5
TIMEOUTS = [2 ** p for p in range(RETRY_ATTEMPTS)]


Expand All @@ -43,7 +43,7 @@ def wrapper(cls, *args, **kwargs):
try:
result = func(cls, *args, **kwargs)
except OperationalError as e:
logger.exception('DB operational error')
logger.error('DB operational error. Sleeping %d', timeout, exc_info=e)
error = e
time.sleep(timeout)
else:
Expand Down

0 comments on commit 812a579

Please sign in to comment.