diff --git a/build.gradle b/build.gradle
index 35bb1629cec..7eac8918051 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,8 +40,8 @@ if (project.hasProperty('overrideBuildEnvironment')) {
 def avroVersion = '1.10.2'
 def avroUtilVersion = '0.3.21'
 def grpcVersion = '1.49.2'
-def kafkaGroup = 'com.linkedin.kafka'
-def kafkaVersion = '2.4.1.65'
+def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
+def kafkaVersion = '2.4.1' // '2.4.1.65'
 def log4j2Version = '2.17.1'
 def pegasusVersion = '29.31.0'
 def protobufVersion = '3.21.7'
@@ -52,14 +52,15 @@ def alpnAgentVersion = '2.0.10'
 def hadoopVersion = '2.10.2'
 def apacheSparkVersion = '3.3.3'
 def antlrVersion = '4.8'
+def scala = '2.12'
 
 ext.libraries = [
   alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
   antlr4: "org.antlr:antlr4:${antlrVersion}",
   antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
-  apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
-  apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
-  apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
+  apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
+  apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
+  apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
   avro: "org.apache.avro:avro:${avroVersion}",
   avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
   avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -101,7 +102,7 @@ ext.libraries = [
   jna: 'net.java.dev.jna:jna:4.5.1',
   jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
   joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
-  kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
+  kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
   kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
   kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
   log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java
index cd1ab58ff7c..04dabe3c449 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceClusterConfig.java
@@ -27,6 +27,7 @@
 import com.linkedin.venice.exceptions.ConfigurationException;
 import com.linkedin.venice.exceptions.UndefinedPropertyException;
 import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.KafkaSSLUtils;
 import com.linkedin.venice.utils.RegionUtils;
 import com.linkedin.venice.utils.VeniceProperties;
@@ -44,7 +45,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -89,8 +89,8 @@ public class VeniceClusterConfig {
 
   private final VeniceProperties clusterProperties;
 
-  private final SecurityProtocol kafkaSecurityProtocol;
-  private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
+  private final PubSubSecurityProtocol kafkaSecurityProtocol;
+  private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
   private final Optional<SSLConfig> sslConfig;
 
   public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
@@ -135,17 +135,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
     Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
     Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
     Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
     Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
-    Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
+    Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
     Map<String, String> tmpKafkaUrlResolution = new HashMap<>();
 
     boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
@@ -167,7 +167,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
     Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
     if (!sslConfig.isPresent()) {
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
index e6c8f27a1d8..9f910aee328 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
@@ -43,6 +43,7 @@
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.schema.SchemaEntry;
@@ -60,7 +61,6 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
 import org.apache.avro.Schema;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -124,7 +124,7 @@ private void setupMockConfig() {
     doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption();
     doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy();
     doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster();
-    doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
+    doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
     doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords();
     doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize();
     doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize();
diff --git a/clients/venice-admin-tool/build.gradle b/clients/venice-admin-tool/build.gradle
index 25c7f20b918..f6ebb17694a 100644
--- a/clients/venice-admin-tool/build.gradle
+++ b/clients/venice-admin-tool/build.gradle
@@ -21,6 +21,8 @@ dependencies {
     // Helix use zk 3.6.9, which introduce netty 3.10 and will fail our test.
     exclude module: 'zookeeper'
   }
 
+  implementation libraries.zstd
+  testImplementation project(':internal:venice-common').sourceSets.test.output
 }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapter.java
index 67ba9bf45cf..a591d96452a 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapter.java
@@ -20,7 +20,6 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Metric;
@@ -151,7 +150,10 @@ public void close(int closeTimeOutMs, boolean doFlush) {
     }
     if (doFlush) {
       // Flush out all the messages in the producer buffer
-      producer.flush(closeTimeOutMs, TimeUnit.MILLISECONDS);
+      producer.flush();
+
+      // N.B.: The LI Kafka fork supports passing a timeout to flush, but not Apache Kafka...
+      // producer.flush(closeTimeOutMs, TimeUnit.MILLISECONDS);
       LOGGER.info("Flushed all the messages in producer before closing");
     }
     producer.close(Duration.ofMillis(closeTimeOutMs));
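Note (side commentary, not part of the patch): Apache Kafka's `KafkaProducer` only exposes `flush()` with no timeout parameter, which is why the timeout-taking call from the LinkedIn fork is dropped above. If a bounded flush is ever needed again, one option is to run `flush()` off-thread and cap the wait. A rough sketch under that assumption (class and method names are illustrative, not from this PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;

final class BoundedFlushSketch {
  /**
   * Best-effort bounded flush: KafkaProducer.flush() blocks until all buffered
   * records complete, so we cap how long the caller is willing to wait for it.
   */
  static void flushWithTimeout(KafkaProducer<byte[], byte[]> producer, long timeoutMs) {
    CompletableFuture<Void> flushFuture = CompletableFuture.runAsync(producer::flush);
    try {
      flushFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The flush keeps running in the background; the caller simply stops waiting.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      throw new RuntimeException("Flush failed", e.getCause());
    }
  }
}
```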
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java
index 9946842eed2..19ed1a1a2cf 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/KafkaSSLUtils.java
@@ -1,12 +1,12 @@
 package com.linkedin.venice.utils;
 
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 
 
 public class KafkaSSLUtils {
@@ -26,26 +26,20 @@ public class KafkaSSLUtils {
       SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
       SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
 
-  /**
-   * Right now, Venice only supports two Kafka protocols:
-   * {@link SecurityProtocol#PLAINTEXT}
-   * {@link SecurityProtocol#SSL}
-   *
-   * @param kafkaProtocol
-   * @return
-   */
   public static boolean isKafkaProtocolValid(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
   }
 
   public static boolean isKafkaSSLProtocol(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
   }
 
-  public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) {
-    return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL;
+  public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) {
+    return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL;
   }
 
   /**
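Note (side commentary, not part of the patch): the callers above rely on `PubSubSecurityProtocol` exposing the same constant names as Kafka's `SecurityProtocol`, reachable both as `SSL.name()` and `SSL.name`. A simplified sketch of what such an enum could look like; the real class in `com.linkedin.venice.pubsub.api` may carry more than this:

```java
/**
 * Illustrative sketch only: a pub-sub-agnostic stand-in for Kafka's SecurityProtocol.
 * The constant names deliberately match Kafka's, so PubSubSecurityProtocol.SSL.name()
 * can be written straight into a Kafka client's security.protocol config.
 */
public enum PubSubSecurityProtocol {
  PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL;

  /**
   * Kafka's SecurityProtocol exposes the name as a public field in addition to name();
   * mirroring that is what lets call sites use both SSL.name and SSL.name().
   */
  public final String name = name();
}
```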
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java
index ecfea7c98c8..00e2a9da703 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminAdapterTest.java
@@ -27,6 +27,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception {
     DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
     KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);
 
-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
     when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
     when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null);
 
@@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception {
     DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
     KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);
 
-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
     when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
     when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS)))
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java
index a0a197af024..5091e0fd019 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java
@@ -2,13 +2,13 @@
 
 import static org.testng.Assert.*;
 
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.testng.annotations.Test;
 
 
@@ -20,15 +20,15 @@ public class ApacheKafkaAdminConfigTest {
 
   @Test
   public void testSetupSaslInKafkaAdminPlaintext() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT);
   }
 
   @Test
   public void testSetupSaslInKafkaAdminSSL() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL);
   }
 
-  private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
+  private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) {
     Properties properties = new Properties();
     properties.put("cluster.name", "cluster");
     properties.put("zookeeper.address", "localhost:2181");
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterTest.java
index bfc482d7a02..6f38d70253f 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterTest.java
@@ -1,7 +1,6 @@
 package com.linkedin.venice.pubsub.adapter.kafka.producer;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -33,7 +32,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -140,10 +138,10 @@ public void testSendMessageInteractionWithInternalProducer() {
 
   @Test
   public void testCloseInvokesProducerFlushAndClose() {
-    doNothing().when(kafkaProducerMock).flush(anyLong(), any(TimeUnit.class));
+    doNothing().when(kafkaProducerMock).flush();
     ApacheKafkaProducerAdapter producerAdapter = new ApacheKafkaProducerAdapter(producerConfigMock, kafkaProducerMock);
     producerAdapter.close(10, true);
-    verify(kafkaProducerMock, times(1)).flush(anyLong(), any(TimeUnit.class));
+    verify(kafkaProducerMock, times(1)).flush();
     verify(kafkaProducerMock, times(1)).close(any(Duration.class));
   }
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java
index 0a36349c144..ef67ef05aee 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/KafkaSSLUtilsTest.java
@@ -2,7 +2,7 @@
 
 import static org.testng.Assert.*;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import org.testng.annotations.Test;
 
 
@@ -25,9 +25,9 @@ public void testIsKafkaSSLProtocol() {
 
   @Test
   public void testTestIsKafkaSSLProtocol() {
-    assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SSL));
-    assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.PLAINTEXT));
-    assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_SSL));
-    assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_PLAINTEXT));
+    assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SSL));
+    assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.PLAINTEXT));
+    assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_SSL));
+    assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_PLAINTEXT));
   }
 }
diff --git a/internal/venice-test-common/build.gradle b/internal/venice-test-common/build.gradle
index b6ae4145921..d2b23de87e1 100644
--- a/internal/venice-test-common/build.gradle
+++ b/internal/venice-test-common/build.gradle
@@ -36,7 +36,6 @@ configurations {
     }
   }
   implementation {
-    exclude group: 'org.apache.kafka'
     exclude group: 'org.mortbay.jetty', module: 'servlet-api'
   }
   integrationTestImplementation.extendsFrom testImplementation
@@ -96,6 +95,7 @@ dependencies {
   implementation libraries.samzaApi
   implementation libraries.spark
   implementation libraries.testng
+  implementation libraries.zstd
 
   implementation (libraries.mapreduceClientJobClient) {
     exclude group: 'org.apache.avro'
@@ -106,6 +106,7 @@ dependencies {
   testImplementation project(':internal:venice-common').sourceSets.test.output
   testImplementation libraries.log4j2core
   testImplementation libraries.log4j2api
+  testImplementation libraries.kafkaClients
 
   jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get()
   jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java
index aa0d6d77302..b65d2b1f9a9 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/KafkaTestUtils.java
@@ -1,12 +1,12 @@
 package com.linkedin.venice.integration.utils;
 
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.SslUtils;
 import com.linkedin.venice.utils.SslUtils.VeniceTlsConfiguration;
 import java.util.Properties;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 
 
 /**
@@ -43,7 +43,7 @@ public static Properties getLocalCommonKafkaSSLConfig(VeniceTlsConfiguration tls
 
   public static Properties getLocalKafkaClientSSLConfig() {
     Properties properties = new Properties();
-    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
+    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PubSubSecurityProtocol.SSL.name());
     properties.putAll(getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
     return properties;
   }
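Note (side commentary, not part of the patch): `getLocalKafkaClientSSLConfig()` only changes where the `security.protocol` value comes from; the surrounding SSL client setup is untouched. For context, a self-contained sketch of the kind of properties such a helper produces (store paths and passwords are placeholders, not values from this repo):

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;

final class ExampleSslClientConfig {
  static Properties build() {
    Properties props = new Properties();
    // The enum constant name is exactly what Kafka expects for security.protocol ("SSL").
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PubSubSecurityProtocol.SSL.name());
    // Placeholder trust/key store settings; the real tests pull these from SslUtils.getTlsConfiguration().
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");
    return props;
  }
}
```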
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java
index 89aca584485..9cf85442128 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java
@@ -67,6 +67,7 @@
 import com.linkedin.venice.meta.PersistenceType;
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
 import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
 import com.linkedin.venice.stats.TehutiUtils;
 import com.linkedin.venice.utils.PropertyBuilder;
@@ -84,7 +85,6 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -228,7 +228,7 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
       }
 
       if (options.isSslToKafka()) {
-        builder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name);
+        builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name);
         builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
       }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
index 7c8ef1b4000..04fa96c93cd 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
@@ -43,6 +43,7 @@
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.helix.AllowlistAccessor;
 import com.linkedin.venice.helix.ZkAllowlistAccessor;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.security.SSLFactory;
 import com.linkedin.venice.server.VeniceServer;
 import com.linkedin.venice.server.VeniceServerContext;
@@ -73,7 +74,6 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.io.FileUtils;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -259,7 +259,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
           .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
           .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000);
       if (sslToKafka) {
-        serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name);
+        serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name);
         serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
       }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
index a81845078e0..492aa495661 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java
@@ -23,6 +23,7 @@
 import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.DEFAULT_PARENT_DATA_CENTER_REGION_NAME;
 
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
 import java.io.File;
@@ -38,7 +39,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -288,8 +288,8 @@ private static Map<String, Map<String, String>> addKafkaClusterIDMappingToServer
       List<String> regionNames,
       List<PubSubBrokerWrapper> kafkaBrokers) {
     if (serverProperties.isPresent()) {
-      SecurityProtocol baseSecurityProtocol = SecurityProtocol
-          .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name));
+      PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol
+          .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name));
       Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
       Map<String, String> mapping;
 
@@ -297,21 +297,21 @@ private static Map<String, Map<String, String>> addKafkaClusterIDMappingToServer
         mapping = new HashMap<>();
         int clusterId = i - 1;
         mapping.put(KAFKA_CLUSTER_MAP_KEY_NAME, regionNames.get(clusterId));
-        SecurityProtocol securityProtocol = baseSecurityProtocol;
+        PubSubSecurityProtocol securityProtocol = baseSecurityProtocol;
         if (clusterId > 0) {
           // Testing mixed security on any 2-layer setup with 2 or more DCs.
-          securityProtocol = SecurityProtocol.SSL;
+          securityProtocol = PubSubSecurityProtocol.SSL;
         }
         mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name);
         // N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this
         // is why the index here is offset by 1 compared to the cluster ID.
         PubSubBrokerWrapper pubSubBrokerWrapper = kafkaBrokers.get(i);
-        String kafkaAddress = securityProtocol == SecurityProtocol.SSL
+        String kafkaAddress = securityProtocol == PubSubSecurityProtocol.SSL
             ? pubSubBrokerWrapper.getSSLAddress()
             : pubSubBrokerWrapper.getAddress();
         mapping.put(KAFKA_CLUSTER_MAP_KEY_URL, kafkaAddress);
-        String otherKafkaAddress = securityProtocol == SecurityProtocol.PLAINTEXT
+        String otherKafkaAddress = securityProtocol == PubSubSecurityProtocol.PLAINTEXT
             ? pubSubBrokerWrapper.getSSLAddress()
             : pubSubBrokerWrapper.getAddress();
         mapping.put(KAFKA_CLUSTER_MAP_KEY_OTHER_URLS, otherKafkaAddress);
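Note (side commentary, not part of the patch): `addKafkaClusterIDMappingToServer...` builds a nested map keyed by Kafka cluster ID, where each entry carries the region name, security protocol, URL, and the "other" URL of the opposite listener. An illustrative two-region example of the resulting structure; the literal string keys and addresses below are placeholders standing in for the `KAFKA_CLUSTER_MAP_*` constants and broker addresses used above:

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaClusterMapExample {
  /**
   * Two-region example: region 0 stays on the base protocol (PLAINTEXT here),
   * region 1 is flipped to SSL to exercise mixed security.
   */
  static Map<String, Map<String, String>> build() {
    Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();

    Map<String, String> dc0 = new HashMap<>();
    dc0.put("name", "dc-0");
    dc0.put("securityProtocol", "PLAINTEXT");
    dc0.put("url", "localhost:9092");       // plaintext listener
    dc0.put("otherUrls", "localhost:9093"); // the SSL listener of the same broker
    kafkaClusterMap.put("0", dc0);

    Map<String, String> dc1 = new HashMap<>();
    dc1.put("name", "dc-1");
    dc1.put("securityProtocol", "SSL");
    dc1.put("url", "localhost:9192");       // SSL listener
    dc1.put("otherUrls", "localhost:9191"); // the plaintext listener of the same broker
    kafkaClusterMap.put("1", dc1);

    return kafkaClusterMap;
  }
}
```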
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
index 31666db60f0..ee047cb0cd0 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
@@ -78,6 +78,7 @@
 import com.linkedin.venice.meta.PersistenceType;
 import com.linkedin.venice.meta.ReadStrategy;
 import com.linkedin.venice.meta.RoutingStrategy;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
 import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService;
 import com.linkedin.venice.pushmonitor.PushMonitorType;
 import com.linkedin.venice.utils.KafkaSSLUtils;
@@ -89,7 +90,6 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -350,7 +350,7 @@ private void initFieldsWithProperties(VeniceProperties props) {
     }
 
     helixSendMessageTimeoutMilliseconds = props.getInt(HELIX_SEND_MESSAGE_TIMEOUT_MS, 10000);
-    kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
+    kafkaSecurityProtocol = props.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
     if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocol)) {
       throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocol);
     }
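Note (side commentary, not part of the patch): the controller keeps the protocol as a string and validates it through `KafkaSSLUtils`, defaulting to `PLAINTEXT`. A small usage sketch of that flow with the new enum; the property key and helper class below are illustrative, and only the two `KafkaSSLUtils` calls come from this PR:

```java
import java.util.Properties;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;

final class SecurityProtocolConfigSketch {
  /** Reads, defaults, and validates the protocol, then reports whether SSL configs are also needed. */
  static boolean requiresSslConfigs(Properties props) {
    // "kafka.security.protocol" stands in for the KAFKA_SECURITY_PROTOCOL config key used above.
    String protocol = props.getProperty("kafka.security.protocol", PubSubSecurityProtocol.PLAINTEXT.name());
    if (!KafkaSSLUtils.isKafkaProtocolValid(protocol)) {
      throw new IllegalArgumentException("Invalid kafka security protocol: " + protocol);
    }
    // Only SSL and SASL_SSL need keystore/truststore settings on top of the protocol itself.
    return KafkaSSLUtils.isKafkaSSLProtocol(protocol);
  }
}
```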