KAFKA-8069: Setting expireTimestamp to None if it is the default value after loading v1 offset records from __consumer_offsets (#3)

* KAFKA-8069: Setting expireTimestamp to None if it is the default value after loading v1 offset records from __consumer_offsets

After the 2.1 release, if the broker has not been upgraded to the latest inter-broker protocol version,
the committed offsets stored in the __consumer_offsets topic will get cleaned up much earlier than they
should be when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator,
which happens during a leadership transition or after a broker bounce. This patch fixes the bug by setting
expireTimestamp to None if it is the default value after loading v1 offset records from __consumer_offsets.
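
For context, a minimal, self-contained sketch of the failure mode, assuming the post-KIP-211 shape of OffsetAndMetadata (an Option[Long] expireTimestamp) and -1 as the OffsetCommitRequest.DEFAULT_TIMESTAMP sentinel; isExpired, fromV1, retentionMs, and the object name are illustrative, not the broker's actual code:

// Illustrative sketch of KAFKA-8069, not the broker's actual code.
object Kafka8069Sketch extends App {
  val DefaultTimestamp = -1L // stands in for OffsetCommitRequest.DEFAULT_TIMESTAMP

  case class OffsetAndMetadata(offset: Long,
                               metadata: String,
                               commitTimestamp: Long,
                               expireTimestamp: Option[Long] = None)

  // Assumed expiration rule, following the post-KIP-211 semantics: an explicit
  // expireTimestamp wins; otherwise retention is measured from commitTimestamp.
  def isExpired(o: OffsetAndMetadata, now: Long, retentionMs: Long): Boolean =
    o.expireTimestamp match {
      case Some(ts) => now >= ts
      case None     => now - o.commitTimestamp >= retentionMs
    }

  // The fix, in miniature: when loading a v1 record, map the stored sentinel
  // back to None instead of blindly wrapping it in Some(-1).
  def fromV1(offset: Long, metadata: String, commitTs: Long, expireTs: Long): OffsetAndMetadata =
    if (expireTs == DefaultTimestamp) OffsetAndMetadata(offset, metadata, commitTs)
    else OffsetAndMetadata(offset, metadata, commitTs, Some(expireTs))

  // Before the fix the -1 sentinel became Some(-1), so every "now" was past it.
  val buggy = OffsetAndMetadata(5L, "metadata", 1000L, Some(DefaultTimestamp))
  val fixed = fromV1(5L, "metadata", 1000L, DefaultTimestamp)

  println(isExpired(buggy, now = 2000L, retentionMs = 60000L)) // true: expires immediately
  println(isExpired(fixed, now = 2000L, retentionMs = 60000L)) // false: retention runs from commit
}

The GroupMetadataManager hunk below applies exactly this mapping at load time.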
hzxa21 authored Mar 8, 2019
1 parent fc61a21 commit 8d1fddd
Showing 2 changed files with 82 additions and 12 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1233,8 +1233,10 @@ object GroupMetadataManager {
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]

OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
OffsetAndMetadata(offset, metadata, commitTimestamp)
else
OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
} else if (version == 2) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
88 changes: 78 additions & 10 deletions core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,7 +17,7 @@

package kafka.server

import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.utils._
@@ -30,6 +30,8 @@ import org.junit.Assert._
import java.util.Properties
import java.io.File

import org.apache.kafka.common.requests.ApiVersionsResponse

import scala.util.Random
import scala.collection._

@@ -46,12 +48,16 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp()
val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)

}

@After
override def tearDown() {

super.tearDown()
}

def StartConsumer(): Unit = {
simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "test-client")
val consumerMetadataRequest = GroupCoordinatorRequest(group)
Stream.continually {
@@ -63,15 +69,26 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
})
}

@After
override def tearDown() {
def StartServerAndConsumer(protocolVersion: ApiVersion = ApiVersion.latestVersion): Unit = {
val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, protocolVersion.toString)
val logDirPath = config.getProperty("log.dir")
logDir = new File(logDirPath)
server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
StartConsumer()
}

def ShutdownServerAndConsumer(): Unit = {
simpleConsumer.close
TestUtils.shutdownServers(Seq(server))
super.tearDown()
}

@Test
def testUpdateOffsets() {
StartServerAndConsumer()

val topic = "topic"

// Commit an offset
@@ -117,10 +134,14 @@ class OffsetCommitTest extends ZooKeeperTestHarness {

assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
assertEquals(1, fetchResponse2.requestInfo.size)

ShutdownServerAndConsumer()
}

@Test
def testCommitAndFetchOffsets() {
StartServerAndConsumer()

val topic1 = "topic-1"
val topic2 = "topic-2"
val topic3 = "topic-3"
@@ -187,10 +208,14 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)

ShutdownServerAndConsumer()
}

@Test
def testLargeMetadataPayload() {
StartServerAndConsumer()

val topicAndPartition = TopicAndPartition("large-metadata", 0)
val expectedReplicaAssignment = Map(0 -> List(1))
createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
@@ -211,10 +236,14 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)

assertEquals(Errors.OFFSET_METADATA_TOO_LARGE, commitResponse1.commitStatus.get(topicAndPartition).get)

ShutdownServerAndConsumer()
}

@Test
def testOffsetExpiration() {
StartServerAndConsumer()

// set up topic partition
val topic = "topic"
val topicPartition = TopicAndPartition(topic, 0)
@@ -284,10 +313,43 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
Thread.sleep(retentionCheckInterval * 2)
assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)

ShutdownServerAndConsumer()
}

// Verify whether the committed offset is still there after bouncing a broker
// This test case reproduces the issue in KAFKA-8069
@Test
def testOffsetExpirationInOldBrokerVersion() {
StartServerAndConsumer(KAFKA_1_1_IV0)

// set up topic partition
val topic = "topic"
val topicPartition = TopicAndPartition(topic, 0)
createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)

val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))

val commitRequest = OffsetCommitRequest(
groupId = "test-group",
requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata")),
versionId = 2)
assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest).commitStatus.get(topicPartition).get)

simpleConsumer.close()
server.shutdown()
server.startup()
StartConsumer()

Thread.sleep(retentionCheckInterval * 2)
assertEquals(5L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)

ShutdownServerAndConsumer()
}

@Test
def testNonExistingTopicOffsetCommit() {
StartServerAndConsumer()

val topic1 = "topicDoesNotExists"
val topic2 = "topic-2"

@@ -302,10 +364,14 @@ class OffsetCommitTest extends ZooKeeperTestHarness {

assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
assertEquals(Errors.NONE, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)

ShutdownServerAndConsumer()
}

@Test
def testOffsetsDeleteAfterTopicDeletion() {
StartServerAndConsumer()

// set up topic partition
val topic = "topic"
val topicPartition = TopicAndPartition(topic, 0)
@@ -326,6 +392,8 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
val offsetMetadataAndErrorMap = simpleConsumer.fetchOffsets(fetchRequest)
val offsetMetadataAndError = offsetMetadataAndErrorMap.requestInfo(topicPartition)
assertEquals(OffsetMetadataAndError.NoOffset, offsetMetadataAndError)

ShutdownServerAndConsumer()
}

}
