Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): Use RPC in subscriptions pipeline #6499

Merged
merged 34 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
fb123b4
feat(subscriptions): Add create subscriptions RPC
shruthilayaj Nov 4, 2024
0f862e9
create subscriptions file
shruthilayaj Nov 4, 2024
0e90ffc
fix typing
shruthilayaj Nov 4, 2024
efa8aaa
tests
shruthilayaj Nov 6, 2024
69cfc87
subscription data test
shruthilayaj Nov 6, 2024
01f952c
merge conflicts
shruthilayaj Nov 7, 2024
f532342
fix typing
shruthilayaj Nov 7, 2024
2cd4713
codec tests
shruthilayaj Nov 8, 2024
6c05e65
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
shruthilayaj Nov 8, 2024
958926d
task result encoder
shruthilayaj Nov 8, 2024
9a8f16e
fix typing
shruthilayaj Nov 8, 2024
67372da
not deterministic
shruthilayaj Nov 8, 2024
c16e31b
fix test
shruthilayaj Nov 8, 2024
d152941
remove breakpoint
shruthilayaj Nov 8, 2024
76a7f0d
feat(subscriptions): Add a create subscription rpc endpoint
shruthilayaj Nov 18, 2024
5726ade
move validate to base class
shruthilayaj Nov 18, 2024
66270bf
inspect what's stored in redis in the test
shruthilayaj Nov 18, 2024
baaa03c
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
shruthilayaj Nov 18, 2024
7191833
merge conflicts
shruthilayaj Nov 18, 2024
c99d49c
serialize what's stored in redis
shruthilayaj Nov 19, 2024
17a49fe
add a comment
shruthilayaj Nov 19, 2024
83f1693
add more validation
shruthilayaj Nov 20, 2024
49e6e9b
merge
shruthilayaj Nov 20, 2024
e7fa5fb
fix tests
shruthilayaj Nov 20, 2024
15d29f6
minimize diff
shruthilayaj Nov 20, 2024
df861f5
remove rpc helper file
shruthilayaj Nov 20, 2024
356d368
update test subscriptions to be valid
shruthilayaj Nov 20, 2024
baf5566
fix typing
shruthilayaj Nov 20, 2024
6e9424a
fix merge conflcits
shruthilayaj Nov 20, 2024
6321e0c
remove rpc helper file
shruthilayaj Nov 21, 2024
f7f8448
return none, not 0 when no data is found
shruthilayaj Nov 21, 2024
f4b81d3
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
volokluev Nov 25, 2024
dc11a7e
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
shruthilayaj Nov 26, 2024
a642b1b
address comments
shruthilayaj Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions snuba/datasets/slicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
should be stored. These do not require individual physical partitions but allow
for repartitioning with less code changes per physical change.
"""

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔪

from snuba.clusters.storage_sets import StorageSetKey

SENTRY_LOGICAL_PARTITIONS = 256
Expand Down
31 changes: 27 additions & 4 deletions snuba/subscriptions/codecs.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import base64
import json
from datetime import datetime
from typing import cast

import rapidjson
from arroyo.backends.kafka import KafkaPayload
from google.protobuf.message import Message as ProtobufMessage
from sentry_kafka_schemas.schema_types import events_subscription_results_v1

from snuba.datasets.entities.entity_key import EntityKey
from snuba.query.exceptions import InvalidQueryException
from snuba.subscriptions.data import (
RPCSubscriptionData,
ScheduledSubscriptionTask,
SnQLSubscriptionData,
Subscription,
SubscriptionData,
SubscriptionIdentifier,
SubscriptionTaskResult,
SubscriptionType,
SubscriptionWithMetadata,
)
from snuba.utils.codecs import Codec, Encoder
Expand All @@ -33,6 +37,9 @@ def decode(self, value: bytes) -> SubscriptionData:
except json.JSONDecodeError:
raise InvalidQueryException("Invalid JSON")

if data.get("subscription_type") == SubscriptionType.RPC.value:
return RPCSubscriptionData.from_dict(data, self.entity_key)

return SnQLSubscriptionData.from_dict(data, self.entity_key)


Expand All @@ -42,11 +49,22 @@ def encode(self, value: SubscriptionTaskResult) -> KafkaPayload:
subscription_id = str(subscription.identifier)
request, result = value.result

if isinstance(request, ProtobufMessage):
original_body = {
"request": base64.b64encode(request.SerializeToString()).decode(
"utf-8"
),
"request_name": request.__class__.__name__,
"request_version": request.__class__.__module__.split(".", 3)[2],
}
else:
original_body = {**request.original_body}

data: events_subscription_results_v1.SubscriptionResult = {
"version": 3,
"payload": {
"subscription_id": subscription_id,
"request": {**request.original_body},
"request": original_body,
"result": {
"data": result["data"],
"meta": result["meta"],
Expand Down Expand Up @@ -98,15 +116,20 @@ def decode(self, value: KafkaPayload) -> ScheduledSubscriptionTask:

entity_key = EntityKey(scheduled_subscription_dict["entity"])

data = scheduled_subscription_dict["task"]["data"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: the docstring of this class should probably change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscription: SubscriptionData
if data.get("subscription_type") == SubscriptionType.RPC.value:
subscription = RPCSubscriptionData.from_dict(data, entity_key)
else:
subscription = SnQLSubscriptionData.from_dict(data, entity_key)

return ScheduledSubscriptionTask(
datetime.fromisoformat(scheduled_subscription_dict["timestamp"]),
SubscriptionWithMetadata(
entity_key,
Subscription(
SubscriptionIdentifier.from_string(subscription_identifier),
SnQLSubscriptionData.from_dict(
scheduled_subscription_dict["task"]["data"], entity_key
),
subscription,
),
scheduled_subscription_dict["tick_upper_offset"],
),
Expand Down
113 changes: 102 additions & 11 deletions snuba/subscriptions/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@
from abc import ABC, abstractmethod
from concurrent.futures import Future
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from enum import Enum
from functools import partial
from typing import (
Any,
Generic,
Iterator,
List,
Mapping,
NamedTuple,
NewType,
Optional,
Tuple,
TypeVar,
Union,
)
from uuid import UUID

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_create_subscription_pb2 import (
CreateSubscriptionRequest,
)
Expand Down Expand Up @@ -50,7 +53,11 @@
from snuba.request.validation import build_request, parse_snql_query
from snuba.subscriptions.utils import Tick
from snuba.utils.metrics import MetricsBackend
from snuba.utils.metrics.gauge import Gauge
from snuba.utils.metrics.timer import Timer
from snuba.web import QueryResult
from snuba.web.query import run_query
from snuba.web.rpc.v1.endpoint_time_series import EndpointTimeSeries

SUBSCRIPTION_REFERRER = "subscription"

Expand Down Expand Up @@ -79,6 +86,8 @@ class SubscriptionType(Enum):

PartitionId = NewType("PartitionId", int)

TRequest = TypeVar("TRequest")


@dataclass(frozen=True)
class SubscriptionIdentifier:
Expand All @@ -95,7 +104,7 @@ def from_string(cls, value: str) -> SubscriptionIdentifier:


@dataclass(frozen=True, kw_only=True)
class SubscriptionData(ABC):
class _SubscriptionData(ABC, Generic[TRequest]):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_request returns generic type TRequest and run_query accepts TRequest.
So for SnQLSubscriptionData, build_request returns Request and run_query accepts Request
and for RPCSubscriptionData, build_request returns TimeSeriesRequest and run_query accepts TimeSeriesRequest

project_id: int
resolution_sec: int
time_window_sec: int
Expand Down Expand Up @@ -127,14 +136,25 @@ def build_request(
timer: Timer,
metrics: Optional[MetricsBackend] = None,
referrer: str = SUBSCRIPTION_REFERRER,
) -> Request:
) -> TRequest:
raise NotImplementedError

@abstractmethod
def run_query(
self,
dataset: Dataset,
request: TRequest,
timer: Timer,
robust: bool = False,
concurrent_queries_gauge: Optional[Gauge] = None,
) -> QueryResult:
raise NotImplementedError

@classmethod
@abstractmethod
def from_dict(
cls, data: Mapping[str, Any], entity_key: EntityKey
) -> SubscriptionData:
) -> _SubscriptionData[TRequest]:
raise NotImplementedError

@abstractmethod
Expand All @@ -143,7 +163,7 @@ def to_dict(self) -> Mapping[str, Any]:


@dataclass(frozen=True, kw_only=True)
class RPCSubscriptionData(SubscriptionData):
class RPCSubscriptionData(_SubscriptionData[TimeSeriesRequest]):
"""
Represents the state of an RPC subscription.
"""
Expand Down Expand Up @@ -192,8 +212,58 @@ def build_request(
timer: Timer,
metrics: Optional[MetricsBackend] = None,
referrer: str = SUBSCRIPTION_REFERRER,
) -> Request:
raise NotImplementedError
) -> TimeSeriesRequest:

request_class = EndpointTimeSeries().request_class()()
request_class.ParseFromString(base64.b64decode(self.time_series_request))

# TODO: update it to round to the lowest granularity
# rounded_ts = int(timestamp.replace(tzinfo=UTC).timestamp() / 15) * 15
Comment on lines +220 to +221
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this TODO still relevant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, waiting on https://github.com/getsentry/projects/issues/364 to be finished

rounded_ts = (
int(timestamp.replace(tzinfo=UTC).timestamp() / self.time_window_sec)
* self.time_window_sec
)
rounded_start = datetime.utcfromtimestamp(rounded_ts)

start_time_proto = Timestamp()
start_time_proto.FromDatetime(
rounded_start - timedelta(seconds=self.time_window_sec)
)
end_time_proto = Timestamp()
end_time_proto.FromDatetime(rounded_start)
request_class.meta.start_timestamp.CopyFrom(start_time_proto)
request_class.meta.end_timestamp.CopyFrom(end_time_proto)

request_class.granularity_secs = self.time_window_sec

return request_class

def run_query(
self,
dataset: Dataset,
request: TimeSeriesRequest,
timer: Timer,
robust: bool = False,
concurrent_queries_gauge: Optional[Gauge] = None,
) -> QueryResult:
response = EndpointTimeSeries().execute(request)
if not response.result_timeseries:
result: Result = {
"meta": [],
"data": [{request.aggregations[0].label: None}],
"trace_output": "",
}
return QueryResult(
result=result, extra={"stats": {}, "sql": "", "experiments": {}}
)

timeseries = response.result_timeseries[0]
data = [{timeseries.label: timeseries.data_points[0].data}]

result = {"meta": [], "data": data, "trace_output": ""}
return QueryResult(
result=result, extra={"stats": {}, "sql": "", "experiments": {}}
)

@classmethod
def from_dict(
Expand Down Expand Up @@ -263,7 +333,7 @@ def to_dict(self) -> Mapping[str, Any]:


@dataclass(frozen=True, kw_only=True)
class SnQLSubscriptionData(SubscriptionData):
class SnQLSubscriptionData(_SubscriptionData[Request]):
"""
Represents the state of a subscription.
"""
Expand Down Expand Up @@ -379,10 +449,26 @@ def build_request(
)
return request

def run_query(
self,
dataset: Dataset,
request: Request,
timer: Timer,
robust: bool = False,
concurrent_queries_gauge: Optional[Gauge] = None,
) -> QueryResult:
return run_query(
dataset,
request,
timer,
robust=robust,
concurrent_queries_gauge=concurrent_queries_gauge,
)

@classmethod
def from_dict(
cls, data: Mapping[str, Any], entity_key: EntityKey
) -> SubscriptionData:
) -> SnQLSubscriptionData:
entity: Entity = get_entity(entity_key)

metadata = {}
Expand All @@ -407,6 +493,7 @@ def to_dict(self) -> Mapping[str, Any]:
"time_window": self.time_window_sec,
"resolution": self.resolution_sec,
"query": self.query,
"subscription_type": SubscriptionType.SNQL.value,
}

subscription_processors = self.entity.get_subscription_processors()
Expand All @@ -416,6 +503,10 @@ def to_dict(self) -> Mapping[str, Any]:
return subscription_data_dict


SubscriptionData = Union[RPCSubscriptionData, SnQLSubscriptionData]
SubscriptionRequest = Union[Request, TimeSeriesRequest]


class Subscription(NamedTuple):
identifier: SubscriptionIdentifier
data: SubscriptionData
Expand Down Expand Up @@ -461,9 +552,9 @@ def find(self, tick: Tick) -> Iterator[ScheduledSubscriptionTask]:

class SubscriptionTaskResultFuture(NamedTuple):
task: ScheduledSubscriptionTask
future: Future[Tuple[Request, Result]]
future: Future[Tuple[SubscriptionRequest, Result]]


class SubscriptionTaskResult(NamedTuple):
task: ScheduledSubscriptionTask
result: Tuple[Request, Result]
result: Tuple[SubscriptionRequest, Result]
9 changes: 4 additions & 5 deletions snuba/subscriptions/executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
from snuba.datasets.factory import get_dataset
from snuba.datasets.table_storage import KafkaTopicSpec
from snuba.reader import Result
from snuba.request import Request
from snuba.subscriptions.codecs import (
SubscriptionScheduledTaskEncoder,
SubscriptionTaskResultEncoder,
)
from snuba.subscriptions.data import (
ScheduledSubscriptionTask,
SubscriptionRequest,
SubscriptionTaskResult,
SubscriptionTaskResultFuture,
)
Expand All @@ -46,7 +46,6 @@
from snuba.utils.streams.topics import Topic as SnubaTopic
from snuba.web import QueryException
from snuba.web.constants import NON_RETRYABLE_CLICKHOUSE_ERROR_CODES
from snuba.web.query import run_query

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -255,7 +254,7 @@ def __init__(

def __execute_query(
self, task: ScheduledSubscriptionTask, tick_upper_offset: int
) -> Tuple[Request, Result]:
) -> Tuple[SubscriptionRequest, Result]:
# Measure the amount of time that took between the task's scheduled
# time and it beginning to execute.
self.__metrics.timing(
Expand All @@ -274,9 +273,9 @@ def __execute_query(
"subscriptions_executor",
)

result = run_query(
result = task.task.subscription.data.run_query(
self.__dataset,
request,
request, # type: ignore
timer,
robust=True,
concurrent_queries_gauge=self.__concurrent_clickhouse_gauge,
Expand Down
Loading
Loading