From 99927cb6f8194033ee3258df1525ebb7c90eb325 Mon Sep 17 00:00:00 2001 From: Ye Cao Date: Wed, 21 Aug 2024 14:48:53 +0800 Subject: [PATCH] Format code and add release python api. Signed-off-by: Ye Cao --- python/client.cc | 16 ++++ python/pybind11_docs.cc | 26 ++++++ python/pybind11_docs.h | 2 + python/vineyard/core/client.py | 10 +++ src/client/ds/i_object.cc | 8 +- src/server/async/socket_server.cc | 28 ++----- src/server/memory/usage.h | 35 -------- src/server/util/spill_file.cc | 19 +---- src/server/util/spill_file.h | 34 -------- test/concurrent_lru_spill_test.cc | 129 +++++++++++++----------------- test/lru_spill_test.cc | 13 +++ test/release_test.cc | 19 ++--- test/runner.py | 20 ++++- test/spill_test.cc | 43 ++++------ 14 files changed, 180 insertions(+), 222 deletions(-) diff --git a/python/client.cc b/python/client.cc index 797b7fe21c..6f0f5fc62a 100644 --- a/python/client.cc +++ b/python/client.cc @@ -826,6 +826,22 @@ void bind_client(py::module& mod) { throw_on_error(self->Fork(*client)); return client; }) + .def( + "release_object", + [](Client* self, ObjectIDWrapper const& object_id) { + self->Release(object_id); + }, + "object_id"_a, doc::IPCClient_release_object) + .def( + "release_objects", + [](Client* self, std::vector const& object_ids) { + std::vector unwrapped_object_ids(object_ids.size()); + for (size_t idx = 0; idx < object_ids.size(); ++idx) { + unwrapped_object_ids[idx] = object_ids[idx]; + } + return self->Release(object_ids); + }, + "object_ids"_a, doc::IPCClient_release_objects) .def("__enter__", [](Client* self) { return self; }) .def("__exit__", [](Client* self, py::object, py::object, py::object) { // DO NOTHING diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 1c3cd43e40..792b083861 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -1102,6 +1102,32 @@ Find the corresponding blob (if exists) of the given pointer. ObjectID )doc"; +const char* IPCClient_release_object = R"doc( +.. method:: release_object(object_id: ObjectID) -> None + :noindex: + +Release the object's ref count in the client side, which means the object can be +spilled to the disk from the server. + +Parameters: + object_id: ObjectID + The object id to release. + +)doc"; + +const char* IPCClient_release_objects = R"doc( +.. method:: release_objects(object_ids: List[ObjectID]) -> None + :noindex: + +Release the objects' ref count in the client side, which means these objects can be +spilled to the disk from the server. + +Parameters: + object_ids: List[ObjectID] + The object id list to release. + +)doc"; + const char* IPCClient_close = R"doc( Close the client. )doc"; diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 0700843201..68dad8d450 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -129,6 +129,8 @@ extern const char* IPCClient_list_metadatas; extern const char* IPCClient_allocated_size; extern const char* IPCClient_is_shared_memory; extern const char* IPCClient_find_shared_memory; +extern const char* IPCClient_release_object; +extern const char* IPCClient_release_objects; extern const char* IPCClient_close; extern const char* RPCClient; diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py index dcc78dc151..09caa6048f 100644 --- a/python/vineyard/core/client.py +++ b/python/vineyard/core/client.py @@ -622,6 +622,16 @@ def get_metas( metas.append(self.get_meta(object_id, sync_remote)) return metas + @_apply_docstring(IPCClient.release_object) + def release_object(self, object_id: ObjectID) -> None: + if self.has_ipc_client(): + return self._ipc_client.release_object(object_id) + + @_apply_docstring(IPCClient.release_objects) + def release_objects(self, object_ids: List[ObjectID]) -> None: + if self.has_ipc_client(): + return self._ipc_client.release_objects(object_ids) + @_apply_docstring(IPCClient.list_objects) def list_objects( self, pattern: str, regex: bool = False, limit: int = 5 diff --git a/src/client/ds/i_object.cc b/src/client/ds/i_object.cc index cfe116a6b8..82083af838 100644 --- a/src/client/ds/i_object.cc +++ b/src/client/ds/i_object.cc @@ -17,6 +17,7 @@ limitations under the License. #include "client/client.h" #include "client/client_base.h" +#include "client/ds/blob.h" namespace vineyard { @@ -70,7 +71,12 @@ std::shared_ptr ObjectBuilder::Seal(Client& client) { Status ObjectBuilder::Seal(Client& client, std::shared_ptr& object) { RETURN_ON_ERROR(this->_Seal(client, object)); - return client.PostSeal(object->meta()); + auto const& meta = object->meta(); + RETURN_ON_ERROR(client.PostSeal(object->meta())); + for (auto const& buffer_id : meta.GetBufferSet()->AllBufferIds()) { + RETURN_ON_ERROR(client.Release(buffer_id)); + } + return Status::OK(); } std::shared_ptr ObjectBuilder::_Seal(Client& client) { diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 5a8072c67a..95e26c616f 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -703,8 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { ReceiveRemoteBuffers( socket_, {object}, compress, [self, object](const Status& status) -> Status { - self->bulk_store_->RemoveDependency(object->object_id, - self->getConnId()); + VINEYARD_DISCARD(self->bulk_store_->RemoveDependency( + object->object_id, self->getConnId())); std::string message_out; if (status.ok()) { WriteCreateBufferReply(object->object_id, object, -1, @@ -762,10 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) { ReceiveRemoteBuffers( socket_, objects, compress, [self, object_ids, objects](const Status& status) -> Status { - self->bulk_store_->RemoveDependency( + VINEYARD_DISCARD(self->bulk_store_->RemoveDependency( std::unordered_set(object_ids.begin(), object_ids.end()), - self->getConnId()); + self->getConnId())); std::string message_out; if (status.ok()) { WriteCreateBuffersReply(object_ids, objects, std::vector{}, @@ -838,7 +838,8 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) { return Status::OK(); }); std::unordered_set ids_set(ids.begin(), ids.end()); - self->bulk_store_->RemoveDependency(ids_set, self->getConnId()); + VINEYARD_DISCARD( + self->bulk_store_->RemoveDependency(ids_set, self->getConnId())); return Status::OK(); }); } else { @@ -1880,28 +1881,15 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) { std::vector ids; TRY_READ_REQUEST(ReadReleaseBlobsWithRDMARequest, root, ids); -<<<<<<< HEAD - this->UnlockTransmissionObjects(ids); - std::string message_out; - WriteReleaseBlobsWithRDMAReply(message_out); - this->doWrite(message_out); -||||||| constructed merge base - boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() { - self->server_ptr_->UnlockTransmissionObjects(ids); - std::string message_out; - WriteReleaseBlobsWithRDMAReply(message_out); - self->doWrite(message_out); - }); -======= boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() { self->server_ptr_->UnlockTransmissionObjects(ids); std::unordered_set id_set(ids.begin(), ids.end()); - self->bulk_store_->RemoveDependency(id_set, self->getConnId()); + VINEYARD_DISCARD( + self->bulk_store_->RemoveDependency(id_set, self->getConnId())); std::string message_out; WriteReleaseBlobsWithRDMAReply(message_out); self->doWrite(message_out); }); ->>>>>>> Prevent spilling object from migrating or ipc accessing. return false; } diff --git a/src/server/memory/usage.h b/src/server/memory/usage.h index bd0c9cef6a..84ca2ede98 100644 --- a/src/server/memory/usage.h +++ b/src/server/memory/usage.h @@ -209,8 +209,6 @@ class ColdObjectTracker void Ref(const ID id, const std::shared_ptr

& payload) { std::lock_guard locked(lru_field_mu_); - std::cout << "Thread " << std::this_thread::get_id() - << " in Ref, RefPayload:" << id << std::endl; auto it = ref_list_iter_map_.find(id); if (it == ref_list_iter_map_.end()) { ref_list_.emplace_front(id, payload); @@ -277,9 +275,6 @@ class ColdObjectTracker auto status = Status::OK(); auto it = ref_list_.rbegin(); std::map> pinned_objects; - std::cout << "Thread " << std::this_thread::get_id() - << " in SpillFor, ref_list_ size:" << ref_list_.size() - << std::endl; while (it != ref_list_.rend()) { if (it->second->IsPinned()) { // bypass pinned @@ -288,19 +283,11 @@ class ColdObjectTracker continue; } - std::cout << "Thread " << std::this_thread::get_id() - << " in SpillFor, SpillPayload:" << it->first << std::endl; auto s = this->spill(it->first, it->second, bulk_store); if (s.ok()) { - std::cout << "Thread " << std::this_thread::get_id() - << ", SpillPayload :" << it->first << " success" - << std::endl; spilled_sz += it->second->data_size; ref_list_iter_map_.erase(it->first); } else if (s.IsObjectSpilled()) { - std::cout << "Thread " << std::this_thread::get_id() - << ", SpillPayload :" << it->first << " already spilled" - << std::endl; ref_list_iter_map_.erase(it->first); } else { status += s; @@ -376,8 +363,6 @@ class ColdObjectTracker return Status::ObjectSpilled(object_id); } spilled_obj_.emplace(object_id, payload); - std::cout << "Thread " << std::this_thread::get_id() - << " in spill, SpillPayload:" << object_id << std::endl; return bulk_store->SpillPayload(payload); } @@ -397,8 +382,6 @@ class ColdObjectTracker spilled_obj_.erase(loc); } } - std::cout << "Thread " << std::this_thread::get_id() - << " in reload, ReloadPayload:" << object_id << std::endl; return bulk_store->ReloadPayload(object_id, payload); } @@ -590,12 +573,6 @@ class ColdObjectTracker std::lock_guardcold_obj_lru_.lru_field_mu_)> locked( this->cold_obj_lru_.lru_field_mu_); uint8_t* pointer = nullptr; - std::cout << "Thread " << std::this_thread::get_id() - << " before AllocateMemoryWithSpill;" - << "size:" << size << "BulkAllocator::GetFootprintLimit():" - << BulkAllocator::GetFootprintLimit() - << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() - << std::endl; pointer = self().AllocateMemory(size, fd, map_size, offset); // no spill will be conducted if (spill_path_.empty()) { @@ -610,12 +587,6 @@ class ColdObjectTracker BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) { int64_t min_spill_size = 0; if (pointer == nullptr) { - std::cout << "Thread " << std::this_thread::get_id() - << "pointer is nullptr;" - << "size:" << size << "BulkAllocator::GetFootprintLimit():" - << BulkAllocator::GetFootprintLimit() - << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() - << std::endl; min_spill_size = size - (BulkAllocator::GetFootprintLimit() - BulkAllocator::Allocated()); } @@ -625,13 +596,8 @@ class ColdObjectTracker std::max(min_spill_size, BulkAllocator::Allocated() - self().mem_spill_lower_bound_); } - std::cout << "Thread " << std::this_thread::get_id() - << "min_spill_size:" << min_spill_size << std::endl; auto s = SpillColdObjectFor(min_spill_size); if (!s.ok()) { - std::cout << "Thread " << std::this_thread::get_id() - << " Error during spilling cold object: " << s.ToString() - << std::endl; DLOG(ERROR) << "Error during spilling cold object: " << s.ToString(); } @@ -719,7 +685,6 @@ class ColdObjectTracker lru_t cold_obj_lru_; std::string spill_path_; - std::mutex spill_mu_; }; } // namespace detail diff --git a/src/server/util/spill_file.cc b/src/server/util/spill_file.cc index fffbc609f1..cc24197fdd 100644 --- a/src/server/util/spill_file.cc +++ b/src/server/util/spill_file.cc @@ -44,21 +44,14 @@ Status SpillFileWriter::Write(const std::shared_ptr& payload) { return Status::IOError("Can't open io_adaptor"); } RETURN_ON_ERROR(io_adaptor_->Open("w")); - std::cout << "Thread " << std::this_thread::get_id() - << " writing object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR( io_adaptor_->Write(reinterpret_cast(&(payload->object_id)), sizeof(payload->object_id))); - std::cout << "Thread " << std::this_thread::get_id() - << " writing data_size: " << payload->data_size << std::endl; RETURN_ON_ERROR( io_adaptor_->Write(reinterpret_cast(&(payload->data_size)), sizeof(payload->data_size))); - std::cout << "Thread " << std::this_thread::get_id() - << " writing content: " << payload->pointer << std::endl; - auto status = io_adaptor_->Write( - reinterpret_cast(payload->pointer), payload->data_size); - return status; + return io_adaptor_->Write(reinterpret_cast(payload->pointer), + payload->data_size); } Status SpillFileWriter::Sync() { @@ -87,8 +80,6 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, RETURN_ON_ERROR(io_adaptor_->Open()); { ObjectID object_id = InvalidObjectID(); - std::cout << "Thread " << std::this_thread::get_id() - << " reading object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(&object_id, sizeof(object_id))); if (payload->object_id != object_id) { return Status::IOError( @@ -98,8 +89,6 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, } { int64_t data_size = std::numeric_limits::min(); - std::cout << "Thread " << std::this_thread::get_id() - << " reading data_size: " << data_size << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(&data_size, sizeof(data_size))); if (payload->data_size != data_size) { return Status::IOError( @@ -107,11 +96,7 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, ObjectIDToString(payload->object_id)); } } - std::cout << "Thread " << std::this_thread::get_id() - << " reading content: " << payload->pointer << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size)); - std::cout << "Thread " << std::this_thread::get_id() - << " is deleting object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR(Delete_(payload->object_id)); io_adaptor_ = nullptr; return Status::OK(); diff --git a/src/server/util/spill_file.h b/src/server/util/spill_file.h index 3db56aeaf8..bbf3a2e32b 100644 --- a/src/server/util/spill_file.h +++ b/src/server/util/spill_file.h @@ -31,39 +31,6 @@ limitations under the License. namespace vineyard { namespace io { -class FileLocker { - public: - static FileLocker& getInstance() { - static FileLocker instance; - return instance; - } - - void lockForWrite(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].lock(); - } - - void unlockForWrite(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].unlock(); - } - - void lockForRead(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].lock_shared(); - } - - void unlockForRead(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].unlock_shared(); - } - - private: - FileLocker() = default; - std::shared_mutex mapMutex_; - std::unordered_map fileLocks_; -}; - /* For each spilled file, the disk-format is: - object_id: uint64 @@ -94,7 +61,6 @@ class SpillFileWriter { Status Init(const ObjectID object_id); std::string spill_path_; - std::mutex io_mutex_; std::unique_ptr io_adaptor_ = nullptr; }; diff --git a/test/concurrent_lru_spill_test.cc b/test/concurrent_lru_spill_test.cc index 2b8da33dd8..59624d155c 100644 --- a/test/concurrent_lru_spill_test.cc +++ b/test/concurrent_lru_spill_test.cc @@ -199,7 +199,7 @@ void ConcurrentGetWithRPCClient(std::string rpc_endpoint) { } void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { - const int num_threads = 500; + const int num_threads = 20; const int num_objects = 500; const int array_size = 250; const int initial_objects = 100; @@ -214,12 +214,6 @@ void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { ArrayBuilder builder(c, double_array); auto sealed_array = std::dynamic_pointer_cast>(builder.Seal(c)); - ObjectMeta meta = sealed_array->meta(); - std::shared_ptr buffer_set = meta.GetBufferSet(); - std::set ids = buffer_set->AllBufferIds(); - for (auto id : ids) { - c.Release(id); - } return GetObjectID(sealed_array); } catch (std::exception& e) { LOG(ERROR) << e.what(); @@ -244,12 +238,10 @@ void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { } }; - { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - for (int i = 0; i < initial_objects; i++) { - object_ids.push_back(create_and_seal_array(client)); - } + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + for (int i = 0; i < initial_objects; i++) { + object_ids.push_back(create_and_seal_array(client)); } auto worker = [&](int id, std::vector object_ids) { @@ -264,53 +256,45 @@ void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { if (i % 3 == 0) { ObjectID new_id = create_and_seal_array(client); if (new_id != InvalidObjectID()) { - client.Release(new_id); + VINEYARD_DISCARD(client.Release(new_id)); } - // { - // std::lock_guard lock(object_ids_mutex); - // object_ids.push_back(new_id); - //} - // } else { - // ObjectID id_to_get; - // { - // std::lock_guard lock(object_ids_mutex); - // if (!object_ids.empty()) { - // id_to_get = object_ids[rand() % object_ids.size()]; - // } - // } - // if (id_to_get != ObjectID()) { - // std::shared_ptr object; - // client.GetObject(object_ids[i%object_ids.size()]); - // client.Release(object_ids[i%object_ids.size()]); - // } - //} + { + std::lock_guard lock(object_ids_mutex); + object_ids.push_back(new_id); + } } else { - // if (i % 3 == 0) { + ObjectID id_to_get; + { + std::lock_guard lock(object_ids_mutex); + id_to_get = object_ids[rand() % object_ids.size()]; + } + if (id_to_get != InvalidObjectID()) { + std::shared_ptr object = client.GetObject(id_to_get); + VINEYARD_DISCARD(client.Release(object->id())); + } + } + } else { + if (i % 3 == 0) { ObjectID new_id = create_remote_blob(rpc_client); - // { - // std::lock_guard lock(object_ids_mutex); - // object_ids.push_back(new_id); - // } - //} else { - // ObjectID id_to_get; - // { - // std::lock_guard lock(object_ids_mutex); - // if (!object_ids.empty()) { - // id_to_get = object_ids[rand() % object_ids.size()]; - // } - // } - // if (id_to_get != ObjectID()) { - // std::shared_ptr object; - - if (new_id != InvalidObjectID()) { - rpc_client.GetObject(object_ids[i % object_ids.size()]); - ids.push_back(new_id); + { + std::lock_guard lock(object_ids_mutex); + object_ids.push_back(new_id); + } + } else { + ObjectID id_to_get; + { + std::lock_guard lock(object_ids_mutex); + id_to_get = object_ids[rand() % object_ids.size()]; + } + if (id_to_get != InvalidObjectID()) { + std::shared_ptr object = rpc_client.GetObject(id_to_get); } - // } } } } + client.Disconnect(); + rpc_client.Disconnect(); }; std::vector threads; @@ -322,11 +306,8 @@ void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { thread.join(); } - { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - VINEYARD_CHECK_OK(client.Clear()); - } + VINEYARD_CHECK_OK(client.Clear()); + client.Disconnect(); } int main(int argc, char** argv) { @@ -336,23 +317,23 @@ int main(int argc, char** argv) { } std::string ipc_socket = std::string(argv[1]); std::string rpc_endpoint = std::string(argv[2]); - /* - LOG(INFO) << "Start concurrent put test with IPCClient ..."; - ConcurrentPutWithClient(ipc_socket); - LOG(INFO) << "Passed concurrent put test with IPCClient"; - - LOG(INFO) << "Start concurrent get test with IPCClient ..."; - ConcurrentGetWithClient(ipc_socket); - LOG(INFO) << "Passed concurrent get test with IPCClient"; - - LOG(INFO) << "Start concurrent put test with RPCClient ..."; - ConcurrentPutWithRPCClient(rpc_endpoint); - LOG(INFO) << "Passed concurrent put test with RPCClient"; - - LOG(INFO) << "Start concurrent get test with RPCClient ..."; - ConcurrentGetWithRPCClient(rpc_endpoint); - LOG(INFO) << "Passed concurrent get test with RPCClient"; - */ + + LOG(INFO) << "Start concurrent put test with IPCClient ..."; + ConcurrentPutWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent put test with IPCClient"; + + LOG(INFO) << "Start concurrent get test with IPCClient ..."; + ConcurrentGetWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent get test with IPCClient"; + + LOG(INFO) << "Start concurrent put test with RPCClient ..."; + ConcurrentPutWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent put test with RPCClient"; + + LOG(INFO) << "Start concurrent get test with RPCClient ..."; + ConcurrentGetWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent get test with RPCClient"; + LOG(INFO) << "Start concurrent get and put test ..."; ConcurrentGetAndPut(ipc_socket, rpc_endpoint); LOG(INFO) << "Passed concurrent get and put test"; diff --git a/test/lru_spill_test.cc b/test/lru_spill_test.cc index 27a7687599..19aa5a0593 100644 --- a/test/lru_spill_test.cc +++ b/test/lru_spill_test.cc @@ -90,10 +90,12 @@ void LRUTest(Client& client, RPCClient& rpc_client) { ArrayBuilder builder1(client, double_array); auto sealed_double_array1 = std::dynamic_pointer_cast>(builder1.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array1->id())); ArrayBuilder builder2(client, double_array); auto sealed_double_array2 = std::dynamic_pointer_cast>(builder2.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); /* step2: RPCClient create: Array3, Array4 */ auto remote_blob_writer3 = @@ -132,6 +134,7 @@ void LRUTest(Client& client, RPCClient& rpc_client) { ArrayBuilder builder5(client, double_array); auto sealed_double_array5 = std::dynamic_pointer_cast>(builder5.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array5->id())); // (Array3, Array4, Array2, Array5) => Spill Array3, Array4. => (Array2, // Array5) @@ -155,10 +158,12 @@ void LRUTest(Client& client, RPCClient& rpc_client) { /* step5: IPCClient get: Array2 => (Array5, Array2) */ auto obj = client.GetObject(sealed_double_array2->id()); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); CHECK(obj != nullptr); /* step6: IPCClient get: Array4 => (Array5, Array2, Array4) */ obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); CHECK(obj != nullptr); /* step7: RPCClient create: Array6 */ @@ -211,9 +216,11 @@ void LRUTest(Client& client, RPCClient& rpc_client) { /* step11: IPCClient get: Array3, Array4 */ obj = client.GetObject(blob_meta3.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta3.GetId())); CHECK(obj != nullptr); obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); CHECK(obj != nullptr); // (Array1, Array6, Array3, Array4) => Spill Array1, Array6. => (Array3, @@ -253,14 +260,17 @@ void LRUTest(Client& client, RPCClient& rpc_client) { ArrayBuilder builder8(client, double_array); auto sealed_double_array8 = std::dynamic_pointer_cast>(builder8.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array8->id())); ArrayBuilder builder9(client, double_array); auto sealed_double_array9 = std::dynamic_pointer_cast>(builder9.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array9->id())); ArrayBuilder builder10(client, double_array); auto sealed_double_array10 = std::dynamic_pointer_cast>(builder10.Seal(client)); + VINEYARD_CHECK_OK(client.Release(sealed_double_array10->id())); // (Array3, Array4, Array7, Array8) => Spill Array3, Array4. => (Array7, // Array8) (Array7, Array8, Array9, Array10) => Spill Array7, Array8. => // (Array9, Array10) @@ -301,12 +311,15 @@ void LRUTest(Client& client, RPCClient& rpc_client) { /* step15: IPCClient get: Array2, Array4, Array6 */ obj = client.GetObject(sealed_double_array2->id()); + VINEYARD_CHECK_OK(client.Release(sealed_double_array2->id())); CHECK(obj != nullptr); obj = client.GetObject(blob_meta4.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta4.GetId())); CHECK(obj != nullptr); obj = client.GetObject(blob_meta6.GetId()); + VINEYARD_CHECK_OK(client.Release(blob_meta6.GetId())); CHECK(obj != nullptr); // (Array5, Array3, Array1, Array2) => Spill Array5, Array3. => (Array1, diff --git a/test/release_test.cc b/test/release_test.cc index b09d6d1806..9a4cb9fb62 100644 --- a/test/release_test.cc +++ b/test/release_test.cc @@ -66,9 +66,9 @@ int main(int argc, char** argv) { bool is_in_use{false}; VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client1.Release({id, blob_id})); + VINEYARD_CHECK_OK(client1.Release({id})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // single client @@ -80,12 +80,9 @@ int main(int argc, char** argv) { auto blob = client1.GetObject(blob_id); CHECK(blob != nullptr); - VINEYARD_CHECK_OK(client1.Release({id})); + VINEYARD_CHECK_OK(client1.Release({obj->id()})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client1.Release({blob_id})); - VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); } { // multiple clients @@ -93,13 +90,13 @@ int main(int argc, char** argv) { CHECK(blob1 != nullptr); auto blob2 = client2.GetObject(blob_id); CHECK(blob2 != nullptr); - VINEYARD_CHECK_OK(client1.Release({blob_id})); + VINEYARD_CHECK_OK(client1.Release({blob1->id()})); bool is_in_use{false}; VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); CHECK(is_in_use); - VINEYARD_CHECK_OK(client2.Release({blob_id})); + VINEYARD_CHECK_OK(client2.Release({blob2->id()})); VINEYARD_CHECK_OK(client2.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // diamond reference count @@ -122,7 +119,7 @@ int main(int argc, char** argv) { CHECK(is_in_use); VINEYARD_CHECK_OK(client1.Release({copy_id})); VINEYARD_CHECK_OK(client1.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } { // auto release in disconnection @@ -134,7 +131,7 @@ int main(int argc, char** argv) { client1.Disconnect(); sleep(5); VINEYARD_CHECK_OK(client2.IsInUse(blob_id, is_in_use)); - CHECK(!is_in_use); + CHECK(is_in_use); } LOG(INFO) << "Passed release tests..."; diff --git a/test/runner.py b/test/runner.py index 419c595fca..a02d4223b5 100755 --- a/test/runner.py +++ b/test/runner.py @@ -494,7 +494,7 @@ def run_vineyard_cpp_tests(meta, allocator, endpoints, tests): run_test(tests, 'perfect_hashmap_test') run_test(tests, 'persist_test') run_test(tests, 'plasma_test') - # run_test(tests, 'release_test') + run_test(tests, 'release_test') run_test(tests, 'remote_buffer_test', '127.0.0.1:%d' % rpc_socket_port) run_test(tests, 'rpc_delete_test', '127.0.0.1:%d' % rpc_socket_port) run_test(tests, 'rpc_get_object_test', '127.0.0.1:%d' % rpc_socket_port) @@ -553,6 +553,21 @@ def run_vineyard_lru_spill_tests(meta, allocator, endpoints, tests): run_test(tests, 'lru_spill_test', '127.0.0.1:%d' % rpc_socket_port) +def run_vineyard_concurrent_lru_spill_tests(meta, allocator, endpoints, tests): + meta_prefix = 'vineyard_test_%s' % time.time() + metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) + with start_vineyardd( + metadata_settings, + ['--allocator', allocator], + size="10Mi", + default_ipc_socket=VINEYARD_CI_IPC_SOCKET, + spill_path='/tmp/spill_path', + spill_upper_rate=0.8, + spill_lower_rate=0.5, + ) as (_, rpc_socket_port): + run_test(tests, 'concurrent_lru_spill_test', '127.0.0.1:%d' % rpc_socket_port) + + def run_etcd_member_tests(meta, allocator, endpoints, tests): """ Here we start 2 vineyard instances, @@ -1196,6 +1211,9 @@ def execute_tests(args): run_vineyard_cpp_tests(args.meta, args.allocator, endpoints, args.tests) run_vineyard_spill_tests(args.meta, args.allocator, endpoints, args.tests) run_vineyard_lru_spill_tests(args.meta, args.allocator, endpoints, args.tests) + run_vineyard_concurrent_lru_spill_tests( + args.meta, args.allocator, endpoints, args.tests + ) if args.with_graph: run_graph_tests(args.meta, args.allocator, endpoints, args.tests) diff --git a/test/spill_test.cc b/test/spill_test.cc index dbed86d44d..48d185fa6b 100644 --- a/test/spill_test.cc +++ b/test/spill_test.cc @@ -55,6 +55,7 @@ void BasicTest(Client& client) { ArrayBuilder builder1(client, double_array); auto sealed_double_array1 = std::dynamic_pointer_cast>(builder1.Seal(client)); + ObjectID id1 = sealed_double_array1->id(); auto blob_id = GetObjectID(sealed_double_array1); CHECK(blob_id != InvalidObjectID()); { @@ -67,23 +68,25 @@ void BasicTest(Client& client) { std::unique_ptr buffer_writer_; auto status = client.CreateBlob(double_array.size() * sizeof(double), buffer_writer_); - CHECK(status.ok()); - VINEYARD_CHECK_OK(client.DelData(buffer_writer_->id())); + CHECK(status.IsNotEnoughMemory()); } bool is_spilled{false}; - VINEYARD_CHECK_OK(client.IsSpilled(blob_id, is_spilled)); - CHECK(is_spilled); + bool is_in_use{false}; + VINEYARD_CHECK_OK(client.Release({id1})); + VINEYARD_CHECK_OK(client.IsInUse(blob_id, is_in_use)); + CHECK(is_in_use); ArrayBuilder builder2(client, double_array); auto sealed_double_array2 = std::dynamic_pointer_cast>(builder2.Seal(client)); + auto id2 = sealed_double_array2->id(); auto blob_id2 = GetObjectID(sealed_double_array2); - VINEYARD_CHECK_OK(client.IsSpilled(blob_id2, is_spilled)); - CHECK(!is_spilled); - bool is_in_use{false}; + VINEYARD_CHECK_OK(client.IsSpilled(blob_id, is_spilled)); + CHECK(is_spilled); VINEYARD_CHECK_OK(client.IsInUse(blob_id2, is_in_use)); CHECK(is_in_use); + VINEYARD_CHECK_OK(client.Release({id2})); LOG(INFO) << "Finish basic test ..."; } @@ -108,6 +111,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid, is_in_use)); CHECK(is_in_use); + VINEYARD_CHECK_OK(client.Release({id})); LOG(INFO) << "Finish reload test, case 1 ..."; } { @@ -120,6 +124,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid1, is_in_use)); CHECK(is_in_use); + VINEYARD_CHECK_OK(client.Release({id1})); LOG(INFO) << "Finish reload test, case 2 ..."; } { @@ -132,6 +137,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid2, is_in_use)); CHECK(is_in_use); + VINEYARD_CHECK_OK(client.Release({id2})); LOG(INFO) << "Finish reload test, case 3 ..."; } { @@ -144,6 +150,7 @@ void ReloadTest(Client& client) { bool is_in_use{false}; VINEYARD_CHECK_OK(client.IsInUse(bid3, is_in_use)); CHECK(is_in_use); + VINEYARD_CHECK_OK(client.Release({id3})); LOG(INFO) << "Finish reload test, case 4 ..."; } @@ -170,28 +177,6 @@ void ReloadTest(Client& client) { } LOG(INFO) << "Finish reload test, case 6 ..."; } - { - bool is_spilled{false}; - VINEYARD_CHECK_OK(client.IsSpilled(bid2, is_spilled)); - CHECK(is_spilled); - auto str_array_copy = client.GetObject>(id2); - CHECK(str_array_copy->size() == string_array2.size()); - for (size_t i = 0; i < string_array2.size(); i++) { - CHECK_EQ(string_array2[i], (*str_array_copy)[i]); - } - LOG(INFO) << "Finish reload test, case 7 ..."; - } - { - bool is_spilled{false}; - VINEYARD_CHECK_OK(client.IsSpilled(bid3, is_spilled)); - CHECK(is_spilled); - auto str_array_copy = client.GetObject>(id3); - CHECK(str_array_copy->size() == string_array3.size()); - for (size_t i = 0; i < string_array3.size(); i++) { - CHECK_EQ(string_array3[i], (*str_array_copy)[i]); - } - LOG(INFO) << "Finish reload test, case 8 ..."; - } LOG(INFO) << "Finish reload test ..."; }