Skip to content

Commit

Permalink
KAFKA-9832: extend Kafka Streams EOS system test (apache#8440)
Browse files Browse the repository at this point in the history
Reviewers: Boyang Chen <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored Apr 15, 2020
1 parent bd42734 commit 17f9879
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class StandbyTask extends AbstractTask implements Task {

processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
eosEnabled = StreamThread.eosEnabled(config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -57,6 +56,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
Expand Down Expand Up @@ -547,7 +548,7 @@ public void run() {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
private void runLoop() {
void runLoop() {
subscribeConsumer();

// if the thread is still in the middle of a rebalance, we should keep polling
Expand All @@ -569,6 +570,13 @@ private void runLoop() {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);

taskManager.commit(
taskManager.tasks()
.values()
.stream()
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,11 +743,10 @@ private Stream<Task> standbyTaskStream() {
* @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
int commitAll() {
return commitInternal(tasks.values());
return commit(tasks.values());
}

private int commitInternal(final Collection<Task> tasks) {

int commit(final Collection<Task> tasks) {
if (rebalanceInProgress) {
return -1;
} else {
Expand All @@ -763,7 +762,9 @@ private int commitInternal(final Collection<Task> tasks) {
}
}

commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
}

for (final Task task : tasks) {
if (task.commitNeeded()) {
Expand All @@ -786,7 +787,7 @@ int maybeCommitActiveTasksPerUserRequested() {
} else {
for (final Task task : activeTaskIterable()) {
if (task.commitRequested() && task.commitNeeded()) {
return commitInternal(activeTaskIterable());
return commit(activeTaskIterable());
}
}
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand All @@ -52,15 +56,29 @@
* An integration test to verify the conversion of a dirty-closed EOS
* task towards a standby task is safe across restarts of the application.
*/
@RunWith(Parameterized.class)
public class StandbyTaskEOSIntegrationTest {

@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_BETA}
});
}

@Parameterized.Parameter
public String eosConfig;

private final String appId = "eos-test-app";
private final String inputTopic = "input";

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

@Before
public void createTopics() throws Exception {
CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.createTopic(inputTopic, 1, 3);
}

Expand All @@ -77,14 +95,13 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted
new Properties()),
10L);

final String appId = "eos-test-app";
final String stateDirPath = TestUtils.tempDirectory(appId).getPath();

final CountDownLatch instanceLatch = new CountDownLatch(1);

try (
final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch);
final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch);
final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
) {


Expand All @@ -102,17 +119,19 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted

streamInstanceOne.close(Duration.ZERO);
streamInstanceTwo.close(Duration.ZERO);

streamInstanceOne.cleanUp();
streamInstanceTwo.cleanUp();
}
}

private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
final String stateDirPath,
private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
final CountDownLatch recordProcessLatch) throws IOException {

final StreamsBuilder builder = new StreamsBuilder();
final TaskId taskId = new TaskId(0, 0);

final Properties props = props(appId, stateDirPath);
final Properties props = props(stateDirPath);

final StateDirectory stateDirectory = new StateDirectory(
new StreamsConfig(props), new MockTime(), true);
Expand All @@ -133,14 +152,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
return new KafkaStreams(builder.build(), props);
}

private Properties props(final String appId, final String stateDirPath) {
private Properties props(final String stateDirPath) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void shouldCloseStateManagerOnTaskCreated() {
}

@Test
public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() {
public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
stateManager.close();
EasyMock.expectLastCall();

Expand Down Expand Up @@ -442,6 +442,35 @@ public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() {
assertEquals(Task.State.CLOSED, task.state());
}

@Test
public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() {
stateManager.close();
EasyMock.expectLastCall();

EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);

EasyMock.replay(stateManager);

final MetricName metricName = setupCloseTaskMetric();

config = new StreamsConfig(mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA)
)));

task = createStandbyTask();

task.closeDirty();

final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

EasyMock.verify(stateManager);

assertEquals(Task.State.CLOSED, task.state());
}

private StandbyTask createStandbyTask() {
return new StandbyTask(taskId, Collections.singleton(partition), topology, config, streamsMetrics, stateManager, stateDirectory);
}
Expand Down
Loading

0 comments on commit 17f9879

Please sign in to comment.