From 6fc581e64dc15c0e632e5dbedcb1639202880986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Fri, 5 Apr 2019 11:21:38 +0100 Subject: [PATCH] (QC-150) Allow tasks to run without DS (#127) - Config file : introduction of "dataSource" whose "type" can be either "direct" or "dataSamplingPolicy". - No Dispatcher if direct connection from task to producer. - Introduce a new config file to demonstrate direct connection. - Switching between data-sampling and no data sampling is done in qcBasic with the argument --no-data-sampling - Documentation --- Framework/CMakeLists.txt | 1 + Framework/advanced.json | 10 +++- Framework/basic-no-sampling.json | 40 ++++++++++++++++ Framework/basic.json | 6 ++- Framework/example-default.json | 15 ++++-- Framework/include/QualityControl/TaskRunner.h | 2 +- Framework/readout.json | 5 +- Framework/src/TaskRunner.cxx | 48 ++++++++++++++----- Framework/src/runBasic.cxx | 13 ++++- README.md | 34 ++++++++++++- 10 files changed, 151 insertions(+), 23 deletions(-) create mode 100644 Framework/basic-no-sampling.json diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index b07ac1e768..5d43759b70 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -284,6 +284,7 @@ install( install( FILES basic.json + basic-no-sampling.json advanced.json readout.json readoutForDataDump.json diff --git a/Framework/advanced.json b/Framework/advanced.json index 1834836038..283656d5de 100644 --- a/Framework/advanced.json +++ b/Framework/advanced.json @@ -20,7 +20,10 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "dataSamplingPolicy": "tst2", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "tst2" + }, "location": "local", "machines": [ "o2flptst1", @@ -34,7 +37,10 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "dataSamplingPolicy": "tst1", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "tst1" + }, "location": "remote" } } diff --git a/Framework/basic-no-sampling.json b/Framework/basic-no-sampling.json new file mode 100644 index 0000000000..1147fb581d --- /dev/null +++ b/Framework/basic-no-sampling.json @@ -0,0 +1,40 @@ +{ + "qc": { + "config": { + "database": { + "implementation": "CCDB", + "host": "ccdb-test.cern.ch:8080", + "username": "not_applicable", + "password": "not_applicable", + "name": "not_applicable" + }, + "Activity": { + "number": "42", + "type": "2" + } + }, + "tasks": { + "QcTask": { + "active": "true", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "cycleDurationSeconds": "10", + "maxNumberCycles": "-1", + "dataSource": { + "type": "direct", + "binding": "its-rawdata", + "dataOrigin": "ITS", + "dataDescription": "RAWDATA", + "subSpec": "0" + }, + "taskParameters": { + "nothing": "rien" + }, + "location": "remote" + } + } + }, + "dataSamplingPolicies": [ + + ] +} \ No newline at end of file diff --git a/Framework/basic.json b/Framework/basic.json index 055e9d7bd1..38689eb5df 100644 --- a/Framework/basic.json +++ b/Framework/basic.json @@ -20,7 +20,11 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "dataSamplingPolicy": "its-raw", + "dataSource_comment": "The other type of dataSource is \"direct\", see basic-no-sampling.json.", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "its-raw" + }, "taskParameters": { "nothing": "rien" }, diff --git a/Framework/example-default.json b/Framework/example-default.json index 8043ea4e67..22ea380c87 100644 --- a/Framework/example-default.json +++ b/Framework/example-default.json @@ -19,7 +19,10 @@ "moduleName": "QcExample", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "dataSamplingPolicy": "ex1", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "ex1" + }, "location": "remote" }, "daqTask": { @@ -27,14 +30,20 @@ "moduleName": "QcDaq", "maxNumberCycles": "-1", "cycleDurationSeconds": "10", - "dataSamplingPolicy": "mftclusters", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "mftclusters" + }, "location": "remote" }, "benchmarkTask_0": { "className": "o2::quality_control_modules::example::BenchmarkTask", "moduleName": "QcExample", "cycleDurationSeconds": "1", - "dataSamplingPolicy": "ex1", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "ex1" + }, "location": "local", "machines": [ "o2flp1", diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 3454c9bea2..64572dac50 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -68,7 +68,7 @@ class TaskRunner /// \param taskName - name of the task, which exists in tasks list in the configuration file /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies - TaskRunner(std::string taskName, std::string configurationSource, size_t id = 0); + TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id = 0); ~TaskRunner(); /// \brief To be invoked during initialization of Data Processor diff --git a/Framework/readout.json b/Framework/readout.json index 0e24201c14..fafdc078ee 100644 --- a/Framework/readout.json +++ b/Framework/readout.json @@ -20,7 +20,10 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", - "dataSamplingPolicy": "readout", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "readout" + }, "location": "remote" } } diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 0e84129e1d..88792ea257 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -25,6 +25,7 @@ #include "Framework/RawDeviceService.h" #include "Framework/DataSampling.h" #include "Framework/CallbackService.h" +#include "Framework/DataSamplingPolicy.h" #include "Monitoring/MonitoringFactory.h" #include "QualityControl/QcInfoLogger.h" #include "QualityControl/TaskFactory.h" @@ -41,25 +42,23 @@ using namespace o2::configuration; using namespace o2::monitoring; using namespace std::chrono; -TaskRunner::TaskRunner(std::string taskName, std::string configurationSource, size_t id) +TaskRunner::TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id) : mTaskName(taskName), + mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id), + mTask(nullptr), mNumberBlocks(0), - mTotalNumberObjectsPublished(0), + mResetAfterPublish(false), mLastNumberObjects(0), mCycleOn(false), mCycleNumber(0), - mMonitorObjectsSpec({"mo"}, createTaskDataOrigin(), createTaskDataDescription(taskName), id), - mResetAfterPublish(false), - mTask(nullptr) + mTotalNumberObjectsPublished(0) { // setup configuration mConfigFile = ConfigurationFactory::getConfiguration(configurationSource); populateConfig(mTaskName); } -TaskRunner::~TaskRunner() -{ -} +TaskRunner::~TaskRunner() = default; void TaskRunner::initCallback(InitContext& iCtx) { @@ -182,11 +181,36 @@ void TaskRunner::populateConfig(std::string taskName) mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get("cycleDurationSeconds", 10); mTaskConfig.maxNumberCycles = taskConfigTree->second.get("maxNumberCycles", -1); - std::string policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); - if (policiesFilePath.empty()) { - mInputSpecs = framework::DataSampling::InputSpecsForPolicy(mConfigFile.get(), taskConfigTree->second.get("dataSamplingPolicy")); + auto policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); + ConfigurationInterface* config = policiesFilePath.empty() ? mConfigFile.get() : ConfigurationFactory::getConfiguration(policiesFilePath).get(); + auto policiesTree = config->getRecursive("dataSamplingPolicies"); + auto dataSourceTree = taskConfigTree->second.get_child("dataSource"); + std::string type = dataSourceTree.get("type"); + + if (type == "dataSamplingPolicy") { + auto policyName = dataSourceTree.get("name"); + LOG(INFO) << "policyName : " << policyName; + mInputSpecs = framework::DataSampling::InputSpecsForPolicy(config, policyName); + } else if (type == "direct") { + + auto subSpecString = dataSourceTree.get("subSpec"); + auto subSpec = std::strtoull(subSpecString.c_str(), nullptr, 10); + + header::DataOrigin origin; + header::DataDescription description; + origin.runtimeInit(dataSourceTree.get("dataOrigin").c_str()); + description.runtimeInit(dataSourceTree.get("dataDescription").c_str()); + + mInputSpecs.push_back( + InputSpec{ + dataSourceTree.get("binding"), + origin, + description, + subSpec }); + } else { - mInputSpecs = framework::DataSampling::InputSpecsForPolicy(policiesFilePath, taskConfigTree->second.get("dataSamplingPolicy")); + std::string message = std::string("Configuration error : dataSource type unknown : ") + type; // TODO pass this message to the exception + BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message)); } } catch (...) { // catch already here the configuration exception and print it diff --git a/Framework/src/runBasic.cxx b/Framework/src/runBasic.cxx index 1505974e91..f5da554fa3 100644 --- a/Framework/src/runBasic.cxx +++ b/Framework/src/runBasic.cxx @@ -35,6 +35,7 @@ #include "Framework/DataSampling.h" using namespace o2::framework; + void customize(std::vector& policies) { DataSampling::CustomizeInfrastructure(policies); @@ -43,6 +44,11 @@ void customize(std::vector& policies) { DataSampling::CustomizeInfrastructure(policies); } +void customize(std::vector& workflowOptions) +{ + workflowOptions.push_back( + ConfigParamSpec{ "no-data-sampling", VariantType::Bool, false, { "Skips data sampling, connects directly the task to the producer." } }); +} #include #include @@ -59,9 +65,11 @@ using namespace o2::framework; using namespace o2::quality_control::checker; using namespace std::chrono; -WorkflowSpec defineDataProcessing(ConfigContext const&) +WorkflowSpec defineDataProcessing(const ConfigContext& config) { WorkflowSpec specs; + bool noDS = config.options().get("no-data-sampling"); + // The producer to generate some data in the workflow DataProcessorSpec producer{ "producer", @@ -87,7 +95,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) specs.push_back(producer); - const std::string qcConfigurationSource = std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/basic.json"; + std::string filename = !noDS ? "basic.json" : "basic-no-sampling.json"; + const std::string qcConfigurationSource = std::string("json://") + getenv("QUALITYCONTROL_ROOT") + "/etc/" + filename; LOG(INFO) << "Using config file '" << qcConfigurationSource << "'"; // Generation of Data Sampling infrastructure diff --git a/README.md b/README.md index d32af0798f..beb1685294 100644 --- a/README.md +++ b/README.md @@ -227,7 +227,10 @@ Data Sampling is used by Quality Control to feed the tasks with data. Below we p "tasks": { "QcTask": { ... - "dataSamplingPolicy": "its-raw", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "its-raw" + }, ... } } @@ -260,6 +263,35 @@ Data Sampling is used by Quality Control to feed the tasks with data. Below we p An example of using the data sampling in a DPL workflow is visible in [runAdvanced.cxx](https://github.com/AliceO2Group/QualityControl/blob/master/Framework/runAdvanced.cxx). +### Bypassing the Data Sampling + +In case one needs to sample at a very high rate, or even monitor 100% of the data, the Data Sampling can be omitted altogether. As a result the task is connected directly to the the Device producing the data to be monitored. To do so, change the _dataSource's_ type in the config file from `dataSamplingPolicy` to `direct`. In addition, add the information about the type of data that is expected (dataOrigin, binding, etc...) and remove the dataSamplingPolicies : + +```json +{ + "qc": { + ... + "tasks": { + "QcTask": { + ... + "dataSource": { + "type": "direct", + "binding": "its-rawdata", + "dataOrigin": "ITS", + "dataDescription": "RAWDATA", + "subSpec": "0" + }, + ... + } + } + }, + "dataSamplingPolicies": [ + ] +} +``` + +The file `basic-no-sampling.json` is provided as an example. To test it, you can run `qcRunBasic` with the option `--no-data-sampling` (it makes it use this config file instead of `basic.json`). + ## Code Organization The repository QualityControl contains the _Framework_ and the _Modules_ in the respectively named directories.