Skip to content

Commit

Permalink
MINOR: Enable fatal warnings with scala 2.13 (apache#8429)
Browse files Browse the repository at this point in the history
* Upgrade to Scala 2.13.2 which introduces the ability to suppress warnings.
* Upgrade to scala-collection-compat 2.1.6 as it introduces the
@nowarn annotation for Scala 2.12.
* While at it, also update scala-java8-compat to 0.9.1.
* Fix compiler warnings and add @nowarn for the unfixed ones.

Scala 2.13.2 highlights (besides @nowarn):

* Rewrite Vector (using "radix-balanced finger tree vectors"),
for performance. Small vectors are now more compactly
represented. Some operations are now drastically faster on
large vectors. A few operations may be a little slower.
* Matching strings makes switches in bytecode.

https://github.com/scala/scala/releases/tag/v2.13.2

Reviewers: Manikumar Reddy <[email protected]>
  • Loading branch information
ijuma authored Apr 23, 2020
1 parent 0397740 commit c5ae154
Show file tree
Hide file tree
Showing 30 changed files with 75 additions and 23 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ subprojects {
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom

if (versions.baseScala != '2.12') {
scalaCompileOptions.additionalParameters += ["-opt-warnings"]
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
}

// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._

Expand Down Expand Up @@ -297,6 +298,7 @@ object ConfigCommand extends Config {
}
}

@nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ class Log(@volatile private var _dir: File,

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
}

if (recordVersion.precedes(RecordVersion.V2)) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}

import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -106,7 +107,7 @@ object RequestChannel extends Logging {

def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"

def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
bodyAndSize.request match {
case r: T => r
case r =>
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
* Wakeup the thread for selection.
*/
@Override
def wakeup = nioSelector.wakeup()
def wakeup(): Unit = nioSelector.wakeup()

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig

import scala.collection.{mutable, Seq}
import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}

Expand Down Expand Up @@ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
} catch {
case e: Exception =>
resourceBindingsBeingDeleted.foreach { case (binding, index) =>
resourceBindingsBeingDeleted.keys.foreach { binding =>
deleteExceptions.getOrElseUpdate(binding, apiException(e))
}
}
Expand All @@ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging {
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}

@nowarn("cat=optimizer")
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
aclCache.foreach { case (resource, versionedAcls) =>
Expand Down Expand Up @@ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}

@nowarn("cat=deprecation")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// save aclCache reference to a local val to get a consistent view of the cache during acl updates.
val aclCacheSnapshot = aclCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}

import scala.annotation.nowarn


object AuthorizerUtils {

@nowarn("cat=deprecation")
def createAuthorizer(className: String): Authorizer = {
Utils.newInstance(className, classOf[Object]) match {
case auth: Authorizer => auth
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger

import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.ElectLeadersRequestOps
import kafka.api.LeaderAndIsr
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
* @param value
*/
def record(value: Long): Unit = {
sensor().record(value, time.milliseconds(), false)
sensor().record(value.toDouble, time.milliseconds(), false)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}

@nowarn("cat=deprecation")
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
output.println(topicStr + "checksum:" + consumerRecord.checksum)
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
Expand Down Expand Up @@ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {

setName(threadName)

@nowarn("cat=deprecation")
private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
BaseConsumerRecord(record.topic,
record.partition,
Expand Down Expand Up @@ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
* If message.handler.args is specified. A constructor that takes in a String as argument must exist.
*/
trait MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}

private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
@nowarn("cat=deprecation")
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {

def hasNext: Boolean = iter.hasNext

def next: (K, V) = {
def next(): (K, V) = {
val n = iter.next
(n.getKey, n.getValue)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String,
case GetDataRequest(path, ctx) =>
zooKeeper.getData(path, shouldWatch(request), new DataCallback {
def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))),
callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetChildrenRequest(path, _, ctx) =>
zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test}
import org.junit.rules.Timeout
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client)
}

@nowarn("cat=deprecation")
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = Admin.create(createConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
Expand Down Expand Up @@ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithNoTopicAccess(): Unit = {
createTopic(topic)
Expand Down Expand Up @@ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithTopicAndGroupRead(): Unit = {
createTopic(topic)
Expand Down Expand Up @@ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertTrue(consumer.assignment().isEmpty)
}

@nowarn("cat=deprecation")
@Test
def testPatternSubscriptionMatchingInternalTopic(): Unit = {
createTopic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.fail

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
*/
@nowarn("cat=deprecation")
@Test
def testSendOffset(): Unit = {
val producer = createProducer(brokerList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator
import org.junit.Assert._
import org.junit.{After, Ignore, Test}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}

Expand Down Expand Up @@ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
@nowarn("cat=deprecation")
def consumeWithBrokerFailures(numIters: Int): Unit = {
val numRecords = 1000
val producer = createProducer()
Expand Down Expand Up @@ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
checkCloseDuringRebalance("group1", topic, executor, true)
}

@nowarn("cat=deprecation")
private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {

def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
/**
* The AlterConfigs API is deprecated and should not support altering log levels
*/
@nowarn("cat=deprecation")
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
Expand Down Expand Up @@ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

object PlaintextAdminIntegrationTest {

@nowarn("cat=deprecation")
def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
Expand Down Expand Up @@ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest {
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
}

@nowarn("cat=deprecation")
def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
Expand Down Expand Up @@ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest {

assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
assertEquals(Defaults.CompressionType.toString,
assertEquals(Defaults.CompressionType,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)

assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)

assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Assert, Before, Test}

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
Expand All @@ -46,6 +47,7 @@ abstract class AuthorizationAdmin {
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
@nowarn("cat=deprecation")
val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept

import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}.mkString(",")
}

@nowarn("cat=deprecation")
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
Expand All @@ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
}

@nowarn("cat=deprecation")
private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
Expand All @@ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) }
}

@nowarn("cat=deprecation")
private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
.map(KafkaConfig.fromProps(_, overridingProps))

@Before
override def setUp: Unit = {
override def setUp(): Unit = {
// Do some Metrics Registry cleanup by removing the metrics that this test checks.
// This is a test workaround to the issue that prior harness runs may have left a populated registry.
// see https://issues.apache.org/jira/browse/KAFKA-4605
Expand Down
Loading

0 comments on commit c5ae154

Please sign in to comment.