From c08cf5e1ad3b9b5e3f10dfc5d02dc1a462d654ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Canberk=20S=C3=B6nmez?= Date: Mon, 2 Oct 2023 02:10:34 +0200 Subject: [PATCH] Nested exceptions. --- README.md | 2 +- examples/exceptions.cpp | 152 +++++++++++++++++++ examples/pitfall.cpp | 55 +------ examples/subroutine.cpp | 2 +- include/cxxdes/core/core.hpp | 5 - include/cxxdes/core/impl/any_of.ipp | 6 + include/cxxdes/core/impl/await_transform.ipp | 2 - include/cxxdes/core/impl/coroutine.ipp | 37 ++--- include/cxxdes/core/impl/coroutine_data.ipp | 51 +++---- include/cxxdes/core/impl/environment.ipp | 48 +++--- include/cxxdes/core/impl/exception_types.ipp | 18 --- include/cxxdes/core/impl/subroutine.ipp | 1 + include/cxxdes/core/impl/token.ipp | 2 +- tests/process.test.cpp | 46 ------ 14 files changed, 215 insertions(+), 212 deletions(-) create mode 100644 examples/exceptions.cpp delete mode 100644 include/cxxdes/core/impl/exception_types.ipp diff --git a/README.md b/README.md index 37c95ea..8adaa6c 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ libcxxdes tries to provide the complete feature set of SimPy, it currently suppo `co_await sequential(p1(), p2(), ...)`, `co_await (p1(), p2(), ...)` - Timeouts: `co_await delay(5)`, `co_await timeout(5_s)` -3. Interruptable coroutines which are useful for modelling preemptive resources. +3. Interruptable coroutines which are useful for modelling preemptive resources. --Not feasible to implement. Use || with a queue, same thing. 4. Priority-scheduling of events that take place at the same simulation time. `coroutine` can be assigned priorities! (lower the number, higher the priority) 5. `time_unit()` and `time_precision()` functions for mapping simulation time (integer) to real-world time. 6. Synchronization primitives, such as `mutex`, `semaphore`, `queue`, and `event`. diff --git a/examples/exceptions.cpp b/examples/exceptions.cpp new file mode 100644 index 0000000..9693a5d --- /dev/null +++ b/examples/exceptions.cpp @@ -0,0 +1,152 @@ +#include +#include + +using namespace cxxdes::core; + +subroutine<> f() +{ + throw std::runtime_error("f has thrown an exception!"); + co_return; +} + +subroutine<> g() +{ + co_await f(); +} + +coroutine<> h() +{ + try + { + co_await g(); + } + catch (std::exception &ex) + { + fmt::println("[h] Exception: {}", ex.what()); + } + + throw std::runtime_error("h has thrown an exception!"); +} + +coroutine<> co_main() +{ + try + { + co_await h(); + } + catch (std::exception &ex) + { + fmt::println("[co_main] Exception: {}", ex.what()); + } + + throw std::runtime_error("[co_main] co_main has thrown an exception!"); +} + +void nestedExceptions() +{ + fmt::println("test: nested exceptions"); + + environment env; + env.bind(co_main()); + + try + { + env.run(); + } + catch (std::exception &ex) + { + fmt::println("[nestedExceptions] Exception: {}", ex.what()); + } +} + +void stopping() +{ + fmt::println("test: stopping"); + + auto coro = []() -> coroutine<> + { + fmt::println("hey!"); + co_await timeout(50); + // executes no more from here + co_await timeout(100); + fmt::println("should not be printed"); + }(); + + environment env; + env.bind(coro); + + env.run_for(1); +} + +void asynchronous1() +{ + fmt::println("test: asynchronous1"); + + auto coro = []() -> coroutine<> + { + auto coro2 = []() -> coroutine<> + { + co_await timeout(2); + fmt::println("coro2 $0"); + throw std::runtime_error("coro2 has thrown an exception"); + fmt::println("coro2 $1"); + }(); + + co_await (timeout(1) || coro2); + fmt::println("coro done"); + }(); + + environment env; + env.bind(coro); + + try + { + env.run(); + } + catch (std::exception &ex) + { + fmt::println("[asynchronous] Exception: {}", ex.what()); + } +} + +void asynchronous2() +{ + fmt::println("test: asynchronous2"); + + auto coro = []() -> coroutine<> + { + auto coro2 = []() -> coroutine<> + { + co_await timeout(2); + fmt::println("coro2 $0"); + throw std::runtime_error("coro2 has thrown an exception"); + fmt::println("coro2 $1"); + }(); + co_await async(coro2); + fmt::println("coro done"); + + // since we did not await coro2, no completion token could + // be generated. that is, the exception is lost. + }(); + + environment env; + env.bind(coro); + + try + { + env.run(); + } + catch (std::exception &ex) + { + fmt::println("[asynchronous2] Exception: {}", ex.what()); + } +} + +int main() +{ + nestedExceptions(); + stopping(); + asynchronous1(); + asynchronous2(); + return 0; +} diff --git a/examples/pitfall.cpp b/examples/pitfall.cpp index 5246faa..8cb0b5b 100644 --- a/examples/pitfall.cpp +++ b/examples/pitfall.cpp @@ -25,48 +25,6 @@ CXXDES_SIMULATION(pitfall) { #define _TrackLocation cxxdes::util::source_location const = cxxdes::util::source_location::current() -CXXDES_SIMULATION(interrupts) { - using simulation::simulation; - - coroutine<> foo(_TrackLocation) { - try { - while (true) { - fmt::print("now: {}\n", now()); - co_await delay(1); - } - } - catch (interrupted_exception &err) { - fmt::print("exception: {}\n", err.what()); - co_return ; - } - } - - coroutine<> bar(_TrackLocation) { - try { - co_await delay(100); - } - catch (interrupted_exception &err) { - fmt::print("exception: {}\n", err.what()); - co_return ; - } - } - - coroutine<> co_main(_TrackLocation) { - { - coroutine<> h = co_await async(foo()); - co_await delay(5); - h.interrupt(); - } - - { - coroutine<> h = co_await async(bar()); - co_await delay(5); - h.interrupt(interrupted_exception("interrupted!")); - // h.interrupt(std::runtime_error("what?")); - } - } -}; - CXXDES_SIMULATION(subroutines) { using simulation::simulation; @@ -102,14 +60,8 @@ CXXDES_SIMULATION(subroutines2) { using simulation::simulation; subroutine foo() { - try { - co_await delay(600); - co_await delay(600); - } - catch (cxxdes::core::stopped_exception &ex) { - fmt::print("exception: {}, now {}\n", ex.what(), now()); - throw ex; - } + co_await delay(600); + co_await delay(600); co_return 20; } @@ -165,9 +117,6 @@ int main() { fmt::print("pitfall\n"); pitfall::run(); - fmt::print("interrupts\n"); - interrupts::run_for(500); - fmt::print("subroutines\n"); subroutines::run(); diff --git a/examples/subroutine.cpp b/examples/subroutine.cpp index 152e81a..a2e1532 100644 --- a/examples/subroutine.cpp +++ b/examples/subroutine.cpp @@ -1,6 +1,6 @@ #include -#include #include +#include std::stack> call_stack; diff --git a/include/cxxdes/core/core.hpp b/include/cxxdes/core/core.hpp index 9d212c7..938a65d 100644 --- a/include/cxxdes/core/core.hpp +++ b/include/cxxdes/core/core.hpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include @@ -59,9 +58,6 @@ struct token; template struct immediately_return; -struct interrupted_exception; -struct stopped_exception; - struct coroutine_data; using coroutine_data_ptr = memory::ptr; using const_coroutine_data_ptr = memory::ptr; @@ -82,7 +78,6 @@ struct subroutine; #include "impl/token.ipp" #include "impl/immediately_return.ipp" -#include "impl/exception_types.ipp" #include "impl/coroutine_data.ipp" #include "impl/environment.ipp" #include "impl/timeout.ipp" diff --git a/include/cxxdes/core/impl/any_of.ipp b/include/cxxdes/core/impl/any_of.ipp index 34b6cfd..bb7e414 100644 --- a/include/cxxdes/core/impl/any_of.ipp +++ b/include/cxxdes/core/impl/any_of.ipp @@ -8,6 +8,12 @@ struct any_all_helper { void invoke(token *tkn) override { --remaining; + + if (tkn->eptr) { + // in case an argument of all/any of throws an exception + // this must be handled by main. + std::rethrow_exception(tkn->eptr); + } if (completion_tkn && Condition::operator()(total, remaining)) { // inherit the output_event features diff --git a/include/cxxdes/core/impl/await_transform.ipp b/include/cxxdes/core/impl/await_transform.ipp index cf7fb95..930043e 100644 --- a/include/cxxdes/core/impl/await_transform.ipp +++ b/include/cxxdes/core/impl/await_transform.ipp @@ -14,8 +14,6 @@ struct awaitable_wrapper { } auto await_resume() { - if (coro_data_this->interrupted()) - coro_data_this->raise_interrupt_(); return a.await_resume(); } }; diff --git a/include/cxxdes/core/impl/coroutine.ipp b/include/cxxdes/core/impl/coroutine.ipp index 5bff8e1..9dcccc1 100644 --- a/include/cxxdes/core/impl/coroutine.ipp +++ b/include/cxxdes/core/impl/coroutine.ipp @@ -76,16 +76,6 @@ struct coroutine: return coro_data_->complete(); } - template - void interrupt(T &&t = interrupted_exception{}) noexcept { - coro_data_->interrupt(std::forward(t)); - } - - [[nodiscard]] - bool interrupted() const noexcept { - return coro_data_->interrupted(); - } - [[nodiscard]] priority_type priority() const noexcept { return coro_data_->priority(); @@ -194,8 +184,17 @@ struct coroutine: } } - void await_resume(no_return_value_tag) noexcept { - completion_token_ = nullptr; + void await_resume(no_return_value_tag) { + if (completion_token_) { + auto tkn = completion_token_; + completion_token_ = nullptr; + + /* If coro_data is missing from the following condition, + * it fails segv. But this should not be the case. */ + if (tkn->coro_data && tkn->eptr) { + std::rethrow_exception(tkn->eptr); + } + } } private: @@ -250,15 +249,11 @@ public: auto final_suspend() noexcept -> std::suspend_never { return {}; } auto unhandled_exception() -> void { - try { - std::rethrow_exception(std::current_exception()); - } - catch (stopped_exception & /* ex */) { - do_return(); - } - catch (...) { - std::rethrow_exception(std::current_exception()); - } + // update completion tokens with the current exception + coro_data->propagate_exception(std::current_exception()); + + // schedule the completion tokens + coro_data->do_return(); } template diff --git a/include/cxxdes/core/impl/coroutine_data.ipp b/include/cxxdes/core/impl/coroutine_data.ipp index b42870b..2af1709 100644 --- a/include/cxxdes/core/impl/coroutine_data.ipp +++ b/include/cxxdes/core/impl/coroutine_data.ipp @@ -18,14 +18,12 @@ struct coroutine_data: memory::reference_counted_base { } void resume() { - if (complete_) - // a token might try to resume an interrupted coroutine - return ; + assert(!complete_); // note that coroutine<> does not pop the first coroutine do { should_continue_ = false; - auto top = call_stack_.top(); + auto top = call_stack_.back(); top.resume(); } while (should_continue_); } @@ -78,16 +76,6 @@ struct coroutine_data: memory::reference_counted_base { return complete_; } - template - void interrupt(T &&t = interrupted_exception{}) noexcept { - exception_ = std::make_exception_ptr(std::forward(t)); - } - - [[nodiscard]] - bool interrupted() const noexcept { - return (bool) exception_; - } - [[nodiscard]] bool bound() const noexcept { return env_ != nullptr; @@ -95,9 +83,8 @@ struct coroutine_data: memory::reference_counted_base { void kill() { while (!call_stack_.empty()) { - // destroy the call stack - call_stack_.top().destroy(); - call_stack_.pop(); + call_stack_.back().destroy(); + call_stack_.pop_back(); } } @@ -116,28 +103,18 @@ protected: void bind_(environment *env, priority_type priority); void manage_(); void unmanage_(); - - void raise_interrupt_() { - std::exception_ptr exception = exception_; - - // clear exception_, the exception might be caught - // we may need to interrupt again later - exception_ = nullptr; - - std::rethrow_exception(exception); - } - + void push_coro_(coroutine_handle coro) { // whenever there is a subroutine call, the // execution should keep moving further - call_stack_.push(coro); + call_stack_.push_back(coro); should_continue_ = true; } void pop_coro_() { // whenever a subroutine call returns, the // execution should keep moving further - call_stack_.pop(); + call_stack_.pop_back(); should_continue_ = true; } @@ -146,13 +123,13 @@ protected: // if called, only two owners: (1) coroutine and (2) promise_type // as the coroutine is not started yet, promise_type will never // get destroyed, resulting in a memory leak - call_stack_.top().destroy(); + call_stack_.back().destroy(); } } protected: environment *env_ = nullptr; - std::stack call_stack_; + std::vector call_stack_; bool should_continue_ = false; util::source_location created_; util::source_location awaited_; @@ -160,7 +137,6 @@ protected: time_integral latency_ = 0; memory::ptr parent_; bool complete_ = false; - std::exception_ptr exception_; }; @@ -208,6 +184,11 @@ struct coroutine_data_completion_tokens_mixin { completion_tokens_.push_back(completion_token); } + void propagate_exception(std::exception_ptr exception) { + for (auto completion_token: completion_tokens_) + completion_token->eptr = exception; + } + protected: void schedule_completion_(environment *env); @@ -220,6 +201,10 @@ struct coroutine_data_completion_tokens_mixin { completion_token_ = completion_token; } + void propagate_exception(std::exception_ptr exception) { + completion_token_->eptr = exception; + } + protected: void schedule_completion_(environment *env); diff --git a/include/cxxdes/core/impl/environment.ipp b/include/cxxdes/core/impl/environment.ipp index 8e2551d..42ced08 100644 --- a/include/cxxdes/core/impl/environment.ipp +++ b/include/cxxdes/core/impl/environment.ipp @@ -68,41 +68,30 @@ struct environment { return false; auto tkn = memory::ptr{tokens_.top()}; - tkn->unref() /* tkn already holds a reference now */; tokens_.pop(); now_ = std::max(tkn->time, now_); - try { - if (tkn->handler) { + if (tkn->handler) { + try { tkn->handler->invoke(tkn); } - else if (tkn->coro_data) { - current_coroutine_ = tkn->coro_data; - tkn->coro_data->resume(); - current_coroutine_ = nullptr; + catch (...) { + reset(); + std::rethrow_exception(std::current_exception()); } } - catch (...) { - /** In case of an unhandled exception, simply kill everything. */ - - auto coroutines = std::move(coroutines_); - - for (auto coroutine: coroutines) { - if (!coroutine->complete()) - coroutine->kill(); - } - - while (!tokens_.empty()) { - auto tkn = tokens_.top(); - tokens_.pop(); - tkn->unref(); - } - - std::rethrow_exception(std::current_exception()); + else if (tkn->coro_data) { + current_coroutine_ = tkn->coro_data; + tkn->coro_data->resume(); + current_coroutine_ = nullptr; } - + else if (tkn->eptr) { + reset(); + std::rethrow_exception(tkn->eptr); + } + return true; } @@ -113,13 +102,10 @@ struct environment { // for a proper way to erase while iterating: // https://en.cppreference.com/w/cpp/container/unordered_set/erase // sadly, we cannot apply this solution. - auto coroutines = std::move(coroutines_); + for (auto coroutine: coroutines) { - if (!coroutine->complete()) { - coroutine->interrupt(stopped_exception{}); - coroutine->resume(); - } + coroutine->kill(); } while (!tokens_.empty()) { @@ -169,7 +155,7 @@ struct environment { } ~environment() { - reset(); + // reset(); } private: diff --git a/include/cxxdes/core/impl/exception_types.ipp b/include/cxxdes/core/impl/exception_types.ipp deleted file mode 100644 index f0f56d5..0000000 --- a/include/cxxdes/core/impl/exception_types.ipp +++ /dev/null @@ -1,18 +0,0 @@ -struct interrupted_exception: std::exception { - interrupted_exception(std::string what = "interrupted."): - what_{std::move(what)} { - } - - const char *what() const noexcept override { - return what_.c_str(); - } - -private: - std::string what_; -}; - -struct stopped_exception: std::exception { - const char *what() const noexcept override { - return "stopped."; - } -}; diff --git a/include/cxxdes/core/impl/subroutine.ipp b/include/cxxdes/core/impl/subroutine.ipp index 49e7ad5..6cf6331 100644 --- a/include/cxxdes/core/impl/subroutine.ipp +++ b/include/cxxdes/core/impl/subroutine.ipp @@ -104,6 +104,7 @@ public: auto unhandled_exception() { eptr = std::current_exception(); + // after this, will call final_suspend() and finalize } }; diff --git a/include/cxxdes/core/impl/token.ipp b/include/cxxdes/core/impl/token.ipp index 955f126..d780c19 100644 --- a/include/cxxdes/core/impl/token.ipp +++ b/include/cxxdes/core/impl/token.ipp @@ -22,5 +22,5 @@ struct token: memory::reference_counted_base { memory::ptr handler = nullptr; // exception to be propagated - std::exception_ptr exception; + std::exception_ptr eptr = nullptr; }; diff --git a/tests/process.test.cpp b/tests/process.test.cpp index 353b10f..164eb80 100644 --- a/tests/process.test.cpp +++ b/tests/process.test.cpp @@ -1,10 +1,6 @@ #include #include -#ifndef CXXDES_INTERRUPTABLE -# define CXXDES_INTERRUPTABLE -#endif -// #define CXXDES_DEBUG_CORE_coroutine #include using namespace cxxdes::core; @@ -261,45 +257,3 @@ TEST(ProcessTest, Returncoroutine) { test::run(); } -TEST(ProcessTest, Interrupt) { - CXXDES_SIMULATION(test) { - using simulation::simulation; - - test(environment &env): simulation(env) { - } - - bool flag = false; - - coroutine<> foo() { - while (true) { - co_await delay(10); - } - } - - coroutine<> bar() { - try { - while (true) - co_await delay(1000); - } - catch (stopped_exception & /* ex */) { - flag = true; - co_return ; - } - } - - coroutine<> co_main() { - co_await async(foo()); - co_await async(bar()); - - while (true) { - co_await delay(10); - } - } - }; - - environment env; - test t{env}; - env.run_for(100); - env.reset(); - EXPECT_TRUE(t.flag); -}