ordersStore() {
*
* If metadata is unavailable, which can happen on startup or during a rebalance, block until it is.
*/
- private HostStoreInfo getKeyLocationOrBlock(String id, AsyncResponse asyncResponse) {
+ private HostStoreInfo getKeyLocationOrBlock(final String id, final AsyncResponse asyncResponse) {
HostStoreInfo locationOfKey;
while (locationMetadataIsUnavailable(locationOfKey = getHostForOrderId(id))) {
//The metastore is not available. This can happen on startup/rebalance.
@@ -231,14 +233,14 @@ private HostStoreInfo getKeyLocationOrBlock(String id, AsyncResponse asyncRespon
try {
//Sleep a bit until metadata becomes available
Thread.sleep(Math.min(Long.valueOf(CALL_TIMEOUT), 200));
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
e.printStackTrace();
}
}
return locationOfKey;
}
- private boolean locationMetadataIsUnavailable(HostStoreInfo hostWithKey) {
+ private boolean locationMetadataIsUnavailable(final HostStoreInfo hostWithKey) {
return NOT_AVAILABLE.host().equals(hostWithKey.getHost())
&& NOT_AVAILABLE.port() == hostWithKey.getPort();
}
@@ -248,16 +250,16 @@ private boolean thisHost(final HostStoreInfo host) {
host.getPort() == port;
}
- private void fetchFromOtherHost(final String path, AsyncResponse asyncResponse, long timeout) {
+ private void fetchFromOtherHost(final String path, final AsyncResponse asyncResponse, final long timeout) {
log.info("Chaining GET to a different instance: " + path);
try {
- OrderBean bean = client.target(path)
+ final OrderBean bean = client.target(path)
.queryParam("timeout", timeout)
.request(MediaType.APPLICATION_JSON_TYPE)
.get(new GenericType<OrderBean>() {
});
asyncResponse.resume(bean);
- } catch (Exception swallowed) {
+ } catch (final Exception swallowed) {
}
}
@@ -265,11 +267,11 @@ private void fetchFromOtherHost(final String path, AsyncResponse asyncResponse,
@ManagedAsync
@Path("orders/{id}/validated")
public void getPostValidationWithTimeout(@PathParam("id") final String id,
- @QueryParam("timeout") @DefaultValue(CALL_TIMEOUT) Long timeout,
+ @QueryParam("timeout") @DefaultValue(CALL_TIMEOUT) final Long timeout,
@Suspended final AsyncResponse asyncResponse) {
setTimeout(timeout, asyncResponse);
- HostStoreInfo hostForKey = getKeyLocationOrBlock(id, asyncResponse);
+ final HostStoreInfo hostForKey = getKeyLocationOrBlock(id, asyncResponse);
if (hostForKey == null) { //request timed out so return
return;
@@ -301,13 +303,13 @@ public void submitOrder(final OrderBean order,
@Suspended final AsyncResponse response) {
setTimeout(timeout, response);
- Order bean = fromBean(order);
+ final Order bean = fromBean(order);
producer.send(new ProducerRecord<>(ORDERS.name(), bean.getId(), bean),
callback(response, bean.getId()));
}
@Override
- public void start(String bootstrapServers) {
+ public void start(final String bootstrapServers) {
jettyServer = startJetty(port, this);
port = jettyServer.getURI().getPort(); // update port, in case port was zero
producer = startProducer(bootstrapServers, ORDER_VALIDATIONS);
@@ -315,8 +317,8 @@ public void start(String bootstrapServers) {
log.info("Started Service " + getClass().getSimpleName());
}
- private KafkaStreams startKStreams(String bootstrapServers) {
- KafkaStreams streams = new KafkaStreams(createOrdersMaterializedView(),
+ private KafkaStreams startKStreams(final String bootstrapServers) {
+ final KafkaStreams streams = new KafkaStreams(createOrdersMaterializedView(),
config(bootstrapServers));
metadataService = new MetadataService(streams);
streams.cleanUp(); //don't do this in prod as it clears your state stores
@@ -324,8 +326,8 @@ private KafkaStreams startKStreams(String bootstrapServers) {
return streams;
}
- private Properties config(String bootstrapServers) {
- Properties props = baseStreamsConfig(bootstrapServers, "/tmp/kafka-streams", SERVICE_APP_ID);
+ private Properties config(final String bootstrapServers) {
+ final Properties props = baseStreamsConfig(bootstrapServers, "/tmp/kafka-streams", SERVICE_APP_ID);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port);
return props;
}
@@ -341,7 +343,7 @@ public void stop() {
if (jettyServer != null) {
try {
jettyServer.stop();
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
}
}
@@ -358,7 +360,7 @@ public int port() {
return port;
}
- private HostStoreInfo getHostForOrderId(String orderId) {
+ private HostStoreInfo getHostForOrderId(final String orderId) {
return metadataService
.streamsMetadataForStoreAndKey(ORDERS_STORE_NAME, orderId, Serdes.String().serializer());
}
@@ -370,16 +372,16 @@ private Callback callback(final AsyncResponse response, final String orderId) {
} else {
try {
//Return the location of the newly created resource
- Response uri = Response.created(new URI("/v1/orders/" + orderId)).build();
+ final Response uri = Response.created(new URI("/v1/orders/" + orderId)).build();
response.resume(uri);
- } catch (URISyntaxException e2) {
+ } catch (final URISyntaxException e2) {
e2.printStackTrace();
}
}
};
}
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
final String bootstrapServers = args.length > 1 ? args[1] : "localhost:9092";
final String schemaRegistryUrl = args.length > 2 ? args[2] : "http://localhost:8081";
@@ -387,7 +389,7 @@ public static void main(String[] args) throws Exception {
final String restPort = args.length > 4 ? args[4] : null;
Schemas.configureSerdesWithSchemaRegistryUrl(schemaRegistryUrl);
- OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.valueOf(restPort));
+ final OrdersService service = new OrdersService(restHostname, restPort == null ? 0 : Integer.valueOf(restPort));
service.start(bootstrapServers);
addShutdownHookAndBlock(service);
}
diff --git a/src/main/java/io/confluent/examples/streams/microservices/ValidationsAggregatorService.java b/src/main/java/io/confluent/examples/streams/microservices/ValidationsAggregatorService.java
index 83ab8f3174..154c8b2738 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/ValidationsAggregatorService.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/ValidationsAggregatorService.java
@@ -56,20 +56,20 @@ public class ValidationsAggregatorService implements Service {
private KafkaStreams streams;
@Override
- public void start(String bootstrapServers) {
+ public void start(final String bootstrapServers) {
streams = aggregateOrderValidations(bootstrapServers, "/tmp/kafka-streams");
streams.cleanUp(); //don't do this in prod as it clears your state stores
streams.start();
log.info("Started Service " + getClass().getSimpleName());
}
- private KafkaStreams aggregateOrderValidations(String bootstrapServers, String stateDir) {
+ private KafkaStreams aggregateOrderValidations(final String bootstrapServers, final String stateDir) {
final int numberOfRules = 3; //TODO put into a KTable to make dynamically configurable
- StreamsBuilder builder = new StreamsBuilder();
- KStream<String, OrderValidation> validations = builder
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, OrderValidation> validations = builder
.stream(ORDER_VALIDATIONS.name(), serdes1);
- KStream<String, Order> orders = builder
+ final KStream<String, Order> orders = builder
.stream(ORDERS.name(), serdes2)
.filter((id, order) -> OrderState.CREATED.equals(order.getState()));
@@ -120,8 +120,8 @@ public void stop() {
}
}
- public static void main(String[] args) throws Exception {
- ValidationsAggregatorService service = new ValidationsAggregatorService();
+ public static void main(final String[] args) throws Exception {
+ final ValidationsAggregatorService service = new ValidationsAggregatorService();
service.start(parseArgsAndConfigure(args));
addShutdownHookAndBlock(service);
}
diff --git a/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java b/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java
index 3bc6c28ca4..891aedd48a 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/domain/Schemas.java
@@ -27,11 +27,11 @@ public class Schemas {
public static class Topic<K, V> {
- private String name;
- private Serde<K> keySerde;
- private Serde<V> valueSerde;
+ private final String name;
+ private final Serde<K> keySerde;
+ private final Serde<V> valueSerde;
- Topic(String name, Serde<K> keySerde, Serde<V> valueSerde) {
+ Topic(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
@@ -80,9 +80,9 @@ private static void createTopics() {
}
}
- public static void configureSerdesWithSchemaRegistryUrl(String url) {
+ public static void configureSerdesWithSchemaRegistryUrl(final String url) {
Topics.createTopics(); //wipe cached schema registry
- for (Topic topic : Topics.ALL.values()) {
+ for (final Topic topic : Topics.ALL.values()) {
configure(topic.keySerde(), url);
configure(topic.valueSerde(), url);
}
@@ -90,7 +90,7 @@ public static void configureSerdesWithSchemaRegistryUrl(String url) {
schemaRegistryUrl = url;
}
- private static void configure(Serde serde, String url) {
+ private static void configure(final Serde serde, final String url) {
if (serde instanceof SpecificAvroSerde) {
serde.configure(Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, url), false);
}
diff --git a/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderBean.java b/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderBean.java
index ef48732bcb..1f70bda23d 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderBean.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderBean.java
@@ -20,8 +20,12 @@ public OrderBean() {
}
- public OrderBean(String id, long customerId, OrderState state, Product product, int quantity,
- double price) {
+ public OrderBean(final String id,
+ final long customerId,
+ final OrderState state,
+ final Product product,
+ final int quantity,
+ final double price) {
this.id = id;
this.customerId = customerId;
this.state = state;
@@ -54,12 +58,12 @@ public double getPrice() {
return price;
}
- public void setId(String id) {
+ public void setId(final String id) {
this.id = id;
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) {
return true;
}
@@ -67,7 +71,7 @@ public boolean equals(Object o) {
return false;
}
- OrderBean orderBean = (OrderBean) o;
+ final OrderBean orderBean = (OrderBean) o;
if (this.customerId != orderBean.customerId) {
return false;
@@ -103,7 +107,7 @@ public String toString() {
@Override
public int hashCode() {
int result;
- long temp;
+ final long temp;
result = this.id != null ? this.id.hashCode() : 0;
result = 31 * result + (int) (this.customerId ^ this.customerId >>> 32);
result = 31 * result + (this.state != null ? this.state.hashCode() : 0);
@@ -114,7 +118,7 @@ public int hashCode() {
return result;
}
- public static OrderBean toBean(Order order) {
+ public static OrderBean toBean(final Order order) {
return new OrderBean(order.getId(),
order.getCustomerId(),
order.getState(),
@@ -123,7 +127,7 @@ public static OrderBean toBean(Order order) {
order.getPrice());
}
- public static Order fromBean(OrderBean order) {
+ public static Order fromBean(final OrderBean order) {
return new Order(order.getId(),
order.getCustomerId(),
order.getState(),
diff --git a/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderId.java b/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderId.java
index 2baaa698b6..e6acb1e755 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderId.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/domain/beans/OrderId.java
@@ -2,7 +2,7 @@
public class OrderId {
- public static String id(long id) {
+ public static String id(final long id) {
return String.valueOf(id);
}
}
\ No newline at end of file
diff --git a/src/main/java/io/confluent/examples/streams/microservices/util/MicroserviceUtils.java b/src/main/java/io/confluent/examples/streams/microservices/util/MicroserviceUtils.java
index 73a6747029..ecaf79c7c3 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/util/MicroserviceUtils.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/util/MicroserviceUtils.java
@@ -34,7 +34,7 @@ public class MicroserviceUtils {
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8081";
public static final long MIN = 60 * 1000L;
- public static String parseArgsAndConfigure(String[] args) {
+ public static String parseArgsAndConfigure(final String[] args) {
if (args.length > 2) {
throw new IllegalArgumentException("usage: ... " +
"[ (optional, default: " + DEFAULT_BOOTSTRAP_SERVERS + ")] " +
@@ -49,9 +49,10 @@ public static String parseArgsAndConfigure(String[] args) {
return bootstrapServers;
}
- public static Properties baseStreamsConfig(String bootstrapServers, String stateDir,
- String appId) {
- Properties config = new Properties();
+ public static Properties baseStreamsConfig(final String bootstrapServers,
+ final String stateDir,
+ final String appId) {
+ final Properties config = new Properties();
// Workaround for a known issue with RocksDB in environments where you have only 1 cpu core.
config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
@@ -72,7 +73,7 @@ public void setConfig(final String storeName, final Options options,
// Workaround: We must ensure that the parallelism is set to >= 2. There seems to be a known
// issue with RocksDB where explicitly setting the parallelism to 1 causes issues (even though
// 1 seems to be RocksDB's default for this configuration).
- int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
+ final int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
// Set number of compaction threads (but not flush threads).
options.setIncreaseParallelism(compactionParallelism);
}
@@ -82,7 +83,7 @@ public void setConfig(final String storeName, final Options options,
public static final class ProductTypeSerde implements Serde<Product> {
@Override
- public void configure(Map<String, ?> map, boolean b) {
+ public void configure(final Map<String, ?> map, final boolean b) {
}
@Override
@@ -93,11 +94,11 @@ public void close() {
public Serializer<Product> serializer() {
return new Serializer<Product>() {
@Override
- public void configure(Map<String, ?> map, boolean b) {
+ public void configure(final Map<String, ?> map, final boolean b) {
}
@Override
- public byte[] serialize(String topic, Product pt) {
+ public byte[] serialize(final String topic, final Product pt) {
return pt.toString().getBytes();
}
@@ -111,11 +112,11 @@ public void close() {
public Deserializer<Product> deserializer() {
return new Deserializer<Product>() {
@Override
- public void configure(Map<String, ?> map, boolean b) {
+ public void configure(final Map<String, ?> map, final boolean b) {
}
@Override
- public Product deserialize(String topic, byte[] bytes) {
+ public Product deserialize(final String topic, final byte[] bytes) {
return Product.valueOf(new String(bytes));
}
@@ -126,7 +127,7 @@ public void close() {
}
}
- public static void setTimeout(long timeout, AsyncResponse asyncResponse) {
+ public static void setTimeout(final long timeout, final AsyncResponse asyncResponse) {
asyncResponse.setTimeout(timeout, TimeUnit.MILLISECONDS);
asyncResponse.setTimeoutHandler(resp -> resp.resume(
Response.status(Response.Status.GATEWAY_TIMEOUT)
@@ -134,33 +135,33 @@ public static void setTimeout(long timeout, AsyncResponse asyncResponse) {
.build()));
}
- public static Server startJetty(int port, Object binding) {
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ public static Server startJetty(final int port, final Object binding) {
+ final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
- Server jettyServer = new Server(port);
+ final Server jettyServer = new Server(port);
jettyServer.setHandler(context);
- ResourceConfig rc = new ResourceConfig();
+ final ResourceConfig rc = new ResourceConfig();
rc.register(binding);
rc.register(JacksonFeature.class);
- ServletContainer sc = new ServletContainer(rc);
- ServletHolder holder = new ServletHolder(sc);
+ final ServletContainer sc = new ServletContainer(rc);
+ final ServletHolder holder = new ServletHolder(sc);
context.addServlet(holder, "/*");
try {
jettyServer.start();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RuntimeException(e);
}
log.info("Listening on " + jettyServer.getURI());
return jettyServer;
}
- public static <T> KafkaProducer<String, T> startProducer(String bootstrapServers,
- Schemas.Topic<String, T> topic) {
- Properties producerConfig = new Properties();
+ public static <T> KafkaProducer<String, T> startProducer(final String bootstrapServers,
+ final Schemas.Topic<String, T> topic) {
+ final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
@@ -171,12 +172,12 @@ public static KafkaProducer startProducer(String bootstrapServers,
topic.valueSerde().serializer());
}
- public static void addShutdownHookAndBlock(Service service) throws InterruptedException {
+ public static void addShutdownHookAndBlock(final Service service) throws InterruptedException {
Thread.currentThread().setUncaughtExceptionHandler((t, e) -> service.stop());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
service.stop();
- } catch (Exception ignored) {
+ } catch (final Exception ignored) {
}
}));
Thread.currentThread().join();
diff --git a/src/main/java/io/confluent/examples/streams/microservices/util/Paths.java b/src/main/java/io/confluent/examples/streams/microservices/util/Paths.java
index f51ffb6788..cf93e49fc1 100644
--- a/src/main/java/io/confluent/examples/streams/microservices/util/Paths.java
+++ b/src/main/java/io/confluent/examples/streams/microservices/util/Paths.java
@@ -2,25 +2,25 @@
public class Paths {
- private String base;
+ private final String base;
- public Paths(String host, int port) {
+ public Paths(final String host, final int port) {
base = "http://" + host + ":" + port;
}
- public String urlGet(int id) {
+ public String urlGet(final int id) {
return base + "/v1/orders/" + id;
}
- public String urlGet(String id) {
+ public String urlGet(final String id) {
return base + "/v1/orders/" + id;
}
- public String urlGetValidated(int id) {
+ public String urlGetValidated(final int id) {
return base + "/v1/orders/" + id + "/validated";
}
- public String urlGetValidated(String id) {
+ public String urlGetValidated(final String id) {
return base + "/v1/orders/" + id + "/validated";
}
diff --git a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueDeserializer.java b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueDeserializer.java
index 1581cb1bc0..20c0bed21c 100644
--- a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueDeserializer.java
+++ b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueDeserializer.java
@@ -35,12 +35,12 @@ public PriorityQueueDeserializer(final Comparator<T> comparator, final Deserializer<T> valueDeserializer) {
}
@Override
- public void configure(Map<String, ?> configs, boolean isKey) {
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
// do nothing
}
@Override
- public PriorityQueue<T> deserialize(String s, byte[] bytes) {
+ public PriorityQueue<T> deserialize(final String s, final byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return null;
}
@@ -53,7 +53,7 @@ public PriorityQueue deserialize(String s, byte[] bytes) {
dataInputStream.read(valueBytes);
priorityQueue.add(valueDeserializer.deserialize(s, valueBytes));
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("Unable to deserialize PriorityQueue", e);
}
return priorityQueue;
diff --git a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerde.java b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerde.java
index a9fe262c40..5d2a12f34a 100644
--- a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerde.java
+++ b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerde.java
@@ -44,7 +44,7 @@ public Deserializer<PriorityQueue<T>> deserializer() {
}
@Override
- public void configure(Map<String, ?> configs, boolean isKey) {
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
diff --git a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerializer.java b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerializer.java
index f19174fc61..3d543b5b9e 100644
--- a/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerializer.java
+++ b/src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerializer.java
@@ -35,12 +35,12 @@ public PriorityQueueSerializer(final Comparator<T> comparator, final Serializer<T> valueSerializer) {
this.valueSerializer = valueSerializer;
}
@Override
- public void configure(Map<String, ?> configs, boolean isKey) {
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
// do nothing
}
@Override
- public byte[] serialize(String topic, PriorityQueue<T> queue) {
+ public byte[] serialize(final String topic, final PriorityQueue<T> queue) {
final int size = queue.size();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos);
@@ -53,7 +53,7 @@ public byte[] serialize(String topic, PriorityQueue queue) {
out.write(bytes);
}
out.close();
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("unable to serialize PriorityQueue", e);
}
return baos.toByteArray();
diff --git a/src/main/java/io/confluent/examples/streams/utils/WindowedSerde.java b/src/main/java/io/confluent/examples/streams/utils/WindowedSerde.java
index ea7321152f..1ed1c358f5 100644
--- a/src/main/java/io/confluent/examples/streams/utils/WindowedSerde.java
+++ b/src/main/java/io/confluent/examples/streams/utils/WindowedSerde.java
@@ -29,7 +29,7 @@ public class WindowedSerde<T> implements Serde<Windowed<T>> {
private final Serde<Windowed<T>> inner;
- public WindowedSerde(Serde<T> serde) {
+ public WindowedSerde(final Serde<T> serde) {
inner = Serdes.serdeFrom(
new WindowedSerializer<>(serde.serializer()),
new WindowedDeserializer<>(serde.deserializer()));
@@ -46,7 +46,7 @@ public Deserializer<Windowed<T>> deserializer() {
}
@Override
- public void configure(Map<String, ?> configs, boolean isKey) {
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
diff --git a/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
index 9790168c94..346748efc6 100644
--- a/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
@@ -122,7 +122,7 @@ private static class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {
- DeduplicationTransformer(long maintainDurationPerEventInMs, KeyValueMapper<K, V, E> idExtractor) {
+ DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
@@ -139,11 +139,11 @@ public void init(final ProcessorContext context) {
}
public KeyValue<K, V> transform(final K key, final V value) {
- E eventId = idExtractor.apply(key, value);
+ final E eventId = idExtractor.apply(key, value);
if (eventId == null) {
return KeyValue.pair(key, value);
} else {
- KeyValue<K, V> output;
+ final KeyValue<K, V> output;
if (isDuplicate(eventId)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
@@ -156,21 +156,21 @@ public KeyValue transform(final K key, final V value) {
}
private boolean isDuplicate(final E eventId) {
- long eventTime = context.timestamp();
- WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
+ final long eventTime = context.timestamp();
+ final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
- boolean isDuplicate = timeIterator.hasNext();
+ final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
}
- private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, long newTimestamp) {
+ private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
eventIdStore.put(eventId, newTimestamp, newTimestamp);
}
- private void rememberNewEvent(final E eventId, long timestamp) {
+ private void rememberNewEvent(final E eventId, final long timestamp) {
eventIdStore.put(eventId, timestamp, timestamp);
}
@@ -190,19 +190,19 @@ public void close() {
@Test
public void shouldRemoveDuplicatesFromTheInput() throws Exception {
- String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
- String secondId = UUID.randomUUID().toString();
- String thirdId = UUID.randomUUID().toString();
- List<String> inputValues = Arrays.asList(firstId, secondId, firstId, firstId, secondId, thirdId,
+ final String firstId = UUID.randomUUID().toString(); // e.g. "4ff3cb44-abcb-46e3-8f9a-afb7cc74fbb8"
+ final String secondId = UUID.randomUUID().toString();
+ final String thirdId = UUID.randomUUID().toString();
+ final List<String> inputValues = Arrays.asList(firstId, secondId, firstId, firstId, secondId, thirdId,
thirdId, firstId, secondId);
- List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);
+ final List<String> expectedValues = Arrays.asList(firstId, secondId, thirdId);
//
// Step 1: Configure and start the processor topology.
//
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- Properties streamsConfiguration = new Properties();
+ final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "deduplication-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
@@ -220,18 +220,18 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {
// The actual value depends on your use case. To reduce memory and disk usage, you could
// decrease the size to purge old windows more frequently at the cost of potentially missing out
// on de-duplicating late-arriving records.
- long maintainDurationPerEventInMs = TimeUnit.MINUTES.toMillis(10);
+ final long maintainDurationPerEventInMs = TimeUnit.MINUTES.toMillis(10);
// The number of segments has no impact on "correctness".
// Using more segments implies larger overhead but allows for more fine-grained record expiration
// Note: the specified retention time is a _minimum_ time span, not a strict upper time bound
- int numberOfSegments = 3;
+ final int numberOfSegments = 3;
// retention period must be at least window size -- for this use case, we don't need a longer retention period
// and thus just use the window size as retention time
- long retentionPeriod = maintainDurationPerEventInMs;
+ final long retentionPeriod = maintainDurationPerEventInMs;
- StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
+ final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(storeName,
retentionPeriod,
numberOfSegments,
@@ -244,8 +244,8 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {
builder.addStateStore(dedupStoreBuilder);
- KStream<byte[], String> input = builder.stream(inputTopic);
- KStream<byte[], String> deduplicated = input.transform(
+ final KStream<byte[], String> input = builder.stream(inputTopic);
+ final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
@@ -253,13 +253,13 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {
storeName);
deduplicated.to(outputTopic);
- KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
// Step 2: Produce some input data to the input topic.
//
- Properties producerConfig = new Properties();
+ final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -270,13 +270,13 @@ public void shouldRemoveDuplicatesFromTheInput() throws Exception {
//
// Step 3: Verify the application's output data.
//
- Properties consumerConfig = new Properties();
+ final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "deduplication-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+ final List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
outputTopic, expectedValues.size());
streams.close();
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
diff --git a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java
index 6f38fc2b8f..c04b6db25b 100644
--- a/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/FanoutLambdaIntegrationTest.java
@@ -77,34 +77,34 @@ public static void startKafkaCluster() throws Exception {
@Test
public void shouldFanoutTheInput() throws Exception {
- List inputValues = Arrays.asList("Hello", "World");
- List expectedValuesForB = inputValues.stream().map(String::toUpperCase).collect(Collectors.toList());
- List expectedValuesForC = inputValues.stream().map(String::toLowerCase).collect(Collectors.toList());
+ final List inputValues = Arrays.asList("Hello", "World");
+ final List expectedValuesForB = inputValues.stream().map(String::toUpperCase).collect(Collectors.toList());
+ final List expectedValuesForC = inputValues.stream().map(String::toLowerCase).collect(Collectors.toList());
//
// Step 1: Configure and start the processor topology.
//
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- Properties streamsConfiguration = new Properties();
+ final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KStream<byte[], String> stream1 = builder.stream(inputTopicA);
- KStream<byte[], String> stream2 = stream1.mapValues(String::toUpperCase);
- KStream<byte[], String> stream3 = stream1.mapValues(String::toLowerCase);
+ final KStream<byte[], String> stream1 = builder.stream(inputTopicA);
+ final KStream<byte[], String> stream2 = stream1.mapValues(String::toUpperCase);
+ final KStream<byte[], String> stream3 = stream1.mapValues(String::toLowerCase);
stream2.to(outputTopicB);
stream3.to(outputTopicC);
- KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
// Step 2: Produce some input data to the input topic.
//
- Properties producerConfig = new Properties();
+ final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -117,24 +117,24 @@ public void shouldFanoutTheInput() throws Exception {
//
// Verify output topic B
- Properties consumerConfigB = new Properties();
+ final Properties consumerConfigB = new Properties();
consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicB");
consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
+ final List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
outputTopicB, inputValues.size());
assertThat(actualValuesForB).isEqualTo(expectedValuesForB);
// Verify output topic C
- Properties consumerConfigC = new Properties();
+ final Properties consumerConfigC = new Properties();
consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-lambda-integration-test-standard-consumer-topicC");
consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
+ final List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
outputTopicC, inputValues.size());
streams.close();
assertThat(actualValuesForC).isEqualTo(expectedValuesForC);
diff --git a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
index 28838125aa..be777c6f50 100644
--- a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java
@@ -65,20 +65,20 @@ public static void startKafkaCluster() throws Exception {
@Test
public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
- Schema schema = new Schema.Parser().parse(
+ final Schema schema = new Schema.Parser().parse(
getClass().getResourceAsStream("/avro/io/confluent/examples/streams/wikifeed.avsc"));
- GenericRecord record = new GenericData.Record(schema);
+ final GenericRecord record = new GenericData.Record(schema);
record.put("user", "alice");
record.put("is_new", true);
record.put("content", "lorem ipsum");
- List<GenericRecord> inputValues = Collections.singletonList(record);
+ final List<GenericRecord> inputValues = Collections.singletonList(record);
//
// Step 1: Configure and start the processor topology.
//
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- Properties streamsConfiguration = new Properties();
+ final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
@@ -105,16 +105,16 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
genericAvroSerde.configure(
Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()),
isKeySerde);
- KStream<String, GenericRecord> stream = builder.stream(inputTopic);
+ final KStream<String, GenericRecord> stream = builder.stream(inputTopic);
stream.to(outputTopic, Produced.with(stringSerde, genericAvroSerde));
- KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
// Step 2: Produce some input data to the input topic.
//
- Properties producerConfig = new Properties();
+ final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -126,14 +126,14 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
//
// Step 3: Verify the application's output data.
//
- Properties consumerConfig = new Properties();
+ final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-avro-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
- List<GenericRecord> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+ final List<GenericRecord> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
outputTopic, inputValues.size());
streams.close();
assertEquals(inputValues, actualValues);
diff --git a/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java b/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java
index dcfe2eb97c..8b2ac7efb2 100644
--- a/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java
+++ b/src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java
@@ -71,40 +71,40 @@ public static void startKafkaCluster() throws Exception {
@Test
public void shouldIgnoreCorruptInputRecords() throws Exception {
- List<Long> inputValues = Arrays.asList(1L, 2L, 3L);
- List<Long> expectedValues = inputValues.stream().map(x -> 2 * x).collect(Collectors.toList());
+ final List<Long> inputValues = Arrays.asList(1L, 2L, 3L);
+ final List<Long> expectedValues = inputValues.stream().map(x -> 2 * x).collect(Collectors.toList());
//
// Step 1: Configure and start the processor topology.
//
- StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- Properties streamsConfiguration = new Properties();
+ final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "failure-handling-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- Serde<String> stringSerde = Serdes.String();
- Serde<Long> longSerde = Serdes.Long();
+ final Serde<String> stringSerde = Serdes.String();
+ final Serde<Long> longSerde = Serdes.Long();
- KStream<byte[], byte[]> input = builder.stream(inputTopic);
+ final KStream<byte[], byte[]> input = builder.stream(inputTopic);
// Note how the returned stream is of type `KStream<String, Long>`.
- KStream<String, Long> doubled = input.flatMap(
+ final KStream<String, Long> doubled = input.flatMap(
(k, v) -> {
try {
// Attempt deserialization
- String key = stringSerde.deserializer().deserialize("input-topic", k);
- long value = longSerde.deserializer().deserialize("input-topic", v);
+ final String key = stringSerde.deserializer().deserialize("input-topic", k);
+ final long value = longSerde.deserializer().deserialize("input-topic", v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
- } catch (SerializationException e) {
+ } catch (final SerializationException e) {
// Ignore/skip the corrupted record by catching the exception.
// Optionally, we can log the fact that we did so:
System.err.println("Could not deserialize record: " + e.getMessage());
@@ -116,13 +116,13 @@ public void shouldIgnoreCorruptInputRecords() throws Exception {
// Write the processing results (which was generated from valid records only) to Kafka.
doubled.to(outputTopic, Produced.with(stringSerde, longSerde));
- KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
// Step 2: Produce some corrupt input data to the input topic.
//
- Properties producerConfigForCorruptRecords = new Properties();
+ final Properties producerConfigForCorruptRecords = new Properties();
producerConfigForCorruptRecords.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfigForCorruptRecords.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfigForCorruptRecords.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -134,7 +134,7 @@ public void shouldIgnoreCorruptInputRecords() throws Exception {
//
// Step 3: Produce some (valid) input data to the input topic.
//
- Properties producerConfig = new Properties();
+ final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -145,13 +145,13 @@ public void shouldIgnoreCorruptInputRecords() throws Exception {
//
// Step 4: Verify the application's output data.
//
- Properties consumerConfig = new Properties();
+ final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-lambda-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
- List<Long> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
+ final List<Long> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
outputTopic, expectedValues.size());
streams.close();
assertThat(actualValues).isEqualTo(expectedValues);
diff --git a/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java b/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java
index 0563249084..484f82a164 100644
--- a/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java
+++ b/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java
@@ -58,8 +58,10 @@ public class IntegrationTestUtils {
* @param maxMessages Maximum number of messages to read via the consumer.
* @return The values retrieved via the consumer.
*/
- public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
- List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+ public static <K, V> List<V> readValues(final String topic,
+ final Properties consumerConfig,
+ final int maxMessages) {
+ final List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
return kvs.stream().map(kv -> kv.value).collect(Collectors.toList());
}
@@ -71,7 +73,8 @@ public static List readValues(String topic, Properties consumerConfig,
* @param consumerConfig Kafka consumer configuration
* @return The KeyValue elements retrieved via the consumer.
*/
- public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) {
+ public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+ final Properties consumerConfig) {
return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
}
@@ -84,17 +87,19 @@ public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties
* @param maxMessages Maximum number of messages to read via the consumer
* @return The KeyValue elements retrieved via the consumer
*/
- public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
- KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
+ public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+ final Properties consumerConfig,
+ final int maxMessages) {
+ final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
consumer.subscribe(Collections.singletonList(topic));
- int pollIntervalMs = 100;
- int maxTotalPollTimeMs = 2000;
+ final int pollIntervalMs = 100;
+ final int maxTotalPollTimeMs = 2000;
int totalPollTimeMs = 0;
- List<KeyValue<K, V>> consumedValues = new ArrayList<>();
+ final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
totalPollTimeMs += pollIntervalMs;
- ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
- for (ConsumerRecord<K, V> record : records) {
+ final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+ for (final ConsumerRecord<K, V> record : records) {
consumedValues.add(new KeyValue<>(record.key(), record.value()));
}
}
@@ -102,7 +107,7 @@ public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties
return consumedValues;
}
- private static boolean continueConsuming(int messagesConsumed, int maxMessages) {
+ private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
return maxMessages <= 0 || messagesConsumed < maxMessages;
}
@@ -114,11 +119,11 @@ private static boolean continueConsuming(int messagesConsumed, int maxMessages)
* @param <V> Value type of the data records
*/
public static <K, V> void produceKeyValuesSynchronously(
- String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
+ final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig)
throws ExecutionException, InterruptedException {
- Producer<K, V> producer = new KafkaProducer<>(producerConfig);
- for (KeyValue<K, V> record : records) {
- Future<RecordMetadata> f = producer.send(
+ final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
+ for (final KeyValue<K, V> record : records) {
+ final Future<RecordMetadata> f = producer.send(
new ProducerRecord<>(topic, record.key, record.value));
f.get();
}
@@ -127,16 +132,17 @@ public static void produceKeyValuesSynchronously(
}
public static <V> void produceValuesSynchronously(
- String topic, Collection<V> records, Properties producerConfig)
+ final String topic, final Collection<V> records, final Properties producerConfig)
throws ExecutionException, InterruptedException {
- Collection<KeyValue<Object, V>> keyedRecords =
+ final Collection<KeyValue<Object, V>> keyedRecords =
records.stream().map(record -> new KeyValue<>(null, record)).collect(Collectors.toList());
produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
}
- public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
- String topic,
- int expectedNumRecords) throws InterruptedException {
+ public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords)
+ throws InterruptedException {
return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
}
@@ -151,14 +157,15 @@ public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Pr
* @return All the records consumed, or null if no records are consumed
* @throws AssertionError if the given wait time elapses
*/
- public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
- String topic,
- int expectedNumRecords,
- long waitTime) throws InterruptedException {
- List<KeyValue<K, V>> accumData = new ArrayList<>();
- long startTime = System.currentTimeMillis();
+ public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final int expectedNumRecords,
+ final long waitTime)
+ throws InterruptedException {
+ final List<KeyValue<K, V>> accumData = new ArrayList<>();
+ final long startTime = System.currentTimeMillis();
while (true) {
- List