Skip to content

Commit

Permalink
fix: relying on heartbeat to turn to running status
Browse files Browse the repository at this point in the history
Previously, we turned an activation's status to running as soon as the rulebook
container was running, but the rulebook engine might not actually be ready yet.
This fix turns the status to running only when the activation manager
detects updated_at is set after the server receives the first
heartbeat from the rulebook engine.

AAP-31225: event-stream connected Rulebook activations report running
before they are actually able to receive events
  • Loading branch information
bzwei committed Oct 15, 2024
1 parent 2bae5c7 commit 6a86a5d
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 12 deletions.
26 changes: 19 additions & 7 deletions src/aap_eda/services/activation/activation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,7 @@ def _start_activation_instance(self):
self._error_activation(msg)
raise exceptions.ActivationStartError(msg) from exc

# update status
self.set_status(ActivationStatus.RUNNING)
with transaction.atomic():
self.set_latest_instance_status(ActivationStatus.RUNNING)
self._set_activation_pod_id(pod_id=container_id)
self._reset_failure_count()
self._set_activation_pod_id(pod_id=container_id)

# update logs
LOGGER.info(
Expand Down Expand Up @@ -357,14 +352,20 @@ def _is_already_stopped(self) -> bool:
)

def _is_unresponsive(self) -> bool:
    """Tell whether the latest instance has been silent for too long.

    The reference timestamp depends on the activation status:

    * RUNNING: ``latest_instance.updated_at``.
    * STARTING: ``latest_instance.started_at``.

    Returns:
        True when the reference timestamp is set and is older than
        ``settings.RULEBOOK_LIVENESS_TIMEOUT_SECONDS`` seconds ago;
        False otherwise, including for any other status.
    """
    status = self.db_instance.status
    if status == ActivationStatus.RUNNING:
        previous_time = self.latest_instance.updated_at
    elif status == ActivationStatus.STARTING:
        # STARTING needs its own branch: when it was also matched by the
        # RUNNING check, this branch was unreachable and ``started_at``
        # was never consulted.
        # NOTE(review): presumably ``updated_at`` stays unset until the
        # first heartbeat, so a start that never heartbeats could not
        # time out on ``updated_at`` -- confirm against the model.
        previous_time = self.latest_instance.started_at
    else:
        previous_time = None

    if not previous_time:
        return False

    cutoff_time = timezone.now() - timedelta(
        seconds=settings.RULEBOOK_LIVENESS_TIMEOUT_SECONDS,
    )
    return previous_time < cutoff_time

def _unresponsive_policy(self):
Expand Down Expand Up @@ -861,6 +862,7 @@ def monitor(self):
return

if self.db_instance.status not in [
ActivationStatus.STARTING,
ActivationStatus.RUNNING,
ActivationStatus.WORKERS_OFFLINE,
]:
Expand All @@ -872,6 +874,16 @@ def monitor(self):
LOGGER.info(msg)
return

if (
self.latest_instance.status == ActivationStatus.STARTING
and self.latest_instance.updated_at
):
# update status
with transaction.atomic():
self.set_status(ActivationStatus.RUNNING)
self.set_latest_instance_status(ActivationStatus.RUNNING)
self._reset_failure_count()

# get the status of the container
container_status = None
with contextlib.suppress(engine_exceptions.ContainerNotFoundError):
Expand Down
2 changes: 2 additions & 0 deletions src/aap_eda/tasks/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def _manage(process_parent_type: str, id: int) -> None:
break

