Skip to content

Commit

Permalink
QC-1253 Mergers: Shorter latency with multiple layers (AliceO2Group#13782)
Browse files Browse the repository at this point in the history

If we run multiple layers of Mergers, the merged object arrival time can be described as: merger cycle duration * number of layers (it can be shorter due to randomized timer shifts at startup).
As a consequence, adding each new layer adds latency to the merger topology.

Assuming that the deployed Mergers are not expendable, we can rely on expecting the right number of input messages to know that each Merger in the lower layer produced an update, so we can publish the merged object.
As an effect, we get lower latency.
  • Loading branch information
knopers8 authored Dec 9, 2024
1 parent 5c52a4b commit 24e05f9
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 12 deletions.
1 change: 1 addition & 0 deletions Utilities/Mergers/include/Mergers/FullHistoryMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class FullHistoryMerger : public framework::Task
void mergeCache();
void publish(framework::DataAllocator& allocator);
void clear();
bool shouldFinishCycle(const framework::InputRecord& inputs) const;
};

} // namespace o2::mergers
Expand Down
1 change: 1 addition & 0 deletions Utilities/Mergers/include/Mergers/IntegratingMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class IntegratingMerger : public framework::Task
void publishMovingWindow(framework::DataAllocator& allocator);
static void merge(ObjectStore& mMergedDelta, ObjectStore&& other);
void clear();
bool shouldFinishCycle(const framework::InputRecord&) const;

private:
header::DataHeader::SubSpecificationType mSubSpec;
Expand Down
8 changes: 5 additions & 3 deletions Utilities/Mergers/include/Mergers/MergerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum class MergedObjectTimespan {
// when InputObjectsTimespan::FullHistory is set.
LastDifference,
// Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite).
// The the above will be removed once we switch to NCycles in QC.
// The above will be removed once we switch to NCycles in QC.
NCycles
};

Expand All @@ -52,7 +52,8 @@ enum class PublishMovingWindow {
};

enum class PublicationDecision {
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
EachNSeconds, // Merged object is published each N seconds. This can evolve over time, thus we expect pairs specifying N:duration1, M:duration2...
EachNArrivals, // Merged object is published whenever we receive N new input objects.
};

enum class TopologySize {
Expand All @@ -66,6 +67,7 @@ enum class ParallelismType {
RoundRobin // Mergers receive their input messages in round robin order. Useful when there is one InputSpec with a wildcard.
};

// fixme: this way of configuring mergers should be refactored, it does not make sense that we share `param`s across for different enum values.
template <typename V, typename P = double>
struct ConfigEntry {
V value;
Expand All @@ -82,7 +84,7 @@ class PublicationDecisionParameter
PublicationDecisionParameter(size_t param) : decision({{param, 1}}) {}
PublicationDecisionParameter(const std::vector<std::pair<size_t, size_t>>& decision) : decision(decision) {}

std::vector<std::pair<size_t, size_t>> decision;
std::vector<std::pair<size_t /* cycle duration seconds */, size_t /* validity seconds */>> decision;
};

// todo rework configuration in a way that user cannot create an invalid configuration
Expand Down
17 changes: 16 additions & 1 deletion Utilities/Mergers/src/FullHistoryMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
}
}

if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) {
if (shouldFinishCycle(ctx.inputs())) {
mCyclesSinceReset++;
mergeCache();
publish(ctx.outputs());
Expand All @@ -88,6 +88,21 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
}
}

bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
{
// Nothing has arrived yet, so there is nothing worth publishing regardless of the policy.
if (mFirstObjectSerialized.first.empty()) {
return false;
}

switch (mConfig.publicationDecision.value) {
case PublicationDecision::EachNSeconds:
// Time-driven publication: finish the cycle only when the publish timer fired.
return inputs.isValid("timer-publish");
case PublicationDecision::EachNArrivals: {
// Count-driven publication: finish once we accumulated the configured number of updates.
const auto expectedArrivals = mConfig.publicationDecision.param.decision.begin()->first;
return mUpdatesReceived > 0 && mUpdatesReceived % expectedArrivals == 0;
}
default:
throw std::runtime_error("unsupported publication decision parameter");
}
}

void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext)
{
mergeCache();
Expand Down
13 changes: 12 additions & 1 deletion Utilities/Mergers/src/IntegratingMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,22 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
}
}

if (ctx.inputs().isValid("timer-publish")) {
if (shouldFinishCycle(ctx.inputs())) {
finishCycle(ctx.outputs());
}
}

bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
{
switch (mConfig.publicationDecision.value) {
case PublicationDecision::EachNSeconds:
// Time-driven publication: the cycle ends when the publish timer input is present.
return inputs.isValid("timer-publish");
case PublicationDecision::EachNArrivals: {
// Count-driven publication: the cycle ends after the configured number of merged deltas.
const auto expectedArrivals = mConfig.publicationDecision.param.decision.begin()->first;
return mDeltasMerged > 0 && mDeltasMerged % expectedArrivals == 0;
}
default:
throw std::runtime_error("unsupported publication decision parameter");
}
}

void IntegratingMerger::finishCycle(DataAllocator& outputs)
{
mCyclesSinceReset++;
Expand Down
15 changes: 11 additions & 4 deletions Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
auto layerInputs = mInputs;

// preparing some numbers
auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; });

// topology generation
MergerBuilder mergerBuilder;
Expand All @@ -150,7 +151,6 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
// we also expect moving windows to be published only by the last layer
layerConfig.publishMovingWindow = {PublishMovingWindow::No};
}
mergerBuilder.setConfig(layerConfig);

framework::Inputs nextLayerInputs;
auto inputsRangeBegin = layerInputs.begin();
Expand All @@ -162,20 +162,27 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()

auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder);
mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd));
inputsRangeBegin = inputsRangeEnd;

if (layer > 1 && !expendable) {
// we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
// we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
assert(inputNumber != 0);
layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
}
if (layer == mergersPerLayer.size() - 1) {
// the last layer => use the specified external OutputSpec
mergerBuilder.setOutputSpec(mOutputSpecIntegral);
}

mergerBuilder.setConfig(layerConfig);
auto merger = mergerBuilder.buildSpec();

auto input = DataSpecUtils::matchingInput(merger.outputs.at(0));
input.binding = "in";
nextLayerInputs.push_back(input);

workflow.emplace_back(std::move(merger));
inputsRangeBegin = inputsRangeEnd;
}
layerInputs = nextLayerInputs; // todo: could be optimised with pointers
}
Expand Down
6 changes: 3 additions & 3 deletions Utilities/Mergers/test/mergersBenchmarkTopology.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
for (size_t p = 0; p < objectsProducers; p++) {
mergersInputs.push_back({ "mo", "TST",
"HISTO", static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
Lifetime::Timeframe });
Lifetime::Sporadic });
DataProcessorSpec producer{
"producer-histo" + std::to_string(p), Inputs{},
Outputs{ { { "mo" },
"TST",
"HISTO",
static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
Lifetime::Timeframe } },
Lifetime::Sporadic } },
AlgorithmSpec{
(AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ](
ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now();
Expand Down Expand Up @@ -115,7 +115,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
DataProcessorSpec printer{
"printer-bins",
Inputs{
{ "histo", "TST", "HISTO", 0 }
{ "histo", "TST", "HISTO", 0, Lifetime::Sporadic }
},
Outputs{},
AlgorithmSpec{
Expand Down

0 comments on commit 24e05f9

Please sign in to comment.