[all] Changed Kafka from LI's 2.4.1.78 to Apache's 2.4.1 (#1287)
This is a second attempt at the change from PR #1000.

Several code changes were needed to make it work.

Introduced a new PubSubSecurityProtocol to replace all usages of Kafka's
SecurityProtocol enum, since that enum lives in a different package in the
Apache fork than in the LinkedIn fork:

AK uses: org.apache.kafka.common.security.auth.SecurityProtocol
while LI uses: org.apache.kafka.common.protocol.SecurityProtocol
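
To illustrate the pattern (a minimal sketch, not actual Venice wiring; the
enum and its forName() match the new file further down, everything else is
invented for the example): call sites hold the Venice-owned enum and only
ever hand Kafka the string form, which both forks accept.

    import java.util.Properties;
    import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;

    public class SecurityProtocolDemo {
      public static void main(String[] args) {
        PubSubSecurityProtocol protocol = PubSubSecurityProtocol.forName("ssl");
        Properties props = new Properties();
        // Kafka clients consume the protocol as the string "SSL", so no
        // compile-time dependency on either fork's SecurityProtocol remains.
        props.setProperty("security.protocol", protocol.name());
        System.out.println(props); // {security.protocol=SSL}
      }
    }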
FelixGV authored Nov 6, 2024
1 parent ae6f5d1 commit aeade7e
Showing 18 changed files with 186 additions and 70 deletions.
16 changes: 10 additions & 6 deletions build.gradle
@@ -40,8 +40,9 @@ if (project.hasProperty('overrideBuildEnvironment')) {
def avroVersion = '1.10.2'
def avroUtilVersion = '0.3.21'
def grpcVersion = '1.49.2'
-def kafkaGroup = 'com.linkedin.kafka'
-def kafkaVersion = '2.4.1.78'
+// N.B.: The build should also work when substituting Kafka from the Apache fork to LinkedIn's fork:
+def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
+def kafkaVersion = '2.4.1' // '2.4.1.65'
def log4j2Version = '2.17.1'
def pegasusVersion = '29.31.0'
def protobufVersion = '3.21.7'
@@ -52,14 +53,15 @@ def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.3.3'
def antlrVersion = '4.8'
+def scala = '2.12'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
antlr4: "org.antlr:antlr4:${antlrVersion}",
antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
-  apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
-  apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
-  apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
+  apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
+  apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
+  apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -101,7 +103,7 @@ ext.libraries = [
jna: 'net.java.dev.jna:jna:4.5.1',
jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
-  kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
+  kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
@@ -194,6 +196,8 @@ subprojects {

if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
tasks.withType(JavaCompile) {
+      // Compiler arguments can be injected here...
+      // options.compilerArgs << '-Xlint:unchecked'
options.release = 8
}
}
(next changed file: VeniceClusterConfig.java)
@@ -27,6 +27,7 @@
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Utils;
@@ -45,7 +46,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -90,8 +90,8 @@ public class VeniceClusterConfig {

private final VeniceProperties clusterProperties;

-  private final SecurityProtocol kafkaSecurityProtocol;
-  private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
+  private final PubSubSecurityProtocol kafkaSecurityProtocol;
+  private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final Optional<SSLConfig> sslConfig;

public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
@@ -136,17 +136,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
LOGGER.info("Final region name for this node: {}", this.regionName);

String kafkaSecurityProtocolString =
-        clusterProps.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
+        clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
-    this.kafkaSecurityProtocol = SecurityProtocol.forName(kafkaSecurityProtocolString);
+    this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);

Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
-    Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
+    Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, String> tmpKafkaUrlResolution = new HashMap<>();

boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
@@ -183,7 +183,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
tmpKafkaClusterUrlToIdMap.put(url, clusterId);
tmpKafkaUrlResolution.put(url, url);
if (securityProtocolString != null) {
-          tmpKafkaBootstrapUrlToSecurityProtocol.put(url, SecurityProtocol.valueOf(securityProtocolString));
+          tmpKafkaBootstrapUrlToSecurityProtocol.put(url, PubSubSecurityProtocol.valueOf(securityProtocolString));
}
}
if (baseKafkaBootstrapServers.equals(url)) {
@@ -244,7 +244,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
-        || kafkaBootstrapUrlToSecurityProtocol.containsValue(SecurityProtocol.SSL)) {
+        || kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
this.sslConfig = Optional.of(new SSLConfig(clusterProps));
} else {
this.sslConfig = Optional.empty();
@@ -276,8 +276,8 @@ public String getKafkaBootstrapServers() {
return kafkaBootstrapServers;
}

-  public SecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
-    SecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
+  public PubSubSecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
+    PubSubSecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
return clusterSpecificSecurityProtocol == null ? kafkaSecurityProtocol : clusterSpecificSecurityProtocol;
}

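Resolution order in getKafkaSecurityProtocol: a per-bootstrap-URL protocol
from the Kafka cluster map wins, and the cluster-wide kafka.security.protocol
is the fallback. A hedged usage sketch (the URLs and mapping are invented):

    // Assume the cluster-wide default is PLAINTEXT and the cluster map
    // assigns SSL to "kafka-2:9093" only:
    config.getKafkaSecurityProtocol("kafka-2:9093"); // -> SSL (per-URL override)
    config.getKafkaSecurityProtocol("kafka-1:9092"); // -> PLAINTEXT (fallback)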
(next changed file)
@@ -58,6 +58,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
@@ -113,7 +114,6 @@
import java.util.function.BooleanSupplier;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -1119,15 +1119,15 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
kafkaBootstrapUrls = resolvedKafkaUrl;
}
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
-    SecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
+    PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
if (!sslConfig.isPresent()) {
throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
}
properties.putAll(sslConfig.get().getKafkaSSLConfig());
}
-    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name);
+    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return new VeniceProperties(properties);
}

(next changed file)
@@ -44,6 +44,7 @@
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
@@ -63,7 +64,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.avro.Schema;
-import org.apache.kafka.common.protocol.SecurityProtocol;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -129,7 +129,7 @@ private void setupMockConfig() {
doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption();
doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy();
doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster();
-    doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
+    doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize();
2 changes: 2 additions & 0 deletions clients/venice-admin-tool/build.gradle
@@ -28,6 +28,8 @@ dependencies {
exclude group: 'org.apache.helix'
}
implementation('org.apache.helix:metrics-common:1.4.1:jdk8')
+  implementation libraries.zstd
+
testImplementation project(':internal:venice-common').sourceSets.test.output
}

(next changed file, new: PubSubSecurityProtocol.java)
@@ -0,0 +1,25 @@
package com.linkedin.venice.pubsub.api;

import java.util.Locale;


/**
* This enum is equivalent to Kafka's SecurityProtocol enum.
*
 * We need this abstraction because Kafka's SecurityProtocol enum lives in a different package in
 * LinkedIn's fork than in the Apache fork.
*/
public enum PubSubSecurityProtocol {
/** Un-authenticated, non-encrypted channel */
PLAINTEXT,
/** SSL channel */
SSL,
/** SASL authenticated, non-encrypted channel */
SASL_PLAINTEXT,
/** SASL authenticated, SSL channel */
SASL_SSL;

public static PubSubSecurityProtocol forName(String name) {
return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
}
}
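
Two details worth noting about this enum. First, Kafka's SecurityProtocol
exposes its name as a public field (securityProtocol.name), while a plain
Java enum only offers the name() method — hence the .name to .name() changes
elsewhere in this diff. Second, forName() upper-cases with Locale.ROOT so
that parsing cannot be broken by the JVM's default locale. A brief sketch:

    PubSubSecurityProtocol.forName("ssl");            // -> SSL
    PubSubSecurityProtocol.forName("Sasl_Plaintext"); // -> SASL_PLAINTEXT
    // Locale.ROOT matters: under the Turkish locale, "plaintext".toUpperCase()
    // yields "PLAİNTEXT" (dotted capital I), which valueOf() would reject.
    PubSubSecurityProtocol.forName("bogus");          // throws IllegalArgumentException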
(next changed file: KafkaSSLUtils.java)
@@ -1,12 +1,12 @@
package com.linkedin.venice.utils;

import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;


public class KafkaSSLUtils {
@@ -26,26 +26,20 @@ public class KafkaSSLUtils {
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);

-  /**
-   * Right now, Venice only supports two Kafka protocols:
-   * {@link SecurityProtocol#PLAINTEXT}
-   * {@link SecurityProtocol#SSL}
-   *
-   * @param kafkaProtocol
-   * @return
-   */
public static boolean isKafkaProtocolValid(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())
-        || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
}

public static boolean isKafkaSSLProtocol(String kafkaProtocol) {
-    return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
+    return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
+        || kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
}

-  public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) {
-    return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL;
+  public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) {
+    return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL;
}

/**
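Note that these checks compare with equals() against the canonical upper-case
names, so unlike PubSubSecurityProtocol.forName() they are case-sensitive:

    KafkaSSLUtils.isKafkaProtocolValid("SSL"); // true
    KafkaSSLUtils.isKafkaProtocolValid("ssl"); // false: equals() does not normalize case
    PubSubSecurityProtocol.forName("ssl");     // -> SSL: forName() upper-cases first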
(next changed file)
@@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null);

@@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

-    when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
+    when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);

when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS)))
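The change from any() to any(Collection.class) is presumably needed because
overload resolution or generic inference around deleteTopics differs between
the two forks' AdminClient APIs; the typed matcher pins the Collection-taking
overload explicitly. The general Mockito pattern, sketched with hypothetical
names:

    // Untyped: the compiler must infer the matcher's type from context, which
    // can become ambiguous once the mocked method has overloads.
    when(adminClient.deleteTopics(any())).thenReturn(result);
    // Typed: explicitly selects the deleteTopics(Collection<...>) overload.
    when(adminClient.deleteTopics(any(Collection.class))).thenReturn(result);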
(next changed file: ApacheKafkaAdminConfigTest.java)
@@ -2,13 +2,13 @@

import static org.testng.Assert.*;

+import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
import org.testng.annotations.Test;


@@ -20,23 +20,23 @@ public class ApacheKafkaAdminConfigTest {

@Test
public void testSetupSaslInKafkaAdminPlaintext() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT);
}

@Test
public void testSetupSaslInKafkaAdminSSL() {
-    testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL);
+    testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL);
}

-  private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
+  private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) {
Properties properties = new Properties();
properties.put("cluster.name", "cluster");
properties.put("zookeeper.address", "localhost:2181");
properties.put("kafka.bootstrap.servers", "localhost:9092");
properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
properties.put("kafka.security.protocol", securityProtocol.name);
if (securityProtocol.name.contains("SSL")) {
properties.put("kafka.security.protocol", securityProtocol.name());
if (securityProtocol.name().contains("SSL")) {
properties.put("ssl.truststore.location", "-");
properties.put("ssl.truststore.password", "");
properties.put("ssl.truststore.type", "JKS");
@@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
Properties adminProperties = serverConfig.getAdminProperties();
assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config"));
assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism"));
-    assertEquals(securityProtocol.name, adminProperties.get("security.protocol"));
+    assertEquals(securityProtocol.name(), adminProperties.get("security.protocol"));
}

@Test
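The assertions in this test imply the config-translation convention being
exercised: properties prefixed with "kafka." are handed to the admin client
with the prefix stripped. A hypothetical helper illustrating just that
convention (the real logic lives in ApacheKafkaAdminConfig, not here):

    import java.util.Properties;

    final class KafkaPrefixUtil {
      // "kafka.security.protocol" -> "security.protocol", etc.
      static Properties stripKafkaPrefix(Properties in) {
        Properties out = new Properties();
        for (String key : in.stringPropertyNames()) {
          if (key.startsWith("kafka.")) {
            out.setProperty(key.substring("kafka.".length()), in.getProperty(key));
          }
        }
        return out;
      }
    }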
(diff truncated; the remaining changed files are not shown)