Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: monitoring for endpoints #24470

Merged
merged 5 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions posthog/api/dashboards/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from posthog.models.tagged_item import TaggedItem
from posthog.models.user import User
from posthog.user_permissions import UserPermissionsSerializerMixin
from posthog.api.monitoring import monitor, Feature

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -161,6 +162,7 @@ def validate_filters(self, value) -> dict:

return value

@monitor(feature=Feature.DASHBOARD, endpoint="dashboard", method="POST")
def create(self, validated_data: dict, *args: Any, **kwargs: Any) -> Dashboard:
request = self.context["request"]
validated_data["created_by"] = request.user
Expand Down Expand Up @@ -260,6 +262,7 @@ def _deep_duplicate_tiles(self, dashboard: Dashboard, existing_tile: DashboardTi
color=existing_tile.color,
)

@monitor(feature=Feature.DASHBOARD, endpoint="update", method="PATCH")
def update(self, instance: Dashboard, validated_data: dict, *args: Any, **kwargs: Any) -> Dashboard:
can_user_restrict = self.user_permissions.dashboard(instance).can_restrict
if "restriction_level" in validated_data and not can_user_restrict:
Expand Down Expand Up @@ -451,6 +454,7 @@ def safely_get_queryset(self, queryset) -> QuerySet:

return queryset

@monitor(feature=Feature.DASHBOARD, endpoint="dashboard", method="GET")
def retrieve(self, request: Request, *args: Any, **kwargs: Any) -> Response:
pk = kwargs["pk"]
queryset = self.get_queryset()
Expand Down
4 changes: 4 additions & 0 deletions posthog/api/insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
relative_date_parse,
str_to_bool,
)
from posthog.api.monitoring import monitor, Feature

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -329,6 +330,7 @@ class Meta:
"is_cached",
)

@monitor(feature=Feature.INSIGHT, endpoint="insight", method="POST")
def create(self, validated_data: dict, *args: Any, **kwargs: Any) -> Insight:
request = self.context["request"]
tags = validated_data.pop("tags", None) # tags are created separately as global tag relationships
Expand Down Expand Up @@ -368,6 +370,7 @@ def create(self, validated_data: dict, *args: Any, **kwargs: Any) -> Insight:
return insight

@transaction.atomic()
@monitor(feature=Feature.INSIGHT, endpoint="insight", method="POST")
def update(self, instance: Insight, validated_data: dict, **kwargs) -> Insight:
dashboards_before_change: list[Union[str, dict]] = []
try:
Expand Down Expand Up @@ -786,6 +789,7 @@ def _filter_request(self, request: request.Request, queryset: QuerySet) -> Query
),
],
)
@monitor(feature=Feature.INSIGHT, endpoint="insight", method="GET")
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer_context = self.get_serializer_context()
Expand Down
38 changes: 38 additions & 0 deletions posthog/api/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from collections.abc import Callable
from enum import StrEnum
from functools import wraps

from prometheus_client import Counter
from sentry_sdk import set_tag


class Feature(StrEnum):
COHORT = "cohort"
DASHBOARD = "dashboard"
INSIGHT = "insight"
QUERY = "query"


# Prometheus counter of API requests, labelled by endpoint name and HTTP method.
# Incremented by the `monitor` decorator below on every call to a wrapped view.
# NOTE(review): there is no "feature" label — the feature is only recorded as a
# Sentry tag by `monitor`; confirm that is intentional.
API_REQUESTS_COUNTER = Counter(
    "api_requests",
    "Number of API requests",
    labelnames=["endpoint", "method"],
)


def monitor(*, feature: Feature | None, endpoint: str, method: str) -> Callable:
"""
Decorator to increment the API requests counter
Sets sentry tags for the endpoint and method
"""

def decorator(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
API_REQUESTS_COUNTER.labels(endpoint=endpoint, method=method).inc()

if feature:
set_tag("feature", feature.value)

return func(*args, **kwargs)

return wrapper

return decorator
4 changes: 4 additions & 0 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
TeamRateThrottle,
)
from posthog.schema import QueryRequest, QueryResponseAlternative, QueryStatusResponse
from posthog.api.monitoring import monitor, Feature


