diff --git a/bin/kafka-kernel-tls-perf-test.sh b/bin/kafka-kernel-tls-perf-test.sh
deleted file mode 100755
index b89076f31524f..0000000000000
--- a/bin/kafka-kernel-tls-perf-test.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
- export KAFKA_HEAP_OPTS="-Xmx512M"
-fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.KernelTLSBenchmark "$@"
diff --git a/build.gradle b/build.gradle
index 921b882047398..b3b610d5c4c6f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,11 +57,8 @@ spotless {
allprojects {
repositories {
- mavenLocal()
mavenCentral()
maven { url "https://linkedin.jfrog.io/artifactory/zookeeper" }
- maven { url "https://linkedin.jfrog.io/artifactory/ktls-jni" }
-
}
dependencyUpdates {
@@ -1247,7 +1244,6 @@ project(':clients') {
implementation libs.snappy
implementation libs.slf4jApi
implementation libs.conscrypt
- implementation libs.ktls
compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 01182b44a2c36..0566fcf4e40b2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -58,7 +58,6 @@
- <allow pkg="com.linkedin.ktls" />
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index affa614fb614c..e986583cb0b82 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -51,10 +51,6 @@ public class SslConfigs {
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
- public static final String SSL_KERNEL_OFFLOAD_ENABLE_CONFIG = "ssl.kernel.offload.enable";
- public static final String SSL_KERNEL_OFFLOAD_ENABLE_DOC = "ssl.kernel.offload.enable";
- public static final boolean DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE = false;
-
public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. "
+ "By default all the available cipher suites are supported.";
@@ -144,7 +140,6 @@ public static void addClientSslSupport(ConfigDef config) {
config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
- .define(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, SslConfigs.DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
@@ -165,7 +160,6 @@ public static void addClientSslSupport(ConfigDef config) {
}
public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
- SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG,
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
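
Annotation on the SslConfigs hunk: the deleted `ssl.kernel.offload.enable` key followed Kafka's standard ConfigDef pattern — a BOOLEAN with a hard default of false — and its removed DOC constant merely repeated the key name. A minimal sketch of that pattern for reference; the class and constant names below are illustrative, not the deleted fields:

    import org.apache.kafka.common.config.ConfigDef;

    // Sketch of the ConfigDef pattern the removed key used; names are illustrative.
    public class OffloadConfigSketch {
        static final String OFFLOAD_ENABLE_CONFIG = "ssl.kernel.offload.enable";

        static ConfigDef withOffload(ConfigDef def) {
            // BOOLEAN with a false default, matching the removed
            // DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE = false.
            return def.define(OFFLOAD_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, false,
                    ConfigDef.Importance.MEDIUM, "Enable kernel TLS offload for send.");
        }
    }

Dropping the `.define(...)` call is what actually retires the setting: a key that is no longer defined is treated by AbstractConfig as unknown rather than defaulted.
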
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 94123f8bcef50..8b390d11bc6dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -257,7 +257,7 @@ protected TransportLayer buildTransportLayer(String id, SelectionKey key, Socket
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
return SslTransportLayer.create(id, key,
sslFactory.createSslEngine(socketChannel.socket()),
- metadataRegistry, false);
+ metadataRegistry);
} else {
return new PlaintextTransportLayer(key);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 9056f23ae5df0..4dabf0a15be86 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -45,7 +45,6 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
private final boolean isInterBrokerListener;
private SslFactory sslFactory;
private Mode mode;
- private boolean isKernelOffloadEnabled;
private Map<String, ?> configs;
private SslPrincipalMapper sslPrincipalMapper;
private final Logger log;
@@ -72,7 +71,6 @@ public void configure(Map<String, ?> configs) throws KafkaException {
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
this.sslFactory.configure(this.configs);
- this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
} catch (KafkaException e) {
throw e;
} catch (Exception e) {
@@ -92,7 +90,6 @@ public void validateReconfiguration(Map<String, ?> configs) {
@Override
public void reconfigure(Map<String, ?> configs) {
- isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
sslFactory.reconfigure(configs);
}
@@ -121,14 +118,10 @@ public void close() {
if (sslFactory != null) sslFactory.close();
}
- private boolean shouldEnableKernelOffload() {
- return isKernelOffloadEnabled && mode == Mode.SERVER;
- }
-
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, ChannelMetadataRegistry metadataRegistry) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
return SslTransportLayer.create(id, key, sslFactory.createSslEngine(socketChannel.socket()),
- metadataRegistry, shouldEnableKernelOffload());
+ metadataRegistry);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 04d251a6eec6a..6aba13750e04c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.common.network;
-import com.linkedin.ktls.KTLSEnableFailedException;
-import com.linkedin.ktls.KernelTls;
import java.io.IOException;
import java.io.EOFException;
import java.nio.ByteBuffer;
@@ -81,8 +79,6 @@ private enum State {
private final SocketChannel socketChannel;
private final ChannelMetadataRegistry metadataRegistry;
private final Logger log;
- private final KernelTls kernelTLS;
- private boolean shouldAttemptKtls;
private HandshakeStatus handshakeStatus;
private SSLEngineResult handshakeResult;
@@ -93,31 +89,24 @@ private enum State {
private ByteBuffer appReadBuffer;
private ByteBuffer fileChannelBuffer;
private boolean hasBytesBuffered;
- private boolean ktlsAttempted;
- private boolean ktlsEnabled;
public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine,
- ChannelMetadataRegistry metadataRegistry, boolean shouldAttemptKtls)
- throws IOException {
- return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry, shouldAttemptKtls);
+ ChannelMetadataRegistry metadataRegistry) throws IOException {
+ return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry);
}
// Prefer `create`, only use this in tests
- SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, ChannelMetadataRegistry metadataRegistry,
- boolean shouldAttemptKtls) {
+ SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
+ ChannelMetadataRegistry metadataRegistry) {
this.channelId = channelId;
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
this.sslEngine = sslEngine;
- this.shouldAttemptKtls = shouldAttemptKtls;
- this.kernelTLS = new KernelTls();
this.state = State.NOT_INITIALIZED;
this.metadataRegistry = metadataRegistry;
- final LogContext logContext = new LogContext("[SslTransportLayer channelId=" + channelId + " key=" + key + "] ");
+ final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
this.log = logContext.logger(getClass());
-
- log.debug("New SSL channel created with kernel offload turned {}", shouldAttemptKtls ? "on" : "off");
}
// Visible for testing
@@ -191,10 +180,6 @@ public void close() throws IOException {
state = State.CLOSING;
sslEngine.closeOutbound();
try {
- if (ktlsEnabled) {
- kernelTLS.closeNotify(socketChannel);
- return;
- }
if (prevState != State.NOT_INITIALIZED && isConnected()) {
if (!flush(netWriteBuffer)) {
throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
@@ -422,7 +407,7 @@ private void doHandshake() throws IOException {
handshakeFinished();
break;
default:
- throw new IllegalStateException("Unexpected status [" + handshakeStatus + "]");
+ throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
}
}
@@ -710,12 +695,6 @@ public int write(ByteBuffer src) throws IOException {
throw closingException();
if (!ready())
return 0;
- if (shouldAttemptKtls && !ktlsAttempted) {
- attemptToEnableKernelTls();
- }
- if (ktlsEnabled) {
- return writeKernelTLS(src);
- }
int written = 0;
while (flush(netWriteBuffer) && src.hasRemaining()) {
@@ -745,24 +724,6 @@ public int write(ByteBuffer src) throws IOException {
return written;
}
- private int writeKernelTLS(ByteBuffer src) throws IOException {
- log.trace("Writing with Kernel TLS enabled");
- return socketChannel.write(src);
- }
-
- private void attemptToEnableKernelTls() {
- try {
- kernelTLS.enableKernelTlsForSend(sslEngine, socketChannel);
- log.debug("Kernel TLS enabled on socket on channel {}", channelId);
- ktlsEnabled = true;
- } catch (KTLSEnableFailedException e) {
- log.warn("Attempt to enable KTLS failed with exception, falling back to userspace encryption", e);
- ktlsEnabled = false;
- } finally {
- ktlsAttempted = true;
- }
- }
-
/**
* Writes a sequence of bytes to this channel from the subsequence of the given buffers.
*
@@ -774,12 +735,6 @@ private void attemptToEnableKernelTls() {
*/
@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
- if (shouldAttemptKtls && !ktlsAttempted) {
- attemptToEnableKernelTls();
- }
- if (ktlsEnabled) {
- return writeKernelTLS(srcs, offset, length);
- }
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
int totalWritten = 0;
@@ -802,11 +757,6 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
return totalWritten;
}
- private long writeKernelTLS(ByteBuffer[] srcs, int offset, int length) throws IOException {
- log.trace("Writing with Kernel TLS enabled");
- return socketChannel.write(srcs, offset, length);
- }
-
/**
* Writes a sequence of bytes to this channel from the given buffers.
*
@@ -1004,12 +954,6 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr
throw closingException();
if (state != State.READY)
return 0;
- if (shouldAttemptKtls && !ktlsAttempted) {
- attemptToEnableKernelTls();
- }
- if (ktlsEnabled) {
- return transferFromWithKernelTLS(fileChannel, position, count);
- }
if (!flush(netWriteBuffer))
return 0;
@@ -1065,10 +1009,4 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr
throw e;
}
}
-
- private long transferFromWithKernelTLS(
- FileChannel fileChannel, long position, long count) throws IOException {
- log.trace("Transferring from file with Kernel TLS enabled");
- return fileChannel.transferTo(position, count, socketChannel);
- }
}
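
This file carries the substance of the revert. The deleted path attempted, exactly once per channel and only after the TLS handshake completed, to hand the negotiated session to the kernel; on success every subsequent write bypassed the SSLEngine entirely and file transfers went back to zero-copy `transferTo`. A condensed sketch of the removed flow, reconstructed from the deleted lines above (the `com.linkedin.ktls` calls are as they appear in this diff; the two `...ViaSslEngine` stubs stand in for the userspace path that remains after this revert):

    import com.linkedin.ktls.KTLSEnableFailedException;
    import com.linkedin.ktls.KernelTls;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;
    import javax.net.ssl.SSLEngine;

    // Condensed reconstruction of the removed write path: try to enable kTLS once
    // after the handshake, then bypass the SSLEngine on success.
    final class KtlsWritePathSketch {
        private final KernelTls kernelTls = new KernelTls();
        private final SSLEngine sslEngine;
        private final SocketChannel socketChannel;
        private final boolean shouldAttemptKtls;
        private boolean ktlsAttempted;
        private boolean ktlsEnabled;

        KtlsWritePathSketch(SSLEngine sslEngine, SocketChannel socketChannel, boolean shouldAttemptKtls) {
            this.sslEngine = sslEngine;
            this.socketChannel = socketChannel;
            this.shouldAttemptKtls = shouldAttemptKtls;
        }

        int write(ByteBuffer src) throws IOException {
            if (shouldAttemptKtls && !ktlsAttempted)
                attemptToEnableKernelTls();
            if (ktlsEnabled)
                return socketChannel.write(src);   // kernel encrypts; no SSLEngine.wrap
            return writeViaSslEngine(src);         // regular userspace path
        }

        long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
            if (shouldAttemptKtls && !ktlsAttempted)
                attemptToEnableKernelTls();
            if (ktlsEnabled)                       // zero-copy sendfile becomes possible again
                return fileChannel.transferTo(position, count, socketChannel);
            return transferViaSslEngine(fileChannel, position, count);
        }

        private void attemptToEnableKernelTls() {
            try {
                kernelTls.enableKernelTlsForSend(sslEngine, socketChannel);
                ktlsEnabled = true;
            } catch (KTLSEnableFailedException e) {
                ktlsEnabled = false;               // fall back to userspace encryption
            } finally {
                ktlsAttempted = true;              // only ever try once per channel
            }
        }

        private int writeViaSslEngine(ByteBuffer src) { throw new UnsupportedOperationException("omitted"); }
        private long transferViaSslEngine(FileChannel f, long p, long c) { throw new UnsupportedOperationException("omitted"); }
    }

The `transferFrom` branch is the motivating case: with encryption done by the kernel, fetch responses can be served by sendfile instead of being pulled through user space and encrypted buffer by buffer.
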
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
index a870cc55ae47b..f921c679a6a25 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.security.ssl;
-import com.linkedin.ktls.KernelTls;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
@@ -75,8 +74,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory {
private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class);
public static final String PEM_TYPE = "PEM";
- private final KernelTls kernelTls = new KernelTls();
-
private Map<String, ?> configs;
private SslContextProvider sslContextProvider;
private String kmfAlgorithm;
@@ -88,8 +85,6 @@ public final class DefaultSslEngineFactory implements SslEngineFactory {
private SecureRandom secureRandomImplementation;
private SSLContext sslContext;
private SslClientAuth sslClientAuth;
- private boolean isKernelOffloadEnabled;
- private List<String> cipherSuitesWithKernelOffload;
@Override
@@ -144,10 +139,6 @@ public void configure(Map<String, ?> configs) {
this.sslContextProvider.configure(configs);
SecurityUtils.addConfiguredSecurityProviders(this.configs);
- this.isKernelOffloadEnabled = (Boolean) configs.get(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG);
- if (isKernelOffloadEnabled) {
- this.cipherSuitesWithKernelOffload = kernelTls.supportedCipherSuites();
- }
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) {
this.cipherSuites = cipherSuitesList.toArray(new String[0]);
@@ -196,27 +187,9 @@ public SSLContext sslContext() {
return this.sslContext;
}
- private void maybeSetSslEngineCipherSuites(SSLEngine sslEngine) {
- if (cipherSuites != null) {
- sslEngine.setEnabledCipherSuites(cipherSuites);
- } else if (isKernelOffloadEnabled) {
- final String[] cipherSuitesToEnable = sslEngine.getEnabledCipherSuites();
-
- final List<String> reOrderedCipherSuites = new ArrayList<>();
- Arrays.stream(cipherSuitesToEnable)
- .filter(cipherSuitesWithKernelOffload::contains)
- .forEach(reOrderedCipherSuites::add);
- Arrays.stream(cipherSuitesToEnable)
- .filter(cs -> !cipherSuitesWithKernelOffload.contains(cs))
- .forEach(reOrderedCipherSuites::add);
-
- sslEngine.setEnabledCipherSuites(reOrderedCipherSuites.toArray(new String[0]));
- }
- }
-
private SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
- maybeSetSslEngineCipherSuites(sslEngine);
+ if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
if (mode == Mode.SERVER) {
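
The deleted `maybeSetSslEngineCipherSuites` performed a stable two-pass partition: when no explicit cipher list was configured and offload was enabled, the engine's enabled suites were reordered so that kTLS-capable suites (as reported by `KernelTls.supportedCipherSuites()`) were negotiated first, without dropping any suite. A self-contained sketch of that reordering; the suite names in `main` are examples only:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Stable partition of enabled cipher suites: kernel-offloadable suites first,
    // everything else after, preserving relative order within each group.
    public class CipherSuiteReorderSketch {
        static String[] preferOffloadable(String[] enabled, List<String> offloadable) {
            List<String> reordered = new ArrayList<>();
            Arrays.stream(enabled).filter(offloadable::contains).forEach(reordered::add);
            Arrays.stream(enabled).filter(cs -> !offloadable.contains(cs)).forEach(reordered::add);
            return reordered.toArray(new String[0]);
        }

        public static void main(String[] args) {
            // Example names only; the real list came from KernelTls.supportedCipherSuites().
            String[] enabled = {"TLS_A", "TLS_RSA_WITH_AES_128_GCM_SHA256", "TLS_B"};
            List<String> offloadable = Arrays.asList("TLS_RSA_WITH_AES_128_GCM_SHA256");
            System.out.println(Arrays.toString(preferOffloadable(enabled, offloadable)));
            // -> [TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_A, TLS_B]
        }
    }
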
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index f9fa097d363e7..d0cc4cc1e6951 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -204,10 +204,6 @@ public SSLEngine createSslEngine(String peerHost, int peerPort) {
}
}
- public Mode getMode() {
- return mode;
- }
-
/**
* Returns host/IP address of remote host without reverse DNS lookup to be used as the host
* for creating SSL engine. This is used as a hint for session reuse strategy and also for
@@ -483,4 +479,4 @@ void close() {
}
}
}
-}
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 127edffc5f2cb..ca6037d275610 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -113,8 +113,6 @@ public void testConnectionWithCustomKeyManager() throws Exception {
);
sslServerConfigs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
sslServerConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS);
- sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
- sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false);
EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
server.start();
Time time = new MockTime();
@@ -373,7 +371,7 @@ static class TestSslTransportLayer extends SslTransportLayer {
public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
ChannelMetadataRegistry metadataRegistry) throws IOException {
- super(channelId, key, sslEngine, metadataRegistry, false);
+ super(channelId, key, sslEngine, metadataRegistry);
transportLayers.put(channelId, this);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index a029cba491776..fe066a83c0439 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -99,7 +99,6 @@ public Args(String tlsProtocol, boolean useInlinePem, TestSslUtils.SSLProvider p
sslConfigOverrides.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol);
sslConfigOverrides.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsProtocol));
sslConfigOverrides.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS);
- sslConfigOverrides.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
init();
}
@@ -115,8 +114,6 @@ private void init() throws Exception {
clientCertStores = certBuilder(false, "client", useInlinePem, provider).addHostName("localhost").build();
sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores);
sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores);
- sslClientConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
- sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class);
sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class);
}
@@ -352,11 +349,11 @@ public void testListenerConfigOverride(Args args) throws Exception {
ListenerName clientListenerName = new ListenerName("client");
args.sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
args.sslServerConfigs.put(clientListenerName.configPrefix() + BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
- args.sslServerConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
+
// `client` listener is not configured at this point, so client auth should be required
server = createEchoServer(args, SecurityProtocol.SSL);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
- args.sslClientConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
+
// Connect with client auth should work fine
createSelector(args.sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -1070,7 +1067,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder;
assertEquals(listenerName, reconfigurableBuilder.listenerName());
reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs);
- newKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
reconfigurableBuilder.reconfigure(newKeystoreConfigs);
// Verify that new client with old truststore fails
@@ -1095,7 +1091,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "some.keystore.path");
missingStoreConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, new Password("some.keystore.password"));
missingStoreConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("some.key.password"));
- missingStoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "keystore not found");
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
@@ -1140,7 +1135,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
}
ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder;
reconfigurableBuilder.validateReconfiguration(newKeystoreConfigs);
- newKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
reconfigurableBuilder.reconfigure(newKeystoreConfigs);
for (String propName : CertStores.TRUSTSTORE_PROPS) {
@@ -1159,7 +1153,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
for (String propName : CertStores.KEYSTORE_PROPS) {
invalidKeystoreConfigs.put(propName, invalidConfig.get(propName));
}
- invalidKeystoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
verifyInvalidReconfigure(reconfigurableBuilder, invalidKeystoreConfigs, "keystore without existing SubjectAltName");
String node3 = "3";
selector.connect(node3, addr, BUFFER_SIZE, BUFFER_SIZE);
@@ -1198,7 +1191,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder;
assertEquals(listenerName, reconfigurableBuilder.listenerName());
reconfigurableBuilder.validateReconfiguration(newTruststoreConfigs);
- newTruststoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
reconfigurableBuilder.reconfigure(newTruststoreConfigs);
// Verify that new client with old truststore fails
@@ -1221,7 +1213,6 @@ false, securityProtocol, config, null, null, time, new LogContext(),
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "some.truststore.path");
missingStoreConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, new Password("some.truststore.password"));
- missingStoreConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
verifyInvalidReconfigure(reconfigurableBuilder, missingStoreConfigs, "truststore not found");
// Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
@@ -1396,7 +1387,7 @@ class TestSslTransportLayer extends SslTransportLayer {
private final AtomicInteger numDelayedFlushesRemaining;
public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) {
- super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry(), false);
+ super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry());
this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride);
this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride);
this.appBufSize = new ResizeableBufferSize(appBufSizeOverride);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
index 3148b32cb4079..c737abf2ad5b9 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
@@ -204,7 +204,6 @@ public void setUp() {
factory = new DefaultSslEngineFactory();
configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
configs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS);
- configs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index d7a35700d2151..2b56622a20576 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -184,8 +182,6 @@ public static Map<String, Object> createSslConfig(String keyManagerAlgorithm, St
enabledProtocols.add(tlsProtocol);
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
sslConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS);
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false);
return sslConfigs;
}
@@ -566,8 +564,6 @@ private Map<String, Object> buildJks() throws IOException, GeneralSecurityExcept
} else {
sslConfigs.put(SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_CONFIG, SimpleSslContextProvider.class.getName());
}
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false);
return sslConfigs;
}
@@ -593,8 +589,6 @@ private Map<String, Object> buildPem() throws IOException, GeneralSecurityExcept
sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certPem);
}
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG, false);
- sslConfigs.put(SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC, false);
return sslConfigs;
}
}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 591fffbb5f03e..c6b8eb5a0e646 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -314,15 +314,14 @@ object RequestChannel extends Logging {
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
endTimeNanos = Time.SYSTEM.nanoseconds
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
- val responseSendTimeNs = endTimeNanos - responseDequeueTimeNanos
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
val fetchMetricNames =
if (header.apiKey == ApiKeys.FETCH) {
val isFromFollower = body[FetchRequest].isFromFollower
@@ -349,7 +348,6 @@
m.throttleTimeHist.update(apiThrottleTimeMs)
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
- m.responseSendTimeNsHist.update(Math.round(responseSendTimeNs))
m.totalTimeHist.update(Math.round(totalTimeMs))
m.totalTimeBucketHist.foreach(_.update(totalTimeMs))
m.requestBytesHist.update(sizeOfBodyInBytes)
@@ -690,7 +688,6 @@ object RequestMetrics {
val ThrottleTimeMs = "ThrottleTimeMs"
val ResponseQueueTimeMs = "ResponseQueueTimeMs"
val ResponseSendTimeMs = "ResponseSendTimeMs"
- val ResponseSendTimeNs = "ResponseSendTimeNs"
val TotalTimeMs = "TotalTimeMs"
val RequestBytes = "RequestBytes"
val ResponseBytes = "ResponseBytes"
@@ -745,7 +742,6 @@ class RequestMetrics(name: String, config: KafkaConfig) extends KafkaMetricsGrou
val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
// time to send the response to the requester
val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags)
- val responseSendTimeNsHist = newHistogram(ResponseSendTimeNs, biased = true, tags)
val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
// request size in bytes
val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
@@ -855,7 +851,6 @@
removeMetric(ResponseQueueTimeMs, tags)
removeMetric(TotalTimeMs, tags)
removeMetric(ResponseSendTimeMs, tags)
- removeMetric(ResponseSendTimeNs, tags)
removeMetric(RequestBytes, tags)
removeMetric(ResponseBytes, tags)
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) {
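
The `ResponseSendTimeNs` histogram disappears along with its val, leaving only the millisecond metric. The nanosecond variant presumably existed because `Math.round` on a sub-millisecond send time collapses to zero, hiding exactly the latency band a kernel-TLS experiment would move. A small Java illustration of the rounding, mirroring the Scala `nanosToMs` plus `Math.round` above; the 450 µs figure is an arbitrary example:

    public class SendTimeRounding {
        public static void main(String[] args) {
            long sendTimeNanos = 450_000;                    // a 0.45 ms response send
            double sendTimeMs = sendTimeNanos / 1_000_000.0; // nanosToMs equivalent
            System.out.println(Math.round(sendTimeMs));      // prints 0: sub-ms resolution lost
            System.out.println(sendTimeNanos);               // the removed Ns histogram kept this
        }
    }
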
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 602a23ff04d61..a1de59ba6182f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -872,7 +872,6 @@ object DynamicListenerConfig {
KafkaConfig.SslProviderProp,
KafkaConfig.SslCipherSuitesProp,
KafkaConfig.SslEnabledProtocolsProp,
- KafkaConfig.SslKernelOffloadEnableProp,
KafkaConfig.SslKeystoreTypeProp,
KafkaConfig.SslKeystoreLocationProp,
KafkaConfig.SslKeystorePasswordProp,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index eeea8cf750bd8..3ec1dab925eb8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -280,7 +280,6 @@ object Defaults {
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
val SslContextProviderClass = SslConfigs.DEFAULT_SSL_CONTEXT_PROVIDER_CLASS
val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
- val SslKernelOffloadEnable = SslConfigs.DEFAULT_SSL_KERNEL_OFFLOAD_ENABLE
val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
@@ -660,7 +659,6 @@ object KafkaConfig {
val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG
val SslEnabledProtocolsProp = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
val SslKeystoreTypeProp = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG
- val SslKernelOffloadEnableProp = SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG
val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG
@@ -1126,7 +1124,6 @@ object KafkaConfig {
val SslContextProviderClassDoc = SslConfigs.SSL_CONTEXT_PROVIDER_CLASS_DOC
val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC
val SslEnabledProtocolsDoc = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC
- val SslKernelOffloadEnableDoc = SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_DOC
val SslKeystoreTypeDoc = SslConfigs.SSL_KEYSTORE_TYPE_DOC
val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC
val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC
@@ -1466,7 +1463,6 @@ object KafkaConfig {
.define(PrincipalBuilderClassProp, CLASS, Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc)
.define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
.define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
- .define(SslKernelOffloadEnableProp, BOOLEAN, Defaults.SslKernelOffloadEnable, MEDIUM, SslKernelOffloadEnableDoc)
.define(SslContextProviderClassProp, STRING, Defaults.SslContextProviderClass, MEDIUM, SslContextProviderClassDoc)
.define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
.define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc)
diff --git a/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala b/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala
deleted file mode 100644
index b2a71970608cd..0000000000000
--- a/core/src/main/scala/kafka/tools/KernelTLSBenchmark.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package kafka.tools
-
-import com.typesafe.scalalogging.LazyLogging
-import joptsimple.OptionException
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.{ConfigResource, SslConfigs}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.utils.Utils
-
-import java.time.Duration
-import java.util
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Properties, UUID}
-import scala.collection.JavaConverters._
-
-object KernelTLSBenchmark extends LazyLogging {
-
- def main(args: Array[String]): Unit = {
- val config = new KernelTLSBenchmarkConfig(args)
- println("Warming up page cache...")
-
- val adminProps = filterProps(config.props, AdminClientConfig.configNames)
- val adminClient = AdminClient.create(adminProps)
- val (partitionCount, leaderIds) = getPartitions(config.topic, adminClient)
-
- config.partitions.foreach(partitions => {
- if (partitions > partitionCount) {
- throw new IllegalArgumentException(
- s"Number of partitions of topic ${config.topic} found to " +
- s"be ${partitionCount}, which is less than $partitions")
- }
- })
-
- val partitionsToConsume: Int = config.partitions match {
- case Some(p) => p
- case None => partitionCount
- }
-
- runConsume(print = false, 1, partitionsToConsume, config)
- val withDisabled = multipleRuns(
- print = true, kernelOffloadEnabled = false, adminClient, partitionsToConsume, leaderIds, config)
- val withEnabled = multipleRuns(
- print = true, kernelOffloadEnabled = true, adminClient, partitionsToConsume, leaderIds, config)
- val gainPercentage = 100.0 * (withEnabled - withDisabled) / withDisabled
- println("Throughput gain percentage = %.2f%%".format(gainPercentage))
- }
-
- private def filterProps(in: Properties, allowedKeys: util.Set[String]): Properties = {
- val out = new Properties()
- val map = in.asScala
- .filter(entry => allowedKeys.contains(entry._1))
- .asJava
- out.putAll(map)
- out
- }
-
- private def getPartitions(topicName: String, adminClient: AdminClient): (Int, Set[Int]) = {
- val result = adminClient.describeTopics(Seq(topicName).asJava).all().get()
- val partitionCount = result.get(topicName).partitions().size()
- val leaderIds = result.get(topicName).partitions().asScala
- .map(tpInfo => tpInfo.leader().id()).toSet
- (partitionCount, leaderIds)
- }
-
- private def setKernelTlsConfig(
- kernelOffloadEnabled: Boolean, adminClient: AdminClient,
- brokerIds: Iterable[Int], sslListenerName: String): Unit = {
- val configKey = s"listener.name.${sslListenerName.toLowerCase}.${SslConfigs.SSL_KERNEL_OFFLOAD_ENABLE_CONFIG}"
- val configValue = if (kernelOffloadEnabled) "true" else "false"
- val configEntry = new ConfigEntry(configKey, configValue)
-
- val configMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]
-
- brokerIds.foreach(brokerId => {
- val configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString)
- configMap.put(configResource, Seq(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)).asJava)
- })
-
- val result = adminClient.incrementalAlterConfigs(configMap)
- result.all().get()
- }
-
- private def multipleRuns(
- print: Boolean, kernelOffloadEnabled: Boolean, adminClient: AdminClient,
- partitionsToConsume: Int, brokerIds: Iterable[Int], config: KernelTLSBenchmarkConfig): Double = {
- setKernelTlsConfig(kernelOffloadEnabled, adminClient, brokerIds, config.sslListenerName)
- Thread.sleep(10 * 1000)
- val enableStr = if (kernelOffloadEnabled) "enabled" else "disabled"
- if (print) {
- println(s"Consuming with KTLS $enableStr")
- }
- var totalBytesRead: Long = 0
- var totalElapsedMillis: Long = 0
- for (runIndex <- 1 to config.numRuns) {
- val (runBytesRead: Long, runElapsedMillis: Long) = runConsume(print, runIndex, partitionsToConsume, config)
- totalBytesRead += runBytesRead
- totalElapsedMillis += runElapsedMillis
- }
- val totalMB = totalBytesRead * 1.0 / (1024 * 1024)
- val totalSec = totalElapsedMillis / 1000.0
- val totalMBPerSec = totalMB / totalSec
- if (print) {
- println("Total throughput with KTLS %s = %.2f MB/s, time elapsed = %d ms"
- .format(enableStr, totalMBPerSec, totalElapsedMillis))
- }
- totalMBPerSec
- }
-
- private def runConsume(print: Boolean, runIndex: Int, partitionsToConsume: Int, config: KernelTLSBenchmarkConfig): (Long, Long) = {
- val groupId = UUID.randomUUID.toString
- val props = filterProps(config.props, ConsumerConfig.configNames)
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-
- val totalRecordsRead = new AtomicLong(0)
- val totalBytesRead = new AtomicLong(0)
-
- var startMs, endMs = 0L
-
- val countDownLatch = new CountDownLatch(partitionsToConsume)
-
- if (print) {
- printf(s"[Run $runIndex] Fetching records...")
- }
- startMs = System.currentTimeMillis
- for (partition <- 0 to partitionsToConsume - 1) {
- val runnable = new ConsumeRunnable(
- config.topic, partition, props, config, countDownLatch, totalRecordsRead, totalBytesRead)
- val thread = new Thread(runnable, "consumer-" + partition.toString)
- thread.start()
- }
-
- countDownLatch.await()
- endMs = System.currentTimeMillis
-
- val elapsedMillis = endMs - startMs
- val elapsedSecs = elapsedMillis / 1000.0
-
- val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
- val mbRate: Double = totalMBRead / elapsedSecs
- val messageRate = totalRecordsRead.get / elapsedSecs
-
- if (print) {
- println(" Throughput = %.2f MB/s".format(mbRate))
- }
- return (totalBytesRead.get, elapsedMillis)
- }
-
- class ConsumeRunnable(
- topic: String, partition: Int, props: Properties, config: KernelTLSBenchmarkConfig, countDownLatch: CountDownLatch,
- totalRecordsRead: AtomicLong, totalBytesRead: AtomicLong) extends Runnable {
- override def run(): Unit = {
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
- consumer.assign(Seq(new TopicPartition(topic, partition)).asJava)
-
- // Now start the benchmark
- var currentTimeMillis = System.currentTimeMillis
- var lastConsumedTime = currentTimeMillis
-
- var tot: Long = 0
- while (totalRecordsRead.get < config.numRecords && currentTimeMillis - lastConsumedTime <= config.timeoutMs) {
- val records = consumer.poll(Duration.ofMillis(100)).asScala
- currentTimeMillis = System.currentTimeMillis
- if (records.nonEmpty)
- lastConsumedTime = currentTimeMillis
- var bytesRead = 0L
- var recordsRead = 0L
- for (record <- records) {
- recordsRead += 1
- if (record.key != null)
- bytesRead += record.key.length
- if (record.value != null)
- bytesRead += record.value.length
- }
- totalRecordsRead.addAndGet(recordsRead)
- totalBytesRead.addAndGet(bytesRead)
- tot += recordsRead
- }
-
- if (totalRecordsRead.get() < config.numRecords) {
- println(s"WARNING: Exiting before consuming the expected number of records: timeout (${config.timeoutMs} ms) exceeded. ")
- }
- consumer.close()
- countDownLatch.countDown()
- }
- }
-
- class KernelTLSBenchmarkConfig(args: Array[String]) extends CommandDefaultOptions(args) {
- val consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- val numRecordsOpt = parser.accepts("records", "REQUIRED: The number of records to consume")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Long])
- val partitionsOpt = parser.accepts("partitions", "REQUIRED: The number of partitions from which to consume")
- .withRequiredArg
- .describedAs("partitions")
- .ofType(classOf[java.lang.Integer])
- val numRunsOpt = parser.accepts("runs", "Number of runs to perform during the benchmark.")
- .withRequiredArg
- .describedAs("runs")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val sslListenerNameOpt = parser.accepts("ssl-listener-name",
- "The name of the SSL listener as configured in Kafka broker config.")
- .withRequiredArg
- .describedAs("ssl listener name")
- .ofType(classOf[String])
- .defaultsTo("SSL")
-
- try
- options = parser.parse(args: _*)
- catch {
- case e: OptionException =>
- CommandLineUtils.printUsageAndDie(parser, e.getMessage)
- }
-
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer")
-
- CommandLineUtils.checkRequiredArgs(parser, options,
- consumerConfigOpt, topicOpt, numRecordsOpt, numRunsOpt)
-
- val props: Properties = Utils.loadProps(options.valueOf(consumerConfigOpt))
-
- import org.apache.kafka.clients.consumer.ConsumerConfig
-
- // props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
- // props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
- // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
- props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
- val topic = options.valueOf(topicOpt)
- val numRecords = options.valueOf(numRecordsOpt).longValue
- val numRuns = options.valueOf(numRunsOpt).intValue
- val sslListenerName = options.valueOf(sslListenerNameOpt)
- val timeoutMs = 10 * 1000
- val partitions : Option[Int] = if (options.has(partitionsOpt)) Some(options.valueOf(partitionsOpt).intValue()) else None
- }
-}
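
The deleted benchmark drove the A/B comparison end to end: flip the listener-scoped `ssl.kernel.offload.enable` flag on every partition leader via an incremental config update, wait ten seconds for the dynamic reconfiguration to land, consume a fixed number of records per run, and report 100 * (enabled - disabled) / disabled as the throughput gain. A minimal Java sketch of just the dynamic toggle it performed; broker id "0", listener name "ssl", and the bootstrap address are placeholders:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    // Sketch of the benchmark's dynamic toggle: set the listener-prefixed
    // offload flag on one broker via incrementalAlterConfigs.
    public class ToggleOffloadSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9093"); // placeholder address
            try (Admin admin = Admin.create(props)) {
                ConfigEntry entry = new ConfigEntry("listener.name.ssl.ssl.kernel.offload.enable", "true");
                AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Map<ConfigResource, Collection<AlterConfigOp>> updates =
                        Collections.singletonMap(broker, Collections.singletonList(op));
                admin.incrementalAlterConfigs(updates).all().get();
            }
        }
    }

The doubled "ssl.ssl" in the key is intentional: the `listener.name.<listener>.` prefix is prepended to the full config key, exactly as the deleted `setKernelTlsConfig` constructed it.
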
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 999b8dee9d4ae..59b4cf1efd2da 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -101,7 +101,6 @@ versions += [
kafka_26: "2.6.2",
kafka_27: "2.7.1",
kafka_28: "2.8.0",
- ktls: "0.0.3",
lz4: "1.8.0",
mavenArtifact: "3.8.4",
metrics: "2.2.0",
@@ -177,7 +176,6 @@ libs += [
kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",
kafkaStreams_27: "org.apache.kafka:kafka-streams:$versions.kafka_27",
kafkaStreams_28: "org.apache.kafka:kafka-streams:$versions.kafka_28",
- ktls: "com.linkedin.ktls:ktls-jni:$versions.ktls",
log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",