From 6466c7f16641b57b55384a9bf918ae2523d24de9 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 16 Jan 2020 11:00:16 -0800 Subject: [PATCH] Log an error (in dbg mode) if CQ is Shutdown before its Server(s) --- .../impl/codegen/completion_queue_impl.h | 36 +++++++++++++++++-- include/grpcpp/server_impl.h | 6 ++++ src/cpp/common/completion_queue_cc.cc | 6 ++++ src/cpp/server/server_builder.cc | 15 ++++---- src/cpp/server/server_cc.cc | 12 +++++++ 5 files changed, 66 insertions(+), 9 deletions(-) diff --git a/include/grpcpp/impl/codegen/completion_queue_impl.h b/include/grpcpp/impl/codegen/completion_queue_impl.h index 4549aa3719477..41622f609007f 100644 --- a/include/grpcpp/impl/codegen/completion_queue_impl.h +++ b/include/grpcpp/impl/codegen/completion_queue_impl.h @@ -32,11 +32,14 @@ #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H +#include + #include #include #include #include #include +#include #include struct grpc_completion_queue; @@ -250,6 +253,11 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { } private: + // Friends for access to server registration lists that enable checking and + // logging on shutdown + friend class ::grpc_impl::ServerBuilder; + friend class ::grpc_impl::Server; + // Friend synchronous wrappers so that they can access Pluck(), which is // a semi-private API geared towards the synchronous implementation. template @@ -274,7 +282,6 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler; template <::grpc::StatusCode code> friend class ::grpc_impl::internal::ErrorMethodHandler; - friend class ::grpc_impl::Server; friend class ::grpc_impl::ServerContextBase; friend class ::grpc::ServerInterface; template @@ -379,13 +386,38 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen { } } + void RegisterServer(const Server* server) { +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + server_list_.push_back(server); +#endif + } + void UnregisterServer(const Server* server) { +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + server_list_.remove(server); +#endif + } + bool ServerListEmpty() const { +#ifndef NDEBUG + grpc::internal::MutexLock l(&server_list_mutex_); + return server_list_.empty(); +#endif + return true; + } + +#ifndef NDEBUG + mutable grpc::internal::Mutex server_list_mutex_; + std::list server_list_ /* GUARDED_BY(server_list_mutex_) */; +#endif + grpc_completion_queue* cq_; // owned gpr_atm avalanches_in_flight_; }; /// A specific type of completion queue used by the processing of notifications -/// by servers. Instantiated by \a ServerBuilder. +/// by servers. Instantiated by \a ServerBuilder or Server (for health checker). class ServerCompletionQueue : public CompletionQueue { public: bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h index 9506c419018e9..bcc4e3b45c96d 100644 --- a/include/grpcpp/server_impl.h +++ b/include/grpcpp/server_impl.h @@ -385,6 +385,12 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { // shutdown callback tag (invoked when the CQ is fully shutdown). // It is protected by mu_ CompletionQueue* callback_cq_ = nullptr; + +#ifndef NDEBUG + // List of CQs passed in by user that must be Shutdown only after Server is + // Shutdown. + std::vector cq_list_; +#endif }; } // namespace grpc_impl diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 43c2eee96f89f..93f5f4c4c0f50 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -39,6 +39,12 @@ CompletionQueue::CompletionQueue(grpc_completion_queue* take) void CompletionQueue::Shutdown() { g_gli_initializer.summon(); +#ifndef NDEBUG + if (!ServerListEmpty()) { + gpr_log(GPR_ERROR, + "CompletionQueue shutdown being shutdown before its server."); + } +#endif CompleteAvalanching(); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 71f17da0a4b03..d5e93476532be 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -354,9 +354,8 @@ std::unique_ptr ServerBuilder::BuildAndStart() { // server // 2. cqs_: Completion queues added via AddCompletionQueue() call - for (const auto& value : *sync_server_cqs) { - grpc_server_register_completion_queue(server->server_, value->cq(), - nullptr); + for (const auto& cq : *sync_server_cqs) { + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); has_frequently_polled_cqs = true; } @@ -369,10 +368,12 @@ std::unique_ptr ServerBuilder::BuildAndStart() { // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for // listening to incoming channels. Such completion queues must be registered - // as non-listening queues - for (const auto& value : cqs_) { - grpc_server_register_completion_queue(server->server_, value->cq(), - nullptr); + // as non-listening queues. In debug mode, these should have their server list + // tracked since these are provided the user and must be Shutdown by the user + // after the server is shutdown. + for (const auto& cq : cqs_) { + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + cq->RegisterServer(server.get()); } if (!has_frequently_polled_cqs) { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 5367fb25ebb1f..34ffd59489f2c 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1249,6 +1249,9 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { } for (size_t i = 0; i < num_cqs; i++) { +#ifndef NDEBUG + cq_list_.push_back(cqs[i]); +#endif if (cqs[i]->IsFrequentlyPolled()) { new UnimplementedAsyncRequest(this, cqs[i]); } @@ -1360,6 +1363,15 @@ void Server::ShutdownInternal(gpr_timespec deadline) { shutdown_notified_ = true; shutdown_cv_.Broadcast(); + +#ifndef NDEBUG + // Unregister this server with the CQs passed into it by the user so that + // those can be checked for properly-ordered shutdown. + for (auto* cq : cq_list_) { + cq->UnregisterServer(this); + } + cq_list_.clear(); +#endif } void Server::Wait() {