From 56ff90d38dacf5bb40caac016b1d83139f2946cc Mon Sep 17 00:00:00 2001 From: Tao He Date: Tue, 27 Jun 2023 10:55:10 +0900 Subject: [PATCH] Fixes remote blob creation error when OOM (#1433) Fixes #1430 Signed-off-by: Tao He --- src/client/rpc_client.cc | 8 ++++ src/server/async/socket_server.cc | 63 ++++++++++++++++++++----------- src/server/async/socket_server.h | 6 ++- 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/client/rpc_client.cc b/src/client/rpc_client.cc index 40e4e3220b..a96844d727 100644 --- a/src/client/rpc_client.cc +++ b/src/client/rpc_client.cc @@ -325,6 +325,14 @@ Status RPCClient::CreateRemoteBlob( std::string message_out; WriteCreateRemoteBufferRequest(buffer->size(), !!compressor, message_out); RETURN_ON_ERROR(doWrite(message_out)); + + // receive a confirm to continue + { + json message_in; + RETURN_ON_ERROR(doRead(message_in)); + RETURN_ON_ERROR(ReadCreateBufferReply(message_in, id, payload, fd_sent)); + } + // send the actual payload if (compressor && buffer->size() > 0) { RETURN_ON_ERROR(detail::compress_and_send(compressor, vineyard_conn_, diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 2c1cc1276c..ee2bddba93 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -572,23 +572,32 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); RESPONSE_ON_ERROR(bulk_store_->Seal(object_id)); - ReceiveRemoteBuffers( - socket_, {object}, 0, 0, compress, - [self, object](const Status& status) -> Status { - std::string message_out; - if (status.ok()) { - WriteCreateBufferReply(object->object_id, object, -1, message_out); - } else { - // cleanup - VINEYARD_DISCARD(self->bulk_store_->Delete(object->object_id)); - WriteErrorReply(status, message_out); - } - self->doWrite(message_out); - LOG_SUMMARY("instances_memory_usage_bytes", - self->server_ptr_->instance_id(), - self->bulk_store_->Footprint()); - return Status::OK(); - }); + auto callback = [self, this, compress, + object](const Status& status) -> Status { + ReceiveRemoteBuffers( + socket_, {object}, 0, 0, compress, + [self, object](const Status& status) -> Status { + std::string message_out; + if (status.ok()) { + WriteCreateBufferReply(object->object_id, object, -1, message_out); + } else { + // cleanup + VINEYARD_DISCARD(self->bulk_store_->Delete(object->object_id)); + WriteErrorReply(status, message_out); + } + self->doWrite(message_out); + return Status::OK(); + }); + LOG_SUMMARY("instances_memory_usage_bytes", + self->server_ptr_->instance_id(), + self->bulk_store_->Footprint()); + return Status::OK(); + }; + + // ok to continue + std::string message_out; + WriteCreateBufferReply(object->object_id, object, -1, message_out); + self->doWrite(message_out, callback, true); return false; } @@ -1561,7 +1570,8 @@ void SocketConnection::doWrite(const std::string& buf) { doAsyncWrite(std::move(to_send)); } -void SocketConnection::doWrite(const std::string& buf, callback_t<> callback) { +void SocketConnection::doWrite(const std::string& buf, callback_t<> callback, + const bool partial) { std::string to_send; size_t length = buf.size(); to_send.resize(length + sizeof(size_t)); @@ -1569,7 +1579,7 @@ void SocketConnection::doWrite(const std::string& buf, callback_t<> callback) { memcpy(ptr, &length, sizeof(size_t)); ptr += sizeof(size_t); memcpy(ptr, buf.data(), length); - doAsyncWrite(std::move(to_send), callback); + doAsyncWrite(std::move(to_send), callback, partial); } void SocketConnection::doWrite(std::string&& buf) { @@ -1598,16 +1608,23 @@ void SocketConnection::doAsyncWrite(std::string&& buf) { }); } -void SocketConnection::doAsyncWrite(std::string&& buf, callback_t<> callback) { +void SocketConnection::doAsyncWrite(std::string&& buf, callback_t<> callback, + const bool partial) { std::shared_ptr payload = std::make_shared(std::move(buf)); auto self(shared_from_this()); asio::async_write(socket_, boost::asio::buffer(payload->data(), payload->length()), - [this, self, payload, callback]( + [this, self, payload, callback, partial]( boost::system::error_code ec, std::size_t) { - if (!ec && callback(Status::OK()).ok()) { - doReadHeader(); + if (!ec) { + if (callback(Status::OK()).ok()) { + if (!partial) { + doReadHeader(); + } + } else { + doStop(); + } } else { doStop(); } diff --git a/src/server/async/socket_server.h b/src/server/async/socket_server.h index 52671d3ddd..bd95e951b3 100644 --- a/src/server/async/socket_server.h +++ b/src/server/async/socket_server.h @@ -169,7 +169,8 @@ class SocketConnection : public std::enable_shared_from_this { void doWrite(std::string&& buf); - void doWrite(const std::string& buf, callback_t<> callback); + void doWrite(const std::string& buf, callback_t<> callback, + const bool partial = false); /** * Being called when the encounter a socket error (in read/write), or by @@ -181,7 +182,8 @@ class SocketConnection : public std::enable_shared_from_this { void doAsyncWrite(std::string&& buf); - void doAsyncWrite(std::string&& buf, callback_t<> callback); + void doAsyncWrite(std::string&& buf, callback_t<> callback, + const bool partial = false); void switchSession(std::shared_ptr& session) { this->server_ptr_ = session;