feat(alerts): add a regular job to detect anomalies #22762

Merged: 33 commits merged into master from 14331-regular-job on Aug 15, 2024
Changes shown from 13 commits.

Commits (33)
eb4cb49  init (nikitaevg, Jun 6, 2024)
ba51409  initial version of the regular job (nikitaevg, Jun 8, 2024)
bb71f5a  small polishing (nikitaevg, Jun 8, 2024)
1d57843  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jun 8, 2024)
15e0b6b  small test fix (nikitaevg, Jun 8, 2024)
71c44d9  fix types in tests (nikitaevg, Jun 8, 2024)
5e59a5d  fix the crontab schedule to every hour (nikitaevg, Jun 8, 2024)
29e613b  add a newline to the template (nikitaevg, Jun 8, 2024)
b1a0219  add a test to check insight date range (nikitaevg, Jun 11, 2024)
0b2e57a  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jun 11, 2024)
29d9305  use the new display type naming (nikitaevg, Jun 11, 2024)
0cd1a01  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jul 2, 2024)
b8b4fec  address PR comments (nikitaevg, Jul 2, 2024)
3475c37  Merge branch 'master' into 14331-regular-job (webjunkie, Jul 5, 2024)
16227b9  Refactor things (webjunkie, Jul 5, 2024)
712d780  Fix scheduled task setup (webjunkie, Jul 5, 2024)
f179e37  Refactor more (webjunkie, Jul 5, 2024)
5cf4e46  Fix group setup (webjunkie, Jul 9, 2024)
d940441  address comments (nikitaevg, Jul 12, 2024)
d91bcc9  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jul 12, 2024)
ed27736  fix typing (nikitaevg, Jul 13, 2024)
a1966b4  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jul 13, 2024)
4e97160  Revert "Fix scheduled task setup" (webjunkie, Jul 17, 2024)
028d155  use si for chains (nikitaevg, Jul 18, 2024)
1920482  Merge remote-tracking branch 'upstream/master' into 14331-regular-job (nikitaevg, Jul 18, 2024)
fa7dc4c  use timestamp for the campaign key (nikitaevg, Jul 18, 2024)
e5b44ca  Merge branch 'master' into 14331-regular-job (nikitaevg, Jul 23, 2024)
b8d4e60  Merge branch 'master' into 14331-regular-job (nikitaevg, Jul 23, 2024)
e3db36a  Merge branch 'master' into 14331-regular-job (webjunkie, Aug 6, 2024)
4183314  Merge branch 'master' into 14331-regular-job (webjunkie, Aug 6, 2024)
66f0c4a  brush up the PR (nikitaevg, Aug 6, 2024)
d274bea  Merge branch 'master' into 14331-regular-job (nikitaevg, Aug 13, 2024)
0e8d28a  Merge branch 'master' into 14331-regular-job (webjunkie, Aug 13, 2024)
2 changes: 0 additions & 2 deletions ee/tasks/test/subscriptions/test_subscriptions.py
@@ -15,7 +15,6 @@
from posthog.models.exported_asset import ExportedAsset
from posthog.models.insight import Insight
from posthog.models.instance_setting import set_instance_setting
from posthog.models.subscription import Subscription
from posthog.test.base import APIBaseTest


@@ -24,7 +23,6 @@
@patch("ee.tasks.subscriptions.generate_assets")
@freeze_time("2022-02-02T08:55:00.000Z")
class TestSubscriptionsTasks(APIBaseTest):
subscriptions: list[Subscription] = None # type: ignore
nikitaevg (PR author): Just a redundant field

dashboard: Dashboard
insight: Insight
tiles: list[DashboardTile] = None # type: ignore
12 changes: 10 additions & 2 deletions frontend/src/lib/components/Alerts/views/EditAlert.tsx
@@ -70,10 +70,18 @@ export function EditAlert({ id, insightShortId, onCancel, onDelete }: EditAlertP
</LemonField>
<Group name={['anomaly_condition', 'absoluteThreshold']}>
<span className="flex gap-10">
<LemonField name="lower" label="Lower threshold">
<LemonField
name="lower"
label="Lower threshold"
help="Notify if the value is strictly below"
>
<LemonInput type="number" className="w-20" data-attr="alert-lower-threshold" />
</LemonField>
<LemonField name="upper" label="Upper threshold">
<LemonField
name="upper"
label="Upper threshold"
help="Notify if the value is strictly above"
>
<LemonInput type="number" className="w-20" data-attr="alert-upper-threshold" />
</LemonField>
</span>
23 changes: 23 additions & 0 deletions frontend/src/queries/schema.json
@@ -1,6 +1,19 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsoluteThreshold": {
"additionalProperties": false,
"properties": {
"lower": {
"type": ["number", "null"]
},
"upper": {
"type": ["number", "null"]
}
},
"required": ["lower", "upper"],
"type": "object"
},
"ActionsNode": {
"additionalProperties": false,
"properties": {
@@ -179,6 +192,16 @@
"enum": ["numeric", "duration", "duration_ms", "percentage", "percentage_scaled"],
"type": "string"
},
"AnomalyCondition": {
"additionalProperties": false,
"properties": {
"absoluteThreshold": {
"$ref": "#/definitions/AbsoluteThreshold"
}
},
"required": ["absoluteThreshold"],
"type": "object"
},
"AnyDataNode": {
"anyOf": [
{
9 changes: 9 additions & 0 deletions frontend/src/queries/schema.ts
@@ -1586,3 +1586,12 @@ export interface DashboardFilter {
date_to?: string | null
properties?: AnyPropertyFilter[] | null
}

export interface AbsoluteThreshold {
lower: number | null
upper: number | null
}

export interface AnomalyCondition {
absoluteThreshold: AbsoluteThreshold
}
insightNavLogic.ts (file header lost in capture)
@@ -274,7 +274,7 @@ export const insightNavLogic = kea<insightNavLogicType>([
},
})),
urlToAction(({ actions }) => ({
'/insights/:shortId(/:mode)(/:subscriptionId)': (
'/insights/:shortId(/:mode)(/:itemId)': (
_, // url params
{ dashboard, ...searchParams }, // search params
{ filters: _filters } // hash params
6 changes: 3 additions & 3 deletions frontend/src/scenes/insights/InsightPageHeader.tsx
@@ -33,7 +33,7 @@ import { ExporterFormat, InsightLogicProps, InsightModel, InsightShortId, ItemMo

export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: InsightLogicProps }): JSX.Element {
// insightSceneLogic
const { insightMode, subscriptionId } = useValues(insightSceneLogic)
const { insightMode, itemId } = useValues(insightSceneLogic)
const { setInsightMode } = useActions(insightSceneLogic)

// insightLogic
@@ -71,7 +71,7 @@ export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: In
isOpen={insightMode === ItemMode.Subscriptions}
closeModal={() => push(urls.insightView(insight.short_id as InsightShortId))}
insightShortId={insight.short_id}
subscriptionId={subscriptionId}
subscriptionId={itemId}
/>
<SharingModal
title="Insight sharing"
@@ -91,7 +91,7 @@ export function InsightPageHeader({ insightLogicProps }: { insightLogicProps: In
isOpen={insightMode === ItemMode.Alerts}
closeModal={() => push(urls.insightView(insight.short_id as InsightShortId))}
insightShortId={insight.short_id as InsightShortId}
alertId={subscriptionId}
alertId={itemId}
/>
<NewDashboardModal />
</>
26 changes: 9 additions & 17 deletions frontend/src/scenes/insights/insightSceneLogic.tsx
@@ -33,10 +33,10 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
actions({
setInsightId: (insightId: InsightShortId) => ({ insightId }),
setInsightMode: (insightMode: ItemMode, source: InsightEventSource | null) => ({ insightMode, source }),
setSceneState: (insightId: InsightShortId, insightMode: ItemMode, subscriptionId: string | undefined) => ({
setSceneState: (insightId: InsightShortId, insightMode: ItemMode, itemId: string | undefined) => ({
insightId,
insightMode,
subscriptionId,
itemId,
}),
setInsightLogicRef: (logic: BuiltLogic<insightLogicType> | null, unmount: null | (() => void)) => ({
logic,
@@ -60,15 +60,11 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
setSceneState: (_, { insightMode }) => insightMode,
},
],
subscriptionId: [
itemId: [
null as null | number | 'new',
{
setSceneState: (_, { subscriptionId }) =>
subscriptionId !== undefined
? subscriptionId === 'new'
? 'new'
: parseInt(subscriptionId, 10)
: null,
setSceneState: (_, { itemId }) =>
itemId !== undefined ? (itemId === 'new' ? 'new' : parseInt(itemId, 10)) : null,
},
],
insightLogicRef: [
@@ -202,8 +198,8 @@
}
)
},
'/insights/:shortId(/:mode)(/:subscriptionId)': (
{ shortId, mode, subscriptionId }, // url params
'/insights/:shortId(/:mode)(/:itemId)': (
{ shortId, mode, itemId }, // url params
{ dashboard, ...searchParams }, // search params
{ filters: _filters, q }, // hash params
{ method, initial }, // "location changed" event payload
@@ -237,12 +233,8 @@ export const insightSceneLogic = kea<insightSceneLogicType>([
return
}

if (
insightId !== values.insightId ||
insightMode !== values.insightMode ||
subscriptionId !== values.subscriptionId
) {
actions.setSceneState(insightId, insightMode, subscriptionId)
if (insightId !== values.insightId || insightMode !== values.insightMode || itemId !== values.itemId) {
actions.setSceneState(insightId, insightMode, itemId)
}

// capture any filters from the URL, either #filters={} or ?insight=X&bla=foo&bar=baz
4 changes: 2 additions & 2 deletions frontend/src/scenes/scenes.ts
@@ -501,8 +501,8 @@ export const routes: Record<string, Scene> = {
[urls.insightEdit(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightView(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSubcriptions(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSubcription(':shortId' as InsightShortId, ':subscriptionId')]: Scene.Insight,
[urls.alert(':shortId' as InsightShortId, ':subscriptionId')]: Scene.Insight,
[urls.insightSubcription(':shortId' as InsightShortId, ':itemId')]: Scene.Insight,
[urls.alert(':shortId' as InsightShortId, ':itemId')]: Scene.Insight,
[urls.alerts(':shortId' as InsightShortId)]: Scene.Insight,
[urls.insightSharing(':shortId' as InsightShortId)]: Scene.Insight,
[urls.savedInsights()]: Scene.SavedInsights,
1 change: 1 addition & 0 deletions frontend/src/types.ts
@@ -4273,6 +4273,7 @@ export type HogFunctionStatus = {
}[]
}

// TODO: move to schema.ts
Contributor:
TODO?

nikitaevg (PR author):
I was planning to do it in a follow-up PR, but yeah, I can fix it here. Done.

export interface AnomalyCondition {
absoluteThreshold: {
lower?: number
2 changes: 1 addition & 1 deletion posthog/caching/calculate_results.py
@@ -123,7 +123,7 @@ def get_cache_type(cacheable: Optional[FilterType] | Optional[dict]) -> CacheTyp


def calculate_for_query_based_insight(
insight: Insight, *, dashboard: Optional[Dashboard] = None, execution_mode: ExecutionMode, user: User
insight: Insight, *, dashboard: Optional[Dashboard] = None, execution_mode: ExecutionMode, user: Optional[User]
) -> "InsightResult":
from posthog.caching.fetch_from_cache import InsightResult, NothingInCacheResult
from posthog.caching.insight_cache import update_cached_state
2 changes: 1 addition & 1 deletion posthog/models/alert.py
nikitaevg marked this conversation as resolved.
@@ -3,7 +3,7 @@

class Alert(models.Model):
team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE)
insight = models.ForeignKey("posthog.Insight", on_delete=models.CASCADE)
insight: models.ForeignKey = models.ForeignKey("posthog.Insight", on_delete=models.CASCADE)

name: models.CharField = models.CharField(max_length=100)
target_value: models.TextField = models.TextField()
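For orientation, a minimal sketch (not part of the diff) of creating an alert this job would pick up. The `anomaly_condition` JSON field is an assumption inferred from how the task below reads `alert.anomaly_condition`; `team` and `insight` stand in for existing objects:

```python
from posthog.models import Alert

# Illustrative sketch: `team` and `insight` are assumed to already exist,
# and `anomaly_condition` is assumed to be a JSON field on the model.
alert = Alert.objects.create(
    team=team,
    insight=insight,  # a trends insight exposing an aggregated value
    name="Daily signups",
    # comma-separated recipient emails, as parsed by _send_notifications below
    target_value="alice@example.com,bob@example.com",
    anomaly_condition={"absoluteThreshold": {"lower": 100, "upper": 1000}},
)
```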
15 changes: 15 additions & 0 deletions posthog/schema.py
@@ -12,6 +12,14 @@ class SchemaRoot(RootModel[Any]):
root: Any


class AbsoluteThreshold(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
lower: Optional[float] = None
upper: Optional[float] = None


class MathGroupTypeIndex(float, Enum):
NUMBER_0 = 0
NUMBER_1 = 1
@@ -28,6 +36,13 @@ class AggregationAxisFormat(StrEnum):
PERCENTAGE_SCALED = "percentage_scaled"


class AnomalyCondition(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
absoluteThreshold: AbsoluteThreshold


class Kind(StrEnum):
METHOD = "Method"
FUNCTION = "Function"
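A minimal sketch of how these generated models deserialize an alert's stored condition; the same `model_validate` call appears in `_check_alert` below:

```python
from posthog.schema import AnomalyCondition

cond = AnomalyCondition.model_validate({"absoluteThreshold": {"lower": 10, "upper": None}})
assert cond.absoluteThreshold.lower == 10.0  # ints are coerced to float
assert cond.absoluteThreshold.upper is None  # open-ended upper bound

# extra="forbid" means unknown keys raise pydantic.ValidationError, e.g.:
# AnomalyCondition.model_validate({"absoluteThreshold": {}, "typo": 1})
```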
2 changes: 2 additions & 0 deletions posthog/tasks/__init__.py
@@ -6,6 +6,7 @@
check_clickhouse_schema_drift,
demo_create_data,
demo_reset_master_team,
detect_alerts_anomalies,
email,
exporter,
hog_functions,
@@ -25,6 +26,7 @@
"check_clickhouse_schema_drift",
"demo_create_data",
"demo_reset_master_team",
"detect_alerts_anomalies",
"email",
"exporter",
"hog_functions",
90 changes: 90 additions & 0 deletions posthog/tasks/detect_alerts_anomalies.py
@@ -0,0 +1,90 @@
import structlog
from celery import shared_task
from django.utils import timezone

from posthog.api.services.query import ExecutionMode
from posthog.caching.calculate_results import calculate_for_query_based_insight
from posthog.email import EmailMessage
from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import (
conversion_to_query_based,
)
from posthog.models import Alert
from posthog.schema import AnomalyCondition

logger = structlog.get_logger(__name__)


def check_all_alerts() -> None:
alerts = Alert.objects.all().only("id")
for alert in alerts:
Contributor:
I don't know for sure, but this also feels like a scaling nightmare... We struggle sometimes to keep up with dashboard / insight refreshes in general, and this is another form of refresh, just with a higher demand on reliability. I think this would require strong co-ordination with @PostHog/team-product-analytics to make sure this fits in with their existing plans for improving background refreshing, otherwise this will hit scaling issues fast.

nikitaevg (PR author):
I don't know the internals of PostHog, but in my experience this is the way to do it. I don't have experience with Celery specifically, but I do with similar tools, and it should scale horizontally pretty easily: add a separate queue for these events, increase the number of parallel tasks in flight, and add more servers if needed.

Contributor:

> I think this would require strong co-ordination with @PostHog/team-product-analytics to make sure this fits in with their existing plans for improving background refreshing, otherwise this will hit scaling issues fast.

Just wanted to chime in here. I can take a look at this, but am currently busy with being on support for this sprint. I'll see what we can do.

Collaborator:
Scaling celery is not the issue, but ClickHouse will struggle and ultimately go down if suddenly 1000 simultaneous queries appear.

Member:

> should scale horizontally pretty easily: add a separate queue for these events, increase the number of parallel tasks in flight, and add more servers if needed.

yep, I was going to add that "should" is doing a lot of work in this sentence 😅

@webjunkie I'm too far removed from how the query code and caching interact here

we already have one set of jobs that (is|should be) staying on top of having insight results readily available. does this use that cache? we should really overlap them so we have one set of tasks keeping a cache warm and then another that reads the fast access data in that cache for anomaly detection

humans aren't visiting insights once a minute so we know this will generate sustained load.

we should totally, totally build this feature - it's long overdue


i'm not opposed to getting a simple version in just for our team or select beta testers so we can validate the flow, but this 100% needs an internal sponsor since the work of rolling this out and scaling it simply can't be given to an external contributor (it wouldn't be fair or possible 🙈)

i would love to be the internal sponsor but it's both not possible and completely outside of my current wheelhouse

(these concerns might be addressed elsewhere - i've not dug in here at all 🙈)

nikitaevg (PR author):

> but ClickHouse will struggle and ultimately go down if suddenly 1000 simultaneous queries appear

Can't I limit the number of Celery queries in flight? I understand this will introduce a throughput problem, but then, if the servers can't process N alerts each hour, maybe more read replicas or more servers are needed. I don't have much experience with column-oriented databases though, so it's just speculation.

> we already have one set of jobs that (is|should be) staying on top of having insight results readily available. does this use that cache?

🤷 Well, the query runner has some "cache" substrings in its code, so one could assume so... but I don't know.

> humans aren't visiting insights once a minute so we know this will generate sustained load.

Just to clarify, it's once an hour

> but this 100% needs an internal sponsor since the work of rolling this out and scaling it simply can't be given to an external contributor (it wouldn't be fair or possible 🙈)

I completely agree and I would be really happy to have a mentor on this task.

BTW, an interesting data point - Mixpanel limits their number of alerts to 50 per project.
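A hedged sketch of the kind of bounding discussed in this thread: Celery can cap per-worker dispatch with `rate_limit` and isolate the work on a dedicated queue. The queue name and numbers are illustrative, not part of this PR:

```python
from celery import shared_task

# Illustrative only: a dedicated queue plus a per-worker rate limit bounds
# how fast alert checks reach ClickHouse.
@shared_task(ignore_result=True, queue="alerts", rate_limit="60/m")
def check_alert_task(alert_id: int) -> None:
    _check_alert(alert_id)  # the per-alert check defined in this PR

# Worker capacity is then tuned separately, e.g.:
#   celery -A posthog worker -Q alerts --concurrency=4
```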

Contributor:
We will talk among @PostHog/team-product-analytics next week and discuss this regarding ownership and so on.

Member:

> humans aren't visiting insights once a minute so we know this will generate sustained load.

> Just to clarify, it's once an hour

👍

(same point but thanks for clarification :))

logger.info("scheduling alert", alert_id=alert.id)
_check_alert_task.delay(alert.id)


@shared_task(ignore_result=True)
def _check_alert_task(id: int) -> None:
_check_alert(id)


def _check_alert(id: int) -> None:
alert = Alert.objects.get(pk=id)
nikitaevg marked this conversation as resolved.
insight = alert.insight
with conversion_to_query_based(insight):
calculation_result = calculate_for_query_based_insight(
insight,
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE,
user=None,
)

if not calculation_result.result:
raise RuntimeError(f"no results for alert {alert.id}")

anomaly_condition = AnomalyCondition.model_validate(alert.anomaly_condition)
thresholds = anomaly_condition.absoluteThreshold

result = calculation_result.result[0]
aggregated_value = result["aggregated_value"]
anomalies_descriptions = []

if thresholds.lower is not None and aggregated_value < thresholds.lower:
anomalies_descriptions += [
f"The trend value ({aggregated_value}) is below the lower threshold ({thresholds.lower})"
]
if thresholds.upper is not None and aggregated_value > thresholds.upper:
anomalies_descriptions += [
f"The trend value ({aggregated_value}) is above the upper threshold ({thresholds.upper})"
]

if not anomalies_descriptions:
logger.info("no anomalies", alert_id=alert.id)
return

_send_notifications(alert, anomalies_descriptions)


# TODO: make it a task
def _send_notifications(alert: Alert, anomalies_descriptions: list[str]) -> None:
subject = f"PostHog alert {alert.name} has anomalies"
nikitaevg marked this conversation as resolved.
campaign_key = f"alert-anomaly-notification-{alert.id}-{timezone.now().timestamp()}"
insight_url = f"/project/{alert.team.pk}/insights/{alert.insight.short_id}"
alert_url = f"{insight_url}/alerts/{alert.id}"
message = EmailMessage(
Contributor:
I think this is definitely not what we should do for a bunch of reasons:

  1. No way to configure rate of delivery, backoffs, etc.
  2. Email only is not the typical way people want to get alerted of this

We are building a new generic delivery system for the CDP (webhooks etc.) which would be the right place to have a destination and I think this could play into that.

I don't want to pour water on the fire that is getting this work done, as it's super cool 😅 but I know that immediately we will have configuration and scaling issues here that I'm not sure we want to support.

I'm wondering if instead we could have an in-app only alert for now which we can then later hook up to the delivery service instead?

nikitaevg (PR author), Jun 13, 2024:
Hmm, I'd argue here.

> No way to configure rate of delivery, backoffs, etc.

It's in my plans to allow changing the frequency of the notifications. You can check the TODO list here.

> Email only is not the typical way people want to get alerted of this

1. Users want email, Slack, and webhooks. Why not start with email then?
2. Mixpanel provides email + Slack; Amplitude provides email and webhooks.
3. In my commercial experience, email was the way to notify about alerts.

IMO email is a good starting point: it's cheap, and it's a necessary communication channel for this.

OK, I misinterpreted this at first: you're suggesting that email-only is not the typical way. Can't agree or disagree here, I don't know.

> I'm wondering if instead we could have an in-app only alert for now which we can then later hook up to the delivery service instead?

Don't quite understand, what do you mean here? A screen of ongoing alerts? I'd argue that notifications are the most important part of the alerts module, and honestly I really wouldn't want to be blocked on the CDP development, especially given how cheap sending emails is. Once the CDP is launched, I don't think it'd be difficult to migrate, right? I'll do it myself when needed. OTOH, if it's planned to launch soon (this month), I could wait.

> I don't want to pour water on the fire that is getting this work done, as it's super cool

No worries at all, thanks for looking at this!

campaign_key=campaign_key,
subject=subject,
template_name="alert_anomaly",
template_context={
"anomalies_descriptions": anomalies_descriptions,
"insight_url": insight_url,
"insight_name": alert.insight.name,
"alert_url": alert_url,
"alert_name": alert.name,
},
)
targets = list(filter(len, alert.target_value.split(",")))
if not targets:
raise RuntimeError(f"no targets configured for the alert {alert.id}")
for target in targets:
message.add_recipient(email=target)

logger.info(f"Send notifications about {len(anomalies_descriptions)} anomalies", alert_id=alert.id)
message.send()
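To summarize the flow added here: `check_all_alerts` fans out one Celery task per alert; each task recomputes the insight (cache-aware via `RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE`), compares the first result's `aggregated_value` against the absolute thresholds, and emails each comma-separated recipient in `target_value`. A minimal sketch of the threshold comparison in isolation (an illustrative helper mirroring the strict inequalities above, not code from the PR):

```python
def describe_anomalies(value: float, lower: float | None, upper: float | None) -> list[str]:
    """Strict comparisons, matching the "strictly below/above" help text in EditAlert."""
    descriptions = []
    if lower is not None and value < lower:
        descriptions.append(f"The trend value ({value}) is below the lower threshold ({lower})")
    if upper is not None and value > upper:
        descriptions.append(f"The trend value ({value}) is above the upper threshold ({upper})")
    return descriptions

assert describe_anomalies(5, 10, None)       # below the lower bound -> anomaly
assert not describe_anomalies(10, 10, None)  # equal is allowed: the check is strict
assert not describe_anomalies(50, 10, 100)   # inside the band -> no anomaly
```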
7 changes: 7 additions & 0 deletions posthog/tasks/scheduled.py
@@ -25,6 +25,7 @@
clickhouse_row_count,
clickhouse_send_license_usage,
delete_expired_exported_assets,
detect_alerts_anomalies,
ee_persist_finished_recordings,
find_flags_with_enriched_analytics,
graphile_worker_queue_size,
@@ -251,6 +252,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
name="update survey iteration based on date",
)

sender.add_periodic_task(
crontab(hour="*", minute="20"),
webjunkie marked this conversation as resolved.
detect_alerts_anomalies.s(),
name="detect alerts' anomalies and notify about them",
)

if settings.EE_AVAILABLE:
# every interval seconds, we calculate N replay embeddings
# the goal is to process _enough_ every 24 hours that
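The schedule above fires at minute 20 of every hour. For reference, a hedged standalone equivalent (PostHog registers the task via `setup_periodic_tasks` rather than a static `beat_schedule`; the dotted task name is assumed from the module path):

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "detect-alerts-anomalies": {
        "task": "posthog.tasks.tasks.detect_alerts_anomalies",  # assumed name
        "schedule": crontab(hour="*", minute="20"),  # hh:20, every hour
    },
}
```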
7 changes: 7 additions & 0 deletions posthog/tasks/tasks.py
@@ -782,6 +782,13 @@ def update_survey_iteration() -> None:
update_survey_iteration()


@shared_task(ignore_result=True)
def detect_alerts_anomalies() -> None:
from posthog.tasks.detect_alerts_anomalies import check_all_alerts

check_all_alerts()


def recompute_materialized_columns_enabled() -> bool:
from posthog.models.instance_setting import get_instance_setting
