Skip to content

Commit

Permalink
Add 'DataProviderPollingConsumer'
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Ribeaud committed Mar 10, 2017
1 parent 1825fd3 commit d90323f
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Obje
IDataProvider<?> dataProvider;
switch (found.size()) {
case 0:
throw new NoSuchBeanException(null, type.getSimpleName());
throw new NoSuchBeanException(remaining, type.getSimpleName());
case 1:
// If we only have one, we do NOT need 'remaining'
dataProvider = (IDataProvider<?>) found.stream().findFirst().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* {@link ScheduledBatchPollingConsumer} extension for {@link IDataProvider}.
Expand All @@ -23,18 +23,23 @@ public class DataProviderConsumer extends ScheduledBatchPollingConsumer {

private static final Logger LOG = LoggerFactory.getLogger(DataProviderConsumer.class);

private final AtomicReference<Range<Integer>> rangeReference = new AtomicReference<>();
private final AtomicBoolean finished = new AtomicBoolean(false);
private volatile Range<Integer> range;
private final Lock rangeLock = new ReentrantLock();

public DataProviderConsumer(DataProviderEndpoint dataProviderEndpoint, Processor processor) {
DataProviderConsumer(DataProviderEndpoint dataProviderEndpoint, Processor processor) {
super(dataProviderEndpoint, processor);
}

@Override
protected void doStart() throws Exception {
IDataProvider<?> dataProvider = getDataProviderEndoint().getDataProvider();
IDataProvider<?> dataProvider = getEndpoint().getDataProvider();
int size = dataProvider.getSize();
rangeReference.set(Range.closedOpen(0, Math.min(size, maxMessagesPerPoll)));
rangeLock.lock();
try {
range = Range.closedOpen(0, Math.min(size, maxMessagesPerPoll));
} finally {
rangeLock.unlock();
}
LogUtils.info(LOG, () -> String.format("Preparing to handle %d partition(s) (%d / %d)",
(int) Math.ceil((float) size / maxMessagesPerPoll), size, maxMessagesPerPoll));
super.doStart();
Expand Down Expand Up @@ -70,32 +75,35 @@ public int processBatch(Queue<Object> exchanges) throws Exception {

@Override
protected int poll() throws Exception {
// Process current range
DataProviderEndpoint endpoint = getDataProviderEndoint();
DataProviderEndpoint endpoint = getEndpoint();
IDataProvider<?> dataProvider = endpoint.getDataProvider();
final Range<Integer> range = this.rangeReference.get();
int index = range.lowerEndpoint();
if (range.isEmpty()) {
if (!finished.getAndSet(true)) {
final Queue<Exchange> exchanges = new LinkedList<>();
rangeLock.lock();
try {
Range<Integer> range = this.range;
if (range.isEmpty()) {
LogUtils.info(LOG, () -> "Nothing to poll. Last range handled.");
return 0;
}
return 0;
}
LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range));
int size = dataProvider.getSize();
Queue<Exchange> exchanges = new LinkedList<>();
for (Object item : dataProvider.partition(range)) {
Exchange exchange = endpoint.createExchange();
exchange.setProperty(DataProviderConstants.INDEX, index++);
exchange.setProperty(DataProviderConstants.SIZE, size);
exchange.setProperty(DataProviderConstants.LAST_BATCH, range.upperEndpoint() == size);
exchange.getIn().setBody(item);
exchanges.add(exchange);
// Process current range
LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range));
int index = range.lowerEndpoint();
int size = dataProvider.getSize();
for (Object item : dataProvider.partition(range)) {
Exchange exchange = endpoint.createExchange();
exchange.setProperty(DataProviderConstants.INDEX, index++);
exchange.setProperty(DataProviderConstants.SIZE, size);
exchange.setProperty(DataProviderConstants.LAST_BATCH, range.upperEndpoint() == size);
exchange.getIn().setBody(item);
exchanges.add(exchange);
}
// Prepare next range
Range<Integer> nextRange = createNextRange(range.upperEndpoint(), size);
LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange));
this.range = nextRange;
} finally {
rangeLock.unlock();
}
// Prepare next range
Range<Integer> nextRange = createNextRange(range.upperEndpoint(), size);
LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange));
this.rangeReference.set(nextRange);
Stopwatch stopwatch = Stopwatch.createStarted();
int processBatch = processBatch(CastUtils.cast(exchanges));
stopwatch.stop();
Expand All @@ -111,7 +119,8 @@ Range<Integer> createNextRange(int upper, int size) {
}
}

