Skip to content

Commit

Permalink
Follow up for KIP-149 (confluentinc#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax authored Jan 31, 2018
1 parent a92f99d commit 68c33a5
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Properties;

Expand Down Expand Up @@ -121,7 +120,7 @@ public static void main(final String[] args) {
final KStream<byte[], String> textLines = builder.stream("TextLinesTopic", Consumed.with(byteArraySerde, stringSerde));

// Variant 1: using `mapValues`
final KStream<byte[], String> uppercasedWithMapValues = textLines.mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<byte[], String> uppercasedWithMapValues = textLines.mapValues(v -> v.toLowerCase());

// Write (i.e. persist) the results to a new Kafka topic called "UppercasedTextLinesTopic".
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -94,8 +93,8 @@ public void shouldFanoutTheInput() throws Exception {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStream<byte[], String> stream1 = builder.stream(inputTopicA);
KStream<byte[], String> stream2 = stream1.mapValues((ValueMapper<String, String>) String::toUpperCase);
KStream<byte[], String> stream3 = stream1.mapValues((ValueMapper<String, String>) String::toLowerCase);
KStream<byte[], String> stream2 = stream1.mapValues((v -> v.toUpperCase()));
KStream<byte[], String> stream3 = stream1.mapValues(v -> v.toLowerCase());
stream2.to(outputTopicB);
stream3.to(outputTopicC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -76,7 +75,7 @@ public void shouldUppercaseTheInput() throws Exception {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStream<byte[], String> input = builder.stream(inputTopic);
KStream<byte[], String> uppercased = input.mapValues((ValueMapper<String, String>) String::toUpperCase);
KStream<byte[], String> uppercased = input.mapValues(v -> v.toUpperCase());
uppercased.to(outputTopic);

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -145,7 +144,7 @@ public void shouldAnonymizeTheInput() throws Exception {

KStream<byte[], String> input = builder.stream(inputTopic);
KStream<byte[], String> uppercasedAndAnonymized = input
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.mapValues(v -> v.toUpperCase())
.transform(AnonymizeIpAddressTransformer::new);
uppercasedAndAnonymized.to(outputTopic);

Expand Down

0 comments on commit 68c33a5

Please sign in to comment.