Skip to content

Commit

Permalink
Format code and add release python api.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Aug 21, 2024
1 parent bb9f6dc commit 99927cb
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 222 deletions.
16 changes: 16 additions & 0 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,22 @@ void bind_client(py::module& mod) {
throw_on_error(self->Fork(*client));
return client;
})
.def(
"release_object",
[](Client* self, ObjectIDWrapper const& object_id) {
self->Release(object_id);
},
"object_id"_a, doc::IPCClient_release_object)
.def(
"release_objects",
[](Client* self, std::vector<ObjectID> const& object_ids) {
std::vector<ObjectID> unwrapped_object_ids(object_ids.size());
for (size_t idx = 0; idx < object_ids.size(); ++idx) {
unwrapped_object_ids[idx] = object_ids[idx];
}
return self->Release(object_ids);
},
"object_ids"_a, doc::IPCClient_release_objects)
.def("__enter__", [](Client* self) { return self; })
.def("__exit__", [](Client* self, py::object, py::object, py::object) {
// DO NOTHING
Expand Down
26 changes: 26 additions & 0 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,32 @@ Find the corresponding blob (if exists) of the given pointer.
ObjectID
)doc";

const char* IPCClient_release_object = R"doc(
.. method:: release_object(object_id: ObjectID) -> None
:noindex:
Release the object's ref count in the client side, which means the object can be
spilled to the disk from the server.
Parameters:
object_id: ObjectID
The object id to release.
)doc";

const char* IPCClient_release_objects = R"doc(
.. method:: release_objects(object_ids: List[ObjectID]) -> None
:noindex:
Release the objects' ref count in the client side, which means these objects can be
spilled to the disk from the server.
Parameters:
object_ids: List[ObjectID]
The object id list to release.
)doc";

const char* IPCClient_close = R"doc(
Close the client.
)doc";
Expand Down
2 changes: 2 additions & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ extern const char* IPCClient_list_metadatas;
extern const char* IPCClient_allocated_size;
extern const char* IPCClient_is_shared_memory;
extern const char* IPCClient_find_shared_memory;
extern const char* IPCClient_release_object;
extern const char* IPCClient_release_objects;
extern const char* IPCClient_close;

