From 73edad92ebeab9e955b51d2fb39e25dbff8c94ac Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 21 Aug 2023 19:59:51 +0200 Subject: [PATCH] DPL: attempt recovering backpressure on send --- Framework/Core/src/SendingPolicy.cxx | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Framework/Core/src/SendingPolicy.cxx b/Framework/Core/src/SendingPolicy.cxx index 17faef6dbd3d0..1fe204354a002 100644 --- a/Framework/Core/src/SendingPolicy.cxx +++ b/Framework/Core/src/SendingPolicy.cxx @@ -18,7 +18,9 @@ #include "Framework/Logger.h" #include "Headers/STFHeader.h" #include "DeviceSpecHelpers.h" +#include "Framework/DeviceState.h" #include +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" @@ -113,9 +115,18 @@ std::vector SendingPolicy::createDefaultPolicies() auto res = channel->Send(parts, timeout); if (res == (size_t)fair::mq::TransferCode::timeout) { LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName()); - channel->Send(parts); + while (res == (size_t)fair::mq::TransferCode::timeout) { + res = channel->Send(parts, timeout); + auto& deviceState = registry.get(); + uv_run(deviceState.loop, UV_RUN_NOWAIT); + if (deviceState.nextFairMQState.empty() == false) { + LOGP(warning, "Device state changed to {} while we were waiting for backpressure to finish", + deviceState.nextFairMQState.back()); + } + } LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName()); - } else if (res == (size_t) fair::mq::TransferCode::error) { + } + if (res == (size_t)fair::mq::TransferCode::error) { LOGP(fatal, "Error while sending on channel {}", channel->GetName()); } }}}; }