From 1054385371902e034c45e5538da90d560108d6b4 Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Wed, 13 Mar 2024 13:56:13 +0000 Subject: [PATCH] process termination --- VERSION | 2 +- src/fdb5/api/helpers/ListIterator.cc | 3 - src/fdb5/remote/Messages.h | 1 + src/fdb5/remote/client/ClientConnection.cc | 93 +++++++++++-- src/fdb5/remote/client/ClientConnection.h | 6 +- src/fdb5/remote/server/CatalogueHandler.cc | 4 +- src/fdb5/remote/server/ServerConnection.cc | 144 +++++++++++---------- src/fdb5/remote/server/StoreHandler.cc | 4 +- 8 files changed, 165 insertions(+), 92 deletions(-) diff --git a/VERSION b/VERSION index 69d75dec6..a9bdc9c94 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.11.107 \ No newline at end of file +5.11.109 \ No newline at end of file diff --git a/src/fdb5/api/helpers/ListIterator.cc b/src/fdb5/api/helpers/ListIterator.cc index 8e14b3efd..0bff75679 100644 --- a/src/fdb5/api/helpers/ListIterator.cc +++ b/src/fdb5/api/helpers/ListIterator.cc @@ -43,9 +43,6 @@ Key ListElement::combinedKey() const { } void ListElement::print(std::ostream &out, bool withLocation, bool withLength) const { - if (!withLocation && location_ && !location_->host().empty()) { - out << "host=" << location_->host() << ","; - } for (const auto& bit : keyParts_) { out << bit; } diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index 90cc02e99..00d14520d 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -46,6 +46,7 @@ enum class Message : uint16_t { Error, Stores, Schema, + Stop, // API calls to forward Flush = 100, diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 07e002e7c..b1c1cea46 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -78,15 +78,19 @@ bool ClientConnection::remove(uint32_t clientID) { auto it = clients_.find(clientID); if (it != clients_.end()) { - Connection::write(Message::Exit, true, clientID, 0); - // TODO make the data connection dying automatically, when there are no more async writes - Connection::write(Message::Exit, false, clientID, 0); + Connection::write(Message::Stop, true, clientID, 0); clients_.erase(it); } } if (clients_.empty()) { + Connection::write(Message::Exit, true, 0, 0); + if (!single_) { + // TODO make the data connection dying automatically, when there are no more async writes + Connection::write(Message::Exit, false, 0, 0); + } + ClientConnectionRouter::instance().deregister(*this); } @@ -133,7 +137,8 @@ bool ClientConnection::connect(bool singleAttempt) { writeDataStartupMessage(serverSession); // And the connections are set up. Let everything start up! - listeningThread_ = std::thread([this] { listeningThreadLoop(); }); + listeningControlThread_ = std::thread([this] { listeningControlThreadLoop(); }); + listeningDataThread_ = std::thread([this] { listeningDataThreadLoop(); }); connected_ = true; return true; @@ -150,7 +155,12 @@ void ClientConnection::disconnect() { ASSERT(clients_.empty()); if (connected_) { - listeningThread_.join(); + if (listeningControlThread_.joinable()) { + listeningControlThread_.join(); + } + if (listeningDataThread_.joinable()) { + listeningDataThread_.join(); + } // Close both the control and data connections controlClient_.close(); @@ -328,7 +338,7 @@ eckit::SessionID ClientConnection::verifyServerStartupResponse() { return serverSession; } -void ClientConnection::listeningThreadLoop() { +void ClientConnection::listeningControlThreadLoop() { try { @@ -337,15 +347,12 @@ void ClientConnection::listeningThreadLoop() { while (true) { - eckit::Buffer payload = Connection::readData(hdr); + eckit::Buffer payload = Connection::readControl(hdr); - eckit::Log::debug() << "ClientConnection::listeningThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; + eckit::Log::debug() << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; if (hdr.message == Message::Exit) { - - if (clients_.empty()) { - return; - } + return; } else { if (hdr.clientID()) { bool handled = false; @@ -357,10 +364,10 @@ void ClientConnection::listeningThreadLoop() { eckit::Log::status() << ss.str() << std::endl; eckit::Log::error() << "Retrieving... " << ss.str() << std::endl; throw eckit::SeriousBug(ss.str(), Here()); - - ASSERT(false); // todo report the error } + ASSERT(hdr.control() || single_); + if (hdr.payloadSize == 0) { if (it->second->blockingRequestId() == hdr.requestID) { ASSERT(hdr.message == Message::Received); @@ -397,4 +404,62 @@ void ClientConnection::listeningThreadLoop() { // ClientConnectionRouter::instance().deregister(*this); } +void ClientConnection::listeningDataThreadLoop() { + + try { + + MessageHeader hdr; + eckit::FixedString<4> tail; + + while (true) { + + eckit::Buffer payload = Connection::readData(hdr); + + eckit::Log::debug() << "ClientConnection::listeningDataThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; + + if (hdr.message == Message::Exit) { + return; + } else { + if (hdr.clientID()) { + bool handled = false; + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::stringstream ss; + ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; + ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING"; + eckit::Log::status() << ss.str() << std::endl; + eckit::Log::error() << "Retrieving... " << ss.str() << std::endl; + throw eckit::SeriousBug(ss.str(), Here()); + + ASSERT(false); // todo report the error + } + + ASSERT(!hdr.control()); + if (hdr.payloadSize == 0) { + handled = it->second->handle(hdr.message, hdr.control(), hdr.requestID); + } + else { + handled = it->second->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload)); + } + + if (!handled) { + std::stringstream ss; + ss << "ERROR: Unexpected message recieved (" << hdr.message << "). ABORTING"; + eckit::Log::status() << ss.str() << std::endl; + eckit::Log::error() << "Client Retrieving... " << ss.str() << std::endl; + throw eckit::SeriousBug(ss.str(), Here()); + } + } + } + } + + // We don't want to let exceptions escape inside a worker thread. + } catch (const std::exception& e) { +// ClientConnectionRouter::instance().handleException(std::make_exception_ptr(e)); + } catch (...) { +// ClientConnectionRouter::instance().handleException(std::current_exception()); + } + // ClientConnectionRouter::instance().deregister(*this); +} + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index ae44a0f0e..51846fd60 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -75,7 +75,8 @@ class ClientConnection : protected Connection { void handleError(const MessageHeader& hdr, eckit::Buffer buffer); - void listeningThreadLoop(); + void listeningControlThreadLoop(); + void listeningDataThreadLoop(); void dataWriteThreadLoop(); eckit::net::TCPSocket& controlSocket() override { return controlClient_; } @@ -96,7 +97,8 @@ class ClientConnection : protected Connection { std::mutex clientsMutex_; std::map clients_; - std::thread listeningThread_; + std::thread listeningControlThread_; + std::thread listeningDataThread_; std::mutex requestMutex_; diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 3aad56dae..75bda0b63 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -279,7 +279,7 @@ void CatalogueHandler::schema(uint32_t clientID, uint32_t requestID, eckit::Buff stream << schema; } - write(Message::Received, false, clientID, requestID, schemaBuffer.data(), stream.position()); + write(Message::Received, true, clientID, requestID, schemaBuffer.data(), stream.position()); } void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) { @@ -342,7 +342,7 @@ void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) { s << ee; } } - write(Message::Received, false, clientID, requestID, startupBuffer.data(), s.position()); + write(Message::Received, true, clientID, requestID, startupBuffer.data(), s.position()); } } diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 1a662f6d6..6abc5f093 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -374,13 +374,12 @@ void ServerConnection::listeningThreadLoopData() { eckit::Buffer payload = readData(hdr); // READ DATA if (hdr.message == Message::Exit) { - if (remove(false, hdr.clientID())) { + ASSERT(hdr.clientID() == 0); - eckit::Log::status() << "Terminating DATA listener" << std::endl; - eckit::Log::info() << "Terminating DATA listener" << std::endl; + eckit::Log::status() << "Terminating DATA listener" << std::endl; + eckit::Log::info() << "Terminating DATA listener" << std::endl; - break; - } + break; } else { Handled handled; @@ -446,82 +445,91 @@ void ServerConnection::handle() { eckit::Buffer payload = readControl(hdr); // READ CONTROL eckit::Log::debug() << "ServerConnection::handle - got [message=" << hdr.message << ",clientID="<< hdr.clientID() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl; - if (hdr.message == Message::Exit) { - if (remove(true, hdr.clientID())) { + if (hdr.message == Message::Stop) { + ASSERT(hdr.clientID()); + remove(true, hdr.clientID()); - write(Message::Exit, false, hdr.clientID(), 0); + } else { + if (hdr.message == Message::Exit) { + ASSERT(hdr.clientID() == 0); + + write(Message::Exit, true, 0, 0); + if (!single_) { + write(Message::Exit, false, 0, 0); + } eckit::Log::status() << "Terminating CONTROL listener" << std::endl; eckit::Log::info() << "Terminating CONTROL listener" << std::endl; break; } - } else { + else { - Handled handled = Handled::No; - ASSERT(single_ || hdr.control()); + Handled handled = Handled::No; + ASSERT(single_ || hdr.control()); - if (payload.size()) { - if (hdr.control()) { - handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload)); - } else { - handled = handleData(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload)); - } - } else { - if (hdr.control()) { - handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID); + if (payload.size()) { + if (hdr.control()) { + handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload)); + } else { + handled = handleData(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload)); + } } else { - handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); + if (hdr.control()) { + handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID); + } else { + handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); + } } - } - - - switch (handled) - { - case Handled::Replied: // nothing to do - break; - // case Handled::YesRemoveArchiveListener: - // dataListener_--; - // if (dataListener_ == 0) { - // //return; - // // listeningThreadData.join(); - // } - // break; - case Handled::YesAddArchiveListener: - { - std::lock_guard lock(handlerMutex_); - dataListener_++; - if (dataListener_ == 1) { - listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); + + + switch (handled) + { + case Handled::Replied: // nothing to do + break; + // case Handled::YesRemoveArchiveListener: + // dataListener_--; + // if (dataListener_ == 0) { + // //return; + // // listeningThreadData.join(); + // } + // break; + case Handled::YesAddArchiveListener: + { + std::lock_guard lock(handlerMutex_); + dataListener_++; + if (dataListener_ == 1) { + listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); + } } - } - write(Message::Received, false, hdr.clientID(), hdr.requestID); - break; - // case Handled::YesRemoveReadListener: - // dataListener_--; - // if (dataListener_ == 0) { - // //return; - // // listeningThreadData.join(); - // } - // break; - case Handled::YesAddReadListener: - { - std::lock_guard lock(handlerMutex_); - dataListener_++; - if (dataListener_ == 1) { - listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); + write(Message::Received, true, hdr.clientID(), hdr.requestID); + break; + // case Handled::YesRemoveReadListener: + // dataListener_--; + // if (dataListener_ == 0) { + // //return; + // // listeningThreadData.join(); + // } + // break; + case Handled::YesAddReadListener: + { + std::lock_guard lock(handlerMutex_); + dataListener_++; + if (dataListener_ == 1) { + listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); + } } - } - write(Message::Received, false, hdr.clientID(), hdr.requestID); - break; - case Handled::Yes: - write(Message::Received, false, hdr.clientID(), hdr.requestID); - break; - case Handled::No: - default: - std::stringstream ss; - ss << "Unable to handle message " << hdr.message; - error(ss.str(), hdr.clientID(), hdr.requestID); + write(Message::Received, true, hdr.clientID(), hdr.requestID); + break; + case Handled::Yes: + write(Message::Received, true, hdr.clientID(), hdr.requestID); + break; + case Handled::No: + default: + std::stringstream ss; + ss << "Unable to handle message " << hdr.message; + error(ss.str(), hdr.clientID(), hdr.requestID); + } } } } diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 09f838726..82eaf6738 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -179,7 +179,7 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID MemoryStream stream(buffer); // stream << archiverID; stream << (*loc); - Connection::write(Message::Store, false, clientID, requestID, buffer, stream.position()); + Connection::write(Message::Store, true, clientID, requestID, buffer, stream.position()); } void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) { @@ -248,7 +248,7 @@ Store& StoreHandler::store(uint32_t clientID) { auto it = stores_.find(clientID); if (it == stores_.end()) { std::string what("Requested Store has not been loaded id: " + std::to_string(clientID)); - write(Message::Error, false, 0, 0, what.c_str(), what.length()); + write(Message::Error, true, 0, 0, what.c_str(), what.length()); throw; }