From 1fad5eb688cbfaad586c21e5f3052686582da071 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Thu, 7 Sep 2023 13:03:18 -0600 Subject: [PATCH 01/18] Updating control plane client to maintain promise lifetimes --- cpp/mrc/src/internal/control_plane/client.cpp | 62 +++++++++- cpp/mrc/src/internal/control_plane/client.hpp | 115 ++++++++++-------- 2 files changed, 124 insertions(+), 53 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index 7a85adc2e..706e103cd 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -21,6 +21,7 @@ #include "internal/grpc/progress_engine.hpp" #include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" +#include "internal/service.hpp" #include "internal/system/system.hpp" #include "mrc/channel/status.hpp" @@ -33,23 +34,29 @@ #include "mrc/runnable/launch_control.hpp" #include "mrc/runnable/launcher.hpp" #include "mrc/runnable/runner.hpp" +#include "mrc/types.hpp" #include #include #include +#include #include namespace mrc::control_plane { +std::atomic_uint64_t AsyncEventStatus::s_request_id_counter; + Client::Client(resources::PartitionResourceBase& base, std::shared_ptr cq) : resources::PartitionResourceBase(base), + Service(__FILE__), m_cq(std::move(cq)), m_owns_progress_engine(false) {} Client::Client(resources::PartitionResourceBase& base) : resources::PartitionResourceBase(base), + Service(__FILE__), m_cq(std::make_shared()), m_owns_progress_engine(true) {} @@ -161,10 +168,21 @@ void Client::do_handle_event(event_t&& event) // handle a subset of events directly on the event handler case protos::EventType::Response: { - auto* promise = reinterpret_cast*>(event.msg.tag()); - if (promise != nullptr) + auto event_tag = event.msg.tag(); + + if (event_tag != 0) { - promise->set_value(std::move(event.msg)); + // Lock to prevent multiple threads + std::unique_lock lock(m_mutex); + + // Find the promise associated with the event tag + auto promise = m_pending_events.extract(event_tag); + + // Unlock to allow other threads to continue as soon as possible + lock.unlock(); + + // Finally, set the value + promise.mapped().set_value(std::move(event.msg)); } } break; @@ -242,11 +260,12 @@ const mrc::runnable::LaunchOptions& Client::launch_options() const return m_launch_options; } -void Client::issue_event(const protos::EventType& event_type) +AsyncEventStatus Client::issue_event(const protos::EventType& event_type) { protos::Event event; event.set_event(event_type); - m_writer->await_write(std::move(event)); + // m_writer->await_write(std::move(event)); + return this->write_event(std::move(event), false); } void Client::request_update() @@ -260,4 +279,37 @@ void Client::request_update() // } } +AsyncEventStatus Client::write_event(protos::Event event, bool await_response) +{ + if (event.tag() != 0) + { + LOG(WARNING) << "event tag is set but this field should exclusively be used by the control plane client. " + "Clearing to avoid confusion"; + event.clear_tag(); + } + + AsyncEventStatus status; + + if (await_response) + { + // If we are supporting awaiting, create the promise now + Promise promise; + + // Set the future to the status + status.set_future(promise.get_future()); + + // Set the tag to the request ID to allow looking up the promise later + event.set_tag(status.request_id()); + + // Save the promise to the pending promises to be retrieved later + std::unique_lock lock(m_mutex); + + m_pending_events[status.request_id()] = std::move(promise); + } + + // Finally, write the event + m_writer->await_write(std::move(event)); + + return status; +} } // namespace mrc::control_plane diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 0a07991a6..62ab275ef 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -24,6 +24,7 @@ #include "internal/service.hpp" #include "mrc/core/error.hpp" +#include "mrc/exceptions/runtime_error.hpp" #include "mrc/node/forward.hpp" #include "mrc/node/writable_entrypoint.hpp" #include "mrc/protos/architect.grpc.pb.h" @@ -35,6 +36,7 @@ #include #include +#include #include #include #include @@ -67,8 +69,54 @@ class Runner; namespace mrc::control_plane { -template -class AsyncStatus; +class AsyncEventStatus +{ + public: + size_t request_id() const + { + return m_request_id; + } + + template + Expected await_response() + { + if (!m_future.valid()) + { + throw exceptions::MrcRuntimeError( + "This AsyncEventStatus is not expecting a response or the response has already been awaited"); + } + + auto event = m_future.get(); + + if (event.has_error()) + { + return Error::create(event.error().message()); + } + + ResponseT response; + if (!event.message().UnpackTo(&response)) + { + throw Error::create("fatal error: unable to unpack message; server sent the wrong message type"); + } + + return response; + } + + private: + AsyncEventStatus() : m_request_id(++s_request_id_counter) {} + + void set_future(Future future) + { + m_future = std::move(future); + } + + static std::atomic_uint64_t s_request_id_counter; + + size_t m_request_id; + Future m_future; + + friend class Client; +}; /** * @brief Primary Control Plane Client @@ -128,13 +176,13 @@ class Client final : public resources::PartitionResourceBase, public Service template Expected await_unary(const protos::EventType& event_type, RequestT&& request); - template - void async_unary(const protos::EventType& event_type, RequestT&& request, AsyncStatus& status); + template + AsyncEventStatus async_unary(const protos::EventType& event_type, RequestT&& request); template - void issue_event(const protos::EventType& event_type, MessageT&& message); + AsyncEventStatus issue_event(const protos::EventType& event_type, MessageT&& message); - void issue_event(const protos::EventType& event_type); + AsyncEventStatus issue_event(const protos::EventType& event_type); bool has_subscription_service(const std::string& name) const; @@ -150,6 +198,8 @@ class Client final : public resources::PartitionResourceBase, public Service void request_update(); private: + AsyncEventStatus write_event(protos::Event event, bool await_response = false); + void route_state_update(std::uint64_t tag, protos::StateUpdate&& update); void do_service_start() final; @@ -201,70 +251,39 @@ class Client final : public resources::PartitionResourceBase, public Service std::mutex m_mutex; + std::map> m_pending_events; + friend network::NetworkResources; }; // todo: create this object from the client which will own the stop_source // create this object with a stop_token associated with the client's stop_source -template -class AsyncStatus -{ - public: - AsyncStatus() = default; - - DELETE_COPYABILITY(AsyncStatus); - DELETE_MOVEABILITY(AsyncStatus); - - Expected await_response() - { - // todo(ryan): expand this into a wait_until with a deadline and a stop token - auto event = m_promise.get_future().get(); - - if (event.has_error()) - { - return Error::create(event.error().message()); - } - - ResponseT response; - if (!event.message().UnpackTo(&response)) - { - throw Error::create("fatal error: unable to unpack message; server sent the wrong message type"); - } - - return response; - } - - private: - Promise m_promise; - friend Client; -}; - template Expected Client::await_unary(const protos::EventType& event_type, RequestT&& request) { - AsyncStatus status; - async_unary(event_type, std::move(request), status); - return status.await_response(); + auto status = this->async_unary(event_type, std::move(request)); + return status.template await_response(); } -template -void Client::async_unary(const protos::EventType& event_type, RequestT&& request, AsyncStatus& status) +template +AsyncEventStatus Client::async_unary(const protos::EventType& event_type, RequestT&& request) { protos::Event event; event.set_event(event_type); - event.set_tag(reinterpret_cast(&status.m_promise)); CHECK(event.mutable_message()->PackFrom(request)); - m_writer->await_write(std::move(event)); + + return this->write_event(std::move(event), true); } template -void Client::issue_event(const protos::EventType& event_type, MessageT&& message) +AsyncEventStatus Client::issue_event(const protos::EventType& event_type, MessageT&& message) { protos::Event event; event.set_event(event_type); CHECK(event.mutable_message()->PackFrom(message)); - m_writer->await_write(std::move(event)); + + return this->write_event(std::move(event), false); } } // namespace mrc::control_plane From 44da13f47fadccacac2b70402ba392a7c82c5baa Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Thu, 7 Sep 2023 13:04:58 -0600 Subject: [PATCH 02/18] Removing Service constructor with description --- cpp/mrc/src/internal/control_plane/client.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index 706e103cd..c20aee330 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -49,14 +49,12 @@ std::atomic_uint64_t AsyncEventStatus::s_request_id_counter; Client::Client(resources::PartitionResourceBase& base, std::shared_ptr cq) : resources::PartitionResourceBase(base), - Service(__FILE__), m_cq(std::move(cq)), m_owns_progress_engine(false) {} Client::Client(resources::PartitionResourceBase& base) : resources::PartitionResourceBase(base), - Service(__FILE__), m_cq(std::make_shared()), m_owns_progress_engine(true) {} From 5a642aab59b35f8e648160843e01dc3c31287874 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 7 Sep 2023 16:58:33 -0700 Subject: [PATCH 03/18] use the size_t specialization of atomic to match other usage of size_t --- cpp/mrc/src/internal/control_plane/client.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 62ab275ef..3449ef741 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -110,7 +110,7 @@ class AsyncEventStatus m_future = std::move(future); } - static std::atomic_uint64_t s_request_id_counter; + static std::atomic_size_t s_request_id_counter; size_t m_request_id; Future m_future; From 8524d89f35e47567a452aff300549e5f209384e7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 7 Sep 2023 16:59:04 -0700 Subject: [PATCH 04/18] Allocate promises on the heap --- .../src/internal/grpc/client_streaming.hpp | 35 +++++++++++-------- cpp/mrc/src/internal/grpc/progress_engine.hpp | 4 +-- cpp/mrc/src/internal/grpc/promise_handler.hpp | 3 ++ .../src/internal/grpc/server_streaming.hpp | 28 ++++++++------- 4 files changed, 41 insertions(+), 29 deletions(-) diff --git a/cpp/mrc/src/internal/grpc/client_streaming.hpp b/cpp/mrc/src/internal/grpc/client_streaming.hpp index 8ee6bd82e..3a2bacc5d 100644 --- a/cpp/mrc/src/internal/grpc/client_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/client_streaming.hpp @@ -195,10 +195,11 @@ class ClientStream : private Service, public std::enable_shared_from_this read; + auto* promise = new Promise; + auto future = promise->get_future(); IncomingData data; - m_stream->Read(&data.msg, &read); - auto ok = read.get_future().get(); + m_stream->Read(&data.msg, promise); + auto ok = future.get(); if (!ok) { m_write_channel.reset(); @@ -216,9 +217,10 @@ class ClientStream : private Service, public std::enable_shared_from_this promise; - m_stream->Write(request, &promise); - auto ok = promise.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->Write(request, promise); + auto ok = future.get(); if (!ok) { m_can_write = false; @@ -234,9 +236,10 @@ class ClientStream : private Service, public std::enable_shared_from_this writes_done; - m_stream->WritesDone(&writes_done); - writes_done.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->WritesDone(promise); + future.get(); DVLOG(10) << "client issued writes done to server"; }; } @@ -284,9 +287,10 @@ class ClientStream : private Service, public std::enable_shared_from_this promise; - m_stream->StartCall(&promise); - auto ok = promise.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->StartCall(promise); + auto ok = future.get(); if (!ok) { @@ -328,9 +332,10 @@ class ClientStream : private Service, public std::enable_shared_from_thisawait_join(); m_reader->await_join(); - Promise finish; - m_stream->Finish(&m_status, &finish); - auto ok = finish.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->Finish(&m_status, promise); + auto ok = future.get(); } } diff --git a/cpp/mrc/src/internal/grpc/progress_engine.hpp b/cpp/mrc/src/internal/grpc/progress_engine.hpp index 7bea6239e..a0f4c2b20 100644 --- a/cpp/mrc/src/internal/grpc/progress_engine.hpp +++ b/cpp/mrc/src/internal/grpc/progress_engine.hpp @@ -39,8 +39,8 @@ namespace mrc::rpc { */ struct ProgressEvent { - void* tag; - bool ok; + void* tag = nullptr; + bool ok = false; }; /** diff --git a/cpp/mrc/src/internal/grpc/promise_handler.hpp b/cpp/mrc/src/internal/grpc/promise_handler.hpp index 437a22e69..569cd1ce2 100644 --- a/cpp/mrc/src/internal/grpc/promise_handler.hpp +++ b/cpp/mrc/src/internal/grpc/promise_handler.hpp @@ -22,6 +22,7 @@ #include "mrc/node/generic_sink.hpp" #include +#include namespace mrc::rpc { @@ -32,8 +33,10 @@ class PromiseHandler final : public mrc::node::GenericSink { void on_data(ProgressEvent&& event) final { + DCHECK(event.tag != nullptr); auto* promise = static_cast*>(event.tag); promise->set_value(event.ok); + delete promise; } }; diff --git a/cpp/mrc/src/internal/grpc/server_streaming.hpp b/cpp/mrc/src/internal/grpc/server_streaming.hpp index 0d4da8b44..1439fa2a3 100644 --- a/cpp/mrc/src/internal/grpc/server_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/server_streaming.hpp @@ -223,10 +223,11 @@ class ServerStream : private Service, public std::enable_shared_from_this read; + auto* promise = new Promise; + auto future = promise->get_future(); IncomingData data; - m_stream->Read(&data.msg, &read); - auto ok = read.get_future().get(); + m_stream->Read(&data.msg, promise); + auto ok = future.get(); data.ok = ok; data.stream = writer(); s.on_next(std::move(data)); @@ -247,9 +248,10 @@ class ServerStream : private Service, public std::enable_shared_from_this promise; - m_stream->Write(request, &promise); - auto ok = promise.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->Write(request, promise); + auto ok = future.get(); if (!ok) { DVLOG(10) << "server failed to write to client; disabling writes and beginning shutdown"; @@ -272,9 +274,10 @@ class ServerStream : private Service, public std::enable_shared_from_this finish; - m_stream->Finish(*m_status, &finish); - auto ok = finish.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_stream->Finish(*m_status, promise); + auto ok = future.get(); DVLOG(10) << "server done with finish"; } } @@ -317,9 +320,10 @@ class ServerStream : private Service, public std::enable_shared_from_this promise; - m_init_fn(&promise); - auto ok = promise.get_future().get(); + auto* promise = new Promise; + auto future = promise->get_future(); + m_init_fn(promise); + auto ok = future.get(); if (!ok) { From 2c725554987cd794c6544cd8d173d79d61c48df8 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 8 Sep 2023 11:38:08 -0700 Subject: [PATCH 05/18] Add missing flags to docker launch to mount the working dir and set --cap-add=sys_nice [no ci] --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0f05e754a..57c03eacc 100644 --- a/README.md +++ b/README.md @@ -155,7 +155,7 @@ docker build -t mrc:latest . ``` To run the container ```bash -docker run --gpus all --rm -it mrc:latest /bin/bash +docker run --gpus all --cap-add=sys_nice -v $PWD:/work --rm -it mrc:latest /bin/bash ``` ## Quickstart Guide From 6671019ea540e0dab7c8626c1410f4563540f54b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 11 Sep 2023 13:46:15 -0700 Subject: [PATCH 06/18] Revert "Revert boost upgrade, and update clang to v16 (#382)" This reverts commit 5747320a023a15aeb776efcea7d571cfb8be7654. --- ci/conda/environments/clang_env.yml | 16 ++++++++-------- ci/conda/environments/dev_env.yml | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ci/conda/environments/clang_env.yml b/ci/conda/environments/clang_env.yml index 50d6cc655..9c8867ae4 100644 --- a/ci/conda/environments/clang_env.yml +++ b/ci/conda/environments/clang_env.yml @@ -19,11 +19,11 @@ name: mrc channels: - conda-forge dependencies: - - clang=16 - - clang-tools=16 - - clangdev=16 - - clangxx=16 - - libclang=16 - - libclang-cpp=16 - - llvmdev=16 - - include-what-you-use + - clang=15 + - clang-tools=15 + - clangdev=15 + - clangxx=15 + - libclang=15 + - libclang-cpp=15 + - llvmdev=15 + - include-what-you-use=0.19 diff --git a/ci/conda/environments/dev_env.yml b/ci/conda/environments/dev_env.yml index 58d83d9a7..5af8a91c9 100644 --- a/ci/conda/environments/dev_env.yml +++ b/ci/conda/environments/dev_env.yml @@ -25,7 +25,7 @@ dependencies: - autoconf>=2.69 - bash-completion - benchmark=1.6.0 - - boost-cpp=1.74 + - boost-cpp=1.82 - ccache - cmake=3.24 - cuda-toolkit # Version comes from the channel above @@ -46,7 +46,7 @@ dependencies: - isort - jinja2=3.0 - lcov=1.15 - - libhwloc=2.5 + - libhwloc=2.9.2 - libprotobuf=3.21 - librmm=23.06 - libtool From 29d9f00e4111620d5423943bb40c61980f9863aa Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 11 Sep 2023 15:05:07 -0700 Subject: [PATCH 07/18] Re-work PromiseHandler as a component --- cpp/mrc/src/internal/control_plane/client.cpp | 11 +++-------- cpp/mrc/src/internal/control_plane/client.hpp | 3 ++- cpp/mrc/src/internal/grpc/promise_handler.hpp | 12 +++++++++--- cpp/mrc/src/internal/grpc/server.cpp | 12 ++++-------- cpp/mrc/src/internal/grpc/server.hpp | 3 ++- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index c20aee330..b93502c92 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -19,7 +19,6 @@ #include "internal/control_plane/client/connections_manager.hpp" #include "internal/grpc/progress_engine.hpp" -#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "internal/service.hpp" #include "internal/system/system.hpp" @@ -78,13 +77,11 @@ void Client::do_service_start() if (m_owns_progress_engine) { CHECK(m_cq); - auto progress_engine = std::make_unique(m_cq); - auto progress_handler = std::make_unique(); + auto progress_engine = std::make_unique(m_cq); + m_progress_handler = std::make_unique(); - mrc::make_edge(*progress_engine, *progress_handler); + mrc::make_edge(*progress_engine, *m_progress_handler); - m_progress_handler = - runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_handler))->ignition(); m_progress_engine = runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_engine))->ignition(); } @@ -140,7 +137,6 @@ void Client::do_service_await_live() if (m_owns_progress_engine) { m_progress_engine->await_live(); - m_progress_handler->await_live(); } m_event_handler->await_live(); } @@ -155,7 +151,6 @@ void Client::do_service_await_join() { m_cq->Shutdown(); m_progress_engine->await_join(); - m_progress_handler->await_join(); } } diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 3449ef741..3b9c0f068 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -19,6 +19,7 @@ #include "internal/control_plane/client/instance.hpp" // IWYU pragma: keep #include "internal/grpc/client_streaming.hpp" +#include "internal/grpc/promise_handler.hpp" #include "internal/grpc/stream_writer.hpp" #include "internal/resources/partition_resources_base.hpp" #include "internal/service.hpp" @@ -225,7 +226,7 @@ class Client final : public resources::PartitionResourceBase, public Service // if true, then the following runners should not be null // if false, then the following runners must be null const bool m_owns_progress_engine; - std::unique_ptr m_progress_handler; + std::unique_ptr m_progress_handler; std::unique_ptr m_progress_engine; std::unique_ptr m_event_handler; diff --git a/cpp/mrc/src/internal/grpc/promise_handler.hpp b/cpp/mrc/src/internal/grpc/promise_handler.hpp index 569cd1ce2..5d307ce3c 100644 --- a/cpp/mrc/src/internal/grpc/promise_handler.hpp +++ b/cpp/mrc/src/internal/grpc/promise_handler.hpp @@ -29,15 +29,21 @@ namespace mrc::rpc { /** * @brief MRC Sink to handle ProgressEvents which correspond to Promise tags */ -class PromiseHandler final : public mrc::node::GenericSink +class PromiseHandler final : public mrc::node::GenericSinkComponent { - void on_data(ProgressEvent&& event) final + mrc::channel::Status on_data(ProgressEvent&& event) final { DCHECK(event.tag != nullptr); auto* promise = static_cast*>(event.tag); promise->set_value(event.ok); delete promise; - } + return mrc::channel::Status::success; + }; + + void on_complete() override + { + SinkProperties::release_edge_connection(); + }; }; } // namespace mrc::rpc diff --git a/cpp/mrc/src/internal/grpc/server.cpp b/cpp/mrc/src/internal/grpc/server.cpp index 9e0c0ecb4..81f5ef3ee 100644 --- a/cpp/mrc/src/internal/grpc/server.cpp +++ b/cpp/mrc/src/internal/grpc/server.cpp @@ -18,7 +18,6 @@ #include "internal/grpc/server.hpp" #include "internal/grpc/progress_engine.hpp" -#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "mrc/edge/edge_builder.hpp" @@ -47,11 +46,10 @@ void Server::do_service_start() m_server = m_builder.BuildAndStart(); auto progress_engine = std::make_unique(m_cq); - auto event_handler = std::make_unique(); - mrc::make_edge(*progress_engine, *event_handler); + m_event_hander = std::make_unique(); + mrc::make_edge(*progress_engine, *m_event_hander); m_progress_engine = m_runnable.launch_control().prepare_launcher(std::move(progress_engine))->ignition(); - m_event_hander = m_runnable.launch_control().prepare_launcher(std::move(event_handler))->ignition(); } void Server::do_service_stop() @@ -70,19 +68,17 @@ void Server::do_service_kill() void Server::do_service_await_live() { - if (m_progress_engine && m_event_hander) + if (m_progress_engine) { m_progress_engine->await_live(); - m_event_hander->await_live(); } } void Server::do_service_await_join() { - if (m_progress_engine && m_event_hander) + if (m_progress_engine) { m_progress_engine->await_join(); - m_event_hander->await_join(); } } diff --git a/cpp/mrc/src/internal/grpc/server.hpp b/cpp/mrc/src/internal/grpc/server.hpp index cacd4602d..a92744889 100644 --- a/cpp/mrc/src/internal/grpc/server.hpp +++ b/cpp/mrc/src/internal/grpc/server.hpp @@ -17,6 +17,7 @@ #pragma once +#include "internal/grpc/promise_handler.hpp" #include "internal/service.hpp" #include @@ -61,7 +62,7 @@ class Server : public Service std::shared_ptr m_cq; std::unique_ptr m_server; std::unique_ptr m_progress_engine; - std::unique_ptr m_event_hander; + std::unique_ptr m_event_hander; }; } // namespace mrc::rpc From 5cf17e55d051351239e9a49af15c9c72f88daa38 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 12 Sep 2023 08:07:11 -0700 Subject: [PATCH 08/18] Revert "Re-work PromiseHandler as a component" This reverts commit 29d9f00e4111620d5423943bb40c61980f9863aa. --- cpp/mrc/src/internal/control_plane/client.cpp | 11 ++++++++--- cpp/mrc/src/internal/control_plane/client.hpp | 3 +-- cpp/mrc/src/internal/grpc/promise_handler.hpp | 12 +++--------- cpp/mrc/src/internal/grpc/server.cpp | 12 ++++++++---- cpp/mrc/src/internal/grpc/server.hpp | 3 +-- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index b93502c92..c20aee330 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -19,6 +19,7 @@ #include "internal/control_plane/client/connections_manager.hpp" #include "internal/grpc/progress_engine.hpp" +#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "internal/service.hpp" #include "internal/system/system.hpp" @@ -77,11 +78,13 @@ void Client::do_service_start() if (m_owns_progress_engine) { CHECK(m_cq); - auto progress_engine = std::make_unique(m_cq); - m_progress_handler = std::make_unique(); + auto progress_engine = std::make_unique(m_cq); + auto progress_handler = std::make_unique(); - mrc::make_edge(*progress_engine, *m_progress_handler); + mrc::make_edge(*progress_engine, *progress_handler); + m_progress_handler = + runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_handler))->ignition(); m_progress_engine = runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_engine))->ignition(); } @@ -137,6 +140,7 @@ void Client::do_service_await_live() if (m_owns_progress_engine) { m_progress_engine->await_live(); + m_progress_handler->await_live(); } m_event_handler->await_live(); } @@ -151,6 +155,7 @@ void Client::do_service_await_join() { m_cq->Shutdown(); m_progress_engine->await_join(); + m_progress_handler->await_join(); } } diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 3b9c0f068..3449ef741 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -19,7 +19,6 @@ #include "internal/control_plane/client/instance.hpp" // IWYU pragma: keep #include "internal/grpc/client_streaming.hpp" -#include "internal/grpc/promise_handler.hpp" #include "internal/grpc/stream_writer.hpp" #include "internal/resources/partition_resources_base.hpp" #include "internal/service.hpp" @@ -226,7 +225,7 @@ class Client final : public resources::PartitionResourceBase, public Service // if true, then the following runners should not be null // if false, then the following runners must be null const bool m_owns_progress_engine; - std::unique_ptr m_progress_handler; + std::unique_ptr m_progress_handler; std::unique_ptr m_progress_engine; std::unique_ptr m_event_handler; diff --git a/cpp/mrc/src/internal/grpc/promise_handler.hpp b/cpp/mrc/src/internal/grpc/promise_handler.hpp index 5d307ce3c..569cd1ce2 100644 --- a/cpp/mrc/src/internal/grpc/promise_handler.hpp +++ b/cpp/mrc/src/internal/grpc/promise_handler.hpp @@ -29,21 +29,15 @@ namespace mrc::rpc { /** * @brief MRC Sink to handle ProgressEvents which correspond to Promise tags */ -class PromiseHandler final : public mrc::node::GenericSinkComponent +class PromiseHandler final : public mrc::node::GenericSink { - mrc::channel::Status on_data(ProgressEvent&& event) final + void on_data(ProgressEvent&& event) final { DCHECK(event.tag != nullptr); auto* promise = static_cast*>(event.tag); promise->set_value(event.ok); delete promise; - return mrc::channel::Status::success; - }; - - void on_complete() override - { - SinkProperties::release_edge_connection(); - }; + } }; } // namespace mrc::rpc diff --git a/cpp/mrc/src/internal/grpc/server.cpp b/cpp/mrc/src/internal/grpc/server.cpp index 81f5ef3ee..9e0c0ecb4 100644 --- a/cpp/mrc/src/internal/grpc/server.cpp +++ b/cpp/mrc/src/internal/grpc/server.cpp @@ -18,6 +18,7 @@ #include "internal/grpc/server.hpp" #include "internal/grpc/progress_engine.hpp" +#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "mrc/edge/edge_builder.hpp" @@ -46,10 +47,11 @@ void Server::do_service_start() m_server = m_builder.BuildAndStart(); auto progress_engine = std::make_unique(m_cq); - m_event_hander = std::make_unique(); - mrc::make_edge(*progress_engine, *m_event_hander); + auto event_handler = std::make_unique(); + mrc::make_edge(*progress_engine, *event_handler); m_progress_engine = m_runnable.launch_control().prepare_launcher(std::move(progress_engine))->ignition(); + m_event_hander = m_runnable.launch_control().prepare_launcher(std::move(event_handler))->ignition(); } void Server::do_service_stop() @@ -68,17 +70,19 @@ void Server::do_service_kill() void Server::do_service_await_live() { - if (m_progress_engine) + if (m_progress_engine && m_event_hander) { m_progress_engine->await_live(); + m_event_hander->await_live(); } } void Server::do_service_await_join() { - if (m_progress_engine) + if (m_progress_engine && m_event_hander) { m_progress_engine->await_join(); + m_event_hander->await_join(); } } diff --git a/cpp/mrc/src/internal/grpc/server.hpp b/cpp/mrc/src/internal/grpc/server.hpp index a92744889..cacd4602d 100644 --- a/cpp/mrc/src/internal/grpc/server.hpp +++ b/cpp/mrc/src/internal/grpc/server.hpp @@ -17,7 +17,6 @@ #pragma once -#include "internal/grpc/promise_handler.hpp" #include "internal/service.hpp" #include @@ -62,7 +61,7 @@ class Server : public Service std::shared_ptr m_cq; std::unique_ptr m_server; std::unique_ptr m_progress_engine; - std::unique_ptr m_event_hander; + std::unique_ptr m_event_hander; }; } // namespace mrc::rpc From c7f69399f2a262a3a7ce7a17cf35c4d2633fb789 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 12 Sep 2023 09:15:40 -0700 Subject: [PATCH 09/18] Revert "Revert "Re-work PromiseHandler as a component"" This reverts commit 5cf17e55d051351239e9a49af15c9c72f88daa38. --- cpp/mrc/src/internal/control_plane/client.cpp | 11 +++-------- cpp/mrc/src/internal/control_plane/client.hpp | 3 ++- cpp/mrc/src/internal/grpc/promise_handler.hpp | 12 +++++++++--- cpp/mrc/src/internal/grpc/server.cpp | 12 ++++-------- cpp/mrc/src/internal/grpc/server.hpp | 3 ++- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index c20aee330..b93502c92 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -19,7 +19,6 @@ #include "internal/control_plane/client/connections_manager.hpp" #include "internal/grpc/progress_engine.hpp" -#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "internal/service.hpp" #include "internal/system/system.hpp" @@ -78,13 +77,11 @@ void Client::do_service_start() if (m_owns_progress_engine) { CHECK(m_cq); - auto progress_engine = std::make_unique(m_cq); - auto progress_handler = std::make_unique(); + auto progress_engine = std::make_unique(m_cq); + m_progress_handler = std::make_unique(); - mrc::make_edge(*progress_engine, *progress_handler); + mrc::make_edge(*progress_engine, *m_progress_handler); - m_progress_handler = - runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_handler))->ignition(); m_progress_engine = runnable().launch_control().prepare_launcher(launch_options(), std::move(progress_engine))->ignition(); } @@ -140,7 +137,6 @@ void Client::do_service_await_live() if (m_owns_progress_engine) { m_progress_engine->await_live(); - m_progress_handler->await_live(); } m_event_handler->await_live(); } @@ -155,7 +151,6 @@ void Client::do_service_await_join() { m_cq->Shutdown(); m_progress_engine->await_join(); - m_progress_handler->await_join(); } } diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 3449ef741..3b9c0f068 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -19,6 +19,7 @@ #include "internal/control_plane/client/instance.hpp" // IWYU pragma: keep #include "internal/grpc/client_streaming.hpp" +#include "internal/grpc/promise_handler.hpp" #include "internal/grpc/stream_writer.hpp" #include "internal/resources/partition_resources_base.hpp" #include "internal/service.hpp" @@ -225,7 +226,7 @@ class Client final : public resources::PartitionResourceBase, public Service // if true, then the following runners should not be null // if false, then the following runners must be null const bool m_owns_progress_engine; - std::unique_ptr m_progress_handler; + std::unique_ptr m_progress_handler; std::unique_ptr m_progress_engine; std::unique_ptr m_event_handler; diff --git a/cpp/mrc/src/internal/grpc/promise_handler.hpp b/cpp/mrc/src/internal/grpc/promise_handler.hpp index 569cd1ce2..5d307ce3c 100644 --- a/cpp/mrc/src/internal/grpc/promise_handler.hpp +++ b/cpp/mrc/src/internal/grpc/promise_handler.hpp @@ -29,15 +29,21 @@ namespace mrc::rpc { /** * @brief MRC Sink to handle ProgressEvents which correspond to Promise tags */ -class PromiseHandler final : public mrc::node::GenericSink +class PromiseHandler final : public mrc::node::GenericSinkComponent { - void on_data(ProgressEvent&& event) final + mrc::channel::Status on_data(ProgressEvent&& event) final { DCHECK(event.tag != nullptr); auto* promise = static_cast*>(event.tag); promise->set_value(event.ok); delete promise; - } + return mrc::channel::Status::success; + }; + + void on_complete() override + { + SinkProperties::release_edge_connection(); + }; }; } // namespace mrc::rpc diff --git a/cpp/mrc/src/internal/grpc/server.cpp b/cpp/mrc/src/internal/grpc/server.cpp index 9e0c0ecb4..81f5ef3ee 100644 --- a/cpp/mrc/src/internal/grpc/server.cpp +++ b/cpp/mrc/src/internal/grpc/server.cpp @@ -18,7 +18,6 @@ #include "internal/grpc/server.hpp" #include "internal/grpc/progress_engine.hpp" -#include "internal/grpc/promise_handler.hpp" #include "internal/runnable/runnable_resources.hpp" #include "mrc/edge/edge_builder.hpp" @@ -47,11 +46,10 @@ void Server::do_service_start() m_server = m_builder.BuildAndStart(); auto progress_engine = std::make_unique(m_cq); - auto event_handler = std::make_unique(); - mrc::make_edge(*progress_engine, *event_handler); + m_event_hander = std::make_unique(); + mrc::make_edge(*progress_engine, *m_event_hander); m_progress_engine = m_runnable.launch_control().prepare_launcher(std::move(progress_engine))->ignition(); - m_event_hander = m_runnable.launch_control().prepare_launcher(std::move(event_handler))->ignition(); } void Server::do_service_stop() @@ -70,19 +68,17 @@ void Server::do_service_kill() void Server::do_service_await_live() { - if (m_progress_engine && m_event_hander) + if (m_progress_engine) { m_progress_engine->await_live(); - m_event_hander->await_live(); } } void Server::do_service_await_join() { - if (m_progress_engine && m_event_hander) + if (m_progress_engine) { m_progress_engine->await_join(); - m_event_hander->await_join(); } } diff --git a/cpp/mrc/src/internal/grpc/server.hpp b/cpp/mrc/src/internal/grpc/server.hpp index cacd4602d..a92744889 100644 --- a/cpp/mrc/src/internal/grpc/server.hpp +++ b/cpp/mrc/src/internal/grpc/server.hpp @@ -17,6 +17,7 @@ #pragma once +#include "internal/grpc/promise_handler.hpp" #include "internal/service.hpp" #include @@ -61,7 +62,7 @@ class Server : public Service std::shared_ptr m_cq; std::unique_ptr m_server; std::unique_ptr m_progress_engine; - std::unique_ptr m_event_hander; + std::unique_ptr m_event_hander; }; } // namespace mrc::rpc From 0e292989b1054a9e3f8636accfbf8a3ffe71068f Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 12 Sep 2023 09:41:36 -0700 Subject: [PATCH 10/18] Revert "Allocate promises on the heap" This reverts commit 8524d89f35e47567a452aff300549e5f209384e7. --- .../src/internal/grpc/client_streaming.hpp | 35 ++++++++----------- cpp/mrc/src/internal/grpc/progress_engine.hpp | 4 +-- cpp/mrc/src/internal/grpc/promise_handler.hpp | 3 -- .../src/internal/grpc/server_streaming.hpp | 28 +++++++-------- 4 files changed, 29 insertions(+), 41 deletions(-) diff --git a/cpp/mrc/src/internal/grpc/client_streaming.hpp b/cpp/mrc/src/internal/grpc/client_streaming.hpp index 3a2bacc5d..8ee6bd82e 100644 --- a/cpp/mrc/src/internal/grpc/client_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/client_streaming.hpp @@ -195,11 +195,10 @@ class ClientStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); + Promise read; IncomingData data; - m_stream->Read(&data.msg, promise); - auto ok = future.get(); + m_stream->Read(&data.msg, &read); + auto ok = read.get_future().get(); if (!ok) { m_write_channel.reset(); @@ -217,10 +216,9 @@ class ClientStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_stream->Write(request, promise); - auto ok = future.get(); + Promise promise; + m_stream->Write(request, &promise); + auto ok = promise.get_future().get(); if (!ok) { m_can_write = false; @@ -236,10 +234,9 @@ class ClientStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_stream->WritesDone(promise); - future.get(); + Promise writes_done; + m_stream->WritesDone(&writes_done); + writes_done.get_future().get(); DVLOG(10) << "client issued writes done to server"; }; } @@ -287,10 +284,9 @@ class ClientStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_stream->StartCall(promise); - auto ok = future.get(); + Promise promise; + m_stream->StartCall(&promise); + auto ok = promise.get_future().get(); if (!ok) { @@ -332,10 +328,9 @@ class ClientStream : private Service, public std::enable_shared_from_thisawait_join(); m_reader->await_join(); - auto* promise = new Promise; - auto future = promise->get_future(); - m_stream->Finish(&m_status, promise); - auto ok = future.get(); + Promise finish; + m_stream->Finish(&m_status, &finish); + auto ok = finish.get_future().get(); } } diff --git a/cpp/mrc/src/internal/grpc/progress_engine.hpp b/cpp/mrc/src/internal/grpc/progress_engine.hpp index a0f4c2b20..7bea6239e 100644 --- a/cpp/mrc/src/internal/grpc/progress_engine.hpp +++ b/cpp/mrc/src/internal/grpc/progress_engine.hpp @@ -39,8 +39,8 @@ namespace mrc::rpc { */ struct ProgressEvent { - void* tag = nullptr; - bool ok = false; + void* tag; + bool ok; }; /** diff --git a/cpp/mrc/src/internal/grpc/promise_handler.hpp b/cpp/mrc/src/internal/grpc/promise_handler.hpp index 5d307ce3c..812a683e3 100644 --- a/cpp/mrc/src/internal/grpc/promise_handler.hpp +++ b/cpp/mrc/src/internal/grpc/promise_handler.hpp @@ -22,7 +22,6 @@ #include "mrc/node/generic_sink.hpp" #include -#include namespace mrc::rpc { @@ -33,10 +32,8 @@ class PromiseHandler final : public mrc::node::GenericSinkComponent*>(event.tag); promise->set_value(event.ok); - delete promise; return mrc::channel::Status::success; }; diff --git a/cpp/mrc/src/internal/grpc/server_streaming.hpp b/cpp/mrc/src/internal/grpc/server_streaming.hpp index 1439fa2a3..0d4da8b44 100644 --- a/cpp/mrc/src/internal/grpc/server_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/server_streaming.hpp @@ -223,11 +223,10 @@ class ServerStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); + Promise read; IncomingData data; - m_stream->Read(&data.msg, promise); - auto ok = future.get(); + m_stream->Read(&data.msg, &read); + auto ok = read.get_future().get(); data.ok = ok; data.stream = writer(); s.on_next(std::move(data)); @@ -248,10 +247,9 @@ class ServerStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_stream->Write(request, promise); - auto ok = future.get(); + Promise promise; + m_stream->Write(request, &promise); + auto ok = promise.get_future().get(); if (!ok) { DVLOG(10) << "server failed to write to client; disabling writes and beginning shutdown"; @@ -274,10 +272,9 @@ class ServerStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_stream->Finish(*m_status, promise); - auto ok = future.get(); + Promise finish; + m_stream->Finish(*m_status, &finish); + auto ok = finish.get_future().get(); DVLOG(10) << "server done with finish"; } } @@ -320,10 +317,9 @@ class ServerStream : private Service, public std::enable_shared_from_this; - auto future = promise->get_future(); - m_init_fn(promise); - auto ok = future.get(); + Promise promise; + m_init_fn(&promise); + auto ok = promise.get_future().get(); if (!ok) { From 89449b4342203d5388ae865c304038ff7e2d4f61 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 12 Sep 2023 10:14:57 -0700 Subject: [PATCH 11/18] Add test viariant locked to singl core --- cpp/mrc/src/tests/test_control_plane.cpp | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cpp/mrc/src/tests/test_control_plane.cpp b/cpp/mrc/src/tests/test_control_plane.cpp index 96d85945c..71f24a7cd 100644 --- a/cpp/mrc/src/tests/test_control_plane.cpp +++ b/cpp/mrc/src/tests/test_control_plane.cpp @@ -121,6 +121,35 @@ TEST_F(TestControlPlane, SingleClientConnectDisconnect) server->service_await_join(); } +TEST_F(TestControlPlane, SingleClientConnectDisconnectSingleCore) +{ + // Similar to SingleClientConnectDisconnect except both client & server are locked to the same core + // making issue #379 easier to reproduce. + auto sr = make_runtime([](Options& options) { + options.topology().user_cpuset("0"); + }); + auto server = std::make_unique(sr->partition(0).resources().runnable()); + + server->service_start(); + server->service_await_live(); + + auto cr = make_runtime([](Options& options) { + options.topology().user_cpuset("0"); + options.architect_url("localhost:13337"); + }); + + // the total number of partition is system dependent + auto expected_partitions = cr->resources().system().partitions().flattened().size(); + EXPECT_EQ(cr->partition(0).resources().network()->control_plane().client().connections().instance_ids().size(), + expected_partitions); + + // destroying the resources should gracefully shutdown the data plane and the control plane. + cr.reset(); + + server->service_stop(); + server->service_await_join(); +} + TEST_F(TestControlPlane, DoubleClientConnectExchangeDisconnect) { auto sr = make_runtime(); From 77320395e100e787518d3dcb8ab7643eeeb4ff5e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 12 Sep 2023 10:25:14 -0700 Subject: [PATCH 12/18] IWYU fixes --- cpp/mrc/src/internal/control_plane/client.cpp | 3 +++ cpp/mrc/src/internal/control_plane/client.hpp | 11 +++++++---- cpp/mrc/src/internal/grpc/server.cpp | 1 + cpp/mrc/src/internal/grpc/server.hpp | 5 ++++- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index b93502c92..03af88e3e 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -19,6 +19,8 @@ #include "internal/control_plane/client/connections_manager.hpp" #include "internal/grpc/progress_engine.hpp" +#include "internal/grpc/promise_handler.hpp" // for PromiseHandler +#include "internal/grpc/stream_writer.hpp" // for StreamWriter #include "internal/runnable/runnable_resources.hpp" #include "internal/service.hpp" #include "internal/system/system.hpp" @@ -35,6 +37,7 @@ #include "mrc/runnable/runner.hpp" #include "mrc/types.hpp" +#include // for promise #include #include #include diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 3b9c0f068..f23990614 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -19,8 +19,6 @@ #include "internal/control_plane/client/instance.hpp" // IWYU pragma: keep #include "internal/grpc/client_streaming.hpp" -#include "internal/grpc/promise_handler.hpp" -#include "internal/grpc/stream_writer.hpp" #include "internal/resources/partition_resources_base.hpp" #include "internal/service.hpp" @@ -32,12 +30,11 @@ #include "mrc/protos/architect.pb.h" #include "mrc/runnable/launch_options.hpp" #include "mrc/types.hpp" -#include "mrc/utils/macros.hpp" -#include #include #include +#include // for size_t #include #include #include @@ -68,6 +65,12 @@ namespace mrc::runnable { class Runner; } // namespace mrc::runnable +namespace mrc::rpc { +class PromiseHandler; +template +struct StreamWriter; +} // namespace mrc::rpc + namespace mrc::control_plane { class AsyncEventStatus diff --git a/cpp/mrc/src/internal/grpc/server.cpp b/cpp/mrc/src/internal/grpc/server.cpp index 81f5ef3ee..65de1417e 100644 --- a/cpp/mrc/src/internal/grpc/server.cpp +++ b/cpp/mrc/src/internal/grpc/server.cpp @@ -18,6 +18,7 @@ #include "internal/grpc/server.hpp" #include "internal/grpc/progress_engine.hpp" +#include "internal/grpc/promise_handler.hpp" // for PromiseHandler #include "internal/runnable/runnable_resources.hpp" #include "mrc/edge/edge_builder.hpp" diff --git a/cpp/mrc/src/internal/grpc/server.hpp b/cpp/mrc/src/internal/grpc/server.hpp index a92744889..db9436d95 100644 --- a/cpp/mrc/src/internal/grpc/server.hpp +++ b/cpp/mrc/src/internal/grpc/server.hpp @@ -17,7 +17,6 @@ #pragma once -#include "internal/grpc/promise_handler.hpp" #include "internal/service.hpp" #include @@ -35,6 +34,10 @@ namespace mrc::runnable { class Runner; } // namespace mrc::runnable +namespace mrc::rpc { +class PromiseHandler; +} // namespace mrc::rpc + namespace mrc::rpc { class Server : public Service From 1b208866a441719290e1177eafd26332e6710b87 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 13 Sep 2023 09:52:49 -0700 Subject: [PATCH 13/18] Make do_writes_done a public method --- .../src/internal/grpc/client_streaming.hpp | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/cpp/mrc/src/internal/grpc/client_streaming.hpp b/cpp/mrc/src/internal/grpc/client_streaming.hpp index 8ee6bd82e..eac6b82ad 100644 --- a/cpp/mrc/src/internal/grpc/client_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/client_streaming.hpp @@ -47,6 +47,7 @@ #include #include #include +#include #include namespace mrc::rpc { @@ -179,7 +180,25 @@ class ClientStream : private Service, public std::enable_shared_from_this writes_done; + m_stream->WritesDone(&writes_done); + writes_done.get_future().get(); + DVLOG(10) << "client issued writes done to server"; + Promise finish; + m_stream->Finish(&m_status, &finish); + auto ok = finish.get_future().get(); + if (!ok) + { + throw std::runtime_error("Failed to issue WritesDone " + m_status.error_message()); + } + }; + } template void attach_to(NodeT& sink) @@ -228,19 +247,6 @@ class ClientStream : private Service, public std::enable_shared_from_this writes_done; - m_stream->WritesDone(&writes_done); - writes_done.get_future().get(); - DVLOG(10) << "client issued writes done to server"; - }; - } - // initialization performed after the grpc client stream was successfully initialized void do_init() { From c226ec1f23e7bd4bf8f9a95c6c9c68da01a8e5c3 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 13 Sep 2023 09:53:19 -0700 Subject: [PATCH 14/18] Add do_writes_done method --- cpp/mrc/src/internal/control_plane/client.cpp | 5 +++++ cpp/mrc/src/internal/control_plane/client.hpp | 2 ++ 2 files changed, 7 insertions(+) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index 03af88e3e..f129b67ae 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -123,6 +123,11 @@ void Client::do_service_start() forward_state(State::Connected); } +void Client::do_writes_done() +{ + m_stream->do_writes_done(); +} + void Client::do_service_stop() { m_writer->finish(); diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index f23990614..557c4b8f9 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -201,6 +201,8 @@ class Client final : public resources::PartitionResourceBase, public Service // request that the server start an update void request_update(); + void do_writes_done(); + private: AsyncEventStatus write_event(protos::Event event, bool await_response = false); From bd740aa471ff2f56e9ce73986ff36b1681c9c106 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 13 Sep 2023 09:53:46 -0700 Subject: [PATCH 15/18] wip --- cpp/mrc/src/tests/test_control_plane.cpp | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/cpp/mrc/src/tests/test_control_plane.cpp b/cpp/mrc/src/tests/test_control_plane.cpp index 71f24a7cd..44b67c502 100644 --- a/cpp/mrc/src/tests/test_control_plane.cpp +++ b/cpp/mrc/src/tests/test_control_plane.cpp @@ -150,6 +150,37 @@ TEST_F(TestControlPlane, SingleClientConnectDisconnectSingleCore) server->service_await_join(); } +TEST_F(TestControlPlane, ClientDisconnectEarly) +{ + // Similar to SingleClientConnectDisconnect except both client & server are locked to the same core + // making issue #379 easier to reproduce. + auto sr = make_runtime([](Options& options) { + options.topology().user_cpuset("0"); + }); + auto server = std::make_unique(sr->partition(0).resources().runnable()); + + server->service_start(); + server->service_await_live(); + + auto cr = make_runtime([](Options& options) { + options.topology().user_cpuset("0"); + options.architect_url("localhost:13337"); + }); + + // the total number of partition is system dependent + auto expected_partitions = cr->resources().system().partitions().flattened().size(); + EXPECT_EQ(cr->partition(0).resources().network()->control_plane().client().connections().instance_ids().size(), + expected_partitions); + + auto& client = cr->partition(0).resources().network()->control_plane().client(); + client.do_writes_done(); + + server->service_stop(); + server->service_await_join(); + + cr.reset(); +} + TEST_F(TestControlPlane, DoubleClientConnectExchangeDisconnect) { auto sr = make_runtime(); From a37366a4b57a0320cddf434f738ea3f7e8e8d1d9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 18 Sep 2023 09:40:23 -0700 Subject: [PATCH 16/18] Revert "wip" This reverts commit bd740aa471ff2f56e9ce73986ff36b1681c9c106. --- cpp/mrc/src/tests/test_control_plane.cpp | 31 ------------------------ 1 file changed, 31 deletions(-) diff --git a/cpp/mrc/src/tests/test_control_plane.cpp b/cpp/mrc/src/tests/test_control_plane.cpp index 44b67c502..71f24a7cd 100644 --- a/cpp/mrc/src/tests/test_control_plane.cpp +++ b/cpp/mrc/src/tests/test_control_plane.cpp @@ -150,37 +150,6 @@ TEST_F(TestControlPlane, SingleClientConnectDisconnectSingleCore) server->service_await_join(); } -TEST_F(TestControlPlane, ClientDisconnectEarly) -{ - // Similar to SingleClientConnectDisconnect except both client & server are locked to the same core - // making issue #379 easier to reproduce. - auto sr = make_runtime([](Options& options) { - options.topology().user_cpuset("0"); - }); - auto server = std::make_unique(sr->partition(0).resources().runnable()); - - server->service_start(); - server->service_await_live(); - - auto cr = make_runtime([](Options& options) { - options.topology().user_cpuset("0"); - options.architect_url("localhost:13337"); - }); - - // the total number of partition is system dependent - auto expected_partitions = cr->resources().system().partitions().flattened().size(); - EXPECT_EQ(cr->partition(0).resources().network()->control_plane().client().connections().instance_ids().size(), - expected_partitions); - - auto& client = cr->partition(0).resources().network()->control_plane().client(); - client.do_writes_done(); - - server->service_stop(); - server->service_await_join(); - - cr.reset(); -} - TEST_F(TestControlPlane, DoubleClientConnectExchangeDisconnect) { auto sr = make_runtime(); From 76c3ef25360eea88a9b8b63c2464791bb4615a1c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 18 Sep 2023 09:42:00 -0700 Subject: [PATCH 17/18] Revert "Add do_writes_done method" This reverts commit c226ec1f23e7bd4bf8f9a95c6c9c68da01a8e5c3. --- cpp/mrc/src/internal/control_plane/client.cpp | 5 ----- cpp/mrc/src/internal/control_plane/client.hpp | 2 -- 2 files changed, 7 deletions(-) diff --git a/cpp/mrc/src/internal/control_plane/client.cpp b/cpp/mrc/src/internal/control_plane/client.cpp index f129b67ae..03af88e3e 100644 --- a/cpp/mrc/src/internal/control_plane/client.cpp +++ b/cpp/mrc/src/internal/control_plane/client.cpp @@ -123,11 +123,6 @@ void Client::do_service_start() forward_state(State::Connected); } -void Client::do_writes_done() -{ - m_stream->do_writes_done(); -} - void Client::do_service_stop() { m_writer->finish(); diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index 557c4b8f9..f23990614 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -201,8 +201,6 @@ class Client final : public resources::PartitionResourceBase, public Service // request that the server start an update void request_update(); - void do_writes_done(); - private: AsyncEventStatus write_event(protos::Event event, bool await_response = false); From 7a277cbda358d99b9672d43cc35ed6c5989af4d1 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 18 Sep 2023 09:47:40 -0700 Subject: [PATCH 18/18] Revert "Make do_writes_done a public method" This reverts commit 1b208866a441719290e1177eafd26332e6710b87. --- .../src/internal/grpc/client_streaming.hpp | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/cpp/mrc/src/internal/grpc/client_streaming.hpp b/cpp/mrc/src/internal/grpc/client_streaming.hpp index eac6b82ad..8ee6bd82e 100644 --- a/cpp/mrc/src/internal/grpc/client_streaming.hpp +++ b/cpp/mrc/src/internal/grpc/client_streaming.hpp @@ -47,7 +47,6 @@ #include #include #include -#include #include namespace mrc::rpc { @@ -180,25 +179,7 @@ class ClientStream : private Service, public std::enable_shared_from_this writes_done; - m_stream->WritesDone(&writes_done); - writes_done.get_future().get(); - DVLOG(10) << "client issued writes done to server"; - Promise finish; - m_stream->Finish(&m_status, &finish); - auto ok = finish.get_future().get(); - if (!ok) - { - throw std::runtime_error("Failed to issue WritesDone " + m_status.error_message()); - } - }; - } + // todo(ryan) - add a method to trigger a writes done template void attach_to(NodeT& sink) @@ -247,6 +228,19 @@ class ClientStream : private Service, public std::enable_shared_from_this writes_done; + m_stream->WritesDone(&writes_done); + writes_done.get_future().get(); + DVLOG(10) << "client issued writes done to server"; + }; + } + // initialization performed after the grpc client stream was successfully initialized void do_init() {