diff --git a/src/c++/library/grpc_client.cc b/src/c++/library/grpc_client.cc index cc3a9a85f..fe91f5c17 100644 --- a/src/c++/library/grpc_client.cc +++ b/src/c++/library/grpc_client.cc @@ -145,6 +145,20 @@ GetStub( return stub; } + +/// Set client timeout +/// +/// \param client_timeout_ms Deadline for timeout in microseconds +/// \param context Client context to add deadline to +void +SetTimeout(const uint64_t& client_timeout_ms, grpc::ClientContext* context) +{ + if (client_timeout_ms != 0) { + auto deadline = std::chrono::system_clock::now() + + std::chrono::microseconds(client_timeout_ms); + context->set_deadline(deadline); + } +} } // namespace //============================================================================== @@ -479,7 +493,8 @@ InferenceServerGrpcClient::Create( } Error -InferenceServerGrpcClient::IsServerLive(bool* live, const Headers& headers) +InferenceServerGrpcClient::IsServerLive( + bool* live, const Headers& headers, const uint64_t timeout_ms) { Error err; @@ -487,6 +502,7 @@ InferenceServerGrpcClient::IsServerLive(bool* live, const Headers& headers) inference::ServerLiveResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -505,7 +521,8 @@ InferenceServerGrpcClient::IsServerLive(bool* live, const Headers& headers) } Error -InferenceServerGrpcClient::IsServerReady(bool* ready, const Headers& headers) +InferenceServerGrpcClient::IsServerReady( + bool* ready, const Headers& headers, const uint64_t timeout_ms) { Error err; @@ -513,6 +530,7 @@ InferenceServerGrpcClient::IsServerReady(bool* ready, const Headers& headers) inference::ServerReadyResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -533,7 +551,8 @@ InferenceServerGrpcClient::IsServerReady(bool* ready, const Headers& headers) Error InferenceServerGrpcClient::IsModelReady( bool* ready, const std::string& model_name, - const std::string& model_version, const Headers& headers) + const std::string& model_version, const Headers& headers, + const uint64_t timeout_ms) { Error err; @@ -541,6 +560,7 @@ InferenceServerGrpcClient::IsModelReady( inference::ModelReadyResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -567,7 +587,8 @@ InferenceServerGrpcClient::IsModelReady( Error InferenceServerGrpcClient::ServerMetadata( - inference::ServerMetadataResponse* server_metadata, const Headers& headers) + inference::ServerMetadataResponse* server_metadata, const Headers& headers, + const uint64_t timeout_ms) { server_metadata->Clear(); Error err; @@ -575,6 +596,7 @@ InferenceServerGrpcClient::ServerMetadata( inference::ServerMetadataRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -597,7 +619,7 @@ Error InferenceServerGrpcClient::ModelMetadata( inference::ModelMetadataResponse* model_metadata, const std::string& model_name, const std::string& model_version, - const Headers& headers) + const Headers& headers, const uint64_t timeout_ms) { model_metadata->Clear(); Error err; @@ -605,6 +627,7 @@ InferenceServerGrpcClient::ModelMetadata( inference::ModelMetadataRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { 
context.AddMetadata(it.first, it.second); } @@ -628,7 +651,8 @@ InferenceServerGrpcClient::ModelMetadata( Error InferenceServerGrpcClient::ModelConfig( inference::ModelConfigResponse* model_config, const std::string& model_name, - const std::string& model_version, const Headers& headers) + const std::string& model_version, const Headers& headers, + const uint64_t timeout_ms) { model_config->Clear(); Error err; @@ -636,6 +660,7 @@ InferenceServerGrpcClient::ModelConfig( inference::ModelConfigRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -658,7 +683,7 @@ InferenceServerGrpcClient::ModelConfig( Error InferenceServerGrpcClient::ModelRepositoryIndex( inference::RepositoryIndexResponse* repository_index, - const Headers& headers) + const Headers& headers, const uint64_t timeout_ms) { repository_index->Clear(); Error err; @@ -666,6 +691,7 @@ InferenceServerGrpcClient::ModelRepositoryIndex( inference::RepositoryIndexRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -687,7 +713,8 @@ Error InferenceServerGrpcClient::LoadModel( const std::string& model_name, const Headers& headers, const std::string& config, - const std::map>& files) + const std::map>& files, + const uint64_t timeout_ms) { Error err; @@ -695,6 +722,7 @@ InferenceServerGrpcClient::LoadModel( inference::RepositoryModelLoadResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -722,7 +750,8 @@ InferenceServerGrpcClient::LoadModel( Error InferenceServerGrpcClient::UnloadModel( - const std::string& model_name, const Headers& headers) + const std::string& model_name, const Headers& headers, + const uint64_t timeout_ms) { Error err; @@ -730,6 +759,7 @@ InferenceServerGrpcClient::UnloadModel( inference::RepositoryModelUnloadResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -752,7 +782,7 @@ Error InferenceServerGrpcClient::ModelInferenceStatistics( inference::ModelStatisticsResponse* infer_stat, const std::string& model_name, const std::string& model_version, - const Headers& headers) + const Headers& headers, const uint64_t timeout_ms) { infer_stat->Clear(); Error err; @@ -760,6 +790,7 @@ InferenceServerGrpcClient::ModelInferenceStatistics( inference::ModelStatisticsRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -783,12 +814,13 @@ Error InferenceServerGrpcClient::UpdateTraceSettings( inference::TraceSettingResponse* response, const std::string& model_name, const std::map>& settings, - const Headers& headers) + const Headers& headers, const uint64_t timeout_ms) { inference::TraceSettingRequest request; grpc::ClientContext context; Error err; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -823,7 +855,7 @@ InferenceServerGrpcClient::UpdateTraceSettings( Error InferenceServerGrpcClient::GetTraceSettings( inference::TraceSettingResponse* settings, const std::string& model_name, - const Headers& headers) + const Headers& headers, const uint64_t timeout_ms) { settings->Clear(); Error err; @@ -831,6 +863,7 @@ 
InferenceServerGrpcClient::GetTraceSettings( inference::TraceSettingRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -853,7 +886,8 @@ InferenceServerGrpcClient::GetTraceSettings( Error InferenceServerGrpcClient::SystemSharedMemoryStatus( inference::SystemSharedMemoryStatusResponse* status, - const std::string& region_name, const Headers& headers) + const std::string& region_name, const Headers& headers, + const uint64_t timeout_ms) { status->Clear(); Error err; @@ -861,6 +895,7 @@ InferenceServerGrpcClient::SystemSharedMemoryStatus( inference::SystemSharedMemoryStatusRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -882,7 +917,7 @@ InferenceServerGrpcClient::SystemSharedMemoryStatus( Error InferenceServerGrpcClient::RegisterSystemSharedMemory( const std::string& name, const std::string& key, const size_t byte_size, - const size_t offset, const Headers& headers) + const size_t offset, const Headers& headers, const uint64_t timeout_ms) { Error err; @@ -890,6 +925,7 @@ InferenceServerGrpcClient::RegisterSystemSharedMemory( inference::SystemSharedMemoryRegisterResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -914,7 +950,7 @@ InferenceServerGrpcClient::RegisterSystemSharedMemory( Error InferenceServerGrpcClient::UnregisterSystemSharedMemory( - const std::string& name, const Headers& headers) + const std::string& name, const Headers& headers, const uint64_t timeout_ms) { Error err; @@ -922,6 +958,7 @@ InferenceServerGrpcClient::UnregisterSystemSharedMemory( inference::SystemSharedMemoryUnregisterResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -949,7 +986,8 @@ InferenceServerGrpcClient::UnregisterSystemSharedMemory( Error InferenceServerGrpcClient::CudaSharedMemoryStatus( inference::CudaSharedMemoryStatusResponse* status, - const std::string& region_name, const Headers& headers) + const std::string& region_name, const Headers& headers, + const uint64_t timeout_ms) { status->Clear(); Error err; @@ -957,6 +995,7 @@ InferenceServerGrpcClient::CudaSharedMemoryStatus( inference::CudaSharedMemoryStatusRequest request; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -978,7 +1017,8 @@ InferenceServerGrpcClient::CudaSharedMemoryStatus( Error InferenceServerGrpcClient::RegisterCudaSharedMemory( const std::string& name, const cudaIpcMemHandle_t& cuda_shm_handle, - const size_t device_id, const size_t byte_size, const Headers& headers) + const size_t device_id, const size_t byte_size, const Headers& headers, + const uint64_t timeout_ms) { Error err; @@ -986,6 +1026,7 @@ InferenceServerGrpcClient::RegisterCudaSharedMemory( inference::CudaSharedMemoryRegisterResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -1010,7 +1051,7 @@ InferenceServerGrpcClient::RegisterCudaSharedMemory( Error InferenceServerGrpcClient::UnregisterCudaSharedMemory( - const std::string& name, const Headers& headers) + const std::string& name, const Headers& headers, const uint64_t 
timeout_ms) { Error err; @@ -1018,6 +1059,8 @@ InferenceServerGrpcClient::UnregisterCudaSharedMemory( inference::CudaSharedMemoryUnregisterResponse response; grpc::ClientContext context; + SetTimeout(timeout_ms, &context); + for (const auto& it : headers) { context.AddMetadata(it.first, it.second); } @@ -1064,9 +1107,7 @@ InferenceServerGrpcClient::Infer( } if (options.client_timeout_ != 0) { - auto deadline = std::chrono::system_clock::now() + - std::chrono::microseconds(options.client_timeout_); - context.set_deadline(deadline); + SetTimeout(options.client_timeout_, &context); } context.set_compression_algorithm(compression_algorithm); @@ -1128,9 +1169,7 @@ InferenceServerGrpcClient::AsyncInfer( } if (options.client_timeout_ != 0) { - auto deadline = std::chrono::system_clock::now() + - std::chrono::microseconds(options.client_timeout_); - async_request->grpc_context_.set_deadline(deadline); + SetTimeout(options.client_timeout_, &(async_request->grpc_context_)); } async_request->grpc_context_.set_compression_algorithm(compression_algorithm); @@ -1300,9 +1339,7 @@ InferenceServerGrpcClient::StartStream( } if (stream_timeout != 0) { - auto deadline = std::chrono::system_clock::now() + - std::chrono::microseconds(stream_timeout); - grpc_context_.set_deadline(deadline); + SetTimeout(stream_timeout, &grpc_context_); } grpc_context_.set_compression_algorithm(compression_algorithm); diff --git a/src/c++/library/grpc_client.h b/src/c++/library/grpc_client.h index 199ebed40..cc90b12de 100644 --- a/src/c++/library/grpc_client.h +++ b/src/c++/library/grpc_client.h @@ -156,15 +156,25 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// \param live Returns whether the server is live or not. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. - Error IsServerLive(bool* live, const Headers& headers = Headers()); + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. + Error IsServerLive( + bool* live, const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get its readiness. /// \param ready Returns whether the server is ready or not. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. - Error IsServerReady(bool* ready, const Headers& headers = Headers()); + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. + Error IsServerReady( + bool* ready, const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get the readiness of specified model. /// \param ready Returns whether the specified model is ready or not. @@ -174,21 +184,27 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// choose a version based on the model and internal policy. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. 
+ /// \return Error object indicating success or + /// failure of the request. Error IsModelReady( bool* ready, const std::string& model_name, - const std::string& model_version = "", - const Headers& headers = Headers()); + const std::string& model_version = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get its metadata. /// \param server_metadata Returns the server metadata as /// SeverMetadataResponse message. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error ServerMetadata( inference::ServerMetadataResponse* server_metadata, - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Contact the inference server and get the metadata of specified model. /// \param model_metadata Returns model metadata as ModelMetadataResponse @@ -199,11 +215,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// choose a version based on the model and internal policy. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error ModelMetadata( inference::ModelMetadataResponse* model_metadata, const std::string& model_name, const std::string& model_version = "", - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Contact the inference server and get the configuration of specified model. /// \param model_config Returns model config as ModelConfigResponse @@ -214,11 +233,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// choose a version based on the model and internal policy. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error ModelConfig( inference::ModelConfigResponse* model_config, const std::string& model_name, const std::string& model_version = "", - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Contact the inference server and get the index of model repository /// contents. @@ -226,10 +248,13 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// RepositoryIndexRequestResponse /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. 
Error ModelRepositoryIndex( inference::RepositoryIndexResponse* repository_index, - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Request the inference server to load or reload specified model. /// \param model_name The name of the model to be loaded or reloaded. @@ -243,19 +268,27 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// The files will form the model directory that the model /// will be loaded from. If specified, 'config' must be provided to be /// the model configuration of the override model directory. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error LoadModel( const std::string& model_name, const Headers& headers = Headers(), const std::string& config = std::string(), - const std::map>& files = {}); + const std::map>& files = {}, + const uint64_t timeout_ms = 0); /// Request the inference server to unload specified model. /// \param model_name The name of the model to be unloaded. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error UnloadModel( - const std::string& model_name, const Headers& headers = Headers()); + const std::string& model_name, const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get the inference statistics for the /// specified model name and version. @@ -269,11 +302,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// choose a version based on the model and internal policy. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error ModelInferenceStatistics( inference::ModelStatisticsResponse* infer_stat, const std::string& model_name = "", const std::string& model_version = "", - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Update the trace settings for the specified model name, or global trace /// settings if model name is not given. @@ -289,13 +325,16 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// loading the model. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. 
Error UpdateTraceSettings( inference::TraceSettingResponse* response, const std::string& model_name = "", const std::map>& settings = std::map>(), - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Get the trace settings for the specified model name, or global trace /// settings if model name is not given. @@ -305,10 +344,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// will be returned. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error GetTraceSettings( inference::TraceSettingResponse* settings, - const std::string& model_name = "", const Headers& headers = Headers()); + const std::string& model_name = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get the status for requested system /// shared memory. @@ -319,10 +362,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// shared memory will be returned. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error SystemSharedMemoryStatus( inference::SystemSharedMemoryStatusResponse* status, - const std::string& region_name = "", const Headers& headers = Headers()); + const std::string& region_name = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Request the server to register a system shared memory with the provided /// details. @@ -334,10 +381,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// the start of the system shared memory region. The default value is zero. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request Error RegisterSystemSharedMemory( const std::string& name, const std::string& key, const size_t byte_size, - const size_t offset = 0, const Headers& headers = Headers()); + const size_t offset = 0, const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Request the server to unregister a system shared memory with the /// specified name. @@ -346,9 +397,13 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// unregistered. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. 
+ /// \return Error object indicating success or + /// failure of the request Error UnregisterSystemSharedMemory( - const std::string& name = "", const Headers& headers = Headers()); + const std::string& name = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Contact the inference server and get the status for requested CUDA /// shared memory. @@ -359,10 +414,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// shared memory will be returned. /// \param headers Optional map specifying additional HTTP headers to include /// in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request. + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request. Error CudaSharedMemoryStatus( inference::CudaSharedMemoryStatusResponse* status, - const std::string& region_name = "", const Headers& headers = Headers()); + const std::string& region_name = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Request the server to register a CUDA shared memory with the provided /// details. @@ -374,11 +433,14 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// bytes. /// \param headers Optional map specifying additional HTTP headers to /// include in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request Error RegisterCudaSharedMemory( const std::string& name, const cudaIpcMemHandle_t& cuda_shm_handle, const size_t device_id, const size_t byte_size, - const Headers& headers = Headers()); + const Headers& headers = Headers(), const uint64_t timeout_ms = 0); /// Request the server to unregister a CUDA shared memory with the /// specified name. @@ -387,9 +449,13 @@ class InferenceServerGrpcClient : public InferenceServerClient { /// unregistered. /// \param headers Optional map specifying additional HTTP headers to /// include in the metadata of gRPC request. - /// \return Error object indicating success or failure of the request + /// \param timeout_ms Optional timeout for API call, in microseconds, the + /// request is allowed to take. + /// \return Error object indicating success or + /// failure of the request Error UnregisterCudaSharedMemory( - const std::string& name = "", const Headers& headers = Headers()); + const std::string& name = "", const Headers& headers = Headers(), + const uint64_t timeout_ms = 0); /// Run synchronous inference on server. /// \param result Returns the result of inference. 
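For reference, a minimal usage sketch of the new optional `timeout_ms` argument on the C++ client. The `IsServerLive` signature and `SetTimeout` semantics are taken from the changes above; the include path, server URL, and `Create()` call are illustrative assumptions and not part of this patch.

```cpp
// Sketch only: exercises the new optional timeout_ms parameter added by this
// patch to the non-inference gRPC client APIs. URL and error handling are
// illustrative assumptions.
#include <iostream>
#include <memory>

#include "grpc_client.h"  // assumed header name for InferenceServerGrpcClient

namespace tc = triton::client;

int
main()
{
  std::unique_ptr<tc::InferenceServerGrpcClient> client;
  // Create() is existing library API, unchanged by this patch.
  tc::Error err =
      tc::InferenceServerGrpcClient::Create(&client, "localhost:8001");
  if (!err.IsOk()) {
    std::cerr << "failed to create client: " << err << std::endl;
    return 1;
  }

  // Per SetTimeout() and the header docs, the value is applied as a gRPC
  // deadline in *microseconds* despite the "_ms" suffix; 0 means no deadline.
  const uint64_t timeout_ms = 500000;  // 0.5 s expressed in microseconds

  bool live = false;
  err = client->IsServerLive(&live, tc::Headers(), timeout_ms);
  if (!err.IsOk()) {
    // A "Deadline Exceeded" status surfaces here when the deadline elapses.
    std::cerr << "IsServerLive failed: " << err << std::endl;
    return 1;
  }
  std::cout << "server live: " << live << std::endl;
  return 0;
}
```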
diff --git a/src/c++/tests/client_timeout_test.cc b/src/c++/tests/client_timeout_test.cc index 71226da53..e0d8d002a 100644 --- a/src/c++/tests/client_timeout_test.cc +++ b/src/c++/tests/client_timeout_test.cc @@ -45,8 +45,111 @@ namespace tc = triton::client; } \ } +#define COUNT_ERROR_MSGS(X, MSG, CNT) \ + { \ + tc::Error err = (X); \ + if (!err.IsOk()) { \ + std::cout << "error: " << (MSG) << ": " << err << std::endl; \ + ++CNT; \ + } \ + } + namespace { +void +TestTimeoutAPIs( + const uint64_t timeout_ms, const std::string& model_name, + std::unique_ptr& grpc_client) +{ + std::cout << "testing other apis" << std::endl; + bool success = false; + std::map headers; + inference::ServerMetadataResponse server_metadata; + inference::ModelMetadataResponse model_metadata; + inference::ModelConfigResponse model_config; + inference::RepositoryIndexResponse repository_index; + inference::ModelStatisticsResponse infer_stat; + inference::TraceSettingResponse response; + std::map> settings; + inference::TraceSettingResponse trace_settings; + inference::SystemSharedMemoryStatusResponse shmstatus; + size_t byte_size; + std::string memory_name = ""; + inference::CudaSharedMemoryStatusResponse cuda_shmstatus; + cudaIpcMemHandle_t cuda_shm_handle; + size_t count = 0; + + COUNT_ERROR_MSGS( + grpc_client->IsServerLive(&success, headers, timeout_ms), + "Failed on IsServerLive", count); + COUNT_ERROR_MSGS( + grpc_client->IsServerReady(&success, headers, timeout_ms), + "Failed on IsServerReady", count); + COUNT_ERROR_MSGS( + grpc_client->IsModelReady(&success, model_name, "", headers, timeout_ms), + "Failed on IsModelReady", count); + COUNT_ERROR_MSGS( + grpc_client->ServerMetadata(&server_metadata, headers, timeout_ms), + "Failed on ServerMetadata", count); + COUNT_ERROR_MSGS( + grpc_client->ModelMetadata( + &model_metadata, model_name, "", headers, timeout_ms), + "Failed on ModelMetadata", count); + COUNT_ERROR_MSGS( + grpc_client->ModelConfig( + &model_config, model_name, "", headers, timeout_ms), + "Failed on ModelConfig", count); + COUNT_ERROR_MSGS( + grpc_client->ModelRepositoryIndex(&repository_index, headers, timeout_ms), + "Failed on ModelRepositoryIndex", count); + COUNT_ERROR_MSGS( + grpc_client->ModelInferenceStatistics( + &infer_stat, model_name, "", headers, timeout_ms), + "Failed on ModelInferenceStatistics", count); + COUNT_ERROR_MSGS( + grpc_client->LoadModel(model_name, headers, "", {}, timeout_ms), + "Failed on LoadModel", count); + COUNT_ERROR_MSGS( + grpc_client->UnloadModel(model_name, headers, timeout_ms), + "Failed on UnloadModel", count); + + COUNT_ERROR_MSGS( + grpc_client->UpdateTraceSettings( + &response, model_name, settings, headers, timeout_ms), + "Failed on UpdateTraceSettings", count); + COUNT_ERROR_MSGS( + grpc_client->GetTraceSettings( + &trace_settings, model_name, headers, timeout_ms), + "Failed on GetTraceSettings", count); + COUNT_ERROR_MSGS( + grpc_client->SystemSharedMemoryStatus( + &shmstatus, memory_name, headers, timeout_ms), + "Failed on SystemSharedMemoryStatus", count); + COUNT_ERROR_MSGS( + grpc_client->RegisterSystemSharedMemory( + memory_name, memory_name, byte_size, 0, headers, timeout_ms), + "Failed on RegisterSystemSharedMemory", count); + COUNT_ERROR_MSGS( + grpc_client->UnregisterSystemSharedMemory( + memory_name, headers, timeout_ms), + "Failed on UnregisterSystemSharedMemory", count); + COUNT_ERROR_MSGS( + grpc_client->CudaSharedMemoryStatus( + &cuda_shmstatus, "", headers, timeout_ms), + "Failed on CudaSharedMemoryStatus", count); + COUNT_ERROR_MSGS( + 
grpc_client->RegisterCudaSharedMemory( + model_name, cuda_shm_handle, 0, byte_size, headers, timeout_ms), + "Failed on RegisterCudaSharedMemory", count); + COUNT_ERROR_MSGS( + grpc_client->UnregisterCudaSharedMemory(memory_name, headers, timeout_ms), + "Failed on UnregisterSystemSharedMemory", count); + if (count != 0) { + std::cerr << "error count: " << count << " which is not 0 " << std::endl; + exit(1); + } +} + void ValidateShapeAndDatatype( const std::string& name, std::shared_ptr result) @@ -109,11 +212,11 @@ void RunSynchronousInference( std::unique_ptr& grpc_client, std::unique_ptr& http_client, - uint32_t client_timeout, std::vector& inputs, + uint32_t client_timeout_ms, std::vector& inputs, std::vector& outputs, tc::InferOptions& options, std::vector& input0_data) { - options.client_timeout_ = client_timeout; + options.client_timeout_ = client_timeout_ms; tc::InferResult* results; if (grpc_client.get() != nullptr) { FAIL_IF_ERR( @@ -141,7 +244,7 @@ void RunAsynchronousInference( std::unique_ptr& grpc_client, std::unique_ptr& http_client, - uint32_t client_timeout, std::vector& inputs, + uint32_t client_timeout_ms, std::vector& inputs, std::vector& outputs, tc::InferOptions& options, std::vector& input0_data) { @@ -167,7 +270,7 @@ RunAsynchronousInference( cv.notify_all(); }; - options.client_timeout_ = client_timeout; + options.client_timeout_ = client_timeout_ms; if (grpc_client.get() != nullptr) { FAIL_IF_ERR( grpc_client->AsyncInfer(callback, options, inputs, outputs), @@ -188,7 +291,7 @@ RunAsynchronousInference( void RunStreamingInference( std::unique_ptr& grpc_client, - uint32_t client_timeout, std::vector& inputs, + uint32_t client_timeout_ms, std::vector& inputs, std::vector& outputs, tc::InferOptions& options, std::vector& input0_data) { @@ -206,13 +309,13 @@ RunStreamingInference( } cv.notify_all(); }, - false /*ship_stats*/, client_timeout), + false /*ship_stats*/, client_timeout_ms), "Failed to start the stream"); FAIL_IF_ERR( grpc_client->AsyncStreamInfer(options, inputs), "unable to run model"); - auto timeout = std::chrono::microseconds(client_timeout); + auto timeout = std::chrono::microseconds(client_timeout_ms); // Wait until all callbacks are invoked or the timeout expires { std::unique_lock lk(mtx); @@ -263,11 +366,12 @@ main(int argc, char** argv) std::string url; bool async = false; bool streaming = false; - uint32_t client_timeout = 0; + uint32_t client_timeout_ms = 0; + bool test_client_apis = false; // Parse commandline... int opt; - while ((opt = getopt(argc, argv, "vi:u:ast:")) != -1) { + while ((opt = getopt(argc, argv, "vi:u:ast:p")) != -1) { switch (opt) { case 'v': verbose = true; @@ -292,7 +396,10 @@ main(int argc, char** argv) streaming = true; break; case 't': - client_timeout = std::stoi(optarg); + client_timeout_ms = std::stoi(optarg); + break; + case 'p': + test_client_apis = true; break; case '?': Usage(argv); @@ -335,6 +442,12 @@ main(int argc, char** argv) "unable to create grpc client"); } + // Test server timeouts for grpc client + if (protocol == "grpc" && test_client_apis) { + TestTimeoutAPIs(client_timeout_ms, model_name, grpc_client); + return 0; + } + // Initialize the tensor data std::vector input0_data(16); for (size_t i = 0; i < 16; ++i) { @@ -370,7 +483,7 @@ main(int argc, char** argv) // The inference settings. Will be using default for now. 
tc::InferOptions options(model_name); options.model_version_ = model_version; - options.client_timeout_ = client_timeout; + options.client_timeout_ = client_timeout_ms; std::vector inputs = {input0_ptr.get()}; std::vector outputs = {output0_ptr.get()}; @@ -378,14 +491,14 @@ main(int argc, char** argv) // Send inference request to the inference server. if (streaming) { RunStreamingInference( - grpc_client, client_timeout, inputs, outputs, options, input0_data); + grpc_client, client_timeout_ms, inputs, outputs, options, input0_data); } else if (async) { RunAsynchronousInference( - grpc_client, http_client, client_timeout, inputs, outputs, options, + grpc_client, http_client, client_timeout_ms, inputs, outputs, options, input0_data); } else { RunSynchronousInference( - grpc_client, http_client, client_timeout, inputs, outputs, options, + grpc_client, http_client, client_timeout_ms, inputs, outputs, options, input0_data); } diff --git a/src/python/library/tritonclient/grpc/_client.py b/src/python/library/tritonclient/grpc/_client.py index c4f56521f..90904acf1 100755 --- a/src/python/library/tritonclient/grpc/_client.py +++ b/src/python/library/tritonclient/grpc/_client.py @@ -264,7 +264,7 @@ def close(self): self.stop_stream() self._channel.close() - def is_server_live(self, headers=None): + def is_server_live(self, headers=None, client_timeout=None): """Contact the inference server and get liveness. Parameters @@ -272,6 +272,12 @@ def is_server_live(self, headers=None): headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -281,7 +287,7 @@ def is_server_live(self, headers=None): Raises ------ InferenceServerException - If unable to get liveness. + If unable to get liveness or has timed out. """ metadata = self._get_metadata(headers) @@ -289,14 +295,16 @@ def is_server_live(self, headers=None): request = service_pb2.ServerLiveRequest() if self._verbose: print("is_server_live, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.ServerLive(request=request, metadata=metadata) + response = self._client_stub.ServerLive( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) return response.live except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def is_server_ready(self, headers=None): + def is_server_ready(self, headers=None, client_timeout=None): """Contact the inference server and get readiness. Parameters @@ -304,7 +312,12 @@ def is_server_ready(self, headers=None): headers: dict Optional dictionary specifying additional HTTP headers to include in the request. - + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- bool @@ -313,7 +326,7 @@ def is_server_ready(self, headers=None): Raises ------ InferenceServerException - If unable to get readiness. + If unable to get readiness or has timed out. 
""" metadata = self._get_metadata(headers) @@ -321,14 +334,18 @@ def is_server_ready(self, headers=None): request = service_pb2.ServerReadyRequest() if self._verbose: print("is_server_ready, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.ServerReady(request=request, metadata=metadata) + response = self._client_stub.ServerReady( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) return response.ready except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def is_model_ready(self, model_name, model_version="", headers=None): + def is_model_ready( + self, model_name, model_version="", headers=None, client_timeout=None + ): """Contact the inference server and get the readiness of specified model. Parameters @@ -342,6 +359,12 @@ def is_model_ready(self, model_name, model_version="", headers=None): headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -351,7 +374,7 @@ def is_model_ready(self, model_name, model_version="", headers=None): Raises ------ InferenceServerException - If unable to get model readiness. + If unable to get model readiness or has timed out. """ metadata = self._get_metadata(headers) @@ -363,14 +386,16 @@ def is_model_ready(self, model_name, model_version="", headers=None): ) if self._verbose: print("is_model_ready, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.ModelReady(request=request, metadata=metadata) + response = self._client_stub.ModelReady( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) return response.ready except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def get_server_metadata(self, headers=None, as_json=False): + def get_server_metadata(self, headers=None, as_json=False, client_timeout=None): """Contact the inference server and get its metadata. Parameters @@ -386,6 +411,13 @@ def get_server_metadata(self, headers=None, as_json=False): are represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. + Returns ------- @@ -396,7 +428,7 @@ def get_server_metadata(self, headers=None, as_json=False): Raises ------ InferenceServerException - If unable to get server metadata. + If unable to get server metadata or has timed out. 
""" metadata = self._get_metadata(headers) @@ -405,7 +437,7 @@ def get_server_metadata(self, headers=None, as_json=False): if self._verbose: print("get_server_metadata, metadata {}\n{}".format(metadata, request)) response = self._client_stub.ServerMetadata( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -419,7 +451,12 @@ def get_server_metadata(self, headers=None, as_json=False): raise_error_grpc(rpc_error) def get_model_metadata( - self, model_name, model_version="", headers=None, as_json=False + self, + model_name, + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Contact the inference server and get the metadata for specified model. @@ -442,6 +479,12 @@ def get_model_metadata( represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -452,7 +495,7 @@ def get_model_metadata( Raises ------ InferenceServerException - If unable to get model metadata. + If unable to get model metadata or has timed out. """ metadata = self._get_metadata(headers) @@ -465,7 +508,7 @@ def get_model_metadata( if self._verbose: print("get_model_metadata, metadata {}\n{}".format(metadata, request)) response = self._client_stub.ModelMetadata( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -479,7 +522,12 @@ def get_model_metadata( raise_error_grpc(rpc_error) def get_model_config( - self, model_name, model_version="", headers=None, as_json=False + self, + model_name, + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Contact the inference server and get the configuration for specified model. @@ -502,6 +550,12 @@ def get_model_config( represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -512,7 +566,7 @@ def get_model_config( Raises ------ InferenceServerException - If unable to get model configuration. + If unable to get model configuration or has timed out. 
""" metadata = self._get_metadata(headers) @@ -524,7 +578,9 @@ def get_model_config( ) if self._verbose: print("get_model_config, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.ModelConfig(request=request, metadata=metadata) + response = self._client_stub.ModelConfig( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) if as_json: @@ -536,7 +592,9 @@ def get_model_config( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def get_model_repository_index(self, headers=None, as_json=False): + def get_model_repository_index( + self, headers=None, as_json=False, client_timeout=None + ): """Get the index of model repository contents Parameters @@ -553,6 +611,12 @@ def get_model_repository_index(self, headers=None, as_json=False): represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -571,7 +635,7 @@ def get_model_repository_index(self, headers=None, as_json=False): ) ) response = self._client_stub.RepositoryIndex( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -584,7 +648,14 @@ def get_model_repository_index(self, headers=None, as_json=False): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def load_model(self, model_name, headers=None, config=None, files=None): + def load_model( + self, + model_name, + headers=None, + config=None, + files=None, + client_timeout=None, + ): """Request the inference server to load or reload specified model. Parameters @@ -604,11 +675,17 @@ def load_model(self, model_name, headers=None, config=None, files=None): The files will form the model directory that the model will be loaded from. If specified, 'config' must be provided to be the model configuration of the override model directory. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Raises ------ InferenceServerException - If unable to load the model. + If unable to load the model or has timed out. """ metadata = self._get_metadata(headers) @@ -626,13 +703,21 @@ def load_model(self, model_name, headers=None, config=None, files=None): if files is not None: for path, content in files.items(): request.parameters[path].bytes_param = content - self._client_stub.RepositoryModelLoad(request=request, metadata=metadata) + self._client_stub.RepositoryModelLoad( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print("Loaded model '{}'".format(model_name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def unload_model(self, model_name, headers=None, unload_dependents=False): + def unload_model( + self, + model_name, + headers=None, + unload_dependents=False, + client_timeout=None, + ): """Request the inference server to unload specified model. 
Parameters @@ -644,11 +729,17 @@ def unload_model(self, model_name, headers=None, unload_dependents=False): headers to include in the request. unload_dependents : bool Whether the dependents of the model should also be unloaded. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Raises ------ InferenceServerException - If unable to unload the model. + If unable to unload the model or has timed out. """ metadata = self._get_metadata(headers) @@ -657,14 +748,21 @@ def unload_model(self, model_name, headers=None, unload_dependents=False): request.parameters["unload_dependents"].bool_param = unload_dependents if self._verbose: print("unload_model, metadata {}\n{}".format(metadata, request)) - self._client_stub.RepositoryModelUnload(request=request, metadata=metadata) + self._client_stub.RepositoryModelUnload( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print("Unloaded model '{}'".format(model_name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) def get_inference_statistics( - self, model_name="", model_version="", headers=None, as_json=False + self, + model_name="", + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Get the inference statistics for the specified model name and version. @@ -691,11 +789,17 @@ def get_inference_statistics( represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Raises ------ InferenceServerException - If unable to get the model inference statistics. + If unable to get the model inference statistics or has timed out. """ metadata = self._get_metadata(headers) @@ -712,7 +816,7 @@ def get_inference_statistics( ) ) response = self._client_stub.ModelStatistics( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -726,7 +830,12 @@ def get_inference_statistics( raise_error_grpc(rpc_error) def update_trace_settings( - self, model_name=None, settings={}, headers=None, as_json=False + self, + model_name=None, + settings={}, + headers=None, + as_json=False, + client_timeout=None, ): """Update the trace settings for the specified model name, or global trace settings if model name is not given. @@ -754,6 +863,12 @@ def update_trace_settings( represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -764,7 +879,7 @@ def update_trace_settings( Raises ------ InferenceServerException - If unable to update the trace settings. 
+ If unable to update the trace settings or has timed out. """ metadata = self._get_metadata(headers) @@ -785,7 +900,7 @@ def update_trace_settings( "update_trace_settings, metadata {}\n{}".format(metadata, request) ) response = self._client_stub.TraceSetting( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -798,7 +913,9 @@ def update_trace_settings( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def get_trace_settings(self, model_name=None, headers=None, as_json=False): + def get_trace_settings( + self, model_name=None, headers=None, as_json=False, client_timeout=None + ): """Get the trace settings for the specified model name, or global trace settings if model name is not given @@ -820,6 +937,12 @@ def get_trace_settings(self, model_name=None, headers=None, as_json=False): represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -830,7 +953,7 @@ def get_trace_settings(self, model_name=None, headers=None, as_json=False): Raises ------ InferenceServerException - If unable to get the trace settings. + If unable to get the trace settings or has timed out. """ metadata = self._get_metadata(headers) @@ -841,7 +964,7 @@ def get_trace_settings(self, model_name=None, headers=None, as_json=False): if self._verbose: print("get_trace_settings, metadata {}\n{}".format(metadata, request)) response = self._client_stub.TraceSetting( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -854,7 +977,9 @@ def get_trace_settings(self, model_name=None, headers=None, as_json=False): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def update_log_settings(self, settings, headers=None, as_json=False): + def update_log_settings( + self, settings, headers=None, as_json=False, client_timeout=None + ): """Update the global log settings. Returns the log settings after the update. Parameters @@ -874,6 +999,12 @@ def update_log_settings(self, settings, headers=None, as_json=False): represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- dict or protobuf message @@ -882,7 +1013,7 @@ def update_log_settings(self, settings, headers=None, as_json=False): Raises ------ InferenceServerException - If unable to update the log settings. + If unable to update the log settings or has timed out. 
""" metadata = self._get_metadata(headers) try: @@ -900,7 +1031,9 @@ def update_log_settings(self, settings, headers=None, as_json=False): if self._verbose: print("update_log_settings, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.LogSettings(request=request, metadata=metadata) + response = self._client_stub.LogSettings( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) if as_json: @@ -912,7 +1045,7 @@ def update_log_settings(self, settings, headers=None, as_json=False): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def get_log_settings(self, headers=None, as_json=False): + def get_log_settings(self, headers=None, as_json=False, client_timeout=None): """Get the global log settings. Parameters ---------- @@ -928,6 +1061,12 @@ def get_log_settings(self, headers=None, as_json=False): represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- dict or protobuf message @@ -936,14 +1075,16 @@ def get_log_settings(self, headers=None, as_json=False): Raises ------ InferenceServerException - If unable to get the log settings. + If unable to get the log settings or has timed out. """ metadata = self._get_metadata(headers) try: request = service_pb2.LogSettingsRequest() if self._verbose: print("get_log_settings, metadata {}\n{}".format(metadata, request)) - response = self._client_stub.LogSettings(request=request, metadata=metadata) + response = self._client_stub.LogSettings( + request=request, metadata=metadata, timeout=client_timeout + ) if self._verbose: print(response) if as_json: @@ -956,7 +1097,7 @@ def get_log_settings(self, headers=None, as_json=False): raise_error_grpc(rpc_error) def get_system_shared_memory_status( - self, region_name="", headers=None, as_json=False + self, region_name="", headers=None, as_json=False, client_timeout=None ): """Request system shared memory status from the server. @@ -977,6 +1118,12 @@ def get_system_shared_memory_status( are represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Returns ------- @@ -987,7 +1134,7 @@ def get_system_shared_memory_status( Raises ------ InferenceServerException - If unable to get the status of specified shared memory. + If unable to get the status of specified shared memory or has timed out. 
""" metadata = self._get_metadata(headers) @@ -1000,7 +1147,7 @@ def get_system_shared_memory_status( ) ) response = self._client_stub.SystemSharedMemoryStatus( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -1014,7 +1161,7 @@ def get_system_shared_memory_status( raise_error_grpc(rpc_error) def register_system_shared_memory( - self, name, key, byte_size, offset=0, headers=None + self, name, key, byte_size, offset=0, headers=None, client_timeout=None ): """Request the server to register a system shared memory with the following specification. @@ -1035,11 +1182,17 @@ def register_system_shared_memory( headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Raises ------ InferenceServerException - If unable to register the specified system shared memory. + If unable to register the specified system shared memory or has timed out. """ metadata = self._get_metadata(headers) @@ -1054,14 +1207,16 @@ def register_system_shared_memory( ) ) self._client_stub.SystemSharedMemoryRegister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Registered system shared memory with name '{}'".format(name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def unregister_system_shared_memory(self, name="", headers=None): + def unregister_system_shared_memory( + self, name="", headers=None, client_timeout=None + ): """Request the server to unregister a system shared memory with the specified name. @@ -1074,11 +1229,17 @@ def unregister_system_shared_memory(self, name="", headers=None): headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. Raises ------ InferenceServerException - If unable to unregister the specified system shared memory region. + If unable to unregister the specified system shared memory region or has timed out. """ metadata = self._get_metadata(headers) @@ -1091,7 +1252,7 @@ def unregister_system_shared_memory(self, name="", headers=None): ) ) self._client_stub.SystemSharedMemoryUnregister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: if name != "": @@ -1104,7 +1265,7 @@ def unregister_system_shared_memory(self, name="", headers=None): raise_error_grpc(rpc_error) def get_cuda_shared_memory_status( - self, region_name="", headers=None, as_json=False + self, region_name="", headers=None, as_json=False, client_timeout=None ): """Request cuda shared memory status from the server. @@ -1125,6 +1286,12 @@ def get_cuda_shared_memory_status( are represented as string. It is the caller's responsibility to convert these strings back to int64 values as necessary. 
+ client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort the request and raise an + InferenceServerException with the message "Deadline Exceeded" when + the specified time elapses. The default value is None, which means + the client will wait for the response from the server. Returns ------- @@ -1135,7 +1302,7 @@ def get_cuda_shared_memory_status( Raises ------ InferenceServerException - If unable to get the status of specified shared memory. + If unable to get the status of the specified shared memory or the request has timed out. """ @@ -1149,7 +1316,7 @@ def get_cuda_shared_memory_status( ) ) response = self._client_stub.CudaSharedMemoryStatus( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -1163,7 +1330,13 @@ def get_cuda_shared_memory_status( raise_error_grpc(rpc_error) def register_cuda_shared_memory( - self, name, raw_handle, device_id, byte_size, headers=None + self, + name, + raw_handle, + device_id, + byte_size, + headers=None, + client_timeout=None, ): """Request the server to register a system shared memory with the following specification. @@ -1181,11 +1354,17 @@ def register_cuda_shared_memory( headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort the request and raise an + InferenceServerException with the message "Deadline Exceeded" when + the specified time elapses. The default value is None, which means + the client will wait for the response from the server. Raises ------ InferenceServerException - If unable to register the specified cuda shared memory. + If unable to register the specified cuda shared memory or the request has timed out. """ metadata = self._get_metadata(headers) @@ -1203,14 +1382,14 @@ def register_cuda_shared_memory( ) ) self._client_stub.CudaSharedMemoryRegister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Registered cuda shared memory with name '{}'".format(name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - def unregister_cuda_shared_memory(self, name="", headers=None): + def unregister_cuda_shared_memory(self, name="", headers=None, client_timeout=None): """Request the server to unregister a cuda shared memory with the specified name. @@ -1223,11 +1402,17 @@ def unregister_cuda_shared_memory(self, name="", headers=None): headers: dict Optional dictionary specifying additional HTTP headers to include in the request. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort the request and raise an + InferenceServerException with the message "Deadline Exceeded" when + the specified time elapses. The default value is None, which means + the client will wait for the response from the server. Raises ------ InferenceServerException - If unable to unregister the specified cuda shared memory region. + If unable to unregister the specified cuda shared memory region or the request has timed out.
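The same pattern applies to the shared-memory management calls. A sketch (illustrative only, not part of this patch), assuming a system shared-memory region with key "/input_data" has already been created on the host; the region name, key, sizes, and the 5-second deadline are placeholders:

import tritonclient.grpc as grpcclient
from tritonclient.utils import InferenceServerException

client = grpcclient.InferenceServerClient(url="localhost:8001")
try:
    # Each call aborts and raises if the server does not answer within 5 seconds.
    client.register_system_shared_memory(
        "input_data", "/input_data", byte_size=64, client_timeout=5.0
    )
    status = client.get_system_shared_memory_status(
        region_name="input_data", as_json=True, client_timeout=5.0
    )
    client.unregister_system_shared_memory("input_data", client_timeout=5.0)
except InferenceServerException as e:
    print("shared-memory request failed or timed out: {}".format(e))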
""" metadata = self._get_metadata(headers) @@ -1240,7 +1425,7 @@ def unregister_cuda_shared_memory(self, name="", headers=None): ) ) self._client_stub.CudaSharedMemoryUnregister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: if name != "": @@ -1459,6 +1644,12 @@ def async_infer( error with message "Deadline Exceeded" in the callback when the specified time elapses. The default value is None which means client will wait for the response from the server. + client_timeout: float + The maximum end-to-end time, in seconds, the request is allowed + to take. The client will abort request and raise + InferenceServerExeption with message "Deadline Exceeded" when the + specified time elapses. The default value is None which means + client will wait for the response from the server. headers: dict Optional dictionary specifying additional HTTP headers to include in the request. @@ -1575,7 +1766,7 @@ def start_stream( ------ InferenceServerException If unable to start a stream or a stream was already running - for this client. + for this client or has timed out. """ if self._stream is not None: diff --git a/src/python/library/tritonclient/grpc/aio/__init__.py b/src/python/library/tritonclient/grpc/aio/__init__.py index 37414dacb..ecf7b95d1 100755 --- a/src/python/library/tritonclient/grpc/aio/__init__.py +++ b/src/python/library/tritonclient/grpc/aio/__init__.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import base64 +import sys import rapidjson as json from google.protobuf.json_format import MessageToJson @@ -140,7 +141,7 @@ def _get_metadata(self, headers): ) return request_metadata - async def is_server_live(self, headers=None): + async def is_server_live(self, headers=None, client_timeout=None): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -148,7 +149,7 @@ async def is_server_live(self, headers=None): if self._verbose: print("is_server_live, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.ServerLive( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -156,7 +157,7 @@ async def is_server_live(self, headers=None): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def is_server_ready(self, headers=None): + async def is_server_ready(self, headers=None, client_timeout=None): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -164,7 +165,7 @@ async def is_server_ready(self, headers=None): if self._verbose: print("is_server_ready, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.ServerReady( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -172,7 +173,9 @@ async def is_server_ready(self, headers=None): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def is_model_ready(self, model_name, model_version="", headers=None): + async def is_model_ready( + self, model_name, model_version="", headers=None, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -184,7 +187,7 @@ async def is_model_ready(self, model_name, model_version="", headers=None): if self._verbose: print("is_model_ready, metadata {}\n{}".format(metadata, request)) response = 
await self._client_stub.ModelReady( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -192,7 +195,9 @@ async def is_model_ready(self, model_name, model_version="", headers=None): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def get_server_metadata(self, headers=None, as_json=False): + async def get_server_metadata( + self, headers=None, as_json=False, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -200,7 +205,7 @@ async def get_server_metadata(self, headers=None, as_json=False): if self._verbose: print("get_server_metadata, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.ServerMetadata( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -209,7 +214,12 @@ async def get_server_metadata(self, headers=None, as_json=False): raise_error_grpc(rpc_error) async def get_model_metadata( - self, model_name, model_version="", headers=None, as_json=False + self, + model_name, + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -222,7 +232,7 @@ async def get_model_metadata( if self._verbose: print("get_model_metadata, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.ModelMetadata( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -231,7 +241,12 @@ async def get_model_metadata( raise_error_grpc(rpc_error) async def get_model_config( - self, model_name, model_version="", headers=None, as_json=False + self, + model_name, + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -244,7 +259,7 @@ async def get_model_config( if self._verbose: print("get_model_config, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.ModelConfig( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -252,7 +267,9 @@ async def get_model_config( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def get_model_repository_index(self, headers=None, as_json=False): + async def get_model_repository_index( + self, headers=None, as_json=False, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -264,7 +281,7 @@ async def get_model_repository_index(self, headers=None, as_json=False): ) ) response = await self._client_stub.RepositoryIndex( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -272,7 +289,14 @@ async def get_model_repository_index(self, headers=None, as_json=False): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def load_model(self, model_name, headers=None, config=None, files=None): + async def load_model( + self, + model_name, + headers=None, + config=None, + files=None, + client_timeout=None, + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -290,14 +314,20 @@ async def load_model(self, model_name, 
headers=None, config=None, files=None): for path, content in files.items(): request.parameters[path].bytes_param = content await self._client_stub.RepositoryModelLoad( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Loaded model '{}'".format(model_name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def unload_model(self, model_name, headers=None, unload_dependents=False): + async def unload_model( + self, + model_name, + headers=None, + unload_dependents=False, + client_timeout=None, + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -306,7 +336,7 @@ async def unload_model(self, model_name, headers=None, unload_dependents=False): if self._verbose: print("unload_model, metadata {}\n{}".format(metadata, request)) await self._client_stub.RepositoryModelUnload( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Unloaded model '{}'".format(model_name)) @@ -314,7 +344,12 @@ async def unload_model(self, model_name, headers=None, unload_dependents=False): raise_error_grpc(rpc_error) async def get_inference_statistics( - self, model_name="", model_version="", headers=None, as_json=False + self, + model_name="", + model_version="", + headers=None, + as_json=False, + client_timeout=None, ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -322,12 +357,12 @@ async def get_inference_statistics( if type(model_version) != str: raise_error("model version must be a string") request = service_pb2.ModelStatisticsRequest( name=model_name, version=model_version ) if self._verbose: print( "get_inference_statistics, metadata {}\n{}".format(metadata, request) ) response = await self._client_stub.ModelStatistics( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) @@ -340,7 +375,12 @@ async def get_inference_statistics( raise_error_grpc(rpc_error) async def update_trace_settings( - self, model_name=None, settings={}, headers=None, as_json=False + self, + model_name=None, + settings={}, + headers=None, + as_json=False, + client_timeout=None, ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -361,7 +401,7 @@ async def update_trace_settings( "update_trace_settings, metadata {}\n{}".format(metadata, request) ) response = await self._client_stub.TraceSetting( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -369,7 +409,9 @@ async def update_trace_settings( except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def get_trace_settings(self, model_name=None, headers=None, as_json=False): + async def get_trace_settings( + self, model_name=None, headers=None, as_json=False, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -379,7 +421,7 @@ async def get_trace_settings(self, model_name=None, headers=None, as_json=False) if self._verbose: print("get_trace_settings, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.TraceSetting( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -387,7 +429,9 @@ async def get_trace_settings(self, model_name=None, headers=None, as_json=False) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def update_log_settings(self, settings, headers=None, as_json=False): + async
def update_log_settings( + self, settings, headers=None, as_json=False, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -406,7 +450,7 @@ async def update_log_settings(self, settings, headers=None, as_json=False): if self._verbose: print("update_log_settings, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.LogSettings( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -414,7 +458,7 @@ async def update_log_settings(self, settings, headers=None, as_json=False): except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def get_log_settings(self, headers=None, as_json=False): + async def get_log_settings(self, headers=None, as_json=False, client_timeout=None): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -422,7 +466,7 @@ async def get_log_settings(self, headers=None, as_json=False): if self._verbose: print("get_log_settings, metadata {}\n{}".format(metadata, request)) response = await self._client_stub.LogSettings( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -431,7 +475,7 @@ async def get_log_settings(self, headers=None, as_json=False): raise_error_grpc(rpc_error) async def get_system_shared_memory_status( - self, region_name="", headers=None, as_json=False + self, region_name="", headers=None, as_json=False, client_timeout=None ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -444,7 +488,7 @@ async def get_system_shared_memory_status( ) ) response = await self._client_stub.SystemSharedMemoryStatus( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -453,7 +497,7 @@ async def get_system_shared_memory_status( raise_error_grpc(rpc_error) async def register_system_shared_memory( - self, name, key, byte_size, offset=0, headers=None + self, name, key, byte_size, offset=0, headers=None, client_timeout=None ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -468,14 +512,16 @@ async def register_system_shared_memory( ) ) await self._client_stub.SystemSharedMemoryRegister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Registered system shared memory with name '{}'".format(name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def unregister_system_shared_memory(self, name="", headers=None): + async def unregister_system_shared_memory( + self, name="", headers=None, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -487,7 +533,7 @@ async def unregister_system_shared_memory(self, name="", headers=None): ) ) await self._client_stub.SystemSharedMemoryUnregister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: if name != "": @@ -500,7 +546,7 @@ async def unregister_system_shared_memory(self, name="", headers=None): raise_error_grpc(rpc_error) async def get_cuda_shared_memory_status( - self, region_name="", headers=None, as_json=False + self, region_name="", headers=None, as_json=False, client_timeout=None ): """Refer to 
tritonclient.grpc.InferenceServerClient""" @@ -514,7 +560,7 @@ async def get_cuda_shared_memory_status( ) ) response = await self._client_stub.CudaSharedMemoryStatus( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print(response) @@ -523,7 +569,13 @@ async def get_cuda_shared_memory_status( raise_error_grpc(rpc_error) async def register_cuda_shared_memory( - self, name, raw_handle, device_id, byte_size, headers=None + self, + name, + raw_handle, + device_id, + byte_size, + headers=None, + client_timeout=None, ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) @@ -541,14 +593,16 @@ async def register_cuda_shared_memory( ) ) await self._client_stub.CudaSharedMemoryRegister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: print("Registered cuda shared memory with name '{}'".format(name)) except grpc.RpcError as rpc_error: raise_error_grpc(rpc_error) - async def unregister_cuda_shared_memory(self, name="", headers=None): + async def unregister_cuda_shared_memory( + self, name="", headers=None, client_timeout=None + ): """Refer to tritonclient.grpc.InferenceServerClient""" metadata = self._get_metadata(headers) try: @@ -560,7 +614,7 @@ async def unregister_cuda_shared_memory(self, name="", headers=None): ) ) await self._client_stub.CudaSharedMemoryUnregister( - request=request, metadata=metadata + request=request, metadata=metadata, timeout=client_timeout ) if self._verbose: if name != "":
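The asyncio client mirrors the synchronous API, so client_timeout is forwarded the same way. A sketch (illustrative only, not part of this patch); the server URL, model name, and the 1.5-second deadline are placeholders:

import asyncio

import tritonclient.grpc.aio as grpcclient_aio
from tritonclient.utils import InferenceServerException


async def main():
    client = grpcclient_aio.InferenceServerClient(url="localhost:8001")
    try:
        # Both awaits abort and raise if the server does not answer in time.
        live = await client.is_server_live(client_timeout=1.5)
        metadata = await client.get_model_metadata(
            "my_model", as_json=True, client_timeout=1.5
        )
        print(live, metadata)
    except InferenceServerException as e:
        print("request failed or timed out: {}".format(e))


asyncio.run(main())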