diff --git a/src/carnot/exec/otel_export_sink_node_test.cc b/src/carnot/exec/otel_export_sink_node_test.cc index 441235e84f6..285f9aa2217 100644 --- a/src/carnot/exec/otel_export_sink_node_test.cc +++ b/src/carnot/exec/otel_export_sink_node_test.cc @@ -1732,6 +1732,111 @@ eos: true)pb"; EXPECT_THAT(retval.ToString(), ::testing::MatchesRegex(".*INTERNAL.*")); } +TEST_F(OTelExportSinkNodeTest, consume_spans_clears_span_responses) { + oteltracecollector::ExportTraceServiceResponse error_response; + error_response.mutable_partial_success()->set_rejected_spans(1); + EXPECT_CALL(*trace_mock_, Export(_, _, _)) + .Times(::testing::AtLeast(2)) + .WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK))) + .WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) { + // It's expected that the response argument provided to Export + // has .Clear() called on it. This CALL assertion verifies that the + // response object no longer has rejected data points since it should + // have been .Clear()'ed at the beginning of the second ConsumeTraces invocation + EXPECT_EQ(response->partial_success().rejected_spans(), 0); + return grpc::Status::OK; + })); + + planpb::OTelExportSinkOperator otel_sink_op; + + std::string operator_proto = R"pb( +spans { + name_string: "span" + start_time_column_index: 0 + end_time_column_index: 1 + trace_id_column_index: -1 + span_id_column_index: -1 + parent_span_id_column_index: -1 +})pb"; + EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op)); + auto plan_node = std::make_unique(1); + auto s = plan_node->Init(otel_sink_op); + std::string row_batch = R"pb( +cols { time64ns_data { data: 10 data: 20 } } +cols { time64ns_data { data: 12 data: 22 } } +num_rows: 2 +eow: true +eos: true)pb"; + + // Load a RowBatch to get the Input RowDescriptor. + table_store::schemapb::RowBatchData row_batch_proto; + EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto)); + RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc(); + RowDescriptor output_rd({}); + + auto tester = exec::ExecNodeTester( + *plan_node, output_rd, {input_rd}, exec_state_.get()); + auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie(); + + // Call ConsumeSpans twice in order to verify that the second + // invocation calls clear on the response object. + auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1); + EXPECT_OK(retval); + retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1); + EXPECT_OK(retval); +} + +TEST_F(OTelExportSinkNodeTest, metrics_response_is_cleared) { + otelmetricscollector::ExportMetricsServiceResponse error_response; + error_response.mutable_partial_success()->set_rejected_data_points(1); + EXPECT_CALL(*metrics_mock_, Export(_, _, _)) + .Times(::testing::AtLeast(2)) + .WillOnce(DoAll(SetArgPointee<2>(error_response), Return(grpc::Status::OK))) + .WillRepeatedly(Invoke([&](const auto&, const auto&, auto* response) { + // It's expected that the response argument provided to Export + // has .Clear() called on it. This CALL assertion verifies that the + // response object no longer has rejected data points since it should + // have been .Clear()'ed at the beginning of the second ConsumeMetrics invocation + EXPECT_EQ(response->partial_success().rejected_data_points(), 0); + return grpc::Status::OK; + })); + + planpb::OTelExportSinkOperator otel_sink_op; + + std::string operator_proto = R"pb( +metrics { + name: "http.resp.latency" + time_column_index: 0 + gauge { int_column_index: 1 } +})pb"; + EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(operator_proto, &otel_sink_op)); + auto plan_node = std::make_unique(1); + auto s = plan_node->Init(otel_sink_op); + std::string row_batch = R"pb( +cols { time64ns_data { data: 10 data: 11 } } +cols { int64_data { data: 15 data: 150 } } +num_rows: 2 +eow: true +eos: true)pb"; + + // Load a RowBatch to get the Input RowDescriptor. + table_store::schemapb::RowBatchData row_batch_proto; + EXPECT_TRUE(google::protobuf::TextFormat::ParseFromString(row_batch, &row_batch_proto)); + RowDescriptor input_rd = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie()->desc(); + RowDescriptor output_rd({}); + + auto tester = exec::ExecNodeTester( + *plan_node, output_rd, {input_rd}, exec_state_.get()); + auto rb = RowBatch::FromProto(row_batch_proto).ConsumeValueOrDie(); + + // Call ConsumeMetrics twice in order to verify that the second + // invocation calls clear on the response object. + auto retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1); + EXPECT_OK(retval); + retval = tester.node()->ConsumeNext(exec_state_.get(), *rb.get(), 1); + EXPECT_OK(retval); +} + } // namespace exec } // namespace carnot } // namespace px