diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 1638c0f996c0..5be7948f40ca 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -130,6 +130,7 @@ py_test_module_list( "test_worker_capping.py", "test_object_spilling_2.py", "test_object_spilling_3.py", + "test_object_spilling_no_asan.py", "test_object_manager.py", "test_multi_tenancy.py", "test_namespace.py", diff --git a/python/ray/tests/test_object_spilling_no_asan.py b/python/ray/tests/test_object_spilling_no_asan.py new file mode 100644 index 000000000000..d490b383b0f5 --- /dev/null +++ b/python/ray/tests/test_object_spilling_no_asan.py @@ -0,0 +1,48 @@ +import numpy as np +import pytest +import os +import sys + +import ray + + +# NOTE(swang): This test currently fails in ASAN mode because it tests a +# performance issue that is likely sensitive to timing. +def test_spill_fusion(object_spilling_config): + # Limit our object store to 75 MiB of memory. + object_spilling_config, temp_folder = object_spilling_config + min_spilling_size = 10 * 1024 * 1024 + ray.init( + num_cpus=1, + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 1, + "object_spilling_config": object_spilling_config, + "min_spilling_size": min_spilling_size, + "object_spilling_threshold": 0.8, + # Set the timeout between create retries high so that this test + # passes in debug mode (it still fails under ASAN; see NOTE above). + "object_store_full_delay_ms": 1000, + }, + ) + + object_size = 1024 * 1024 + # Fill up the object store 4 times with small objects. + # We trigger spilling at 80% and the min spill size is + # about 10 objects. 
+ xs = [ray.put(np.zeros(object_size // 8)) for _ in range(300)] # noqa: F841 + + spill_dir = os.path.join(temp_folder, ray.ray_constants.DEFAULT_OBJECT_PREFIX) + under_min, over_min = 0, 0 + for filename in os.listdir(spill_dir): + size = os.stat(os.path.join(spill_dir, filename)).st_size + if size < min_spilling_size: + under_min += 1 + else: + over_min += 1 + # We should almost always spill fused objects. + assert over_min > under_min + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index dfefda52815c..c6c925e11505 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -188,6 +188,16 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { counts += 1; } if (!objects_to_spill.empty()) { + if (it == pinned_objects_.end() && bytes_to_spill <= num_bytes_to_spill && + !objects_pending_spill_.empty()) { + // We have gone through all spillable objects but we have not yet reached + // the minimum bytes to spill and we are already spilling other objects. + // Let those spill requests finish before we try to spill the current + // objects. This gives us some time to decide whether we really need to + // spill the current objects or if we can afford to wait for additional + // objects to fuse with. + return false; + } RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill << " num objects " << objects_to_spill.size(); auto start_time = absl::GetCurrentTimeNanos(); diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index c266f202cd39..52453fdda524 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -321,7 +321,9 @@ class LocalObjectManager { /// The last time a restore log finished. 
int64_t last_restore_log_ns_ = 0; + friend class LocalObjectManagerTestWithMinSpillingSize; friend class LocalObjectManagerTest; + friend class LocalObjectManagerFusedTest; }; }; // namespace raylet diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index be79452030d2..9f73ecb2a023 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -283,9 +283,9 @@ class MockObjectBuffer : public Buffer { std::shared_ptr> unpins_; }; -class LocalObjectManagerTest : public ::testing::Test { +class LocalObjectManagerTestWithMinSpillingSize { public: - LocalObjectManagerTest() + LocalObjectManagerTestWithMinSpillingSize(int64_t min_spilling_size) : subscriber_(std::make_shared()), owner_client(std::make_shared()), client_pool([&](const rpc::Address &addr) { return owner_client; }), @@ -295,7 +295,7 @@ class LocalObjectManagerTest : public ::testing::Test { manager_node_id_, "address", 1234, free_objects_batch_size, /*free_objects_period_ms=*/1000, worker_pool, client_pool, /*max_io_workers=*/2, - /*min_spilling_size=*/0, + /*min_spilling_size=*/min_spilling_size, /*is_external_storage_type_fs=*/true, /*max_fused_object_count*/ max_fused_object_count_, /*on_objects_freed=*/ @@ -349,6 +349,18 @@ class LocalObjectManagerTest : public ::testing::Test { std::unordered_set unevictable_objects_; }; +class LocalObjectManagerTest : public LocalObjectManagerTestWithMinSpillingSize, + public ::testing::Test { + public: + LocalObjectManagerTest() : LocalObjectManagerTestWithMinSpillingSize(0) {} +}; + +class LocalObjectManagerFusedTest : public LocalObjectManagerTestWithMinSpillingSize, + public ::testing::Test { + public: + LocalObjectManagerFusedTest() : LocalObjectManagerTestWithMinSpillingSize(100) {} +}; + TEST_F(LocalObjectManagerTest, TestPin) { rpc::Address owner_address; owner_address.set_worker_id(WorkerID::FromRandom().Binary()); @@ -1288,6 +1300,111 @@ 
TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) { AssertNoLeaks(); } +TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) { + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + std::vector> objects; + int64_t total_size = 0; + int64_t object_size = 52; + + for (size_t i = 0; i < 3; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(object_size, object_id, unpins); + total_size += object_size; + auto object = std::make_unique(data_buffer, nullptr, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + manager.SpillObjectUptoMaxThroughput(); + // Only 2 of the objects should be spilled. + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + for (const auto &id : object_ids) { + ASSERT_EQ((*unpins)[id], 0); + } + manager.SpillObjectUptoMaxThroughput(); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + + // Check that 2 of the 3 objects get spilled and the URLs get added to the + // global object directory. + std::vector urls; + urls.push_back(BuildURL("url1")); + urls.push_back(BuildURL("url2")); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); + // Objects should get freed even though we didn't wait for the owner's notice + // to evict. + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < urls.size(); i++) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } + ASSERT_EQ(owner_client->object_urls.size(), 2); + int num_unpinned = 0; + for (const auto &id : object_ids) { + if ((*unpins)[id] == 1) { + num_unpinned++; + } + } + ASSERT_EQ(num_unpinned, 2); + + // We will spill the last object, even though we're under the min spilling + // size, because it is the only spillable object. 
+ manager.SpillObjectUptoMaxThroughput(); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); +} + +TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) { + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + + std::vector object_ids; + std::vector> objects; + int64_t total_size = 0; + // 20 of these objects are needed to hit the min spilling size, but + // max_fused_object_count=15. + int64_t object_size = 5; + + for (size_t i = 0; i < 40; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(object_size, object_id, unpins); + total_size += object_size; + auto object = std::make_unique(data_buffer, nullptr, + std::vector()); + objects.push_back(std::move(object)); + } + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); + manager.SpillObjectUptoMaxThroughput(); + // First two spill batches succeed because they have at least 15 objects. + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + // Last spill batch fails because we have 10 objects and their total size is + // less than 100. + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + + std::vector urls; + for (int i = 0; i < 15; i++) { + urls.push_back(BuildURL("url", i)); + } + EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(2); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < urls.size(); i++) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } + + // We will spill the last objects even though we're under the min spilling + // size because they are the only spillable objects. 
+ manager.SpillObjectUptoMaxThroughput(); + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); +} + } // namespace raylet } // namespace ray