From 88e0bb0c789c4d2270105d0443f4a93e18f4e49e Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 14 Nov 2023 14:53:29 -0800 Subject: [PATCH] Adds a configuration to the random string source to configure the wait delay between writes to the buffer. Resolves #3595. Also uses a single thread for this source to avoid an unnecessary thread pool and increases the code coverage. (#3602) Signed-off-by: David Venable --- .../plugins/source/RandomStringSource.java | 47 +++++++------- .../source/RandomStringSourceConfig.java | 19 ++++++ .../source/RandomStringSourceTests.java | 64 ++++++++++++------- 3 files changed, 84 insertions(+), 46 deletions(-) create mode 100644 data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceConfig.java diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSource.java index 62be100527..975a2a75f7 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSource.java @@ -6,69 +6,68 @@ package org.opensearch.dataprepper.plugins.source; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Generates a random string every 500 milliseconds. Intended to be used for testing setups */ -@DataPrepperPlugin(name = "random", pluginType = Source.class) +@DataPrepperPlugin(name = "random", pluginType = Source.class, pluginConfigurationType = RandomStringSourceConfig.class) public class RandomStringSource implements Source> { - private static final Logger LOG = LoggerFactory.getLogger(RandomStringSource.class); + private static final int BUFFER_WAIT = 500; + private final long waitTimeInMillis; - private ExecutorService executorService; private volatile boolean stop = false; + private Thread thread; - private void setExecutorService() { - if(executorService == null || executorService.isShutdown()) { - executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(false).setNameFormat("random-source-pool-%d").build() - ); - } + @DataPrepperPluginConstructor + public RandomStringSource(final RandomStringSourceConfig config) { + waitTimeInMillis = config.getWaitDelay().toMillis(); } @Override public void start(final Buffer> buffer) { - setExecutorService(); - executorService.execute(() -> { + if(thread != null) { + throw new IllegalStateException("This source has already started."); + } + + thread = new Thread(() -> { while (!stop) { try { LOG.debug("Writing to buffer"); final Record record = generateRandomStringEventRecord(); - buffer.write(record, 500); - Thread.sleep(500); + buffer.write(record, BUFFER_WAIT); + Thread.sleep(waitTimeInMillis); } catch (final InterruptedException e) { break; } catch (final TimeoutException e) { // Do nothing } } - }); + }, + "random-source"); + + thread.setDaemon(false); + thread.start(); } @Override public void stop() { stop = true; - executorService.shutdown(); try { - if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { - executorService.shutdownNow(); - } - } catch (final InterruptedException ex) { - executorService.shutdownNow(); + thread.join(waitTimeInMillis + BUFFER_WAIT + 100); + } catch (final InterruptedException e) { + thread.interrupt(); } } diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceConfig.java new file mode 100644 index 0000000000..37ba58e749 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceConfig.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Duration; + +public class RandomStringSourceConfig { + @JsonProperty("wait_delay") + private Duration waitDelay = Duration.ofMillis(500); + + public Duration getWaitDelay() { + return waitDelay; + } +} diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java index f21a1f2d52..4ba271e827 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/RandomStringSourceTests.java @@ -5,33 +5,34 @@ package org.opensearch.dataprepper.plugins.source; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.buffer.TestBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.awaitility.Awaitility.await; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +@ExtendWith(MockitoExtension.class) class RandomStringSourceTests { - - private TestBuffer buffer; - - @BeforeEach - void setUp() { - final Queue> bufferQueue = new ConcurrentLinkedQueue<>(); - buffer = new TestBuffer(bufferQueue, 1); - } + @Mock + private Buffer> buffer; private RandomStringSource createObjectUnderTest() { - return new RandomStringSource(); + return new RandomStringSource(new RandomStringSourceConfig()); } @Test @@ -41,25 +42,44 @@ void testPutRecord() { randomStringSource.start(buffer); await().atMost(3, TimeUnit.SECONDS) .pollDelay(200, TimeUnit.MILLISECONDS) - .until(() -> buffer.size() > 0); - assertThat(buffer.size(), greaterThan(0)); + .untilAsserted(() -> verify(buffer).write(any(), anyInt())); + } + + @Test + void source_continues_to_write_if_a_write_to_buffer_fails() throws TimeoutException { + final RandomStringSource randomStringSource = createObjectUnderTest(); + + doThrow(TimeoutException.class).when(buffer).write(any(), anyInt()); + + randomStringSource.start(buffer); + await().atMost(3, TimeUnit.SECONDS) + .pollDelay(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> verify(buffer, atLeast(2)).write(any(), anyInt())); } @Test - void testStop() throws InterruptedException { + void testStop() throws InterruptedException, TimeoutException { final RandomStringSource randomStringSource = createObjectUnderTest(); //Start source, and sleep for 1000 millis randomStringSource.start(buffer); await().atMost(3, TimeUnit.SECONDS) .pollDelay(200, TimeUnit.MILLISECONDS) - .until(() -> buffer.size() > 0); + .untilAsserted(() -> verify(buffer).write(any(), anyInt())); //Stop the source, and wait long enough that another message would be sent //if the source was running randomStringSource.stop(); Thread.sleep(200); // Ensure the other thread has time to finish writing. - final int sizeAfterCompletion = buffer.size(); + verify(buffer, atLeastOnce()).write(any(), anyInt()); Thread.sleep(1000); - assertThat(buffer.size(), equalTo(sizeAfterCompletion)); + verifyNoMoreInteractions(buffer); } + @Test + void multiple_calls_to_start_throws() { + final RandomStringSource objectUnderTest = createObjectUnderTest(); + + objectUnderTest.start(buffer); + + assertThrows(IllegalStateException.class, () -> objectUnderTest.start(buffer)); + } }