Skip to content

Commit

Permalink
task: stop wrapping tasks with unique_ptr
Browse files Browse the repository at this point in the history
Since the very beginning, we wrapped tasks and continuations
with unique_ptr for exception safety. It wasn't safe, however -
once a task is created, it must be executed and cannot be just
destroyed and freed, since anything that depends on the task
will not be run.

This became even more pronounced with 4d811de ("task:
resumable tasks"), as tasks are now responsible for their own
reclamation (the "dispose" part of task::run_and_dispose()).

To make it clear that one cannot rely on unique_ptr<task> being
safe, replace unique_ptr<task> with a plain task* (and make_unique
with naked new calls).

To mark the fact that the functions in which this happens are not
exeception safe, annotate them as noexcept.

Some notes:
 - std::move()ing a pointer will not clear it, so some call sites
   have changed to use std::exchange()
 - repeat(), do_until(), and the like were using unique_ptr(this) to
   destroy the task in run_and_dispose() unless moved from. This
   was changed to plain "delete this".

Tests: unit(dev, debug)
Message-Id: <[email protected]>
  • Loading branch information
avikivity authored and nyh committed Dec 24, 2019
1 parent 9612a94 commit 33406cf
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 86 deletions.
8 changes: 4 additions & 4 deletions include/seastar/core/do_with.hh
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public:
template<typename T, typename F>
inline
auto do_with(T&& rvalue, F&& f) {
auto task = std::make_unique<internal::do_with_state<T, std::result_of_t<F(T&)>>>(std::forward<T>(rvalue));
auto task = new internal::do_with_state<T, std::result_of_t<F(T&)>>(std::forward<T>(rvalue));
auto fut = f(task->data());
if (fut.available()) {
return fut;
}
auto ret = task->get_future();
internal::set_callback(fut, std::move(task));
internal::set_callback(fut, task);
return ret;
}

Expand Down Expand Up @@ -148,13 +148,13 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) {
auto&& just_func = std::move(std::get<nr>(std::move(all)));
using value_tuple = std::remove_reference_t<decltype(just_values)>;
using ret_type = decltype(apply(just_func, just_values));
auto task = std::make_unique<internal::do_with_state<value_tuple, ret_type>>(std::move(just_values));
auto task = new internal::do_with_state<value_tuple, ret_type>(std::move(just_values));
auto fut = apply(just_func, task->data());
if (fut.available()) {
return fut;
}
auto ret = task->get_future();
internal::set_callback(fut, std::move(task));
internal::set_callback(fut, task);
return ret;
}

