diff --git a/Framework/src/KafkaPoller.cxx b/Framework/src/KafkaPoller.cxx index e87fc5cf81..47d5cfc800 100644 --- a/Framework/src/KafkaPoller.cxx +++ b/Framework/src/KafkaPoller.cxx @@ -6,6 +6,7 @@ #include "proto/events.pb.h" #include #include +#include namespace o2::quality_control::core { @@ -112,10 +113,21 @@ bool end_of_run::isValid(const events::Event& event, const std::string& environm kafka::Properties createProperties(const std::string& brokers, const std::string& groupId) { - return { { { "bootstrap.servers", { brokers } }, - { "group.id", { groupId } }, - { "enable.auto.commit", { "true" } }, - { "auto.offset.reset", { "latest" } } } }; + if (brokers.empty()) { + constexpr std::string_view message{ "You are trying to start KafkaPoller without any brokers" }; + ILOG(Fatal, Ops) << message << ENDM; + throw std::invalid_argument{ message.data() }; + } + + auto properties = kafka::Properties{ { { "bootstrap.servers", { brokers } }, + { "enable.auto.commit", { "true" } }, + { "auto.offset.reset", { "latest" } } } }; + + if (!groupId.empty()) { + properties.put("group.id", groupId); + } + + return properties; } KafkaPoller::KafkaPoller(const std::string& brokers, const std::string& groupId) @@ -130,7 +142,7 @@ void KafkaPoller::subscribe(const std::string& topic, size_t numberOfRetries) mConsumer.subscribe({ topic }); return; } catch (const kafka::KafkaException& ex) { - // it sometimes happen that subscibe timeouts but another retry succeeds + // it sometimes happens that subscibe timeouts but another retry succeeds if (ex.error().value() != RD_KAFKA_RESP_ERR__TIMED_OUT) { throw; } else { diff --git a/Framework/src/Triggers.cxx b/Framework/src/Triggers.cxx index 0c7d99a570..b1915db217 100644 --- a/Framework/src/Triggers.cxx +++ b/Framework/src/Triggers.cxx @@ -68,8 +68,25 @@ std::string createKafkaGroupId(std::string_view prefix, std::string_view detecto return groupId; } +bool checkKafkaParams(const std::string& kafkaBrokers, const std::string& topic, const std::string_view triggerTypeLogId) +{ + if (kafkaBrokers.empty()) { + ILOG(Error, Support) << "You are tring to create " << triggerTypeLogId << " trigger using Kafka without any brokers, fill config value 'kafkaBrokersUrl'" << ENDM; + return false; + } + if (topic.empty()) { + ILOG(Error, Support) << "You are tring to consume empty Kafka topic from " << triggerTypeLogId << " trigger, fill config value 'kafkaTopic'" << ENDM; + return false; + } + return true; +} + TriggerFcn StartOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const core::Activity& activity) { + if (!checkKafkaParams(kafkaBrokers, topic, "SOR")) { + throw std::invalid_argument{ "We don't have enough information to consume Kafka. Check IL" }; + } + auto copiedActivity = activity; auto poller = std::make_shared(kafkaBrokers, createKafkaGroupId("SOR_postprocessing", detector, taskName)); poller->subscribe(topic); @@ -117,6 +134,10 @@ TriggerFcn Never(const Activity& activity) TriggerFcn EndOfRun(const std::string& kafkaBrokers, const std::string& topic, const std::string& detector, const std::string& taskName, const Activity& activity) { + if (!checkKafkaParams(kafkaBrokers, topic, "EOR")) { + throw std::invalid_argument{ "We don't have enough information to consume Kafka. Check IL" }; + } + auto copiedActivity = activity; auto poller = std::make_shared(kafkaBrokers, createKafkaGroupId("EOR_postprocessing", detector, taskName)); poller->subscribe(topic); diff --git a/Framework/test/testKafkaTests.cxx b/Framework/test/testKafkaTests.cxx index 846ea85c06..87b4560083 100644 --- a/Framework/test/testKafkaTests.cxx +++ b/Framework/test/testKafkaTests.cxx @@ -166,6 +166,22 @@ void sendSorAndTeardown() sendMessage(producer, createEorTeardownProtoMessage()); } +TEST_CASE("test_kafka_empty_brokers", "[.manual_kafka]") +{ + using namespace o2::quality_control::core; + REQUIRE_THROWS_AS(KafkaPoller("", ""), std::invalid_argument); +} + +TEST_CASE("test_kafka_poller_soreor_empty_brokers", "[.manual_kafka]") +{ + using namespace o2::quality_control; + core::Activity activityMatching{}; + REQUIRE_THROWS_AS(postprocessing::triggers::StartOfRun("", testTopic, "TST", "sor_test_matching", activityMatching), std::invalid_argument); + REQUIRE_THROWS_AS(postprocessing::triggers::StartOfRun("emptystring", "", "TST", "sor_test_matching", activityMatching), std::invalid_argument); + REQUIRE_THROWS_AS(postprocessing::triggers::EndOfRun("", testTopic, "TST", "sor_test_matching", activityMatching), std::invalid_argument); + REQUIRE_THROWS_AS(postprocessing::triggers::EndOfRun("emptystring", "", "TST", "sor_test_matching", activityMatching), std::invalid_argument); +} + TEST_CASE("test_kafka_poller_soreor", "[.manual_kafka]") { const auto kafkaCluster = getClusterURLFromEnv();