From 38ea611e4982a8fe498949aaf1b75c26f6ad320d Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Tue, 26 Nov 2024 17:33:39 -0800 Subject: [PATCH] skip on compression Summary: Updates the how hashing is skipped when compression is enabled. Looks at the rpc metadata struct to determine if the data is compressed or not be hashing. Hashing can be skipped when compressing because compression provides its own hashing. The checksumming will only run on uncompressed data. Also improves the error message to match what the existing crc32 exception returns so this has the correct error codes. Reviewed By: cevans87, avalonalex Differential Revision: D66292663 fbshipit-source-id: e3d646b21f64e0df031a481dae94b0b065aee925 --- .../conformance/stresstest/StressTest.cpp | 17 ------- .../stresstest/client/StressTestClient.cpp | 50 ++++++++++++++++++- .../ChecksumPayloadSerializerStrategy.h | 40 +++++++++++++-- 3 files changed, 84 insertions(+), 23 deletions(-) diff --git a/third-party/thrift/src/thrift/conformance/stresstest/StressTest.cpp b/third-party/thrift/src/thrift/conformance/stresstest/StressTest.cpp index e744913efe6374..a5f07ceb4fb005 100644 --- a/third-party/thrift/src/thrift/conformance/stresstest/StressTest.cpp +++ b/third-party/thrift/src/thrift/conformance/stresstest/StressTest.cpp @@ -47,23 +47,6 @@ int main(int argc, char* argv[]) { // create a test runner instance auto clientCfg = ClientConfig::createFromFlags(); - if (clientCfg.enableChecksum) { - LOG(INFO) << "Initializing checksum payload serializer" << std::endl; - PayloadSerializer::initialize( - ChecksumPayloadSerializerStrategy( - ChecksumPayloadSerializerStrategyOptions{ - .recordChecksumFailure = - [] { LOG(FATAL) << "Checksum failure detected"; }, - .recordChecksumSuccess = - [] { - LOG_EVERY_N(INFO, 1'000'000) - << "Checksum success detected"; - }, - .recordChecksumCalculated = - [] { - LOG_EVERY_N(INFO, 1'000'000) << "Checksum calculated"; - }})); - } TestRunner testRunner(std::move(clientCfg)); testRunner.runTests(); diff --git a/third-party/thrift/src/thrift/conformance/stresstest/client/StressTestClient.cpp b/third-party/thrift/src/thrift/conformance/stresstest/client/StressTestClient.cpp index 20a2ef18f3adf0..bd44117986f61e 100644 --- a/third-party/thrift/src/thrift/conformance/stresstest/client/StressTestClient.cpp +++ b/third-party/thrift/src/thrift/conformance/stresstest/client/StressTestClient.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ +#include #include +#include #include @@ -50,6 +52,27 @@ folly::coro::Task ThriftStressTestClient::co_echo( co_await timedExecute([&]() -> folly::coro::Task { RpcOptions rpcOptions; if (enableChecksum_) { + static folly::once_flag flag; + + folly::call_once(flag, [] { + LOG(INFO) << "Initializing checksum payload serializer" << std::endl; + rocket::PayloadSerializer::initialize( + rocket::ChecksumPayloadSerializerStrategy< + rocket::LegacyPayloadSerializerStrategy>( + rocket::ChecksumPayloadSerializerStrategyOptions{ + .recordChecksumFailure = + [] { LOG(FATAL) << "Checksum failure detected"; }, + .recordChecksumSuccess = + [] { + LOG_EVERY_N(INFO, 1'000) + << "Checksum success detected"; + }, + .recordChecksumCalculated = + [] { + LOG_EVERY_N(INFO, 1'000) << "Checksum calculated"; + }})); + }); + rpcOptions.setChecksum(RpcOptions::Checksum::XXH3_64); } ret = co_await client_->co_echo(rpcOptions, x); @@ -61,7 +84,32 @@ folly::coro::Task ThriftStressTestClient::co_echoEb( const std::string& x) { std::string ret; co_await timedExecute([&]() -> folly::coro::Task { - ret = co_await client_->co_echoEb(x); + RpcOptions rpcOptions; + if (enableChecksum_) { + static folly::once_flag flag; + + folly::call_once(flag, [] { + LOG(INFO) << "Initializing checksum payload serializer" << std::endl; + rocket::PayloadSerializer::initialize( + rocket::ChecksumPayloadSerializerStrategy< + rocket::LegacyPayloadSerializerStrategy>( + rocket::ChecksumPayloadSerializerStrategyOptions{ + .recordChecksumFailure = + [] { LOG(FATAL) << "Checksum failure detected"; }, + .recordChecksumSuccess = + [] { + LOG_EVERY_N(INFO, 1'000) + << "Checksum success detected"; + }, + .recordChecksumCalculated = + [] { + LOG_EVERY_N(INFO, 1'000) << "Checksum calculated"; + }})); + }); + + rpcOptions.setChecksum(RpcOptions::Checksum::XXH3_64); + } + ret = co_await client_->co_echoEb(rpcOptions, x); }); co_return ret; } diff --git a/third-party/thrift/src/thrift/lib/cpp2/transport/rocket/payload/ChecksumPayloadSerializerStrategy.h b/third-party/thrift/src/thrift/lib/cpp2/transport/rocket/payload/ChecksumPayloadSerializerStrategy.h index 7070ba629481d3..6eef94e6623b23 100644 --- a/third-party/thrift/src/thrift/lib/cpp2/transport/rocket/payload/ChecksumPayloadSerializerStrategy.h +++ b/third-party/thrift/src/thrift/lib/cpp2/transport/rocket/payload/ChecksumPayloadSerializerStrategy.h @@ -17,6 +17,8 @@ #pragma once #include +#include +#include #include #include #include @@ -57,7 +59,7 @@ class ChecksumPayloadSerializerStrategy final template FOLLY_ERASE folly::Try unpackAsCompressed( Payload&& payload, bool decodeMetadataUsingBinary) { - return unpackImpl( + return unpackImpl( std::move(payload), [this, decodeMetadataUsingBinary](Payload&& payload) -> folly::Try { return delegate_.template unpackAsCompressed( @@ -68,7 +70,7 @@ class ChecksumPayloadSerializerStrategy final template FOLLY_ERASE folly::Try unpack( Payload&& payload, bool decodeMetadataUsingBinary) { - return unpackImpl( + return unpackImpl( std::move(payload), [this, decodeMetadataUsingBinary](Payload&& payload) -> folly::Try { return delegate_.template unpack( @@ -81,6 +83,13 @@ class ChecksumPayloadSerializerStrategy final return delegate_.packCompact(std::forward(data)); } + template + bool isDataCompressed(Metadata* metadata) { + return metadata->compression().has_value() && + metadata->compression().value() != + apache::thrift::CompressionAlgorithm::NONE; + } + template FOLLY_ERASE rocket::Payload packWithFds( Metadata* metadata, @@ -215,17 +224,38 @@ class ChecksumPayloadSerializerStrategy final folly::Function recordChecksumSuccess_; folly::Function recordChecksumCalculated_; - template + /** + * Helper function that makes checks to make sure that the checksum wasn't + * invalid because of incorrect setup vs an actual checksum failure. + */ + void validateInvalidChecksum(const Checksum& c) { + auto value = c.checksum().value(); + auto salt = c.salt().value(); + + if (salt == 0 && value == 0) { + XLOG_EVERY_MS(ERR, 1'000) + << "Received a request to checksum the payload but received a checksum and salt that are zero. " + << "Please make sure that the ChecksumPayloadSerializerStrategy is enabled on both the client and server."; + } + } + + template FOLLY_ERASE folly::Try unpackImpl(Payload&& payload, DelegateFunc func) { if (payload.hasNonemptyMetadata()) { folly::Try t = func(std::move(payload)); + bool compressed = isDataCompressed(&t.value().metadata); folly::IOBuf& buf = *t->payload.get(); - if (t.hasException() || !VerifyChecksum) { + if (t.hasException() || compressed) { return t; } else if (validateChecksum(buf, t->metadata.checksum())) { return t; } else { - return folly::Try(std::runtime_error("Checksum mismatch")); + if (FOLLY_LIKELY(t->metadata.checksum().has_value())) { + validateInvalidChecksum(t->metadata.checksum().value()); + } + return folly::Try( + folly::make_exception_wrapper( + TApplicationException::CHECKSUM_MISMATCH, "Checksum mismatch")); } } else { return func(std::move(payload));