if not has_request_processed and process_parent.status in [
ActivationStatus.STARTING,
ActivationStatus.RUNNING,
ActivationStatus.WORKERS_OFFLINE,
]:
Expand Down Expand Up @@ -466,6 +467,7 @@ def monitor_rulebook_processes() -> None:
# monitor running instances
for process in models.RulebookProcess.objects.filter(
status__in=[
ActivationStatus.STARTING,
ActivationStatus.RUNNING,
ActivationStatus.WORKERS_OFFLINE,
]
Expand Down
5 changes: 5 additions & 0 deletions src/aap_eda/wsapi/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from aap_eda.core import models
from aap_eda.core.enums import DefaultCredentialType
from aap_eda.core.models.activation import ActivationStatus
from aap_eda.tasks import orchestrator

from .messages import (
ActionMessage,
Expand Down Expand Up @@ -161,6 +163,9 @@ def handle_heartbeat(self, message: HeartbeatMessage) -> None:
message.stats["ruleSetName"]
] = message.stats
activation.save(update_fields=["ruleset_stats"])

if activation.status == ActivationStatus.STARTING:
orchestrator.monitor_rulebook_processes()
else:
logger.warning(
f"Activation instance {message.activation_id} is not present."
Expand Down
54 changes: 50 additions & 4 deletions tests/integration/services/activation/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import pytest
from _pytest.logging import LogCaptureFixture
from django.utils import timezone
from pytest_django.fixtures import SettingsWrapper
from pytest_lazyfixture import lazy_fixture

Expand Down Expand Up @@ -81,6 +82,21 @@ def activation_with_instance(
return basic_activation


@pytest.fixture
def starting_activation(activation_with_instance: models.Activation):
    """Return an activation that is still in STARTING status.

    Both the activation and its latest instance are set to STARTING. The
    instance is also given a pod id and a current ``updated_at``.
    """
    activation = activation_with_instance
    activation.status = enums.ActivationStatus.STARTING
    activation.save(update_fields=["status"])
    activation.latest_instance.status = enums.ActivationStatus.STARTING
    activation.latest_instance.activation_pod_id = "test-pod-id"
    # A populated updated_at stands in for a received heartbeat.
    activation.latest_instance.updated_at = timezone.now()
    activation.latest_instance.save(
        update_fields=["status", "activation_pod_id", "updated_at"],
    )
    return activation


@pytest.fixture
def running_activation(activation_with_instance: models.Activation):
"""Return a running activation."""
Expand Down Expand Up @@ -151,6 +167,13 @@ def job_mock():
return mock_job


@pytest.fixture
def running_container_status_mock():
    """Return a mock container status whose ``status`` is RUNNING."""
    return MagicMock(status=enums.ActivationStatus.RUNNING)


@pytest.mark.django_db
def test_get_container_request(
activation_with_instance: models.Activation,
Expand Down Expand Up @@ -306,10 +329,10 @@ def test_start_first_run(
assert container_engine_mock.start.called
assert container_engine_mock.update_logs.called
assert "Starting" in eda_caplog.text
assert basic_activation.status == enums.ActivationStatus.RUNNING
assert basic_activation.status == enums.ActivationStatus.STARTING
assert (
basic_activation.latest_instance.status
== enums.ActivationStatus.RUNNING
== enums.ActivationStatus.STARTING
)
assert basic_activation.latest_instance.activation_pod_id == "test-pod-id"
assert basic_activation.restart_count == 0
Expand All @@ -320,6 +343,29 @@ def test_start_first_run(
assert rulebook_process_queue.queue_name == job_mock.origin


@pytest.mark.django_db
def test_monitor_to_running_status(
    starting_activation: models.Activation,
    container_engine_mock: MagicMock,
    running_container_status_mock: MagicMock,
):
    """Test monitor task brings activation to running status."""
    activation_manager = ActivationManager(
        db_instance=starting_activation,
        container_engine=container_engine_mock,
    )
    # The container engine reports the container itself as running.
    container_engine_mock.get_status.return_value = (
        running_container_status_mock
    )
    activation_manager.monitor()
    assert starting_activation.status == enums.ActivationStatus.RUNNING
    assert (
        starting_activation.latest_instance.status
        == enums.ActivationStatus.RUNNING
    )
    assert starting_activation.restart_count == 0


@pytest.mark.django_db
def test_start_restart(
running_activation: models.Activation,
Expand All @@ -342,10 +388,10 @@ def test_start_restart(
assert container_engine_mock.start.called
assert container_engine_mock.update_logs.called
assert "Starting" in eda_caplog.text
assert running_activation.status == enums.ActivationStatus.RUNNING
assert running_activation.status == enums.ActivationStatus.STARTING
assert (
running_activation.latest_instance.status
== enums.ActivationStatus.RUNNING
== enums.ActivationStatus.STARTING
)
assert (
running_activation.latest_instance.activation_pod_id == "test-pod-id"
Expand Down
53 changes: 52 additions & 1 deletion tests/integration/wsapi/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import uuid
from typing import Generator
from unittest.mock import patch

import pytest
import pytest_asyncio
Expand All @@ -9,6 +10,7 @@
from pydantic.error_wrappers import ValidationError

from aap_eda.core import enums, models
from aap_eda.core.models.activation import ActivationStatus
from aap_eda.wsapi.consumers import AnsibleRulebookConsumer

# TODO(doston): this test module needs a whole refactor to use already
Expand Down Expand Up @@ -441,6 +443,53 @@ async def test_handle_heartbeat(
]


@pytest.mark.django_db(transaction=True)
async def test_handle_heartbeat_running_status(
    ws_communicator: WebsocketCommunicator,
    default_organization: models.Organization,
):
    """Test a heartbeat for a STARTING activation triggers monitoring.

    Sends one SessionStats message for a rulebook process created in
    STARTING status and checks that
    ``orchestrator.monitor_rulebook_processes`` is called exactly once.
    """
    # Create the DB rows in STARTING status instead of the default RUNNING.
    rulebook_process_id = await _prepare_db_data(ActivationStatus.STARTING)
    rulebook_process = await get_rulebook_process(rulebook_process_id)
    activation = await get_activation_by_rulebook_process(rulebook_process_id)
    assert activation.ruleset_stats == {}

    stats = [
        {
            "start": rulebook_process.started_at.strftime(DATETIME_FORMAT),
            "end": None,
            "numberOfRules": 1,
            "numberOfDisabledRules": 0,
            "rulesTriggered": 1,
            "eventsProcessed": 2000,
            "eventsMatched": 1,
            "eventsSuppressed": 1999,
            "permanentStorageSize": 0,
            "asyncResponses": 0,
            "bytesSentOnAsync": 0,
            "sessionId": 1,
            "ruleSetName": "ruleset1",
        },
    ]

    # One SessionStats payload per stats entry (a single one here).
    payloads = [
        {
            "type": "SessionStats",
            "activation_id": rulebook_process_id,
            "stats": stat,
            "reported_at": timezone.now().strftime(DATETIME_FORMAT),
        }
        for stat in stats
    ]
    # Patch the orchestrator entry point so only the call is recorded.
    with patch(
        "aap_eda.tasks.orchestrator.monitor_rulebook_processes"
    ) as mock_orchestrator:
        for payload in payloads:
            await ws_communicator.send_json_to(payload)

        await ws_communicator.wait()
        mock_orchestrator.assert_called_once()


@pytest.mark.django_db(transaction=True)
async def test_multiple_rules_for_one_event(
ws_communicator: WebsocketCommunicator,
Expand Down Expand Up @@ -773,7 +822,7 @@ def _prepare_activation_with_controller_info(inputs=AAP_INPUTS):


@database_sync_to_async
def _prepare_db_data():
def _prepare_db_data(status=ActivationStatus.RUNNING):
project, _ = models.Project.objects.get_or_create(
name="test-project",
url="https://github.com/test/project",
Expand Down Expand Up @@ -812,10 +861,12 @@ def _prepare_db_data():
user=user,
decision_environment=decision_environment,
awx_token=token[0],
status=status,
)

rulebook_process, _ = models.RulebookProcess.objects.get_or_create(
activation=activation,
status=status,
)

ruleset, _ = models.Ruleset.objects.get_or_create(
Expand Down

0 comments on commit 6a86a5d

Please sign in to comment.