Skip to content

Commit

Permalink
Merge pull request #12 from tenzir/topic/experimental-rp-fix
Browse files Browse the repository at this point in the history
Fix delivering response promises in actor state destructors
  • Loading branch information
dominiklohmann authored Jan 17, 2024
2 parents 261a6b3 + bf88d1c commit e1d9102
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
4 changes: 3 additions & 1 deletion libcaf_core/caf/actor_control_block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ CAF_CORE_EXPORT void intrusive_ptr_release_weak(actor_control_block* x);

/// @relates actor_control_block
inline void intrusive_ptr_add_ref(actor_control_block* x) {
x->strong_refs.fetch_add(1, std::memory_order_relaxed);
if (CAF_UNLIKELY(x->strong_refs.fetch_add(1, std::memory_order_relaxed) == 0)) {
CAF_CRITICAL("increased the strong reference count of an expired actor");
}
}

/// @relates actor_control_block
Expand Down
37 changes: 25 additions & 12 deletions libcaf_core/src/response_promise.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,35 @@ void response_promise::state::cancel() {

void response_promise::state::deliver_impl(message msg) {
CAF_LOG_TRACE(CAF_ARG(msg));
// Even though we are holding a weak pointer, we can access the pointer
// without any additional check here because only the actor itself is allowed
// to call this function.
auto self = static_cast<local_actor*>(weak_self.get()->get());
auto cancel_guard = detail::make_scope_guard([this] {
cancel();
});
if (msg.empty() && id.is_async()) {
CAF_LOG_DEBUG("drop response: empty response to asynchronous input");
} else if (!stages.empty()) {
return;
}
if (source == nullptr) {
CAF_LOG_DEBUG("drop response: source is nullptr");
return;
}
auto self = weak_self.lock();
if (self == nullptr) {
auto element = make_mailbox_element(self, id.response_id(),
std::move(stages),
std::move(msg));
source->enqueue(std::move(element), nullptr);
return;
}
auto local_self = static_cast<local_actor*>(self->get());
if (!stages.empty()) {
auto next = std::move(stages.back());
stages.pop_back();
detail::profiled_send(self, std::move(source), next, id, std::move(stages),
self->context(), std::move(msg));
} else if (source != nullptr) {
detail::profiled_send(self, self->ctrl(), source, id.response_id(),
forwarding_stack{}, self->context(), std::move(msg));
}
cancel();
detail::profiled_send(local_self, std::move(source), next, id, std::move(stages),
local_self->context(), std::move(msg));
return;
}
detail::profiled_send(local_self, local_self->ctrl(), source, id.response_id(),
forwarding_stack{}, local_self->context(), std::move(msg));
}

void response_promise::state::delegate_impl(abstract_actor* receiver,
Expand Down

0 comments on commit e1d9102

Please sign in to comment.