From 1804eb4148ea8a3a81a40052e845d7a9ca9f1bd5 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 21 Aug 2023 19:59:51 +0200 Subject: [PATCH] DPL: attempt recovering backpressure on send --- Framework/Core/src/SendingPolicy.cxx | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/SendingPolicy.cxx b/Framework/Core/src/SendingPolicy.cxx index 17faef6dbd3d0..81f221ce5cbff 100644 --- a/Framework/Core/src/SendingPolicy.cxx +++ b/Framework/Core/src/SendingPolicy.cxx @@ -18,7 +18,9 @@ #include "Framework/Logger.h" #include "Headers/STFHeader.h" #include "DeviceSpecHelpers.h" +#include "Framework/DeviceState.h" #include <fairmq/Device.h> +#include <uv.h> #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" @@ -113,7 +115,15 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies() auto res = channel->Send(parts, timeout); if (res == (size_t)fair::mq::TransferCode::timeout) { LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName()); - channel->Send(parts); + while (res == (size_t)fair::mq::TransferCode::timeout) { + res = channel->Send(parts, timeout); + auto& deviceState = registry.get<DeviceState>(); + uv_run(deviceState.loop, UV_RUN_NOWAIT); + if (deviceState.nextFairMQState.empty() == false) { + LOGP(warning, "Device state changed to {} while we were waiting for backpressure to finish", + deviceState.nextFairMQState.back()); + } + } LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName()); } else if (res == (size_t) fair::mq::TransferCode::error) { LOGP(fatal, "Error while sending on channel {}", channel->GetName());