Skip to content

Commit

Permalink
DPL: detect when a Lifetime::Timeframe output is missing
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jan 12, 2024
1 parent cde3bb1 commit c4ab495
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 0 deletions.
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/MessageContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ class MessageContext
o2::header::Stack* findMessageHeaderStack(const Output& spec);
int countDeviceOutputs(bool excludeDPLOrigin = false);
void fakeDispatch() { mDidDispatch = true; }
bool didDispatch() { return mDidDispatch; }
o2::framework::DataProcessingHeader* findMessageDataProcessingHeader(const Output& spec);
std::pair<o2::header::DataHeader*, o2::framework::DataProcessingHeader*> findMessageHeaders(const Output& spec);

Expand Down
7 changes: 7 additions & 0 deletions Framework/Core/include/Framework/StreamContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ struct StreamContext {
// Notice that in such a case all the services will be created upfront, so
// the callback will be called for all of them.
std::vector<ServiceStartStreamHandle> preStartStreamHandles;

// Information on wether or not all the required routes have been created.
// This is used to check if the LifetimeTimeframe routes were all created
// for a given iteration.
// This is in the stream context to allow tracking data creation on a per thread
// basis.
std::vector<bool> routeCreated;
};

} // namespace o2::framework
Expand Down
42 changes: 42 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,48 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
.uniqueId = simpleServiceId<StreamContext>(),
.init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
.configure = noConfiguration(),
.preProcessing = [](ProcessingContext& context, void* service) {
auto* stream = (StreamContext*)service;
auto& routes = context.services().get<DeviceSpec const>().outputs;
// Notice I need to do this here, because different invocation for
// the same stream might be referring to different data processors.
// We should probably have a context which is per stream of a specific
// data processor.
stream->routeCreated.resize(routes.size());
// Reset the routeCreated at every processing step
std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
.postProcessing = [](ProcessingContext& processingContext, void* service) {
auto* stream = (StreamContext*)service;
auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
auto& messageContext = processingContext.services().get<MessageContext>();
// Check if we never created any data for this timeslice
// if we did not, but we still have didDispatched set to true
// it means it was created out of band.
bool didCreate = false;
for (size_t ri = 0; ri < routes.size(); ++ri) {
if (stream->routeCreated[ri] == true) {
didCreate = true;
break;
}
}
if (didCreate == false && messageContext.didDispatch() == true) {
LOGP(debug, "Data created out of band");
return;
}
for (size_t ri = 0; ri < routes.size(); ++ri) {
if (stream->routeCreated[ri] == true) {
continue;
}
auto &route = routes[ri];
auto &matcher = route.matcher;
if ((timeslice % route.maxTimeslices) != route.timeslice) {
continue;
}
if (matcher.lifetime == Lifetime::Timeframe) {
LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
}
} },
.kind = ServiceKind::Stream};
}

Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "Framework/FairMQResizableBuffer.h"
#include "Framework/DataProcessingContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/StreamContext.h"
#include "Headers/Stack.h"

#include <fairmq/Device.h>
Expand Down Expand Up @@ -47,10 +48,12 @@ DataAllocator::DataAllocator(ServiceRegistryRef contextRegistry)
RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice)
{
auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
auto& stream = mRegistry.get<o2::framework::StreamContext>();
// FIXME: we should take timeframeId into account as well.
for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
auto& route = allowedOutputRoutes[ri];
if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
stream.routeCreated[ri] = true;
return RouteIndex{ri};
}
}
Expand Down
3 changes: 3 additions & 0 deletions Framework/TestWorkflows/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
o2_add_dpl_workflow(dummy-workflow
SOURCES src/o2DummyWorkflow.cxx
COMPONENT_NAME TestWorkflows)
o2_add_dpl_workflow(detect-missing-timeframe
SOURCES test/test_DetectMissingTimeframe.cxx
COMPONENT_NAME TestWorkflows)

o2_add_dpl_workflow(o2rootmessage-workflow
SOURCES "src/test_o2RootMessageWorkflow.cxx"
Expand Down
62 changes: 62 additions & 0 deletions Framework/TestWorkflows/test/test_DetectMissingTimeframe.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/ConfigParamSpec.h"
#include "Framework/DataTakingContext.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/RawDeviceService.h"
#include "Framework/ControlService.h"
#include "Framework/Configurable.h"
#include "Framework/RunningWorkflowInfo.h"
#include "Framework/RateLimiter.h"
#include <fairmq/Device.h>

#include <iostream>
#include <chrono>
#include <thread>
#include <vector>

using namespace o2::framework;

#include "Framework/runDataProcessing.h"

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
DataProcessorSpec a{
.name = "A",
.outputs = {OutputSpec{{"a1"}, "TST", "A1"},
OutputSpec{{"a2"}, "TST", "A2"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
outputs.make<int>(OutputRef{"a1"}, 1);
static int i = 0;
outputs.make<int>(OutputRef{"a1"}, 1);
if (i++ % 2 == 0) {
outputs.make<int>(OutputRef{"a2"}, 1);
}
})},
};
DataProcessorSpec d{
.name = "D",
.inputs = {InputSpec{"a1", "TST", "A1"},
InputSpec{"a2", "TST", "A2"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](InputRecord& inputs) {
auto ref = inputs.get("a1");
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
LOG(info) << "Start time: " << header->startTime;
})},
};

return workflow::concat(WorkflowSpec{a},
WorkflowSpec{d});
}

0 comments on commit c4ab495

Please sign in to comment.