From 098dd2905a928d135d5c0e24481bc19609580f75 Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Wed, 16 Oct 2024 08:43:04 +0200 Subject: [PATCH] [QC-1206] CTP Scalers Code adapted to O2/CTP code. Does not work though. Connection to CCDB times out. --- Framework/CMakeLists.txt | 1 + .../include/QualityControl/AggregatorConfig.h | 1 + Framework/include/QualityControl/Check.h | 5 - .../include/QualityControl/CheckConfig.h | 1 + .../include/QualityControl/CheckInterface.h | 8 -- .../QualityControl/PostProcessingConfig.h | 1 + .../include/QualityControl/TaskRunnerConfig.h | 1 + .../QualityControl/UserCodeInterface.h | 25 ++++- Framework/src/Aggregator.cxx | 4 +- Framework/src/Check.cxx | 6 +- Framework/src/CheckRunner.cxx | 1 - Framework/src/PostProcessingFactory.cxx | 1 + Framework/src/TaskRunner.cxx | 2 + Framework/src/TaskRunnerFactory.cxx | 3 +- Framework/src/UserCodeInterface.cxx | 94 ++++++++++++++++++- Framework/test/testCheckWorkflow.cxx | 4 +- doc/Advanced.md | 3 + 17 files changed, 139 insertions(+), 22 deletions(-) diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 104115d9b9..a34c66dedc 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -155,6 +155,7 @@ target_link_libraries(O2QualityControl O2::DataFormatsQualityControl O2::DetectorsBase O2::GlobalTracking + O2::DataFormatsCTP O2QualityControlKafkaProtos ${RDKAFKA_LIB} PRIVATE Boost::system diff --git a/Framework/include/QualityControl/AggregatorConfig.h b/Framework/include/QualityControl/AggregatorConfig.h index 2ec3c56b66..5f447e8a64 100644 --- a/Framework/include/QualityControl/AggregatorConfig.h +++ b/Framework/include/QualityControl/AggregatorConfig.h @@ -43,6 +43,7 @@ struct AggregatorConfig { framework::OutputSpec qoSpec{ "XXX", "INVALID" }; std::vector sources; std::string conditionUrl{}; + std::unordered_map database; }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/Check.h b/Framework/include/QualityControl/Check.h index 2d9b969c29..147b45ccd5 100644 --- a/Framework/include/QualityControl/Check.h +++ b/Framework/include/QualityControl/Check.h @@ -94,11 +94,6 @@ class Check static CheckConfig extractConfig(const core::CommonSpec&, const CheckSpec&); static framework::OutputSpec createOutputSpec(const std::string& detector, const std::string& checkName); - void setDatabase(std::shared_ptr database) - { - mCheckInterface->setDatabase(database); - } - private: void beautify(std::map>& moMap, const core::Quality& quality); diff --git a/Framework/include/QualityControl/CheckConfig.h b/Framework/include/QualityControl/CheckConfig.h index 7524f462d4..b91d184996 100644 --- a/Framework/include/QualityControl/CheckConfig.h +++ b/Framework/include/QualityControl/CheckConfig.h @@ -41,6 +41,7 @@ struct CheckConfig { framework::Inputs inputSpecs{}; framework::OutputSpec qoSpec{ "XXX", "INVALID" }; std::string conditionUrl{}; + std::unordered_map database; }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/CheckInterface.h b/Framework/include/QualityControl/CheckInterface.h index b1bb206167..ce19d1ff97 100644 --- a/Framework/include/QualityControl/CheckInterface.h +++ b/Framework/include/QualityControl/CheckInterface.h @@ -83,11 +83,6 @@ class CheckInterface : public core::UserCodeInterface virtual void startOfActivity(const core::Activity& activity); // not fully abstract because we don't want to change all the existing subclasses virtual void endOfActivity(const core::Activity& activity); // not fully abstract because we don't want to change all the existing subclasses - void setDatabase(std::shared_ptr database) - { - mDatabase = database; - } - protected: /// \brief Called each time mCustomParameters is updated. virtual void configure() override; @@ -100,9 +95,6 @@ class CheckInterface : public core::UserCodeInterface /// \return std::shared_ptr retrieveReference(std::string path, Activity referenceActivity); - private: - std::shared_ptr mDatabase; - ClassDef(CheckInterface, 6) }; diff --git a/Framework/include/QualityControl/PostProcessingConfig.h b/Framework/include/QualityControl/PostProcessingConfig.h index b1532fa68b..79917b004a 100644 --- a/Framework/include/QualityControl/PostProcessingConfig.h +++ b/Framework/include/QualityControl/PostProcessingConfig.h @@ -51,6 +51,7 @@ struct PostProcessingConfig { bool matchAnyRunNumber = false; bool critical; core::CustomParameters customParameters; + std::unordered_map database; }; } // namespace o2::quality_control::postprocessing diff --git a/Framework/include/QualityControl/TaskRunnerConfig.h b/Framework/include/QualityControl/TaskRunnerConfig.h index 3411cefb3a..ee8164d38b 100644 --- a/Framework/include/QualityControl/TaskRunnerConfig.h +++ b/Framework/include/QualityControl/TaskRunnerConfig.h @@ -66,6 +66,7 @@ struct TaskRunnerConfig { std::shared_ptr globalTrackingDataRequest; std::vector movingWindows; bool disableLastCycle = false; + std::unordered_map database; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/UserCodeInterface.h b/Framework/include/QualityControl/UserCodeInterface.h index 42dacb5c04..3e130dbb6f 100644 --- a/Framework/include/QualityControl/UserCodeInterface.h +++ b/Framework/include/QualityControl/UserCodeInterface.h @@ -23,6 +23,11 @@ #include "QualityControl/ConditionAccess.h" #include "QualityControl/CustomParameters.h" +#include "QualityControl/DatabaseInterface.h" + +namespace o2::ctp { + class CTPRateFetcher; +} namespace o2::quality_control::core { @@ -49,11 +54,29 @@ class UserCodeInterface : public ConditionAccess const std::string& getName() const; void setName(const std::string& name); +// void setDatabase(std::shared_ptr database); + void setDatabase(std::unordered_map dbConfig); + + private: + /// \brief Just the callback for the thread for the scalers retrieval. + void regularCallback(int intervalMinutes); + /// \brief Retrieve fresh scalers from the QCDB (with cache) + void getScalers(); + std::shared_ptr mCtpFetcher; + std::chrono::steady_clock::time_point mScalersLastUpdate; + bool mScalersEnabled = false; + protected: + /// \brief Call it to enable the retrieval of CTP scalers and use `getScalers` later + void enableCtpScalers(size_t runNumber, std::string ccdbUrl); + /// \brief Get the scalers's value for the given source + double getScalersValue(std::string sourceName, size_t runNumber); + CustomParameters mCustomParameters; std::string mName; + std::shared_ptr mDatabase; - ClassDef(UserCodeInterface, 3) + ClassDef(UserCodeInterface, 5) }; } // namespace o2::quality_control::core diff --git a/Framework/src/Aggregator.cxx b/Framework/src/Aggregator.cxx index c5269c00cb..e3057d556e 100644 --- a/Framework/src/Aggregator.cxx +++ b/Framework/src/Aggregator.cxx @@ -52,6 +52,7 @@ void Aggregator::init() mAggregatorInterface->setName(mAggregatorConfig.name); mAggregatorInterface->setCustomParameters(mAggregatorConfig.customParameters); mAggregatorInterface->setCcdbUrl(mAggregatorConfig.conditionUrl); + mAggregatorInterface->setDatabase(mAggregatorConfig.database); mAggregatorInterface->configure(); } catch (...) { std::string diagnostic = boost::current_exception_diagnostic_information(); @@ -228,7 +229,8 @@ AggregatorConfig Aggregator::extractConfig(const core::CommonSpec& commonSpec, c std::move(inputs), createOutputSpec(aggregatorSpec.detectorName, aggregatorSpec.aggregatorName), sources, - commonSpec.conditionDBUrl + commonSpec.conditionDBUrl, + commonSpec.database }; } diff --git a/Framework/src/Check.cxx b/Framework/src/Check.cxx index 9b232deae1..e4f6d368d7 100644 --- a/Framework/src/Check.cxx +++ b/Framework/src/Check.cxx @@ -28,6 +28,7 @@ #include "QualityControl/QcInfoLogger.h" #include "QualityControl/Quality.h" #include "QualityControl/HashDataDescription.h" +#include "QualityControl/runnerUtils.h" #include @@ -70,6 +71,8 @@ void Check::init() try { mCheckInterface = root_class_factory::create(mCheckConfig.moduleName, mCheckConfig.className); mCheckInterface->setName(mCheckConfig.name); + cout << "database set : " << mCheckConfig.database["host"] << endl; + mCheckInterface->setDatabase(mCheckConfig.database); mCheckInterface->setCustomParameters(mCheckConfig.customParameters); mCheckInterface->setCcdbUrl(mCheckConfig.conditionUrl); } catch (...) { @@ -261,7 +264,8 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec& allowBeautify, std::move(inputs), createOutputSpec(checkSpec.detectorName, checkSpec.checkName), - commonSpec.conditionDBUrl + commonSpec.conditionDBUrl, + commonSpec.database }; } diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index ac96a12a65..b677294e5e 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -190,7 +190,6 @@ void CheckRunner::init(framework::InitContext& iCtx) updatePolicyManager.reset(); for (auto& [checkName, check] : mChecks) { check.init(); - check.setDatabase(mDatabase); updatePolicyManager.addPolicy(check.getName(), check.getUpdatePolicyType(), check.getObjectsNames(), check.getAllObjectsOption(), false); } } catch (...) { diff --git a/Framework/src/PostProcessingFactory.cxx b/Framework/src/PostProcessingFactory.cxx index 600e6cc10a..dbe1f376fd 100644 --- a/Framework/src/PostProcessingFactory.cxx +++ b/Framework/src/PostProcessingFactory.cxx @@ -29,6 +29,7 @@ PostProcessingInterface* PostProcessingFactory::create(const PostProcessingConfi { auto* result = root_class_factory::create(config.moduleName, config.className); result->setCustomParameters(config.customParameters); + result->setDatabase(config.database); return result; } diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 9f1a418104..813a43482a 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -49,6 +49,7 @@ #include "QualityControl/ActivityHelpers.h" #include "QualityControl/WorkflowType.h" #include "QualityControl/HashDataDescription.h" +#include "QualityControl/runnerUtils.h" #include #include @@ -122,6 +123,7 @@ void TaskRunner::init(InitContext& iCtx) mTask.reset(TaskFactory::create(mTaskConfig, mObjectsManager)); mTask->setMonitoring(mCollector); mTask->setGlobalTrackingDataRequest(mTaskConfig.globalTrackingDataRequest); + mTask->setDatabase(mTaskConfig.database); // load config params if (!ConfigParamGlo::keyValues.empty()) { diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index b562e0ab39..10bf7107da 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -192,7 +192,8 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig grpGeomRequest, globalTrackingDataRequest, taskSpec.movingWindows, - taskSpec.disableLastCycle + taskSpec.disableLastCycle, + globalConfig.database }; } diff --git a/Framework/src/UserCodeInterface.cxx b/Framework/src/UserCodeInterface.cxx index 977a13f53c..d488bf27c1 100644 --- a/Framework/src/UserCodeInterface.cxx +++ b/Framework/src/UserCodeInterface.cxx @@ -15,8 +15,13 @@ /// #include "QualityControl/UserCodeInterface.h" +#include +#include +#include "QualityControl/QcInfoLogger.h" +#include "QualityControl/DatabaseFactory.h" using namespace o2::ccdb; +using namespace std; namespace o2::quality_control::core { @@ -27,8 +32,93 @@ void UserCodeInterface::setCustomParameters(const CustomParameters& parameters) configure(); } -const std::string& UserCodeInterface::getName() const { return mName; } +const std::string& UserCodeInterface::getName() const { + return mName; +} + +void UserCodeInterface::setName(const std::string& name) { + mName = name; +} + +void UserCodeInterface::enableCtpScalers(size_t runNumber, std::string ccdbUrl) +{ + // TODO bail if we are in async + ILOG(Debug, Devel) << "Enabling CTP scalers" << ENDM; + mScalersEnabled = true; + auto& ccdbManager = o2::ccdb::BasicCCDBManager::instance(); + ccdbManager.setURL(ccdbUrl); + mCtpFetcher->setupRun(runNumber, &ccdbManager, getCurrentTimestamp(), false); + + mScalersLastUpdate = std::chrono::steady_clock::time_point::min(); + getScalers(); // initial value + ILOG(Debug, Devel) << "Enabled CTP scalers" << ENDM; +} + +void UserCodeInterface::getScalers() +{ + if(!mScalersEnabled) { + ILOG(Error, Ops) << "CTP scalers not enabled, impossible to get them." << ENDM; + return; // TODO should we throw ? probably yes + } + ILOG(Debug, Devel) << "Updating scalers." << ENDM; + + if(! mDatabase) { + ILOG(Error, Devel) << "Database not set ! Cannot update scalers." << ENDM; + mScalersEnabled = false; + + return; + // todo handle the case when database is not set + } + + auto now = std::chrono::steady_clock::now(); + auto minutesSinceLast = std::chrono::duration_cast(now - mScalersLastUpdate); -void UserCodeInterface::setName(const std::string& name) { mName = name; } + // TODO get the interval from config + if (minutesSinceLast.count() < 5) { + ILOG(Debug, Devel) << "getScalers was called less than 5 minutes ago, use the cached value" << ENDM; + return; + } + + std::map meta; + void* rawResult = mDatabase->retrieveAny(typeid(o2::ctp::CTPRunScalers), "qc/CTP/Scalers", meta); // TODO make sure we get the last one. + o2::ctp::CTPRunScalers* ctpScalers = static_cast(rawResult); + mCtpFetcher->updateScalers(*ctpScalers); + mScalersLastUpdate = now; + ILOG(Debug, Devel) << "Scalers updated." << ENDM; +} + +double UserCodeInterface::getScalersValue(std::string sourceName, size_t runNumber) +{ + if(!mScalersEnabled) { + ILOG(Error, Ops) << "CTP scalers not enabled, impossible to get the value." << ENDM; + return 0; // TODO should we throw ? probably yes + } + getScalers(); // from QCDB + auto& ccdbManager = o2::ccdb::BasicCCDBManager::instance(); + auto result = mCtpFetcher->fetchNoPuCorr(&ccdbManager, getCurrentTimestamp(), runNumber, sourceName); + ILOG(Debug, Devel) << "Returning scaler value : " << result << ENDM; + return result; +} + +void UserCodeInterface::setDatabase(std::unordered_map dbConfig) +{ + // TODO one could really argue that it would be easier to have a singleton for the QCDB... because here we will build and save a database object + + cout << "dbConfig.count(\"implementation\") " << dbConfig.count("implementation") << " dbConfig.count(\"host\") : " << dbConfig.count("host") << endl; + if(dbConfig.count("implementation") == 0 || dbConfig.count("host") == 0) { + ILOG(Error, Devel) << "dbConfig is incomplete, we don't build the user code database instance " << ENDM; + return; + // todo + } + + for (auto pair : dbConfig) { + ILOG(Info,Devel) << pair.first << " : " << pair.second << ENDM; + } + + // for every user code we instantiate. + mDatabase = repository::DatabaseFactory::create(dbConfig.at("implementation")); + mDatabase->connect(dbConfig); + ILOG(Info, Devel) << "Database that is going to be used > Implementation : " << dbConfig.at("implementation") << " / Host : " << dbConfig.at("host") << ENDM; +} } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/test/testCheckWorkflow.cxx b/Framework/test/testCheckWorkflow.cxx index 03ea526ca1..8a769cbc27 100644 --- a/Framework/test/testCheckWorkflow.cxx +++ b/Framework/test/testCheckWorkflow.cxx @@ -62,7 +62,7 @@ using namespace o2::configuration; * It is expected to terminate as soon as all task publish for the first time. */ -class Receiver : public framework::Task +class Receiver : public o2::framework::Task { public: Receiver(std::string configurationSource) @@ -78,7 +78,7 @@ class Receiver : public framework::Task ~Receiver() override{}; /// \brief Receiver process callback - void run(framework::ProcessingContext& pctx) override + void run(o2::framework::ProcessingContext& pctx) override { std::vector namesToErase; diff --git a/doc/Advanced.md b/doc/Advanced.md index 3f4690c4dc..9849992397 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -1727,7 +1727,10 @@ In the example above, the quality goes to bad when there are 3 cycles in a row w In consul go to `o2/runtime/aliecs/defaults` and modify the file corresponding to the detector: [det]_qc_shm_segment_size +## CTP Scalers +Get a certificate for development : https://alice-doc.github.io/alice-analysis-tutorial/start/cert.html#test-your-certificate +JAliEn-ROOT ---