Skip to content

Commit

Permalink
[QC-1206] CTP Scalers
Browse files Browse the repository at this point in the history
Code adapted to O2/CTP code.
Does not work though. Connection to CCDB times out.
  • Loading branch information
Barthelemy committed Oct 16, 2024
1 parent b9e95b3 commit 098dd29
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 22 deletions.
1 change: 1 addition & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ target_link_libraries(O2QualityControl
O2::DataFormatsQualityControl
O2::DetectorsBase
O2::GlobalTracking
O2::DataFormatsCTP
O2QualityControlKafkaProtos
${RDKAFKA_LIB}
PRIVATE Boost::system
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/AggregatorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct AggregatorConfig {
framework::OutputSpec qoSpec{ "XXX", "INVALID" };
std::vector<AggregatorSource> sources;
std::string conditionUrl{};
std::unordered_map<std::string, std::string> database;
};

} // namespace o2::quality_control::checker
Expand Down
5 changes: 0 additions & 5 deletions Framework/include/QualityControl/Check.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ class Check
static CheckConfig extractConfig(const core::CommonSpec&, const CheckSpec&);
static framework::OutputSpec createOutputSpec(const std::string& detector, const std::string& checkName);

void setDatabase(std::shared_ptr<o2::quality_control::repository::DatabaseInterface> database)
{
mCheckInterface->setDatabase(database);
}

private:
void beautify(std::map<std::string, std::shared_ptr<core::MonitorObject>>& moMap, const core::Quality& quality);

Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/CheckConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct CheckConfig {
framework::Inputs inputSpecs{};
framework::OutputSpec qoSpec{ "XXX", "INVALID" };
std::string conditionUrl{};
std::unordered_map<std::string, std::string> database;
};

} // namespace o2::quality_control::checker
Expand Down
8 changes: 0 additions & 8 deletions Framework/include/QualityControl/CheckInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ class CheckInterface : public core::UserCodeInterface
virtual void startOfActivity(const core::Activity& activity); // not fully abstract because we don't want to change all the existing subclasses
virtual void endOfActivity(const core::Activity& activity); // not fully abstract because we don't want to change all the existing subclasses

void setDatabase(std::shared_ptr<o2::quality_control::repository::DatabaseInterface> database)
{
mDatabase = database;
}

protected:
/// \brief Called each time mCustomParameters is updated.
virtual void configure() override;
Expand All @@ -100,9 +95,6 @@ class CheckInterface : public core::UserCodeInterface
/// \return
std::shared_ptr<MonitorObject> retrieveReference(std::string path, Activity referenceActivity);

private:
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;

ClassDef(CheckInterface, 6)
};

Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/PostProcessingConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct PostProcessingConfig {
bool matchAnyRunNumber = false;
bool critical;
core::CustomParameters customParameters;
std::unordered_map<std::string, std::string> database;
};

} // namespace o2::quality_control::postprocessing
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TaskRunnerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct TaskRunnerConfig {
std::shared_ptr<o2::globaltracking::DataRequest> globalTrackingDataRequest;
std::vector<std::string> movingWindows;
bool disableLastCycle = false;
std::unordered_map<std::string, std::string> database;
};

} // namespace o2::quality_control::core
Expand Down
25 changes: 24 additions & 1 deletion Framework/include/QualityControl/UserCodeInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@

#include "QualityControl/ConditionAccess.h"
#include "QualityControl/CustomParameters.h"
#include "QualityControl/DatabaseInterface.h"

namespace o2::ctp {
class CTPRateFetcher;
}

namespace o2::quality_control::core
{
Expand All @@ -49,11 +54,29 @@ class UserCodeInterface : public ConditionAccess
const std::string& getName() const;
void setName(const std::string& name);

// void setDatabase(std::shared_ptr<o2::quality_control::repository::DatabaseInterface> database);
void setDatabase(std::unordered_map<std::string, std::string> dbConfig);

private:
/// \brief Just the callback for the thread for the scalers retrieval.
void regularCallback(int intervalMinutes);
/// \brief Retrieve fresh scalers from the QCDB (with cache)
void getScalers();
std::shared_ptr<o2::ctp::CTPRateFetcher> mCtpFetcher;
std::chrono::steady_clock::time_point mScalersLastUpdate;
bool mScalersEnabled = false;

protected:
/// \brief Call it to enable the retrieval of CTP scalers and use `getScalers` later
void enableCtpScalers(size_t runNumber, std::string ccdbUrl);
/// \brief Get the scalers's value for the given source
double getScalersValue(std::string sourceName, size_t runNumber);

CustomParameters mCustomParameters;
std::string mName;
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;

ClassDef(UserCodeInterface, 3)
ClassDef(UserCodeInterface, 5)
};

} // namespace o2::quality_control::core
Expand Down
4 changes: 3 additions & 1 deletion Framework/src/Aggregator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void Aggregator::init()
mAggregatorInterface->setName(mAggregatorConfig.name);
mAggregatorInterface->setCustomParameters(mAggregatorConfig.customParameters);
mAggregatorInterface->setCcdbUrl(mAggregatorConfig.conditionUrl);
mAggregatorInterface->setDatabase(mAggregatorConfig.database);
mAggregatorInterface->configure();
} catch (...) {
std::string diagnostic = boost::current_exception_diagnostic_information();
Expand Down Expand Up @@ -228,7 +229,8 @@ AggregatorConfig Aggregator::extractConfig(const core::CommonSpec& commonSpec, c
std::move(inputs),
createOutputSpec(aggregatorSpec.detectorName, aggregatorSpec.aggregatorName),
sources,
commonSpec.conditionDBUrl
commonSpec.conditionDBUrl,
commonSpec.database
};
}

