-
-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(subscriptions): Use RPC in subscriptions pipeline #6499
feat(subscriptions): Use RPC in subscriptions pipeline #6499
Conversation
❌ 1 Tests Failed:
View the top 1 failed tests by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
snuba/subscriptions/data.py
Outdated
rounded_ts = ( | ||
int(timestamp.replace(tzinfo=UTC).timestamp() / self.time_window_sec) | ||
* self.time_window_sec | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hack for now, we want to round this to the lowest available granularity (15s) more context here
snuba/subscriptions/data.py
Outdated
if (self.request_name, self.request_version) not in REQUEST_TYPE_ALLOWLIST: | ||
raise InvalidSubscriptionError( | ||
f"{self.request_name} {self.request_version} not supported." | ||
) | ||
|
||
# TODO: Validate no group by, having, order by etc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add more validation in a follow up so it's easier to review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can the create subscription RPC call not be its own PR?
@volokluev Creating a subscription requires validation, so I needed to build and run the request before storing it. That introduces the RPCSubscriptionData class. And basically once I introduced it in one place, I kinda had to follow all the typing issues and handle it everywhere. |
@@ -95,7 +104,7 @@ def from_string(cls, value: str) -> SubscriptionIdentifier: | |||
|
|||
|
|||
@dataclass(frozen=True, kw_only=True) | |||
class SubscriptionData(ABC): | |||
class _SubscriptionData(ABC, Generic[TRequest]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
build_request
returns generic type TRequest
and run_query
accepts TRequest
.
So for SnQLSubscriptionData, build_request
returns Request
and run_query
accepts Request
and for RPCSubscriptionData, build_request
returns TimeSeriesRequest
and run_query
accepts TimeSeriesRequest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some cleanup comments but overall this is good to go
# TODO: update it to round to the lowest granularity | ||
# rounded_ts = int(timestamp.replace(tzinfo=UTC).timestamp() / 15) * 15 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this TODO still relevant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, waiting on https://github.com/getsentry/projects/issues/364 to be finished
entity_key, | ||
PartitionId(partition_index), | ||
) | ||
entity = get_entity(EntityKey.EVENTS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this line here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, bad copy paste
PartitionId(partition_index), | ||
) | ||
entity = get_entity(EntityKey.EVENTS) | ||
store.create( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in your tests, you should be creating subscriptions through the api call, not the implementation detail as you are doing here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def setup_method(self) -> None: | ||
self.dataset = get_dataset("metrics") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
|
||
|
||
def build_rpc_subscription(resolution: timedelta, org_id: int) -> Subscription: | ||
return Subscription( | ||
SubscriptionIdentifier(PartitionId(1), uuid.uuid4()), | ||
RPCSubscriptionData.from_proto( | ||
CreateSubscriptionRequestProto( | ||
time_series_request=TimeSeriesRequest( | ||
meta=RequestMeta( | ||
project_ids=[1], | ||
organization_id=org_id, | ||
cogs_category="something", | ||
referrer="something", | ||
), | ||
aggregations=[ | ||
AttributeAggregation( | ||
aggregate=Function.FUNCTION_SUM, | ||
key=AttributeKey( | ||
type=AttributeKey.TYPE_FLOAT, name="test_metric" | ||
), | ||
label="sum", | ||
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED, | ||
), | ||
], | ||
filter=TraceItemFilter( | ||
comparison_filter=ComparisonFilter( | ||
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="foo"), | ||
op=ComparisonFilter.OP_NOT_EQUALS, | ||
value=AttributeValue(val_str="bar"), | ||
) | ||
), | ||
), | ||
time_window_secs=300, | ||
resolution_secs=int(resolution.total_seconds()), | ||
), | ||
EntityKey.EAP_SPANS, | ||
), | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def expected_rpc_subs() -> MutableSequence[Subscription]: | ||
return [ | ||
build_rpc_subscription(timedelta(minutes=1), 2) | ||
for count in range(randint(1, 50)) | ||
] | ||
|
||
|
||
@pytest.fixture | ||
def extra_rpc_subs() -> MutableSequence[Subscription]: | ||
return [ | ||
build_rpc_subscription(timedelta(minutes=3), 1) | ||
for count in range(randint(1, 50)) | ||
] | ||
|
||
|
||
@patch("snuba.settings.SLICED_STORAGE_SETS", {"events_analytics_platform": 3}) | ||
@patch( | ||
"snuba.settings.LOGICAL_PARTITION_MAPPING", | ||
{"events_analytics_platform": {0: 0, 1: 1, 2: 2}}, | ||
) | ||
def test_filter_rpc_subscriptions(expected_rpc_subs, extra_rpc_subs) -> None: # type: ignore | ||
importlib.reload(scheduler) | ||
|
||
filtered_subs = filter_subscriptions( | ||
subscriptions=expected_rpc_subs + extra_rpc_subs, | ||
entity_key=EntityKey.EAP_SPANS, | ||
metrics=DummyMetricsBackend(strict=True), | ||
slice_id=2, | ||
) | ||
assert filtered_subs == expected_rpc_subs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test is not necessary.
- eap_spans is not a sliced storage
- this is far in the weeds of the implementation details of the subscription scheduler
snuba/datasets/slicing.py
Outdated
@@ -3,6 +3,7 @@ | |||
should be stored. These do not require individual physical partitions but allow | |||
for repartitioning with less code changes per physical change. | |||
""" | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔪
@@ -98,15 +116,20 @@ def decode(self, value: KafkaPayload) -> ScheduledSubscriptionTask: | |||
|
|||
entity_key = EntityKey(scheduled_subscription_dict["entity"]) | |||
|
|||
data = scheduled_subscription_dict["task"]["data"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: the docstring of this class should probably change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you go to merge this, please notify the oncall in |
Will do, I'll address your comments and merge tomorrow 👍 |
Updates the subscription pipeline to support RPCSubscriptionData.