Response sender to check for improper non-decoupled model usage #363

Merged · 3 commits · Jun 4, 2024
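For context, this change makes the response sender enforce the non-decoupled contract on the stub side: a model whose transaction policy is not decoupled can send at most one response, and cannot send a bare `TRITONSERVER_RESPONSE_COMPLETE_FINAL` flag before any response has been sent. Below is a minimal, hypothetical `model.py` sketch of the usage pattern the new check rejects; the tensor name, output value, and the use of `get_response_sender()` in default (non-decoupled) mode are illustrative assumptions, not part of this diff.

```python
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            out = pb_utils.Tensor("OUTPUT0", np.array([1.0], dtype=np.float32))

            # First response from a non-decoupled model: accepted.
            sender.send(pb_utils.InferenceResponse(output_tensors=[out]))

            # Second response: with this change the stub raises
            # "Unable to send response. Non-decoupled model cannot send
            #  more than one response."
            sender.send(
                pb_utils.InferenceResponse(output_tensors=[out]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        return None
```

Sending only the final flag before any response has been sent is rejected in the same way.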
12 changes: 7 additions & 5 deletions src/infer_request.cc
@@ -74,7 +74,7 @@ InferRequest::InferRequest(
pb_cancel_ =
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_,
request_address_, response_factory_address_, nullptr /* is_decoupled */,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
#endif
}
@@ -272,7 +272,8 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
std::unique_ptr<InferRequest>
InferRequest::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle)
bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle,
bool const* is_model_decoupled)
{
AllocatedSharedMemory<char> infer_request_shm =
shm_pool->Load<char>(request_handle);
@@ -328,7 +329,7 @@ InferRequest::LoadFromSharedMemory(
return std::unique_ptr<InferRequest>(new InferRequest(
infer_request_shm, request_id_shm, correlation_id_shm,
requested_output_names_shm, model_name_shm, input_tensors, parameters_shm,
infer_trace_shm));
infer_trace_shm, is_model_decoupled));
}

InferRequest::InferRequest(
@@ -339,7 +340,8 @@ InferRequest::InferRequest(
std::unique_ptr<PbString>& model_name_shm,
std::vector<std::shared_ptr<PbTensor>>& input_tensors,
std::unique_ptr<PbString>& parameters_shm,
std::unique_ptr<InferenceTrace>& infer_trace_shm)
std::unique_ptr<InferenceTrace>& infer_trace_shm,
bool const* is_model_decoupled)
: infer_request_shm_(std::move(infer_request_shm)),
request_id_shm_(std::move(request_id_shm)),
requested_output_names_shm_(std::move(requested_output_names_shm)),
@@ -387,7 +389,7 @@ InferRequest::InferRequest(
pb_cancel_ =
std::make_shared<PbCancel>(response_factory_address_, request_address_);
response_sender_ = std::make_shared<ResponseSender>(
request_address_, response_factory_address_,
request_address_, response_factory_address_, is_model_decoupled,
Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
#endif
}
5 changes: 3 additions & 2 deletions src/infer_request.h
@@ -118,7 +118,7 @@ class InferRequest {
static std::unique_ptr<InferRequest> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t request_handle,
bool open_cuda_handle);
bool open_cuda_handle, bool const* is_model_decoupled);

/// Disallow copying the inference request object.
DISALLOW_COPY_AND_ASSIGN(InferRequest);
@@ -135,7 +135,8 @@
std::unique_ptr<PbString>& model_name_shm,
std::vector<std::shared_ptr<PbTensor>>& input_tensors,
std::unique_ptr<PbString>& parameters_shm,
std::unique_ptr<InferenceTrace>& infer_trace_shm);
std::unique_ptr<InferenceTrace>& infer_trace_shm,
bool const* is_model_decoupled);

std::string request_id_;
CorrelationId correlation_id_;
11 changes: 10 additions & 1 deletion src/pb_stub.cc
@@ -658,7 +658,8 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
for (size_t i = 0; i < batch_size; i++) {
std::shared_ptr<InferRequest> infer_request =
InferRequest::LoadFromSharedMemory(
shm_pool_, request_shm_handle[i], true /* open_cuda_handle */);
shm_pool_, request_shm_handle[i], true /* open_cuda_handle */,
&ipc_control_->decoupled /* is_model_decoupled */);
py_request_list.append(infer_request);
}

@@ -740,6 +741,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
error_string_shm = PbString::Create(shm_pool_, error_string);
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
response_batch_shm_ptr->is_error_set = true;
// Once the error is sent to the backend, the backend is expected to close
// all response factories if they are not already closed. Close all response
// senders here as well (if not already closed) to prevent the model from
// sending more responses after the factories are closed.
for (py::handle py_request : py_request_list) {
InferRequest* request = py_request.cast<InferRequest*>();
request->GetResponseSender()->Close();
}
}
}

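The loop added above closes every request's response sender once an error has been reported to the backend, since the backend is then expected to close the response factories. A hedged sketch of the situation this guards against, assuming a hypothetical model that hands its sender to a background thread and then fails in `execute()` (names and the threading pattern are illustrative only):

```python
import threading

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            # Hypothetical worker that keeps using the sender after execute()
            # has already failed.
            threading.Thread(target=self._late_send, args=(sender,)).start()
        # The raised error is reported to the backend, which closes the
        # response factories; the stub now also closes every response sender.
        raise RuntimeError("model error")

    def _late_send(self, sender):
        out = pb_utils.Tensor("OUTPUT0", np.array([1.0], dtype=np.float32))
        # With this change the closed sender rejects the call instead of
        # attempting to send a response after its factory has been closed.
        sender.send(
            pb_utils.InferenceResponse(output_tensors=[out]),
            flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
```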
3 changes: 2 additions & 1 deletion src/python_be.cc
@@ -571,7 +571,8 @@ ModelInstanceState::ExecuteBLSRequest(
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
request_batch.data_.get() + sizeof(RequestBatch));
infer_request = InferRequest::LoadFromSharedMemory(
Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */);
Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */,
nullptr /* is_model_decoupled */);

// If the BLS inputs are in GPU an additional round trip between the
// stub process and the main process is required. The reason is that we
88 changes: 67 additions & 21 deletions src/response_sender.cc
@@ -35,13 +35,31 @@

namespace triton { namespace backend { namespace python {

void
CheckResponseSenderArguments(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
{
// Check the correctness of the provided flags.
if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
throw PythonBackendException(
"Unable to send response. Unsupported flag provided.");
}

if (flags == 0 && response == nullptr) {
throw PythonBackendException(
"Inference Response object must be provided when the response flags is "
"set to zero.");
}
}

ResponseSender::ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel)
: request_address_(request_address),
response_factory_address_(response_factory_address), shm_pool_(shm_pool),
closed_(false), pb_cancel_(pb_cancel)
response_factory_address_(response_factory_address),
is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel),
closed_(false), number_of_response_sent_(0)
{
}

@@ -54,15 +72,32 @@ ResponseSender::~ResponseSender()
}

void
ResponseSender::Send(
std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
ResponseSender::UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags)
{
// Release the GIL. This avoids a potential deadlock situation in the parent
// process, where every thread in the thread pool is indirectly waiting for a
// function in the stub process that acquires the GIL. Meanwhile, the current
// thread, which holds the GIL, is also waiting for the parent side to have
// the next available thread to pick up the job during resource contention.
py::gil_scoped_release release;
if (is_decoupled_ == nullptr) {
// TODO: Can a model access the response sender on a BLS infer request?
throw PythonBackendException(
"Unable to send response. Response sender has no reference to the "
"decoupled state of the model.");
}
bool is_decoupled = *is_decoupled_;

std::lock_guard<std::mutex> lk(mu_);

if (!is_decoupled) {
if (response != nullptr && number_of_response_sent_ > 0) {
throw PythonBackendException(
"Unable to send response. Non-decoupled model cannot send more than "
"one response.");
}
if (response == nullptr && flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
number_of_response_sent_ == 0) {
throw PythonBackendException(
"Unable to send response. Non-decoupled model cannot send complete "
"final before sending a response.");
}
}

if (closed_) {
throw PythonBackendException(
@@ -72,18 +107,22 @@
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
closed_ = true;
}
number_of_response_sent_++;
}

// Check the correctness of the provided flags.
if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
throw PythonBackendException(
"Unable to send response. Unsupported flag provided.");
}
void
ResponseSender::Send(
std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
{
// Release the GIL. This avoids a potential deadlock situation in the parent
// process, where every thread in the thread pool is indirectly waiting for a
// function in the stub process that acquires the GIL. Meanwhile, the current
// thread, which holds the GIL, is also waiting for the parent side to have
// the next available thread to pick up the job during resource contention.
py::gil_scoped_release release;

if (flags == 0 && infer_response == nullptr) {
throw PythonBackendException(
"Inference Response object must be provided when the response flags is "
"set to zero.");
}
CheckResponseSenderArguments(infer_response, flags);
UpdateStateAndCounters(infer_response, flags);

std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();

@@ -206,4 +245,11 @@ ResponseSender::IsCancelled()
return pb_cancel_->IsCancelled();
}

void
ResponseSender::Close()
{
std::lock_guard<std::mutex> lk(mu_);
closed_ = true;
}

}}} // namespace triton::backend::python
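Taken together, `CheckResponseSenderArguments` and `UpdateStateAndCounters` let a non-decoupled model send at most one response, and only allow the bare `TRITONSERVER_RESPONSE_COMPLETE_FINAL` flag after that response has been sent. A hedged sketch of usage that passes the new checks; the model and tensor names are assumptions for illustration:

```python
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            out = pb_utils.Tensor("OUTPUT0", np.array([1.0], dtype=np.float32))

            # One response carrying the final flag in the same call...
            sender.send(
                pb_utils.InferenceResponse(output_tensors=[out]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)

            # ...or equivalently, the response first and the bare final flag
            # afterwards (allowed because a response has already been sent):
            # sender.send(pb_utils.InferenceResponse(output_tensors=[out]))
            # sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        return None
```

A bare final flag before any response, or a second response, is rejected with the errors added in `UpdateStateAndCounters`.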
16 changes: 14 additions & 2 deletions src/response_sender.h
@@ -26,6 +26,8 @@

#pragma once

#include <mutex>

#include "infer_response.h"
#include "pb_cancel.h"
#include "shm_manager.h"
@@ -36,17 +38,27 @@ class ResponseSender {
public:
ResponseSender(
intptr_t request_address, intptr_t response_factory_address,
std::unique_ptr<SharedMemoryManager>& shm_pool,
bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
const std::shared_ptr<PbCancel>& pb_cancel);
~ResponseSender();
void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
bool IsCancelled();

// Can be used to stop the model from sending any more responses.
void Close();

private:
void UpdateStateAndCounters(
const std::shared_ptr<InferResponse>& response, const uint32_t flags);

intptr_t request_address_;
intptr_t response_factory_address_;
bool const* is_decoupled_;
std::unique_ptr<SharedMemoryManager>& shm_pool_;
bool closed_;
std::shared_ptr<PbCancel> pb_cancel_;

std::mutex mu_;
bool closed_;
size_t number_of_response_sent_;
};
}}} // namespace triton::backend::python