Expand Down
6 changes: 5 additions & 1 deletion Framework/src/Check.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "QualityControl/QcInfoLogger.h"
#include "QualityControl/Quality.h"
#include "QualityControl/HashDataDescription.h"
#include "QualityControl/runnerUtils.h"

#include <QualityControl/AggregatorRunner.h>

Expand Down Expand Up @@ -70,6 +71,8 @@ void Check::init()
try {
mCheckInterface = root_class_factory::create<CheckInterface>(mCheckConfig.moduleName, mCheckConfig.className);
mCheckInterface->setName(mCheckConfig.name);
cout << "database set : " << mCheckConfig.database["host"] << endl;
mCheckInterface->setDatabase(mCheckConfig.database);
mCheckInterface->setCustomParameters(mCheckConfig.customParameters);
mCheckInterface->setCcdbUrl(mCheckConfig.conditionUrl);
} catch (...) {
Expand Down Expand Up @@ -261,7 +264,8 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
allowBeautify,
std::move(inputs),
createOutputSpec(checkSpec.detectorName, checkSpec.checkName),
commonSpec.conditionDBUrl
commonSpec.conditionDBUrl,
commonSpec.database
};
}

Expand Down
1 change: 0 additions & 1 deletion Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ void CheckRunner::init(framework::InitContext& iCtx)
updatePolicyManager.reset();
for (auto& [checkName, check] : mChecks) {
check.init();
check.setDatabase(mDatabase);
updatePolicyManager.addPolicy(check.getName(), check.getUpdatePolicyType(), check.getObjectsNames(), check.getAllObjectsOption(), false);
}
} catch (...) {
Expand Down
1 change: 1 addition & 0 deletions Framework/src/PostProcessingFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ PostProcessingInterface* PostProcessingFactory::create(const PostProcessingConfi
{
auto* result = root_class_factory::create<PostProcessingInterface>(config.moduleName, config.className);
result->setCustomParameters(config.customParameters);
result->setDatabase(config.database);
return result;
}

Expand Down
2 changes: 2 additions & 0 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "QualityControl/ActivityHelpers.h"
#include "QualityControl/WorkflowType.h"
#include "QualityControl/HashDataDescription.h"
#include "QualityControl/runnerUtils.h"

#include <string>
#include <TFile.h>
Expand Down Expand Up @@ -122,6 +123,7 @@ void TaskRunner::init(InitContext& iCtx)
mTask.reset(TaskFactory::create(mTaskConfig, mObjectsManager));
mTask->setMonitoring(mCollector);
mTask->setGlobalTrackingDataRequest(mTaskConfig.globalTrackingDataRequest);
mTask->setDatabase(mTaskConfig.database);

// load config params
if (!ConfigParamGlo::keyValues.empty()) {
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
grpGeomRequest,
globalTrackingDataRequest,
taskSpec.movingWindows,
taskSpec.disableLastCycle
taskSpec.disableLastCycle,
globalConfig.database
};
}

Expand Down
94 changes: 92 additions & 2 deletions Framework/src/UserCodeInterface.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
///

#include "QualityControl/UserCodeInterface.h"
#include <DataFormatsCTP/CTPRateFetcher.h>
#include <thread>
#include "QualityControl/QcInfoLogger.h"
#include "QualityControl/DatabaseFactory.h"

using namespace o2::ccdb;
using namespace std;

namespace o2::quality_control::core
{
Expand All @@ -27,8 +32,93 @@ void UserCodeInterface::setCustomParameters(const CustomParameters& parameters)
configure();
}

const std::string& UserCodeInterface::getName() const { return mName; }
const std::string& UserCodeInterface::getName() const {
return mName;
}

void UserCodeInterface::setName(const std::string& name) {
mName = name;
}

void UserCodeInterface::enableCtpScalers(size_t runNumber, std::string ccdbUrl)
{
// TODO bail if we are in async
ILOG(Debug, Devel) << "Enabling CTP scalers" << ENDM;
mScalersEnabled = true;
auto& ccdbManager = o2::ccdb::BasicCCDBManager::instance();
ccdbManager.setURL(ccdbUrl);
mCtpFetcher->setupRun(runNumber, &ccdbManager, getCurrentTimestamp(), false);

mScalersLastUpdate = std::chrono::steady_clock::time_point::min();
getScalers(); // initial value
ILOG(Debug, Devel) << "Enabled CTP scalers" << ENDM;
}

void UserCodeInterface::getScalers()
{
if(!mScalersEnabled) {
ILOG(Error, Ops) << "CTP scalers not enabled, impossible to get them." << ENDM;
return; // TODO should we throw ? probably yes
}
ILOG(Debug, Devel) << "Updating scalers." << ENDM;

if(! mDatabase) {
ILOG(Error, Devel) << "Database not set ! Cannot update scalers." << ENDM;
mScalersEnabled = false;

return;
// todo handle the case when database is not set
}

auto now = std::chrono::steady_clock::now();
auto minutesSinceLast = std::chrono::duration_cast<std::chrono::minutes>(now - mScalersLastUpdate);

void UserCodeInterface::setName(const std::string& name) { mName = name; }
// TODO get the interval from config
if (minutesSinceLast.count() < 5) {
ILOG(Debug, Devel) << "getScalers was called less than 5 minutes ago, use the cached value" << ENDM;
return;
}

std::map<std::string, std::string> meta;
void* rawResult = mDatabase->retrieveAny(typeid(o2::ctp::CTPRunScalers), "qc/CTP/Scalers", meta); // TODO make sure we get the last one.
o2::ctp::CTPRunScalers* ctpScalers = static_cast<o2::ctp::CTPRunScalers*>(rawResult);
mCtpFetcher->updateScalers(*ctpScalers);
mScalersLastUpdate = now;
ILOG(Debug, Devel) << "Scalers updated." << ENDM;
}

double UserCodeInterface::getScalersValue(std::string sourceName, size_t runNumber)
{
if(!mScalersEnabled) {
ILOG(Error, Ops) << "CTP scalers not enabled, impossible to get the value." << ENDM;
return 0; // TODO should we throw ? probably yes
}
getScalers(); // from QCDB
auto& ccdbManager = o2::ccdb::BasicCCDBManager::instance();
auto result = mCtpFetcher->fetchNoPuCorr(&ccdbManager, getCurrentTimestamp(), runNumber, sourceName);
ILOG(Debug, Devel) << "Returning scaler value : " << result << ENDM;
return result;
}

void UserCodeInterface::setDatabase(std::unordered_map<std::string, std::string> dbConfig)
{
// TODO one could really argue that it would be easier to have a singleton for the QCDB... because here we will build and save a database object

cout << "dbConfig.count(\"implementation\") " << dbConfig.count("implementation") << " dbConfig.count(\"host\") : " << dbConfig.count("host") << endl;
if(dbConfig.count("implementation") == 0 || dbConfig.count("host") == 0) {
ILOG(Error, Devel) << "dbConfig is incomplete, we don't build the user code database instance " << ENDM;
return;
// todo
}

for (auto pair : dbConfig) {
ILOG(Info,Devel) << pair.first << " : " << pair.second << ENDM;
}

// for every user code we instantiate.
mDatabase = repository::DatabaseFactory::create(dbConfig.at("implementation"));
mDatabase->connect(dbConfig);
ILOG(Info, Devel) << "Database that is going to be used > Implementation : " << dbConfig.at("implementation") << " / Host : " << dbConfig.at("host") << ENDM;
}

} // namespace o2::quality_control::core
4 changes: 2 additions & 2 deletions Framework/test/testCheckWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ using namespace o2::configuration;
* It is expected to terminate as soon as all task publish for the first time.
*/

class Receiver : public framework::Task
class Receiver : public o2::framework::Task
{
public:
Receiver(std::string configurationSource)
Expand All @@ -78,7 +78,7 @@ class Receiver : public framework::Task
~Receiver() override{};

/// \brief Receiver process callback
void run(framework::ProcessingContext& pctx) override
void run(o2::framework::ProcessingContext& pctx) override
{
std::vector<std::string> namesToErase;

Expand Down
3 changes: 3 additions & 0 deletions doc/Advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -1727,7 +1727,10 @@ In the example above, the quality goes to bad when there are 3 cycles in a row w

In consul go to `o2/runtime/aliecs/defaults` and modify the file corresponding to the detector: [det]_qc_shm_segment_size

## CTP Scalers

Get a certificate for development : https://alice-doc.github.io/alice-analysis-tutorial/start/cert.html#test-your-certificate
JAliEn-ROOT


---
Expand Down

0 comments on commit 098dd29

Please sign in to comment.