Skip to content

Commit

Permalink
Merge branch '3.3.3-post' into 4.0.0-post
Browse files Browse the repository at this point in the history
  • Loading branch information
vvcephei committed Apr 17, 2019
2 parents 850f88e + bbb867d commit 9458361
Show file tree
Hide file tree
Showing 61 changed files with 683 additions and 588 deletions.
17 changes: 17 additions & 0 deletions checkstyle.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Checkstyle configuration enforcing final-by-default style:
  every local variable, enhanced-for loop variable, and method/constructor
  parameter must be declared final. Used by the maven-checkstyle-plugin
  (pinned to Checkstyle 8.18 in pom.xml).
-->
<!DOCTYPE module PUBLIC
        "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
        "https://checkstyle.org/dtds/configuration_1_3.dtd">

<module name="Checker">
    <module name="TreeWalker">
        <!-- Flags non-final local variables and (via PARAMETER_DEF) parameters;
             also checks the loop variable of enhanced for-loops. -->
        <module name="FinalLocalVariable">
            <property name="tokens" value="VARIABLE_DEF,PARAMETER_DEF"/>
            <property name="validateEnhancedForLoopVariable" value="true"/>
        </module>

        <!-- Flags non-final parameters on methods, constructors, catch
             clauses, and for-each clauses. -->
        <module name="FinalParameters">
            <property name="tokens" value="METHOD_DEF,CTOR_DEF,LITERAL_CATCH,FOR_EACH_CLAUSE"/>
        </module>
    </module>
