Skip to content

Commit

Permalink
Add test to verify that ConsumeMetrics and ConsumeTraces clear respon…
Browse files Browse the repository at this point in the history
…se members (#1975)

Summary: Add test to verify that ConsumeMetrics and ConsumeTraces clear
response members

This adds test coverage for the bug fix in #1910. This is a follow up to
the conversation
[here](#1910 (comment))

Relevant Issues: N/A

Type of change: /kind bug

Test Plan: Verified that unit test fails if #1910 is reverted
```
$ git show HEAD
commit 4ab4a9c (HEAD -> ddelnano/add-tests-for-otel-sink-bug, ddelnano/ddelnano/add-tests-for-otel-sink-bug)
Author: Dom Del Nano <[email protected]>
Date:   Fri Jul 26 12:17:00 2024 +0000

    Revert "Clear trace response instead of metric response in `OTelExportSinkNode::ConsumeSpans` (#1910)"

    This reverts commit 970a54a.

$ bazel test src/carnot/exec:otel_export_sink_node_test --test_output=all

[ ... ]
[ RUN      ] OTelExportSinkNodeTest.consume_spans_clears_span_responses
src/carnot/exec/otel_export_sink_node_test.cc:1748: Failure
Value of: response->partial_success().rejected_spans() == 0
  Actual: false
Expected: true

```

---------

Signed-off-by: Dom Del Nano <[email protected]>
  • Loading branch information
ddelnano authored Jul 28, 2024
1 parent 962e48d commit b01f8ae
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions src/carnot/exec/otel_export_sink_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,111 @@ eos: true)pb";
EXPECT_THAT(retval.ToString(), ::testing::MatchesRegex(".*INTERNAL.*"));
}

TEST_F(OTelExportSinkNodeTest, consume_spans_clears_span_responses) {
oteltracecollector::ExportTraceServiceResponse error_response;
error_response.mutable_partial_success()->set_rejected_spans(1);
EXPECT_CALL(*trace_mock_, Export(_, _, _))
.Times(::testing::AtLeast(2))
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
// It's expected that the response argument provided to Export
// has .Clear() called on it. This CALL assertion verifies that the
// response object no longer has rejected data points since it should
// have been .Clear()'ed at the beginning of the second ConsumeTraces invocation
EXPECT_EQ(response->partial_success().rejected_spans(), 0);
return grpc::Status::OK;
}));

planpb::OTelExportSinkOperator otel_sink_op;

std::string operator_proto = R"pb(
spans {
name_string: "span"
start_time_column_index: 0
end_time_column_index: 1
trace_id_column_index: -1
span_id_column_index: -1
parent_span_id_column_index: -1
})pb";
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
auto s = plan_node->Init(otel_sink_op);
std::string row_batch = R"pb(
cols { time64ns_data { data: 10 data: 20 } }
cols { time64ns_data { data: 12 data: 22 } }
num_rows: 2
eow: true
eos: true)pb";

// Load a RowBatch to get the Input RowDescriptor.
table_store::schemapb::RowBatchData row_batch_proto;
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
RowDescriptor output_rd({});

auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
*plan_node, output_rd, {input_rd}, exec_state_.get());
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();

// Call ConsumeSpans twice in order to verify that the second
// invocation calls clear on the response object.
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
}

TEST_F(OTelExportSinkNodeTest, metrics_response_is_cleared) {
otelmetricscollector::ExportMetricsServiceResponse error_response;
error_response.mutable_partial_success()->set_rejected_data_points(1);
EXPECT_CALL(*metrics_mock_, Export(_, _, _))
.Times(::testing::AtLeast(2))
.WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK)))
.WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) {
// It's expected that the response argument provided to Export
// has .Clear() called on it. This CALL assertion verifies that the
// response object no longer has rejected data points since it should
// have been .Clear()'ed at the beginning of the second ConsumeMetrics invocation
EXPECT_EQ(response->partial_success().rejected_data_points(), 0);
return grpc::Status::OK;
}));

planpb::OTelExportSinkOperator otel_sink_op;

std::string operator_proto = R"pb(
metrics {
name: "http.resp.latency"
time_column_index: 0
gauge { int_column_index: 1 }
})pb";
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op));
auto plan_node = std::make_unique<plan::OTelExportSinkOperator>(1);
auto s = plan_node->Init(otel_sink_op);
std::string row_batch = R"pb(
cols { time64ns_data { data: 10 data: 11 } }
cols { int64_data { data: 15 data: 150 } }
num_rows: 2
eow: true
eos: true)pb";

// Load a RowBatch to get the Input RowDescriptor.
table_store::schemapb::RowBatchData row_batch_proto;
EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto));
RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc();
RowDescriptor output_rd({});

auto tester = exec::ExecNodeTester<OTelExportSinkNode, plan::OTelExportSinkOperator>(
*plan_node, output_rd, {input_rd}, exec_state_.get());
auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie();

// Call ConsumeMetrics twice in order to verify that the second
// invocation calls clear on the response object.
auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1);
EXPECT_OK(retval);
}

} // namespace exec
} // namespace carnot
} // namespace px

0 comments on commit b01f8ae

Please sign in to comment.