Skip to content

Commit

Permalink
process termination
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Mar 13, 2024
1 parent 04c5501 commit 1054385
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 92 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.11.107
5.11.109
3 changes: 0 additions & 3 deletions src/fdb5/api/helpers/ListIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ Key ListElement::combinedKey() const {
}

void ListElement::print(std::ostream &out, bool withLocation, bool withLength) const {
if (!withLocation && location_ && !location_->host().empty()) {
out << "host=" << location_->host() << ",";
}
for (const auto& bit : keyParts_) {
out << bit;
}
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum class Message : uint16_t {
Error,
Stores,
Schema,
Stop,

// API calls to forward
Flush = 100,
Expand Down
93 changes: 79 additions & 14 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,19 @@ bool ClientConnection::remove(uint32_t clientID) {
auto it = clients_.find(clientID);

if (it != clients_.end()) {
Connection::write(Message::Exit, true, clientID, 0);
// TODO make the data connection dying automatically, when there are no more async writes
Connection::write(Message::Exit, false, clientID, 0);
Connection::write(Message::Stop, true, clientID, 0);

clients_.erase(it);
}
}

if (clients_.empty()) {
Connection::write(Message::Exit, true, 0, 0);
if (!single_) {
// TODO make the data connection dying automatically, when there are no more async writes
Connection::write(Message::Exit, false, 0, 0);
}

ClientConnectionRouter::instance().deregister(*this);
}

Expand Down Expand Up @@ -133,7 +137,8 @@ bool ClientConnection::connect(bool singleAttempt) {
writeDataStartupMessage(serverSession);

// And the connections are set up. Let everything start up!
listeningThread_ = std::thread([this] { listeningThreadLoop(); });
listeningControlThread_ = std::thread([this] { listeningControlThreadLoop(); });
listeningDataThread_ = std::thread([this] { listeningDataThreadLoop(); });

connected_ = true;
return true;
Expand All @@ -150,7 +155,12 @@ void ClientConnection::disconnect() {
ASSERT(clients_.empty());
if (connected_) {

listeningThread_.join();
if (listeningControlThread_.joinable()) {
listeningControlThread_.join();
}
if (listeningDataThread_.joinable()) {
listeningDataThread_.join();
}

// Close both the control and data connections
controlClient_.close();
Expand Down Expand Up @@ -328,7 +338,7 @@ eckit::SessionID ClientConnection::verifyServerStartupResponse() {
return serverSession;
}

void ClientConnection::listeningThreadLoop() {
void ClientConnection::listeningControlThreadLoop() {

try {

Expand All @@ -337,15 +347,12 @@ void ClientConnection::listeningThreadLoop() {

while (true) {

eckit::Buffer payload = Connection::readData(hdr);
eckit::Buffer payload = Connection::readControl(hdr);

eckit::Log::debug<LibFdb5>() << "ClientConnection::listeningThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;
eckit::Log::debug<LibFdb5>() << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {

if (clients_.empty()) {
return;
}
return;
} else {
if (hdr.clientID()) {
bool handled = false;
Expand All @@ -357,10 +364,10 @@ void ClientConnection::listeningThreadLoop() {
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());

ASSERT(false); // todo report the error
}

ASSERT(hdr.control() || single_);

if (hdr.payloadSize == 0) {
if (it->second->blockingRequestId() == hdr.requestID) {
ASSERT(hdr.message == Message::Received);
Expand Down Expand Up @@ -397,4 +404,62 @@ void ClientConnection::listeningThreadLoop() {
// ClientConnectionRouter::instance().deregister(*this);
}

void ClientConnection::listeningDataThreadLoop() {

try {

MessageHeader hdr;
eckit::FixedString<4> tail;

while (true) {

eckit::Buffer payload = Connection::readData(hdr);

eckit::Log::debug<LibFdb5>() << "ClientConnection::listeningDataThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {
return;
} else {
if (hdr.clientID()) {
bool handled = false;
auto it = clients_.find(hdr.clientID());
if (it == clients_.end()) {
std::stringstream ss;
ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl;
ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING";
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());

ASSERT(false); // todo report the error
}

ASSERT(!hdr.control());
if (hdr.payloadSize == 0) {
handled = it->second->handle(hdr.message, hdr.control(), hdr.requestID);
}
else {
handled = it->second->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload));
}

if (!handled) {
std::stringstream ss;
ss << "ERROR: Unexpected message recieved (" << hdr.message << "). ABORTING";
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Client Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());
}
}
}
}

// We don't want to let exceptions escape inside a worker thread.
} catch (const std::exception& e) {
// ClientConnectionRouter::instance().handleException(std::make_exception_ptr(e));
} catch (...) {
// ClientConnectionRouter::instance().handleException(std::current_exception());
}
// ClientConnectionRouter::instance().deregister(*this);
}

} // namespace fdb5::remote
6 changes: 4 additions & 2 deletions src/fdb5/remote/client/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class ClientConnection : protected Connection {

void handleError(const MessageHeader& hdr, eckit::Buffer buffer);

void listeningThreadLoop();
void listeningControlThreadLoop();
void listeningDataThreadLoop();
void dataWriteThreadLoop();

eckit::net::TCPSocket& controlSocket() override { return controlClient_; }
Expand All @@ -96,7 +97,8 @@ class ClientConnection : protected Connection {
std::mutex clientsMutex_;
std::map<uint32_t, Client*> clients_;

std::thread listeningThread_;
std::thread listeningControlThread_;
std::thread listeningDataThread_;

std::mutex requestMutex_;

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/remote/server/CatalogueHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ void CatalogueHandler::schema(uint32_t clientID, uint32_t requestID, eckit::Buff
stream << schema;
}

write(Message::Received, false, clientID, requestID, schemaBuffer.data(), stream.position());
write(Message::Received, true, clientID, requestID, schemaBuffer.data(), stream.position());
}

void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) {
Expand Down Expand Up @@ -342,7 +342,7 @@ void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) {
s << ee;
}
}
write(Message::Received, false, clientID, requestID, startupBuffer.data(), s.position());
write(Message::Received, true, clientID, requestID, startupBuffer.data(), s.position());
}
}

Expand Down
144 changes: 76 additions & 68 deletions src/fdb5/remote/server/ServerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,12 @@ void ServerConnection::listeningThreadLoopData() {
eckit::Buffer payload = readData(hdr); // READ DATA

if (hdr.message == Message::Exit) {
if (remove(false, hdr.clientID())) {
ASSERT(hdr.clientID() == 0);

eckit::Log::status() << "Terminating DATA listener" << std::endl;
eckit::Log::info() << "Terminating DATA listener" << std::endl;
eckit::Log::status() << "Terminating DATA listener" << std::endl;
eckit::Log::info() << "Terminating DATA listener" << std::endl;

break;
}
break;
} else {

Handled handled;
Expand Down Expand Up @@ -446,82 +445,91 @@ void ServerConnection::handle() {
eckit::Buffer payload = readControl(hdr); // READ CONTROL
eckit::Log::debug<LibFdb5>() << "ServerConnection::handle - got [message=" << hdr.message << ",clientID="<< hdr.clientID() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {
if (remove(true, hdr.clientID())) {
if (hdr.message == Message::Stop) {
ASSERT(hdr.clientID());
remove(true, hdr.clientID());

write(Message::Exit, false, hdr.clientID(), 0);
} else {
if (hdr.message == Message::Exit) {
ASSERT(hdr.clientID() == 0);

write(Message::Exit, true, 0, 0);
if (!single_) {
write(Message::Exit, false, 0, 0);
}

eckit::Log::status() << "Terminating CONTROL listener" << std::endl;
eckit::Log::info() << "Terminating CONTROL listener" << std::endl;

break;
}
} else {
else {

Handled handled = Handled::No;
ASSERT(single_ || hdr.control());
Handled handled = Handled::No;
ASSERT(single_ || hdr.control());

if (payload.size()) {
if (hdr.control()) {
handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload));
} else {
handled = handleData(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload));
}
} else {
if (hdr.control()) {
handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID);
if (payload.size()) {
if (hdr.control()) {
handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload));
} else {
handled = handleData(hdr.message, hdr.clientID(), hdr.requestID, std::move(payload));
}
} else {
handled = handleData(hdr.message, hdr.clientID(), hdr.requestID);
if (hdr.control()) {
handled = handleControl(hdr.message, hdr.clientID(), hdr.requestID);
} else {
handled = handleData(hdr.message, hdr.clientID(), hdr.requestID);
}
}
}

switch (handled)
{
case Handled::Replied: // nothing to do
break;
// case Handled::YesRemoveArchiveListener:
// dataListener_--;
// if (dataListener_ == 0) {
// //return;
// // listeningThreadData.join();
// }
// break;
case Handled::YesAddArchiveListener:
{
std::lock_guard<std::mutex> lock(handlerMutex_);
dataListener_++;
if (dataListener_ == 1) {
listeningThreadData = std::thread([this] { listeningThreadLoopData(); });

switch (handled)
{
case Handled::Replied: // nothing to do
break;
// case Handled::YesRemoveArchiveListener:
// dataListener_--;
// if (dataListener_ == 0) {
// //return;
// // listeningThreadData.join();
// }
// break;
case Handled::YesAddArchiveListener:
{
std::lock_guard<std::mutex> lock(handlerMutex_);
dataListener_++;
if (dataListener_ == 1) {
listeningThreadData = std::thread([this] { listeningThreadLoopData(); });
}
}
}
write(Message::Received, false, hdr.clientID(), hdr.requestID);
break;
// case Handled::YesRemoveReadListener:
// dataListener_--;
// if (dataListener_ == 0) {
// //return;
// // listeningThreadData.join();
// }
// break;
case Handled::YesAddReadListener:
{
std::lock_guard<std::mutex> lock(handlerMutex_);
dataListener_++;
if (dataListener_ == 1) {
listeningThreadData = std::thread([this] { listeningThreadLoopData(); });
write(Message::Received, true, hdr.clientID(), hdr.requestID);
break;
// case Handled::YesRemoveReadListener:
// dataListener_--;
// if (dataListener_ == 0) {
// //return;
// // listeningThreadData.join();
// }
// break;
case Handled::YesAddReadListener:
{
std::lock_guard<std::mutex> lock(handlerMutex_);
dataListener_++;
if (dataListener_ == 1) {
listeningThreadData = std::thread([this] { listeningThreadLoopData(); });
}
}
}
write(Message::Received, false, hdr.clientID(), hdr.requestID);
break;
case Handled::Yes:
write(Message::Received, false, hdr.clientID(), hdr.requestID);
break;
case Handled::No:
default:
std::stringstream ss;
ss << "Unable to handle message " << hdr.message;
error(ss.str(), hdr.clientID(), hdr.requestID);
write(Message::Received, true, hdr.clientID(), hdr.requestID);
break;
case Handled::Yes:
write(Message::Received, true, hdr.clientID(), hdr.requestID);
break;
case Handled::No:
default:
std::stringstream ss;
ss << "Unable to handle message " << hdr.message;
error(ss.str(), hdr.clientID(), hdr.requestID);
}
}
}
}
Expand Down
Loading

0 comments on commit 1054385

Please sign in to comment.