</module>
53 changes: 50 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,56 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.18</version>
</dependency>
</dependencies>
<executions>
<!--
This declaration merges with the one in the parent, rather
than overriding it, so we need to disable the "validate" phase
execution that the parent declares and declare our own
during "test-compile".
One reason for this is that avro codegen runs during compile,
and while it's not strictly a precondition, it's
confusing to address style violations while the IDE is telling you
that some generated class doesn't exist. Test-compile is the first phase
that's guaranteed to run after compile and before any unit or integration
tests.
Also, we disable the parent's "validate" execution because its configuration
declares settings we don't use here, such as suppression filters.
-->
<execution>
<id>validate</id>
<phase>none</phase>
<configuration>
<skip>true</skip>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<configuration>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<failOnViolation>true</failOnViolation>
<includeResources>false</includeResources>
<includeTestResources>false</includeTestResources>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<excludes>io/confluent/examples/streams/avro/**</excludes>
<configLocation>checkstyle.xml</configLocation>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class GlobalKTablesExample {
static final String PRODUCT_STORE = "product-store";
static final String ENRICHED_ORDER_TOPIC = "enriched-order";

public static void main(String[] args) {
public static void main(final String[] args) {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
final KafkaStreams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class GlobalKTablesExampleDriver {
private static final Random RANDOM = new Random();
private static final int RECORDS_TO_GENERATE = 100;

public static void main(String[] args) {
public static void main(final String[] args) {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
generateCustomers(bootstrapServers, schemaRegistryUrl, RECORDS_TO_GENERATE);
Expand All @@ -87,7 +87,7 @@ private static void receiveEnrichedOrders(final String bootstrapServers,

final KafkaConsumer<Long, EnrichedOrder> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton(ENRICHED_ORDER_TOPIC));
int received = 0;
final int received = 0;
while(received < expected) {
final ConsumerRecords<Long, EnrichedOrder> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> System.out.println(record.value()));
Expand Down Expand Up @@ -183,7 +183,7 @@ private static <VT extends SpecificRecord> SpecificAvroSerde<VT> createSerde(fin
}

// Copied from org.apache.kafka.test.TestUtils
private static String randomString(int len) {
private static String randomString(final int len) {
final StringBuilder b = new StringBuilder();

for(int i = 0; i < len; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static void main(final String[] args) throws IOException {
consumeOutput(bootstrapServers);
}

private static void produceInputs(String bootstrapServers, String schemaRegistryUrl) throws IOException {
private static void produceInputs(final String bootstrapServers, final String schemaRegistryUrl) throws IOException {
final String[] users = {"erica", "bob", "joe", "damian", "tania", "phil", "sam", "lauren", "joseph"};
final String[] regions = {"europe", "usa", "asia", "africa"};

Expand Down Expand Up @@ -98,7 +98,7 @@ private static void produceInputs(String bootstrapServers, String schemaRegistry
}
}

private static void consumeOutput(String bootstrapServers) {
private static void consumeOutput(final String bootstrapServers) {
final String resultTopic = "PageViewsByRegion";
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Expand All @@ -119,11 +119,16 @@ private static void consumeOutput(String bootstrapServers) {
}
}

private static Schema loadSchema(final String name) throws IOException {
try (InputStream input = PageViewRegionLambdaExample.class.getClassLoader()
.getResourceAsStream("avro/io/confluent/examples/streams/" + name)) {
return new Schema.Parser().parse(input);
}
private static Schema loadSchema(final String name) throws IOException {
try (
final InputStream input =
PageViewRegionLambdaExample
.class
.getClassLoader()
.getResourceAsStream("avro/io/confluent/examples/streams/" + name)
) {
return new Schema.Parser().parse(input);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public static void main(final String[] args) throws Exception {

final KTable<Windowed<String>, Long> viewsByRegion = viewsByUser
.leftJoin(userRegions, (view, region) -> {
GenericRecord viewRegion = new GenericData.Record(schema);
final GenericRecord viewRegion = new GenericData.Record(schema);
viewRegion.put("user", view.get("user"));
viewRegion.put("page", view.get("page"));
viewRegion.put("region", region);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class SessionWindowsExample {
static final Long INACTIVITY_GAP = TimeUnit.MINUTES.toMillis(30);
static final String PLAY_EVENTS_PER_SESSION = "play-events-per-session";

public static void main(String[] args) {
public static void main(final String[] args) {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
final KafkaStreams streams = createStreams(bootstrapServers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SessionWindowsExampleDriver {

public static final int NUM_RECORDS_SENT = 8;

public static void main(String[] args) {
public static void main(final String[] args) {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
producePlayEvents(bootstrapServers, schemaRegistryUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void main(final String[] args) throws Exception {
consumeOutput(bootstrapServers);
}

private static void consumeOutput(String bootstrapServers) {
private static void consumeOutput(final String bootstrapServers) {
final Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
Expand All @@ -75,7 +75,7 @@ private static void consumeOutput(String bootstrapServers) {
}
}

private static void produceInput(String bootstrapServers) {
private static void produceInput(final String bootstrapServers) {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@
*/
public class TopArticlesExampleDriver {

public static void main(String[] args) throws IOException {
public static void main(final String[] args) throws IOException {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081";
produceInputs(bootstrapServers, schemaRegistryUrl);
consumeOutput(bootstrapServers, schemaRegistryUrl);
}

private static void produceInputs(String bootstrapServers, String schemaRegistryUrl) throws IOException {
private static void produceInputs(final String bootstrapServers,
final String schemaRegistryUrl) throws IOException {
final String[] users = {"erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};
final String[] industries = {"engineering", "telco", "finance", "health", "science"};
Expand All @@ -85,7 +86,7 @@ private static void produceInputs(String bootstrapServers, String schemaRegistry
new GenericRecordBuilder(loadSchema("pageview.avsc"));

final Random random = new Random();
for (String user : users) {
for (final String user : users) {
pageViewBuilder.set("industry", industries[random.nextInt(industries.length)]);
pageViewBuilder.set("flags", "ARTICLE");
// For each user generate some page views
Expand All @@ -101,7 +102,7 @@ record -> producer.send(new ProducerRecord<>(TopArticlesLambdaExample.PAGE_VIEWS
producer.flush();
}

private static void consumeOutput(String bootstrapServers, String schemaRegistryUrl) {
private static void consumeOutput(final String bootstrapServers, final String schemaRegistryUrl) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand All @@ -118,16 +119,21 @@ private static void consumeOutput(String bootstrapServers, String schemaRegistry

consumer.subscribe(Collections.singleton(TopArticlesLambdaExample.TOP_NEWS_PER_INDUSTRY_TOPIC));
while (true) {
ConsumerRecords<Windowed<String>, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<Windowed<String>, String> consumerRecord : consumerRecords) {
final ConsumerRecords<Windowed<String>, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
for (final ConsumerRecord<Windowed<String>, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key().key() + "@" + consumerRecord.key().window().start() + "=" + consumerRecord.value());
}
}
}

static Schema loadSchema(String name) throws IOException {
try (InputStream input = TopArticlesLambdaExample.class.getClassLoader()
.getResourceAsStream("avro/io/confluent/examples/streams/" + name)) {
static Schema loadSchema(final String name) throws IOException {
try (
final InputStream input =
TopArticlesLambdaExample
.class
.getClassLoader()
.getResourceAsStream("avro/io/confluent/examples/streams/" + name)
) {
return new Schema.Parser().parse(input);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ static KafkaStreams buildTopArticlesStream(final String bootstrapServers,
// the selector
(windowedArticle, count) -> {
// project on the industry field for key
Windowed<String> windowedIndustry =
final Windowed<String> windowedIndustry =
new Windowed<>(windowedArticle.key().get("industry").toString(),
windowedArticle.window());
// add the page into the value
GenericRecord viewStats = new GenericData.Record(schema);
final GenericRecord viewStats = new GenericData.Record(schema);
viewStats.put("page", windowedArticle.key().get("page"));
viewStats.put("user", "user");
viewStats.put("industry", windowedArticle.key().get("industry"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public static void main(final String[] args) throws IOException {
consumeOutput(bootstrapServers, schemaRegistryUrl);
}

private static void produceInputs(String bootstrapServers, String schemaRegistryUrl) throws IOException {
private static void produceInputs(final String bootstrapServers,
final String schemaRegistryUrl) {
final String[] users = {"erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};

Expand All @@ -83,7 +84,7 @@ record -> producer.send(new ProducerRecord<>(WikipediaFeedAvroExample.WIKIPEDIA_
producer.flush();
}

private static void consumeOutput(String bootstrapServers, String schemaRegistryUrl) {
private static void consumeOutput(final String bootstrapServers, final String schemaRegistryUrl) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
*/
public class WordCountInteractiveQueriesDriver {

public static void main(String [] args) throws Exception {
public static void main(final String [] args) throws Exception {
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final List<String> inputValues = Arrays.asList("hello world",
"all streams lead to kafka",
Expand All @@ -60,7 +60,7 @@ public static void main(String [] args) throws Exception {
"one jolly sailor",
"king of the world");

Properties producerConfig = new Properties();
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public class WordCountInteractiveQueriesExample {
static final String TEXT_LINES_TOPIC = "TextLinesTopic";
static final String DEFAULT_HOST = "localhost";

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
if (args.length == 0 || args.length > 2) {
throw new IllegalArgumentException("usage: ... <portForRestEndPoint> [<bootstrap.servers> (optional)]");
}
final int port = Integer.valueOf(args[0]);
final String bootstrapServers = args.length > 1 ? args[1] : "localhost:9092";

Properties streamsConfiguration = new Properties();
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
// against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "interactive-queries-example");
Expand Down Expand Up @@ -195,7 +195,7 @@ public static void main(String[] args) throws Exception {
try {
streams.close();
restService.stop();
} catch (Exception e) {
} catch (final Exception e) {
// ignored
}
}));
Expand All @@ -213,8 +213,8 @@ static WordCountInteractiveQueriesRestService startRestProxy(final KafkaStreams

static KafkaStreams createStreams(final Properties streamsConfiguration) {
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String>
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String>
textLines = builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

final KGroupedStream<String, String> groupedByWord = textLines
Expand Down
Loading

0 comments on commit 9458361

Please sign in to comment.