Skip to content

Commit

Permalink
Ensure every event handler is properly called
Browse files Browse the repository at this point in the history
Simply using a reader will not always call the callbacks
after emitting an event, which means we will frequently
miss events (arximboldi/lager#91).
This commit calls the callbacks in the effect returned by
the reducer instead.
  • Loading branch information
tusooa committed Jan 11, 2021
1 parent 3bc9924 commit 3652b27
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 21 deletions.
7 changes: 7 additions & 0 deletions src/base/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ namespace Kazv
{
Invite, Join, Leave
};

namespace detail
{
// emulates declval() but returns lvalue reference
template<class T>
typename std::add_lvalue_reference<T>::type declref() noexcept;
}
}

namespace nlohmann {
Expand Down
7 changes: 0 additions & 7 deletions src/client/sdk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@

namespace Kazv
{
namespace detail
{
// emulates declval() but returns lvalue reference
template<class T>
typename std::add_lvalue_reference<T>::type declref() noexcept;
}

template<class EventLoop, class Xform, class ...Enhancers>
class Sdk
{
Expand Down
123 changes: 109 additions & 14 deletions src/eventemitter/lagerstoreeventemitter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
*/


#include <vector>
#include <memory>
#include <algorithm>


#include <lager/store.hpp>
#include <lager/reader.hpp>
#include <lager/event_loop/manual.hpp>

#include "types.hpp"

#include "eventinterface.hpp"

Expand All @@ -29,16 +37,82 @@ namespace Kazv
{
struct Model { KazvEvent curEvent; };
struct Action { KazvEvent nextEvent; };
static Model update(Model, Action a) {
return Model{a.nextEvent};

struct ListenerHolder;

using Result = std::pair<Model,
lager::effect<Action, lager::deps<ListenerHolder &>>>;

using SlotT = std::function<void(KazvEvent)>;

struct Listener
{
void emit(KazvEvent e) {
for (const auto &slot: m_slots) {
slot(e);
}
}

void connect(SlotT slot) {
m_slots.push_back(std::move(slot));
}

std::vector<SlotT> m_slots;
};

using ListenerSP = std::shared_ptr<Listener>;
using ListenerWSP = std::weak_ptr<Listener>;

struct ListenerHolder
{
void sendToListeners(KazvEvent e) {
bool needsCleanup = false;
for (auto listener : m_listeners) {
auto strongListener = listener.lock();
if (strongListener) {
strongListener->emit(e);
} else {
needsCleanup = true;
}
}

if (needsCleanup) {
std::remove_if(m_listeners.begin(),
m_listeners.end(),
[](auto ptr) {
return ptr.expired();
});
}

}

std::vector<ListenerWSP> m_listeners;
};

inline static Result update(Model, Action a) {
return {
Model{a.nextEvent},
[=](auto &&ctx) {
auto &holder = lager::get<ListenerHolder &>(ctx);
holder.sendToListeners(a.nextEvent);
}
};
}

public:
template<class EventLoop>
LagerStoreEventEmitter(EventLoop &&loop) : m_store(
lager::make_store<Action>(
Model{},
&update,
std::forward<EventLoop>(loop))) {}
LagerStoreEventEmitter(EventLoop loop)
: m_holder{}
, m_store(
lager::make_store<Action>(
Model{},
&update,
loop,
lager::with_deps(std::ref(m_holder))))
, m_postingFunc(
[loop=loop](auto &&func) mutable {
loop.post(std::forward<decltype(func)>(func));
}) {}
~LagerStoreEventEmitter() override = default;

void emit(KazvEvent e) override {
Expand All @@ -48,10 +122,13 @@ namespace Kazv
class Watchable
{
public:
Watchable(lager::reader<KazvEvent> r) : reader(r) {}
Watchable(LagerStoreEventEmitter &ee)
: m_listener(std::make_shared<Listener>()) {
ee.addListener(m_listener);
}
template<class EventType, class Func>
void after(Func &&func) {
reader.watch(
m_listener->connect(
[f=std::forward<Func>(func)](KazvEvent e) {
if (std::holds_alternative<EventType>(e)) {
f(std::get<EventType>(e));
Expand All @@ -61,14 +138,14 @@ namespace Kazv

template<class Func>
void afterAll(Func &&func) {
reader.watch(
m_listener->connect(
[f=std::forward<Func>(func)](KazvEvent e) {
f(e);
});
}

private:
lager::reader<KazvEvent> reader;
ListenerSP m_listener;
};

/**
Expand All @@ -80,11 +157,29 @@ namespace Kazv
* ```
*/
Watchable watchable() {
return lager::reader<KazvEvent>(
m_store.zoom(lager::lenses::attr(&Model::curEvent)));
return Watchable(*this);
}


private:
lager::store<Action, Model> m_store;
void addListener(ListenerSP listener) {
m_postingFunc([=]() {
m_holder.m_listeners.push_back(listener);
});
}

using StoreT =
decltype(lager::make_store<Action>(
Model{},
&update,
lager::with_manual_event_loop{},
lager::with_deps(std::ref(detail::declref<ListenerHolder>()))));

using PostingFunc = std::function<void(std::function<void()>)>;

ListenerHolder m_holder;
StoreT m_store;
PostingFunc m_postingFunc;
};

}
2 changes: 2 additions & 0 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ add_executable(kazvtest
client/client-test-util.cpp
client/sync-test.cpp
kazvjobtest.cpp
event-emitter-test.cpp
)

target_link_libraries(kazvtest
PRIVATE Catch2::Catch2
PRIVATE kazv
PRIVATE kazveventemitter
PRIVATE kazvjob
PRIVATE nlohmann_json::nlohmann_json
PRIVATE immer
Expand Down
94 changes: 94 additions & 0 deletions src/tests/event-emitter-test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2020 Tusooa Zhu <[email protected]>
*
* This file is part of libkazv.
*
* libkazv is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* libkazv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with libkazv. If not, see <https://www.gnu.org/licenses/>.
*/

#include <chrono>

#include <catch2/catch.hpp>

#include <eventemitter/lagerstoreeventemitter.hpp>

#include <lager/event_loop/boost_asio.hpp>


using namespace Kazv;

TEST_CASE("Event emitter should work normally", "[eventemitter]")
{
boost::asio::io_context ioContext;

LagerStoreEventEmitter ee{lager::with_boost_asio_event_loop{ioContext.get_executor()}};

auto watchable = ee.watchable();
int counter = 0;

watchable.after<SyncSuccessful>(
[&](auto) {
++counter;
});

SECTION("Event handlers should be run properly") {
ee.emit(SyncSuccessful{});
ee.emit(SyncFailed{});

ioContext.run();

REQUIRE(counter == 1);
}

SECTION("Event handlers should be called when every event fires off") {
for (int i = 0; i < 100; ++i) {
ee.emit(SyncSuccessful{});
}

ioContext.run();

REQUIRE(counter == 100);
}

SECTION("Handler should be disconnected once watchable is destroyed") {
int counter2 = 0;

auto guard = boost::asio::executor_work_guard(ioContext.get_executor());
auto thread = std::thread([&] { ioContext.run(); });

ee.emit(SyncFailed{});
std::this_thread::sleep_for(std::chrono::milliseconds{100});

{
auto watchable2 = ee.watchable();
watchable2.after<SyncFailed>(
[&](auto) {
++counter2;
});

ee.emit(SyncSuccessful{});
ee.emit(SyncFailed{});
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}

ee.emit(SyncFailed{});

guard.reset();

thread.join();

REQUIRE(counter == 1);
REQUIRE(counter2 == 1);
}
}

0 comments on commit 3652b27

Please sign in to comment.