From ba2db76353192bdbe021056048be9d6a8ab39bb9 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Sat, 9 Nov 2024 16:46:57 -0800 Subject: [PATCH 1/9] [core] Refactor WorkerPool with Prestarts. Signed-off-by: Ruiyang Wang --- src/ray/common/ray_config_def.h | 5 + src/ray/core_worker/core_worker.cc | 21 ++++ src/ray/core_worker/core_worker.h | 14 ++- src/ray/protobuf/BUILD | 3 +- src/ray/protobuf/node_manager.proto | 20 ++++ src/ray/raylet/node_manager.cc | 47 ++++++++ src/ray/raylet/node_manager.h | 4 + src/ray/raylet/worker.cc | 1 + src/ray/raylet/worker.h | 19 +++ src/ray/raylet/worker_pool.cc | 104 +++++++++++------ src/ray/raylet/worker_pool.h | 108 +++++++++++------- src/ray/raylet_client/raylet_client.cc | 7 ++ src/ray/raylet_client/raylet_client.h | 12 ++ .../rpc/node_manager/node_manager_client.h | 6 + .../rpc/node_manager/node_manager_server.h | 5 + 15 files changed, 300 insertions(+), 76 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index b1a73f79cb1a..f35343dbfc85 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -597,6 +597,11 @@ RAY_CONFIG(bool, enable_worker_prestart, false) /// TODO(clarng): reconcile with enable_worker_prestart RAY_CONFIG(bool, prestart_worker_first_driver, true) +/// For a PrestartWorkers request to a single raylet, the maximum number of workers to +/// prestart. If a request asks for more workers than this, the num of workers will be +/// capped. +RAY_CONFIG(uint64_t, restart_workers_api_max_num_workers, 10) + /// The interval of periodic idle worker killing. Value of 0 means worker capping is /// disabled. RAY_CONFIG(uint64_t, kill_idle_workers_interval_ms, 200) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a09d1e4afc0c..52d58207bc3b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -14,6 +14,8 @@ #include "ray/core_worker/core_worker.h" +#include + #ifndef _WIN32 #include #endif @@ -2163,6 +2165,25 @@ void CoreWorker::BuildCommonTaskSpec( } } +Status CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info, + int64_t keep_alive_duration_secs, + size_t num_workers) { + rpc::PrestartWorkersRequest request; + request.set_language(GetLanguage()); + request.set_job_id(GetCurrentJobId().Binary()); + *request.mutable_runtime_env_info() = + *OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info); + request.set_keep_alive_duration_secs(keep_alive_duration_secs); + request.set_num_workers(num_workers); + // this is sync + std::promise promise; + local_raylet_client_->PrestartWorkers( + request, [&promise](const Status &status, const rpc::PrestartWorkersReply &reply) { + promise.set_value(status); + }); + return promise.get_future().get(); +} + std::vector CoreWorker::SubmitTask( const RayFunction &function, const std::vector> &args, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 3357bad4cfd9..0667399633c6 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -44,7 +44,6 @@ #include "ray/pubsub/subscriber.h" #include "ray/raylet_client/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" -#include "ray/rpc/worker/core_worker_client.h" #include "ray/rpc/worker/core_worker_server.h" #include "ray/util/process.h" #include "src/ray/protobuf/pubsub.pb.h" @@ -916,6 +915,19 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const std::string &error_message, double timestamp); + 
// Prestart workers. The workers: + // - uses current language. + // - uses current JobID. + // - does NOT support root_detached_actor_id. + // - uses provided runtime_env_info applied to the job runtime env, as if it's a task + // request. + // + // This API is sync. It blocks until raylet replies. But it provides no guarantee that + // the workers are actually started. + Status PrestartWorkers(const std::string &serialized_runtime_env_info, + int64_t keep_alive_duration_secs, + size_t num_workers); + /// Submit a normal task. /// /// \param[in] function The remote function to execute. diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index 485669214992..0c21a597eb05 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -118,6 +118,7 @@ proto_library( ":autoscaler_proto", ":common_proto", ":gcs_proto", + ":runtime_env_common_proto", ], ) @@ -249,7 +250,7 @@ proto_library( name = "export_event_proto", srcs = ["export_api/export_event.proto"], deps = [ - ":export_task_event_proto", + ":export_task_event_proto", ":export_node_event_proto", ":export_actor_event_proto", ":export_driver_job_event_proto", diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 3b30f7e71b1a..52fd68dc9dc2 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -20,6 +20,7 @@ package ray.rpc; import "src/ray/protobuf/common.proto"; import "src/ray/protobuf/gcs.proto"; import "src/ray/protobuf/autoscaler.proto"; +import "src/ray/protobuf/runtime_env_common.proto"; message WorkerBacklogReport { // TaskSpec indicating the scheduling class. @@ -94,6 +95,23 @@ message RequestWorkerLeaseReply { string scheduling_failure_message = 10; } +// Request to prestart workers. At this time we don't yet know the resource, or task type. +message PrestartWorkersRequest { + Language language = 1; + // Owner: job or root detached actor. + optional bytes job_id = 2; + optional bytes root_detached_actor_id = 3; + RuntimeEnvInfo runtime_env_info = 4; + // Started idle workers will be kept alive for this duration. Reset on task assignment. + int64 keep_alive_duration_secs = 5; + // Raylet will try to start `num_workers` workers. + int64 num_workers = 6; +} + +message PrestartWorkersReply { +} + + message PrepareBundleResourcesRequest { // Bundles that containing the requested resources. repeated Bundle bundle_specs = 1; @@ -385,6 +403,8 @@ service NodeManagerService { rpc GetResourceLoad(GetResourceLoadRequest) returns (GetResourceLoadReply); // Request a worker from the raylet. rpc RequestWorkerLease(RequestWorkerLeaseRequest) returns (RequestWorkerLeaseReply); + // Request to prestart workers. + rpc PrestartWorkers(PrestartWorkersRequest) returns (PrestartWorkersReply); // Report task backlog information from a worker to the raylet rpc ReportWorkerBacklog(ReportWorkerBacklogRequest) returns (ReportWorkerBacklogReply); // Release a worker back to its raylet. 
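For illustration, the call path added above can be exercised end to end roughly as follows. This is a minimal sketch, not part of the patch: it assumes an already-connected raylet::RayletClient named raylet_client, a JobID job_id, and a populated rpc::RuntimeEnvInfo runtime_env_info, and it mirrors the request that CoreWorker::PrestartWorkers builds.

    // Ask the local raylet to warm up 4 Python workers that stay alive for up to
    // 10 minutes while idle (sketch only; the identifiers named above are assumptions).
    rpc::PrestartWorkersRequest request;
    request.set_language(rpc::Language::PYTHON);
    request.set_job_id(job_id.Binary());                 // owner job (optional field)
    *request.mutable_runtime_env_info() = runtime_env_info;
    request.set_keep_alive_duration_secs(600);
    request.set_num_workers(4);
    raylet_client.PrestartWorkers(
        request,
        [](const Status &status, const rpc::PrestartWorkersReply &reply) {
          // The reply only acknowledges the request; it does not guarantee that the
          // workers actually started.
          if (!status.ok()) {
            RAY_LOG(WARNING) << "PrestartWorkers request failed: " << status.ToString();
          }
        });
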
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 879edff0bb6c..338e484f94d6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -18,7 +18,10 @@ #include #include #include +#include +#include #include +#include #include "absl/functional/bind_front.h" #include "absl/time/clock.h" @@ -40,6 +43,7 @@ #include "ray/util/event.h" #include "ray/util/event_label.h" #include "ray/util/util.h" +#include "src/ray/raylet/worker_pool.h" namespace { @@ -1865,6 +1869,49 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques send_reply_callback_wrapper); } +void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, + rpc::PrestartWorkersReply *reply, + rpc::SendReplyCallback send_reply_callback) { + const int64_t num_workers = + std::min(RayConfig::restart_workers_api_max_num_workers(), request.num_workers()); + if (num_workers < request.num_workers()) { + RAY_LOG(WARNING) << "Requested to prestart " << request.num_workers() + << " workers, but only " << num_workers + << " workers are allowed to prestart. See " + "RAY_restart_workers_api_max_num_workers"; + } + + auto pop_worker_request = std::make_shared( + request.language(), + rpc::WorkerType::WORKER, + request.has_job_id() ? JobID::FromBinary(request.job_id()) : JobID::Nil(), + request.has_root_detached_actor_id() + ? ActorID::FromBinary(request.root_detached_actor_id()) + : ActorID::Nil(), + /*gpu=*/std::nullopt, + /*actor_worker=*/std::nullopt, + request.runtime_env_info(), + /*options=*/std::vector{}, + absl::Seconds(request.keep_alive_duration_secs()), + /*callback=*/ + [request](const std::shared_ptr &worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) { + // This callback does not use the worker. + RAY_LOG(ERROR) << "Prestart worker started! token " << + + worker->GetStartupToken() << ", id " << worker->WorkerId() << ", status " + << status << ", runtime_env_setup_error_message " + << runtime_env_setup_error_message; + return false; + }); + + for (int64_t i = 0; i < num_workers; i++) { + worker_pool_.StartNewWorker(pop_worker_request); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::HandlePrepareBundleResources( rpc::PrepareBundleResourcesRequest request, rpc::PrepareBundleResourcesReply *reply, diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index cef5e66aa26f..719ae7224b42 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -523,6 +523,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void HandlePrestartWorkers(rpc::PrestartWorkersRequest request, + rpc::PrestartWorkersReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `ReportWorkerBacklog` request. 
void HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest request, rpc::ReportWorkerBacklogReply *reply, diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 82c7476b17fc..7dc409678d11 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -15,6 +15,7 @@ #include "ray/raylet/worker.h" #include +#include #include #include "ray/raylet/format/node_manager_generated.h" diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 9166eea619fc..da09691103c0 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -117,6 +117,9 @@ class WorkerInterface { virtual const ActorID &GetRootDetachedActorId() const = 0; + virtual void SetIdleKeepAliveDeadline(absl::Time deadline) = 0; + virtual bool IsIdleKillable(absl::Time now) const = 0; + protected: virtual void SetStartupToken(StartupToken startup_token) = 0; @@ -234,6 +237,7 @@ class Worker : public WorkerInterface { SetIsActorWorker(task_spec.IsActorCreationTask()); assigned_task_ = assigned_task; root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId(); + idle_keep_alive_deadline_ = std::nullopt; } absl::Time GetAssignedTaskTime() const { return task_assign_time_; }; @@ -256,6 +260,17 @@ class Worker : public WorkerInterface { void SetIsGpu(bool is_gpu); void SetIsActorWorker(bool is_actor_worker); + bool IsIdleKillable(absl::Time now) const { + if (!idle_keep_alive_deadline_.has_value()) { + return true; + } + return now > idle_keep_alive_deadline_.value(); + } + + void SetIdleKeepAliveDeadline(absl::Time deadline) { + idle_keep_alive_deadline_ = deadline; + } + protected: void SetStartupToken(StartupToken startup_token); @@ -330,6 +345,10 @@ class Worker : public WorkerInterface { std::optional is_actor_worker_ = std::nullopt; /// If true, a RPC need to be sent to notify the worker about GCS restarting. bool notify_gcs_restarted_ = false; + /// If set, the worker is not eligible for killing even if it's idle. This status is + /// reset when the worker is assigned a new task. Note if the job is finished, the + /// worker is still killable. 
+ std::optional idle_keep_alive_deadline_ = std::nullopt; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 943ea89b24b5..2affb3f2bc07 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include "absl/strings/str_split.h" #include "ray/common/constants.h" @@ -30,6 +31,7 @@ #include "ray/stats/metric_defs.h" #include "ray/util/logging.h" #include "ray/util/util.h" +#include "src/ray/raylet/worker_pool.h" DEFINE_stats(worker_register_time_ms, "end to end latency of register a worker process.", @@ -225,18 +227,20 @@ void WorkerPool::update_worker_startup_token_counter() { void WorkerPool::AddWorkerProcess( State &state, - const rpc::WorkerType worker_type, + rpc::WorkerType worker_type, const Process &proc, const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, - const std::vector &dynamic_options) { + const std::vector &dynamic_options, + std::optional idle_worker_keep_alive_duration) { state.worker_processes.emplace(worker_startup_token_counter_, WorkerProcessInfo{/*is_pending_registration=*/true, worker_type, proc, start, runtime_env_info, - dynamic_options}); + dynamic_options, + idle_worker_keep_alive_duration}); } void WorkerPool::RemoveWorkerProcess(State &state, @@ -443,7 +447,8 @@ std::tuple WorkerPool::StartWorkerProcess( const std::vector &dynamic_options, const int runtime_env_hash, const std::string &serialized_runtime_env_context, - const rpc::RuntimeEnvInfo &runtime_env_info) { + const rpc::RuntimeEnvInfo &runtime_env_info, + std::optional idle_worker_keep_alive_duration) { rpc::JobConfig *job_config = nullptr; if (!job_id.IsNil()) { auto it = all_jobs_.find(job_id); @@ -504,7 +509,13 @@ std::tuple WorkerPool::StartWorkerProcess( AdjustWorkerOomScore(proc.GetId()); } MonitorStartingWorkerProcess(worker_startup_token_counter_, language, worker_type); - AddWorkerProcess(state, worker_type, proc, start, runtime_env_info, dynamic_options); + AddWorkerProcess(state, + worker_type, + proc, + start, + runtime_env_info, + dynamic_options, + idle_worker_keep_alive_duration); StartupToken worker_startup_token = worker_startup_token_counter_; update_worker_startup_token_counter(); if (IsIOWorkerType(worker_type)) { @@ -754,6 +765,13 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker send_reply_callback(status, /*port=*/0); return status; } + + if (it->second.idle_worker_keep_alive_duration.has_value()) { + auto deadline = absl::FromUnixMillis(get_time_()) + + it->second.idle_worker_keep_alive_duration.value(); + worker->SetIdleKeepAliveDeadline(deadline); + } + auto process = Process::FromPid(pid); worker->SetProcess(process); @@ -1047,6 +1065,17 @@ void WorkerPool::TryKillingIdleWorkers() { // Filter out all idle workers that are already dead and/or associated with // jobs that have already finished. 
int64_t num_killable_idle_workers = 0; + auto worker_killable = + [now](const std::pair, int64_t> &entry) -> bool { + const auto &[idle_worker, last_time_used_ms] = entry; + if (!idle_worker->IsIdleKillable(absl::FromUnixMillis(now))) { + return false; + } + return last_time_used_ms == -1 || + now - last_time_used_ms > + RayConfig::instance().idle_worker_killing_time_threshold_ms(); + }; + for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end();) { const auto &idle_worker = it->first; if (idle_worker->IsDead()) { @@ -1060,9 +1089,7 @@ void WorkerPool::TryKillingIdleWorkers() { KillIdleWorker(idle_worker, it->second); it = idle_of_all_languages_.erase(it); } else { - if (it->second == -1 || - now - it->second > - RayConfig::instance().idle_worker_killing_time_threshold_ms()) { + if (worker_killable(*it)) { // The job has not yet finished and the worker has been idle for longer // than the timeout. num_killable_idle_workers++; @@ -1084,9 +1111,7 @@ void WorkerPool::TryKillingIdleWorkers() { auto it = idle_of_all_languages_.begin(); while (num_killable_idle_workers > num_desired_idle_workers && it != idle_of_all_languages_.end()) { - if (it->second == -1 || - now - it->second > - RayConfig::instance().idle_worker_killing_time_threshold_ms()) { + if (worker_killable(*it)) { RAY_LOG(DEBUG) << "Number of idle workers " << num_killable_idle_workers << " is larger than the number of desired workers " << num_desired_idle_workers << " killing idle worker with PID " @@ -1213,14 +1238,16 @@ void WorkerPool::StartNewWorker( pop_worker_request->runtime_env_info.serialized_runtime_env(); PopWorkerStatus status = PopWorkerStatus::OK; - auto [proc, startup_token] = StartWorkerProcess(pop_worker_request->language, - pop_worker_request->worker_type, - pop_worker_request->job_id, - &status, - pop_worker_request->dynamic_options, - pop_worker_request->runtime_env_hash, - serialized_runtime_env_context, - pop_worker_request->runtime_env_info); + auto [proc, startup_token] = + StartWorkerProcess(pop_worker_request->language, + pop_worker_request->worker_type, + pop_worker_request->job_id, + &status, + pop_worker_request->dynamic_options, + pop_worker_request->runtime_env_hash, + serialized_runtime_env_context, + pop_worker_request->runtime_env_info, + pop_worker_request->idle_worker_keep_alive_duration); if (status == PopWorkerStatus::OK) { RAY_CHECK(proc.IsValid()); WarnAboutSize(); @@ -1280,8 +1307,8 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, /*is_gpu=*/task_spec.GetRequiredResources().Get(scheduling::ResourceID::GPU()) > 0, /*is_actor_worker=*/task_spec.IsActorCreationTask(), task_spec.RuntimeEnvInfo(), - task_spec.GetRuntimeEnvHash(), task_spec.DynamicWorkerOptionsOrEmpty(), + /*idle_worker_keep_alive_duration=*/std::nullopt, [this, task_spec, callback]( const std::shared_ptr &worker, PopWorkerStatus status, @@ -1305,14 +1332,22 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, } return callback(worker, status, runtime_env_setup_error_message); }); + WorkerID reused_worker_id = PopWorker(pop_worker_request); + if (!reused_worker_id.IsNil()) { + RAY_LOG(DEBUG).WithField(task_spec.TaskId()).WithField(reused_worker_id) + << "Re-using worker for task."; + } +} +std::shared_ptr WorkerPool::FindAndPopIdleWorker( + const PopWorkerRequest &pop_worker_request) { absl::flat_hash_map skip_reason_count; auto worker_fits_for_task_fn = [this, &pop_worker_request, &skip_reason_count]( const std::pair, int64_t> &pair) -> bool { const auto &worker = pair.first; 
- WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, *pop_worker_request); + WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, pop_worker_request); if (reason == WorkerUnfitForTaskReason::NONE) { return true; } @@ -1326,8 +1361,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, } return false; }; - auto &state = GetStateForLanguage(task_spec.GetLanguage()); - std::shared_ptr worker = nullptr; + auto &state = GetStateForLanguage(pop_worker_request.language); auto good_worker_it = std::find_if(idle_of_all_languages_.rbegin(), idle_of_all_languages_.rend(), worker_fits_for_task_fn); @@ -1336,24 +1370,28 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, // We can't erase a reverse_iterator. auto lit = good_worker_it.base(); lit--; - worker = std::move(lit->first); + std::shared_ptr worker = std::move(lit->first); idle_of_all_languages_.erase(lit); + return worker; } + RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to " + << debug_string(skip_reason_count); + return nullptr; +} +WorkerID WorkerPool::PopWorker(std::shared_ptr pop_worker_request) { // If there's an idle worker that fits the task, use it. // Else, start a new worker. + auto worker = FindAndPopIdleWorker(*pop_worker_request); if (worker == nullptr) { - RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to " - << debug_string(skip_reason_count); StartNewWorker(pop_worker_request); - } else { - RAY_CHECK(worker->GetAssignedJobId().IsNil() || - worker->GetAssignedJobId() == task_spec.JobId()); - RAY_LOG(DEBUG) << "Re-using worker " << worker->WorkerId() << " for task " - << task_spec.DebugString(); - stats::NumWorkersStartedFromCache.Record(1); - PopWorkerCallbackAsync(pop_worker_request->callback, worker, PopWorkerStatus::OK); + return WorkerID::Nil(); } + RAY_CHECK(worker->GetAssignedJobId().IsNil() || + worker->GetAssignedJobId() == pop_worker_request->job_id); + stats::NumWorkersStartedFromCache.Record(1); + PopWorkerCallbackAsync(pop_worker_request->callback, worker, PopWorkerStatus::OK); + return worker->WorkerId(); } void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index ef2e1e048635..6acd1cc3b724 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include @@ -77,6 +79,44 @@ using PopWorkerCallback = PopWorkerStatus status, const std::string &runtime_env_setup_error_message)>; +struct PopWorkerRequest { + const rpc::Language language; + const rpc::WorkerType worker_type; + const JobID job_id; // can be Nil + const ActorID root_detached_actor_id; // can be Nil + const std::optional is_gpu; + const std::optional is_actor_worker; + const rpc::RuntimeEnvInfo runtime_env_info; + const int runtime_env_hash; + const std::vector dynamic_options; + std::optional idle_worker_keep_alive_duration; + + PopWorkerCallback callback; + + PopWorkerRequest(rpc::Language lang, + rpc::WorkerType worker_type, + JobID job, + ActorID root_actor_id, + std::optional gpu, + std::optional actor_worker, + rpc::RuntimeEnvInfo runtime_env_info, + std::vector options, + std::optional idle_worker_keep_alive_duration, + PopWorkerCallback callback) + : language(lang), + worker_type(worker_type), + job_id(job), + root_detached_actor_id(root_actor_id), + is_gpu(gpu), + is_actor_worker(actor_worker), + runtime_env_info(std::move(runtime_env_info)), + // this-> is needed to 
disambiguate the member variable from the ctor arg. + runtime_env_hash(), + dynamic_options(std::move(options)), + idle_worker_keep_alive_duration(idle_worker_keep_alive_duration), + callback(std::move(callback)) {} +}; + /// \class WorkerPoolInterface /// /// Used for new scheduler unit tests. @@ -417,6 +457,24 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Get the NodeID of this worker pool. const NodeID &GetNodeID() const; + /// Returns a worker id if an idle worker is reused for this request. Otherwise, + /// asynchronously start a worker and return WorkerID::Nil(). + WorkerID PopWorker(std::shared_ptr pop_worker_request); + + // Find an idle worker that can serve the task. If found, pop it out and return it. + // Otherwise, return nullptr. + std::shared_ptr FindAndPopIdleWorker( + const PopWorkerRequest &pop_worker_request); + + // Starts a new worker that fulfills `pop_worker_request`. Difference on methods: + // - PopWorker may reuse idle workers. + // - StartNewWorker force starts a new worker, with runtime env created. + // - StartWorkerProcess starts a new worker process, *without* runtime env creation. + // + // Note: NONE of these methods guarantee that pop_worker_request.callback will be called + // with the started worker. It may be called with any fitting workers. + void StartNewWorker(const std::shared_ptr &pop_worker_request); + protected: void update_worker_startup_token_counter(); @@ -437,6 +495,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param runtime_env_hash The hash of runtime env. /// \param serialized_runtime_env_context The context of runtime env. /// \param runtime_env_info The raw runtime env info. + /// \param idle_worker_keep_alive_duration If set, the worker will be kept alive for + /// this duration even if it's idle. This is reset after a task assignment. /// \return The process that we started and a token. If the token is less than 0, /// we didn't start a process. std::tuple StartWorkerProcess( @@ -447,7 +507,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { const std::vector &dynamic_options = {}, int runtime_env_hash = 0, const std::string &serialized_runtime_env_context = "{}", - const rpc::RuntimeEnvInfo &runtime_env_info = rpc::RuntimeEnvInfo()); + const rpc::RuntimeEnvInfo &runtime_env_info = rpc::RuntimeEnvInfo(), + std::optional idle_worker_keep_alive_duration = std::nullopt); /// The implementation of how to start a new worker process with command arguments. /// The lifetime of the process is tied to that of the returned object, @@ -505,46 +566,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { rpc::RuntimeEnvInfo runtime_env_info; /// The dynamic_options. std::vector dynamic_options; + /// The duration to keep the worker alive even if it's idle. 
+ std::optional idle_worker_keep_alive_duration; }; - struct PopWorkerRequest { - rpc::Language language; - rpc::WorkerType worker_type; - JobID job_id; // can be Nil - ActorID root_detached_actor_id; // can be Nil - std::optional is_gpu; - std::optional is_actor_worker; - rpc::RuntimeEnvInfo runtime_env_info; - int runtime_env_hash; - std::vector dynamic_options; - - PopWorkerCallback callback; - - PopWorkerRequest(rpc::Language lang, - rpc::WorkerType worker_type, - JobID job, - ActorID root_actor_id, - std::optional gpu, - std::optional actor_worker, - rpc::RuntimeEnvInfo runtime_env_info, - int runtime_hash, - std::vector options, - PopWorkerCallback callback) - : language(lang), - worker_type(worker_type), - job_id(job), - root_detached_actor_id(root_actor_id), - is_gpu(gpu), - is_actor_worker(actor_worker), - runtime_env_info(std::move(runtime_env_info)), - runtime_env_hash(runtime_hash), - dynamic_options(std::move(options)), - callback(std::move(callback)) {} - }; - - // Starts a new worker that fulfills `pop_worker_request`. - void StartNewWorker(const std::shared_ptr &pop_worker_request); - /// An internal data structure that maintains the pool state per language. struct State { /// The commands and arguments used to start the worker process @@ -709,11 +734,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { void DeleteRuntimeEnvIfPossible(const std::string &serialized_runtime_env); void AddWorkerProcess(State &state, - const rpc::WorkerType worker_type, + rpc::WorkerType worker_type, const Process &proc, const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, - const std::vector &dynamic_options); + const std::vector &dynamic_options, + std::optional idle_worker_keep_alive_duration); void RemoveWorkerProcess(State &state, const StartupToken &proc_startup_token); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 938f20fab80e..0c3ee4661706 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -22,6 +22,7 @@ #include "ray/raylet/format/node_manager_generated.h" #include "ray/util/logging.h" #include "ray/util/util.h" +#include "src/ray/raylet_client/raylet_client.h" using MessageType = ray::protocol::MessageType; @@ -353,6 +354,12 @@ void raylet::RayletClient::RequestWorkerLease( grpc_client_->RequestWorkerLease(*request, callback); } +void raylet::RayletClient::PrestartWorkers( + const rpc::PrestartWorkersRequest &request, + const rpc::ClientCallback &callback) { + grpc_client_->PrestartWorkers(request, callback); +} + std::shared_ptr raylet::RayletClient::GetChannel() const { return grpc_client_->Channel(); } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index f40c97edf620..a82b634f3ba5 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -86,6 +86,13 @@ class WorkerLeaseInterface { const std::string &disconnect_worker_error_detail, bool worker_exiting) = 0; + /// Request the raylet to prestart workers. In `request` we can set the worker's owner, + /// runtime env info and number of workers. + /// + virtual void PrestartWorkers( + const rpc::PrestartWorkersRequest &request, + const rpc::ClientCallback &callback) = 0; + /// Notify raylets to release unused workers. /// \param workers_in_use Workers currently in use. 
/// \param callback Callback that will be called after raylet completes the release of @@ -449,6 +456,11 @@ class RayletClient : public RayletClientInterface { const std::string &disconnect_worker_error_detail, bool worker_exiting) override; + /// Implements WorkerLeaseInterface. + void PrestartWorkers( + const ray::rpc::PrestartWorkersRequest &request, + const ray::rpc::ClientCallback &callback) override; + void GetTaskFailureCause( const TaskID &task_id, const ray::rpc::ClientCallback &callback) diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 95ca4846c3f5..4f5d513a643b 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -97,6 +97,12 @@ class NodeManagerWorkerClient grpc_client_, /*method_timeout_ms*/ -1, ) + /// Request a prestart worker. + VOID_RPC_CLIENT_METHOD(NodeManagerService, + PrestartWorkers, + grpc_client_, + /*method_timeout_ms*/ -1, ) + /// Report task backlog information VOID_RPC_CLIENT_METHOD(NodeManagerService, ReportWorkerBacklog, diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index bb11333ae35d..e635dbf428b6 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -32,6 +32,7 @@ namespace rpc { RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetResourceLoad) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(NotifyGCSRestart) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RequestWorkerLease) \ + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PrestartWorkers) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReportWorkerBacklog) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReturnWorker) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(ReleaseUnusedActorWorkers) \ @@ -79,6 +80,10 @@ class NodeManagerServiceHandler { RequestWorkerLeaseReply *reply, SendReplyCallback send_reply_callback) = 0; + virtual void HandlePrestartWorkers(PrestartWorkersRequest request, + PrestartWorkersReply *reply, + SendReplyCallback send_reply_callback) = 0; + virtual void HandleReportWorkerBacklog(ReportWorkerBacklogRequest request, ReportWorkerBacklogReply *reply, SendReplyCallback send_reply_callback) = 0; From d4e266003b08c221dbd5d47e3df21c07bfed84b2 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Sat, 9 Nov 2024 23:22:31 -0800 Subject: [PATCH 2/9] fix Signed-off-by: Ruiyang Wang --- src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/core_worker.h | 2 +- src/ray/protobuf/node_manager.proto | 4 ++-- src/ray/raylet/node_manager.cc | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 52d58207bc3b..1636930845f1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2166,7 +2166,7 @@ void CoreWorker::BuildCommonTaskSpec( } Status CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info, - int64_t keep_alive_duration_secs, + uint64_t keep_alive_duration_secs, size_t num_workers) { rpc::PrestartWorkersRequest request; request.set_language(GetLanguage()); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 0667399633c6..79b5b5f4be82 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -925,7 +925,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // This API is sync. It blocks until raylet replies. But it provides no guarantee that // the workers are actually started. 
Status PrestartWorkers(const std::string &serialized_runtime_env_info, - int64_t keep_alive_duration_secs, + uint64_t keep_alive_duration_secs, size_t num_workers); /// Submit a normal task. diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 52fd68dc9dc2..8389a20457c2 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -103,9 +103,9 @@ message PrestartWorkersRequest { optional bytes root_detached_actor_id = 3; RuntimeEnvInfo runtime_env_info = 4; // Started idle workers will be kept alive for this duration. Reset on task assignment. - int64 keep_alive_duration_secs = 5; + uint64 keep_alive_duration_secs = 5; // Raylet will try to start `num_workers` workers. - int64 num_workers = 6; + uint64 num_workers = 6; } message PrestartWorkersReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 338e484f94d6..87c47c4567a9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1872,8 +1872,8 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, rpc::PrestartWorkersReply *reply, rpc::SendReplyCallback send_reply_callback) { - const int64_t num_workers = - std::min(RayConfig::restart_workers_api_max_num_workers(), request.num_workers()); + const uint64_t num_workers = std::min( + RayConfig::instance().restart_workers_api_max_num_workers(), request.num_workers()); if (num_workers < request.num_workers()) { RAY_LOG(WARNING) << "Requested to prestart " << request.num_workers() << " workers, but only " << num_workers From 77dfbe6258da0e1afbd7a703aecb4026dcf37694 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 15 Nov 2024 11:31:58 -0800 Subject: [PATCH 3/9] address comments, revert worker.h Signed-off-by: Ruiyang Wang --- src/ray/core_worker/core_worker.cc | 15 +++--- src/ray/core_worker/core_worker.h | 9 ++-- src/ray/raylet/node_manager.cc | 2 +- src/ray/raylet/worker.cc | 1 - src/ray/raylet/worker.h | 19 ------- src/ray/raylet/worker_pool.cc | 81 ++++++++++++++++-------------- src/ray/raylet/worker_pool.h | 20 +++++--- 7 files changed, 67 insertions(+), 80 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 1636930845f1..dce81c41859c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2165,9 +2165,9 @@ void CoreWorker::BuildCommonTaskSpec( } } -Status CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info, - uint64_t keep_alive_duration_secs, - size_t num_workers) { +void CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info, + uint64_t keep_alive_duration_secs, + size_t num_workers) { rpc::PrestartWorkersRequest request; request.set_language(GetLanguage()); request.set_job_id(GetCurrentJobId().Binary()); @@ -2175,13 +2175,12 @@ Status CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_inf *OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info); request.set_keep_alive_duration_secs(keep_alive_duration_secs); request.set_num_workers(num_workers); - // this is sync - std::promise promise; local_raylet_client_->PrestartWorkers( - request, [&promise](const Status &status, const rpc::PrestartWorkersReply &reply) { - promise.set_value(status); + request, [](const Status &status, const rpc::PrestartWorkersReply &reply) { + if (!status.ok()) { + RAY_LOG(INFO) << "Failed to prestart workers: " << 
status.ToString(); + } }); - return promise.get_future().get(); } std::vector CoreWorker::SubmitTask( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 79b5b5f4be82..df65994e2334 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -922,11 +922,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // - uses provided runtime_env_info applied to the job runtime env, as if it's a task // request. // - // This API is sync. It blocks until raylet replies. But it provides no guarantee that - // the workers are actually started. - Status PrestartWorkers(const std::string &serialized_runtime_env_info, - uint64_t keep_alive_duration_secs, - size_t num_workers); + // This API is async. It provides no guarantee that the workers are actually started. + void PrestartWorkers(const std::string &serialized_runtime_env_info, + uint64_t keep_alive_duration_secs, + size_t num_workers); /// Submit a normal task. /// diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 87c47c4567a9..a9cde23323d9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -147,7 +147,7 @@ NodeManager::NodeManager( /*starting_worker_timeout_callback=*/ [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }, config.ray_debugger_external, - /*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }), + /*get_time=*/[]() { return absl::Now(); }), client_call_manager_(io_service), worker_rpc_pool_(client_call_manager_), core_worker_subscriber_(std::make_unique( diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 7dc409678d11..82c7476b17fc 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -15,7 +15,6 @@ #include "ray/raylet/worker.h" #include -#include #include #include "ray/raylet/format/node_manager_generated.h" diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index da09691103c0..9166eea619fc 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -117,9 +117,6 @@ class WorkerInterface { virtual const ActorID &GetRootDetachedActorId() const = 0; - virtual void SetIdleKeepAliveDeadline(absl::Time deadline) = 0; - virtual bool IsIdleKillable(absl::Time now) const = 0; - protected: virtual void SetStartupToken(StartupToken startup_token) = 0; @@ -237,7 +234,6 @@ class Worker : public WorkerInterface { SetIsActorWorker(task_spec.IsActorCreationTask()); assigned_task_ = assigned_task; root_detached_actor_id_ = assigned_task.GetTaskSpecification().RootDetachedActorId(); - idle_keep_alive_deadline_ = std::nullopt; } absl::Time GetAssignedTaskTime() const { return task_assign_time_; }; @@ -260,17 +256,6 @@ class Worker : public WorkerInterface { void SetIsGpu(bool is_gpu); void SetIsActorWorker(bool is_actor_worker); - bool IsIdleKillable(absl::Time now) const { - if (!idle_keep_alive_deadline_.has_value()) { - return true; - } - return now > idle_keep_alive_deadline_.value(); - } - - void SetIdleKeepAliveDeadline(absl::Time deadline) { - idle_keep_alive_deadline_ = deadline; - } - protected: void SetStartupToken(StartupToken startup_token); @@ -345,10 +330,6 @@ class Worker : public WorkerInterface { std::optional is_actor_worker_ = std::nullopt; /// If true, a RPC need to be sent to notify the worker about GCS restarting. bool notify_gcs_restarted_ = false; - /// If set, the worker is not eligible for killing even if it's idle. This status is - /// reset when the worker is assigned a new task. 
Note if the job is finished, the - /// worker is still killable. - std::optional idle_keep_alive_deadline_ = std::nullopt; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 2affb3f2bc07..0f1a0a1a689b 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -94,7 +94,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const std::string &native_library_path, std::function starting_worker_timeout_callback, int ray_debugger_external, - const std::function get_time) + const std::function get_time) : worker_startup_token_counter_(0), io_service_(&io_service), node_id_(node_id), @@ -766,12 +766,6 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker return status; } - if (it->second.idle_worker_keep_alive_duration.has_value()) { - auto deadline = absl::FromUnixMillis(get_time_()) + - it->second.idle_worker_keep_alive_duration.value(); - worker->SetIdleKeepAliveDeadline(deadline); - } - auto process = Process::FromPid(pid); worker->SetProcess(process); @@ -1043,14 +1037,26 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } else { state.idle.insert(worker); auto now = get_time_(); + absl::Time keep_alive_until = absl::InfinitePast(); if (worker->GetAssignedTaskTime() == absl::Time()) { + const auto &keep_alive_duration = + state.worker_processes.at(worker->GetStartupToken()) + .idle_worker_keep_alive_duration; + if (keep_alive_duration.has_value()) { + keep_alive_until = now + *keep_alive_duration; + } + // If the worker never held any tasks, then we should consider it first when // choosing which idle workers to kill because it is not warmed up and is slower // than those workers who served tasks before. // See https://github.com/ray-project/ray/pull/36766 - idle_of_all_languages_.emplace_front(worker, now); + // + // Also, we set keep_alive_until w.r.t. idle_worker_keep_alive_duration. + idle_of_all_languages_.emplace_front( + IdleWorkerEntry{worker, /*idle_since=*/now, keep_alive_until}); } else { - idle_of_all_languages_.emplace_back(worker, now); + idle_of_all_languages_.emplace_back( + IdleWorkerEntry{worker, /*idle_since=*/now, keep_alive_until}); } } // We either have an idle worker or a slot to start a new worker. @@ -1060,33 +1066,32 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } void WorkerPool::TryKillingIdleWorkers() { - int64_t now = get_time_(); + absl::Time now = get_time_(); // Filter out all idle workers that are already dead and/or associated with // jobs that have already finished. int64_t num_killable_idle_workers = 0; - auto worker_killable = - [now](const std::pair, int64_t> &entry) -> bool { - const auto &[idle_worker, last_time_used_ms] = entry; - if (!idle_worker->IsIdleKillable(absl::FromUnixMillis(now))) { + auto worker_killable = [now](const IdleWorkerEntry &entry) -> bool { + if (entry.keep_alive_until > now) { return false; } - return last_time_used_ms == -1 || - now - last_time_used_ms > - RayConfig::instance().idle_worker_killing_time_threshold_ms(); + return now - entry.idle_since > + absl::Milliseconds( + RayConfig::instance().idle_worker_killing_time_threshold_ms()); }; + // First, kill must-kill workers: dead ones, job finished ones. Also calculate killable + // worker count. 
for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end();) { - const auto &idle_worker = it->first; - if (idle_worker->IsDead()) { + if (it->worker->IsDead()) { it = idle_of_all_languages_.erase(it); continue; } - const auto &job_id = idle_worker->GetAssignedJobId(); + const auto &job_id = it->worker->GetAssignedJobId(); if (finished_jobs_.count(job_id) > 0) { // The job has finished, so we should kill the worker immediately. - KillIdleWorker(idle_worker, it->second); + KillIdleWorker(*it); it = idle_of_all_languages_.erase(it); } else { if (worker_killable(*it)) { @@ -1115,8 +1120,8 @@ void WorkerPool::TryKillingIdleWorkers() { RAY_LOG(DEBUG) << "Number of idle workers " << num_killable_idle_workers << " is larger than the number of desired workers " << num_desired_idle_workers << " killing idle worker with PID " - << it->first->GetProcess().GetId(); - KillIdleWorker(it->first, it->second); + << it->worker->GetProcess().GetId(); + KillIdleWorker(*it); it = idle_of_all_languages_.erase(it); num_killable_idle_workers--; } else { @@ -1125,8 +1130,8 @@ void WorkerPool::TryKillingIdleWorkers() { } } -void WorkerPool::KillIdleWorker(std::shared_ptr idle_worker, - int64_t last_time_used_ms) { +void WorkerPool::KillIdleWorker(const IdleWorkerEntry &entry) { + const auto &idle_worker = entry.worker; // To avoid object lost issue caused by forcibly killing, send an RPC request to the // worker to allow it to do cleanup before exiting. We kill it anyway if the driver // is already exited. @@ -1145,9 +1150,9 @@ void WorkerPool::KillIdleWorker(std::shared_ptr idle_worker, request.set_force_exit(true); } rpc_client->Exit( - request, - [this, idle_worker, last_time_used_ms](const ray::Status &status, - const rpc::ExitReply &r) { + request, [this, entry](const ray::Status &status, const rpc::ExitReply &r) { + const auto &idle_worker = entry.worker; + RAY_CHECK(pending_exit_idle_workers_.erase(idle_worker->WorkerId())); if (!status.ok()) { RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString(); @@ -1173,8 +1178,7 @@ void WorkerPool::KillIdleWorker(std::shared_ptr idle_worker, // kill the worker (e.g., when the worker owns the object). Without this, // if the first N workers own objects, it can't kill idle workers that are // >= N+1. - idle_of_all_languages_.push_back( - std::make_pair(idle_worker, last_time_used_ms)); + idle_of_all_languages_.push_back(entry); } }); } @@ -1343,11 +1347,10 @@ std::shared_ptr WorkerPool::FindAndPopIdleWorker( const PopWorkerRequest &pop_worker_request) { absl::flat_hash_map skip_reason_count; - auto worker_fits_for_task_fn = - [this, &pop_worker_request, &skip_reason_count]( - const std::pair, int64_t> &pair) -> bool { - const auto &worker = pair.first; - WorkerUnfitForTaskReason reason = WorkerFitsForTask(*worker, pop_worker_request); + auto worker_fits_for_task_fn = [this, &pop_worker_request, &skip_reason_count]( + const IdleWorkerEntry &entry) -> bool { + WorkerUnfitForTaskReason reason = + WorkerFitsForTask(*entry.worker, pop_worker_request); if (reason == WorkerUnfitForTaskReason::NONE) { return true; } @@ -1366,11 +1369,11 @@ std::shared_ptr WorkerPool::FindAndPopIdleWorker( idle_of_all_languages_.rend(), worker_fits_for_task_fn); if (good_worker_it != idle_of_all_languages_.rend()) { - state.idle.erase(good_worker_it->first); + state.idle.erase(good_worker_it->worker); // We can't erase a reverse_iterator. 
auto lit = good_worker_it.base(); lit--; - std::shared_ptr worker = std::move(lit->first); + std::shared_ptr worker = std::move(lit->worker); idle_of_all_languages_.erase(lit); return worker; } @@ -1482,7 +1485,7 @@ void WorkerPool::DisconnectWorker(const std::shared_ptr &worker for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end(); it++) { - if (it->first == worker) { + if (it->worker == worker) { idle_of_all_languages_.erase(it); break; } @@ -1582,7 +1585,7 @@ void WorkerPool::WarnAboutSize() { std::string warning_message_str = warning_message.str(); RAY_LOG(WARNING) << warning_message_str; auto error_data_ptr = gcs::CreateErrorTableData( - "worker_pool_large", warning_message_str, get_time_()); + "worker_pool_large", warning_message_str, absl::ToUnixMillis(get_time_())); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 6acd1cc3b724..91267eee8ad2 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -255,7 +255,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { const std::string &native_library_path, std::function starting_worker_timeout_callback, int ray_debugger_external, - const std::function get_time); + const std::function get_time); /// Destructor responsible for freeing a set of workers owned by this class. virtual ~WorkerPool() override; @@ -533,8 +533,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// TODO(scv119): replace dynamic options by runtime_env. const std::vector &LookupWorkerDynamicOptions(StartupToken token) const; - void KillIdleWorker(std::shared_ptr worker, int64_t last_time_used_ms); - /// Gloabl startup token variable. Incremented once assigned /// to a worker process and is added to /// state.worker_processes. @@ -610,9 +608,17 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { absl::flat_hash_map> states_by_lang_; /// The pool of idle non-actor workers of all languages. This is used to kill idle - /// workers in FIFO order. The second element of std::pair is the time a worker becomes - /// idle. - std::list, int64_t>> idle_of_all_languages_; + /// workers in FIFO order. + struct IdleWorkerEntry { + std::shared_ptr worker; + // The time when the worker was last released by a task or freshly registered. + absl::Time idle_since; + // Don't kill this worker until this time. Clears on task assignment by setting to + // absl::InfinitePast(); + absl::Time keep_alive_until; + }; + void KillIdleWorker(const IdleWorkerEntry &node); + std::list idle_of_all_languages_; private: /// A helper function that returns the reference of the pool state @@ -818,7 +824,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { PeriodicalRunner periodical_runner_; /// A callback to get the current time. - const std::function get_time_; + const std::function get_time_; /// Runtime env manager client. 
std::shared_ptr runtime_env_agent_client_; /// Stats From 9ee7858ffc1fa5fbfc7cd25d48c347b7cccdad82 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 26 Nov 2024 12:35:54 -0800 Subject: [PATCH 4/9] remove idle_since Signed-off-by: Ruiyang Wang --- src/ray/common/ray_config_def.h | 5 ----- src/ray/protobuf/node_manager.proto | 9 ++++----- src/ray/raylet/node_manager.cc | 15 ++------------- src/ray/raylet/worker_pool.cc | 22 ++++++++++------------ src/ray/raylet/worker_pool.h | 7 +++---- 5 files changed, 19 insertions(+), 39 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f35343dbfc85..b1a73f79cb1a 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -597,11 +597,6 @@ RAY_CONFIG(bool, enable_worker_prestart, false) /// TODO(clarng): reconcile with enable_worker_prestart RAY_CONFIG(bool, prestart_worker_first_driver, true) -/// For a PrestartWorkers request to a single raylet, the maximum number of workers to -/// prestart. If a request asks for more workers than this, the num of workers will be -/// capped. -RAY_CONFIG(uint64_t, restart_workers_api_max_num_workers, 10) - /// The interval of periodic idle worker killing. Value of 0 means worker capping is /// disabled. RAY_CONFIG(uint64_t, kill_idle_workers_interval_ms, 200) diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 8389a20457c2..ab300f23d00a 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -98,14 +98,13 @@ message RequestWorkerLeaseReply { // Request to prestart workers. At this time we don't yet know the resource, or task type. message PrestartWorkersRequest { Language language = 1; - // Owner: job or root detached actor. + // Job ID for the workers. Note: root_detached_actor_id is not supported. optional bytes job_id = 2; - optional bytes root_detached_actor_id = 3; - RuntimeEnvInfo runtime_env_info = 4; + RuntimeEnvInfo runtime_env_info = 3; // Started idle workers will be kept alive for this duration. Reset on task assignment. - uint64 keep_alive_duration_secs = 5; + uint64 keep_alive_duration_secs = 4; // Raylet will try to start `num_workers` workers. - uint64 num_workers = 6; + uint64 num_workers = 5; } message PrestartWorkersReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a9cde23323d9..2d91b4467a9d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1872,22 +1872,11 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, rpc::PrestartWorkersReply *reply, rpc::SendReplyCallback send_reply_callback) { - const uint64_t num_workers = std::min( - RayConfig::instance().restart_workers_api_max_num_workers(), request.num_workers()); - if (num_workers < request.num_workers()) { - RAY_LOG(WARNING) << "Requested to prestart " << request.num_workers() - << " workers, but only " << num_workers - << " workers are allowed to prestart. See " - "RAY_restart_workers_api_max_num_workers"; - } - auto pop_worker_request = std::make_shared( request.language(), rpc::WorkerType::WORKER, request.has_job_id() ? JobID::FromBinary(request.job_id()) : JobID::Nil(), - request.has_root_detached_actor_id() - ? 
ActorID::FromBinary(request.root_detached_actor_id()) - : ActorID::Nil(), + /*root_detached_actor_id=*/ActorID::Nil(), /*gpu=*/std::nullopt, /*actor_worker=*/std::nullopt, request.runtime_env_info(), @@ -1906,7 +1895,7 @@ void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, return false; }); - for (int64_t i = 0; i < num_workers; i++) { + for (int64_t i = 0; i < request.num_workers(); i++) { worker_pool_.StartNewWorker(pop_worker_request); } send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 0f1a0a1a689b..6eb74a9eca73 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1035,15 +1035,20 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { return PushWorker(worker); } } else { + // Worker pushed without suiting any pending request. Put to idle pool with + // keep_alive_until. state.idle.insert(worker); auto now = get_time_(); - absl::Time keep_alive_until = absl::InfinitePast(); + absl::Time keep_alive_until = + now + + absl::Milliseconds(RayConfig::instance().idle_worker_killing_time_threshold_ms()); if (worker->GetAssignedTaskTime() == absl::Time()) { + // Newly registered worker. Respect idle_worker_keep_alive_duration if any. const auto &keep_alive_duration = state.worker_processes.at(worker->GetStartupToken()) .idle_worker_keep_alive_duration; if (keep_alive_duration.has_value()) { - keep_alive_until = now + *keep_alive_duration; + keep_alive_until = std::max(keep_alive_until, now + *keep_alive_duration); } // If the worker never held any tasks, then we should consider it first when @@ -1052,11 +1057,9 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { // See https://github.com/ray-project/ray/pull/36766 // // Also, we set keep_alive_until w.r.t. idle_worker_keep_alive_duration. - idle_of_all_languages_.emplace_front( - IdleWorkerEntry{worker, /*idle_since=*/now, keep_alive_until}); + idle_of_all_languages_.emplace_front(IdleWorkerEntry{worker, keep_alive_until}); } else { - idle_of_all_languages_.emplace_back( - IdleWorkerEntry{worker, /*idle_since=*/now, keep_alive_until}); + idle_of_all_languages_.emplace_back(IdleWorkerEntry{worker, keep_alive_until}); } } // We either have an idle worker or a slot to start a new worker. @@ -1072,12 +1075,7 @@ void WorkerPool::TryKillingIdleWorkers() { // jobs that have already finished. int64_t num_killable_idle_workers = 0; auto worker_killable = [now](const IdleWorkerEntry &entry) -> bool { - if (entry.keep_alive_until > now) { - return false; - } - return now - entry.idle_since > - absl::Milliseconds( - RayConfig::instance().idle_worker_killing_time_threshold_ms()); + return entry.keep_alive_until < now; }; // First, kill must-kill workers: dead ones, job finished ones. Also calculate killable diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 91267eee8ad2..5e687bbb44e0 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -611,10 +611,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// workers in FIFO order. struct IdleWorkerEntry { std::shared_ptr worker; - // The time when the worker was last released by a task or freshly registered. - absl::Time idle_since; - // Don't kill this worker until this time. Clears on task assignment by setting to - // absl::InfinitePast(); + // Don't kill this worker until this time. 
Set by: + // - prestarted workers by Now() + keep alive duration from argument + // - idle workers by Now() + idle_worker_killing_time_threshold_ms absl::Time keep_alive_until; }; void KillIdleWorker(const IdleWorkerEntry &node); From cb86f90b892204b5e27b84eb4143cb1c321d4e57 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 26 Nov 2024 17:18:14 -0800 Subject: [PATCH 5/9] lint and nit Signed-off-by: Ruiyang Wang --- src/ray/raylet/node_manager.cc | 9 ++++----- src/ray/raylet/worker_pool.cc | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 24e7d2ac664c..d851c8a9ea7c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1889,11 +1889,10 @@ void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, PopWorkerStatus status, const std::string &runtime_env_setup_error_message) { // This callback does not use the worker. - RAY_LOG(ERROR) << "Prestart worker started! token " << - - worker->GetStartupToken() << ", id " << worker->WorkerId() << ", status " - << status << ", runtime_env_setup_error_message " - << runtime_env_setup_error_message; + RAY_LOG(DEBUG).WithField(worker->WorkerId()) + << "Prestart worker started! token " << worker->GetStartupToken() + << ", status " << status << ", runtime_env_setup_error_message " + << runtime_env_setup_error_message; return false; }); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index adfb4fcfb7a8..99545346c014 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1088,7 +1088,7 @@ void WorkerPool::TryKillingIdleWorkers() { } const auto &job_id = it->worker->GetAssignedJobId(); - if (finished_jobs_.count(job_id) > 0) { + if (finished_jobs_.contains(job_id)) { // The job has finished, so we should kill the worker immediately. KillIdleWorker(*it); it = idle_of_all_languages_.erase(it); @@ -1585,7 +1585,7 @@ void WorkerPool::WarnAboutSize() { RAY_LOG(WARNING) << warning_message_str; auto error_data_ptr = gcs::CreateErrorTableData( - "worker_pool_large", warning_message_str, absl::ToUnixMillis(get_time_())); + "worker_pool_large", warning_message_str, get_time_()); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } } From e7e7fe67c75dc8f2d42e70e3e4a3db7c6070a436 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Tue, 26 Nov 2024 17:55:45 -0800 Subject: [PATCH 6/9] lint Signed-off-by: Ruiyang Wang --- src/ray/protobuf/node_manager.proto | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index ab300f23d00a..f10cd251bfc0 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -107,9 +107,7 @@ message PrestartWorkersRequest { uint64 num_workers = 5; } -message PrestartWorkersReply { -} - +message PrestartWorkersReply {} message PrepareBundleResourcesRequest { // Bundles that containing the requested resources. 
From 425d1c57f3d97c5713b145c433aba45fa0d7b9a7 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 27 Nov 2024 14:04:12 -0800 Subject: [PATCH 7/9] cpp test Signed-off-by: Ruiyang Wang --- src/ray/raylet/worker_pool.cc | 29 +++++------- src/ray/raylet/worker_pool.h | 24 +++++----- src/ray/raylet/worker_pool_test.cc | 75 ++++++++++++++++++++++++++++-- 3 files changed, 94 insertions(+), 34 deletions(-) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 99545346c014..7d3e88c4d3dd 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -232,7 +232,7 @@ void WorkerPool::AddWorkerProcess( const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, const std::vector &dynamic_options, - std::optional idle_worker_keep_alive_duration) { + std::optional worker_startup_keep_alive_duration) { state.worker_processes.emplace(worker_startup_token_counter_, WorkerProcessInfo{/*is_pending_registration=*/true, worker_type, @@ -240,7 +240,7 @@ void WorkerPool::AddWorkerProcess( start, runtime_env_info, dynamic_options, - idle_worker_keep_alive_duration}); + worker_startup_keep_alive_duration}); } void WorkerPool::RemoveWorkerProcess(State &state, @@ -448,7 +448,7 @@ std::tuple WorkerPool::StartWorkerProcess( const int runtime_env_hash, const std::string &serialized_runtime_env_context, const rpc::RuntimeEnvInfo &runtime_env_info, - std::optional idle_worker_keep_alive_duration) { + std::optional worker_startup_keep_alive_duration) { rpc::JobConfig *job_config = nullptr; if (!job_id.IsNil()) { auto it = all_jobs_.find(job_id); @@ -515,7 +515,7 @@ std::tuple WorkerPool::StartWorkerProcess( start, runtime_env_info, dynamic_options, - idle_worker_keep_alive_duration); + worker_startup_keep_alive_duration); StartupToken worker_startup_token = worker_startup_token_counter_; update_worker_startup_token_counter(); if (IsIOWorkerType(worker_type)) { @@ -1044,10 +1044,10 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { now + absl::Milliseconds(RayConfig::instance().idle_worker_killing_time_threshold_ms()); if (worker->GetAssignedTaskTime() == absl::Time()) { - // Newly registered worker. Respect idle_worker_keep_alive_duration if any. + // Newly registered worker. Respect worker_startup_keep_alive_duration if any. const auto &keep_alive_duration = state.worker_processes.at(worker->GetStartupToken()) - .idle_worker_keep_alive_duration; + .worker_startup_keep_alive_duration; if (keep_alive_duration.has_value()) { keep_alive_until = std::max(keep_alive_until, now + *keep_alive_duration); } @@ -1057,7 +1057,7 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { // than those workers who served tasks before. // See https://github.com/ray-project/ray/pull/36766 // - // Also, we set keep_alive_until w.r.t. idle_worker_keep_alive_duration. + // Also, we set keep_alive_until w.r.t. worker_startup_keep_alive_duration. 
idle_of_all_languages_.emplace_front(IdleWorkerEntry{worker, keep_alive_until}); } else { idle_of_all_languages_.emplace_back(IdleWorkerEntry{worker, keep_alive_until}); @@ -1250,7 +1250,7 @@ void WorkerPool::StartNewWorker( pop_worker_request->runtime_env_hash, serialized_runtime_env_context, pop_worker_request->runtime_env_info, - pop_worker_request->idle_worker_keep_alive_duration); + pop_worker_request->worker_startup_keep_alive_duration); if (status == PopWorkerStatus::OK) { RAY_CHECK(proc.IsValid()); WarnAboutSize(); @@ -1311,7 +1311,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, /*is_actor_worker=*/task_spec.IsActorCreationTask(), task_spec.RuntimeEnvInfo(), task_spec.DynamicWorkerOptionsOrEmpty(), - /*idle_worker_keep_alive_duration=*/std::nullopt, + /*worker_startup_keep_alive_duration=*/std::nullopt, [this, task_spec, callback]( const std::shared_ptr &worker, PopWorkerStatus status, @@ -1335,11 +1335,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, } return callback(worker, status, runtime_env_setup_error_message); }); - WorkerID reused_worker_id = PopWorker(pop_worker_request); - if (!reused_worker_id.IsNil()) { - RAY_LOG(DEBUG).WithField(task_spec.TaskId()).WithField(reused_worker_id) - << "Re-using worker for task."; - } + PopWorker(std::move(pop_worker_request)); } std::shared_ptr WorkerPool::FindAndPopIdleWorker( @@ -1381,19 +1377,18 @@ std::shared_ptr WorkerPool::FindAndPopIdleWorker( return nullptr; } -WorkerID WorkerPool::PopWorker(std::shared_ptr pop_worker_request) { +void WorkerPool::PopWorker(std::shared_ptr pop_worker_request) { // If there's an idle worker that fits the task, use it. // Else, start a new worker. auto worker = FindAndPopIdleWorker(*pop_worker_request); if (worker == nullptr) { StartNewWorker(pop_worker_request); - return WorkerID::Nil(); + return; } RAY_CHECK(worker->GetAssignedJobId().IsNil() || worker->GetAssignedJobId() == pop_worker_request->job_id); stats::NumWorkersStartedFromCache.Record(1); PopWorkerCallbackAsync(pop_worker_request->callback, worker, PopWorkerStatus::OK); - return worker->WorkerId(); } void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 551c81914db3..8f471a1389ff 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -90,7 +90,7 @@ struct PopWorkerRequest { const rpc::RuntimeEnvInfo runtime_env_info; const int runtime_env_hash; const std::vector dynamic_options; - std::optional idle_worker_keep_alive_duration; + std::optional worker_startup_keep_alive_duration; PopWorkerCallback callback; @@ -102,7 +102,7 @@ struct PopWorkerRequest { std::optional actor_worker, rpc::RuntimeEnvInfo runtime_env_info, std::vector options, - std::optional idle_worker_keep_alive_duration, + std::optional worker_startup_keep_alive_duration, PopWorkerCallback callback) : language(lang), worker_type(worker_type), @@ -114,7 +114,7 @@ struct PopWorkerRequest { // this-> is needed to disambiguate the member variable from the ctor arg. runtime_env_hash(), dynamic_options(std::move(options)), - idle_worker_keep_alive_duration(idle_worker_keep_alive_duration), + worker_startup_keep_alive_duration(worker_startup_keep_alive_duration), callback(std::move(callback)) {} }; @@ -458,9 +458,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Get the NodeID of this worker pool. 
const NodeID &GetNodeID() const; - /// Returns a worker id if an idle worker is reused for this request. Otherwise, - /// asynchronously start a worker and return WorkerID::Nil(). - WorkerID PopWorker(std::shared_ptr pop_worker_request); + /// Internal implementation of PopWorker. + void PopWorker(std::shared_ptr pop_worker_request); // Find an idle worker that can serve the task. If found, pop it out and return it. // Otherwise, return nullptr. @@ -496,8 +495,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param runtime_env_hash The hash of runtime env. /// \param serialized_runtime_env_context The context of runtime env. /// \param runtime_env_info The raw runtime env info. - /// \param idle_worker_keep_alive_duration If set, the worker will be kept alive for - /// this duration even if it's idle. This is reset after a task assignment. + /// \param worker_startup_keep_alive_duration If set, the worker will be kept alive for + /// this duration even if it's idle. This is only applicable before a task is assigned + /// to the worker. /// \return The process that we started and a token. If the token is less than 0, /// we didn't start a process. std::tuple StartWorkerProcess( @@ -509,7 +509,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { int runtime_env_hash = 0, const std::string &serialized_runtime_env_context = "{}", const rpc::RuntimeEnvInfo &runtime_env_info = rpc::RuntimeEnvInfo(), - std::optional idle_worker_keep_alive_duration = std::nullopt); + std::optional worker_startup_keep_alive_duration = std::nullopt); /// The implementation of how to start a new worker process with command arguments. /// The lifetime of the process is tied to that of the returned object, @@ -565,8 +565,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { rpc::RuntimeEnvInfo runtime_env_info; /// The dynamic_options. std::vector dynamic_options; - /// The duration to keep the worker alive even if it's idle. - std::optional idle_worker_keep_alive_duration; + /// The duration to keep the newly created worker alive before it's assigned a task. + std::optional worker_startup_keep_alive_duration; }; /// An internal data structure that maintains the pool state per language. @@ -745,7 +745,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { const std::chrono::high_resolution_clock::time_point &start, const rpc::RuntimeEnvInfo &runtime_env_info, const std::vector &dynamic_options, - std::optional idle_worker_keep_alive_duration); + std::optional worker_startup_keep_alive_duration); void RemoveWorkerProcess(State &state, const StartupToken &proc_startup_token); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 022c5055522a..b63394685052 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -242,9 +242,7 @@ class WorkerPoolMock : public WorkerPool { size_t GetIdleWorkerSize() { return idle_of_all_languages_.size(); } - std::list, absl::Time>> &GetIdleWorkers() { - return idle_of_all_languages_; - } + auto &GetIdleWorkers() { return idle_of_all_languages_; } std::shared_ptr CreateWorker( const Process &proc, @@ -764,6 +762,73 @@ TEST_F(WorkerPoolDriverRegisteredTest, StartWorkerWithDynamicOptionsCommand) { worker_pool_->HandleJobFinished(job_id); } +TEST_F(WorkerPoolDriverRegisteredTest, TestWorkerStartupKeepAliveDuration) { + // Test starting workers with keep alive duration. 
+ // To make sure they are killable, start POOL_SIZE_SOFT_LIMIT + 2 workers. + // On creation: StartNewWorker does not respect POOL_SIZE_SOFT_LIMIT, can start more + // workers than POOL_SIZE_SOFT_LIMIT. + // On idle killing: KillIdleWorkers respects keep alive duration, not killing anyone. + // After keep alive duration expires: KillIdleWorkers kills 2 workers, leaving + // POOL_SIZE_SOFT_LIMIT workers. + constexpr char kRuntimeEnvJson[] = R"({"env_vars": {"FOO": "BAR"}})"; + rpc::RuntimeEnvInfo runtime_env_info; + runtime_env_info.set_serialized_runtime_env(kRuntimeEnvJson); + + auto keep_alive_duration = absl::Seconds(10); + auto pop_worker_request = std::make_shared( + Language::PYTHON, + rpc::WorkerType::WORKER, + JOB_ID, + ActorID::Nil(), + /*gpu=*/std::nullopt, + /*actor_worker=*/std::nullopt, + runtime_env_info, + /*options=*/std::vector{}, + keep_alive_duration, + /*callback=*/ + [](const std::shared_ptr &worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) { return false; }); + + // Before starting the worker, it's empty. + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0); + ASSERT_EQ(worker_pool_->GetProcessSize(), 0); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 0); + + // Start the worker + for (int i = 0; i < POOL_SIZE_SOFT_LIMIT + 2; i++) { + worker_pool_->StartNewWorker(pop_worker_request); + } + // Worker started but not registered. + ASSERT_EQ(worker_pool_->NumWorkersStarting(), POOL_SIZE_SOFT_LIMIT + 2); + ASSERT_EQ(worker_pool_->GetProcessSize(), POOL_SIZE_SOFT_LIMIT + 2); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 0); + + // The worker registered. There's no pending tasks so it becomes idle. + worker_pool_->PushWorkers(0, JOB_ID); + ASSERT_EQ(worker_pool_->NumWorkersStarting(), 0); + ASSERT_EQ(worker_pool_->GetProcessSize(), POOL_SIZE_SOFT_LIMIT + 2); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), POOL_SIZE_SOFT_LIMIT + 2); + + // Time passes. The worker is not killed because it's protected by keep-alive. + worker_pool_->SetCurrentTimeMs(2000); + worker_pool_->TryKillingIdleWorkers(); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), POOL_SIZE_SOFT_LIMIT + 2); + + // After the keep-alive expires, the worker is killed. + worker_pool_->SetCurrentTimeMs(2000 + absl::ToDoubleMilliseconds(keep_alive_duration)); + worker_pool_->TryKillingIdleWorkers(); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), POOL_SIZE_SOFT_LIMIT); + + // Finish the job, all workers killed. 
+ worker_pool_->HandleJobFinished(JOB_ID); + worker_pool_->TryKillingIdleWorkers(); + ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 0); + for (const auto &[worker_id, mock_rpc_client] : mock_worker_rpc_clients_) { + mock_rpc_client->ExitReplySucceed(); + } +} + TEST_F(WorkerPoolDriverRegisteredTest, PopWorkerMultiTenancy) { auto job_id1 = JOB_ID; auto job_id2 = JobID::FromInt(2); @@ -1310,8 +1375,8 @@ TEST_F(WorkerPoolDriverRegisteredTest, TestWorkerCapping) { worker_pool_->SetCurrentTimeMs(10000); worker_pool_->TryKillingIdleWorkers(); ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), POOL_SIZE_SOFT_LIMIT); - for (auto &worker : worker_pool_->GetIdleWorkers()) { - mock_rpc_client_it = mock_worker_rpc_clients_.find(worker.first->WorkerId()); + for (auto &entry : worker_pool_->GetIdleWorkers()) { + mock_rpc_client_it = mock_worker_rpc_clients_.find(entry.worker->WorkerId()); ASSERT_EQ(mock_rpc_client_it->second->last_exit_forced, false); ASSERT_FALSE(mock_rpc_client_it->second->ExitReplySucceed()); } From 9567588789f3918c306a9aa9a845ccc3fb67f2a1 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Wed, 27 Nov 2024 15:33:46 -0800 Subject: [PATCH 8/9] fix mock Signed-off-by: Ruiyang Wang --- src/mock/ray/raylet_client/raylet_client.h | 5 +++++ src/ray/core_worker/test/normal_task_submitter_test.cc | 5 +++++ src/ray/gcs/gcs_server/test/gcs_server_test_util.h | 6 ++++++ 3 files changed, 16 insertions(+) diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h index de30333bdf78..9c9f54c4c2f5 100644 --- a/src/mock/ray/raylet_client/raylet_client.h +++ b/src/mock/ray/raylet_client/raylet_client.h @@ -48,6 +48,11 @@ class MockRayletClientInterface : public RayletClientInterface { (const TaskID &task_id, const rpc::ClientCallback &callback), (override)); + MOCK_METHOD(void, + PrestartWorkers, + (const rpc::PrestartWorkersRequest &request, + const rpc::ClientCallback &callback), + (override)); MOCK_METHOD(void, ReleaseUnusedActorWorkers, (const std::vector &workers_in_use, diff --git a/src/ray/core_worker/test/normal_task_submitter_test.cc b/src/ray/core_worker/test/normal_task_submitter_test.cc index 4179d512ea47..a87680b4cf36 100644 --- a/src/ray/core_worker/test/normal_task_submitter_test.cc +++ b/src/ray/core_worker/test/normal_task_submitter_test.cc @@ -248,6 +248,11 @@ class MockRayletClient : public WorkerLeaseInterface { } callbacks.push_back(callback); } + void PrestartWorkers( + const rpc::PrestartWorkersRequest &request, + const rpc::ClientCallback &callback) override { + RAY_LOG(FATAL) << "Not implemented"; + } void ReleaseUnusedActorWorkers( const std::vector &workers_in_use, diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index ce8c685f706d..8ef322feeef9 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -99,6 +99,12 @@ struct GcsServerMocker { callbacks.push_back(callback); } + void PrestartWorkers( + const rpc::PrestartWorkersRequest &request, + const rpc::ClientCallback &callback) override { + RAY_LOG(FATAL) << "Not implemented"; + } + /// WorkerLeaseInterface void ReleaseUnusedActorWorkers( const std::vector &workers_in_use, From a908c53349ecd96dd82329eb5aa9353e7b4860c3 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Fri, 29 Nov 2024 14:47:47 -0800 Subject: [PATCH 9/9] lint Signed-off-by: Ruiyang Wang --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index d851c8a9ea7c..b26005a7614c 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1896,7 +1896,7 @@ void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
         return false;
       });
 
-  for (int64_t i = 0; i < request.num_workers(); i++) {
+  for (uint64_t i = 0; i < request.num_workers(); i++) {
     worker_pool_.StartNewWorker(pop_worker_request);
   }
   send_reply_callback(Status::OK(), nullptr, nullptr);
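The final patch only flips the loop counter from int64_t to uint64_t. For context, num_workers is declared uint64 in PrestartWorkersRequest, so a signed counter makes i < request.num_workers() a mixed-sign comparison that can trip -Wsign-compare. A minimal sketch of the pattern follows; it uses a hand-written stand-in for the protobuf-generated request class rather than the real rpc::PrestartWorkersRequest, and reduces starting a worker to a print statement.

#include <cstdint>
#include <iostream>

// Hand-written stand-in for the generated rpc::PrestartWorkersRequest; only the
// num_workers() accessor is modeled here.
struct FakePrestartWorkersRequest {
  uint64_t num_workers_value = 3;
  uint64_t num_workers() const { return num_workers_value; }
};

// Stand-in for WorkerPool::StartNewWorker.
void StartNewWorkerStub(uint64_t i) {
  std::cout << "StartNewWorker() call #" << i << "\n";
}

int main() {
  FakePrestartWorkersRequest request;
  // A signed counter (int64_t i) would make this comparison mixed-sign; using
  // uint64_t matches the uint64 proto field and keeps the loop warning-free.
  for (uint64_t i = 0; i < request.num_workers(); i++) {
    StartNewWorkerStub(i);
  }
  return 0;
}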