feat(workflow-engine): Add basic StatefulDetectorHandler that can commit changes after related workflows finish running (#79573)

This adds part of `StatefulDetectorHandler`. The main method here is
`commit_state_update_data`, which is intended to be called once the related
workflows have completed and we want to commit all of the changes to the
detector state.
wedamija authored Oct 28, 2024
1 parent c6891fe commit ac698f6
Showing 10 changed files with 255 additions and 32 deletions.
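
A minimal usage sketch of `commit_state_update_data` (illustrative only: the `handler` instance, group key, and values are hypothetical and not part of this commit; `DetectorStateData` and the method itself come from the diff below):

from sentry.workflow_engine.models.detector import DetectorStateData
from sentry.workflow_engine.types import DetectorPriorityLevel

# `handler` is assumed to be a concrete StatefulDetectorHandler subclass bound to a
# Detector (see MockDetectorStateHandler in the tests below). Once the related
# workflows have finished running:
handler.commit_state_update_data(
    [
        DetectorStateData(
            group_key=None,  # detector-wide state, no per-group key
            active=True,
            status=DetectorPriorityLevel.OK,
            dedupe_value=100,  # marks the latest fully processed data packet
            counter_updates={"some_counter": 1},  # a value of None would unset the counter
        )
    ]
)
# This persists a DetectorState row and writes the dedupe/counter values to Redis.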
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -136,6 +136,7 @@ def env(
SENTRY_SPAN_BUFFER_CLUSTER = "default"
SENTRY_ASSEMBLE_CLUSTER = "default"
SENTRY_UPTIME_DETECTOR_CLUSTER = "default"
SENTRY_WORKFLOW_ENGINE_REDIS_CLUSTER = "default"

# Hosts that are allowed to use system token authentication.
# http://en.wikipedia.org/wiki/Reserved_IP_addresses
3 changes: 2 additions & 1 deletion src/sentry/workflow_engine/migrations/0008_detector_state.py
@@ -7,6 +7,7 @@
import sentry.db.models.fields.foreignkey
import sentry.workflow_engine.models.detector_state
from sentry.new_migrations.migrations import CheckedMigration
from sentry.workflow_engine.types import DetectorPriorityLevel


class Migration(CheckedMigration):
@@ -45,7 +46,7 @@ class Migration(CheckedMigration):
(
"state",
models.CharField(
default=sentry.workflow_engine.models.detector_state.DetectorStatus["OK"],
default=DetectorPriorityLevel.OK,
max_length=200,
),
),
115 changes: 105 additions & 10 deletions src/sentry/workflow_engine/models/detector.py
@@ -3,24 +3,34 @@
import abc
import dataclasses
import logging
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from datetime import timedelta
from typing import Any, Generic, TypeVar

from django.conf import settings
from django.db import models
from django.db.models import UniqueConstraint
from django.db.models import Q, UniqueConstraint
from sentry_redis_tools.retrying_cluster import RetryingRedisCluster

from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
from sentry.issues import grouptype
from sentry.models.owner_base import OwnerModel
from sentry.types.group import PriorityLevel
from sentry.utils import redis
from sentry.workflow_engine.models import DataPacket

if TYPE_CHECKING:
from sentry.workflow_engine.models.detector_state import DetectorStatus
from sentry.workflow_engine.models.detector_state import DetectorState
from sentry.workflow_engine.types import DetectorPriorityLevel

logger = logging.getLogger(__name__)


REDIS_TTL = int(timedelta(days=7).total_seconds())


def get_redis_client() -> RetryingRedisCluster:
cluster_key = settings.SENTRY_WORKFLOW_ENGINE_REDIS_CLUSTER
return redis.redis_clusters.get(cluster_key) # type: ignore[return-value]


@region_silo_model
class Detector(DefaultFieldsModel, OwnerModel):
__relocation_scope__ = RelocationScope.Organization
@@ -83,20 +93,22 @@ def detector_handler(self) -> DetectorHandler | None:
class DetectorStateData:
group_key: str | None
active: bool
status: DetectorStatus
status: DetectorPriorityLevel
# Stateful detectors always process data packets in order. Once we confirm that a data packet has been fully
# processed and all workflows have been done, this value will be used by the stateful detector to prevent
# reprocessing
dedupe_value: int
# Stateful detectors allow various counts to be tracked. We need to update these after we process workflows, so
# include the updates in the state
counter_updates: dict[str, int]
# include the updates in the state.
# This dictionary is in the format {counter_name: counter_value, ...}
# If a counter value is `None` it means to unset the value
counter_updates: dict[str, int | None]


@dataclasses.dataclass(frozen=True)
class DetectorEvaluationResult:
is_active: bool
priority: PriorityLevel
priority: DetectorPriorityLevel
data: Any
state_update_data: DetectorStateData | None = None

@@ -111,3 +123,86 @@ def __init__(self, detector: Detector):
@abc.abstractmethod
def evaluate(self, data_packet: DataPacket[T]) -> list[DetectorEvaluationResult]:
pass


class StatefulDetectorHandler(DetectorHandler[T], abc.ABC):
def build_dedupe_value_key(self, group_key: str | None) -> str:
if group_key is None:
group_key = ""
return f"{self.detector.id}:{group_key}:dedupe_value"

def build_counter_value_key(self, group_key: str | None, counter_name: str) -> str:
if group_key is None:
group_key = ""
return f"{self.detector.id}:{group_key}:{counter_name}"

def bulk_get_detector_state(
self, state_updates: list[DetectorStateData]
) -> dict[str | None, DetectorState]:
group_keys = {update.group_key for update in state_updates}
query_filter = Q(
detector_group_key__in=[group_key for group_key in group_keys if group_key is not None]
)
if None in group_keys:
query_filter |= Q(detector_group_key__isnull=True)

return {
detector_state.detector_group_key: detector_state
for detector_state in self.detector.detectorstate_set.filter(query_filter)
}

def commit_state_update_data(self, state_updates: list[DetectorStateData]):
self._bulk_commit_detector_state(state_updates)
self._bulk_commit_redis_state(state_updates)

def _bulk_commit_redis_state(self, state_updates: list[DetectorStateData]):
dedupe_values = []
group_counter_updates = {}
for state_update in state_updates:
dedupe_values.append((state_update.group_key, state_update.dedupe_value))
group_counter_updates[state_update.group_key] = state_update.counter_updates

pipeline = get_redis_client().pipeline()
if dedupe_values:
for group_key, dedupe_value in dedupe_values:
pipeline.set(self.build_dedupe_value_key(group_key), dedupe_value, ex=REDIS_TTL)

if group_counter_updates:
for group_key, counter_updates in group_counter_updates.items():
for counter_name, counter_value in counter_updates.items():
key_name = self.build_counter_value_key(group_key, counter_name)
if counter_value is None:
pipeline.delete(key_name)
else:
pipeline.set(key_name, counter_value, ex=REDIS_TTL)

pipeline.execute()

def _bulk_commit_detector_state(self, state_updates: list[DetectorStateData]):
detector_state_lookup = self.bulk_get_detector_state(state_updates)
created_detector_states = []
updated_detector_states = []
for state_update in state_updates:
detector_state = detector_state_lookup.get(state_update.group_key)
if not detector_state:
created_detector_states.append(
DetectorState(
detector_group_key=state_update.group_key,
detector=self.detector,
active=state_update.active,
state=state_update.status,
)
)
elif (
state_update.active != detector_state.active
or state_update.status != detector_state.state
):
detector_state.active = state_update.active
detector_state.state = state_update.status
updated_detector_states.append(detector_state)

if created_detector_states:
DetectorState.objects.bulk_create(created_detector_states)

if updated_detector_states:
DetectorState.objects.bulk_update(updated_detector_states, ["active", "state"])
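
The dedupe and counter semantics described in the `DetectorStateData` comments above translate into per-detector, per-group Redis keys. A short sketch, assuming a hypothetical subclass and detector id (the key format, TTL, and write behavior come from the code above):

from sentry.workflow_engine.models import DataPacket, DetectorEvaluationResult
from sentry.workflow_engine.models.detector import Detector, StatefulDetectorHandler

class DemoHandler(StatefulDetectorHandler[dict]):  # hypothetical; mirrors MockDetectorStateHandler in the tests below
    def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
        return []

handler = DemoHandler(Detector(id=123))
handler.build_dedupe_value_key("test")                   # -> "123:test:dedupe_value"
handler.build_counter_value_key("test", "some_counter")  # -> "123:test:some_counter"

# For each DetectorStateData, _bulk_commit_redis_state pipelines roughly:
#   SET 123:test:dedupe_value <dedupe_value> EX 604800     # REDIS_TTL (7 days)
#   SET 123:test:<counter_name> <counter_value> EX 604800
#   DEL 123:test:<counter_name>                             # when the counter value is None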
9 changes: 2 additions & 7 deletions src/sentry/workflow_engine/models/detector_state.py
@@ -1,15 +1,10 @@
from enum import StrEnum

from django.db import models
from django.db.models import F, Value
from django.db.models.functions import Coalesce

from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model


class DetectorStatus(StrEnum):
OK = "ok"
from sentry.workflow_engine.types import DetectorPriorityLevel


@region_silo_model
@@ -26,7 +21,7 @@ class DetectorState(DefaultFieldsModel):
active = models.BooleanField(default=False)

# The current state of the detector
state = models.CharField(max_length=200, default=DetectorStatus.OK)
state = models.CharField(max_length=200, default=DetectorPriorityLevel.OK)

class Meta:
constraints = [
12 changes: 12 additions & 0 deletions src/sentry/workflow_engine/types.py
@@ -0,0 +1,12 @@
from __future__ import annotations

from enum import IntEnum

from sentry.types.group import PriorityLevel


class DetectorPriorityLevel(IntEnum):
OK = 0
LOW = PriorityLevel.LOW
MEDIUM = PriorityLevel.MEDIUM
HIGH = PriorityLevel.HIGH
Empty file.
Empty file.
120 changes: 120 additions & 0 deletions tests/sentry/workflow_engine/models/test_detector.py
@@ -0,0 +1,120 @@
import unittest

from sentry.testutils.cases import TestCase
from sentry.workflow_engine.models import DataPacket, DetectorEvaluationResult
from sentry.workflow_engine.models.detector import (
Detector,
DetectorStateData,
StatefulDetectorHandler,
get_redis_client,
)
from sentry.workflow_engine.models.detector_state import DetectorState
from sentry.workflow_engine.types import DetectorPriorityLevel


class MockDetectorStateHandler(StatefulDetectorHandler[dict]):
def evaluate(self, data_packet: DataPacket[dict]) -> list[DetectorEvaluationResult]:
return []


class TestKeyBuilders(unittest.TestCase):
def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
if detector is None:
detector = Detector(id=123)
return MockDetectorStateHandler(detector)

def test(self):
assert self.build_handler().build_dedupe_value_key("test") == "123:test:dedupe_value"
assert self.build_handler().build_counter_value_key("test", "name_1") == "123:test:name_1"

def test_different_dedupe_keys(self):
handler = self.build_handler()
handler_2 = self.build_handler(Detector(id=456))
assert handler.build_dedupe_value_key("test") != handler_2.build_dedupe_value_key("test")
assert handler.build_dedupe_value_key("test") != handler_2.build_dedupe_value_key("test2")
assert handler.build_dedupe_value_key("test") == handler.build_dedupe_value_key("test")
assert handler.build_dedupe_value_key("test") != handler.build_dedupe_value_key("test_2")

def test_different_counter_value_keys(self):
handler = self.build_handler()
handler_2 = self.build_handler(Detector(id=456))
assert handler.build_counter_value_key(
"test", "name_1"
) != handler_2.build_counter_value_key("test", "name_1")
assert handler.build_counter_value_key("test", "name_1") == handler.build_counter_value_key(
"test", "name_1"
)
assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
"test2", "name_1"
)
assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
"test", "name_2"
)
assert handler.build_counter_value_key("test", "name_1") != handler.build_counter_value_key(
"test2", "name_2"
)


class TestCommitStateUpdateData(TestCase):
def setUp(self):
super().setUp()

def build_handler(self, detector: Detector | None = None) -> MockDetectorStateHandler:
if detector is None:
detector = self.create_detector()
return MockDetectorStateHandler(detector)

def test(self):
handler = self.build_handler()
redis = get_redis_client()
group_key = None
assert not DetectorState.objects.filter(
detector=handler.detector, detector_group_key=group_key
).exists()
dedupe_key = handler.build_dedupe_value_key(group_key)
counter_key_1 = handler.build_counter_value_key(group_key, "some_counter")
counter_key_2 = handler.build_counter_value_key(group_key, "another_counter")
assert not redis.exists(dedupe_key)
assert not redis.exists(counter_key_1)
assert not redis.exists(counter_key_2)
handler.commit_state_update_data(
[
DetectorStateData(
group_key,
True,
DetectorPriorityLevel.OK,
100,
{"some_counter": 1, "another_counter": 2},
)
]
)
assert DetectorState.objects.filter(
detector=handler.detector,
detector_group_key=group_key,
active=True,
state=DetectorPriorityLevel.OK,
).exists()
assert redis.get(dedupe_key) == "100"
assert redis.get(counter_key_1) == "1"
assert redis.get(counter_key_2) == "2"

handler.commit_state_update_data(
[
DetectorStateData(
group_key,
False,
DetectorPriorityLevel.OK,
150,
{"some_counter": None, "another_counter": 20},
)
]
)
assert DetectorState.objects.filter(
detector=handler.detector,
detector_group_key=group_key,
active=False,
state=DetectorPriorityLevel.OK,
).exists()
assert redis.get(dedupe_key) == "150"
assert not redis.exists(counter_key_1)
assert redis.get(counter_key_2) == "20"
Empty file.
