feat(rpc): Add top events support
- This allows the usage of top_events with the useRpc param
wmak committed Nov 22, 2024
1 parent 327f975 commit 848e04e
Showing 5 changed files with 356 additions and 69 deletions.
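As a usage sketch, a client could exercise the new behaviour roughly as follows; the endpoint path, host, and parameter spellings are assumed from the commit description for illustration and are not taken from this diff.

import requests

# Hypothetical request: with useRpc set and a topEvents count given, the
# events-stats endpoint dispatches to spans_rpc.run_top_events_timeseries_query
# (see the organization_events_stats.py hunk below). All values here are
# illustrative assumptions, not a documented contract.
response = requests.get(
    "https://sentry.example.com/api/0/organizations/my-org/events-stats/",
    headers={"Authorization": "Bearer <token>"},
    params={
        "useRpc": "1",                        # opt into the RPC query path
        "topEvents": "5",                     # request a top-N grouped timeseries
        "field": ["transaction", "count()"],  # group-by column plus aggregate
        "yAxis": "count()",
        "dataset": "spans",                   # assumed name for the EAP spans dataset
    },
)
print(list(response.json().keys()))  # one series per top group, plus an "Other" series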
15 changes: 15 additions & 0 deletions src/sentry/api/endpoints/organization_events_stats.py
@@ -288,6 +288,21 @@ def _get_event_stats(
comparison_delta: datetime | None,
) -> SnubaTSResult | dict[str, SnubaTSResult]:
if top_events > 0:
if use_rpc and dataset == spans_eap:
return spans_rpc.run_top_events_timeseries_query(
params=snuba_params,
query_string=query,
y_axes=query_columns,
groupby=self.get_field_list(organization, request),
orderby=self.get_orderby(request),
limit=top_events,
referrer=referrer,
granularity_secs=rollup,
config=SearchResolverConfig(
auto_fields=False,
use_aggregate_conditions=False,
),
)
return scoped_dataset.top_events_timeseries(
timeseries_columns=query_columns,
selected_columns=self.get_field_list(organization, request),
75 changes: 37 additions & 38 deletions src/sentry/search/eap/spans.py
@@ -210,44 +210,9 @@ def _resolve_terms(self, terms: event_filter.ParsedTerms) -> TraceItemFilter | None:
parsed_terms = []
for item in terms:
if isinstance(item, event_search.SearchFilter):
resolved_column, context = self.resolve_column(item.key.name)
raw_value = item.value.raw_value
if item.value.is_wildcard():
if item.operator == "=":
operator = ComparisonFilter.OP_LIKE
elif item.operator == "!=":
operator = ComparisonFilter.OP_NOT_LIKE
else:
raise InvalidSearchQuery(
f"Cannot use a wildcard with a {item.operator} filter"
)
# Slashes have to be double escaped so they are
# interpreted as a string literal.
raw_value = (
str(item.value.raw_value)
.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_")
.replace("*", "%")
)
elif item.operator in constants.OPERATOR_MAP:
operator = constants.OPERATOR_MAP[item.operator]
else:
raise InvalidSearchQuery(f"Unknown operator: {item.operator}")
if isinstance(resolved_column.proto_definition, AttributeKey):
parsed_terms.append(
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=resolved_column.proto_definition,
op=operator,
value=self._resolve_search_value(
resolved_column, item.operator, raw_value
),
)
)
)
else:
raise NotImplementedError("Can't filter on aggregates yet")
resolved_term = self.resolve_term(cast(event_search.SearchFilter, item))
if resolved_term is not None:
parsed_terms.append(resolved_term)
else:
if self.config.use_aggregate_conditions:
raise NotImplementedError("Can't filter on aggregates yet")
@@ -259,6 +224,40 @@ def _resolve_terms(self, terms: event_filter.ParsedTerms) -> TraceItemFilter | None:
else:
return None

def resolve_term(self, term: event_search.SearchFilter) -> TraceItemFilter | None:
resolved_column, context = self.resolve_column(term.key.name)
raw_value = term.value.raw_value
if term.value.is_wildcard():
if term.operator == "=":
operator = ComparisonFilter.OP_LIKE
elif term.operator == "!=":
operator = ComparisonFilter.OP_NOT_LIKE
else:
raise InvalidSearchQuery(f"Cannot use a wildcard with a {term.operator} filter")
# Slashes have to be double escaped so they are
# interpreted as a string literal.
raw_value = (
str(term.value.raw_value)
.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_")
.replace("*", "%")
)
elif term.operator in constants.OPERATOR_MAP:
operator = constants.OPERATOR_MAP[term.operator]
else:
raise InvalidSearchQuery(f"Unknown operator: {term.operator}")
if isinstance(resolved_column.proto_definition, AttributeKey):
return TraceItemFilter(
comparison_filter=ComparisonFilter(
key=resolved_column.proto_definition,
op=operator,
value=self._resolve_search_value(resolved_column, term.operator, raw_value),
)
)
else:
raise NotImplementedError("Can't filter on aggregates yet")

def _resolve_search_value(
self,
column: ResolvedColumn,
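As an aside, a minimal sketch of calling the newly extracted resolve_term helper directly; `params` is assumed to be an already-built SnubaParams and the attribute name is only an example:

from sentry.api.event_search import SearchFilter, SearchKey, SearchValue
from sentry.search.eap.spans import SearchResolver
from sentry.search.eap.types import SearchResolverConfig

# Assumed setup: `params` is a prepared SnubaParams for the target org/projects.
resolver = SearchResolver(
    params=params,
    config=SearchResolverConfig(auto_fields=False, use_aggregate_conditions=False),
)
# Resolve one equality condition into a TraceItemFilter wrapping a ComparisonFilter.
# The signature allows None, so callers in this diff guard with an is-not-None check.
condition = resolver.resolve_term(
    SearchFilter(
        key=SearchKey(name="transaction"),
        operator="=",
        value=SearchValue(raw_value="/api/0/projects/"),
    )
)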
2 changes: 1 addition & 1 deletion src/sentry/search/events/fields.py
@@ -1143,7 +1143,7 @@ def _normalize(self, value: str) -> str:
if match and match.group("type") == "number":
return value
if not snuba_column:
raise InvalidFunctionArgument(f"{value} is not a valid column")
raise InvalidFunctionArgument(f"{value} is not a valid column?")
elif snuba_column not in ["time", "timestamp", "duration"]:
raise InvalidFunctionArgument(f"{value} is not a numeric column")
return snuba_column
182 changes: 156 additions & 26 deletions src/sentry/snuba/spans_rpc.py
@@ -1,16 +1,19 @@
import logging
from typing import Any

from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeriesRequest, TimeSeriesResponse
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeries, TimeSeriesRequest
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column, TraceItemTableRequest
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeAggregation, AttributeKey
from sentry_protos.snuba.v1.trace_item_filter_pb2 import AndFilter, OrFilter, TraceItemFilter

from sentry.api.event_search import SearchFilter, SearchKey, SearchValue
from sentry.search.eap.columns import ResolvedColumn, ResolvedFunction
from sentry.search.eap.constants import FLOAT, INT, STRING
from sentry.search.eap.spans import SearchResolver
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events.fields import get_function_alias
from sentry.search.events.fields import get_function_alias, is_function
from sentry.search.events.types import EventsMeta, EventsResponse, SnubaData, SnubaParams
from sentry.snuba.discover import OTHER_KEY, create_result_key
from sentry.utils import snuba_rpc
from sentry.utils.snuba import SnubaTSResult

Expand All @@ -33,9 +36,12 @@ def run_table_query(
limit: int,
referrer: str,
config: SearchResolverConfig,
search_resolver: SearchResolver | None = None,
) -> EventsResponse:
"""Make the query"""
resolver = SearchResolver(params=params, config=config)
resolver = (
SearchResolver(params=params, config=config) if search_resolver is None else search_resolver
)
meta = resolver.resolve_meta(referrer=referrer)
query = resolver.resolve_query(query_string)
columns, contexts = resolver.resolve_columns(selected_columns)
@@ -120,12 +126,18 @@ def get_timeseries_query(
referrer: str,
config: SearchResolverConfig,
granularity_secs: int,
extra_conditions: TraceItemFilter | None = None,
) -> TimeSeriesRequest:
resolver = SearchResolver(params=params, config=config)
meta = resolver.resolve_meta(referrer=referrer)
query = resolver.resolve_query(query_string)
(aggregations, _) = resolver.resolve_aggregates(y_axes)
(groupbys, _) = resolver.resolve_columns(groupby)
if extra_conditions is not None:
if query is not None:
query = TraceItemFilter(and_filter=AndFilter(filters=[query, extra_conditions]))
else:
query = extra_conditions

return TimeSeriesRequest(
meta=meta,
@@ -161,48 +173,166 @@ def run_timeseries_query(
rpc_response = snuba_rpc.timeseries_rpc(rpc_request)

"""Process the results"""
return _process_timeseries(rpc_response, params, granularity_secs)
result: None | list[dict[str, Any]] = None
for timeseries in rpc_response.result_timeseries:
processed = _process_timeseries(timeseries, params, granularity_secs)
if result is None:
result = processed
else:
for existing, new in zip(result, processed):
existing.update(new)
return SnubaTSResult({"data": result}, params.start, params.end, granularity_secs)


def build_top_event_conditions(
resolver: SearchResolver, top_events: EventsResponse, groupby_columns: list[str]
) -> Any:
conditions = []
other_conditions = []
for event in top_events["data"]:
row_conditions = []
other_row_conditions = []
for key in groupby_columns:
resolved_term = resolver.resolve_term(
SearchFilter(
key=SearchKey(name=key),
operator="=",
value=SearchValue(raw_value=event[key]),
)
)
if resolved_term is not None:
row_conditions.append(resolved_term)
other_term = resolver.resolve_term(
SearchFilter(
key=SearchKey(name=key),
operator="!=",
value=SearchValue(raw_value=event[key]),
)
)
if other_term is not None:
other_row_conditions.append(other_term)
conditions.append(TraceItemFilter(and_filter=AndFilter(filters=row_conditions)))
other_conditions.append(TraceItemFilter(or_filter=OrFilter(filters=other_row_conditions)))
return (
TraceItemFilter(or_filter=OrFilter(filters=conditions)),
TraceItemFilter(and_filter=AndFilter(filters=other_conditions)),
)
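To make the shape of the two returned filters explicit: the first is an OR across rows, each row being an AND of equalities on the group-by values; the second is its De Morgan negation (an AND across rows of ORs of inequalities), which selects everything outside the top events. A plain-Python sketch of the same logic, with made-up group-by values:

# Illustrative only: the real code builds protobuf TraceItemFilter objects.
top_rows = [{"device": "d1", "os": "o1"}, {"device": "d2", "os": "o2"}]

def matches_top(event: dict[str, str]) -> bool:
    return any(all(event[k] == row[k] for k in row) for row in top_rows)

def matches_other(event: dict[str, str]) -> bool:
    return all(any(event[k] != row[k] for k in row) for row in top_rows)

sample = {"device": "d1", "os": "o2"}
assert matches_other(sample) and not matches_top(sample)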


def run_top_events_timeseries_query(
params: SnubaParams,
query_string: str,
y_axes: list[str],
groupby: list[str],
orderby: list[str],
orderby: list[str] | None,
limit: int,
referrer: str,
granularity_secs: int,
config: SearchResolverConfig,
) -> Any:
"""We intentionally duplicate run_timeseries_query code here to reduce the complexity of needing multiple helper
functions that both would call
This is because at time of writing, the query construction is very straightforward, if that changes perhaps we can
change this"""
pass
"""Make a table query first to get what we need to filter by"""
search_resolver = SearchResolver(params, config)
top_events = run_table_query(
params,
query_string,
groupby + y_axes,
orderby,
0,
limit,
referrer,
config,
search_resolver=search_resolver,
)
groupby_columns = [col for col in groupby if not is_function(col)]
top_conditions, other_conditions = build_top_event_conditions(
search_resolver, top_events, groupby_columns
)
"""Make the query"""
# maker = SearchResolver(params)
# top_events = run_table_query() with process_results off
# new_conditions = construct conditions based on top_events
# resolved_query = And(new_conditions, maker.resolve_query(query_string))
# groupby, contexts = maker.resolve_columns(groupby)
# yaxes = maker.resolve_aggregate(y_axes)
rpc_request = get_timeseries_query(
params,
query_string,
y_axes,
groupby,
referrer,
config,
granularity_secs,
extra_conditions=top_conditions,
)
other_request = get_timeseries_query(
params,
query_string,
y_axes,
groupby,
referrer,
config,
granularity_secs,
extra_conditions=other_conditions,
)

"""Run the query"""
# rpc = timeseries_RPC(columns=[column.proto_definition for column in groupby], query=query)
rpc_response = snuba_rpc.timeseries_rpc(rpc_request)
other_response = snuba_rpc.timeseries_rpc(other_request)

"""Process the results"""
# result = rpc.run()
# return _process_timeseries(result, columns)
map_result_key_to_timeseries = {}
for timeseries in rpc_response.result_timeseries:
groupby_attributes = timeseries.group_by_attributes
remapped_groupby = {}
# Remap internal attrs back to public ones
for col in groupby_columns:
resolved_groupby, _ = search_resolver.resolve_attribute(col)
remapped_groupby[resolved_groupby.public_alias] = groupby_attributes[
resolved_groupby.internal_name
]
result_key = create_result_key(remapped_groupby, groupby_columns, {})
map_result_key_to_timeseries[result_key] = timeseries
final_result = {}
# Top Events actually has the order, so we need to iterate through it, regenerate the result keys
for index, row in enumerate(top_events["data"]):
result_key = create_result_key(row, groupby_columns, {})
final_result[result_key] = SnubaTSResult(
{
"data": _process_timeseries(
map_result_key_to_timeseries[result_key],
params,
granularity_secs,
),
"order": index,
},
params.start,
params.end,
granularity_secs,
)
final_result[OTHER_KEY] = SnubaTSResult(
{
"data": _process_timeseries(
other_response.result_timeseries[0],
params,
granularity_secs,
),
"order": limit,
},
params.start,
params.end,
granularity_secs,
)
return final_result


def _process_timeseries(
rpc_response: TimeSeriesResponse, params: SnubaParams, granularity_secs: int
) -> SnubaTSResult:
timeseries: TimeSeries, params: SnubaParams, granularity_secs: int, order: int | None = None
) -> list[dict[str, Any]]:
result: SnubaData = []
for timeseries in rpc_response.result_timeseries:
# Timeseries serialization expects the function alias (eg. `count` not `count()`)
label = get_function_alias(timeseries.label)
if len(result) < len(timeseries.buckets):
for bucket in timeseries.buckets:
result.append({"time": bucket.seconds})
for index, data_point in enumerate(timeseries.data_points):
result[index][label] = data_point.data
# Timeseries serialization expects the function alias (eg. `count` not `count()`)
label = get_function_alias(timeseries.label)
if len(result) < len(timeseries.buckets):
for bucket in timeseries.buckets:
result.append({"time": bucket.seconds})
for index, data_point in enumerate(timeseries.data_points):
result[index][label] = data_point.data

return SnubaTSResult({"data": result}, params.start, params.end, granularity_secs)
return result
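For reference, a rough sketch of consuming the dict built above; shapes are inferred from this diff rather than from the serializer contract:

# final_result maps a result key derived from the group-by values (plus OTHER_KEY
# for the "Other" series) to a SnubaTSResult whose data dict carries the processed
# buckets and the ordering index taken from the top-events table query.
for result_key, ts_result in final_result.items():
    order = ts_result.data["order"]        # 0..limit-1 for top groups, limit for "Other"
    for row in ts_result.data["data"]:     # [{"time": <epoch seconds>, "<alias>": <value>}, ...]
        print(result_key, order, row["time"])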
