Skip to content

Commit

Permalink
DPL: attempt recovering backpressure on send
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Oct 13, 2023
1 parent facbf10 commit 0664a47
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions Framework/Core/src/SendingPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "Framework/Logger.h"
#include "Headers/STFHeader.h"
#include "DeviceSpecHelpers.h"
#include "Framework/DeviceState.h"
#include <fairmq/Device.h>
#include <uv.h>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
Expand Down Expand Up @@ -113,9 +115,18 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
auto res = channel->Send(parts, timeout);
if (res == (size_t)fair::mq::TransferCode::timeout) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on {}.", timeout/1000, channel->GetName());
channel->Send(parts);
while (res == (size_t)fair::mq::TransferCode::timeout) {
res = channel->Send(parts, timeout);
auto& deviceState = registry.get<DeviceState>();
uv_run(deviceState.loop, UV_RUN_NOWAIT);
if (deviceState.nextFairMQState.empty() == false) {
LOGP(warning, "Device state changed to {} while we were waiting for backpressure to finish",
deviceState.nextFairMQState.back());
}
}
LOGP(info, "Downstream backpressure on {} recovered.", channel->GetName());
} else if (res == (size_t) fair::mq::TransferCode::error) {
}
if (res == (size_t) fair::mq::TransferCode::error) {
LOGP(fatal, "Error while sending on channel {}", channel->GetName());
} }}};
}
Expand Down

0 comments on commit 0664a47

Please sign in to comment.