From 8d9d709f97eb528123c22cc21426898c273d7258 Mon Sep 17 00:00:00 2001 From: Nikunj Yadav Date: Tue, 27 Oct 2015 12:33:09 -0700 Subject: [PATCH] Fixing bug, receiver sends invalid local checkpoints in long running mode Summary: Long running mode has been broken in the trunk, as well as in the new version. Receiver's last checkpoint was not being cleared. How to see it broken: 1. Make a receiver in long running mode with an older version 2. Start a sender with newer version and do a transfer 3. Start another sender with newer version. This step should fail Third step fails because the new sender doesn't understand the checkpoint sent by the last version Reviewed By: ldemailly Differential Revision: D2581890 fb-gh-sync-id: 2968b3d96d23cd94597879f8bd126ac91686f0f7 --- CMakeLists.txt | 4 ++- ReceiverThread.cpp | 6 +++- SenderThread.cpp | 21 +++++++++----- WdtConfig.h | 4 +-- test/common_utils.py | 7 +++++ test/wdt_long_running_test.py | 52 +++++++++++++++++++++++++++++++++++ 6 files changed, 83 insertions(+), 11 deletions(-) create mode 100755 test/wdt_long_running_test.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a54840d..d8e0ef1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2) # There is no C per se in WDT but if you use CXX only here many checks fail # Version is Major.Minor.YYMMDDX for up to 10 releases per day # Minor currently is also the protocol version - has to match with Protocol.cpp -project("WDT" LANGUAGES C CXX VERSION 1.22.1510250) +project("WDT" LANGUAGES C CXX VERSION 1.22.1510270) # On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2) set(CMAKE_CXX_STANDARD 11) @@ -315,4 +315,6 @@ if (BUILD_TESTING) add_test(NAME ReceiverThrottlerRefCountTest COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/test/receiver_throttler_ref_count_test.sh") + add_test(NAME WdtLongRunningTest COMMAND + "${CMAKE_CURRENT_SOURCE_DIR}/test/wdt_long_running_test.py") endif(BUILD_TESTING) diff --git 
a/ReceiverThread.cpp b/ReceiverThread.cpp index 06bc5d91..eb556c4c 100644 --- a/ReceiverThread.cpp +++ b/ReceiverThread.cpp @@ -223,6 +223,7 @@ ReceiverState ReceiverThread::sendLocalCheckpoint() { localCheckpoint.numBlocks = -1; checkpoints.emplace_back(localCheckpoint); } else { + VLOG(1) << *this << " sending local checkpoint " << checkpoint_; checkpoints.emplace_back(checkpoint_); } @@ -289,7 +290,7 @@ ReceiverState ReceiverThread::processSettingsCmd() { return FINISH_WITH_ERROR; } if (senderProtocolVersion != threadProtocolVersion_) { - LOG(ERROR) << "Receiver and sender protocol version mismatch " + LOG(ERROR) << *this << " Receiver and sender protocol version mismatch " << senderProtocolVersion << " " << threadProtocolVersion_; int negotiatedProtocol = Protocol::negotiateProtocol( senderProtocolVersion, threadProtocolVersion_); @@ -899,6 +900,9 @@ void ReceiverThread::reset() { senderReadTimeout_ = senderWriteTimeout_ = -1; curConnectionVerified_ = false; threadStats_.reset(); + checkpoints_.clear(); + newCheckpoints_.clear(); + checkpoint_ = Checkpoint(socket_.getPort()); } ReceiverThread::~ReceiverThread() { diff --git a/SenderThread.cpp b/SenderThread.cpp index 3ef9c187..86927c15 100644 --- a/SenderThread.cpp +++ b/SenderThread.cpp @@ -85,16 +85,23 @@ SenderState SenderThread::readLocalCheckPoint() { numReconnectWithoutProgress_++; return CONNECT; } + bool isValidCheckpoint = true; if (!Protocol::decodeCheckpoints(threadProtocolVersion_, buf_, decodeOffset, checkpointLen, checkpoints)) { LOG(ERROR) << "checkpoint decode failure " - << folly::humanify(std::string(buf_, checkpointLen)); - threadStats_.setErrorCode(PROTOCOL_ERROR); - return END; - } - if (checkpoints.size() != 1 || checkpoints[0].port != port_) { - LOG(ERROR) << "illegal local checkpoint " - << folly::humanify(std::string(buf_, checkpointLen)); + << folly::humanify(std::string(buf_, numRead)); + isValidCheckpoint = false; + } else if (checkpoints.size() != 1) { + LOG(ERROR) << "Illegal local 
checkpoint, unexpected num checkpoints " + << checkpoints.size() << " " + << folly::humanify(std::string(buf_, numRead)); + isValidCheckpoint = false; + } else if (checkpoints[0].port != port_) { + LOG(ERROR) << "illegal checkpoint, checkpoint " << checkpoints[0] + << " doesn't match the port " << port_; + isValidCheckpoint = false; + } + if (!isValidCheckpoint) { threadStats_.setErrorCode(PROTOCOL_ERROR); return END; } diff --git a/WdtConfig.h b/WdtConfig.h index b1554d37..c7d845be 100644 --- a/WdtConfig.h +++ b/WdtConfig.h @@ -8,9 +8,9 @@ #define WDT_VERSION_MAJOR 1 #define WDT_VERSION_MINOR 22 -#define WDT_VERSION_BUILD 1510250 +#define WDT_VERSION_BUILD 1510270 // Add -fbcode to version str -#define WDT_VERSION_STR "1.22.1510250-fbcode" +#define WDT_VERSION_STR "1.22.1510270-fbcode" // Tie minor and proto version #define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR diff --git a/test/common_utils.py b/test/common_utils.py index ea7b26ae..ffbb898f 100644 --- a/test/common_utils.py +++ b/test/common_utils.py @@ -11,6 +11,13 @@ import tempfile import errno +def get_wdt_version(): + dummy_cmd = "_bin/wdt/wdt --version" + dummy_process = subprocess.Popen(dummy_cmd.split(), + stdout=subprocess.PIPE) + protocol_string = dummy_process.stdout.readline().strip() + return protocol_string.split()[4] + def start_receiver(receiver_cmd, root_dir, test_count): print("Receiver: " + receiver_cmd) server_log = "{0}/server{1}.log".format(root_dir, test_count) diff --git a/test/wdt_long_running_test.py b/test/wdt_long_running_test.py new file mode 100755 index 00000000..071ac336 --- /dev/null +++ b/test/wdt_long_running_test.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +import shutil +from common_utils import * + +def run_test(wdtbin, test_name, test_count, root_dir, connection_url): + test_description = ("Test #{0}, [{1}]").format(test_count, test_name) + print(test_description) + sender_cmd = (wdtbin + " -directory {0}/src " + "-connection_url \'{1}\'").format(root_dir, connection_url) + 
transfer_status = run_sender(sender_cmd, root_dir, test_count) + if transfer_status: + print("Failed " + test_description + ". Check logs" + + " in " + root_dir) + exit(transfer_status) + +def main(): + wdt_version = get_wdt_version() + print("wdt protocol version " + wdt_version) + root_dir = create_test_directory("/tmp") + src_dir = root_dir + "/src" + generate_random_files(src_dir, 4096) + + wdtbin_opts = "-full_reporting -num_ports 4" + wdtbin = "_bin/wdt/wdt " + wdtbin_opts + #receiver version should be one behind + receiver_version = (int(wdt_version) - 1) + receiver_cmd = (wdtbin + " -start_port 0 -run_as_daemon " + "-skip_writes -protocol_version {0}").format(receiver_version) + #start the receiver in long running mode + print(receiver_cmd) + (receiver_process, connection_url) = start_receiver(receiver_cmd, + root_dir, 0) + + run_test(wdtbin, "sender 1 same version", 1, root_dir, connection_url) + run_test(wdtbin, "sender 2 same version", 2, root_dir, connection_url) + + protocol_key = "recpv" + prev_str = "{0}={1}".format(protocol_key, receiver_version) + new_str = "{0}={1}".format(protocol_key, wdt_version) + connection_url_new_version = connection_url.replace(prev_str, new_str) + + run_test(wdtbin, "sender 1 newer version", 3, root_dir, + connection_url_new_version) + run_test(wdtbin, "sender 2 newer version", 4, root_dir, + connection_url_new_version) + #since receiver is in long running mode, kill it + receiver_process.kill() + print("Tests successful! Removing logs and data from " + root_dir) + shutil.rmtree(root_dir) + +if __name__ == "__main__": + main()