diff --git a/src/base/types.hpp b/src/base/types.hpp index 1d55128..206fe57 100644 --- a/src/base/types.hpp +++ b/src/base/types.hpp @@ -127,6 +127,13 @@ namespace Kazv { Invite, Join, Leave }; + + namespace detail + { + // emulates declval() but returns lvalue reference + template + typename std::add_lvalue_reference::type declref() noexcept; + } } namespace nlohmann { diff --git a/src/client/sdk.hpp b/src/client/sdk.hpp index d8f0c32..818f9ac 100644 --- a/src/client/sdk.hpp +++ b/src/client/sdk.hpp @@ -25,13 +25,6 @@ namespace Kazv { - namespace detail - { - // emulates declval() but returns lvalue reference - template - typename std::add_lvalue_reference::type declref() noexcept; - } - template class Sdk { diff --git a/src/eventemitter/lagerstoreeventemitter.hpp b/src/eventemitter/lagerstoreeventemitter.hpp index e5c8c6d..692344d 100644 --- a/src/eventemitter/lagerstoreeventemitter.hpp +++ b/src/eventemitter/lagerstoreeventemitter.hpp @@ -18,8 +18,16 @@ */ +#include +#include +#include + + #include #include +#include + +#include "types.hpp" #include "eventinterface.hpp" @@ -29,16 +37,82 @@ namespace Kazv { struct Model { KazvEvent curEvent; }; struct Action { KazvEvent nextEvent; }; - static Model update(Model, Action a) { - return Model{a.nextEvent}; + + struct ListenerHolder; + + using Result = std::pair>>; + + using SlotT = std::function; + + struct Listener + { + void emit(KazvEvent e) { + for (const auto &slot: m_slots) { + slot(e); + } + } + + void connect(SlotT slot) { + m_slots.push_back(std::move(slot)); + } + + std::vector m_slots; + }; + + using ListenerSP = std::shared_ptr; + using ListenerWSP = std::weak_ptr; + + struct ListenerHolder + { + void sendToListeners(KazvEvent e) { + bool needsCleanup = false; + for (auto listener : m_listeners) { + auto strongListener = listener.lock(); + if (strongListener) { + strongListener->emit(e); + } else { + needsCleanup = true; + } + } + + if (needsCleanup) { + std::remove_if(m_listeners.begin(), + m_listeners.end(), + [](auto ptr) { + return ptr.expired(); + }); + } + + } + + std::vector m_listeners; + }; + + inline static Result update(Model, Action a) { + return { + Model{a.nextEvent}, + [=](auto &&ctx) { + auto &holder = lager::get(ctx); + holder.sendToListeners(a.nextEvent); + } + }; } + public: template - LagerStoreEventEmitter(EventLoop &&loop) : m_store( - lager::make_store( - Model{}, - &update, - std::forward(loop))) {} + LagerStoreEventEmitter(EventLoop loop) + : m_holder{} + , m_store( + lager::make_store( + Model{}, + &update, + loop, + lager::with_deps(std::ref(m_holder)))) + , m_postingFunc( + [loop=loop](auto &&func) mutable { + loop.post(std::forward(func)); + }) {} ~LagerStoreEventEmitter() override = default; void emit(KazvEvent e) override { @@ -48,10 +122,13 @@ namespace Kazv class Watchable { public: - Watchable(lager::reader r) : reader(r) {} + Watchable(LagerStoreEventEmitter &ee) + : m_listener(std::make_shared()) { + ee.addListener(m_listener); + } template void after(Func &&func) { - reader.watch( + m_listener->connect( [f=std::forward(func)](KazvEvent e) { if (std::holds_alternative(e)) { f(std::get(e)); @@ -61,14 +138,14 @@ namespace Kazv template void afterAll(Func &&func) { - reader.watch( + m_listener->connect( [f=std::forward(func)](KazvEvent e) { f(e); }); } private: - lager::reader reader; + ListenerSP m_listener; }; /** @@ -80,11 +157,29 @@ namespace Kazv * ``` */ Watchable watchable() { - return lager::reader( - m_store.zoom(lager::lenses::attr(&Model::curEvent))); + return Watchable(*this); } + private: - lager::store m_store; + void addListener(ListenerSP listener) { + m_postingFunc([=]() { + m_holder.m_listeners.push_back(listener); + }); + } + + using StoreT = + decltype(lager::make_store( + Model{}, + &update, + lager::with_manual_event_loop{}, + lager::with_deps(std::ref(detail::declref())))); + + using PostingFunc = std::function)>; + + ListenerHolder m_holder; + StoreT m_store; + PostingFunc m_postingFunc; }; + } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e9c4257..5294470 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -9,11 +9,13 @@ add_executable(kazvtest client/client-test-util.cpp client/sync-test.cpp kazvjobtest.cpp + event-emitter-test.cpp ) target_link_libraries(kazvtest PRIVATE Catch2::Catch2 PRIVATE kazv + PRIVATE kazveventemitter PRIVATE kazvjob PRIVATE nlohmann_json::nlohmann_json PRIVATE immer diff --git a/src/tests/event-emitter-test.cpp b/src/tests/event-emitter-test.cpp new file mode 100644 index 0000000..c494d3f --- /dev/null +++ b/src/tests/event-emitter-test.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2020 Tusooa Zhu + * + * This file is part of libkazv. + * + * libkazv is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * libkazv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with libkazv. If not, see . + */ + +#include + +#include + +#include + +#include + + +using namespace Kazv; + +TEST_CASE("Event emitter should work normally", "[eventemitter]") +{ + boost::asio::io_context ioContext; + + LagerStoreEventEmitter ee{lager::with_boost_asio_event_loop{ioContext.get_executor()}}; + + auto watchable = ee.watchable(); + int counter = 0; + + watchable.after( + [&](auto) { + ++counter; + }); + + SECTION("Event handlers should be run properly") { + ee.emit(SyncSuccessful{}); + ee.emit(SyncFailed{}); + + ioContext.run(); + + REQUIRE(counter == 1); + } + + SECTION("Event handlers should be called when every event fires off") { + for (int i = 0; i < 100; ++i) { + ee.emit(SyncSuccessful{}); + } + + ioContext.run(); + + REQUIRE(counter == 100); + } + + SECTION("Handler should be disconnected once watchable is destroyed") { + int counter2 = 0; + + auto guard = boost::asio::executor_work_guard(ioContext.get_executor()); + auto thread = std::thread([&] { ioContext.run(); }); + + ee.emit(SyncFailed{}); + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + + { + auto watchable2 = ee.watchable(); + watchable2.after( + [&](auto) { + ++counter2; + }); + + ee.emit(SyncSuccessful{}); + ee.emit(SyncFailed{}); + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + } + + ee.emit(SyncFailed{}); + + guard.reset(); + + thread.join(); + + REQUIRE(counter == 1); + REQUIRE(counter2 == 1); + } +}