From 8d1fddd0950d4dda94f553741ab445cced6d1be7 Mon Sep 17 00:00:00 2001
From: "Zhanxiang (Patrick) Huang"
Date: Thu, 7 Mar 2019 23:45:55 -0800
Subject: [PATCH] KAFKA-8069: Setting expireTimestamp to None if it is the
 default value after loading v1 offset records from __consumer_offsets (#3)

* KAFKA-8069: Setting expireTimestamp to None if it is the default value
  after loading v1 offset records from __consumer_offsets

After the 2.1 release, if the broker has not been upgraded to the latest
inter-broker protocol version, committed offsets stored in the
__consumer_offsets topic get cleaned up much earlier than they should be
when the offsets are loaded back from the __consumer_offsets topic in the
GroupCoordinator, which happens during a leadership transition or after a
broker bounce. This patch fixes the bug by setting expireTimestamp to None
if it is the default value after loading v1 offset records from
__consumer_offsets.
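To see why a leaked sentinel matters, the following is a minimal sketch of
the expiration semantics involved, under stated assumptions: the helper
isExpired and its parameters are illustrative rather than the actual
GroupCoordinator code, and -1L stands in for
OffsetCommitRequest.DEFAULT_TIMESTAMP.

    // A minimal, illustrative model of the expiration decision: an explicit
    // expire timestamp is honored as-is, while None falls back to
    // commitTimestamp + retention.
    def isExpired(commitTimestamp: Long, expireTimestamp: Option[Long],
                  now: Long, retentionMs: Long): Boolean =
      expireTimestamp match {
        case Some(ts) => now >= ts
        case None => now >= commitTimestamp + retentionMs
      }

    // Before the fix, a v1 record holding the -1 default came back as
    // Some(-1L), so `now >= -1` always held and the offset was expired as
    // soon as it was loaded from __consumer_offsets:
    isExpired(commitTimestamp = 1000L, expireTimestamp = Some(-1L),
              now = 2000L, retentionMs = 7L * 24 * 60 * 60 * 1000) // true
    // After the fix, the sentinel maps to None and retention applies:
    isExpired(commitTimestamp = 1000L, expireTimestamp = None,
              now = 2000L, retentionMs = 7L * 24 * 60 * 60 * 1000) // false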
---
 .../group/GroupMetadataManager.scala          |  6 +-
 .../unit/kafka/server/OffsetCommitTest.scala  | 88 ++++++++++++++++---
 2 files changed, 82 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index f2102346a1661..b60ae433ae038 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1233,8 +1233,10 @@ object GroupMetadataManager {
       val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
       val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
       val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
-
-      OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
+      else
+        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
     } else if (version == 2) {
       val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
       val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 3e8a535946814..ab288af28bd29 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
-import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api._
 import kafka.consumer.SimpleConsumer
 import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.utils._
@@ -30,6 +30,8 @@ import org.junit.Assert._
 import java.util.Properties
 import java.io.File
 
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
 import scala.util.Random
 import scala.collection._
@@ -46,12 +48,16 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
-    config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-    config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
-    val logDirPath = config.getProperty("log.dir")
-    logDir = new File(logDirPath)
-    server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
+
+  }
+
+  @After
+  override def tearDown() {
+
+    super.tearDown()
+  }
+
+  def StartConsumer(): Unit = {
     simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = GroupCoordinatorRequest(group)
     Stream.continually {
@@ -63,15 +69,26 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     })
   }
 
-  @After
-  override def tearDown() {
+  def StartServerAndConsumer(protocolVersion: ApiVersion = ApiVersion.latestVersion): Unit = {
+    val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
+    config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
+    config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, protocolVersion.toString)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+    server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
+    StartConsumer()
+  }
+
+  def ShutdownServerAndConsumer(): Unit = {
     simpleConsumer.close
     TestUtils.shutdownServers(Seq(server))
-    super.tearDown()
   }
 
   @Test
   def testUpdateOffsets() {
+    StartServerAndConsumer()
+
     val topic = "topic"
 
     // Commit an offset
@@ -117,10 +134,14 @@
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
     assertEquals(1, fetchResponse2.requestInfo.size)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testCommitAndFetchOffsets() {
+    StartServerAndConsumer()
+
     val topic1 = "topic-1"
     val topic2 = "topic-2"
     val topic3 = "topic-3"
@@ -187,10 +208,14 @@
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
     assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testLargeMetadataPayload() {
+    StartServerAndConsumer()
+
     val topicAndPartition = TopicAndPartition("large-metadata", 0)
     val expectedReplicaAssignment = Map(0 -> List(1))
     createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
@@ -211,10 +236,14 @@
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
     assertEquals(Errors.OFFSET_METADATA_TOO_LARGE, commitResponse1.commitStatus.get(topicAndPartition).get)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testOffsetExpiration() {
+    StartServerAndConsumer()
+
     // set up topic partition
     val topic = "topic"
     val topicPartition = TopicAndPartition(topic, 0)
@@ -284,10 +313,43 @@
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
+    ShutdownServerAndConsumer()
+  }
+
+  // Verify whether the committed offset is still there after bouncing a broker
+  // This test case reproduces the issue in KAFKA-8069
+  @Test
+  def testOffsetExpirationInOldBrokerVersion() {
+    StartServerAndConsumer(KAFKA_1_1_IV0)
+
+    // set up topic partition
+    val topic = "topic"
+    val topicPartition = TopicAndPartition(topic, 0)
+    createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)
+
+    val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
+
+    val commitRequest = OffsetCommitRequest(
+      groupId = "test-group",
+      requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata")),
+      versionId = 2)
+    assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest).commitStatus.get(topicPartition).get)
+
+    simpleConsumer.close()
+    server.shutdown()
+    server.startup()
+    StartConsumer()
+
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(5L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testNonExistingTopicOffsetCommit() {
+    StartServerAndConsumer()
+
     val topic1 = "topicDoesNotExists"
     val topic2 = "topic-2"
@@ -302,10 +364,14 @@
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
     assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+
+    ShutdownServerAndConsumer()
   }
 
   @Test
   def testOffsetsDeleteAfterTopicDeletion() {
+    StartServerAndConsumer()
+
     // set up topic partition
     val topic = "topic"
     val topicPartition = TopicAndPartition(topic, 0)
@@ -326,6 +392,8 @@
     val offsetMetadataAndErrorMap = simpleConsumer.fetchOffsets(fetchRequest)
     val offsetMetadataAndError = offsetMetadataAndErrorMap.requestInfo(topicPartition)
     assertEquals(OffsetMetadataAndError.NoOffset, offsetMetadataAndError)
+
+    ShutdownServerAndConsumer()
   }
 }
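The new testOffsetExpirationInOldBrokerVersion pins
inter.broker.protocol.version to KAFKA_1_1_IV0 so that commits are persisted
with the v1 value schema, then bounces the broker to force the offsets to be
reloaded through the fixed path. The underlying defect is a write/read
asymmetry around the sentinel; a minimal sketch, assuming hypothetical helper
names and -1L in place of OffsetCommitRequest.DEFAULT_TIMESTAMP:

    // Write path for the v1 schema: an absent expire timestamp is flattened
    // to the sentinel before the record is written to __consumer_offsets.
    def writeExpireTimestampV1(expireTimestamp: Option[Long]): Long =
      expireTimestamp.getOrElse(-1L)

    // Read path before this patch: the sentinel came back as a "real",
    // long-past timestamp, so the offset looked expired immediately.
    def readExpireTimestampBeforeFix(raw: Long): Option[Long] =
      Some(raw)

    // Read path after this patch: the sentinel is mapped back to None.
    def readExpireTimestampAfterFix(raw: Long): Option[Long] =
      if (raw == -1L) None else Some(raw)

    // None now survives the round trip; before the fix it did not.
    assert(readExpireTimestampAfterFix(writeExpireTimestampV1(None)).isEmpty)
    assert(readExpireTimestampBeforeFix(writeExpireTimestampV1(None)).contains(-1L))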