extern const char* RPCClient;
Expand Down
10 changes: 10 additions & 0 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,16 @@ def get_metas(
metas.append(self.get_meta(object_id, sync_remote))
return metas

@_apply_docstring(IPCClient.release_object)
def release_object(self, object_id: ObjectID) -> None:

Check notice on line 626 in python/vineyard/core/client.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/client.py#L626

Either all return statements in a function should return an expression, or none of them should.
if self.has_ipc_client():
return self._ipc_client.release_object(object_id)

@_apply_docstring(IPCClient.release_objects)
def release_objects(self, object_ids: List[ObjectID]) -> None:

Check notice on line 631 in python/vineyard/core/client.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/client.py#L631

Either all return statements in a function should return an expression, or none of them should.
if self.has_ipc_client():
return self._ipc_client.release_objects(object_ids)

@_apply_docstring(IPCClient.list_objects)
def list_objects(
self, pattern: str, regex: bool = False, limit: int = 5
Expand Down
8 changes: 7 additions & 1 deletion src/client/ds/i_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

#include "client/client.h"
#include "client/client_base.h"
#include "client/ds/blob.h"

namespace vineyard {

Expand Down Expand Up @@ -70,7 +71,12 @@ std::shared_ptr<Object> ObjectBuilder::Seal(Client& client) {

Status ObjectBuilder::Seal(Client& client, std::shared_ptr<Object>& object) {
RETURN_ON_ERROR(this->_Seal(client, object));
return client.PostSeal(object->meta());
auto const& meta = object->meta();
RETURN_ON_ERROR(client.PostSeal(object->meta()));
for (auto const& buffer_id : meta.GetBufferSet()->AllBufferIds()) {
RETURN_ON_ERROR(client.Release(buffer_id));
}
return Status::OK();
}

std::shared_ptr<Object> ObjectBuilder::_Seal(Client& client) {
Expand Down
28 changes: 8 additions & 20 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ReceiveRemoteBuffers(
socket_, {object}, compress,
[self, object](const Status& status) -> Status {
self->bulk_store_->RemoveDependency(object->object_id,
self->getConnId());
VINEYARD_DISCARD(self->bulk_store_->RemoveDependency(
object->object_id, self->getConnId()));
std::string message_out;
if (status.ok()) {
WriteCreateBufferReply(object->object_id, object, -1,
Expand Down Expand Up @@ -762,10 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
ReceiveRemoteBuffers(
socket_, objects, compress,
[self, object_ids, objects](const Status& status) -> Status {
self->bulk_store_->RemoveDependency(
VINEYARD_DISCARD(self->bulk_store_->RemoveDependency(
std::unordered_set<ObjectID>(object_ids.begin(),
object_ids.end()),
self->getConnId());
self->getConnId()));
std::string message_out;
if (status.ok()) {
WriteCreateBuffersReply(object_ids, objects, std::vector<int>{},
Expand Down Expand Up @@ -838,7 +838,8 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) {
return Status::OK();
});
std::unordered_set<ObjectID> ids_set(ids.begin(), ids.end());
self->bulk_store_->RemoveDependency(ids_set, self->getConnId());
VINEYARD_DISCARD(
self->bulk_store_->RemoveDependency(ids_set, self->getConnId()));
return Status::OK();
});
} else {
Expand Down Expand Up @@ -1880,28 +1881,15 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) {
std::vector<ObjectID> ids;
TRY_READ_REQUEST(ReadReleaseBlobsWithRDMARequest, root, ids);

<<<<<<< HEAD
this->UnlockTransmissionObjects(ids);
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
this->doWrite(message_out);
||||||| constructed merge base
boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() {
self->server_ptr_->UnlockTransmissionObjects(ids);
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
self->doWrite(message_out);
});
=======
boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() {
self->server_ptr_->UnlockTransmissionObjects(ids);
std::unordered_set<ObjectID> id_set(ids.begin(), ids.end());
self->bulk_store_->RemoveDependency(id_set, self->getConnId());
VINEYARD_DISCARD(
self->bulk_store_->RemoveDependency(id_set, self->getConnId()));
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
self->doWrite(message_out);
});
>>>>>>> Prevent spilling object from migrating or ipc accessing.

return false;
}
Expand Down
35 changes: 0 additions & 35 deletions src/server/memory/usage.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ class ColdObjectTracker

void Ref(const ID id, const std::shared_ptr<P>& payload) {
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
std::cout << "Thread " << std::this_thread::get_id()
<< " in Ref, RefPayload:" << id << std::endl;
auto it = ref_list_iter_map_.find(id);
if (it == ref_list_iter_map_.end()) {
ref_list_.emplace_front(id, payload);
Expand Down Expand Up @@ -277,9 +275,6 @@ class ColdObjectTracker
auto status = Status::OK();
auto it = ref_list_.rbegin();
std::map<ObjectID, std::shared_ptr<Payload>> pinned_objects;
std::cout << "Thread " << std::this_thread::get_id()
<< " in SpillFor, ref_list_ size:" << ref_list_.size()
<< std::endl;
while (it != ref_list_.rend()) {
if (it->second->IsPinned()) {
// bypass pinned
Expand All @@ -288,19 +283,11 @@ class ColdObjectTracker
continue;
}

std::cout << "Thread " << std::this_thread::get_id()
<< " in SpillFor, SpillPayload:" << it->first << std::endl;
auto s = this->spill(it->first, it->second, bulk_store);
if (s.ok()) {
std::cout << "Thread " << std::this_thread::get_id()
<< ", SpillPayload :" << it->first << " success"
<< std::endl;
spilled_sz += it->second->data_size;
ref_list_iter_map_.erase(it->first);
} else if (s.IsObjectSpilled()) {
std::cout << "Thread " << std::this_thread::get_id()
<< ", SpillPayload :" << it->first << " already spilled"
<< std::endl;
ref_list_iter_map_.erase(it->first);
} else {
status += s;
Expand Down Expand Up @@ -376,8 +363,6 @@ class ColdObjectTracker
return Status::ObjectSpilled(object_id);
}
spilled_obj_.emplace(object_id, payload);
std::cout << "Thread " << std::this_thread::get_id()
<< " in spill, SpillPayload:" << object_id << std::endl;
return bulk_store->SpillPayload(payload);
}

Expand All @@ -397,8 +382,6 @@ class ColdObjectTracker
spilled_obj_.erase(loc);
}
}
std::cout << "Thread " << std::this_thread::get_id()
<< " in reload, ReloadPayload:" << object_id << std::endl;
return bulk_store->ReloadPayload(object_id, payload);
}

Expand Down Expand Up @@ -590,12 +573,6 @@ class ColdObjectTracker
std::lock_guard<decltype(this->cold_obj_lru_.lru_field_mu_)> locked(
this->cold_obj_lru_.lru_field_mu_);
uint8_t* pointer = nullptr;
std::cout << "Thread " << std::this_thread::get_id()
<< " before AllocateMemoryWithSpill;"
<< "size:" << size << "BulkAllocator::GetFootprintLimit():"
<< BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated()
<< std::endl;
pointer = self().AllocateMemory(size, fd, map_size, offset);
// no spill will be conducted
if (spill_path_.empty()) {
Expand All @@ -610,12 +587,6 @@ class ColdObjectTracker
BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) {
int64_t min_spill_size = 0;
if (pointer == nullptr) {
std::cout << "Thread " << std::this_thread::get_id()
<< "pointer is nullptr;"
<< "size:" << size << "BulkAllocator::GetFootprintLimit():"
<< BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated()
<< std::endl;
min_spill_size = size - (BulkAllocator::GetFootprintLimit() -
BulkAllocator::Allocated());
}
Expand All @@ -625,13 +596,8 @@ class ColdObjectTracker
std::max(min_spill_size, BulkAllocator::Allocated() -
self().mem_spill_lower_bound_);
}
std::cout << "Thread " << std::this_thread::get_id()
<< "min_spill_size:" << min_spill_size << std::endl;
auto s = SpillColdObjectFor(min_spill_size);
if (!s.ok()) {
std::cout << "Thread " << std::this_thread::get_id()
<< " Error during spilling cold object: " << s.ToString()
<< std::endl;
DLOG(ERROR) << "Error during spilling cold object: " << s.ToString();
}

Expand Down Expand Up @@ -719,7 +685,6 @@ class ColdObjectTracker

lru_t cold_obj_lru_;
std::string spill_path_;
std::mutex spill_mu_;
};

} // namespace detail
Expand Down
19 changes: 2 additions & 17 deletions src/server/util/spill_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,14 @@ Status SpillFileWriter::Write(const std::shared_ptr<Payload>& payload) {
return Status::IOError("Can't open io_adaptor");
}
RETURN_ON_ERROR(io_adaptor_->Open("w"));
std::cout << "Thread " << std::this_thread::get_id()
<< " writing object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(
io_adaptor_->Write(reinterpret_cast<char*>(&(payload->object_id)),
sizeof(payload->object_id)));
std::cout << "Thread " << std::this_thread::get_id()
<< " writing data_size: " << payload->data_size << std::endl;
RETURN_ON_ERROR(
io_adaptor_->Write(reinterpret_cast<char*>(&(payload->data_size)),
sizeof(payload->data_size)));
std::cout << "Thread " << std::this_thread::get_id()
<< " writing content: " << payload->pointer << std::endl;
auto status = io_adaptor_->Write(
reinterpret_cast<const char*>(payload->pointer), payload->data_size);
return status;
return io_adaptor_->Write(reinterpret_cast<const char*>(payload->pointer),
payload->data_size);
}

Status SpillFileWriter::Sync() {
Expand Down Expand Up @@ -87,8 +80,6 @@ Status SpillFileReader::Read(const std::shared_ptr<Payload>& payload,
RETURN_ON_ERROR(io_adaptor_->Open());
{
ObjectID object_id = InvalidObjectID();
std::cout << "Thread " << std::this_thread::get_id()
<< " reading object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(&object_id, sizeof(object_id)));
if (payload->object_id != object_id) {
return Status::IOError(
Expand All @@ -98,20 +89,14 @@ Status SpillFileReader::Read(const std::shared_ptr<Payload>& payload,
}
{
int64_t data_size = std::numeric_limits<int64_t>::min();
std::cout << "Thread " << std::this_thread::get_id()
<< " reading data_size: " << data_size << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(&data_size, sizeof(data_size)));
if (payload->data_size != data_size) {
return Status::IOError(
"Incorrect 'data_size': opening wrong file: " + spill_path_ +
ObjectIDToString(payload->object_id));
}
}
std::cout << "Thread " << std::this_thread::get_id()
<< " reading content: " << payload->pointer << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size));
std::cout << "Thread " << std::this_thread::get_id()
<< " is deleting object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(Delete_(payload->object_id));
io_adaptor_ = nullptr;
return Status::OK();
Expand Down
34 changes: 0 additions & 34 deletions src/server/util/spill_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,6 @@ limitations under the License.
namespace vineyard {
namespace io {

class FileLocker {
public:
static FileLocker& getInstance() {
static FileLocker instance;
return instance;
}

void lockForWrite(const ObjectID& id) {
std::unique_lock<std::shared_mutex> mapLock(mapMutex_);
fileLocks_[id].lock();
}

void unlockForWrite(const ObjectID& id) {
std::unique_lock<std::shared_mutex> mapLock(mapMutex_);
fileLocks_[id].unlock();
}

void lockForRead(const ObjectID& id) {
std::unique_lock<std::shared_mutex> mapLock(mapMutex_);
fileLocks_[id].lock_shared();
}

void unlockForRead(const ObjectID& id) {
std::unique_lock<std::shared_mutex> mapLock(mapMutex_);
fileLocks_[id].unlock_shared();
}

private:
FileLocker() = default;
std::shared_mutex mapMutex_;
std::unordered_map<ObjectID, std::shared_mutex> fileLocks_;
};

/*
For each spilled file, the disk-format is:
- object_id: uint64
Expand Down Expand Up @@ -94,7 +61,6 @@ class SpillFileWriter {
Status Init(const ObjectID object_id);

std::string spill_path_;
std::mutex io_mutex_;
std::unique_ptr<FileIOAdaptor> io_adaptor_ = nullptr;
};

Expand Down
Loading

0 comments on commit 99927cb

Please sign in to comment.