Skip to content

Commit

Permalink
Revert "task: stop wrapping tasks with unique_ptr"
Browse files Browse the repository at this point in the history
This reverts commit 33406cf. It
introduces memory leaks:

Direct leak of 24 byte(s) in 1 object(s) allocated from:
    #0 0x7fb773b389d7 in operator new(unsigned long) (/lib64/libasan.so.5+0x10f9d7)
    ceph#1 0x108f0d4 in seastar::reactor::poller::~poller() ../src/core/reactor.cc:2879
    ceph#2 0x11c1e59 in std::experimental::fundamentals_v1::_Optional_base<seastar::reactor::poller, true>::~_Optional_base() /usr/include/c++/9/experimental/optional:288
    ceph#3 0x118f2d7 in std::experimental::fundamentals_v1::optional<seastar::reactor::poller>::~optional() /usr/include/c++/9/experimental/optional:491
    ceph#4 0x108c5a5 in seastar::reactor::run() ../src/core/reactor.cc:2587
    ceph#5 0xf1a822 in seastar::app_template::run_deprecated(int, char**, std::function<void ()>&&) ../src/core/app-template.cc:199
    ceph#6 0xf1885d in seastar::app_template::run(int, char**, std::function<seastar::future<int> ()>&&) ../src/core/app-template.cc:115
    ceph#7 0xeb2735 in operator() ../src/testing/test_runner.cc:72
    ceph#8 0xebb342 in _M_invoke /usr/include/c++/9/bits/std_function.h:300
    ceph#9 0xf3d8b0 in std::function<void ()>::operator()() const /usr/include/c++/9/bits/std_function.h:690
    ceph#10 0x1034c72 in seastar::posix_thread::start_routine(void*) ../src/core/posix.cc:52
    ceph#11 0x7fb7738804e1 in start_thread /usr/src/debug/glibc-2.30-13-g919af705ee/nptl/pthread_create.c:479

Reported-by: Rafael Avila de Espindola <[email protected]>
  • Loading branch information
avikivity committed Dec 31, 2019
1 parent 33406cf commit 8fd8568
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 93 deletions.
8 changes: 4 additions & 4 deletions include/seastar/core/do_with.hh
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public:
template<typename T, typename F>
inline
auto do_with(T&& rvalue, F&& f) {
auto task = new internal::do_with_state<T, std::result_of_t<F(T&)>>(std::forward<T>(rvalue));
auto task = std::make_unique<internal::do_with_state<T, std::result_of_t<F(T&)>>>(std::forward<T>(rvalue));
auto fut = f(task->data());
if (fut.available()) {
return fut;
}
auto ret = task->get_future();
internal::set_callback(fut, task);
internal::set_callback(fut, std::move(task));
return ret;
}

Expand Down Expand Up @@ -148,13 +148,13 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) {
auto&& just_func = std::move(std::get<nr>(std::move(all)));
using value_tuple = std::remove_reference_t<decltype(just_values)>;
using ret_type = decltype(apply(just_func, just_values));
auto task = new internal::do_with_state<value_tuple, ret_type>(std::move(just_values));
auto task = std::make_unique<internal::do_with_state<value_tuple, ret_type>>(std::move(just_values));
auto fut = apply(just_func, task->data());
if (fut.available()) {
return fut;
}
auto ret = task->get_future();
internal::set_callback(fut, task);
internal::set_callback(fut, std::move(task));
return ret;
}

