Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CDAR-759: Provision Kafka topics for SDSM and Detections (#314)
<!-- Thanks for the contribution, this is awesome. --> # PR Details ## Description This PR provisions Kafka topics for detection data and for SDSMs. This helps prevent kafka consumer initialization from failing when attempting to consume from topics that do not exist yet. Additionally despite kafka depedency , sensor_data_sharing_service will start prior to creation of topics or health of kafka consumers causing confusing critical logs about connection timeouts and also outright consumer subscription failure when kafka volumes are removed. This PR also adds a healthcheck to check the creation of the last topic, which confirms all topics are created and the kafka server is healthy Example of confusing connection timeout logs : ``` critical] [kafka_consumer_worker.h:98] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 98. LOG: 3 FAIL [thrd:172.4.0.3:9092/bootstrap]: 172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT) [2024-02-02 11:58:49.467] [debug] [sensor_data_sharing_service.cpp:91] Attempting to consume detections ... [2024-02-02 11:58:49.467] [info] [sensor_data_sharing_service.cpp:118] Starting SDSM Producer! [2024-02-02 11:58:49.467] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-1 Successfully to subscribe to 1 topics: Success [2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR: Local: Broker transport failure 172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 144ms in state CONNECT) [2024-02-02 11:58:49.467] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-3 Successfully to subscribe to 1 topics: Success [2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR: Local: All broker connections are down 1/1 brokers are down [2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR: Local: Broker transport failure 172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT) [2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR: Local: Broker transport failure 172.4.0.3:9092/bootstrap: Connect to ipv4#172.4.0.3:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed) [2024-02-02 11:58:49.467] [critical] [kafka_consumer_worker.h:90] /home/carma-streets/kafka_clients/include/kafka_consumer_worker.h : Line 90. ERROR: Local: All broker connections are down 1/1 brokers are down [2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:42] RebalanceCb: Local: Assign partitions [2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:36] Topic time_sync , Partition 0 [2024-02-02 11:59:35.529] [info] [kafka_consumer_worker.h:42] RebalanceCb: Local: Assign partitions ``` <!--- Describe your changes in detail --> ## Related Issue [CDAR-759 ](https://usdot-carma.atlassian.net/browse/CDAR-759)<!--- This project only accepts pull requests related to open issues --> <!--- If suggesting a new feature or change, please discuss it in an issue first --> <!--- If fixing a bug, there should be an issue describing it with steps to reproduce --> <!--- Please link to the issue here: --> ## Motivation and Context When clear Kafka and Zookeeper volumes, existing topics are also deleted. Since the current CARMA Config does not provision a topic for detections this causes the initialization of the Kafka consumer to fail when attempting to subscribe to a not existing topic. The topic eventually gets created by the detection producer in the CARMA Streets Plugin in V2X-Hub. Failed consumer subscribe is indicated by following logs : ``` [2024-02-06 00:18:13.730] [info] [sensor_data_sharing_service.cpp:118] Starting SDSM Producer! [2024-02-06 00:18:13.730] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-3 Successfully to subscribe to 1 topics: Success [2024-02-06 00:18:13.730] [info] [kafka_consumer_worker.cpp:114] rdkafka#consumer-1 Successfully to subscribe to 1 topics: Success [2024-02-06 00:18:13.734] [critical] [kafka_consumer_worker.cpp:161] rdkafka#consumer-1 Consume failed: Subscribed topic not available: time_sync: Broker: Unknown topic or partition [2024-02-06 00:18:13.740] [critical] [kafka_consumer_worker.cpp:161] rdkafka#consumer-3 Consume failed: Subscribed topic not available: v2xhub_sim_sensor_detected_object: Broker: Unknown topic or partition [2024-02-06 00:18:18.757] [error] [sensor_data_sharing_service.cpp:110] Something went wrong, no longer consuming detections. ``` <!--- Why is this change required? What problem does it solve? --> ## How Has This Been Tested? CDASim Deployment <!--- Please describe in detail how you tested your changes. --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> ## Types of changes <!--- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [x] Defect fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that cause existing functionality to change) ## Checklist: <!--- Go over all the following points, and put an `x` in all the boxes that apply. --> <!--- If you're unsure about any of these, don't hesitate to ask. We're here to help! --> - [ ] I have added any new packages to the sonar-scanner.properties file - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. [CARMA Contributing Guide](https://github.com/usdot-fhwa-stol/carma-platform/blob/develop/Contributing.md) - [ ] I have added tests to cover my changes. - [ ] All new and existing tests passed.
- Loading branch information