Changed Kafka from LI's 2.4.1.65 to Apache's 2.4.1
Several code changes were required to make it work.

Introduced a new PubSubSecurityProtocol to replace all usages of
Kafka's SecurityProtocol enum, since that class has a different
package name in the Apache and LinkedIn forks of Kafka.

Apache Kafka uses: org.apache.kafka.common.security.auth.SecurityProtocol
while the LinkedIn fork uses: org.apache.kafka.common.protocol.SecurityProtocol
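
The new enum only needs to mirror the protocol names used by both
forks. A minimal sketch of what it plausibly looks like (the package
comes from the imports in this diff; the forName implementation shown
here is an assumption, not necessarily the committed code):

    package com.linkedin.venice.pubsub.api;

    import java.util.Locale;

    /** Fork-agnostic stand-in for Kafka's SecurityProtocol enum. */
    public enum PubSubSecurityProtocol {
      PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL;

      /** Case-insensitive lookup, mirroring SecurityProtocol.forName(). */
      public static PubSubSecurityProtocol forName(String name) {
        return valueOf(name.toUpperCase(Locale.ROOT));
      }
    }

Call sites then switch from SecurityProtocol.forName(...) to
PubSubSecurityProtocol.forName(...), as the diffs below show.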

TODO: Decide what to do about the producer flush API not supporting
a timeout in Apache Kafka.
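
One possible workaround (purely a sketch, not part of this commit)
would be to bound the untimed Apache Kafka flush() with a helper
thread; the class and method names here are hypothetical:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.producer.KafkaProducer;

    final class FlushWithTimeout {
      /** Hypothetical helper: runs flush() on another thread and waits up to timeoutMs. */
      static void flush(KafkaProducer<?, ?> producer, long timeoutMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
          Future<?> f = executor.submit(producer::flush);
          f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
          // Flush failed or timed out; a subsequent close(Duration) still bounds shutdown.
        } finally {
          executor.shutdownNow();
        }
      }
    }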
FelixGV committed May 22, 2024
1 parent 2720a83 commit 3b85901
Showing 17 changed files with 67 additions and 68 deletions.
13 changes: 7 additions & 6 deletions build.gradle
@@ -40,8 +40,8 @@ if (project.hasProperty('overrideBuildEnvironment')) {
def avroVersion = '1.10.2'
def avroUtilVersion = '0.3.21'
def grpcVersion = '1.49.2'
def kafkaGroup = 'com.linkedin.kafka'
def kafkaVersion = '2.4.1.65'
def kafkaGroup = 'org.apache.kafka' // 'com.linkedin.kafka'
def kafkaVersion = '2.4.1' // '2.4.1.65'
def log4j2Version = '2.17.1'
def pegasusVersion = '29.31.0'
def protobufVersion = '3.21.7'
@@ -52,14 +52,15 @@ def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.3.3'
def antlrVersion = '4.8'
def scala = '2.12'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
antlr4: "org.antlr:antlr4:${antlrVersion}",
antlr4Runtime: "org.antlr:antlr4-runtime:${antlrVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_2.12:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_2.12:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_2.12:${apacheSparkVersion}",
apacheSparkAvro: "org.apache.spark:spark-avro_${scala}:${apacheSparkVersion}",
apacheSparkCore: "org.apache.spark:spark-core_${scala}:${apacheSparkVersion}",
apacheSparkSql: "org.apache.spark:spark-sql_${scala}:${apacheSparkVersion}",
avro: "org.apache.avro:avro:${avroVersion}",
avroCompiler: "org.apache.avro:avro-compiler:${avroVersion}",
avroMapred: "org.apache.avro:avro-mapred:${avroVersion}",
@@ -101,7 +102,7 @@ ext.libraries = [
jna: 'net.java.dev.jna:jna:4.5.1',
jsr305: 'com.google.code.findbugs:jsr305:3.0.2',
joptSimple: 'net.sf.jopt-simple:jopt-simple:3.2',
kafka: "${kafkaGroup}:kafka_2.12:${kafkaVersion}",
kafka: "${kafkaGroup}:kafka_${scala}:${kafkaVersion}",
kafkaClients: "${kafkaGroup}:kafka-clients:${kafkaVersion}",
kafkaClientsTest: "${kafkaGroup}:kafka-clients:${kafkaVersion}:test",
log4j2api: "org.apache.logging.log4j:log4j-api:${log4j2Version}",
@@ -27,6 +27,7 @@
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.VeniceProperties;
@@ -44,7 +45,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -89,8 +89,8 @@ public class VeniceClusterConfig {

private final VeniceProperties clusterProperties;

private final SecurityProtocol kafkaSecurityProtocol;
private final Map<String, SecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final PubSubSecurityProtocol kafkaSecurityProtocol;
private final Map<String, PubSubSecurityProtocol> kafkaBootstrapUrlToSecurityProtocol;
private final Optional<SSLConfig> sslConfig;

public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String, String>> kafkaClusterMap)
@@ -135,17 +135,17 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
LOGGER.info("Final region name for this node: {}", this.regionName);

String kafkaSecurityProtocolString =
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
this.kafkaSecurityProtocol = SecurityProtocol.forName(kafkaSecurityProtocolString);
this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);

Int2ObjectMap<String> tmpKafkaClusterIdToUrlMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterUrlToIdMap = new Object2IntOpenHashMap<>();
Int2ObjectMap<String> tmpKafkaClusterIdToAliasMap = new Int2ObjectOpenHashMap<>();
Object2IntMap<String> tmpKafkaClusterAliasToIdMap = new Object2IntOpenHashMap<>();
Map<String, SecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, PubSubSecurityProtocol> tmpKafkaBootstrapUrlToSecurityProtocol = new HashMap<>();
Map<String, String> tmpKafkaUrlResolution = new HashMap<>();

boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
@@ -167,7 +167,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
tmpKafkaClusterUrlToIdMap.put(url, clusterId);
tmpKafkaUrlResolution.put(url, url);
if (securityProtocolString != null) {
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, SecurityProtocol.valueOf(securityProtocolString));
tmpKafkaBootstrapUrlToSecurityProtocol.put(url, PubSubSecurityProtocol.valueOf(securityProtocolString));
}
}
if (baseKafkaBootstrapServers.equals(url)) {
@@ -228,7 +228,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
}
if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(SecurityProtocol.SSL)) {
|| kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
this.sslConfig = Optional.of(new SSLConfig(clusterProps));
} else {
this.sslConfig = Optional.empty();
@@ -260,8 +260,8 @@ public String getKafkaBootstrapServers() {
return kafkaBootstrapServers;
}

public SecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
SecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
public PubSubSecurityProtocol getKafkaSecurityProtocol(String kafkaBootstrapUrl) {
PubSubSecurityProtocol clusterSpecificSecurityProtocol = kafkaBootstrapUrlToSecurityProtocol.get(kafkaBootstrapUrl);
return clusterSpecificSecurityProtocol == null ? kafkaSecurityProtocol : clusterSpecificSecurityProtocol;
}

@@ -58,6 +58,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
@@ -115,7 +116,6 @@
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -1108,7 +1108,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
kafkaBootstrapUrls = resolvedKafkaUrl;
}
properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
SecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
if (!sslConfig.isPresent()) {
@@ -43,6 +43,7 @@
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
@@ -60,7 +61,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -124,7 +124,7 @@ private void setupMockConfig() {
doReturn(VeniceProperties.empty()).when(mockVeniceServerConfig).getKafkaConsumerConfigsForLocalConsumption();
doReturn(getConsumerAssignmentStrategy()).when(mockVeniceServerConfig).getSharedConsumerAssignmentStrategy();
doReturn(1).when(mockVeniceServerConfig).getConsumerPoolSizePerKafkaCluster();
doReturn(SecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
doReturn(PubSubSecurityProtocol.PLAINTEXT).when(mockVeniceServerConfig).getKafkaSecurityProtocol(dummyKafkaUrl);
doReturn(10).when(mockVeniceServerConfig).getKafkaMaxPollRecords();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherConsumerPoolSize();
doReturn(2).when(mockVeniceServerConfig).getTopicManagerMetadataFetcherThreadPoolSize();
2 changes: 2 additions & 0 deletions clients/venice-admin-tool/build.gradle
@@ -21,6 +21,8 @@ dependencies {
// Helix uses zk 3.6.9, which introduces netty 3.10 and will fail our tests.
exclude module: 'zookeeper'
}
implementation libraries.zstd

testImplementation project(':internal:venice-common').sourceSets.test.output
}

@@ -20,7 +20,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
@@ -151,7 +150,10 @@ public void close(int closeTimeOutMs, boolean doFlush) {
}
if (doFlush) {
// Flush out all the messages in the producer buffer
producer.flush(closeTimeOutMs, TimeUnit.MILLISECONDS);
producer.flush();

// N.B.: The LI Kafka fork supports passing a timeout to flush, but not Apache Kafka...
// producer.flush(closeTimeOutMs, TimeUnit.MILLISECONDS);
LOGGER.info("Flushed all the messages in producer before closing");
}
producer.close(Duration.ofMillis(closeTimeOutMs));
@@ -1,12 +1,12 @@
package com.linkedin.venice.utils;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;


public class KafkaSSLUtils {
@@ -26,26 +26,20 @@ public class KafkaSSLUtils {
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);

/**
* Right now, Venice only supports two Kafka protocols:
* {@link SecurityProtocol#PLAINTEXT}
* {@link SecurityProtocol#SSL}
*
* @param kafkaProtocol
* @return
*/
public static boolean isKafkaProtocolValid(String kafkaProtocol) {
return kafkaProtocol.equals(SecurityProtocol.PLAINTEXT.name()) || kafkaProtocol.equals(SecurityProtocol.SSL.name())
|| kafkaProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())
|| kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
return kafkaProtocol.equals(PubSubSecurityProtocol.PLAINTEXT.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_PLAINTEXT.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
}

public static boolean isKafkaSSLProtocol(String kafkaProtocol) {
return kafkaProtocol.equals(SecurityProtocol.SSL.name()) || kafkaProtocol.equals(SecurityProtocol.SASL_SSL.name());
return kafkaProtocol.equals(PubSubSecurityProtocol.SSL.name())
|| kafkaProtocol.equals(PubSubSecurityProtocol.SASL_SSL.name());
}

public static boolean isKafkaSSLProtocol(SecurityProtocol kafkaProtocol) {
return kafkaProtocol == SecurityProtocol.SSL || kafkaProtocol == SecurityProtocol.SASL_SSL;
public static boolean isKafkaSSLProtocol(PubSubSecurityProtocol kafkaProtocol) {
return kafkaProtocol == PubSubSecurityProtocol.SSL || kafkaProtocol == PubSubSecurityProtocol.SASL_SSL;
}

/**
@@ -27,6 +27,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -192,7 +193,7 @@ public void testDeleteTopicValidTopicDeletion() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);
when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS))).thenReturn(null);

@@ -208,7 +209,7 @@ public void testDeleteTopicThrowsException() throws Exception {
DeleteTopicsResult deleteTopicsResultMock = mock(DeleteTopicsResult.class);
KafkaFuture<Void> topicDeletionFutureMock = mock(KafkaFuture.class);

when(internalKafkaAdminClientMock.deleteTopics(any())).thenReturn(deleteTopicsResultMock);
when(internalKafkaAdminClientMock.deleteTopics(any(Collection.class))).thenReturn(deleteTopicsResultMock);
when(deleteTopicsResultMock.all()).thenReturn(topicDeletionFutureMock);

when(topicDeletionFutureMock.get(eq(1000L), eq(TimeUnit.MILLISECONDS)))
@@ -2,13 +2,13 @@

import static org.testng.Assert.*;

import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.testng.annotations.Test;


@@ -20,15 +20,15 @@ public class ApacheKafkaAdminConfigTest {

@Test
public void testSetupSaslInKafkaAdminPlaintext() {
testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_PLAINTEXT);
testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_PLAINTEXT);
}

@Test
public void testSetupSaslInKafkaAdminSSL() {
testSetupSaslInKafkaAdmin(SecurityProtocol.SASL_SSL);
testSetupSaslInKafkaAdmin(PubSubSecurityProtocol.SASL_SSL);
}

private void testSetupSaslInKafkaAdmin(SecurityProtocol securityProtocol) {
private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) {
Properties properties = new Properties();
properties.put("cluster.name", "cluster");
properties.put("zookeeper.address", "localhost:2181");
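
For context, configuring SASL on a plain Apache Kafka client uses the
standard client properties shown below; this is an illustrative sketch
(the mechanism and JAAS values are invented), not the exact properties
this test asserts:

    Properties props = new Properties();
    props.put("security.protocol", PubSubSecurityProtocol.SASL_SSL.name());
    props.put("sasl.mechanism", "PLAIN");
    props.put(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"user\" password=\"secret\";");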
@@ -1,7 +1,6 @@
package com.linkedin.venice.pubsub.adapter.kafka.producer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -33,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -140,10 +138,10 @@ public void testSendMessageInteractionWithInternalProducer() {

@Test
public void testCloseInvokesProducerFlushAndClose() {
doNothing().when(kafkaProducerMock).flush(anyLong(), any(TimeUnit.class));
doNothing().when(kafkaProducerMock).flush();
ApacheKafkaProducerAdapter producerAdapter = new ApacheKafkaProducerAdapter(producerConfigMock, kafkaProducerMock);
producerAdapter.close(10, true);
verify(kafkaProducerMock, times(1)).flush(anyLong(), any(TimeUnit.class));
verify(kafkaProducerMock, times(1)).flush();
verify(kafkaProducerMock, times(1)).close(any(Duration.class));
}

@@ -2,7 +2,7 @@

import static org.testng.Assert.*;

import org.apache.kafka.common.protocol.SecurityProtocol;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
import org.testng.annotations.Test;


@@ -25,9 +25,9 @@ public void testIsKafkaSSLProtocol() {

@Test
public void testTestIsKafkaSSLProtocol() {
assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SSL));
assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.PLAINTEXT));
assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_SSL));
assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(SecurityProtocol.SASL_PLAINTEXT));
assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SSL));
assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.PLAINTEXT));
assertTrue(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_SSL));
assertFalse(KafkaSSLUtils.isKafkaSSLProtocol(PubSubSecurityProtocol.SASL_PLAINTEXT));
}
}
3 changes: 2 additions & 1 deletion internal/venice-test-common/build.gradle
@@ -36,7 +36,6 @@ configurations {
}
}
implementation {
exclude group: 'org.apache.kafka'
exclude group: 'org.mortbay.jetty', module: 'servlet-api'
}
integrationTestImplementation.extendsFrom testImplementation
@@ -96,6 +95,7 @@ dependencies {
implementation libraries.samzaApi
implementation libraries.spark
implementation libraries.testng
implementation libraries.zstd

implementation (libraries.mapreduceClientJobClient) {
exclude group: 'org.apache.avro'
@@ -106,6 +106,7 @@
testImplementation project(':internal:venice-common').sourceSets.test.output
testImplementation libraries.log4j2core
testImplementation libraries.log4j2api
testImplementation libraries.kafkaClients

jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:' + jmh.jmhVersion.get()
jmhImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')