From 33406cfe146f19084c96b65c6fe3097e12ca3242 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sat, 21 Dec 2019 23:08:25 +0200 Subject: [PATCH] task: stop wrapping tasks with unique_ptr Since the very beginning, we wrapped tasks and continuations with unique_ptr for exception safety. It wasn't safe, however - once a task is created, it must be executed and cannot be just destroyed and freed, since anything that depends on the task will not be run. This became even more pronounced with 4d811de1463ba5059 ("task: resumable tasks"), as tasks are now responsible for their own reclamation (the "dispose" part of task::run_and_dispose()). To make it clear that one cannot rely on unique_ptr being safe, replace unique_ptr with a plain task* (and make_unique with naked new calls). To mark the fact that the functions in which this happens are not exeception safe, annotate them as noexcept. Some notes: - std::move()ing a pointer will not clear it, so some call sites have changed to use std::exchange() - repeat(), do_until(), and the like were using unique_ptr(this) to destroy the task in run_and_dispose() unless moved from. This was changed to plain "delete this". Tests: unit(dev, debug) Message-Id: <20191221210825.3179484-1-avi@scylladb.com> --- include/seastar/core/do_with.hh | 8 ++-- include/seastar/core/future-util.hh | 63 ++++++++++++++++------------- include/seastar/core/future.hh | 34 ++++++++-------- include/seastar/core/reactor.hh | 14 +++---- include/seastar/core/task.hh | 16 ++++---- src/core/reactor.cc | 40 +++++++++--------- src/core/thread.cc | 4 +- 7 files changed, 93 insertions(+), 86 deletions(-) diff --git a/include/seastar/core/do_with.hh b/include/seastar/core/do_with.hh index c967659f8f8..35016d1632f 100644 --- a/include/seastar/core/do_with.hh +++ b/include/seastar/core/do_with.hh @@ -90,13 +90,13 @@ public: template inline auto do_with(T&& rvalue, F&& f) { - auto task = std::make_unique>>(std::forward(rvalue)); + auto task = new internal::do_with_state>(std::forward(rvalue)); auto fut = f(task->data()); if (fut.available()) { return fut; } auto ret = task->get_future(); - internal::set_callback(fut, std::move(task)); + internal::set_callback(fut, task); return ret; } @@ -148,13 +148,13 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) { auto&& just_func = std::move(std::get(std::move(all))); using value_tuple = std::remove_reference_t; using ret_type = decltype(apply(just_func, just_values)); - auto task = std::make_unique>(std::move(just_values)); + auto task = new internal::do_with_state(std::move(just_values)); auto fut = apply(just_func, task->data()); if (fut.available()) { return fut; } auto ret = task->get_future(); - internal::set_callback(fut, std::move(task)); + internal::set_callback(fut, task); return ret; } diff --git a/include/seastar/core/future-util.hh b/include/seastar/core/future-util.hh index 021ed49a2d3..b10c1ea34f2 100644 --- a/include/seastar/core/future-util.hh +++ b/include/seastar/core/future-util.hh @@ -119,7 +119,7 @@ private: // Wait for one of the futures in _incomplete to complete, and then // decide what to do: wait for another one, or deliver _result if all // are complete. - void wait_for_one() { + void wait_for_one() noexcept { // Process from back to front, on the assumption that the front // futures are likely to complete earlier than the back futures. // If that's indeed the case, then the front futures will be @@ -135,7 +135,7 @@ private: // If there's an incompelete future, wait for it. if (!_incomplete.empty()) { - internal::set_callback(_incomplete.back(), std::unique_ptr>(this)); + internal::set_callback(_incomplete.back(), static_cast*>(this)); // This future's state will be collected in run_and_dispose(), so we can drop it. _incomplete.pop_back(); return; @@ -280,13 +280,14 @@ public: } future<> get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (_state.failed()) { _promise.set_exception(std::move(_state).get_exception()); + delete this; return; } else { if (std::get<0>(_state.get()) == stop_iteration::yes) { _promise.set_value(); + delete this; return; } _state = {}; @@ -295,20 +296,22 @@ public: do { auto f = futurator::apply(_action); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } if (f.get0() == stop_iteration::yes) { _promise.set_value(); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } _state.set(stop_iteration::no); - schedule(std::move(zis)); + schedule(this); } }; @@ -328,7 +331,7 @@ public: template GCC6_CONCEPT( requires seastar::ApplyReturns || seastar::ApplyReturns> ) inline -future<> repeat(AsyncAction action) { +future<> repeat(AsyncAction action) noexcept { using futurator = futurize>; static_assert(std::is_same, typename futurator::type>::value, "bad AsyncAction signature"); try { @@ -339,9 +342,9 @@ future<> repeat(AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto repeater = std::make_unique>(std::move(action)); + auto repeater = new internal::repeater(std::move(action)); auto ret = repeater->get_future(); - internal::set_callback(f, std::move(repeater)); + internal::set_callback(f, repeater); return ret; }(); } @@ -351,9 +354,9 @@ future<> repeat(AsyncAction action) { } } while (!need_preempt()); - auto repeater = std::make_unique>(stop_iteration::no, std::move(action)); + auto repeater = new internal::repeater(stop_iteration::no, std::move(action)); auto ret = repeater->get_future(); - schedule(std::move(repeater)); + schedule(repeater); return ret; } catch (...) { return make_exception_future(std::current_exception()); @@ -397,14 +400,15 @@ public: } future get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (this->_state.failed()) { _promise.set_exception(std::move(this->_state).get_exception()); + delete this; return; } else { auto v = std::get<0>(std::move(this->_state).get()); if (v) { _promise.set_value(std::move(*v)); + delete this; return; } this->_state = {}; @@ -413,21 +417,23 @@ public: do { auto f = futurator::apply(_action); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } auto ret = f.get0(); if (ret) { _promise.set_value(std::make_tuple(std::move(*ret))); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } this->_state.set(compat::nullopt); - schedule(std::move(zis)); + schedule(this); } }; @@ -450,7 +456,7 @@ GCC6_CONCEPT( requires requires (AsyncAction aa) { futurize>::apply(aa).get0().value(); } ) repeat_until_value_return_type -repeat_until_value(AsyncAction action) { +repeat_until_value(AsyncAction action) noexcept { using futurator = futurize>; using type_helper = repeat_until_value_type_helper; // the "T" in the documentation @@ -462,9 +468,9 @@ repeat_until_value(AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto state = std::make_unique>(std::move(action)); + auto state = new internal::repeat_until_value_state(std::move(action)); auto ret = state->get_future(); - internal::set_callback(f, std::move(state)); + internal::set_callback(f, state); return ret; }(); } @@ -480,9 +486,9 @@ repeat_until_value(AsyncAction action) { } while (!need_preempt()); try { - auto state = std::make_unique>(compat::nullopt, std::move(action)); + auto state = new internal::repeat_until_value_state(compat::nullopt, std::move(action)); auto f = state->get_future(); - schedule(std::move(state)); + schedule(state); return f; } catch (...) { return make_exception_future(std::current_exception()); @@ -500,10 +506,10 @@ public: explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {} future<> get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (_state.available()) { if (_state.failed()) { _promise.set_urgent_state(std::move(_state)); + delete this; return; } _state = {}; // allow next cycle to overrun state @@ -512,23 +518,26 @@ public: do { if (_stop()) { _promise.set_value(); + delete this; return; } auto f = _action(); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } if (f.failed()) { f.forward_to(std::move(_promise)); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } - schedule(std::move(zis)); + schedule(this); } }; @@ -547,7 +556,7 @@ public: template GCC6_CONCEPT( requires seastar::ApplyReturns && seastar::ApplyReturns> ) inline -future<> do_until(StopCondition stop_cond, AsyncAction action) { +future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept { using namespace internal; using futurator = futurize; do { @@ -558,9 +567,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto task = std::make_unique>(std::move(stop_cond), std::move(action)); + auto task = new do_until_state(std::move(stop_cond), std::move(action)); auto ret = task->get_future(); - internal::set_callback(f, std::move(task)); + internal::set_callback(f, task); return ret; }(); } @@ -569,9 +578,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) { } } while (!need_preempt()); - auto task = std::make_unique>(std::move(stop_cond), std::move(action)); + auto task = new do_until_state(std::move(stop_cond), std::move(action)); auto f = task->get_future(); - schedule(std::move(task)); + schedule(task); return f; } @@ -737,7 +746,7 @@ public: return true; } else { auto c = new (continuation) when_all_state_component(wasb, f); - set_callback(*f, std::unique_ptr(c)); + set_callback(*f, c); return false; } } diff --git a/include/seastar/core/future.hh b/include/seastar/core/future.hh index d5f0d753f26..76eeded31fc 100644 --- a/include/seastar/core/future.hh +++ b/include/seastar/core/future.hh @@ -495,7 +495,7 @@ template future make_exception_future(future_state_base&& state) noexcept; template -void set_callback(future& fut, std::unique_ptr callback); +void set_callback(future& fut, U* callback) noexcept; class future_base; @@ -509,7 +509,7 @@ protected: // details. future_state_base* _state; - std::unique_ptr _task; + task* _task = nullptr; promise_base(const promise_base&) = delete; promise_base(future_state_base* state) noexcept : _state(state) {} @@ -607,19 +607,19 @@ public: #if SEASTAR_COROUTINES_TS void set_coroutine(future_state& state, task& coroutine) noexcept { _state = &state; - _task = std::unique_ptr(&coroutine); + _task = &coroutine; } #endif private: template - void schedule(Func&& func) { - auto tws = std::make_unique>(std::move(func)); + void schedule(Func&& func) noexcept { + auto tws = new continuation(std::move(func)); _state = &tws->_state; - _task = std::move(tws); + _task = tws; } - void schedule(std::unique_ptr> callback) { + void schedule(continuation_base* callback) noexcept { _state = &callback->_state; - _task = std::move(callback); + _task = callback; } template @@ -968,12 +968,12 @@ private: return static_cast*>(future_base::detach_promise()); } template - void schedule(Func&& func) { + void schedule(Func&& func) noexcept { if (_state.available() || !_promise) { if (__builtin_expect(!_state.available() && !_promise, false)) { _state.set_to_broken_promise(); } - ::seastar::schedule(std::make_unique>(std::move(func), std::move(_state))); + ::seastar::schedule(new continuation(std::move(func), std::move(_state))); } else { assert(_promise); detach_promise()->schedule(std::move(func)); @@ -1095,7 +1095,7 @@ private: auto thread = thread_impl::get(); assert(thread); thread_wake_task wake_task{thread, this}; - detach_promise()->schedule(std::unique_ptr>(&wake_task)); + detach_promise()->schedule(static_cast*>(&wake_task)); thread_impl::switch_out(thread); } @@ -1443,13 +1443,13 @@ public: } #endif private: - void set_callback(std::unique_ptr> callback) { + void set_callback(continuation_base* callback) noexcept { if (_state.available()) { callback->set_state(get_available_state_ref()); - ::seastar::schedule(std::move(callback)); + ::seastar::schedule(callback); } else { assert(_promise); - detach_promise()->schedule(std::move(callback)); + detach_promise()->schedule(callback); } } @@ -1470,7 +1470,7 @@ private: template friend future internal::make_exception_future(future_state_base&& state) noexcept; template - friend void internal::set_callback(future&, std::unique_ptr); + friend void internal::set_callback(future&, V*) noexcept; /// \endcond }; @@ -1717,10 +1717,10 @@ namespace internal { template inline -void set_callback(future& fut, std::unique_ptr callback) { +void set_callback(future& fut, U* callback) noexcept { // It would be better to use continuation_base for U, but // then a derived class of continuation_base won't be matched - return fut.set_callback(std::move(callback)); + return fut.set_callback(callback); } } diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 70688700cd4..1c975cfcea0 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -188,7 +188,7 @@ public: std::unique_ptr _pollfn; class registration_task; class deregistration_task; - registration_task* _registration_task; + registration_task* _registration_task = nullptr; public: template // signature: bool () static poller simple(Func&& poll) { @@ -201,7 +201,7 @@ public: ~poller(); poller(poller&& x); poller& operator=(poller&& x); - void do_register(); + void do_register() noexcept; friend class reactor; }; enum class idle_cpu_handler_result { @@ -305,7 +305,7 @@ private: uint8_t _id; sched_clock::duration _runtime = {}; uint64_t _tasks_processed = 0; - circular_buffer> _q; + circular_buffer _q; sstring _name; /** * This array holds pointers to the scheduling group specific @@ -564,10 +564,10 @@ public: } #ifdef SEASTAR_SHUFFLE_TASK_QUEUE - void shuffle(std::unique_ptr&, task_queue&); + void shuffle(task*&, task_queue&); #endif - void add_task(std::unique_ptr&& t) { + void add_task(task* t) noexcept { auto sg = t->group(); auto* q = _task_queues[sg._id].get(); bool was_empty = q->_q.empty(); @@ -579,7 +579,7 @@ public: activate(*q); } } - void add_urgent_task(std::unique_ptr&& t) { + void add_urgent_task(task* t) noexcept { auto sg = t->group(); auto* q = _task_queues[sg._id].get(); bool was_empty = q->_q.empty(); @@ -605,7 +605,7 @@ public: } void force_poll(); - void add_high_priority_task(std::unique_ptr&&); + void add_high_priority_task(task*) noexcept; network_stack& net() { return *_network_stack; } shard_id cpu_id() const { return _id; } diff --git a/include/seastar/core/task.hh b/include/seastar/core/task.hh index 379562cce55..efb85698262 100644 --- a/include/seastar/core/task.hh +++ b/include/seastar/core/task.hh @@ -35,8 +35,8 @@ public: scheduling_group group() const { return _sg; } }; -void schedule(std::unique_ptr&& t) noexcept; -void schedule_urgent(std::unique_ptr&& t) noexcept; +void schedule(task* t) noexcept; +void schedule_urgent(task* t) noexcept; template class lambda_task final : public task { @@ -52,16 +52,16 @@ public: template inline -std::unique_ptr -make_task(Func&& func) { - return std::make_unique>(current_scheduling_group(), std::forward(func)); +task* +make_task(Func&& func) noexcept { + return new lambda_task(current_scheduling_group(), std::forward(func)); } template inline -std::unique_ptr -make_task(scheduling_group sg, Func&& func) { - return std::make_unique>(sg, std::forward(func)); +task* +make_task(scheduling_group sg, Func&& func) noexcept { + return new lambda_task(sg, std::forward(func)); } } diff --git a/src/core/reactor.cc b/src/core/reactor.cc index a28bb5a0692..9ee998e4fb1 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -2099,12 +2099,11 @@ void reactor::run_tasks(task_queue& tq) { *internal::current_scheduling_group_ptr() = scheduling_group(tq._id); auto& tasks = tq._q; while (!tasks.empty()) { - auto tsk = std::move(tasks.front()); + auto tsk = tasks.front(); tasks.pop_front(); STAP_PROBE(seastar, reactor_run_tasks_single_start); task_histogram_add_task(*tsk); tsk->run_and_dispose(); - tsk.release(); STAP_PROBE(seastar, reactor_run_tasks_single_end); ++tq._tasks_processed; ++_global_tasks_processed; @@ -2123,7 +2122,7 @@ void reactor::run_tasks(task_queue& tq) { } #ifdef SEASTAR_SHUFFLE_TASK_QUEUE -void reactor::shuffle(std::unique_ptr& t, task_queue& q) { +void reactor::shuffle(task*& t, task_queue& q) { static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()()); std::uniform_int_distribution tasks_dist{0, q._q.size() - 1}; auto& to_swap = q._q[tasks_dist(gen)]; @@ -2835,7 +2834,7 @@ void reactor::replace_poller(pollfn* old, pollfn* neww) { } reactor::poller::poller(poller&& x) - : _pollfn(std::move(x._pollfn)), _registration_task(x._registration_task) { + : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) { if (_pollfn && _registration_task) { _registration_task->moved(this); } @@ -2851,15 +2850,14 @@ reactor::poller::operator=(poller&& x) { } void -reactor::poller::do_register() { +reactor::poller::do_register() noexcept { // We can't just insert a poller into reactor::_pollers, because we // may be running inside a poller ourselves, and so in the middle of // iterating reactor::_pollers itself. So we schedule a task to add // the poller instead. - auto task = std::make_unique(this); - auto tmp = task.get(); - engine().add_task(std::move(task)); - _registration_task = tmp; + auto task = new registration_task(this); + engine().add_task(task); + _registration_task = task; } reactor::poller::~poller() { @@ -2878,8 +2876,8 @@ reactor::poller::~poller() { } else { auto dummy = make_pollfn([] { return false; }); auto dummy_p = dummy.get(); - auto task = std::make_unique(std::move(dummy)); - engine().add_task(std::move(task)); + auto task = new deregistration_task(std::move(dummy)); + engine().add_task(task); engine().replace_poller(_pollfn.get(), dummy_p); } } @@ -3129,12 +3127,12 @@ future readable_eventfd::wait() { }); } -void schedule(std::unique_ptr&& t) noexcept { - engine().add_task(std::move(t)); +void schedule(task* t) noexcept { + engine().add_task(t); } -void schedule_urgent(std::unique_ptr&& t) noexcept { - engine().add_urgent_task(std::move(t)); +void schedule_urgent(task* t) noexcept { + engine().add_urgent_task(t); } } @@ -3928,7 +3926,7 @@ void report_failed_future(const future_state_base& state) noexcept { broken_promise::broken_promise() : logic_error("broken promise") { } promise_base::promise_base(promise_base&& x) noexcept - : _future(x._future), _state(x._state), _task(std::move(x._task)) { + : _future(x._future), _state(x._state), _task(std::exchange(x._task, nullptr)) { x._state = nullptr; if (auto* fut = _future) { fut->detach_promise(); @@ -3944,7 +3942,7 @@ promise_base::~promise_base() noexcept { } else if (__builtin_expect(bool(_task), false)) { assert(_state && !_state->available()); _state->set_to_broken_promise(); - ::seastar::schedule(std::move(_task)); + ::seastar::schedule(std::exchange(_task, nullptr)); } } @@ -3953,9 +3951,9 @@ void promise_base::make_ready() noexcept { if (_task) { _state = nullptr; if (Urgent == urgent::yes && !need_preempt()) { - ::seastar::schedule_urgent(std::move(_task)); + ::seastar::schedule_urgent(std::exchange(_task, nullptr)); } else { - ::seastar::schedule(std::move(_task)); + ::seastar::schedule(std::exchange(_task, nullptr)); } } } @@ -4156,8 +4154,8 @@ future connect(socket_address sa, socket_address local, transp return engine().connect(sa, local, proto); } -void reactor::add_high_priority_task(std::unique_ptr&& t) { - add_urgent_task(std::move(t)); +void reactor::add_high_priority_task(task* t) noexcept { + add_urgent_task(t); // break .then() chains request_preemption(); } diff --git a/src/core/thread.cc b/src/core/thread.cc index da961404fa6..63761f1858f 100644 --- a/src/core/thread.cc +++ b/src/core/thread.cc @@ -240,13 +240,13 @@ thread_context::run_and_dispose() noexcept { void thread_context::yield() { - schedule(std::unique_ptr(this)); + schedule(this); switch_out(); } void thread_context::reschedule() { - schedule(std::unique_ptr(this)); + schedule(this); } void