Expand Down
63 changes: 36 additions & 27 deletions include/seastar/core/future-util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private:
// Wait for one of the futures in _incomplete to complete, and then
// decide what to do: wait for another one, or deliver _result if all
// are complete.
void wait_for_one() {
void wait_for_one() noexcept {
// Process from back to front, on the assumption that the front
// futures are likely to complete earlier than the back futures.
// If that's indeed the case, then the front futures will be
Expand All @@ -135,7 +135,7 @@ private:

// If there's an incompelete future, wait for it.
if (!_incomplete.empty()) {
internal::set_callback(_incomplete.back(), std::unique_ptr<continuation_base<>>(this));
internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
// This future's state will be collected in run_and_dispose(), so we can drop it.
_incomplete.pop_back();
return;
Expand Down Expand Up @@ -280,13 +280,14 @@ public:
}
future<> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<repeater> zis{this};
if (_state.failed()) {
_promise.set_exception(std::move(_state).get_exception());
delete this;
return;
} else {
if (std::get<0>(_state.get()) == stop_iteration::yes) {
_promise.set_value();
delete this;
return;
}
_state = {};
Expand All @@ -295,20 +296,22 @@ public:
do {
auto f = futurator::apply(_action);
if (!f.available()) {
internal::set_callback(f, std::move(zis));
internal::set_callback(f, this);
return;
}
if (f.get0() == stop_iteration::yes) {
_promise.set_value();
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
_state.set(stop_iteration::no);
schedule(std::move(zis));
schedule(this);
}
};

Expand All @@ -328,7 +331,7 @@ public:
template<typename AsyncAction>
GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, stop_iteration> || seastar::ApplyReturns<AsyncAction, future<stop_iteration>> )
inline
future<> repeat(AsyncAction action) {
future<> repeat(AsyncAction action) noexcept {
using futurator = futurize<std::result_of_t<AsyncAction()>>;
static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
try {
Expand All @@ -339,9 +342,9 @@ future<> repeat(AsyncAction action) {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto repeater = std::make_unique<internal::repeater<AsyncAction>>(std::move(action));
auto repeater = new internal::repeater<AsyncAction>(std::move(action));
auto ret = repeater->get_future();
internal::set_callback(f, std::move(repeater));
internal::set_callback(f, repeater);
return ret;
}();
}
Expand All @@ -351,9 +354,9 @@ future<> repeat(AsyncAction action) {
}
} while (!need_preempt());

auto repeater = std::make_unique<internal::repeater<AsyncAction>>(stop_iteration::no, std::move(action));
auto repeater = new internal::repeater<AsyncAction>(stop_iteration::no, std::move(action));
auto ret = repeater->get_future();
schedule(std::move(repeater));
schedule(repeater);
return ret;
} catch (...) {
return make_exception_future(std::current_exception());
Expand Down Expand Up @@ -397,14 +400,15 @@ public:
}
future<T> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<repeat_until_value_state> zis{this};
if (this->_state.failed()) {
_promise.set_exception(std::move(this->_state).get_exception());
delete this;
return;
} else {
auto v = std::get<0>(std::move(this->_state).get());
if (v) {
_promise.set_value(std::move(*v));
delete this;
return;
}
this->_state = {};
Expand All @@ -413,21 +417,23 @@ public:
do {
auto f = futurator::apply(_action);
if (!f.available()) {
internal::set_callback(f, std::move(zis));
internal::set_callback(f, this);
return;
}
auto ret = f.get0();
if (ret) {
_promise.set_value(std::make_tuple(std::move(*ret)));
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
this->_state.set(compat::nullopt);
schedule(std::move(zis));
schedule(this);
}
};

Expand All @@ -450,7 +456,7 @@ GCC6_CONCEPT( requires requires (AsyncAction aa) {
futurize<std::result_of_t<AsyncAction()>>::apply(aa).get0().value();
} )
repeat_until_value_return_type<AsyncAction>
repeat_until_value(AsyncAction action) {
repeat_until_value(AsyncAction action) noexcept {
using futurator = futurize<std::result_of_t<AsyncAction()>>;
using type_helper = repeat_until_value_type_helper<typename futurator::type>;
// the "T" in the documentation
Expand All @@ -462,9 +468,9 @@ repeat_until_value(AsyncAction action) {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(std::move(action));
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
auto ret = state->get_future();
internal::set_callback(f, std::move(state));
internal::set_callback(f, state);
return ret;
}();
}
Expand All @@ -480,9 +486,9 @@ repeat_until_value(AsyncAction action) {
} while (!need_preempt());

try {
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(compat::nullopt, std::move(action));
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(compat::nullopt, std::move(action));
auto f = state->get_future();
schedule(std::move(state));
schedule(state);
return f;
} catch (...) {
return make_exception_future<value_type>(std::current_exception());
Expand All @@ -500,10 +506,10 @@ public:
explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
future<> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<do_until_state> zis{this};
if (_state.available()) {
if (_state.failed()) {
_promise.set_urgent_state(std::move(_state));
delete this;
return;
}
_state = {}; // allow next cycle to overrun state
Expand All @@ -512,23 +518,26 @@ public:
do {
if (_stop()) {
_promise.set_value();
delete this;
return;
}
auto f = _action();
if (!f.available()) {
internal::set_callback(f, std::move(zis));
internal::set_callback(f, this);
return;
}
if (f.failed()) {
f.forward_to(std::move(_promise));
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
schedule(std::move(zis));
schedule(this);
}
};

Expand All @@ -547,7 +556,7 @@ public:
template<typename AsyncAction, typename StopCondition>
GCC6_CONCEPT( requires seastar::ApplyReturns<StopCondition, bool> && seastar::ApplyReturns<AsyncAction, future<>> )
inline
future<> do_until(StopCondition stop_cond, AsyncAction action) {
future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
using namespace internal;
using futurator = futurize<void>;
do {
Expand All @@ -558,9 +567,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
auto ret = task->get_future();
internal::set_callback(f, std::move(task));
internal::set_callback(f, task);
return ret;
}();
}
Expand All @@ -569,9 +578,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) {
}
} while (!need_preempt());

auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
auto f = task->get_future();
schedule(std::move(task));
schedule(task);
return f;
}

Expand Down Expand Up @@ -737,7 +746,7 @@ public:
return true;
} else {
auto c = new (continuation) when_all_state_component(wasb, f);
set_callback(*f, std::unique_ptr<when_all_state_component>(c));
set_callback(*f, c);
return false;
}
}
Expand Down
34 changes: 17 additions & 17 deletions include/seastar/core/future.hh
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ template <typename... T>
future<T...> make_exception_future(future_state_base&& state) noexcept;

template <typename... T, typename U>
void set_callback(future<T...>& fut, std::unique_ptr<U> callback);
void set_callback(future<T...>& fut, U* callback) noexcept;

class future_base;

Expand All @@ -509,7 +509,7 @@ protected:
// details.
future_state_base* _state;

std::unique_ptr<task> _task;
task* _task = nullptr;

promise_base(const promise_base&) = delete;
promise_base(future_state_base* state) noexcept : _state(state) {}
Expand Down Expand Up @@ -607,19 +607,19 @@ public:
#if SEASTAR_COROUTINES_TS
void set_coroutine(future_state<T...>& state, task& coroutine) noexcept {
_state = &state;
_task = std::unique_ptr<task>(&coroutine);
_task = &coroutine;
}
#endif
private:
template <typename Func>
void schedule(Func&& func) {
auto tws = std::make_unique<continuation<Func, T...>>(std::move(func));
void schedule(Func&& func) noexcept {
auto tws = new continuation<Func, T...>(std::move(func));
_state = &tws->_state;
_task = std::move(tws);
_task = tws;
}
void schedule(std::unique_ptr<continuation_base<T...>> callback) {
void schedule(continuation_base<T...>* callback) noexcept {
_state = &callback->_state;
_task = std::move(callback);
_task = callback;
}

template <typename... U>
Expand Down Expand Up @@ -968,12 +968,12 @@ private:
return static_cast<internal::promise_base_with_type<T...>*>(future_base::detach_promise());
}
template <typename Func>
void schedule(Func&& func) {
void schedule(Func&& func) noexcept {
if (_state.available() || !_promise) {
if (__builtin_expect(!_state.available() && !_promise, false)) {
_state.set_to_broken_promise();
}
::seastar::schedule(std::make_unique<continuation<Func, T...>>(std::move(func), std::move(_state)));
::seastar::schedule(new continuation<Func, T...>(std::move(func), std::move(_state)));
} else {
assert(_promise);
detach_promise()->schedule(std::move(func));
Expand Down Expand Up @@ -1095,7 +1095,7 @@ private:
auto thread = thread_impl::get();
assert(thread);
thread_wake_task wake_task{thread, this};
detach_promise()->schedule(std::unique_ptr<continuation_base<T...>>(&wake_task));
detach_promise()->schedule(static_cast<continuation_base<T...>*>(&wake_task));
thread_impl::switch_out(thread);
}

Expand Down Expand Up @@ -1443,13 +1443,13 @@ public:
}
#endif
private:
void set_callback(std::unique_ptr<continuation_base<T...>> callback) {
void set_callback(continuation_base<T...>* callback) noexcept {
if (_state.available()) {
callback->set_state(get_available_state_ref());
::seastar::schedule(std::move(callback));
::seastar::schedule(callback);
} else {
assert(_promise);
detach_promise()->schedule(std::move(callback));
detach_promise()->schedule(callback);
}

}
Expand All @@ -1470,7 +1470,7 @@ private:
template <typename... U>
friend future<U...> internal::make_exception_future(future_state_base&& state) noexcept;
template <typename... U, typename V>
friend void internal::set_callback(future<U...>&, std::unique_ptr<V>);
friend void internal::set_callback(future<U...>&, V*) noexcept;
/// \endcond
};

Expand Down Expand Up @@ -1717,10 +1717,10 @@ namespace internal {

template <typename... T, typename U>
inline
void set_callback(future<T...>& fut, std::unique_ptr<U> callback) {
void set_callback(future<T...>& fut, U* callback) noexcept {
// It would be better to use continuation_base<T...> for U, but
// then a derived class of continuation_base<T...> won't be matched
return fut.set_callback(std::move(callback));
return fut.set_callback(callback);
}

}
Expand Down
Loading

0 comments on commit 33406cf

Please sign in to comment.