Skip to content

Commit

Permalink
Prepare release v1.0.2
Browse files Browse the repository at this point in the history
* Add more traceability about LAST (batch and/or exchange)
* Extend 'DataProviderComponentTest'
  • Loading branch information
Christian Ribeaud committed Mar 8, 2017
1 parent 424b8fe commit edb2e69
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 11 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Created by .ignore support plugin (hsz.mobi)
.gitignore
.idea/
camel-data-provider.iml
target/
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.camel</groupId>
<artifactId>camel-data-provider</artifactId>
<version>1.0.0</version>
<version>1.0.2</version>

<packaging>jar</packaging>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.camel.component.dataprovider;

/**
* Some constant around <b>dataprovider</b> component.
*
* @author <a href="mailto:[email protected]">Christian Ribeaud</a>
*/
public final class DataProviderConstants {

private DataProviderConstants() {
// Can NOT be instantiated
}

/**
* Constant to specify whether current {@link org.apache.camel.Exchange} is part of the last batch.
* <p>
* Value stored should be a <b>boolean</b> or associated object.
* </p>
*/
public final static String LAST_BATCH = DataProviderConstants.class.getName() + ".LastBatch";

/**
* Constant to specify whether current {@link org.apache.camel.Exchange} is the last one.
* <p>
* Value stored should be a <b>boolean</b> or associated object.
* </p>
*/
public final static String LAST_EXCHANGE = DataProviderConstants.class.getName() + ".LastExchange";
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ protected void doStart() throws Exception {
@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
assert exchanges != null : "Unspecified exchanges";
final int total = exchanges.size();
for (int index = 0; index < total && isBatchAllowed(); index++) {
final int batchSize = exchanges.size();
for (int index = 0; index < batchSize && isBatchAllowed(); index++) {
Exchange exchange = (Exchange) exchanges.poll();
// Add current index and total as properties
exchange.setProperty(Exchange.BATCH_INDEX, index);
exchange.setProperty(Exchange.BATCH_SIZE, total);
exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
exchange.setProperty(Exchange.BATCH_SIZE, batchSize);
exchange.setProperty(Exchange.BATCH_COMPLETE, index == batchSize - 1);
// We are handling the last exchange if the last batch is complete
exchange.setProperty(DataProviderConstants.LAST_EXCHANGE, exchange.getProperty(Exchange.BATCH_COMPLETE, Boolean.class)
&& exchange.getProperty(DataProviderConstants.LAST_BATCH, Boolean.class));
// Update pending number of exchanges
pendingExchanges = total - index - 1;
pendingExchanges = batchSize - index - 1;
// Process the current exchange
getProcessor().process(exchange);
Exception exception = exchange.getException();
Expand All @@ -62,11 +65,12 @@ public int processBatch(Queue<Object> exchanges) throws Exception {
exception);
}
}
return total;
return batchSize;
}

@Override
protected int poll() throws Exception {
// Process current range
DataProviderEndpoint endpoint = getDataProviderEndoint();
IDataProvider<?> dataProvider = endpoint.getDataProvider();
final Range<Integer> range = this.rangeReference.get();
Expand All @@ -81,9 +85,11 @@ protected int poll() throws Exception {
Queue<Exchange> exchanges = new LinkedList<>();
for (Object item : dataProvider.partition(range)) {
Exchange exchange = endpoint.createExchange();
exchange.setProperty(DataProviderConstants.LAST_BATCH, range.upperEndpoint() == size);
exchange.getIn().setBody(item);
exchanges.add(exchange);
}
// Prepare next range
Range<Integer> nextRange = createNextRange(range.upperEndpoint(), size);
LogUtils.debug(LOG, () -> String.format("Next range will be '%s'.", nextRange));
this.rangeReference.set(nextRange);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package org.apache.camel.component.dataprovider;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.testng.CamelTestSupport;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

/**
* Test cases for corresponding class {@link DataProviderComponent}.
*
Expand All @@ -16,17 +22,62 @@ public class DataProviderComponentTest extends CamelTestSupport {
@Test
public void testDataProvider() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMinimumMessageCount(1);
assertMockEndpointsSatisfied();
// It will wait until it reaches the expected count
mock.expectedMessageCount(100);
mock.setRetainFirst(1);
mock.setRetainLast(1);
mock.assertIsSatisfied();
List<Exchange> exchanges = mock.getExchanges();
assertEquals(exchanges.size(), 2);
// Last
Exchange lastExchange = exchanges.get(1);
assertNotNull(lastExchange);
assertEquals(lastExchange.getProperty(DataProviderConstants.LAST_EXCHANGE), true);
assertEquals(lastExchange.getProperty(DataProviderConstants.LAST_BATCH), true);
assertEquals(lastExchange.getProperty(Exchange.BATCH_COMPLETE), true);
assertEquals(lastExchange.getProperty(Exchange.BATCH_SIZE), 20);
assertEquals(lastExchange.getProperty(Exchange.BATCH_INDEX), 19);
// First
Exchange firstExchange = exchanges.get(0);
assertNotNull(firstExchange);
assertEquals(firstExchange.getProperty(DataProviderConstants.LAST_EXCHANGE), false);
assertEquals(firstExchange.getProperty(DataProviderConstants.LAST_BATCH), false);
assertEquals(firstExchange.getProperty(Exchange.BATCH_COMPLETE), false);
assertEquals(firstExchange.getProperty(Exchange.BATCH_SIZE), 20);
assertEquals(firstExchange.getProperty(Exchange.BATCH_INDEX), 0);
}

@Test
public void testDataProviderNoMoreThan100() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
// It will wait until it reaches the expected count
mock.expectedMessageCount(101);
mock.assertIsNotSatisfied();
}

@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();
registry.bind("foo", new StaticDataProvider<>("Hello", "Choubidou"));
List<String> strings = new ArrayList<>(100);
IntStream.range(0, 100).forEach(i -> strings.add(createRandomString()));
registry.bind("foo", new StaticDataProvider<>(strings));
return registry;
}

@Override
protected void doPostSetup() throws Exception {
CamelContext context = context();
DataProviderComponent dataProviderComponent = new DataProviderComponent(context);
String componentName = "dataprovider";
if (context.hasComponent(componentName) == null) {
context.addComponent(componentName, dataProviderComponent);
}
}

private static String createRandomString() {
return Long.toHexString(Double.doubleToLongBits(Math.random()));
}

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
Expand Down

0 comments on commit edb2e69

Please sign in to comment.