diff --git a/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java index c0a6f9ec94ce3..55413b9bbdad1 100644 --- a/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java +++ b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java @@ -112,12 +112,7 @@ public AbstractBatchingProcessor create( ) throws Exception { int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE); if (batchSize < 1) { - throw newConfigurationException( - this.processorType, - tag, - BATCH_SIZE_FIELD, - BATCH_SIZE_FIELD + " must be a positive integer" - ); + throw newConfigurationException(this.processorType, tag, BATCH_SIZE_FIELD, "batch size must be a positive integer"); } return newProcessor(tag, description, batchSize, config); } diff --git a/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java index b6637bc430f8c..54fc30cb5befa 100644 --- a/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java @@ -8,12 +8,15 @@ package org.opensearch.ingest; +import org.opensearch.OpenSearchParseException; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Consumer; public class AbstractBatchingProcessorTests extends OpenSearchTestCase { @@ -87,6 +90,29 @@ public void testBatchExecute_defaultBatchSize() { assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2)); } + public void testFactory_invalidBatchSize() { + Map config = new HashMap<>(); + config.put("batch_size", 0); + DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); + OpenSearchParseException exception = assertThrows(OpenSearchParseException.class, () -> factory.create(config)); + assertEquals("[batch_size] batch size must be a positive integer", exception.getMessage()); + } + + public void testFactory_defaultBatchSize() throws Exception { + Map config = new HashMap<>(); + DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); + DummyProcessor processor = (DummyProcessor) factory.create(config); + assertEquals(1, processor.batchSize); + } + + public void testFactory_callNewProcessor() throws Exception { + Map config = new HashMap<>(); + config.put("batch_size", 3); + DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor"); + DummyProcessor processor = (DummyProcessor) factory.create(config); + assertEquals(3, processor.batchSize); + } + static class DummyProcessor extends AbstractBatchingProcessor { private List> subBatches = new ArrayList<>(); @@ -113,5 +139,22 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { public String getType() { return null; } + + public static class DummyProcessorFactory extends Factory { + + protected DummyProcessorFactory(String processorType) { + super(processorType); + } + + public AbstractBatchingProcessor create(Map config) throws Exception { + final Map processorFactories = new HashMap<>(); + return super.create(processorFactories, "tag", "description", config); + } + + @Override + protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map config) { + return new DummyProcessor(batchSize); + } + } } }