Skip to content

Commit

Permalink
Fixes remote blob creation error when OOM (#1433)
Browse files Browse the repository at this point in the history
Fixes #1430

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Jun 27, 2023
1 parent 31a3e34 commit 56ff90d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 25 deletions.
8 changes: 8 additions & 0 deletions src/client/rpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ Status RPCClient::CreateRemoteBlob(
std::string message_out;
WriteCreateRemoteBufferRequest(buffer->size(), !!compressor, message_out);
RETURN_ON_ERROR(doWrite(message_out));

// receive a confirm to continue
{
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadCreateBufferReply(message_in, id, payload, fd_sent));
}

// send the actual payload
if (compressor && buffer->size() > 0) {
RETURN_ON_ERROR(detail::compress_and_send(compressor, vineyard_conn_,
Expand Down
63 changes: 40 additions & 23 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,23 +572,32 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));

ReceiveRemoteBuffers(
socket_, {object}, 0, 0, compress,
[self, object](const Status& status) -> Status {
std::string message_out;
if (status.ok()) {
WriteCreateBufferReply(object->object_id, object, -1, message_out);
} else {
// cleanup
VINEYARD_DISCARD(self->bulk_store_->Delete(object->object_id));
WriteErrorReply(status, message_out);
}
self->doWrite(message_out);
LOG_SUMMARY("instances_memory_usage_bytes",
self->server_ptr_->instance_id(),
self->bulk_store_->Footprint());
return Status::OK();
});
auto callback = [self, this, compress,
object](const Status& status) -> Status {
ReceiveRemoteBuffers(
socket_, {object}, 0, 0, compress,
[self, object](const Status& status) -> Status {
std::string message_out;
if (status.ok()) {
WriteCreateBufferReply(object->object_id, object, -1, message_out);
} else {
// cleanup
VINEYARD_DISCARD(self->bulk_store_->Delete(object->object_id));
WriteErrorReply(status, message_out);
}
self->doWrite(message_out);
return Status::OK();
});
LOG_SUMMARY("instances_memory_usage_bytes",
self->server_ptr_->instance_id(),
self->bulk_store_->Footprint());
return Status::OK();
};

// ok to continue
std::string message_out;
WriteCreateBufferReply(object->object_id, object, -1, message_out);
self->doWrite(message_out, callback, true);
return false;
}

Expand Down Expand Up @@ -1561,15 +1570,16 @@ void SocketConnection::doWrite(const std::string& buf) {
doAsyncWrite(std::move(to_send));
}

void SocketConnection::doWrite(const std::string& buf, callback_t<> callback) {
void SocketConnection::doWrite(const std::string& buf, callback_t<> callback,
const bool partial) {
std::string to_send;
size_t length = buf.size();
to_send.resize(length + sizeof(size_t));
char* ptr = &to_send[0];
memcpy(ptr, &length, sizeof(size_t));
ptr += sizeof(size_t);
memcpy(ptr, buf.data(), length);
doAsyncWrite(std::move(to_send), callback);
doAsyncWrite(std::move(to_send), callback, partial);
}

void SocketConnection::doWrite(std::string&& buf) {
Expand Down Expand Up @@ -1598,16 +1608,23 @@ void SocketConnection::doAsyncWrite(std::string&& buf) {
});
}

void SocketConnection::doAsyncWrite(std::string&& buf, callback_t<> callback) {
void SocketConnection::doAsyncWrite(std::string&& buf, callback_t<> callback,
const bool partial) {
std::shared_ptr<std::string> payload =
std::make_shared<std::string>(std::move(buf));
auto self(shared_from_this());
asio::async_write(socket_,
boost::asio::buffer(payload->data(), payload->length()),
[this, self, payload, callback](
[this, self, payload, callback, partial](
boost::system::error_code ec, std::size_t) {
if (!ec && callback(Status::OK()).ok()) {
doReadHeader();
if (!ec) {
if (callback(Status::OK()).ok()) {
if (!partial) {
doReadHeader();
}
} else {
doStop();
}
} else {
doStop();
}
Expand Down
6 changes: 4 additions & 2 deletions src/server/async/socket_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ class SocketConnection : public std::enable_shared_from_this<SocketConnection> {

void doWrite(std::string&& buf);

void doWrite(const std::string& buf, callback_t<> callback);
void doWrite(const std::string& buf, callback_t<> callback,
const bool partial = false);

/**
* Being called when the encounter a socket error (in read/write), or by
Expand All @@ -181,7 +182,8 @@ class SocketConnection : public std::enable_shared_from_this<SocketConnection> {

void doAsyncWrite(std::string&& buf);

void doAsyncWrite(std::string&& buf, callback_t<> callback);
void doAsyncWrite(std::string&& buf, callback_t<> callback,
const bool partial = false);

void switchSession(std::shared_ptr<VineyardServer>& session) {
this->server_ptr_ = session;
Expand Down

0 comments on commit 56ff90d

Please sign in to comment.