Skip to content

Commit

Permalink
Support to spill objects with LRU strategy by vineyardd itself. (#1983)
Browse files Browse the repository at this point in the history
Fixes #1972

Signed-off-by: Ye Cao <[email protected]>
Signed-off-by: vegetableysm <[email protected]>
Co-authored-by: vegetableysm <[email protected]>
  • Loading branch information
dashanji and vegetableysm authored Sep 2, 2024
1 parent da21407 commit 19badf1
Show file tree
Hide file tree
Showing 16 changed files with 967 additions and 71 deletions.
16 changes: 16 additions & 0 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,22 @@ void bind_client(py::module& mod) {
throw_on_error(self->Fork(*client));
return client;
})
.def(
"release_object",
[](Client* self, ObjectIDWrapper const& object_id) {
self->Release(object_id);
},
"object_id"_a, doc::IPCClient_release_object)
.def(
"release_objects",
[](Client* self, std::vector<ObjectID> const& object_ids) {
std::vector<ObjectID> unwrapped_object_ids(object_ids.size());
for (size_t idx = 0; idx < object_ids.size(); ++idx) {
unwrapped_object_ids[idx] = object_ids[idx];
}
return self->Release(object_ids);
},
"object_ids"_a, doc::IPCClient_release_objects)
.def("__enter__", [](Client* self) { return self; })
.def("__exit__", [](Client* self, py::object, py::object, py::object) {
// DO NOTHING
Expand Down
26 changes: 26 additions & 0 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,32 @@ Find the corresponding blob (if exists) of the given pointer.
ObjectID
)doc";

const char* IPCClient_release_object = R"doc(
.. method:: release_object(object_id: ObjectID) -> None
:noindex:
Release the object's ref count in the client side, which means the object can be
spilled to the disk from the server.
Parameters:
object_id: ObjectID
The object id to release.
)doc";

const char* IPCClient_release_objects = R"doc(
.. method:: release_objects(object_ids: List[ObjectID]) -> None
:noindex:
Release the objects' ref count in the client side, which means these objects can be
spilled to the disk from the server.
Parameters:
object_ids: List[ObjectID]
The object id list to release.
)doc";

const char* IPCClient_close = R"doc(
Close the client.
)doc";
Expand Down
2 changes: 2 additions & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ extern const char* IPCClient_list_metadatas;
extern const char* IPCClient_allocated_size;
extern const char* IPCClient_is_shared_memory;
extern const char* IPCClient_find_shared_memory;
extern const char* IPCClient_release_object;
extern const char* IPCClient_release_objects;
extern const char* IPCClient_close;

extern const char* RPCClient;
Expand Down
10 changes: 10 additions & 0 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,16 @@ def get_metas(
metas.append(self.get_meta(object_id, sync_remote))
return metas

@_apply_docstring(IPCClient.release_object)
def release_object(self, object_id: ObjectID) -> None:
if self.has_ipc_client():
self._ipc_client.release_object(object_id)

@_apply_docstring(IPCClient.release_objects)
def release_objects(self, object_ids: List[ObjectID]) -> None:
if self.has_ipc_client():
self._ipc_client.release_objects(object_ids)

@_apply_docstring(IPCClient.list_objects)
def list_objects(
self, pattern: str, regex: bool = False, limit: int = 5
Expand Down
1 change: 1 addition & 0 deletions src/client/ds/blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ Status BufferSet::EmplaceBuffer(ObjectID const id,
void BufferSet::Extend(BufferSet const& others) {
for (auto const& kv : others.buffers_) {
buffers_.emplace(kv.first, kv.second);
buffer_ids_.emplace(kv.first);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/client/ds/i_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

#include "client/client.h"
#include "client/client_base.h"
#include "client/ds/blob.h"

namespace vineyard {

Expand Down Expand Up @@ -70,7 +71,12 @@ std::shared_ptr<Object> ObjectBuilder::Seal(Client& client) {

Status ObjectBuilder::Seal(Client& client, std::shared_ptr<Object>& object) {
RETURN_ON_ERROR(this->_Seal(client, object));
return client.PostSeal(object->meta());
auto const& meta = object->meta();
RETURN_ON_ERROR(client.PostSeal(object->meta()));
for (auto const& buffer_id : meta.GetBufferSet()->AllBufferIds()) {
RETURN_ON_ERROR(client.Release(buffer_id));
}
return Status::OK();
}

std::shared_ptr<Object> ObjectBuilder::_Seal(Client& client) {
Expand Down
38 changes: 33 additions & 5 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,13 @@ bool SocketConnection::doCreateBuffers(const json& root) {
for (auto const& size : sizes) {
ObjectID object_id;
std::shared_ptr<Payload> object;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
Status status = bulk_store_->Create(size, object_id, object);
if (!status.ok()) {
for (auto const& object : objects) {
bulk_store_->Delete(object->id());
}
RESPONSE_ON_ERROR(status);
}
object_ids.emplace_back(object_id);
objects.emplace_back(object);
}
Expand Down Expand Up @@ -570,6 +576,12 @@ bool SocketConnection::doGetBuffers(const json& root) {
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
for (size_t i = 0; i < objects.size(); ++i) {
if (objects[i]->pointer == nullptr) {
RESPONSE_ON_ERROR(
bulk_store_->ReloadColdObject(ids[i], objects[i], false));
}
}

std::vector<int> fd_to_send;
for (auto object : objects) {
Expand Down Expand Up @@ -679,6 +691,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ObjectID object_id;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId()));

if (use_rdma) {
std::string message_out;
Expand All @@ -690,6 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ReceiveRemoteBuffers(
socket_, {object}, compress,
[self, object](const Status& status) -> Status {
VINEYARD_DISCARD(self->bulk_store_->RemoveDependency(
object->object_id, self->getConnId()));
std::string message_out;
if (status.ok()) {
WriteCreateBufferReply(object->object_id, object, -1,
Expand Down Expand Up @@ -731,6 +746,7 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
std::shared_ptr<Payload> object;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId()));
object_ids.emplace_back(object_id);
objects.emplace_back(object);
}
Expand All @@ -746,6 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
ReceiveRemoteBuffers(
socket_, objects, compress,
[self, object_ids, objects](const Status& status) -> Status {
VINEYARD_DISCARD(self->bulk_store_->RemoveDependency(
std::unordered_set<ObjectID>(object_ids.begin(),
object_ids.end()),
self->getConnId()));
std::string message_out;
if (status.ok()) {
WriteCreateBuffersReply(object_ids, objects, std::vector<int>{},
Expand Down Expand Up @@ -817,6 +837,9 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) {
self->UnlockTransmissionObjects(ids);
return Status::OK();
});
std::unordered_set<ObjectID> ids_set(ids.begin(), ids.end());
VINEYARD_DISCARD(
self->bulk_store_->RemoveDependency(ids_set, self->getConnId()));
return Status::OK();
});
} else {
Expand Down Expand Up @@ -1858,10 +1881,15 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) {
std::vector<ObjectID> ids;
TRY_READ_REQUEST(ReadReleaseBlobsWithRDMARequest, root, ids);

this->UnlockTransmissionObjects(ids);
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
this->doWrite(message_out);
boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() {
self->server_ptr_->UnlockTransmissionObjects(ids);
std::unordered_set<ObjectID> id_set(ids.begin(), ids.end());
VINEYARD_DISCARD(
self->bulk_store_->RemoveDependency(id_set, self->getConnId()));
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
self->doWrite(message_out);
});

return false;
}
Expand Down
Loading

0 comments on commit 19badf1

Please sign in to comment.