KAFKA-18019: Make INVALID_PRODUCER_ID_MAPPING a fatal error (#17822)
This patch changes the handling of the INVALID_PRODUCER_ID_MAPPING error.
Quoted from KIP-890:
Since we bump epoch on abort, we no longer need to call InitProducerId to fence requests. InitProducerId will only be called when the producer starts up to fence a previous instance.

With this change in place, the other calls to InitProducerId were inspected, including the one made after receiving an InvalidPidMappingException. That exception was changed to abortable as part of KIP-360 (Improve reliability of idempotent/transactional producer), but treating it as abortable means we can violate EOS guarantees. As an example:

Consider an application that copies data from one partition to another (a sketch of such a loop follows this list):

1. Application instance A processes up to offset 4.
2. Application instance B comes up and fences application instance A.
3. Application instance B processes up to offset 5.
4. Application instances A and B are idle for transactional.id.expiration.ms, so the transactional ID expires on the server.
5. Application instance A attempts to process offset 5 (since, in its view, that is next). If we recover from the invalid PID mapping, this processing is duplicated.
Thus, INVALID_PRODUCER_ID_MAPPING should be fatal to the producer.
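
To make the hazard concrete, here is a minimal sketch of the consume-transform-produce loop such a copy application runs. This is illustrative only, not code from this patch: the topic names, transactional.id, group.id, and configs are assumptions. The key point is that offsets are committed atomically with the produced records, so an instance whose expired producer ID were silently re-initialized could re-commit work it already did:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class PartitionCopyApp {
        public static void main(String[] args) {
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("transactional.id", "copy-app"); // shared by instances A and B
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "copy-app-group");
            consumerProps.put("enable.auto.commit", "false");
            consumerProps.put("isolation.level", "read_committed");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
                 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                producer.initTransactions(); // fences any earlier instance using the same transactional.id
                consumer.subscribe(List.of("source-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    if (records.isEmpty())
                        continue;
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("sink-topic", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    }
                    // Offsets commit atomically with the copied records. If instance A's
                    // expired producer ID were re-initialized instead of failing fatally,
                    // A could reach this point with stale offsets and copy records twice.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            }
        }
    }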

This is consistent with KIP-1050 (Consistent error handling for Transactions), where errors that are fatal to the producer fall into the "application recoverable" category. That grouping signals to the client that the producer must be restarted and that recovery has to happen on the application side. KIP-1050 has been approved, so this change stays consistent with that decision.
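
In client code, "application recoverable" maps to a simple pattern that the updated tests below also exercise: catch the fatal exception, close the producer, create a new one, and call initTransactions() again. A minimal sketch follows; the RecoveringTransactor class and its producer factory are hypothetical, while the catch structure follows the standard KafkaProducer javadoc pattern of handling fatal errors before the generic KafkaException case:

    import java.util.function.Supplier;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.InvalidPidMappingException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class RecoveringTransactor {
        // Hypothetical factory; in practice it would carry the real client configs.
        private final Supplier<KafkaProducer<String, String>> factory;
        private KafkaProducer<String, String> producer;

        RecoveringTransactor(Supplier<KafkaProducer<String, String>> factory) {
            this.factory = factory;
            this.producer = factory.get();
            this.producer.initTransactions();
        }

        void sendInTransaction(ProducerRecord<String, String> record) {
            try {
                producer.beginTransaction();
                producer.send(record);
                producer.commitTransaction(); // surfaces any failed send in the transaction
            } catch (ProducerFencedException | InvalidPidMappingException e) {
                // Fatal ("application recoverable"): this producer instance is dead.
                // Close it and build a fresh one; initTransactions() fences the old
                // epoch and obtains a new producer ID. The caller may then retry.
                producer.close();
                producer = factory.get();
                producer.initTransactions();
            } catch (KafkaException e) {
                // Abortable errors: abort this transaction and keep the same producer.
                producer.abortTransaction();
            }
        }
    }

Note that once the producer is in a fatal state, even abortTransaction() throws a KafkaException, which is exactly what the updated tests assert before recreating the producer.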

This PR also fixes the flakiness of TransactionsExpirationTest.

Reviewers: Artem Livshits <[email protected]>, Justine Olshan <[email protected]>, Calvin Liu <[email protected]>
rreddy-22 authored Nov 18, 2024
1 parent 5cf9872 commit e4c0034
Showing 5 changed files with 66 additions and 46 deletions.
TransactionManager.java
@@ -31,7 +31,6 @@
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.InvalidPidMappingException;
 import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -339,27 +338,24 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
 
-        // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server will not accept an EndTxnRequest, so skip
-        // directly to InitProducerId. Otherwise, we must first abort the transaction, because the producer will be
-        // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
-            EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
-                new EndTxnRequestData()
-                    .setTransactionalId(transactionalId)
-                    .setProducerId(producerIdAndEpoch.producerId)
-                    .setProducerEpoch(producerIdAndEpoch.epoch)
-                    .setCommitted(transactionResult.id),
-                isTransactionV2Enabled
-            );
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
+            new EndTxnRequestData()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerIdAndEpoch.producerId)
+                .setProducerEpoch(producerIdAndEpoch.epoch)
+                .setCommitted(transactionResult.id),
+            isTransactionV2Enabled
+        );
 
-            EndTxnHandler handler = new EndTxnHandler(builder);
-            enqueueRequest(handler);
-            if (!epochBumpRequired) {
-                return handler.result;
-            }
-        }
+        EndTxnHandler handler = new EndTxnHandler(builder);
+        enqueueRequest(handler);
 
-        return initializeTransactions(this.producerIdAndEpoch);
+        // If an epoch bump is required for recovery, initialize the transaction after completing the EndTxn request.
+        if (epochBumpRequired) {
+            return initializeTransactions(this.producerIdAndEpoch);
+        }
+
+        return handler.result;
     }
 
     public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
@@ -1410,7 +1406,7 @@ public void handleResponse(AbstractResponse response) {
                 fatalError(Errors.PRODUCER_FENCED.exception());
                 return;
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.INVALID_TXN_STATE) {
+                    error == Errors.INVALID_TXN_STATE || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                 fatalError(error.exception());
                 return;
             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -1419,7 +1415,7 @@ public void handleResponse(AbstractResponse response) {
                 log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
                     "batch had errors.", topicPartition);
                 hasPartitionErrors = true;
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
                 abortableErrorIfPossible(error.exception());
                 return;
             } else if (error == Errors.TRANSACTION_ABORTABLE) {
@@ -1595,9 +1591,9 @@ public void handleResponse(AbstractResponse response) {
             // just treat it the same as PRODUCE_FENCED.
             fatalError(Errors.PRODUCER_FENCED.exception());
         } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                error == Errors.INVALID_TXN_STATE) {
+                error == Errors.INVALID_TXN_STATE || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
             fatalError(error.exception());
-        } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+        } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
             abortableErrorIfPossible(error.exception());
         } else if (error == Errors.TRANSACTION_ABORTABLE) {
             abortableError(error.exception());
@@ -1648,14 +1644,14 @@ public void handleResponse(AbstractResponse response) {
             reenqueue();
         } else if (error.exception() instanceof RetriableException) {
             reenqueue();
-        } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
+        } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
             abortableErrorIfPossible(error.exception());
         } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
             // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
             // just treat it the same as PRODUCE_FENCED.
             fatalError(Errors.PRODUCER_FENCED.exception());
         } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                error == Errors.INVALID_TXN_STATE) {
+                error == Errors.INVALID_TXN_STATE || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
             fatalError(error.exception());
         } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
             abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
TransactionManagerTest.java
@@ -3346,7 +3346,7 @@ public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartition(tp0);
-        prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, tp0, initialEpoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID, tp0, initialEpoch, producerId);
         runUntil(transactionManager::hasAbortableError);
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
@@ -3378,7 +3378,7 @@ public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() thro
         offsets.put(tp0, new OffsetAndMetadata(1));
         transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
         assertFalse(transactionManager.hasPendingOffsetCommits());
-        prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, consumerGroupId, producerId, initialEpoch);
+        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID, consumerGroupId, producerId, initialEpoch);
         runUntil(transactionManager::hasAbortableError); // Send AddOffsetsRequest
         TransactionalRequestResult abortResult = transactionManager.beginAbort();
 
ProducerIdExpirationTest.scala
@@ -26,13 +26,12 @@ import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, ProducerState}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -126,13 +125,20 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
     // Producer IDs should be retained.
     assertEquals(1, producerState.size)
 
-    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+    // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
+    // due to the expired transactional ID, resulting in a fatal error.
     producer.beginTransaction()
     val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", willBeCommitted = false))
     TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
-    org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
-    producer.abortTransaction()
 
+    JTestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
+    // Assert that aborting the transaction throws a KafkaException due to the fatal error.
+    assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+    // Close the producer and reinitialize to recover from the fatal error.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
+    producer.initTransactions()
 
     producer.beginTransaction()
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
TransactionsExpirationTest.scala
@@ -20,20 +20,20 @@ package kafka.api
 import java.util.{Collections, Properties}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
 import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -81,9 +81,14 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String, groupProtocol: String): Unit = {
+  @ParameterizedTest
+  @CsvSource(Array(
+    "kraft,classic,false",
+    "kraft,consumer,false",
+    "kraft,classic,true",
+    "kraft,consumer,true",
+  ))
+  def testFatalErrorAfterInvalidProducerIdMapping(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
     producer.initTransactions()
 
     // Start and then abort a transaction to allow the transactional ID to expire.
@@ -96,14 +101,22 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     waitUntilTransactionalStateExists()
     waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+    // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
+    // due to the expired transactional ID, resulting in a fatal error.
     producer.beginTransaction()
     val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "1", "1", willBeCommitted = false))
     TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
 
     org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[InvalidPidMappingException])
-    producer.abortTransaction()
+
+    // Assert that aborting the transaction throws a KafkaException due to the fatal error.
+    assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+    // Close the producer and reinitialize to recover from the fatal error.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
+    producer.initTransactions()
+
+    // Proceed with a new transaction after reinitializing.
     producer.beginTransaction()
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = true))
     producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2, "4", "4", willBeCommitted = true))
@@ -170,7 +183,9 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
     // soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
     assertEquals(oldProducerId, newProducerId)
     if (isTV2Enabled) {
-      assertEquals(oldProducerEpoch + 3, newProducerEpoch)
+      // TV2 bumps epoch on EndTxn, and the final commit may or may not have bumped the epoch in the producer state.
+      // The epoch should be at least oldProducerEpoch + 2 for the first commit and the restarted producer.
+      assertTrue(oldProducerEpoch + 2 <= newProducerEpoch)
     } else {
       assertEquals(oldProducerEpoch + 1, newProducerEpoch)
     }
TransactionsTest.scala
@@ -697,7 +697,7 @@
     assertThrows(classOf[IllegalStateException], () => producer.initTransactions())
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @ParameterizedTest
   @CsvSource(Array(
     "kraft,classic,false",
     "kraft,consumer,false",
@@ -762,8 +762,11 @@
     }
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @CsvSource(Array("kraft, classic, true", "kraft, consumer, true"))
+  @ParameterizedTest
+  @CsvSource(Array(
+    "kraft, classic, true",
+    "kraft, consumer, true"
+  ))
   def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
     val producer = createTransactionalProducer("transactionalProducer",
       deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
