diff --git a/rd-cpp/src/rd_framework_cpp/src/main/base/IRdReactive.h b/rd-cpp/src/rd_framework_cpp/src/main/base/IRdReactive.h index 0327cc762..9356c5290 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/base/IRdReactive.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/base/IRdReactive.h @@ -13,7 +13,7 @@ namespace rd * \brief A non-root node in an object graph which can be synchronized with its remote copy over a network or * a similar connection, and which allows to subscribe to its changes. */ -class RD_FRAMEWORK_API IRdReactive : public virtual IRdBindable +class RD_FRAMEWORK_API IRdReactive { public: /** diff --git a/rd-cpp/src/rd_framework_cpp/src/main/base/IWire.h b/rd-cpp/src/rd_framework_cpp/src/main/base/IWire.h index 5fd316d9a..ff98c7d78 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/base/IWire.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/base/IWire.h @@ -14,6 +14,7 @@ namespace rd { +class RdReactiveBase; /** * \brief Sends and receives serialized object data over a network or a similar connection. */ @@ -45,7 +46,7 @@ class RD_FRAMEWORK_API IWire * \param lifetime lifetime of subscription. * \param entity to be subscripted */ - virtual void advise(Lifetime lifetime, IRdReactive const* entity) const = 0; + virtual void advise(Lifetime lifetime, RdReactiveBase const* entity) const = 0; }; } // namespace rd #if defined(_MSC_VER) diff --git a/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.cpp b/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.cpp index eae265157..0f4adf04c 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.cpp @@ -2,7 +2,7 @@ namespace rd { -void WireBase::advise(Lifetime lifetime, const IRdReactive* entity) const +void WireBase::advise(Lifetime lifetime, const RdReactiveBase* entity) const { message_broker.advise_on(lifetime, entity); } diff --git a/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.h b/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.h index 229ba4046..144d33f2a 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/base/WireBase.h @@ -25,7 +25,7 @@ class RD_FRAMEWORK_API WireBase : public IWire virtual ~WireBase() = default; // endregion - void advise(Lifetime lifetime, IRdReactive const* entity) const override; + virtual void advise(Lifetime lifetime, RdReactiveBase const* entity) const override; }; } // namespace rd diff --git a/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.cpp b/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.cpp index 49f2794f5..ba7169469 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.cpp @@ -28,7 +28,7 @@ ExtWire::ExtWire() }); } -void ExtWire::advise(Lifetime lifetime, IRdReactive const* entity) const +void ExtWire::advise(Lifetime lifetime, RdReactiveBase const* entity) const { realWire->advise(lifetime, entity); } diff --git a/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.h b/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.h index 7222cecd6..a527e6647 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/ext/ExtWire.h @@ -29,7 +29,7 @@ class RD_FRAMEWORK_API ExtWire final : public IWire mutable IWire const* realWire = nullptr; - void advise(Lifetime lifetime, IRdReactive const* entity) const override; + void advise(Lifetime lifetime, RdReactiveBase const* entity) const override; void send(RdId const& id, std::function writer) const override; }; diff --git a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp index 8d14973e0..451a7538a 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp +++ b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp @@ -1,5 +1,6 @@ #include "protocol/MessageBroker.h" +#include "base/RdReactiveBase.h" #include "spdlog/sinks/stdout_color_sinks.h" namespace rd @@ -13,7 +14,7 @@ static void execute(const IRdReactive* that, Buffer msg) that->on_wire_received(std::move(msg)); } -void MessageBroker::invoke(const IRdReactive* that, Buffer msg, bool sync) const +void MessageBroker::invoke(const RdReactiveBase* that, Buffer msg, bool sync) const { if (sync) { @@ -51,7 +52,7 @@ void MessageBroker::dispatch(RdId id, Buffer message) const { // synchronized recursively std::lock_guard guard(lock); - IRdReactive const* s = subscriptions[id]; + RdReactiveBase const* s = subscriptions[id]; if (s == nullptr) { auto it = broker.find(id); @@ -64,7 +65,7 @@ void MessageBroker::dispatch(RdId id, Buffer message) const auto action = [this, it, id]() mutable { auto& current = it->second; - IRdReactive const* subscription = subscriptions[id]; + RdReactiveBase const* subscription = subscriptions[id]; optional message; { @@ -127,7 +128,7 @@ void MessageBroker::dispatch(RdId id, Buffer message) const // } } -void MessageBroker::advise_on(Lifetime lifetime, IRdReactive const* entity) const +void MessageBroker::advise_on(Lifetime lifetime, RdReactiveBase const* entity) const { RD_ASSERT_MSG(!entity->get_id().isNull(), ("id is null for entities: " + std::string(typeid(*entity).name()))) @@ -138,8 +139,7 @@ void MessageBroker::advise_on(Lifetime lifetime, IRdReactive const* entity) cons if (!lifetime->is_terminated()) { auto key = entity->get_id(); - IRdReactive const* value = entity; - subscriptions[key] = value; + subscriptions[key] = entity; lifetime->add_action([this, key]() { subscriptions.erase(key); }); } } diff --git a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h index cce4102fa..b7a0da2c5 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.h @@ -18,6 +18,8 @@ namespace rd { +class RdReactiveBase; + class RD_FRAMEWORK_API Mq { public: @@ -42,14 +44,14 @@ class RD_FRAMEWORK_API MessageBroker final { private: IScheduler* default_scheduler = nullptr; - mutable rd::unordered_map subscriptions; + mutable rd::unordered_map subscriptions; mutable rd::unordered_map broker; mutable std::recursive_mutex lock; static std::shared_ptr logger; - void invoke(const IRdReactive* that, Buffer msg, bool sync = false) const; + void invoke(const RdReactiveBase* that, Buffer msg, bool sync = false) const; public: // region ctor/dtor @@ -59,7 +61,7 @@ class RD_FRAMEWORK_API MessageBroker final void dispatch(RdId id, Buffer message) const; - void advise_on(Lifetime lifetime, IRdReactive const* entity) const; + void advise_on(Lifetime lifetime, RdReactiveBase const* entity) const; }; } // namespace rd #if defined(_MSC_VER) diff --git a/rd-cpp/src/rd_framework_cpp/src/main/task/RdCall.h b/rd-cpp/src/rd_framework_cpp/src/main/task/RdCall.h index 5f8dcc443..e0a549c12 100644 --- a/rd-cpp/src/rd_framework_cpp/src/main/task/RdCall.h +++ b/rd-cpp/src/rd_framework_cpp/src/main/task/RdCall.h @@ -69,7 +69,7 @@ class RdCall : public virtual RdReactiveBase, public ISerializable * \param request value to deliver * \return result of remote invoking */ - WiredRdTask sync(TReq const& request, std::chrono::milliseconds timeout = 200ms) const + WiredRdTask sync(TReq const& request, std::chrono::milliseconds timeout = std::chrono::milliseconds(200)) const { auto task = start_internal(request, true, &SynchronousScheduler::Instance()); auto time_at_start = std::chrono::system_clock::now();