Skip to content

Commit

Permalink
(QC-150) Allow tasks to run without DS (#127)
Browse files Browse the repository at this point in the history
- Config file : introduction of "dataSource" whose "type" can be either "direct" or "dataSamplingPolicy".
- No Dispatcher if direct connection from task to producer.
- Introduce a new config file to demonstrate direct connection.
- Switching between data sampling and no data sampling is done in qcRunBasic with the argument --no-data-sampling
- Documentation
  • Loading branch information
Barthelemy authored Apr 5, 2019
1 parent 34744a7 commit 6fc581e
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 23 deletions.
1 change: 1 addition & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ install(
install(
FILES
basic.json
basic-no-sampling.json
advanced.json
readout.json
readoutForDataDump.json
Expand Down
10 changes: 8 additions & 2 deletions Framework/advanced.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "tst2",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "tst2"
},
"location": "local",
"machines": [
"o2flptst1",
Expand All @@ -34,7 +37,10 @@
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "tst1",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "tst1"
},
"location": "remote"
}
}
Expand Down
40 changes: 40 additions & 0 deletions Framework/basic-no-sampling.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"qc": {
"config": {
"database": {
"implementation": "CCDB",
"host": "ccdb-test.cern.ch:8080",
"username": "not_applicable",
"password": "not_applicable",
"name": "not_applicable"
},
"Activity": {
"number": "42",
"type": "2"
}
},
"tasks": {
"QcTask": {
"active": "true",
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSource": {
"type": "direct",
"binding": "its-rawdata",
"dataOrigin": "ITS",
"dataDescription": "RAWDATA",
"subSpec": "0"
},
"taskParameters": {
"nothing": "rien"
},
"location": "remote"
}
}
},
"dataSamplingPolicies": [

]
}
6 changes: 5 additions & 1 deletion Framework/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "its-raw",
"dataSource_comment": "The other type of dataSource is \"direct\", see basic-no-sampling.json.",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "its-raw"
},
"taskParameters": {
"nothing": "rien"
},
Expand Down
15 changes: 12 additions & 3 deletions Framework/example-default.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,31 @@
"moduleName": "QcExample",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "ex1",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "ex1"
},
"location": "remote"
},
"daqTask": {
"className": "o2::quality_control_modules::daq::DaqTask",
"moduleName": "QcDaq",
"maxNumberCycles": "-1",
"cycleDurationSeconds": "10",
"dataSamplingPolicy": "mftclusters",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "mftclusters"
},
"location": "remote"
},
"benchmarkTask_0": {
"className": "o2::quality_control_modules::example::BenchmarkTask",
"moduleName": "QcExample",
"cycleDurationSeconds": "1",
"dataSamplingPolicy": "ex1",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "ex1"
},
"location": "local",
"machines": [
"o2flp1",
Expand Down
2 changes: 1 addition & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TaskRunner
/// \param taskName - name of the task, which exists in tasks list in the configuration file
/// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://")
/// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid output collisions in more complex topologies
TaskRunner(std::string taskName, std::string configurationSource, size_t id = 0);
TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id = 0);
~TaskRunner();

/// \brief To be invoked during initialization of Data Processor
Expand Down
5 changes: 4 additions & 1 deletion Framework/readout.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
"moduleName": "QcSkeleton",
"cycleDurationSeconds": "10",
"maxNumberCycles": "-1",
"dataSamplingPolicy": "readout",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "readout"
},
"location": "remote"
}
}
Expand Down
48 changes: 36 additions & 12 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "Framework/RawDeviceService.h"
#include "Framework/DataSampling.h"
#include "Framework/CallbackService.h"
#include "Framework/DataSamplingPolicy.h"
#include "Monitoring/MonitoringFactory.h"
#include "QualityControl/QcInfoLogger.h"
#include "QualityControl/TaskFactory.h"
Expand All @@ -41,25 +42,23 @@ using namespace o2::configuration;
using namespace o2::monitoring;
using namespace std::chrono;

