From ac698f632a779afed3ddaa46f292855ecde5a060 Mon Sep 17 00:00:00 2001
From: Dan Fuller
Date: Mon, 28 Oct 2024 10:56:46 -0700
Subject: [PATCH] feat(workflow-engine): Add basic `StatefulDetectorHandler`
 that can commit changes after related workflows finish running (#79573)

This adds part of `StatefulDetectorHandler`. The main method here is
`commit_state_update_data`, which is intended to be called once the related
workflows have completed, at which point we commit all of the pending changes
to the detector state.
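For context, a minimal sketch of the intended call pattern (the subclass and
variable names here are hypothetical; `evaluate` is the only abstract method):

    class MyDetectorHandler(StatefulDetectorHandler[dict]):
        def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
            # Detector-specific evaluation logic goes here.
            ...

    handler = MyDetectorHandler(detector)
    results = handler.evaluate(data_packet)
    # ... run the related workflows for each result ...
    # Once the workflows finish, commit the dedupe/counter/state changes:
    handler.commit_state_update_data(
        [r.state_update_data for r in results if r.state_update_data is not None]
    )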
---
 src/sentry/conf/server.py                     |   1 +
 .../migrations/0008_detector_state.py         |   3 +-
 src/sentry/workflow_engine/models/detector.py | 115 +++++++++++++++--
 .../workflow_engine/models/detector_state.py  |   9 +-
 src/sentry/workflow_engine/types.py           |  12 ++
 tests/sentry/workflow_engine/__init__.py      |   0
 .../sentry/workflow_engine/models/__init__.py |   0
 .../workflow_engine/models/test_detector.py   | 120 ++++++++++++++++++
 .../workflow_engine/processors/__init__.py    |   0
 .../processors/test_detector.py               |  27 ++--
 10 files changed, 255 insertions(+), 32 deletions(-)
 create mode 100644 src/sentry/workflow_engine/types.py
 create mode 100644 tests/sentry/workflow_engine/__init__.py
 create mode 100644 tests/sentry/workflow_engine/models/__init__.py
 create mode 100644 tests/sentry/workflow_engine/models/test_detector.py
 create mode 100644 tests/sentry/workflow_engine/processors/__init__.py

diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 34ebd34a095ef0..d018ad585a846f 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -136,6 +136,7 @@ def env(
 SENTRY_SPAN_BUFFER_CLUSTER = "default"
 SENTRY_ASSEMBLE_CLUSTER = "default"
 SENTRY_UPTIME_DETECTOR_CLUSTER = "default"
+SENTRY_WORKFLOW_ENGINE_REDIS_CLUSTER = "default"

 # Hosts that are allowed to use system token authentication.
 # http://en.wikipedia.org/wiki/Reserved_IP_addresses
diff --git a/src/sentry/workflow_engine/migrations/0008_detector_state.py b/src/sentry/workflow_engine/migrations/0008_detector_state.py
index 2b4c0a649af18f..929e8aa0a0570f 100644
--- a/src/sentry/workflow_engine/migrations/0008_detector_state.py
+++ b/src/sentry/workflow_engine/migrations/0008_detector_state.py
@@ -7,6 +7,7 @@
 import sentry.db.models.fields.foreignkey
 import sentry.workflow_engine.models.detector_state
 from sentry.new_migrations.migrations import CheckedMigration
+from sentry.workflow_engine.types import DetectorPriorityLevel


 class Migration(CheckedMigration):
@@ -45,7 +46,7 @@ class Migration(CheckedMigration):
                 (
                     "state",
                     models.CharField(
-                        default=sentry.workflow_engine.models.detector_state.DetectorStatus["OK"],
+                        default=DetectorPriorityLevel.OK,
                         max_length=200,
                     ),
                 ),
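Note: the new alias defaults to the "default" cluster; a deployment that wants
to isolate workflow-engine state could point it at a dedicated cluster in its
settings (the cluster name below is hypothetical):

    # sentry.conf.py (deployment settings); "workflow-engine" is a made-up
    # cluster name that would need to exist in the Redis cluster config.
    SENTRY_WORKFLOW_ENGINE_REDIS_CLUSTER = "workflow-engine"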
diff --git a/src/sentry/workflow_engine/models/detector.py b/src/sentry/workflow_engine/models/detector.py
index dcabca1c66f54a..2698d682a187a4 100644
--- a/src/sentry/workflow_engine/models/detector.py
+++ b/src/sentry/workflow_engine/models/detector.py
@@ -3,24 +3,34 @@
 import abc
 import dataclasses
 import logging
-from typing import TYPE_CHECKING, Any, Generic, TypeVar
+from datetime import timedelta
+from typing import Any, Generic, TypeVar

+from django.conf import settings
 from django.db import models
-from django.db.models import UniqueConstraint
+from django.db.models import Q, UniqueConstraint
+from sentry_redis_tools.retrying_cluster import RetryingRedisCluster

 from sentry.backup.scopes import RelocationScope
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
 from sentry.issues import grouptype
 from sentry.models.owner_base import OwnerModel
-from sentry.types.group import PriorityLevel
+from sentry.utils import redis
 from sentry.workflow_engine.models import DataPacket
-
-if TYPE_CHECKING:
-    from sentry.workflow_engine.models.detector_state import DetectorStatus
+from sentry.workflow_engine.models.detector_state import DetectorState
+from sentry.workflow_engine.types import DetectorPriorityLevel

 logger = logging.getLogger(__name__)

+REDIS_TTL = int(timedelta(days=7).total_seconds())
+
+
+def get_redis_client() -> RetryingRedisCluster:
+    cluster_key = settings.SENTRY_WORKFLOW_ENGINE_REDIS_CLUSTER
+    return redis.redis_clusters.get(cluster_key)  # type: ignore[return-value]
+
+
 @region_silo_model
 class Detector(DefaultFieldsModel, OwnerModel):
     __relocation_scope__ = RelocationScope.Organization
@@ -83,20 +93,22 @@ def detector_handler(self) -> DetectorHandler | None:
 class DetectorStateData:
     group_key: str | None
     active: bool
-    status: DetectorStatus
+    status: DetectorPriorityLevel
     # Stateful detectors always process data packets in order. Once we confirm that a data packet has been fully
     # processed and all workflows have been done, this value will be used by the stateful detector to prevent
     # reprocessing
     dedupe_value: int
     # Stateful detectors allow various counts to be tracked. We need to update these after we process workflows, so
-    # include the updates in the state
-    counter_updates: dict[str, int]
+    # include the updates in the state.
+    # This dictionary is in the format {counter_name: counter_value, ...}
+    # If a counter value is `None` it means to unset the value
+    counter_updates: dict[str, int | None]


 @dataclasses.dataclass(frozen=True)
 class DetectorEvaluationResult:
     is_active: bool
-    priority: PriorityLevel
+    priority: DetectorPriorityLevel
     data: Any
     state_update_data: DetectorStateData | None = None

@@ -111,3 +123,86 @@ def __init__(self, detector: Detector):
     @abc.abstractmethod
     def evaluate(self, data_packet: DataPacket[T]) -> list[DetectorEvaluationResult]:
         pass
+
+
+class StatefulDetectorHandler(DetectorHandler[T], abc.ABC):
+    def build_dedupe_value_key(self, group_key: str | None) -> str:
+        if group_key is None:
+            group_key = ""
+        return f"{self.detector.id}:{group_key}:dedupe_value"
+
+    def build_counter_value_key(self, group_key: str | None, counter_name: str) -> str:
+        if group_key is None:
+            group_key = ""
+        return f"{self.detector.id}:{group_key}:{counter_name}"
+
+    def bulk_get_detector_state(
+        self, state_updates: list[DetectorStateData]
+    ) -> dict[str | None, DetectorState]:
+        group_keys = {update.group_key for update in state_updates}
+        query_filter = Q(
+            detector_group_key__in=[group_key for group_key in group_keys if group_key is not None]
+        )
+        if None in group_keys:
+            query_filter |= Q(detector_group_key__isnull=True)
+
+        return {
+            detector_state.detector_group_key: detector_state
+            for detector_state in self.detector.detectorstate_set.filter(query_filter)
+        }
+
+    def commit_state_update_data(self, state_updates: list[DetectorStateData]):
+        self._bulk_commit_detector_state(state_updates)
+        self._bulk_commit_redis_state(state_updates)
+
+    def _bulk_commit_redis_state(self, state_updates: list[DetectorStateData]):
+        dedupe_values = []
+        group_counter_updates = {}
+        for state_update in state_updates:
+            dedupe_values.append((state_update.group_key, state_update.dedupe_value))
+            group_counter_updates[state_update.group_key] = state_update.counter_updates
+
+        pipeline = get_redis_client().pipeline()
+        if dedupe_values:
+            for group_key, dedupe_value in dedupe_values:
+                pipeline.set(self.build_dedupe_value_key(group_key), dedupe_value, ex=REDIS_TTL)
+
+        if group_counter_updates:
+            for group_key, counter_updates in group_counter_updates.items():
+                for counter_name, counter_value in counter_updates.items():
+                    key_name = self.build_counter_value_key(group_key, counter_name)
+                    if counter_value is None:
+                        pipeline.delete(key_name)
+                    else:
+                        pipeline.set(key_name, counter_value, ex=REDIS_TTL)
+
+        pipeline.execute()
+
+    def _bulk_commit_detector_state(self, state_updates: list[DetectorStateData]):
+        detector_state_lookup = self.bulk_get_detector_state(state_updates)
+        created_detector_states = []
+        updated_detector_states = []
+        for state_update in state_updates:
+            detector_state = detector_state_lookup.get(state_update.group_key)
+            if not detector_state:
+                created_detector_states.append(
+                    DetectorState(
+                        detector_group_key=state_update.group_key,
+                        detector=self.detector,
+                        active=state_update.active,
+                        state=state_update.status,
+                    )
+                )
+            elif (
+                state_update.active != detector_state.active
+                or state_update.status != detector_state.state
+            ):
+                detector_state.active = state_update.active
+                detector_state.state = state_update.status
+                updated_detector_states.append(detector_state)
+
+        if created_detector_states:
+            DetectorState.objects.bulk_create(created_detector_states)
+
+        if updated_detector_states:
+            DetectorState.objects.bulk_update(updated_detector_states, ["active", "state"])
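The diff above only covers the write side. For reviewers, a sketch of how the
dedupe value might be consumed when processing a packet (not part of this PR;
`get_dedupe_value` is a hypothetical helper):

    def get_dedupe_value(self, group_key: str | None) -> int:
        # Read the last committed dedupe value; 0 if nothing has been committed yet.
        raw = get_redis_client().get(self.build_dedupe_value_key(group_key))
        return int(raw) if raw is not None else 0

    # A stateful detector could then skip packets it has already processed:
    # if packet_dedupe_value <= handler.get_dedupe_value(group_key): return []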
diff --git a/src/sentry/workflow_engine/models/detector_state.py b/src/sentry/workflow_engine/models/detector_state.py
index 199546a315f817..9a97c8af8e8ec4 100644
--- a/src/sentry/workflow_engine/models/detector_state.py
+++ b/src/sentry/workflow_engine/models/detector_state.py
@@ -1,15 +1,10 @@
-from enum import StrEnum
-
 from django.db import models
 from django.db.models import F, Value
 from django.db.models.functions import Coalesce

 from sentry.backup.scopes import RelocationScope
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
-
-
-class DetectorStatus(StrEnum):
-    OK = "ok"
+from sentry.workflow_engine.types import DetectorPriorityLevel


 @region_silo_model
@@ -26,7 +21,7 @@ class DetectorState(DefaultFieldsModel):
     active = models.BooleanField(default=False)

     # The current state of the detector
-    state = models.CharField(max_length=200, default=DetectorStatus.OK)
+    state = models.CharField(max_length=200, default=DetectorPriorityLevel.OK)

     class Meta:
         constraints = [
diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py
new file mode 100644
index 00000000000000..f998702ef15538
--- /dev/null
+++ b/src/sentry/workflow_engine/types.py
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+from enum import IntEnum
+
+from sentry.types.group import PriorityLevel
+
+
+class DetectorPriorityLevel(IntEnum):
+    OK = 0
+    LOW = PriorityLevel.LOW
+    MEDIUM = PriorityLevel.MEDIUM
+    HIGH = PriorityLevel.HIGH
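Since `DetectorPriorityLevel` is an `IntEnum` built on top of `PriorityLevel`
with `OK` added as the lowest value, priorities compare and sort numerically,
e.g.:

    assert DetectorPriorityLevel.OK < DetectorPriorityLevel.HIGH
    assert max(DetectorPriorityLevel.LOW, DetectorPriorityLevel.HIGH) == DetectorPriorityLevel.HIGH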
diff --git a/tests/sentry/workflow_engine/__init__.py b/tests/sentry/workflow_engine/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/tests/sentry/workflow_engine/models/__init__.py b/tests/sentry/workflow_engine/models/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/tests/sentry/workflow_engine/models/test_detector.py b/tests/sentry/workflow_engine/models/test_detector.py
new file mode 100644
index 00000000000000..9725cfb8c93ee6
--- /dev/null
+++ b/tests/sentry/workflow_engine/models/test_detector.py
@@ -0,0 +1,120 @@
+import unittest
+
+from sentry.testutils.cases import TestCase
+from sentry.workflow_engine.models import DataPacket, DetectorEvaluationResult
+from sentry.workflow_engine.models.detector import (
+    Detector,
+    DetectorStateData,
+    StatefulDetectorHandler,
+    get_redis_client,
+)
+from sentry.workflow_engine.models.detector_state import DetectorState
+from sentry.workflow_engine.types import DetectorPriorityLevel
+
+
+class MockDetectorStateHandler(StatefulDetectorHandler[dict]):
+    def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
+        return []
+
+
+class TestKeyBuilders(unittest.TestCase):
+    def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
+        if detector is None:
+            detector = Detector(id=123)
+        return MockDetectorStateHandler(detector)
+
+    def test(self):
+        assert self.build_handler().build_dedupe_value_key("test") == "123:test:dedupe_value"
+        assert self.build_handler().build_counter_value_key("test", "name_1") == "123:test:name_1"
+
+    def test_different_dedupe_keys(self):
+        handler = self.build_handler()
+        handler_2 = self.build_handler(Detector(id=456))
+        assert handler.build_dedupe_value_key("test") != handler_2.build_dedupe_value_key("test")
+        assert handler.build_dedupe_value_key("test") != handler_2.build_dedupe_value_key("test2")
+        assert handler.build_dedupe_value_key("test") == handler.build_dedupe_value_key("test")
+        assert handler.build_dedupe_value_key("test") != handler.build_dedupe_value_key("test_2")
+
+    def test_different_counter_value_keys(self):
+        handler = self.build_handler()
+        handler_2 = self.build_handler(Detector(id=456))
+        assert handler.build_counter_value_key(
+            "test", "name_1"
+        ) != handler_2.build_counter_value_key("test", "name_1")
+        assert handler.build_counter_value_key("test", "name_1") == handler.build_counter_value_key(
+            "test", "name_1"
+        )
+        assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
+            "test2", "name_1"
+        )
+        assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
+            "test", "name_2"
+        )
+        assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
+            "test2", "name_2"
+        )
+
+
+class TestCommitStateUpdateData(TestCase):
+    def setUp(self):
+        super().setUp()
+
+    def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
+        if detector is None:
+            detector = self.create_detector()
+        return MockDetectorStateHandler(detector)
+
+    def test(self):
+        handler = self.build_handler()
+        redis = get_redis_client()
+        group_key = None
+        assert not DetectorState.objects.filter(
+            detector=handler.detector, detector_group_key=group_key
+        ).exists()
+        dedupe_key = handler.build_dedupe_value_key(group_key)
+        counter_key_1 = handler.build_counter_value_key(group_key, "some_counter")
+        counter_key_2 = handler.build_counter_value_key(group_key, "another_counter")
+        assert not redis.exists(dedupe_key)
+        assert not redis.exists(counter_key_1)
+        assert not redis.exists(counter_key_2)
+        handler.commit_state_update_data(
+            [
+                DetectorStateData(
+                    group_key,
+                    True,
+                    DetectorPriorityLevel.OK,
+                    100,
+                    {"some_counter": 1, "another_counter": 2},
+                )
+            ]
+        )
+        assert DetectorState.objects.filter(
+            detector=handler.detector,
+            detector_group_key=group_key,
+            active=True,
+            state=DetectorPriorityLevel.OK,
+        ).exists()
+        assert redis.get(dedupe_key) == "100"
+        assert redis.get(counter_key_1) == "1"
+        assert redis.get(counter_key_2) == "2"
+
+        handler.commit_state_update_data(
+            [
+                DetectorStateData(
+                    group_key,
+                    False,
+                    DetectorPriorityLevel.OK,
+                    150,
+                    {"some_counter": None, "another_counter": 20},
+                )
+            ]
+        )
+        assert DetectorState.objects.filter(
+            detector=handler.detector,
+            detector_group_key=group_key,
+            active=False,
+            state=DetectorPriorityLevel.OK,
+        ).exists()
+        assert redis.get(dedupe_key) == "150"
+        assert not redis.exists(counter_key_1)
+        assert redis.get(counter_key_2) == "20"
diff --git a/tests/sentry/workflow_engine/processors/__init__.py b/tests/sentry/workflow_engine/processors/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/tests/sentry/workflow_engine/processors/test_detector.py b/tests/sentry/workflow_engine/processors/test_detector.py
index f835b1d56d65c1..46a9a4b4f10879 100644
--- a/tests/sentry/workflow_engine/processors/test_detector.py
+++ b/tests/sentry/workflow_engine/processors/test_detector.py
@@ -1,15 +1,14 @@
 from unittest import mock

 from sentry.issues.grouptype import GroupCategory, GroupType
-from sentry.types.group import PriorityLevel
 from sentry.workflow_engine.models import DataPacket
 from sentry.workflow_engine.models.detector import (
     DetectorEvaluationResult,
     DetectorHandler,
     DetectorStateData,
 )
-from sentry.workflow_engine.models.detector_state import DetectorStatus
 from sentry.workflow_engine.processors.detector import process_detectors
+from sentry.workflow_engine.types import DetectorPriorityLevel
 from tests.sentry.issues.test_grouptype import BaseGroupTypeTest

@@ -25,7 +24,7 @@ class NoHandlerGroupType(GroupType):
 class MockDetectorHandler(DetectorHandler[dict]):
     def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
-        return [DetectorEvaluationResult(True, PriorityLevel.HIGH, data_packet)]
+        return [DetectorEvaluationResult(True, DetectorPriorityLevel.HIGH, data_packet)]


 class HandlerGroupType(GroupType):
     type_id = 2
@@ -40,12 +39,12 @@ def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResu
         return [
             DetectorEvaluationResult(
                 True,
-                PriorityLevel.HIGH,
+                DetectorPriorityLevel.HIGH,
                 data_packet,
                 DetectorStateData(
                     group_key,
                     True,
-                    DetectorStatus.OK,
+                    DetectorPriorityLevel.OK,
                     100,
                     {},
                 ),
@@ -73,7 +72,7 @@ def test(self):
         data_packet = self.build_data_packet()
         results = process_detectors(data_packet, [detector])
         assert results == [
-            (detector, [DetectorEvaluationResult(True, PriorityLevel.HIGH, data_packet)])
+            (detector, [DetectorEvaluationResult(True, DetectorPriorityLevel.HIGH, data_packet)])
         ]

     def test_state_results(self):
@@ -82,9 +81,9 @@ def test_state_results(self):
         results = process_detectors(data_packet, [detector])
         result = DetectorEvaluationResult(
             True,
-            PriorityLevel.HIGH,
+            DetectorPriorityLevel.HIGH,
             data_packet,
-            DetectorStateData(None, True, DetectorStatus.OK, 100, {}),
+            DetectorStateData(None, True, DetectorPriorityLevel.OK, 100, {}),
         )
         assert results == [
             (
@@ -99,15 +98,15 @@ def test_state_results_multi_group(self):
         results = process_detectors(data_packet, [detector])
         result_1 = DetectorEvaluationResult(
             True,
-            PriorityLevel.HIGH,
+            DetectorPriorityLevel.HIGH,
             data_packet,
-            DetectorStateData("group_1", True, DetectorStatus.OK, 100, {}),
+            DetectorStateData("group_1", True, DetectorPriorityLevel.OK, 100, {}),
         )
         result_2 = DetectorEvaluationResult(
             True,
-            PriorityLevel.HIGH,
+            DetectorPriorityLevel.HIGH,
             data_packet,
-            DetectorStateData("group_2", True, DetectorStatus.OK, 100, {}),
+            DetectorStateData("group_2", True, DetectorPriorityLevel.OK, 100, {}),
         )
         assert results == [
             (
@@ -124,9 +123,9 @@ def test_state_results_multi_group_dupe(self):
             assert mock_logger.error.call_args[0][0] == "Duplicate detector state group keys found"
         result = DetectorEvaluationResult(
             True,
-            PriorityLevel.HIGH,
+            DetectorPriorityLevel.HIGH,
             data_packet,
-            DetectorStateData("dupe", True, DetectorStatus.OK, 100, {}),
+            DetectorStateData("dupe", True, DetectorPriorityLevel.OK, 100, {}),
         )
         assert results == [
             (