private DataProviderEndpoint getDataProviderEndoint() {
return (DataProviderEndpoint) getEndpoint();
@Override
public DataProviderEndpoint getEndpoint() {
return (DataProviderEndpoint) super.getEndpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;

/**
Expand All @@ -20,6 +21,10 @@ public class DataProviderEndpoint extends ScheduledPollEndpoint {
@UriPath(name = "name", description = "Name of IDataProvider to lookup in the registry")
@Metadata(required = "true")
private final IDataProvider<?> dataProvider;
@UriParam(defaultValue = "0", description = "Range starting index. Default is 0 and could NOT be negative.")
private int startingIndex = 0;
@UriParam(defaultValue = "-1", description = "Range size. -1 means everything.")
private int rangeSize = -1;

public DataProviderEndpoint(String uri, DataProviderComponent dataProviderComponent,
IDataProvider<?> dataProvider) {
Expand All @@ -35,7 +40,10 @@ public Producer createProducer() throws Exception {

@Override
public PollingConsumer createPollingConsumer() throws Exception {
throw new UnsupportedOperationException("No PollingConsumer has been implemented yet.");
DataProviderPollingConsumer pollingConsumer = new DataProviderPollingConsumer(this);
// Do NOT configure it using 'configurePollingConsumer' as we are NOT using the standard parameters
// configurePollingConsumer(pollingConsumer);
return pollingConsumer;
}

@Override
Expand All @@ -45,6 +53,30 @@ public Consumer createConsumer(Processor processor) throws Exception {
return consumer;
}

public int getRangeSize() {
return rangeSize;
}

public void setRangeSize(int rangeSize) {
assertNonNegative(rangeSize, "Range size");
this.rangeSize = rangeSize;
}

public int getStartingIndex() {
return startingIndex;
}

public void setStartingIndex(int startingIndex) {
assertNonNegative(startingIndex, "Starting index");
this.startingIndex = startingIndex;
}

private static void assertNonNegative(int value, String object) {
if (value < 0) {
throw new IllegalArgumentException(String.format("%s could NOT be negative.", object));
}
}

@Override
public boolean isSingleton() {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.apache.camel.component.dataprovider;

import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.PollingConsumerSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/**
* {@link PollingConsumerSupport} extension for {@link IDataProvider}.
*
* @author <a href="mailto:[email protected]">Christian Ribeaud</a>
*/
public class DataProviderPollingConsumer extends PollingConsumerSupport {

private static final Logger LOG = LoggerFactory.getLogger(DataProviderPollingConsumer.class);

private ExecutorService executorService;

DataProviderPollingConsumer(DataProviderEndpoint dataProviderEndpoint) {
super(dataProviderEndpoint);
}

@Override
public Exchange receive() {
Range<Integer> range = createRange();
LogUtils.info(LOG, () -> String.format("Handling range '%s'.", range));
Iterable<?> partition = getEndpoint().getDataProvider().partition(range);
Exchange exchange = getEndpoint().createExchange();
int size = Iterables.size(partition);
Message in = exchange.getIn();
switch (size) {
case 1:
in.setBody(Iterables.get(partition, 0));
break;
default:
in.setBody(partition);
}
return exchange;
}

@Override
public Exchange receiveNoWait() {
return receive();
}

@Override
public Exchange receive(long timeout) {
// the task is the receive method
Future<Exchange> future = executorService.submit((Callable<Exchange>) this::receive);
try {
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} catch (TimeoutException e) {
// ignore as we hit timeout then return null
}
return null;
}

@Override
protected void doStart() throws Exception {
executorService = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, getClass().getSimpleName());
}

@Override
protected void doStop() throws Exception {
getCamelContext().getExecutorServiceManager().shutdown(executorService);
}

private CamelContext getCamelContext() {
return getEndpoint().getCamelContext();
}

@Override
public DataProviderEndpoint getEndpoint() {
return (DataProviderEndpoint) super.getEndpoint();
}

private Range<Integer> createRange() {
int size = getEndpoint().getDataProvider().getSize();
int rangeSize = getEndpoint().getRangeSize();
rangeSize = rangeSize < 0 ? size : rangeSize;
int lower = getEndpoint().getStartingIndex();
return Range.closedOpen(Math.min(lower, size - 1), Math.min(size, lower + rangeSize));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.apache.camel.component.dataprovider;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
Expand Down Expand Up @@ -62,20 +61,14 @@ public void testDataProviderNoMoreThan100() throws Exception {
@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
List<String> strings = new ArrayList<>(100);
IntStream.range(0, 100).forEach(i -> strings.add(createRandomString()));
registry.bind("foo", new StaticDataProvider<>(strings));
registry.bind("foo", new StaticDataProvider<>(getRandomStrings(100)));
return registry;
}

@Override
protected void doPostSetup() throws Exception {
CamelContext context = context();
DataProviderComponent dataProviderComponent = new DataProviderComponent(context);
String componentName = "dataprovider";
if (context.hasComponent(componentName) == null) {
context.addComponent(componentName, dataProviderComponent);
}
static List<String> getRandomStrings(int length) {
List<String> strings = new ArrayList<>(length);
IntStream.range(0, length).forEach(i -> strings.add(createRandomString()));
return strings;
}

private static String createRandomString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.apache.camel.component.dataprovider;

import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.testng.CamelTestSupport;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;

/**
* Test cases for corresponding class {@link DataProviderPollingConsumer}.
*
* @author <a href="mailto:[email protected]">Christian Ribeaud</a>
*/
public class DataProviderPollingConsumerTest extends CamelTestSupport {

@Test
public void testReceiveBodyWithRangeSizeAndStartingIndex() {
List<String> body = consumer.receiveBody("dataprovider://foo?rangeSize=2&startingIndex=3", List.class);
assertEquals(body.size(), 2);
assertEquals(body.get(0), "Four");
}

@Test
public void testReceiveBody() {
List<String> body = consumer.receiveBody("dataprovider://foo", List.class);
assertEquals(body.size(), 10);
}

@Test
public void testReceiveBodyWithOneString() {
String body = consumer.receiveBody("dataprovider://foo?rangeSize=1&startingIndex=12345", String.class);
assertEquals(body, "Ten");
}

@Test
public void testReceiveBodyWithZeroRangeSize() {
List<String> body = consumer.receiveBody("dataprovider://foo?rangeSize=0&startingIndex=4", List.class);
assertEquals(body.size(), 0);
}

@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
registry.bind("foo", new StaticDataProvider<>(Arrays.asList("One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", "Ten")));
return registry;
}
}

0 comments on commit d90323f

Please sign in to comment.