From 848e04e314420dff0ce7fb17333d16e2ee881eb1 Mon Sep 17 00:00:00 2001 From: William Mak Date: Fri, 22 Nov 2024 16:23:44 -0500 Subject: [PATCH] feat(rpc): Add top events support - This allows the usage of top_events with the useRpc param --- .../endpoints/organization_events_stats.py | 15 ++ src/sentry/search/eap/spans.py | 75 ++++---- src/sentry/search/events/fields.py | 2 +- src/sentry/snuba/spans_rpc.py | 182 +++++++++++++++--- ..._organization_events_stats_span_indexed.py | 151 ++++++++++++++- 5 files changed, 356 insertions(+), 69 deletions(-) diff --git a/src/sentry/api/endpoints/organization_events_stats.py b/src/sentry/api/endpoints/organization_events_stats.py index 867f0df65e27bb..4022ba905bf0a6 100644 --- a/src/sentry/api/endpoints/organization_events_stats.py +++ b/src/sentry/api/endpoints/organization_events_stats.py @@ -288,6 +288,21 @@ def _get_event_stats( comparison_delta: datetime | None, ) -> SnubaTSResult | dict[str, SnubaTSResult]: if top_events > 0: + if use_rpc and dataset == spans_eap: + return spans_rpc.run_top_events_timeseries_query( + params=snuba_params, + query_string=query, + y_axes=query_columns, + groupby=self.get_field_list(organization, request), + orderby=self.get_orderby(request), + limit=top_events, + referrer=referrer, + granularity_secs=rollup, + config=SearchResolverConfig( + auto_fields=False, + use_aggregate_conditions=False, + ), + ) return scoped_dataset.top_events_timeseries( timeseries_columns=query_columns, selected_columns=self.get_field_list(organization, request), diff --git a/src/sentry/search/eap/spans.py b/src/sentry/search/eap/spans.py index 9a7a50f98faecd..619c3c51af9146 100644 --- a/src/sentry/search/eap/spans.py +++ b/src/sentry/search/eap/spans.py @@ -210,44 +210,9 @@ def _resolve_terms(self, terms: event_filter.ParsedTerms) -> TraceItemFilter | N parsed_terms = [] for item in terms: if isinstance(item, event_search.SearchFilter): - resolved_column, context = self.resolve_column(item.key.name) - raw_value = item.value.raw_value - if item.value.is_wildcard(): - if item.operator == "=": - operator = ComparisonFilter.OP_LIKE - elif item.operator == "!=": - operator = ComparisonFilter.OP_NOT_LIKE - else: - raise InvalidSearchQuery( - f"Cannot use a wildcard with a {item.operator} filter" - ) - # Slashes have to be double escaped so they are - # interpreted as a string literal. 
- raw_value = ( - str(item.value.raw_value) - .replace("\\", "\\\\") - .replace("%", "\\%") - .replace("_", "\\_") - .replace("*", "%") - ) - elif item.operator in constants.OPERATOR_MAP: - operator = constants.OPERATOR_MAP[item.operator] - else: - raise InvalidSearchQuery(f"Unknown operator: {item.operator}") - if isinstance(resolved_column.proto_definition, AttributeKey): - parsed_terms.append( - TraceItemFilter( - comparison_filter=ComparisonFilter( - key=resolved_column.proto_definition, - op=operator, - value=self._resolve_search_value( - resolved_column, item.operator, raw_value - ), - ) - ) - ) - else: - raise NotImplementedError("Can't filter on aggregates yet") + resolved_term = self.resolve_term(cast(event_search.SearchFilter, item)) + if resolved_term is not None: + parsed_terms.append(resolved_term) else: if self.config.use_aggregate_conditions: raise NotImplementedError("Can't filter on aggregates yet") @@ -259,6 +224,40 @@ def _resolve_terms(self, terms: event_filter.ParsedTerms) -> TraceItemFilter | N else: return None + def resolve_term(self, term: event_search.SearchFilter) -> TraceItemFilter | None: + resolved_column, context = self.resolve_column(term.key.name) + raw_value = term.value.raw_value + if term.value.is_wildcard(): + if term.operator == "=": + operator = ComparisonFilter.OP_LIKE + elif term.operator == "!=": + operator = ComparisonFilter.OP_NOT_LIKE + else: + raise InvalidSearchQuery(f"Cannot use a wildcard with a {term.operator} filter") + # Slashes have to be double escaped so they are + # interpreted as a string literal. + raw_value = ( + str(term.value.raw_value) + .replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_") + .replace("*", "%") + ) + elif term.operator in constants.OPERATOR_MAP: + operator = constants.OPERATOR_MAP[term.operator] + else: + raise InvalidSearchQuery(f"Unknown operator: {term.operator}") + if isinstance(resolved_column.proto_definition, AttributeKey): + return TraceItemFilter( + comparison_filter=ComparisonFilter( + key=resolved_column.proto_definition, + op=operator, + value=self._resolve_search_value(resolved_column, term.operator, raw_value), + ) + ) + else: + raise NotImplementedError("Can't filter on aggregates yet") + def _resolve_search_value( self, column: ResolvedColumn, diff --git a/src/sentry/search/events/fields.py b/src/sentry/search/events/fields.py index 206948b42cbb51..63ef46a1f2993f 100644 --- a/src/sentry/search/events/fields.py +++ b/src/sentry/search/events/fields.py @@ -1143,7 +1143,7 @@ def _normalize(self, value: str) -> str: if match and match.group("type") == "number": return value if not snuba_column: - raise InvalidFunctionArgument(f"{value} is not a valid column") + raise InvalidFunctionArgument(f"{value} is not a valid column?") elif snuba_column not in ["time", "timestamp", "duration"]: raise InvalidFunctionArgument(f"{value} is not a numeric column") return snuba_column diff --git a/src/sentry/snuba/spans_rpc.py b/src/sentry/snuba/spans_rpc.py index 973d5b4ea878af..a9b221b9b501e0 100644 --- a/src/sentry/snuba/spans_rpc.py +++ b/src/sentry/snuba/spans_rpc.py @@ -1,16 +1,19 @@ import logging from typing import Any -from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeriesRequest, TimeSeriesResponse +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeries, TimeSeriesRequest from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column, TraceItemTableRequest from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeAggregation, AttributeKey 
+from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, OrFilter, TraceItemFilter +from sentry.api.event_search import SearchFilter, SearchKey, SearchValue from sentry.search.eap.columns import ResolvedColumn, ResolvedFunction from sentry.search.eap.constants import FLOAT, INT, STRING from sentry.search.eap.spans import SearchResolver from sentry.search.eap.types import SearchResolverConfig -from sentry.search.events.fields import get_function_alias +from sentry.search.events.fields import get_function_alias, is_function from sentry.search.events.types import EventsMeta, EventsResponse, SnubaData, SnubaParams +from sentry.snuba.discover import OTHER_KEY, create_result_key from sentry.utils import snuba_rpc from sentry.utils.snuba import SnubaTSResult @@ -33,9 +36,12 @@ def run_table_query( limit: int, referrer: str, config: SearchResolverConfig, + search_resolver: SearchResolver | None = None, ) -> EventsResponse: """Make the query""" - resolver = SearchResolver(params=params, config=config) + resolver = ( + SearchResolver(params=params, config=config) if search_resolver is None else search_resolver + ) meta = resolver.resolve_meta(referrer=referrer) query = resolver.resolve_query(query_string) columns, contexts = resolver.resolve_columns(selected_columns) @@ -120,12 +126,18 @@ def get_timeseries_query( referrer: str, config: SearchResolverConfig, granularity_secs: int, + extra_conditions: TraceItemFilter | None = None, ) -> TimeSeriesRequest: resolver = SearchResolver(params=params, config=config) meta = resolver.resolve_meta(referrer=referrer) query = resolver.resolve_query(query_string) (aggregations, _) = resolver.resolve_aggregates(y_axes) (groupbys, _) = resolver.resolve_columns(groupby) + if extra_conditions is not None: + if query is not None: + query = TraceItemFilter(and_filter=AndFilter(filters=[query, extra_conditions])) + else: + query = extra_conditions return TimeSeriesRequest( meta=meta, @@ -161,7 +173,50 @@ def run_timeseries_query( rpc_response = snuba_rpc.timeseries_rpc(rpc_request) """Process the results""" - return _process_timeseries(rpc_response, params, granularity_secs) + result: None | list[dict[str, Any]] = None + for timeseries in rpc_response.result_timeseries: + processed = _process_timeseries(timeseries, params, granularity_secs) + if result is None: + result = processed + else: + for existing, new in zip(result, processed): + existing.update(new) + return SnubaTSResult({"data": result}, params.start, params.end, granularity_secs) + + +def build_top_event_conditions( + resolver: SearchResolver, top_events: EventsResponse, groupby_columns: list[str] +) -> Any: + conditions = [] + other_conditions = [] + for event in top_events["data"]: + row_conditions = [] + other_row_conditions = [] + for key in groupby_columns: + resolved_term = resolver.resolve_term( + SearchFilter( + key=SearchKey(name=key), + operator="=", + value=SearchValue(raw_value=event[key]), + ) + ) + if resolved_term is not None: + row_conditions.append(resolved_term) + other_term = resolver.resolve_term( + SearchFilter( + key=SearchKey(name=key), + operator="!=", + value=SearchValue(raw_value=event[key]), + ) + ) + if other_term is not None: + other_row_conditions.append(other_term) + conditions.append(TraceItemFilter(and_filter=AndFilter(filters=row_conditions))) + other_conditions.append(TraceItemFilter(or_filter=OrFilter(filters=other_row_conditions))) + return ( + TraceItemFilter(or_filter=OrFilter(filters=conditions)), + 
TraceItemFilter(and_filter=AndFilter(filters=other_conditions)), + ) def run_top_events_timeseries_query( @@ -169,40 +224,115 @@ def run_top_events_timeseries_query( query_string: str, y_axes: list[str], groupby: list[str], - orderby: list[str], + orderby: list[str] | None, + limit: int, + referrer: str, + granularity_secs: int, + config: SearchResolverConfig, ) -> Any: """We intentionally duplicate run_timeseries_query code here to reduce the complexity of needing multiple helper functions that both would call This is because at time of writing, the query construction is very straightforward, if that changes perhaps we can change this""" - pass + """Make a table query first to get what we need to filter by""" + search_resolver = SearchResolver(params, config) + top_events = run_table_query( + params, + query_string, + groupby + y_axes, + orderby, + 0, + limit, + referrer, + config, + search_resolver=search_resolver, + ) + groupby_columns = [col for col in groupby if not is_function(col)] + top_conditions, other_conditions = build_top_event_conditions( + search_resolver, top_events, groupby_columns + ) """Make the query""" - # maker = SearchResolver(params) - # top_events = run_table_query() with process_results off - # new_conditions = construct conditions based on top_events - # resolved_query = And(new_conditions, maker.resolve_query(query_string)) - # groupby, contexts = maker.resolve_columns(groupby) - # yaxes = maker.resolve_aggregate(y_axes) + rpc_request = get_timeseries_query( + params, + query_string, + y_axes, + groupby, + referrer, + config, + granularity_secs, + extra_conditions=top_conditions, + ) + other_request = get_timeseries_query( + params, + query_string, + y_axes, + groupby, + referrer, + config, + granularity_secs, + extra_conditions=other_conditions, + ) """Run the query""" - # rpc = timeseries_RPC(columns=[column.proto_definition for column in groupby], query=query) + rpc_response = snuba_rpc.timeseries_rpc(rpc_request) + other_response = snuba_rpc.timeseries_rpc(other_request) """Process the results""" - # result = rpc.run() - # return _process_timeseries(result, columns) + map_result_key_to_timeseries = {} + for timeseries in rpc_response.result_timeseries: + groupby_attributes = timeseries.group_by_attributes + remapped_groupby = {} + # Remap internal attrs back to public ones + for col in groupby_columns: + resolved_groupby, _ = search_resolver.resolve_attribute(col) + remapped_groupby[resolved_groupby.public_alias] = groupby_attributes[ + resolved_groupby.internal_name + ] + result_key = create_result_key(remapped_groupby, groupby_columns, {}) + map_result_key_to_timeseries[result_key] = timeseries + final_result = {} + # Top Events actually has the order, so we need to iterate through it, regenerate the result keys + for index, row in enumerate(top_events["data"]): + result_key = create_result_key(row, groupby_columns, {}) + final_result[result_key] = SnubaTSResult( + { + "data": _process_timeseries( + map_result_key_to_timeseries[result_key], + params, + granularity_secs, + ), + "order": index, + }, + params.start, + params.end, + granularity_secs, + ) + final_result[OTHER_KEY] = SnubaTSResult( + { + "data": _process_timeseries( + other_response.result_timeseries[0], + params, + granularity_secs, + ), + "order": limit, + }, + params.start, + params.end, + granularity_secs, + ) + return final_result def _process_timeseries( - rpc_response: TimeSeriesResponse, params: SnubaParams, granularity_secs: int -) -> SnubaTSResult: + timeseries: TimeSeries, params: 
SnubaParams, granularity_secs: int, order: int | None = None +) -> list[dict[str, Any]]: result: SnubaData = [] - for timeseries in rpc_response.result_timeseries: - # Timeseries serialization expects the function alias (eg. `count` not `count()`) - label = get_function_alias(timeseries.label) - if len(result) < len(timeseries.buckets): - for bucket in timeseries.buckets: - result.append({"time": bucket.seconds}) - for index, data_point in enumerate(timeseries.data_points): - result[index][label] = data_point.data + # Timeseries serialization expects the function alias (eg. `count` not `count()`) + label = get_function_alias(timeseries.label) + if len(result) < len(timeseries.buckets): + for bucket in timeseries.buckets: + result.append({"time": bucket.seconds}) + for index, data_point in enumerate(timeseries.data_points): + result[index][label] = data_point.data - return SnubaTSResult({"data": result}, params.start, params.end, granularity_secs) + return result diff --git a/tests/snuba/api/endpoints/test_organization_events_stats_span_indexed.py b/tests/snuba/api/endpoints/test_organization_events_stats_span_indexed.py index 40d2a2b49a4b64..acc5cc7c6ddf53 100644 --- a/tests/snuba/api/endpoints/test_organization_events_stats_span_indexed.py +++ b/tests/snuba/api/endpoints/test_organization_events_stats_span_indexed.py @@ -74,6 +74,116 @@ def test_count(self): for test in zip(event_counts, rows): assert test[1][1][0]["count"] == test[0] + def test_count_unique(self): + event_counts = [6, 0, 6, 3, 0, 3] + for hour, count in enumerate(event_counts): + for minute in range(count): + self.store_spans( + [ + self.create_span( + { + "description": "foo", + "sentry_tags": {"status": "success"}, + "tags": {"foo": f"foo-{minute}"}, + }, + start_ts=self.day_ago + timedelta(hours=hour, minutes=minute), + ), + ], + is_eap=self.is_eap, + ) + + response = self._do_request( + data={ + "start": self.day_ago, + "end": self.day_ago + timedelta(hours=6), + "interval": "1h", + "yAxis": "count_unique(foo)", + "project": self.project.id, + "dataset": self.dataset, + }, + ) + assert response.status_code == 200, response.content + data = response.data["data"] + assert len(data) == 6 + assert response.data["meta"]["dataset"] == self.dataset + + rows = data[0:6] + for test in zip(event_counts, rows): + assert test[1][1][0]["count"] == test[0] + + def test_p95(self): + event_durations = [6, 0, 6, 3, 0, 3] + for hour, duration in enumerate(event_durations): + self.store_spans( + [ + self.create_span( + {"description": "foo", "sentry_tags": {"status": "success"}}, + duration=duration, + start_ts=self.day_ago + timedelta(hours=hour, minutes=1), + ), + ], + is_eap=self.is_eap, + ) + + response = self._do_request( + data={ + "start": self.day_ago, + "end": self.day_ago + timedelta(hours=6), + "interval": "1h", + "yAxis": "p95()", + "project": self.project.id, + "dataset": self.dataset, + }, + ) + assert response.status_code == 200, response.content + data = response.data["data"] + assert len(data) == 6 + assert response.data["meta"]["dataset"] == self.dataset + + rows = data[0:6] + for test in zip(event_durations, rows): + assert test[1][1][0]["count"] == test[0] + + def test_multiaxis(self): + event_counts = [6, 0, 6, 3, 0, 3] + for hour, count in enumerate(event_counts): + for minute in range(count): + self.store_spans( + [ + self.create_span( + { + "description": "foo", + "sentry_tags": {"status": "success"}, + }, + duration=count, + start_ts=self.day_ago + timedelta(hours=hour, minutes=minute), + ), + ], + 
is_eap=self.is_eap, + ) + + response = self._do_request( + data={ + "start": self.day_ago, + "end": self.day_ago + timedelta(hours=6), + "interval": "1h", + "yAxis": ["count()", "p95()"], + "project": self.project.id, + "dataset": self.dataset, + }, + ) + assert response.status_code == 200, response.content + count_data = response.data["count()"]["data"] + p95_data = response.data["p95()"]["data"] + assert len(count_data) == len(p95_data) == 6 + + count_rows = count_data[0:6] + for test in zip(event_counts, count_rows): + assert test[1][1][0]["count"] == test[0] + p95_rows = p95_data[0:6] + for test in zip(event_counts, p95_rows): + assert test[1][1][0]["count"] == test[0] + # These throughput tests should roughly match the ones in OrganizationEventsStatsEndpointTest @pytest.mark.querybuilder def test_throughput_epm_hour_rollup(self): @@ -262,14 +372,48 @@ def test_top_events(self): class OrganizationEventsEAPSpanEndpointTest(OrganizationEventsStatsSpansMetricsEndpointTest): is_eap = True + def test_count_extrapolation(self): + event_counts = [6, 0, 6, 3, 0, 3] + for hour, count in enumerate(event_counts): + for minute in range(count): + self.store_spans( + [ + self.create_span( + { + "description": "foo", + "sentry_tags": {"status": "success"}, + "measurements": {"client_sample_rate": {"value": 0.1}}, + }, + start_ts=self.day_ago + timedelta(hours=hour, minutes=minute), + ), + ], + is_eap=self.is_eap, + ) + + response = self._do_request( + data={ + "start": self.day_ago, + "end": self.day_ago + timedelta(hours=6), + "interval": "1h", + "yAxis": "count()", + "project": self.project.id, + "dataset": self.dataset, + }, + ) + assert response.status_code == 200, response.content + data = response.data["data"] + assert len(data) == 6 + assert response.data["meta"]["dataset"] == self.dataset + + rows = data[0:6] + for test in zip(event_counts, rows): + assert test[1][1][0]["count"] == test[0] * 10 + class OrganizationEventsEAPRPCSpanEndpointTest(OrganizationEventsEAPSpanEndpointTest): is_eap = True is_rpc = True - def test_count(self): - super().test_count() - @pytest.mark.xfail(reason="epm not implemented yet") def test_throughput_epm_hour_rollup(self): super().test_throughput_epm_hour_rollup() @@ -286,6 +430,5 @@ def test_throughput_epm_hour_rollup_offset_of_hour(self): def test_throughput_eps_minute_rollup(self): super().test_throughput_eps_minute_rollup() - @pytest.mark.xfail(reason="wip: not implemented yet") def test_top_events(self): super().test_top_events()
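
A minimal usage sketch of the new top-events path, for reviewers who want to see it end to end. The keyword arguments mirror the endpoint hunk above; everything else (the SnubaParams construction and field names, the is_transaction:1 query, the transaction group-by, the orderby, and the referrer string) is an illustrative assumption rather than something this patch defines.

from datetime import datetime, timedelta, timezone

from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events.types import SnubaParams
from sentry.snuba import spans_rpc


def top_five_transactions_by_count(organization, projects):
    # Assumed six-hour window with one-hour buckets (not prescribed by the patch).
    end = datetime.now(tz=timezone.utc)
    params = SnubaParams(
        start=end - timedelta(hours=6),
        end=end,
        projects=projects,
        organization=organization,
    )
    return spans_rpc.run_top_events_timeseries_query(
        params=params,
        query_string="is_transaction:1",  # assumed filter; any EAP span query string works here
        y_axes=["count()"],
        groupby=["transaction"],
        orderby=["-count()"],  # ordering used for the initial table query that picks the top groups
        limit=5,  # top N groups; the remaining rows are folded into the "Other" series
        referrer="api.organization-events-stats.top-n",  # hypothetical referrer string
        granularity_secs=3600,
        config=SearchResolverConfig(auto_fields=False, use_aggregate_conditions=False),
    )

Per run_top_events_timeseries_query above, the return value is a dict keyed by create_result_key output, where each value is a SnubaTSResult carrying its "order", plus an OTHER_KEY entry built from the inverted group-by conditions.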