Skip to content

Commit

Permalink
KAFKA-9324: Drop support for Scala 2.11 (KIP-531) (apache#7859)
Browse files Browse the repository at this point in the history
* Adjust build and documentation.
* Use lambda syntax for SAM types in `core`, `streams-scala` and
`connect-runtime` modules.
* Remove `runnable` and `newThread` from `CoreUtils` as lambda
syntax for SAM types make them unnecessary.
* Remove stale comment in `FunctionsCompatConversions`,
`KGroupedStream`, `KGroupedTable' and `KStream` about Scala 2.11,
the conversions are needed for Scala 2.12 too.
* Deprecate `org.apache.kafka.streams.scala.kstream.Suppressed`
and use `org.apache.kafka.streams.kstream.Suppressed` instead.
* Use `Admin.create` instead of `AdminClient.create`. Static methods
in Java interfaces can be invoked since Scala 2.12. I noticed that
MirrorMaker 2 uses `AdminClient.create`, but I did not change them
as Connectors have restrictions on newer client APIs.
* Improve efficiency in a few `Gauge` implementations by avoiding
unnecessary intermediate collections.
* Remove pointless `Option.apply` in `ZookeeperClient`
`SessionState` metric.
* Fix unused import/variable and other compiler warnings.
* Reduce visibility of some vals/defs.

Reviewers: Manikumar Reddy <[email protected]>, Guozhang Wang <[email protected]>, Gwen Shapira <[email protected]>
  • Loading branch information
ijuma authored Jan 6, 2020
1 parent 42b0971 commit 6dc6f6a
Show file tree
Hide file tree
Showing 104 changed files with 776 additions and 1,466 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ The release file can be found inside `./core/build/distributions/`.
### Cleaning the build ###
./gradlew clean

### Running a task with one of the Scala versions available (2.11.x, 2.12.x or 2.13.x) ###
### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
*Note that if building the jars with a version other than 2.12.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*

You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
Expand Down
45 changes: 20 additions & 25 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ subprojects {
"-language:postfixOps",
"-language:implicitConversions",
"-language:existentials",
"-Xlint:constant",
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
Expand All @@ -421,35 +422,29 @@ subprojects {
"-Xlint:poly-implicit-overload",
"-Xlint:private-shadow",
"-Xlint:stars-align",
"-Xlint:type-parameter-shadow"
"-Xlint:type-parameter-shadow",
"-Xlint:unused"
]

if (versions.baseScala != '2.11') {
scalaCompileOptions.additionalParameters += [
"-Xlint:constant",
"-Xlint:unused"
]
// Inline more aggressively when compiling the `core` jar since it's not meant to be used as a library.
// More specifically, inline classes from the Scala library so that we can inline methods like `Option.exists`
// and avoid lambda allocations. This is only safe if the Scala library version is the same at compile time
// and runtime. We cannot guarantee this for libraries like kafka streams, so only inline classes from the
// Kafka project in that case.
List<String> inlineFrom
if (project.name.equals('core'))
inlineFrom = ["-opt-inline-from:scala.**", "-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]
else
inlineFrom = ["-opt-inline-from:org.apache.kafka.**"]

// Inline more aggressively when compiling the `core` jar since it's not meant to be used as a library.
// More specifically, inline classes from the Scala library so that we can inline methods like `Option.exists`
// and avoid lambda allocations. This is only safe if the Scala library version is the same at compile time
// and runtime. We cannot guarantee this for libraries like kafka streams, so only inline classes from the
// Kafka project in that case.
List<String> inlineFrom
if (project.name.equals('core'))
inlineFrom = ["-opt-inline-from:scala.**", "-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]
else
inlineFrom = ["-opt-inline-from:org.apache.kafka.**"]

// Somewhat confusingly, `-opt:l:inline` enables all optimizations. `inlineFrom` configures what can be inlined.
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom
}
// Somewhat confusingly, `-opt:l:inline` enables all optimizations. `inlineFrom` configures what can be inlined.
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom

// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala in ['2.11','2.12']) {
// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:unsound-match"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ public KafkaThread(final String name, Runnable runnable, boolean daemon) {

private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in thread '{}':", name, e);
}
});
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,7 @@ Map<String, String> tags() {
public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) {
MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) {
metrics().addMetric(metricName, new Gauge<T>() {
@Override
public T value(MetricConfig config, long now) {
return supplier.metricValue(now);
}
});
metrics().addMetric(metricName, (Gauge<T>) (config, now) -> supplier.metricValue(now));
}
}

Expand All @@ -324,12 +319,7 @@ public T value(MetricConfig config, long now) {
public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) {
MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) {
metrics().addMetric(metricName, new Gauge<T>() {
@Override
public T value(MetricConfig config, long now) {
return value;
}
});
metrics().addMetric(metricName, (Gauge<T>) (config, now) -> value);
}
}

Expand Down Expand Up @@ -454,4 +444,4 @@ public static void main(String[] args) {
ConnectMetricsRegistry metrics = new ConnectMetricsRegistry();
System.out.println(Metrics.toHtmlTable(JMX_PREFIX, metrics.getAllTemplates()));
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.security.auth._
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
Expand Down Expand Up @@ -102,7 +102,7 @@ object AclCommand extends Logging {
else
new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
val adminClient = JAdminClient.create(props)
val adminClient = Admin.create(props)

try {
f(adminClient)
Expand Down Expand Up @@ -291,7 +291,7 @@ object AclCommand extends Logging {

class JAuthorizerService(val authorizerClass: Class[_ <: JAuthorizer], val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAuthorizer()(f: JAuthorizer => Unit) {
private def withAuthorizer()(f: JAuthorizer => Unit): Unit = {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
val authorizerProperties =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ object AdminUtils extends Logging {
*/
private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
(rack, brokers.toIterator)
(rack, brokers.iterator)
}
val racks = brokersIteratorByRack.keys.toArray.sorted
val result = new mutable.ArrayBuffer[Int]
Expand Down
30 changes: 14 additions & 16 deletions core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,22 @@ object BrokerApiVersionsCommand {
@volatile var running: Boolean = true
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()

val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
override def run(): Unit = {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
} catch {
case t : Throwable =>
error("admin-client-network-thread exited", t)
} finally {
pendingFutures.asScala.foreach { future =>
try {
future.raise(Errors.UNKNOWN_SERVER_ERROR)
} catch {
case _: IllegalStateException => // It is OK if the future has been completed
}
val networkThread = new KafkaThread("admin-client-network-thread", () => {
try {
while (running)
client.poll(time.timer(Long.MaxValue))
} catch {
case t: Throwable =>
error("admin-client-network-thread exited", t)
} finally {
pendingFutures.asScala.foreach { future =>
try {
future.raise(Errors.UNKNOWN_SERVER_ERROR)
} catch {
case _: IllegalStateException => // It is OK if the future has been completed
}
pendingFutures.clear()
}
pendingFutures.clear()
}
}, true)

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, AdminClient => JAdminClient, Config => JConfig, ListTopicsOptions}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, Config => JConfig, ListTopicsOptions}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
Expand Down Expand Up @@ -276,7 +276,7 @@ object ConfigCommand extends Config {
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
val adminClient = JAdminClient.create(props)
val adminClient = Admin.create(props)

if (opts.entityTypes.size != 1)
throw new IllegalArgumentException(s"Exactly one entity type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server")
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import kafka.utils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.{CommonClientConfigs, admin}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}

Expand Down Expand Up @@ -640,7 +640,7 @@ object ConsumerGroupCommand extends Logging {
val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
configOverrides.foreach { case (k, v) => props.put(k, v)}
admin.AdminClient.create(props)
Admin.create(props)
}

private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.Base64
import joptsimple.ArgumentAcceptingOptionSpec
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
Expand Down Expand Up @@ -146,7 +146,7 @@ object DelegationTokenCommand extends Logging {
private def createAdminClient(opts: DelegationTokenCommandOptions): Admin = {
val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
JAdminClient.create(props)
Admin.create(props)
}

class DelegationTokenCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import java.util.Properties
import kafka.common.AdminCommandFailedException
import kafka.utils.json.JsonValue
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
import org.apache.kafka.clients.admin.RecordsToDelete
import org.apache.kafka.clients.{CommonClientConfigs, admin}
import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils

Expand Down Expand Up @@ -100,13 +100,13 @@ object DeleteRecordsCommand {
adminClient.close()
}

private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.Admin = {
private def createAdminClient(opts: DeleteRecordsCommandOptions): Admin = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
admin.AdminClient.create(props)
Admin.create(props)
}

class DeleteRecordsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.utils.CommandLineUtils
import kafka.utils.CoreUtils
import kafka.utils.Json
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ClusterAuthorizationException
Expand Down Expand Up @@ -81,7 +81,7 @@ object LeaderElectionCommand extends Logging {
)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)

JAdminClient.create(props)
Admin.create(props)
}

try {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/LogDirsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.PrintStream
import java.util.Properties

import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.utils.Utils

Expand Down Expand Up @@ -89,7 +89,7 @@ object LogDirsCommand {
new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
JAdminClient.create(props)
Admin.create(props)
}

class LogDirsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import collection.JavaConverters._
import collection._
import java.util.Properties
import java.util.concurrent.ExecutionException

import joptsimple.OptionSpecBuilder
import kafka.common.AdminCommandFailedException
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ClusterAuthorizationException
Expand Down Expand Up @@ -209,7 +210,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
class AdminClientCommand(adminClientProps: Properties)
extends Command with Logging {

val adminClient = org.apache.kafka.clients.admin.AdminClient.create(adminClientProps)
val adminClient = Admin.create(adminClientProps)

override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = {
val partitions = partitionsFromUser match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kafka.utils._
import kafka.utils.json.JsonValue
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterReplicaLogDirsOptions}
import org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
Expand Down Expand Up @@ -78,7 +78,7 @@ object ReassignPartitionsCommand extends Logging {
new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
Some(JAdminClient.create(props))
Some(Admin.create(props))
} else {
None
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import kafka.utils.Implicits._
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListPartitionReassignmentsOptions, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, AdminClient => JAdminClient, Config => JConfig}
import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListPartitionReassignmentsOptions, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
Expand Down Expand Up @@ -209,7 +209,7 @@ object TopicCommand extends Logging {
case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
case None =>
}
JAdminClient.create(commandConfig)
Admin.create(commandConfig)
}

def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
Expand Down
Loading

0 comments on commit 6dc6f6a

Please sign in to comment.