From c5ae154a3fdcf20d388405361da37feed0c64ad9 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 23 Apr 2020 00:44:03 -0700 Subject: [PATCH] MINOR: Enable fatal warnings with scala 2.13 (#8429) * Upgrade to Scala 2.13.2 which introduces the ability to suppress warnings. * Upgrade to scala-collection-compat 2.1.6 as it introduces the @nowarn annotation for Scala 2.12. * While at it, also update scala-java8-compat to 0.9.1. * Fix compiler warnings and add @nowarn for the unfixed ones. Scala 2.13.2 highlights (besides @nowarn): * Rewrite Vector (using "radix-balanced finger tree vectors"), for performance. Small vectors are now more compactly represented. Some operations are now drastically faster on large vectors. A few operations may be a little slower. * Matching strings makes switches in bytecode. https://github.com/scala/scala/releases/tag/v2.13.2 Reviewers: Manikumar Reddy --- build.gradle | 6 ++++++ core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 ++ core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/network/RequestChannel.scala | 3 ++- core/src/main/scala/kafka/network/SocketServer.scala | 2 +- .../scala/kafka/security/authorizer/AclAuthorizer.scala | 7 +++++-- .../scala/kafka/security/authorizer/AuthorizerUtils.scala | 3 +++ core/src/main/scala/kafka/server/KafkaApis.scala | 1 - .../main/scala/kafka/server/ReplicationQuotaManager.scala | 2 +- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 ++ core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 ++++ core/src/main/scala/kafka/utils/Pool.scala | 2 +- core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 2 +- .../api/AdminClientWithPoliciesIntegrationTest.scala | 2 ++ .../integration/kafka/api/AuthorizerIntegrationTest.scala | 4 ++++ .../integration/kafka/api/BaseProducerSendTest.scala | 2 ++ .../scala/integration/kafka/api/ConsumerBounceTest.scala | 3 +++ .../kafka/api/PlaintextAdminIntegrationTest.scala | 8 ++++++-- .../kafka/api/SaslSslAdminIntegrationTest.scala | 2 ++ .../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 ++++ .../MetricsDuringTopicCreationDeletionTest.scala | 2 +- .../kafka/integration/UncleanLeaderElectionTest.scala | 3 +++ core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala | 5 ++++- .../test/scala/unit/kafka/server/BaseRequestTest.scala | 3 ++- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +- .../test/scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../kafka/server/epoch/LeaderEpochFileCacheTest.scala | 6 +++--- .../src/test/scala/unit/kafka/tools/MirrorMakerTest.scala | 4 ++++ .../scala/unit/kafka/utils/ShutdownableThreadTest.scala | 2 +- gradle/dependencies.gradle | 6 +++--- 30 files changed, 75 insertions(+), 23 deletions(-) diff --git a/build.gradle b/build.gradle index 7675b81751992..644d3ebaeed82 100644 --- a/build.gradle +++ b/build.gradle @@ -484,6 +484,12 @@ subprojects { scalaCompileOptions.additionalParameters += ["-opt:l:inline"] scalaCompileOptions.additionalParameters += inlineFrom + if (versions.baseScala != '2.12') { + scalaCompileOptions.additionalParameters += ["-opt-warnings"] + // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings + scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] + } + // these options are valid for Scala versions < 2.13 only // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969 if (versions.baseScala == '2.12') { diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 3be24aab34de7..a43fe9e539cd9 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} import org.apache.zookeeper.client.ZKClientConfig +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection._ @@ -297,6 +298,7 @@ object ConfigCommand extends Config { } } + @nowarn("cat=deprecation") private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { val entityTypes = opts.entityTypes val entityNames = opts.entityNames diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ec2498721b902..3d6d8037580df 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -498,7 +498,7 @@ class Log(@volatile private var _dir: File, def newLeaderEpochFileCache(): LeaderEpochFileCache = { val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) - new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) + new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile) } if (recordVersion.precedes(RecordVersion.V2)) { diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index faa20f0ba7f72..17ea3f3ab2d01 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} +import scala.annotation.nowarn import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -106,7 +107,7 @@ object RequestChannel extends Logging { def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" - def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { + def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = { bodyAndSize.request match { case r: T => r case r => diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 77c47f0041bcd..350e58d2110f6 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -700,7 +700,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, * Wakeup the thread for selection. */ @Override - def wakeup = nioSelector.wakeup() + def wakeup(): Unit = nioSelector.wakeup() } diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index bbb2fc8e4727f..5f2be9053515d 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult import org.apache.kafka.server.authorizer._ import org.apache.zookeeper.client.ZKClientConfig -import scala.collection.{mutable, Seq} +import scala.annotation.nowarn +import scala.collection.{Seq, mutable} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Random, Success, Try} @@ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging { } } catch { case e: Exception => - resourceBindingsBeingDeleted.foreach { case (binding, index) => + resourceBindingsBeingDeleted.keys.foreach { binding => deleteExceptions.getOrElseUpdate(binding, apiException(e)) } } @@ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging { }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava } + @nowarn("cat=optimizer") override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { val aclBindings = new util.ArrayList[AclBinding]() aclCache.foreach { case (resource, versionedAcls) => @@ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging { } else false } + @nowarn("cat=deprecation") private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = { // save aclCache reference to a local val to get a consistent view of the cache during acl updates. val aclCacheSnapshot = aclCache diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala index 4620c93edd752..0d670befbb7ff 100644 --- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala +++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala @@ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} +import scala.annotation.nowarn + object AuthorizerUtils { + @nowarn("cat=deprecation") def createAuthorizer(className: String): Authorizer = { Utils.newInstance(className, classOf[Object]) match { case auth: Authorizer => auth diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0d54919b75235..b69db1761a9b6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.ElectLeadersRequestOps -import kafka.api.LeaderAndIsr import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0} import kafka.cluster.Partition import kafka.common.OffsetAndMetadata diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index df1f946a91268..375cd4868b25d 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -133,7 +133,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, * @param value */ def record(value: Long): Unit = { - sensor().record(value, time.milliseconds(), false) + sensor().record(value.toDouble, time.milliseconds(), false) } /** diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index bc69fc286a3d0..5f7b9ab9e90e6 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer} import org.apache.kafka.common.utils.Utils +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter { topicStr = "" } + @nowarn("cat=deprecation") def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { output.println(topicStr + "checksum:" + consumerRecord.checksum) } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 70e3e020e00b4..970dbd39848ae 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.mutable.HashMap import scala.util.control.ControlThrowable @@ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { setName(threadName) + @nowarn("cat=deprecation") private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord = BaseConsumerRecord(record.topic, record.partition, @@ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { * If message.handler.args is specified. A constructor that takes in a String as argument must exist. */ trait MirrorMakerMessageHandler { + @nowarn("cat=deprecation") def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] } private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + @nowarn("cat=deprecation") override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers)) diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 964de7eae26a3..93fd97c9f0dc3 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def hasNext: Boolean = iter.hasNext - def next: (K, V) = { + def next(): (K, V) = { val n = iter.next (n.getKey, n.getValue) } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 02a533d23a862..0a91ece66aff8 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String, case GetDataRequest(path, ctx) => zooKeeper.getData(path, shouldWatch(request), new DataCallback { def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = - callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))), + callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))) }, ctx.orNull) case GetChildrenRequest(path, _, ctx) => zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 33260031b4c61..900fc81f8edae 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test} import org.junit.rules.Timeout import org.scalatest.Assertions.intercept +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client) } + @nowarn("cat=deprecation") @Test def testInvalidAlterConfigsDueToPolicy(): Unit = { client = Admin.create(createConfig) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0cbad3b64d56f..904a2be44c505 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -59,6 +59,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import org.scalatest.Assertions.intercept +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer @@ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(consumer) } + @nowarn("cat=deprecation") @Test def testPatternSubscriptionWithNoTopicAccess(): Unit = { createTopic(topic) @@ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } + @nowarn("cat=deprecation") @Test def testPatternSubscriptionWithTopicAndGroupRead(): Unit = { createTopic(topic) @@ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertTrue(consumer.assignment().isEmpty) } + @nowarn("cat=deprecation") @Test def testPatternSubscriptionMatchingInternalTopic(): Unit = { createTopic(topic) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 9797e098e2fb8..cec3e8d7b54ca 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -36,6 +36,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import org.scalatest.Assertions.fail +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.mutable.Buffer import scala.concurrent.ExecutionException @@ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. * 2. Last message of the non-blocking send should return the correct offset metadata */ + @nowarn("cat=deprecation") @Test def testSendOffset(): Unit = { val producer = createProducer(brokerList) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 24942fa300214..51c308df4d01d 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator import org.junit.Assert._ import org.junit.{After, Ignore, Test} +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.{Seq, mutable} @@ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { * 1. Produce a bunch of messages * 2. Then consume the messages while killing and restarting brokers at random */ + @nowarn("cat=deprecation") def consumeWithBrokerFailures(numIters: Int): Unit = { val numRecords = 1000 val producer = createProducer() @@ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { checkCloseDuringRebalance("group1", topic, executor, true) } + @nowarn("cat=deprecation") private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = { def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 6cb33fa42f7df..f014481951290 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -45,6 +45,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Ignore, Test} import org.scalatest.Assertions.intercept +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.concurrent.duration.Duration @@ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * The AlterConfigs API is deprecated and should not support altering log levels */ + @nowarn("cat=deprecation") @Test @Ignore // To be re-enabled once KAFKA-8779 is resolved def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = { @@ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { object PlaintextAdminIntegrationTest { + @nowarn("cat=deprecation") def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = { // Alter topics var topicConfigEntries1 = Seq( @@ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest { assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) } + @nowarn("cat=deprecation") def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = { // Create topics val topic1 = "invalid-alter-configs-topic-1" @@ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest { assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) - assertEquals(Defaults.CompressionType.toString, + assertEquals(Defaults.CompressionType, configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) - assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) + assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) } } diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index c0319f0e3b21e..3ef00f1087c03 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{After, Assert, Before, Test} +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.Seq import scala.compat.java8.OptionConverters._ @@ -46,6 +47,7 @@ abstract class AuthorizationAdmin { // Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage // It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { + @nowarn("cat=deprecation") val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 12548a1c742fa..b610db25775d5 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -63,6 +63,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Ignore, Test} import org.scalatest.Assertions.intercept +import scala.annotation.nowarn import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ @@ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet }.mkString(",") } + @nowarn("cat=deprecation") private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = { val configs = servers.map { server => val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) @@ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered) } + @nowarn("cat=deprecation") private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = { val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) @@ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) } } + @nowarn("cat=deprecation") private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = { val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala index 11f148837eebc..066725cea3d9d 100644 --- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with .map(KafkaConfig.fromProps(_, overridingProps)) @Before - override def setUp: Unit = { + override def setUp(): Unit = { // Do some Metrics Registry cleanup by removing the metrics that this test checks. // This is a test workaround to the issue that prior harness runs may have left a populated registry. // see https://issues.apache.org/jira/browse/KAFKA-4605 diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 6fcdd4c695817..2f16abaaa28c9 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -40,6 +40,8 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsRes import org.junit.Assert._ import org.scalatest.Assertions.intercept +import scala.annotation.nowarn + class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 @@ -347,6 +349,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { assertEquals(List("first", "third"), consumeAllMessages(topic, 2)) } + @nowarn("cat=deprecation") private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = { val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava val newConfig = new Config(configEntries) diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index b8bc00631be09..7a4b328ae25ea 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -31,6 +31,8 @@ import scala.util.Random import kafka.utils.TestUtils import org.apache.kafka.common.errors.InvalidOffsetException +import scala.annotation.nowarn + class OffsetIndexTest { var idx: OffsetIndex = null @@ -47,7 +49,8 @@ class OffsetIndexTest { if(this.idx != null) this.idx.file.delete() } - + + @nowarn("cat=deprecation") @Test def randomLookupTest(): Unit = { assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L)) diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index dfd425ebd452a..bf54cdb19157f 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader} +import scala.annotation.nowarn import scala.collection.Seq import scala.reflect.ClassTag @@ -86,7 +87,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness { } def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short) - (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { + (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = { val incoming = new DataInputStream(socket.getInputStream) val len = incoming.readInt() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 4dde2d3d321e7..835c113370aa1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -35,7 +35,7 @@ import kafka.utils.timer.MockTimer import kafka.utils.{MockScheduler, MockTime, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState -import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState} +import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index a3b59f44957b3..6dd151828dba8 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -16,7 +16,7 @@ package kafka.server import java.util import java.util.concurrent.{Executors, Future, TimeUnit} -import java.util.{Collections, LinkedHashMap, Optional, Properties} +import java.util.{Collections, Optional, Properties} import kafka.api.LeaderAndIsr import kafka.log.LogConfig diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index af98b4787f8ce..0ddc9964f3d14 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -40,7 +40,7 @@ class LeaderEpochFileCacheTest { override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs override def read(): Seq[EpochEntry] = this.epochs } - private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) + private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint) @Test def shouldAddEpochAndMessageOffsetToCache() = { @@ -231,12 +231,12 @@ class LeaderEpochFileCacheTest { val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) //Given - val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) + val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint) cache.assign(epoch = 2, startOffset = 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) - val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2) + val cache2 = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint2) //Then assertEquals(1, cache2.epochEntries.size) diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala index 0afdf093032d2..413b5ba9aab53 100644 --- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala @@ -19,10 +19,14 @@ package kafka.tools import kafka.consumer.BaseConsumerRecord import org.apache.kafka.common.record.{RecordBatch, TimestampType} + import scala.jdk.CollectionConverters._ import org.junit.Assert._ import org.junit.Test +import scala.annotation.nowarn + +@nowarn("cat=deprecation") class MirrorMakerTest { @Test diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala index 31f7efb5ed3b1..672ac87a35b4a 100644 --- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala @@ -38,7 +38,7 @@ class ShutdownableThreadTest { } val latch = new CountDownLatch(1) val thread = new ShutdownableThread("shutdownable-thread-test") { - override def doWork: Unit = { + override def doWork(): Unit = { latch.countDown() throw new FatalExitError } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 67d85d6262b60..e38b14421643b 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -28,7 +28,7 @@ ext { // Add Scala version def defaultScala212Version = '2.12.11' -def defaultScala213Version = '2.13.1' +def defaultScala213Version = '2.13.2' if (hasProperty('scalaVersion')) { if (scalaVersion == '2.12') { versions["scala"] = defaultScala212Version @@ -102,9 +102,9 @@ versions += [ powermock: "2.0.7", reflections: "0.9.12", rocksDB: "5.18.4", - scalaCollectionCompat: "2.1.4", + scalaCollectionCompat: "2.1.6", scalafmt: "1.5.1", - scalaJava8Compat : "0.9.0", + scalaJava8Compat : "0.9.1", scalatest: "3.0.8", scoverage: "1.4.1", scoveragePlugin: "4.0.1",