From 52385a0d539fe1661b1497c36a8d929e2b282d28 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 25 Feb 2022 13:24:05 -0800 Subject: [PATCH] [core] Fix bug in fusion for spilled objects (#22571) Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues. However, the current logic always spills once we reach the end of the spillable objects or once we've reached the fusion threshold. This can produce lots of unfused objects if they are created concurrently with the spill. This PR changes the spill logic: once we reach the end of the spillable objects, if the last batch of spilled objects is under the fusion threshold, we'll only spill it if we don't have other spills pending too. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved. --- python/ray/tests/BUILD | 1 + .../ray/tests/test_object_spilling_no_asan.py | 48 +++++++ src/ray/raylet/local_object_manager.cc | 10 ++ src/ray/raylet/local_object_manager.h | 2 + .../raylet/test/local_object_manager_test.cc | 123 +++++++++++++++++- 5 files changed, 181 insertions(+), 3 deletions(-) create mode 100644 python/ray/tests/test_object_spilling_no_asan.py diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 1638c0f996c0..5be7948f40ca 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -130,6 +130,7 @@ py_test_module_list( "test_worker_capping.py", "test_object_spilling_2.py", "test_object_spilling_3.py", + "test_object_spilling_no_asan.py", "test_object_manager.py", "test_multi_tenancy.py", "test_namespace.py", diff --git a/python/ray/tests/test_object_spilling_no_asan.py b/python/ray/tests/test_object_spilling_no_asan.py new file mode 100644 index 000000000000..d490b383b0f5 --- /dev/null +++ b/python/ray/tests/test_object_spilling_no_asan.py @@ -0,0 +1,48 @@ +import numpy as np +import pytest +import os +import sys + +import ray + + +# NOTE(swang): This test currently fails in ASAN mode because it tests a +# performance issue that is likely sensitive to timing. +def test_spill_fusion(object_spilling_config): + # Limit our object store to 75 MiB of memory. + object_spilling_config, temp_folder = object_spilling_config + min_spilling_size = 10 * 1024 * 1024 + ray.init( + num_cpus=1, + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 1, + "object_spilling_config": object_spilling_config, + "min_spilling_size": min_spilling_size, + "object_spilling_threshold": 0.8, + # Set the timeout between create retries high so that this test + # passes in ASAN and debug mode. + "object_store_full_delay_ms": 1000, + }, + ) + + object_size = 1024 * 1024 + # Fill up the object store 4 times with small objects. + # We trigger spilling at 80% and the min spill size is + # about 10 objects. + xs = [ray.put(np.zeros(object_size // 8)) for _ in range(300)] # noqa: F841 + + spill_dir = os.path.join(temp_folder, ray.ray_constants.DEFAULT_OBJECT_PREFIX) + under_min, over_min = 0, 0 + for filename in os.listdir(spill_dir): + size = os.stat(os.path.join(spill_dir, filename)).st_size + if size < min_spilling_size: + under_min += 1 + else: + over_min += 1 + # We should almost always spill fused objects. + assert over_min > under_min + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index dfefda52815c..c6c925e11505 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -188,6 +188,16 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { counts += 1; } if (!objects_to_spill.empty()) { + if (it == pinned_objects_.end() && bytes_to_spill <= num_bytes_to_spill && + !objects_pending_spill_.empty()) { + // We have gone through all spillable objects but we have not yet reached + // the minimum bytes to spill and we are already spilling other objects. + // Let those spill requests finish before we try to spill the current + // objects. This gives us some time to decide whether we really need to + // spill the current objects or if we can afford to wait for additional + // objects to fuse with. + return false; + } RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill << " num objects " << objects_to_spill.size(); auto start_time = absl::GetCurrentTimeNanos(); diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index c266f202cd39..52453fdda524 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -321,7 +321,9 @@ class LocalObjectManager { /// The last time a restore log finished. int64_t last_restore_log_ns_ = 0; + friend class LocalObjectManagerTestWithMinSpillingSize; friend class LocalObjectManagerTest; + friend class LocalObjectManagerFusedTest; }; }; // namespace raylet diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index be79452030d2..9f73ecb2a023 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -283,9 +283,9 @@ class MockObjectBuffer : public Buffer { std::shared_ptr> unpins_; }; -class LocalObjectManagerTest : public ::testing::Test { +class LocalObjectManagerTestWithMinSpillingSize { public: - LocalObjectManagerTest() + LocalObjectManagerTestWithMinSpillingSize(int64_t min_spilling_size) : subscriber_(std::make_shared()), owner_client(std::make_shared()), client_pool([&](const rpc::Address &addr) { return owner_client; }), @@ -295,7 +295,7 @@ class LocalObjectManagerTest : public ::testing::Test { manager_node_id_, "address", 1234, free_objects_batch_size, /*free_objects_period_ms=*/1000, worker_pool, client_pool, /*max_io_workers=*/2, - /*min_spilling_size=*/0, + /*min_spilling_size=*/min_spilling_size, /*is_external_storage_type_fs=*/true, /*max_fused_object_count*/ max_fused_object_count_, /*on_objects_freed=*/ @@ -349,6 +349,18 @@ class LocalObjectManagerTest : public ::testing::Test { std::unordered_set unevictable_objects_; }; +class LocalObjectManagerTest : public LocalObjectManagerTestWithMinSpillingSize, + public ::testing::Test { + public: + LocalObjectManagerTest() : LocalObjectManagerTestWithMinSpillingSize(0) {} +}; + +class LocalObjectManagerFusedTest : public LocalObjectManagerTestWithMinSpillingSize, + public ::testing::Test { + public: + LocalObjectManagerFusedTest() : LocalObjectManagerTestWithMinSpillingSize(100) {} +}; + TEST_F(LocalObjectManagerTest, TestPin) { rpc::Address owner_address; owner_address.set_worker_id(WorkerID::FromRandom().Binary()); @@ -1288,6 +1300,111 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) { AssertNoLeaks(); } +TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) { + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + std::vector> objects; + int64_t total_size = 0; + int64_t object_size = 52; + + for (size_t i = 0; i < 3; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(object_size, object_id, unpins); + total_size += object_size; + auto object = std::make_unique(data_buffer, nullptr, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + manager.SpillObjectUptoMaxThroughput(); + // Only 2 of the objects should be spilled. + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + for (const auto &id : object_ids) { + ASSERT_EQ((*unpins)[id], 0); + } + manager.SpillObjectUptoMaxThroughput(); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + + // Check that half the objects get spilled and the URLs get added to the + // global object directory. + std::vector urls; + urls.push_back(BuildURL("url1")); + urls.push_back(BuildURL("url2")); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); + // Objects should get freed even though we didn't wait for the owner's notice + // to evict. + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < urls.size(); i++) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } + ASSERT_EQ(owner_client->object_urls.size(), 2); + int num_unpinned = 0; + for (const auto &id : object_ids) { + if ((*unpins)[id] == 1) { + num_unpinned++; + } + } + ASSERT_EQ(num_unpinned, 2); + + // We will spill the last object, even though we're under the min spilling + // size, because they are the only spillable objects. + manager.SpillObjectUptoMaxThroughput(); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); +} + +TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) { + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + std::vector> objects; + int64_t total_size = 0; + // 20 of these objects are needed to hit the min spilling size, but + // max_fused_object_count=15. + int64_t object_size = 5; + + for (size_t i = 0; i < 40; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(object_size, object_id, unpins); + total_size += object_size; + auto object = std::make_unique(data_buffer, nullptr, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + manager.SpillObjectUptoMaxThroughput(); + // First two spill batches succeed because they have at least 15 objects. + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + // Last spill batch fails because we have 10 objects and their total size is + // less than 100. + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + + std::vector urls; + for (int i = 0; i < 15; i++) { + urls.push_back(BuildURL("url", i)); + } + EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(2); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < urls.size(); i++) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } + + // We will spill the last objects even though we're under the min spilling + // size because they are the only spillable objects. + manager.SpillObjectUptoMaxThroughput(); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); +} + } // namespace raylet } // namespace ray