From afd6527fca66e814cbc3443c6d0941640e50d77e Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Wed, 6 Mar 2024 17:37:48 -0800 Subject: [PATCH] Refactor to use interface so default behavior is unaffected. Add tests Signed-off-by: Chase Engelbrecht --- .../opensearch/ConnectionConfiguration.java | 20 +- .../sink/opensearch/OpenSearchSink.java | 21 ++- .../ConcurrentRequestSender.java} | 48 +++-- .../opensearch/bulk/InlineRequestSender.java | 11 ++ .../sink/opensearch/bulk/RequestSender.java | 13 ++ .../ConnectionConfigurationTests.java | 16 ++ .../bulk/ConcurrentRequestSenderTest.java | 178 ++++++++++++++++++ .../bulk/InlineRequestSenderTest.java | 50 +++++ 8 files changed, 319 insertions(+), 38 deletions(-) rename data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/{RequestSender.java => bulk/ConcurrentRequestSender.java} (54%) create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSender.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/RequestSender.java create mode 100644 data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSenderTest.java create mode 100644 data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSenderTest.java diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java index ea8a993168..18297fe4ef 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java @@ -91,7 +91,7 @@ public class ConnectionConfiguration { public static final String NETWORK_POLICY_NAME = "network_policy_name"; public static final String VPCE_ID = "vpce_id"; public static final String REQUEST_COMPRESSION_ENABLED = "enable_request_compression"; - public static final String CLIENTS = "clients"; + public static final String CONCURRENT_REQUESTS = "concurrent_requests"; /** * The valid port range per https://tools.ietf.org/html/rfc6335. @@ -118,7 +118,7 @@ public class ConnectionConfiguration { private final String serverlessCollectionName; private final String serverlessVpceId; private final boolean requestCompressionEnabled; - private final Integer clients; + private final Integer concurrentRequests; List getHosts() { return hosts; @@ -180,8 +180,8 @@ boolean isRequestCompressionEnabled() { return requestCompressionEnabled; } - Integer getClients() { - return clients; + Integer getConcurrentRequests() { + return concurrentRequests; } private ConnectionConfiguration(final Builder builder) { @@ -204,7 +204,7 @@ private ConnectionConfiguration(final Builder builder) { this.serverlessVpceId = builder.serverlessVpceId; this.requestCompressionEnabled = builder.requestCompressionEnabled; this.pipelineName = builder.pipelineName; - this.clients = builder.clients; + this.concurrentRequests = builder.concurrentRequests; } public static ConnectionConfiguration readConnectionConfiguration(final PluginSetting pluginSetting){ @@ -282,8 +282,8 @@ public static ConnectionConfiguration readConnectionConfiguration(final PluginSe REQUEST_COMPRESSION_ENABLED, !DistributionVersion.ES6.equals(distributionVersion)); builder = builder.withRequestCompressionEnabled(requestCompressionEnabled); - final Integer clients = pluginSetting.getIntegerOrDefault(CLIENTS, 1); - builder = builder.withClients(clients); + final Integer concurrentRequests = pluginSetting.getIntegerOrDefault(CONCURRENT_REQUESTS, -1); + builder = builder.withConcurrentRequests(concurrentRequests); return builder.build(); } @@ -518,7 +518,7 @@ public static class Builder { private String serverlessCollectionName; private String serverlessVpceId; private boolean requestCompressionEnabled; - private Integer clients; + private Integer concurrentRequests; private void validateStsRoleArn(final String awsStsRoleArn) { final Arn arn = getArn(awsStsRoleArn); @@ -648,8 +648,8 @@ public Builder withRequestCompressionEnabled(final boolean requestCompressionEna return this; } - public Builder withClients(final Integer clients) { - this.clients = clients; + public Builder withConcurrentRequests(final Integer concurrentRequests) { + this.concurrentRequests = concurrentRequests; return this; } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index aefdd0a889..a2bdc06d8c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -49,8 +49,11 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkOperationWriter; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.InlineRequestSender; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingCompressedBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.JavaClientAccumulatingUncompressedBulkRequest; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.ConcurrentRequestSender; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.RequestSender; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter; @@ -80,11 +83,6 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -192,7 +190,12 @@ public OpenSearchSink(final PluginSetting pluginSetting, dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting); } - this.requestSender = new RequestSender(openSearchSinkConfig.getConnectionConfiguration().getClients()); + final int concurrentRequests = openSearchSinkConfig.getConnectionConfiguration().getConcurrentRequests(); + if (concurrentRequests > 0) { + this.requestSender = new ConcurrentRequestSender(concurrentRequests); + } else { + this.requestSender = new InlineRequestSender(); + } } @Override @@ -499,10 +502,10 @@ SerializedJson getDocument(final Event event) { } private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { - requestSender.sendRequest(() -> doFlushBatch(accumulatingBulkRequest)); + requestSender.sendRequest(this::doFlushBatch, accumulatingBulkRequest); } - private Void doFlushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { + private void doFlushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { bulkRequestTimer.record(() -> { try { LOG.debug("Sending data to OpenSearch"); @@ -514,8 +517,6 @@ private Void doFlushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { Thread.currentThread().interrupt(); } }); - - return null; } private void logFailureForBulkRequests(final List failedBulkOperations, final Throwable failure) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/RequestSender.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSender.java similarity index 54% rename from data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/RequestSender.java rename to data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSender.java index 663805c8d4..b5ac28268e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/RequestSender.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSender.java @@ -1,5 +1,6 @@ -package org.opensearch.dataprepper.plugins.sink.opensearch; +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,49 +10,63 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; -public class RequestSender { - private static final Logger LOG = LoggerFactory.getLogger(RequestSender.class); +public class ConcurrentRequestSender implements RequestSender { + private static final Logger LOG = LoggerFactory.getLogger(ConcurrentRequestSender.class); private final List> pendingRequestFutures; - private final ExecutorService requestExecutor; private final CompletionService completionService; private final int concurrentRequestCount; private final ReentrantLock reentrantLock; - public RequestSender(final int concurrentRequestCount) { + public ConcurrentRequestSender(final int concurrentRequestCount) { this.concurrentRequestCount = concurrentRequestCount; pendingRequestFutures = new ArrayList<>(); - requestExecutor = Executors.newFixedThreadPool(concurrentRequestCount); - completionService = new ExecutorCompletionService(requestExecutor); + completionService = new ExecutorCompletionService(Executors.newFixedThreadPool(concurrentRequestCount)); reentrantLock = new ReentrantLock(); } - public void sendRequest(final Callable requestRunnable) { + @VisibleForTesting + ConcurrentRequestSender(final int concurrentRequestCount, final CompletionService completionService) { + this.concurrentRequestCount = concurrentRequestCount; + pendingRequestFutures = new ArrayList<>(); + this.completionService = completionService; + reentrantLock = new ReentrantLock(); + } + + @Override + public void sendRequest(final Consumer requestConsumer, final AccumulatingBulkRequest request) { reentrantLock.lock(); - if (pendingRequestFutures.size() >= concurrentRequestCount) { + if (isRequestQueueFull()) { waitForRequestSlot(); } - final Future future = completionService.submit(requestRunnable); + final Future future = completionService.submit(convertConsumerIntoCallable(requestConsumer, request)); pendingRequestFutures.add(future); reentrantLock.unlock(); } + private Callable convertConsumerIntoCallable(final Consumer requestConsumer, final AccumulatingBulkRequest request) { + return () -> { + requestConsumer.accept(request); + return null; + }; + } + private void waitForRequestSlot() { do { checkFutureCompletion(); if (isRequestQueueFull()) { try { - LOG.info("Request queue is full, waiting for slot to free up"); + LOG.debug("Request queue is full, waiting for slot to free up"); completionService.take(); - } catch (InterruptedException e) { + } catch (final Exception e) { LOG.error("Interrupted while waiting for future completion"); } } @@ -70,12 +85,9 @@ private void checkFutureCompletion() { future.get(); } catch (final Exception e) { LOG.error("Indexing future was cancelled", e); - iterator.remove(); - return; } - } - - if (future.isDone()) { + iterator.remove(); + } else if (future.isDone()) { iterator.remove(); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSender.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSender.java new file mode 100644 index 0000000000..8938fbf772 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSender.java @@ -0,0 +1,11 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import java.util.function.Consumer; + +public class InlineRequestSender implements RequestSender { + + @Override + public void sendRequest(final Consumer requestConsumer, final AccumulatingBulkRequest request) { + requestConsumer.accept(request); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/RequestSender.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/RequestSender.java new file mode 100644 index 0000000000..4472b4fb61 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/RequestSender.java @@ -0,0 +1,13 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import java.util.function.Consumer; + +public interface RequestSender { + /** + * Executes the provided request with the provided consumer + * + * @param requestConsumer - the consumer function of the request that performs the work to execute the request + * @param request - the request to be consumed + */ + void sendRequest(Consumer requestConsumer, AccumulatingBulkRequest request); +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java index e5503cd9a0..ff8f6db169 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java @@ -86,6 +86,7 @@ void testReadConnectionConfigurationDefault() { assertNull(connectionConfiguration.getSocketTimeout()); assertEquals(TEST_PIPELINE_NAME, connectionConfiguration.getPipelineName()); assertTrue(connectionConfiguration.isRequestCompressionEnabled()); + assertEquals(-1, connectionConfiguration.getConcurrentRequests()); } @Test @@ -648,6 +649,21 @@ void testCreateClient_WithConnectionConfigurationBuilder_ProxyOptionalObjectShou client.close(); } + @Test + void testConcurrentRequestsSetting() { + final Map metadata = new HashMap<>(); + metadata.put("hosts", TEST_HOSTS); + metadata.put("username", UUID.randomUUID().toString()); + metadata.put("password", UUID.randomUUID().toString()); + metadata.put("connect_timeout", 1); + metadata.put("socket_timeout", 1); + metadata.put("concurrent_requests", 32); + final PluginSetting pluginSetting = getPluginSettingByConfigurationMetadata(metadata); + final ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.readConnectionConfiguration(pluginSetting); + + assertThat(connectionConfiguration.getConcurrentRequests(), equalTo(32)); + } + private PluginSetting generatePluginSetting( final List hosts, final String username, final String password, final Integer connectTimeout, final Integer socketTimeout, final boolean awsSigv4, final String awsRegion, diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSenderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSenderTest.java new file mode 100644 index 0000000000..c00113b516 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/ConcurrentRequestSenderTest.java @@ -0,0 +1,178 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Random; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class ConcurrentRequestSenderTest { + private static final int CONCURRENT_REQUESTS = 1 + new Random().nextInt(9); + + @Mock + private AccumulatingBulkRequest request; + @Mock + private Consumer requestConsumer; + @Mock + private CompletionService completionService; + @Mock + private Future future; + + private ConcurrentRequestSender concurrentRequestSender; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + concurrentRequestSender = new ConcurrentRequestSender(CONCURRENT_REQUESTS, completionService); + + when(completionService.submit(any())).thenReturn(future); + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(request, requestConsumer, completionService, future); + } + + @Test + void sendRequest_QueueHasSlots_Success() { + concurrentRequestSender.sendRequest(requestConsumer, request); + + verify(completionService).submit(any()); + } + + @Test + void sendRequest_NoTakeIfQueueFutureCompleted(){ + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()).thenReturn(false); + when(future.isDone()) + .thenReturn(false) + .thenReturn(false) + .thenReturn(true); + + IntStream.range(0, concurrency + 1).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 1)).submit(any()); + verify(future, times(concurrency)).isCancelled(); + verify(future, times(concurrency)).isDone(); + } + + @Test + void sendRequest_NoTake_MultipleFuturesCompleted(){ + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()).thenReturn(false); + when(future.isDone()) + .thenReturn(false) + .thenReturn(true) + .thenReturn(true); + + IntStream.range(0, concurrency + 2).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 2)).submit(any()); + verify(future, times(concurrency)).isCancelled(); + verify(future, times(concurrency)).isDone(); + } + + @Test + void sendRequest_NoTake_FutureCancelled() throws ExecutionException, InterruptedException { + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + when(future.isDone()).thenReturn(false); + when(future.get()).thenThrow(new InterruptedException()); + + IntStream.range(0, concurrency + 1).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 1)).submit(any()); + verify(future).get(); + verify(future, times(concurrency)).isCancelled(); + verify(future, times(concurrency - 1)).isDone(); + } + + @Test + void sendRequest_NoTake_FutureCancelled_NonExceptional() throws ExecutionException, InterruptedException { + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()) + .thenReturn(false) + .thenReturn(true) + .thenReturn(false); + when(future.isDone()).thenReturn(false); + when(future.get()).thenReturn(null); + + IntStream.range(0, concurrency + 1).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 1)).submit(any()); + verify(future).get(); + verify(future, times(concurrency)).isCancelled(); + verify(future, times(concurrency - 1)).isDone(); + } + + @Test + void sendRequest_TakesToBlockForFutureCompletion() throws InterruptedException { + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()).thenReturn(false); + when(future.isDone()) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(true); + when(completionService.take()).thenReturn(future); + + IntStream.range(0, concurrency + 1).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 1)).submit(any()); + verify(completionService).take(); + verify(future, times(concurrency * 2)).isCancelled(); + verify(future, times(concurrency * 2)).isDone(); + } + + @Test + void sendRequest_TakesToBlockForFutureCompletion_TransientException() throws InterruptedException { + final int concurrency = 3; + concurrentRequestSender = new ConcurrentRequestSender(concurrency, completionService); + + when(future.isCancelled()).thenReturn(false); + when(future.isDone()) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(false) + .thenReturn(true); + when(completionService.take()) + .thenThrow(new RuntimeException()) + .thenReturn(future); + + IntStream.range(0, concurrency + 1).forEach(i -> concurrentRequestSender.sendRequest(requestConsumer, request)); + + verify(completionService, times(concurrency + 1)).submit(any()); + verify(completionService, times(2)).take(); + verify(future, times(concurrency * 3)).isCancelled(); + verify(future, times(concurrency * 3)).isDone(); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSenderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSenderTest.java new file mode 100644 index 0000000000..473f95129b --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/InlineRequestSenderTest.java @@ -0,0 +1,50 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class InlineRequestSenderTest { + @Mock + private AccumulatingBulkRequest request; + @Mock + private Consumer requestConsumer; + + private InlineRequestSender inlineRequestSender; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + inlineRequestSender = new InlineRequestSender(); + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(request, requestConsumer); + } + + @Test + void sendRequest_Success() { + inlineRequestSender.sendRequest(requestConsumer, request); + + verify(requestConsumer).accept(request); + } + + @Test + void sendRequest_BubblesUpException() { + doThrow(new RuntimeException()).when(requestConsumer).accept(request); + + assertThrows(RuntimeException.class, () -> inlineRequestSender.sendRequest(requestConsumer, request)); + + verify(requestConsumer).accept(request); + } +}