diff --git a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java index 9b6f66c6bc..7655647cb2 100644 --- a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.ValueMapper; import java.util.Properties; @@ -121,7 +120,7 @@ public static void main(final String[] args) { final KStream textLines = builder.stream("TextLinesTopic", Consumed.with(byteArraySerde, stringSerde)); // Variant 1: using `mapValues` - final KStream uppercasedWithMapValues = textLines.mapValues((ValueMapper) String::toLowerCase); + final KStream uppercasedWithMapValues = textLines.mapValues(v -> v.toLowerCase()); // Write (i.e. persist) the results to a new Kafka topic called "UppercasedTextLinesTopic". // diff --git a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java index abcc15022f..4dc7566b62 100644 --- a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -94,8 +93,8 @@ public void shouldFanoutTheInput() throws Exception { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStream stream1 = builder.stream(inputTopicA); - KStream stream2 = stream1.mapValues((ValueMapper) String::toUpperCase); - KStream stream3 = stream1.mapValues((ValueMapper) String::toLowerCase); + KStream stream2 = stream1.mapValues((v -> v.toUpperCase())); + KStream stream3 = stream1.mapValues(v -> v.toLowerCase()); stream2.to(outputTopicB); stream3.to(outputTopicC); diff --git a/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java index 37dd7ecee0..d88c70fdb6 100644 --- a/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.ValueMapper; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -76,7 +75,7 @@ public void shouldUppercaseTheInput() throws Exception { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStream input = builder.stream(inputTopic); - KStream uppercased = input.mapValues((ValueMapper) String::toUpperCase); + KStream uppercased = input.mapValues(v -> v.toUpperCase()); uppercased.to(outputTopic); KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); diff --git a/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java index d0be975ce6..68c7c1acb7 100644 --- a/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/MixAndMatchLambdaIntegrationTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.junit.BeforeClass; @@ -145,7 +144,7 @@ public void shouldAnonymizeTheInput() throws Exception { KStream input = builder.stream(inputTopic); KStream uppercasedAndAnonymized = input - .mapValues((ValueMapper) String::toUpperCase) + .mapValues(v -> v.toUpperCase()) .transform(AnonymizeIpAddressTransformer::new); uppercasedAndAnonymized.to(outputTopic);