Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactors WorkerPool with Prestarts. #48677

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "ray/core_worker/core_worker.h"

#include <future>

#ifndef _WIN32
#include <unistd.h>
#endif
Expand Down Expand Up @@ -2293,6 +2295,24 @@ void CoreWorker::BuildCommonTaskSpec(
}
}

void CoreWorker::PrestartWorkers(const std::string &serialized_runtime_env_info,
uint64_t keep_alive_duration_secs,
size_t num_workers) {
rpc::PrestartWorkersRequest request;
request.set_language(GetLanguage());
request.set_job_id(GetCurrentJobId().Binary());
jjyao marked this conversation as resolved.
Show resolved Hide resolved
*request.mutable_runtime_env_info() =
*OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info);
request.set_keep_alive_duration_secs(keep_alive_duration_secs);
request.set_num_workers(num_workers);
local_raylet_client_->PrestartWorkers(
request, [](const Status &status, const rpc::PrestartWorkersReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to prestart workers: " << status.ToString();
}
});
}

std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
Expand Down
13 changes: 12 additions & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "ray/pubsub/subscriber.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/rpc/worker/core_worker_server.h"
#include "ray/util/process.h"
#include "src/ray/protobuf/pubsub.pb.h"
Expand Down Expand Up @@ -825,6 +824,18 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const std::string &error_message,
double timestamp);

// Prestart workers. The workers:
// - uses current language.
// - uses current JobID.
// - does NOT support root_detached_actor_id.
// - uses provided runtime_env_info applied to the job runtime env, as if it's a task
// request.
//
// This API is async. It provides no guarantee that the workers are actually started.
void PrestartWorkers(const std::string &serialized_runtime_env_info,
uint64_t keep_alive_duration_secs,
size_t num_workers);

/// Submit a normal task.
///
/// \param[in] function The remote function to execute.
Expand Down
3 changes: 2 additions & 1 deletion src/ray/protobuf/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ proto_library(
":autoscaler_proto",
":common_proto",
":gcs_proto",
":runtime_env_common_proto",
],
)

Expand Down Expand Up @@ -249,7 +250,7 @@ proto_library(
name = "export_event_proto",
srcs = ["export_api/export_event.proto"],
deps = [
":export_task_event_proto",
":export_task_event_proto",
":export_node_event_proto",
":export_actor_event_proto",
":export_driver_job_event_proto",
Expand Down
17 changes: 17 additions & 0 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";
import "src/ray/protobuf/autoscaler.proto";
import "src/ray/protobuf/runtime_env_common.proto";

message WorkerBacklogReport {
// TaskSpec indicating the scheduling class.
Expand Down Expand Up @@ -94,6 +95,20 @@ message RequestWorkerLeaseReply {
string scheduling_failure_message = 10;
}

// Request to prestart workers. At this time we don't yet know the resource, or task type.
message PrestartWorkersRequest {
Language language = 1;
// Job ID for the workers. Note: root_detached_actor_id is not supported.
optional bytes job_id = 2;
RuntimeEnvInfo runtime_env_info = 3;
// Started idle workers will be kept alive for this duration. Reset on task assignment.
uint64 keep_alive_duration_secs = 4;
// Raylet will try to start `num_workers` workers.
uint64 num_workers = 5;
}

message PrestartWorkersReply {}

message PrepareBundleResourcesRequest {
// Bundles that containing the requested resources.
repeated Bundle bundle_specs = 1;
Expand Down Expand Up @@ -385,6 +400,8 @@ service NodeManagerService {
rpc GetResourceLoad(GetResourceLoadRequest) returns (GetResourceLoadReply);
// Request a worker from the raylet.
rpc RequestWorkerLease(RequestWorkerLeaseRequest) returns (RequestWorkerLeaseReply);
// Request to prestart workers.
rpc PrestartWorkers(PrestartWorkersRequest) returns (PrestartWorkersReply);
// Report task backlog information from a worker to the raylet
rpc ReportWorkerBacklog(ReportWorkerBacklogRequest) returns (ReportWorkerBacklogReply);
// Release a worker back to its raylet.
Expand Down
35 changes: 35 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#include <csignal>
#include <fstream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/functional/bind_front.h"
#include "absl/time/clock.h"
Expand All @@ -40,6 +43,7 @@
#include "ray/util/event.h"
#include "ray/util/event_label.h"
#include "ray/util/util.h"
#include "src/ray/raylet/worker_pool.h"

namespace {

Expand Down Expand Up @@ -1867,6 +1871,37 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
send_reply_callback_wrapper);
}

void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
rpc::PrestartWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto pop_worker_request = std::make_shared<PopWorkerRequest>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PopWorkerRequest is supposed to be a private thing inside worker_pool. Let's just make StartNewWorkers to accept needed parameters to construct PopWorkerRequest inside worker pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StartNewWorker(PopWorkerRequest) is also used by PopWorker(PopWorkerRequest) internally. The Request does not have anything private so I guess we can just expose it and allow node_manager.cc to use it? It will be much easier. If we just expose another method with the scattered 10 arguments it doesn't have any added values any way.

request.language(),
rpc::WorkerType::WORKER,
request.has_job_id() ? JobID::FromBinary(request.job_id()) : JobID::Nil(),
/*root_detached_actor_id=*/ActorID::Nil(),
/*gpu=*/std::nullopt,
/*actor_worker=*/std::nullopt,
request.runtime_env_info(),
/*options=*/std::vector<std::string>{},
absl::Seconds(request.keep_alive_duration_secs()),
/*callback=*/
[request](const std::shared_ptr<WorkerInterface> &worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) {
// This callback does not use the worker.
RAY_LOG(DEBUG).WithField(worker->WorkerId())
<< "Prestart worker started! token " << worker->GetStartupToken()
<< ", status " << status << ", runtime_env_setup_error_message "
<< runtime_env_setup_error_message;
return false;
});

for (int64_t i = 0; i < request.num_workers(); i++) {
worker_pool_.StartNewWorker(pop_worker_request);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void NodeManager::HandlePrepareBundleResources(
rpc::PrepareBundleResourcesRequest request,
rpc::PrepareBundleResourcesReply *reply,
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
rpc::PrestartWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `ReportWorkerBacklog` request.
void HandleReportWorkerBacklog(rpc::ReportWorkerBacklogRequest request,
rpc::ReportWorkerBacklogReply *reply,
Expand Down
Loading