Expand Down
63 changes: 27 additions & 36 deletions include/seastar/core/future-util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private:
// Wait for one of the futures in _incomplete to complete, and then
// decide what to do: wait for another one, or deliver _result if all
// are complete.
void wait_for_one() noexcept {
void wait_for_one() {
// Process from back to front, on the assumption that the front
// futures are likely to complete earlier than the back futures.
// If that's indeed the case, then the front futures will be
Expand All @@ -135,7 +135,7 @@ private:

// If there's an incompelete future, wait for it.
if (!_incomplete.empty()) {
internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
internal::set_callback(_incomplete.back(), std::unique_ptr<continuation_base<>>(this));
// This future's state will be collected in run_and_dispose(), so we can drop it.
_incomplete.pop_back();
return;
Expand Down Expand Up @@ -280,14 +280,13 @@ public:
}
future<> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<repeater> zis{this};
if (_state.failed()) {
_promise.set_exception(std::move(_state).get_exception());
delete this;
return;
} else {
if (std::get<0>(_state.get()) == stop_iteration::yes) {
_promise.set_value();
delete this;
return;
}
_state = {};
Expand All @@ -296,22 +295,20 @@ public:
do {
auto f = futurator::apply(_action);
if (!f.available()) {
internal::set_callback(f, this);
internal::set_callback(f, std::move(zis));
return;
}
if (f.get0() == stop_iteration::yes) {
_promise.set_value();
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
_state.set(stop_iteration::no);
schedule(this);
schedule(std::move(zis));
}
};

Expand All @@ -331,7 +328,7 @@ public:
template<typename AsyncAction>
GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, stop_iteration> || seastar::ApplyReturns<AsyncAction, future<stop_iteration>> )
inline
future<> repeat(AsyncAction action) noexcept {
future<> repeat(AsyncAction action) {
using futurator = futurize<std::result_of_t<AsyncAction()>>;
static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
try {
Expand All @@ -342,9 +339,9 @@ future<> repeat(AsyncAction action) noexcept {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto repeater = new internal::repeater<AsyncAction>(std::move(action));
auto repeater = std::make_unique<internal::repeater<AsyncAction>>(std::move(action));
auto ret = repeater->get_future();
internal::set_callback(f, repeater);
internal::set_callback(f, std::move(repeater));
return ret;
}();
}
Expand All @@ -354,9 +351,9 @@ future<> repeat(AsyncAction action) noexcept {
}
} while (!need_preempt());

auto repeater = new internal::repeater<AsyncAction>(stop_iteration::no, std::move(action));
auto repeater = std::make_unique<internal::repeater<AsyncAction>>(stop_iteration::no, std::move(action));
auto ret = repeater->get_future();
schedule(repeater);
schedule(std::move(repeater));
return ret;
} catch (...) {
return make_exception_future(std::current_exception());
Expand Down Expand Up @@ -400,15 +397,14 @@ public:
}
future<T> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<repeat_until_value_state> zis{this};
if (this->_state.failed()) {
_promise.set_exception(std::move(this->_state).get_exception());
delete this;
return;
} else {
auto v = std::get<0>(std::move(this->_state).get());
if (v) {
_promise.set_value(std::move(*v));
delete this;
return;
}
this->_state = {};
Expand All @@ -417,23 +413,21 @@ public:
do {
auto f = futurator::apply(_action);
if (!f.available()) {
internal::set_callback(f, this);
internal::set_callback(f, std::move(zis));
return;
}
auto ret = f.get0();
if (ret) {
_promise.set_value(std::make_tuple(std::move(*ret)));
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
this->_state.set(compat::nullopt);
schedule(this);
schedule(std::move(zis));
}
};

Expand All @@ -456,7 +450,7 @@ GCC6_CONCEPT( requires requires (AsyncAction aa) {
futurize<std::result_of_t<AsyncAction()>>::apply(aa).get0().value();
} )
repeat_until_value_return_type<AsyncAction>
repeat_until_value(AsyncAction action) noexcept {
repeat_until_value(AsyncAction action) {
using futurator = futurize<std::result_of_t<AsyncAction()>>;
using type_helper = repeat_until_value_type_helper<typename futurator::type>;
// the "T" in the documentation
Expand All @@ -468,9 +462,9 @@ repeat_until_value(AsyncAction action) noexcept {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(std::move(action));
auto ret = state->get_future();
internal::set_callback(f, state);
internal::set_callback(f, std::move(state));
return ret;
}();
}
Expand All @@ -486,9 +480,9 @@ repeat_until_value(AsyncAction action) noexcept {
} while (!need_preempt());

try {
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(compat::nullopt, std::move(action));
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(compat::nullopt, std::move(action));
auto f = state->get_future();
schedule(state);
schedule(std::move(state));
return f;
} catch (...) {
return make_exception_future<value_type>(std::current_exception());
Expand All @@ -506,10 +500,10 @@ public:
explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
future<> get_future() { return _promise.get_future(); }
virtual void run_and_dispose() noexcept override {
std::unique_ptr<do_until_state> zis{this};
if (_state.available()) {
if (_state.failed()) {
_promise.set_urgent_state(std::move(_state));
delete this;
return;
}
_state = {}; // allow next cycle to overrun state
Expand All @@ -518,26 +512,23 @@ public:
do {
if (_stop()) {
_promise.set_value();
delete this;
return;
}
auto f = _action();
if (!f.available()) {
internal::set_callback(f, this);
internal::set_callback(f, std::move(zis));
return;
}
if (f.failed()) {
f.forward_to(std::move(_promise));
delete this;
return;
}
} while (!need_preempt());
} catch (...) {
_promise.set_exception(std::current_exception());
delete this;
return;
}
schedule(this);
schedule(std::move(zis));
}
};

Expand All @@ -556,7 +547,7 @@ public:
template<typename AsyncAction, typename StopCondition>
GCC6_CONCEPT( requires seastar::ApplyReturns<StopCondition, bool> && seastar::ApplyReturns<AsyncAction, future<>> )
inline
future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
future<> do_until(StopCondition stop_cond, AsyncAction action) {
using namespace internal;
using futurator = futurize<void>;
do {
Expand All @@ -567,9 +558,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
if (!f.available()) {
return [&] () noexcept {
memory::disable_failure_guard dfg;
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
auto ret = task->get_future();
internal::set_callback(f, task);
internal::set_callback(f, std::move(task));
return ret;
}();
}
Expand All @@ -578,9 +569,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
}
} while (!need_preempt());

auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
auto f = task->get_future();
schedule(task);
schedule(std::move(task));
return f;
}

Expand Down Expand Up @@ -746,7 +737,7 @@ public:
return true;
} else {
auto c = new (continuation) when_all_state_component(wasb, f);
set_callback(*f, c);
set_callback(*f, std::unique_ptr<when_all_state_component>(c));
return false;
}
}
Expand Down
34 changes: 17 additions & 17 deletions include/seastar/core/future.hh
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ template <typename... T>
future<T...> make_exception_future(future_state_base&& state) noexcept;

template <typename... T, typename U>
void set_callback(future<T...>& fut, U* callback) noexcept;
void set_callback(future<T...>& fut, std::unique_ptr<U> callback);

class future_base;

Expand All @@ -509,7 +509,7 @@ protected:
// details.
future_state_base* _state;

task* _task = nullptr;
std::unique_ptr<task> _task;

promise_base(const promise_base&) = delete;
promise_base(future_state_base* state) noexcept : _state(state) {}
Expand Down Expand Up @@ -607,19 +607,19 @@ public:
#if SEASTAR_COROUTINES_TS
void set_coroutine(future_state<T...>& state, task& coroutine) noexcept {
_state = &state;
_task = &coroutine;
_task = std::unique_ptr<task>(&coroutine);
}
#endif
private:
template <typename Func>
void schedule(Func&& func) noexcept {
auto tws = new continuation<Func, T...>(std::move(func));
void schedule(Func&& func) {
auto tws = std::make_unique<continuation<Func, T...>>(std::move(func));
_state = &tws->_state;
_task = tws;
_task = std::move(tws);
}
void schedule(continuation_base<T...>* callback) noexcept {
void schedule(std::unique_ptr<continuation_base<T...>> callback) {
_state = &callback->_state;
_task = callback;
_task = std::move(callback);
}

template <typename... U>
Expand Down Expand Up @@ -968,12 +968,12 @@ private:
return static_cast<internal::promise_base_with_type<T...>*>(future_base::detach_promise());
}
template <typename Func>
void schedule(Func&& func) noexcept {
void schedule(Func&& func) {
if (_state.available() || !_promise) {
if (__builtin_expect(!_state.available() && !_promise, false)) {
_state.set_to_broken_promise();
}
::seastar::schedule(new continuation<Func, T...>(std::move(func), std::move(_state)));
::seastar::schedule(std::make_unique<continuation<Func, T...>>(std::move(func), std::move(_state)));
} else {
assert(_promise);
detach_promise()->schedule(std::move(func));
Expand Down Expand Up @@ -1095,7 +1095,7 @@ private:
auto thread = thread_impl::get();
assert(thread);
thread_wake_task wake_task{thread, this};
detach_promise()->schedule(static_cast<continuation_base<T...>*>(&wake_task));
detach_promise()->schedule(std::unique_ptr<continuation_base<T...>>(&wake_task));
thread_impl::switch_out(thread);
}

Expand Down Expand Up @@ -1443,13 +1443,13 @@ public:
}
#endif
private:
void set_callback(continuation_base<T...>* callback) noexcept {
void set_callback(std::unique_ptr<continuation_base<T...>> callback) {
if (_state.available()) {
callback->set_state(get_available_state_ref());
::seastar::schedule(callback);
::seastar::schedule(std::move(callback));
} else {
assert(_promise);
detach_promise()->schedule(callback);
detach_promise()->schedule(std::move(callback));
}

}
Expand All @@ -1470,7 +1470,7 @@ private:
template <typename... U>
friend future<U...> internal::make_exception_future(future_state_base&& state) noexcept;
template <typename... U, typename V>
friend void internal::set_callback(future<U...>&, V*) noexcept;
friend void internal::set_callback(future<U...>&, std::unique_ptr<V>);
/// \endcond
};

Expand Down Expand Up @@ -1717,10 +1717,10 @@ namespace internal {

template <typename... T, typename U>
inline
void set_callback(future<T...>& fut, U* callback) noexcept {
void set_callback(future<T...>& fut, std::unique_ptr<U> callback) {
// It would be better to use continuation_base<T...> for U, but
// then a derived class of continuation_base<T...> won't be matched
return fut.set_callback(callback);
return fut.set_callback(std::move(callback));
}

}
Expand Down
Loading

0 comments on commit 8fd8568

Please sign in to comment.