From 1568aed5ef78d36b2fab0e2c24024b6a9fd9fea9 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 4 Oct 2023 15:03:05 -0400 Subject: [PATCH 1/4] Add ability to cache offsets and close admin client --- .../datadog_checks/kafka_consumer/client.py | 20 ++++++++++++------- .../kafka_consumer/kafka_consumer.py | 1 + 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 404d5e7440627..9c5886e1691be 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -15,6 +15,7 @@ def __init__(self, config, tls_context, log) -> None: self.log = log self._kafka_client = None self._tls_context = tls_context + self.topics_cache = {} @property def kafka_client(self): @@ -35,6 +36,7 @@ def __create_consumer(self, consumer_group): "bootstrap.servers": self.config._kafka_connect_str, "group.id": consumer_group, "enable.auto.commit": False, # To avoid offset commit to broker during close + "queued.max.messages.kbytes": 1024, # https://github.com/confluentinc/confluent-kafka-python/issues/759 } config.update(self.__get_authentication_config()) @@ -152,6 +154,9 @@ def get_highwater_offsets(self, consumer_offsets): return highwater_offsets def get_partitions_for_topic(self, topic): + if self.topics_cache.get(topic): + return self.topics_cache.get(topic) + try: cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout) except KafkaException as e: @@ -160,6 +165,7 @@ def get_partitions_for_topic(self, topic): else: topic_metadata = cluster_metadata.topics[topic] partitions = list(topic_metadata.partitions.keys()) + self.topics_cache[topic] = partitions return partitions def request_metadata_update(self): @@ -175,7 +181,7 @@ def get_consumer_offsets(self): self.log.debug('Identified %s consumer groups', len(consumer_groups)) futures = self._get_consumer_offset_futures(consumer_groups) - self.log.debug('%s futures to be waited on', len(futures)) + self.log.debug('%s futures to be waited on', len(futures)) # 2023-09-26 13:37:22 UTC for future in as_completed(futures): try: @@ -217,7 +223,7 @@ def get_consumer_offsets(self): if self.config._consumer_groups_compiled_regex.match(to_match): consumer_offsets[(consumer_group, topic, partition)] = offset - self.log.debug('Got %s consumer offsets', len(consumer_offsets)) + self.log.debug('Got %s consumer offsets', len(consumer_offsets)) # 13:37:23 return consumer_offsets def _get_consumer_groups(self): @@ -240,6 +246,9 @@ def _get_consumer_groups(self): def _list_consumer_group_offsets(self, cg_tp): return self.kafka_client.list_consumer_group_offsets([cg_tp]) + def close_admin_client(self): + self._kafka_client = None + def _get_consumer_offset_futures(self, consumer_groups): futures = [] @@ -267,11 +276,8 @@ def _get_consumer_offset_futures(self, consumer_groups): # If partitions are not defined else: # get all the partitions for this topic - partitions = ( - self.kafka_client.list_topics(topic=topic, timeout=self.config._request_timeout) - .topics[topic] - .partitions - ) + partitions = self.get_partitions_for_topic(topic) + topic_partitions = [TopicPartition(topic, partition) for partition in partitions] futures.append( diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 08ebf03d6a8a4..719994fd06928 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -157,6 +157,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c self.log.warning(msg, consumer_group, topic, partition) self.client.request_metadata_update() # force metadata update on next poll() self.log.debug('%s consumer offsets reported', reported_contexts) + self.client.close_admin_client() def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" From 9ed53a3bdc22714d1234ec4e09d1c19d2885b718 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 4 Oct 2023 17:32:43 -0400 Subject: [PATCH 2/4] Fix style --- kafka_consumer/datadog_checks/kafka_consumer/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 9c5886e1691be..708b018ec6d28 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -156,7 +156,7 @@ def get_highwater_offsets(self, consumer_offsets): def get_partitions_for_topic(self, topic): if self.topics_cache.get(topic): return self.topics_cache.get(topic) - + try: cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout) except KafkaException as e: From bbda959f24fc4c7e62a87f6a64e10c02607f07e8 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 4 Oct 2023 17:34:22 -0400 Subject: [PATCH 3/4] [Release] Bumped kafka_consumer version to 3.1.4-beta.1 --- kafka_consumer/CHANGELOG.md | 2 ++ kafka_consumer/datadog_checks/kafka_consumer/__about__.py | 2 +- requirements-agent-release.txt | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka_consumer/CHANGELOG.md b/kafka_consumer/CHANGELOG.md index 32c231e63f2ca..897bb1ac30342 100644 --- a/kafka_consumer/CHANGELOG.md +++ b/kafka_consumer/CHANGELOG.md @@ -2,6 +2,8 @@ ## 3.1.3 / 2023-08-14 +## 3.1.4-beta.1 / 2023-10-04 + ***Fixed***: * Optimize highwater offset collection. See [#15285](https://github.com/DataDog/integrations-core/pull/15285). diff --git a/kafka_consumer/datadog_checks/kafka_consumer/__about__.py b/kafka_consumer/datadog_checks/kafka_consumer/__about__.py index a7ba430e5e216..40fcf3b3779ed 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/__about__.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/__about__.py @@ -2,4 +2,4 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) -__version__ = "3.1.3" +__version__ = "3.1.4-beta.1" diff --git a/requirements-agent-release.txt b/requirements-agent-release.txt index 7cfcba458974a..80ae1689edfab 100644 --- a/requirements-agent-release.txt +++ b/requirements-agent-release.txt @@ -79,7 +79,7 @@ datadog-impala==1.1.2 datadog-istio==4.4.1 datadog-jboss-wildfly==2.0.2 datadog-journald==1.1.1 -datadog-kafka-consumer==3.1.3 +datadog-kafka-consumer==3.1.4-beta.1 datadog-kafka==2.13.2 datadog-kong==2.4.1 datadog-kube-apiserver-metrics==3.6.2 From 9af613b81bbc52e4c3586e851930eaabf23563c5 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 4 Oct 2023 17:34:29 -0400 Subject: [PATCH 4/4] [Release] Update metadata --- .in-toto/tag.eabca4ac.link | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.in-toto/tag.eabca4ac.link b/.in-toto/tag.eabca4ac.link index 4f6d499c65483..14bb1ad712e6a 100644 --- a/.in-toto/tag.eabca4ac.link +++ b/.in-toto/tag.eabca4ac.link @@ -1 +1 @@ -{"signatures":[{"keyid":"eabca4ac14acbaf006cc03847c6739cb3a799f86","other_headers":"04000108001d162104eabca4ac14acbaf006cc03847c6739cb3a799f86050264fb7d87","signature":"618234153e92dcd4772003ff7c6e0b9e1cc38cebd5b07180a06bb25d91229486f4cfc55aceafc713aa1905bd6ffec725362d1dbd220189769220c5e3d682b473b1709f2edf017cdd52d8a41c6a554bf1b4dbd8197f8e071c247527a9d737cda37b49ae0860d84c24dfddf8b7518d40f136d89c51f3299dad0c8c48020dd7dec8f5b6a0cddf04196fd6fc18330ef21c3d03c50bb0e2284a3bafa02d79ea028813d3d32c5f8269dae0c08f06e554e3b6b0b9d742d2815a9c9aaa9e8ade61aa4298fe9f56d00b6fed76b91bb8979c145c677d34ce9715f30fc35c208a890cd2867a600704ff16f3d1f12f52821d845f07eb3debef224d377d44565819242b73969f3b2b9e712a0c6ef74cba6c94afd0eab71fb3e6534d607a4381b4cd608d33a16a2051ef93c701e8a0ec61fcfc1aae50a5b02c570afb5177afa0f5dda4f5b2e9a78521ffe53b4fe846a604138b9d543569c7a2dba55ad28b291d6599b93d4bc85423001a301e1e9333b363d08018596b99e1bfa3e3925891e7f1e7d5701fc79ceadbd89e34440bcf4036137ab5207cb8e4a2c1dad17f528120a4ea0a1958b7cf764867db6f4d5bd9bdac37bbb39daed690ab574420d985c02ae558fb1922ccda65e40fcf44ef6bbf2a877eec296bf0f1f33df10350b4cb3d1f067a32415e589105f7f5d31caa05e0a25183fbadc0e9282fc25e58b8c2e6fb30b7d4bd28b13b690d"}],"signed":{"_type":"link","byproducts":{},"command":[],"environment":{},"materials":{},"name":"tag","products":{"kafka_consumer/datadog_checks/__init__.py":{"sha256":"9a3c64b8b00c94da4b4f34618d803d3255808caf21b8afa9195c84b61da66b6a"},"kafka_consumer/datadog_checks/kafka_consumer/__about__.py":{"sha256":"3a639680ccbb2cccef79083a0aec04800ee80cecf88dbcc732f502b3da2f5032"},"kafka_consumer/datadog_checks/kafka_consumer/__init__.py":{"sha256":"5d5a2f991c64a95c96c8713f3179f9d0944682b6bed037b8c34d9f85463ca57c"},"kafka_consumer/datadog_checks/kafka_consumer/client.py":{"sha256":"d9ed648be374c67ee2e842698bfb72aa91ff3ef3c6f11bb7e344b8aa083bae20"},"kafka_consumer/datadog_checks/kafka_consumer/config.py":{"sha256":"55a1007ed7374fafb63b3b29eabdc854e13b9bfc41d13e9be347774af230aec0"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py":{"sha256":"c9cf5c66894430e7edbb00d00613b58ccfd38360f2fe490a23c17cf71ed294dc"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py":{"sha256":"5c46e0f757deee89e8ffb54fb9c06fb21ddb7fdbf253dcf80f42ae63e0c516fa"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py":{"sha256":"10ad3f00ae389d32c47d79d2be6de4e85f6d33d616afda64b4220be45ced9be1"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py":{"sha256":"aaac974b2272d540203aa9822a55bf3601c3d824e03e6ea9bb0ffa3bc316e76a"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py":{"sha256":"0424fe17778b76e1b589b9564d0d543d1b71dba1edd6e5d71a7c528dddf68e0b"},"kafka_consumer/datadog_checks/kafka_consumer/constants.py":{"sha256":"df9bbecdb8bf8009ed20402963ebea6104bba418d7a1894034438bc0e3c91b44"},"kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example":{"sha256":"4258bd81a13bc7a6a1b5754946155efe3e9a328a2c081945cbac94dd430e7bf9"},"kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py":{"sha256":"287fc5bb631cb665c86cae382bff3a6a00a7aec473cb09aa31d77e148091e266"},"kafka_consumer/pyproject.toml":{"sha256":"fd98e849fff6554cb9e19e25546adb89563f0c7ee417b03bcb1dc8e4ed0c9569"}}}} \ No newline at end of file +{"signatures":[{"keyid":"eabca4ac14acbaf006cc03847c6739cb3a799f86","other_headers":"04000108001d162104eabca4ac14acbaf006cc03847c6739cb3a799f860502651dda64","signature":"9d9bebc62277eaaf8e68a10e827256385e8ecd6b9c6d8cfb113fd78f255229c2778ff7bd54a698ac3c4c07fb8215a570fd2fa297b50d44bb9f1be9194a833b65e5b96b904d44bf4a705448efbb28d7a857be190d8fc203cd9bf8f1b76e176e102ae46675116e2812d5c7c2d09c532768cff4d5e2ee12d9c8d36d69285f75ef4dabadf8aaa1bab6455a29bace30d5f848fe0c372c9d4aa314e5031ac20789b001389a7537b6d204aab49c09f368aad64c84f080d97c5ffa6af69d060c9f3aabc147a242f79e49f92b331e9b1362c7c8dfd913403b61a65efe15694a4328780c0e6f5f7ee5ce8596360791a55dcac2fdd4e99a09e122e82cf2224b8b8c0ca0d899c8a018a5dd3e33cc03b509423164bdf4f12bf3cdec59ee788c267e5d416f9acdbe5c5e08a61594ea682683ad7b472c3c33b611e6deb379e4daf2e31800948ea1745a955a7e4107486a0b4ee044fd4cbf53bf49aaa538226fe5a06ef8d9d298b37e50376bbe3f1fd7e9e0deac0da27beded5decf59140f4dcf0d94f4d9a9a2de6effd4034fbf6e3c2a603fe575216247d5bcf3fe4654166843411f7b1e753eeeb83de28d394b6e5665505a88638d3f88e61a9f34cdfd66ec7b5e2e3ce54e92662644836bc0e0be7ab45a6097902424f8fe0bf037381714c6049cc6963b8cd0ba6d17353717a833d7d98fe00f4d4246959fe81bad8294b4c8855887119a03c7c85"}],"signed":{"_type":"link","byproducts":{},"command":[],"environment":{},"materials":{},"name":"tag","products":{"kafka_consumer/datadog_checks/__init__.py":{"sha256":"9a3c64b8b00c94da4b4f34618d803d3255808caf21b8afa9195c84b61da66b6a"},"kafka_consumer/datadog_checks/kafka_consumer/__about__.py":{"sha256":"7e063730f980fa6ce9b16c86c741764385de28ac2f1e1d4059bf2d72cf04149c"},"kafka_consumer/datadog_checks/kafka_consumer/__init__.py":{"sha256":"5d5a2f991c64a95c96c8713f3179f9d0944682b6bed037b8c34d9f85463ca57c"},"kafka_consumer/datadog_checks/kafka_consumer/client.py":{"sha256":"97830ef91befc148d7a9e09f9aea79d6dea86d205ce2076429b444f9ee96aa90"},"kafka_consumer/datadog_checks/kafka_consumer/config.py":{"sha256":"55a1007ed7374fafb63b3b29eabdc854e13b9bfc41d13e9be347774af230aec0"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/__init__.py":{"sha256":"c9cf5c66894430e7edbb00d00613b58ccfd38360f2fe490a23c17cf71ed294dc"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py":{"sha256":"5c46e0f757deee89e8ffb54fb9c06fb21ddb7fdbf253dcf80f42ae63e0c516fa"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py":{"sha256":"10ad3f00ae389d32c47d79d2be6de4e85f6d33d616afda64b4220be45ced9be1"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/shared.py":{"sha256":"aaac974b2272d540203aa9822a55bf3601c3d824e03e6ea9bb0ffa3bc316e76a"},"kafka_consumer/datadog_checks/kafka_consumer/config_models/validators.py":{"sha256":"0424fe17778b76e1b589b9564d0d543d1b71dba1edd6e5d71a7c528dddf68e0b"},"kafka_consumer/datadog_checks/kafka_consumer/constants.py":{"sha256":"df9bbecdb8bf8009ed20402963ebea6104bba418d7a1894034438bc0e3c91b44"},"kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example":{"sha256":"4258bd81a13bc7a6a1b5754946155efe3e9a328a2c081945cbac94dd430e7bf9"},"kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py":{"sha256":"929ce3e2364a252ebf761fc4e5b2c0fa8038b1642cf5b67bfd03bf31f2b54fb0"},"kafka_consumer/pyproject.toml":{"sha256":"fd98e849fff6554cb9e19e25546adb89563f0c7ee417b03bcb1dc8e4ed0c9569"}}}} \ No newline at end of file