From cad5e3a8c58ab0b9868d24b148907a3c356f38b3 Mon Sep 17 00:00:00 2001 From: Adi Suresh Date: Wed, 21 Feb 2024 14:53:51 -0600 Subject: [PATCH] Fix kafka plugin dependencies. (#4169) 1. Integration test dependencies were being pulled in when compiling source code and unit tests. 2. The wrong namespace for json-schema-validator was being used. 3. Remove catching BrokerEndPointNotAvailableException because that exception will not be thrown by Kafka clients.3. Remove catching BrokerEndPointNotAvailableException because that exception will not be thrown by Kafka clients.3. Remove catching BrokerEndPointNotAvailableException because that exception will not be thrown by Kafka clients. Signed-off-by: Adi Suresh --- .../kafka-plugins/build.gradle | 18 +++++++++--------- .../consumer/KafkaCustomConsumerFactory.java | 4 +--- .../plugins/kafka/source/KafkaSource.java | 4 +--- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 96d8cc1c64..7ed03e1d58 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -35,21 +35,16 @@ dependencies { implementation 'io.micrometer:micrometer-core' implementation libs.commons.lang3 implementation 'io.confluent:kafka-avro-serializer:7.4.0' + implementation 'io.confluent:kafka-json-schema-serializer:7.4.0' implementation 'io.confluent:kafka-schema-registry-client:7.4.0' - implementation ('io.confluent:kafka-schema-registry:7.4.0:tests') { - exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' - exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' - exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation' - } implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:auth' implementation 'software.amazon.awssdk:kafka' implementation 'software.amazon.awssdk:kms' implementation 'software.amazon.msk:aws-msk-iam-auth:2.0.3' implementation 'software.amazon.glue:schema-registry-serde:1.1.15' - implementation 'io.confluent:kafka-json-schema-serializer:7.4.0' implementation project(':data-prepper-plugins:failures-common') - implementation 'com.github.fge:json-schema-validator:2.2.14' + implementation 'com.github.java-json-tools:json-schema-validator:2.2.14' implementation 'commons-collections:commons-collections:3.2.2' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:apache-client' @@ -65,7 +60,6 @@ dependencies { testImplementation 'org.apache.kafka:kafka_2.13:3.6.1' testImplementation 'org.apache.kafka:kafka_2.13:3.6.1:test' testImplementation 'org.apache.curator:curator-test:5.5.0' - testImplementation 'io.confluent:kafka-schema-registry:7.4.0' testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39') testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9' testImplementation project(':data-prepper-plugins:otel-metrics-source') @@ -74,8 +68,15 @@ dependencies { testImplementation libs.protobuf.util testImplementation libs.commons.io testImplementation libs.armeria.grpc + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' integrationTestImplementation testLibs.junit.vintage + integrationTestImplementation 'io.confluent:kafka-schema-registry:7.4.0' + integrationTestImplementation ('io.confluent:kafka-schema-registry:7.4.0:tests') { + exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' + exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2' + exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation' + } constraints { implementation('org.mozilla:rhino') { @@ -130,4 +131,3 @@ task integrationTest(type: Test) { includeTestsMatching '*IT' } } - diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index fab0a4f56e..961e0328d3 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -10,7 +10,6 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import io.confluent.kafka.serializers.KafkaJsonDeserializer; -import kafka.common.BrokerEndPointNotAvailableException; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -108,8 +107,7 @@ public List createConsumersForTopic(final KafkaConsumerConf }); } catch (Exception e) { - if (e instanceof BrokerNotAvailableException || - e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { + if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { LOG.error("The Kafka broker is not available."); } else { LOG.error("Failed to setup the Kafka Source Plugin.", e); diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index ec27f1f370..fbdee41105 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -11,7 +11,6 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; -import kafka.common.BrokerEndPointNotAvailableException; import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -146,8 +145,7 @@ public void start(Buffer> buffer) { executorService.submit(consumer); }); } catch (Exception e) { - if (e instanceof BrokerNotAvailableException || - e instanceof BrokerEndPointNotAvailableException || e instanceof TimeoutException) { + if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { LOG.error("The kafka broker is not available..."); } else { LOG.error("Failed to setup the Kafka Source Plugin.", e);