TaskRunner::TaskRunner(std::string taskName, std::string configurationSource, size_t id)
TaskRunner::TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id)
: mTaskName(taskName),
mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
mTask(nullptr),
mNumberBlocks(0),
mTotalNumberObjectsPublished(0),
mResetAfterPublish(false),
mLastNumberObjects(0),
mCycleOn(false),
mCycleNumber(0),
mMonitorObjectsSpec({"mo"}, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
mResetAfterPublish(false),
mTask(nullptr)
mTotalNumberObjectsPublished(0)
{
// setup configuration
mConfigFile = ConfigurationFactory::getConfiguration(configurationSource);
populateConfig(mTaskName);
}

TaskRunner::~TaskRunner()
{
}
TaskRunner::~TaskRunner() = default;

void TaskRunner::initCallback(InitContext& iCtx)
{
Expand Down Expand Up @@ -182,11 +181,36 @@ void TaskRunner::populateConfig(std::string taskName)
mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get<int>("cycleDurationSeconds", 10);
mTaskConfig.maxNumberCycles = taskConfigTree->second.get<int>("maxNumberCycles", -1);

std::string policiesFilePath = mConfigFile->get<std::string>("dataSamplingPolicyFile", "");
if (policiesFilePath.empty()) {
mInputSpecs = framework::DataSampling::InputSpecsForPolicy(mConfigFile.get(), taskConfigTree->second.get<std::string>("dataSamplingPolicy"));
auto policiesFilePath = mConfigFile->get<std::string>("dataSamplingPolicyFile", "");
ConfigurationInterface* config = policiesFilePath.empty() ? mConfigFile.get() : ConfigurationFactory::getConfiguration(policiesFilePath).get();
auto policiesTree = config->getRecursive("dataSamplingPolicies");
auto dataSourceTree = taskConfigTree->second.get_child("dataSource");
std::string type = dataSourceTree.get<std::string>("type");

if (type == "dataSamplingPolicy") {
auto policyName = dataSourceTree.get<std::string>("name");
LOG(INFO) << "policyName : " << policyName;
mInputSpecs = framework::DataSampling::InputSpecsForPolicy(config, policyName);
} else if (type == "direct") {

auto subSpecString = dataSourceTree.get<std::string>("subSpec");
auto subSpec = std::strtoull(subSpecString.c_str(), nullptr, 10);

header::DataOrigin origin;
header::DataDescription description;
origin.runtimeInit(dataSourceTree.get<std::string>("dataOrigin").c_str());
description.runtimeInit(dataSourceTree.get<std::string>("dataDescription").c_str());

mInputSpecs.push_back(
InputSpec{
dataSourceTree.get<std::string>("binding"),
origin,
description,
subSpec });

} else {
mInputSpecs = framework::DataSampling::InputSpecsForPolicy(policiesFilePath, taskConfigTree->second.get<std::string>("dataSamplingPolicy"));
std::string message = std::string("Configuration error : dataSource type unknown : ") + type; // TODO pass this message to the exception
BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message));
}

} catch (...) { // catch already here the configuration exception and print it
Expand Down
13 changes: 11 additions & 2 deletions Framework/src/runBasic.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "Framework/DataSampling.h"
using namespace o2::framework;

void customize(std::vector<CompletionPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);
Expand All @@ -43,6 +44,11 @@ void customize(std::vector<ChannelConfigurationPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);
}
void customize(std::vector<ConfigParamSpec>& workflowOptions)
{
workflowOptions.push_back(
ConfigParamSpec{ "no-data-sampling", VariantType::Bool, false, { "Skips data sampling, connects directly the task to the producer." } });
}

#include <FairLogger.h>
#include <TH1F.h>
Expand All @@ -59,9 +65,11 @@ using namespace o2::framework;
using namespace o2::quality_control::checker;
using namespace std::chrono;

WorkflowSpec defineDataProcessing(ConfigContext const&)
WorkflowSpec defineDataProcessing(const ConfigContext& config)
{
WorkflowSpec specs;
bool noDS = config.options().get<bool>("no-data-sampling");

// The producer to generate some data in the workflow
DataProcessorSpec producer{
"producer",
Expand All @@ -87,7 +95,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)

specs.push_back(producer);

const std::string qcConfigurationSource = std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/basic.json";
std::string filename = !noDS ? "basic.json" : "basic-no-sampling.json";
const std::string qcConfigurationSource = std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/" + filename;
LOG(INFO) << "Using config file '" << qcConfigurationSource << "'";

// Generation of Data Sampling infrastructure
Expand Down
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ Data Sampling is used by Quality Control to feed the tasks with data. Below we p
"tasks": {
"QcTask": {
...
"dataSamplingPolicy": "its-raw",
"dataSource": {
"type": "dataSamplingPolicy",
"name": "its-raw"
},
...
}
}
Expand Down Expand Up @@ -260,6 +263,35 @@ Data Sampling is used by Quality Control to feed the tasks with data. Below we p

An example of using the data sampling in a DPL workflow is visible in [runAdvanced.cxx](https://github.com/AliceO2Group/QualityControl/blob/master/Framework/runAdvanced.cxx).

### Bypassing the Data Sampling

In case one needs to sample at a very high rate, or even monitor 100% of the data, the Data Sampling can be omitted altogether. As a result the task is connected directly to the Device producing the data to be monitored. To do so, change the _dataSource's_ type in the config file from `dataSamplingPolicy` to `direct`. In addition, add the information about the type of data that is expected (dataOrigin, binding, etc...) and remove the dataSamplingPolicies:

```json
{
"qc": {
...
"tasks": {
"QcTask": {
...
"dataSource": {
"type": "direct",
"binding": "its-rawdata",
"dataOrigin": "ITS",
"dataDescription": "RAWDATA",
"subSpec": "0"
},
...
}
}
},
"dataSamplingPolicies": [
]
}
```

The file `basic-no-sampling.json` is provided as an example. To test it, you can run `qcRunBasic` with the option `--no-data-sampling` (it makes it use this config file instead of `basic.json`).

## Code Organization

The repository QualityControl contains the _Framework_ and the _Modules_ in the respectively named directories.
Expand Down

0 comments on commit 6fc581e

Please sign in to comment.