Skip to content

Commit

Permalink
KAFKA-7671: Stream-Global Table join should not reset repartition flag (
Browse files Browse the repository at this point in the history
apache#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
bbejeck authored and guozhangwang committed Nov 29, 2018
1 parent ec66818 commit 2c305dc
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> g
builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);

// do not have serde for joined result
return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder);
return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, streamTableJoinNode, builder);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,18 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -437,6 +440,27 @@ public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningT
}
}
}

@Test
public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<String, String> globalKTable = builder.globalTable("globalTopic");
final KeyValueMapper<String, String, String> kvMappper = (k, v) -> k + v;
final ValueJoiner<String, String, String> valueJoiner = (v1, v2) -> v1 + v2;
builder.<String, String>stream("topic").selectKey((k, v) -> v)
.join(globalKTable, kvMappper, valueJoiner)
.groupByKey()
.count();

final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
final String topology = builder.build().describe().toString();
final Matcher matcher = repartitionTopicPattern.matcher(topology);
assertTrue(matcher.find());
final String match = matcher.group();
assertThat(match, notNullValue());
assertTrue(match.endsWith("repartition"));

}

@Test
public void testToWithNullValueSerdeDoesntNPE() {
Expand Down

0 comments on commit 2c305dc

Please sign in to comment.