class QueryThrottle(TeamRateThrottle):
Expand Down Expand Up @@ -59,6 +60,7 @@ def get_throttles(self):
200: QueryResponseAlternative,
},
)
@monitor(feature=Feature.QUERY, endpoint="query", method="POST")
def create(self, request, *args, **kwargs) -> Response:
data = self.get_model(request.data, QueryRequest)
client_query_id = data.client_query_id or uuid.uuid4().hex
Expand Down Expand Up @@ -99,6 +101,7 @@ def create(self, request, *args, **kwargs) -> Response:
description="(Experimental)",
responses={200: QueryStatusResponse},
)
@monitor(feature=Feature.QUERY, endpoint="query", method="GET")
def retrieve(self, request: Request, pk=None, *args, **kwargs) -> JsonResponse:
show_progress: bool = request.query_params.get("show_progress", False) == "true"
show_progress = (
Expand All @@ -124,6 +127,7 @@ def retrieve(self, request: Request, pk=None, *args, **kwargs) -> JsonResponse:
204: OpenApiResponse(description="Query cancelled"),
},
)
@monitor(feature=Feature.QUERY, endpoint="query", method="DELETE")
def destroy(self, request, pk=None, *args, **kwargs):
cancel_query(self.team.pk, pk)
return Response(status=204)
Expand Down
20 changes: 14 additions & 6 deletions posthog/clickhouse/client/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@
import sqlparse
from clickhouse_driver import Client as SyncClient
from django.conf import settings as app_settings
from statshog.defaults.django import statsd

from posthog.clickhouse.client.connection import Workload, get_pool
from posthog.clickhouse.client.escape import substitute_params
from posthog.clickhouse.query_tagging import get_query_tag_value, get_query_tags
from posthog.errors import wrap_query_error
from posthog.settings import TEST
from posthog.utils import generate_short_id, patchable
from prometheus_client import Counter, Gauge

QUERY_ERROR_COUNTER = Counter(
"clickhouse_query_failure",
"Query execution failure signal is dispatched when a query fails.",
labelnames=["exception_type"],
)

QUERY_EXECUTION_TIME_GAUGE = Gauge(
"clickhouse_query_execution_time",
"Clickhouse query execution time",
)

InsertParams = Union[list, tuple, types.GeneratorType]
NonInsertParams = dict[str, Any]
Expand Down Expand Up @@ -128,16 +139,13 @@ def sync_execute(
)
except Exception as e:
err = wrap_query_error(e)
statsd.incr(
"clickhouse_sync_execution_failure",
tags={"failed": True, "reason": type(err).__name__},
)
QUERY_ERROR_COUNTER.labels(exception_type=type(err).__name__).inc()

raise err from e
finally:
execution_time = perf_counter() - start_time

statsd.timing("clickhouse_sync_execution_time", execution_time * 1000.0)
QUERY_EXECUTION_TIME_GAUGE.set(execution_time * 1000.0)

if query_counter := getattr(thread_local_storage, "query_counter", None):
query_counter.total_query_time += execution_time
Expand Down
1 change: 1 addition & 0 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ def run(
tag_queries(cache_key=cache_key)
tag_queries(sentry_trace=get_traceparent())
set_tag("cache_key", cache_key)
set_tag("query_type", self.query.__class__.__name__)
if insight_id:
tag_queries(insight_id=insight_id)
set_tag("insight_id", str(insight_id))
Expand Down
10 changes: 6 additions & 4 deletions posthog/tasks/calculate_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from posthog.models.user import User
from prometheus_client import Gauge
from sentry_sdk import set_tag
from posthog.api.monitoring import Feature

COHORT_RECALCULATIONS_BACKLOG_GAUGE = Gauge(
"cohort_recalculations_backlog",
Expand Down Expand Up @@ -55,10 +56,6 @@ def calculate_cohorts() -> None:


def update_cohort(cohort: Cohort, *, initiating_user: Optional[User]) -> None:
set_tag("component", "cohort")
set_tag("cohort_id", cohort.id)
set_tag("team_id", cohort.team.id)

pending_version = get_and_update_pending_version(cohort)
calculate_cohort_ch.delay(cohort.id, pending_version, initiating_user.id if initiating_user else None)

Expand All @@ -72,6 +69,11 @@ def clear_stale_cohort(cohort_id: int, before_version: int) -> None:
@shared_task(ignore_result=True, max_retries=2)
def calculate_cohort_ch(cohort_id: int, pending_version: int, initiating_user_id: Optional[int] = None) -> None:
cohort: Cohort = Cohort.objects.get(pk=cohort_id)

set_tag("feature", Feature.COHORT.value)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `set_tag` call needs to run in the same thread (i.e. inside the Celery task) for Sentry scoping to work.

set_tag("cohort_id", cohort.id)
set_tag("team_id", cohort.team.id)

cohort.calculate_people_ch(pending_version, initiating_user_id=initiating_user_id)


Expand Down
Loading