diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 8d69e98220b69..8ceecb3abb4a2 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -11,7 +11,7 @@
 # 3. Use the command palette to run the CODEOWNERS: Show owners of current file command, which will display all code owners for the current file.
 
 # Default ownership for all repo files
-* @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
+* @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
 
 /modules/transport-netty4/ @peternied
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6b2d815750f9..e9470d9bb4727 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
 - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
 - [Writable Warm] Add composite directory implementation and integrate it with FileCache ([#12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
+- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
 - Fix race condition while parsing derived fields from search definition ([#14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
 - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
 
@@ -33,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568))
 - Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448))
 - Allow @InternalApi annotation on classes not meant to be constructed outside of the OpenSearch core ([#14575](https://github.com/opensearch-project/OpenSearch/pull/14575))
+- Add @InternalApi annotation to japicmp exclusions ([#14597](https://github.com/opensearch-project/OpenSearch/pull/14597))
 
 ### Deprecated
 
@@ -50,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
 - Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
 - Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))
+- Handle NPE in GetResult if "found" field is missing ([#14552](https://github.com/opensearch-project/OpenSearch/pull/14552))
 
 ### Security
 
diff --git a/server/build.gradle b/server/build.gradle
index b8a99facbf964..429af5d0ac258 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -409,6 +409,7 @@ tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) {
   failOnModification = true
   ignoreMissingClasses = true
   annotationIncludes = ['@org.opensearch.common.annotation.PublicApi', '@org.opensearch.common.annotation.DeprecatedApi']
+  annotationExcludes = ['@org.opensearch.common.annotation.InternalApi']
   txtOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.txt")
   htmlOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.html")
   dependsOn downloadJapicmpCompareTarget
diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/CompositeCodecIT.java b/server/src/internalClusterTest/java/org/opensearch/index/codec/CompositeCodecIT.java
deleted file mode 100644
index f78e8f501430d..0000000000000
--- a/server/src/internalClusterTest/java/org/opensearch/index/codec/CompositeCodecIT.java
+++ /dev/null
@@ -1,2 +0,0 @@
-package org.opensearch.index.codec;public class CompositeCodecIT {
-}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java
index f384908bc6b65..affbc7ba66cb8 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java
@@ -34,12 +34,12 @@
  */
 public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity<ClusterState.Custom> {
     public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom";
+    public final ChecksumWritableBlobStoreFormat<ClusterState.Custom> clusterStateCustomsFormat;
     private long stateVersion;
     private final String customType;
     private ClusterState.Custom custom;
     private final NamedWriteableRegistry namedWriteableRegistry;
-    private final ChecksumWritableBlobStoreFormat<ClusterState.Custom> clusterStateCustomsFormat;
 
     public RemoteClusterStateCustoms(
         final ClusterState.Custom custom,
diff --git a/server/src/main/java/org/opensearch/index/get/GetResult.java b/server/src/main/java/org/opensearch/index/get/GetResult.java
index c0dd1cd2ecb30..27a2826f71e19 100644
--- a/server/src/main/java/org/opensearch/index/get/GetResult.java
+++ b/server/src/main/java/org/opensearch/index/get/GetResult.java
@@ -37,6 +37,7 @@
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.common.document.DocumentField;
 import org.opensearch.common.xcontent.XContentHelper;
+import org.opensearch.core.common.ParsingException;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.io.stream.StreamInput;
@@ -56,6 +57,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
@@ -398,6 +400,14 @@ public static GetResult fromXContentEmbedded(XContentParser parser, String index
                 }
             }
         }
+
+        if (found == null) {
+            throw new ParsingException(
+                parser.getTokenLocation(),
+                String.format(Locale.ROOT, "Missing required field [%s]", GetResult.FOUND)
+            );
+        }
+
         return new GetResult(index, id, seqNo, primaryTerm, version, found, source, documentFields, metaFields);
     }
diff --git a/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java
new file mode 100644
index 0000000000000..55413b9bbdad1
--- /dev/null
+++ b/server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java
@@ -0,0 +1,136 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
+
+/**
+ * Abstract base class for batch processors.
+ *
+ * @opensearch.internal
+ */
+public abstract class AbstractBatchingProcessor extends AbstractProcessor {
+
+    public static final String BATCH_SIZE_FIELD = "batch_size";
+    private static final int DEFAULT_BATCH_SIZE = 1;
+    protected final int batchSize;
+
+    protected AbstractBatchingProcessor(String tag, String description, int batchSize) {
+        super(tag, description);
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Internal logic to process batched documents; must be implemented by concrete batch processors.
+     *
+     * @param ingestDocumentWrappers {@link List} of {@link IngestDocumentWrapper} to be processed.
+     * @param handler {@link Consumer} to be called with the results of the processing.
+     */
+    protected abstract void subBatchExecute(
+        List<IngestDocumentWrapper> ingestDocumentWrappers,
+        Consumer<List<IngestDocumentWrapper>> handler
+    );
+
+    @Override
+    public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
+        if (ingestDocumentWrappers.isEmpty()) {
+            handler.accept(Collections.emptyList());
+            return;
+        }
+
+        // if the batch size is greater than or equal to the number of documents, process them as a single batch
+        if (this.batchSize >= ingestDocumentWrappers.size()) {
+            subBatchExecute(ingestDocumentWrappers, handler);
+            return;
+        }
+
+        // otherwise split the documents into batches of at most batchSize and hand each one to subBatchExecute
+        List<List<IngestDocumentWrapper>> batches = cutBatches(ingestDocumentWrappers);
+        int size = ingestDocumentWrappers.size();
+        AtomicInteger counter = new AtomicInteger(size);
+        List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>());
+        for (List<IngestDocumentWrapper> batch : batches) {
+            this.subBatchExecute(batch, batchResults -> {
+                allResults.addAll(batchResults);
+                if (counter.addAndGet(-batchResults.size()) == 0) {
+                    handler.accept(allResults);
+                }
+                assert counter.get() >= 0 : "counter is negative";
+            });
+        }
+    }
+
+    private List<List<IngestDocumentWrapper>> cutBatches(List<IngestDocumentWrapper> ingestDocumentWrappers) {
+        List<List<IngestDocumentWrapper>> batches = new ArrayList<>();
+        for (int i = 0; i < ingestDocumentWrappers.size(); i += this.batchSize) {
+            batches.add(ingestDocumentWrappers.subList(i, Math.min(i + this.batchSize, ingestDocumentWrappers.size())));
+        }
+        return batches;
+    }
+
+    /**
+     * Factory class for creating {@link AbstractBatchingProcessor} instances.
+     *
+     * @opensearch.internal
+     */
+    public abstract static class Factory implements Processor.Factory {
+        final String processorType;
+
+        protected Factory(String processorType) {
+            this.processorType = processorType;
+        }
+
+        /**
+         * Creates a new processor instance.
+         *
+         * @param processorFactories The processor factories.
+         * @param tag The processor tag.
+         * @param description The processor description.
+         * @param config The processor configuration.
+         * @return The new AbstractBatchingProcessor instance.
+         * @throws Exception If the processor could not be created.
+         */
+        @Override
+        public AbstractBatchingProcessor create(
+            Map<String, Processor.Factory> processorFactories,
+            String tag,
+            String description,
+            Map<String, Object> config
+        ) throws Exception {
+            int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE);
+            if (batchSize < 1) {
+                throw newConfigurationException(this.processorType, tag, BATCH_SIZE_FIELD, "batch size must be a positive integer");
+            }
+            return newProcessor(tag, description, batchSize, config);
+        }
+
+        /**
+         * Returns a new processor instance.
+         *
+         * @param tag tag of the processor
+         * @param description description of the processor
+         * @param batchSize batch size of the processor
+         * @param config configuration of the processor
+         * @return a new batch processor instance
+         */
+        protected abstract AbstractBatchingProcessor newProcessor(
+            String tag,
+            String description,
+            int batchSize,
+            Map<String, Object> config
+        );
+    }
+}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java
index 41e1546ead164..fe9ed57fa77b8 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java
@@ -17,9 +17,10 @@
 import org.opensearch.cluster.DiffableUtils;
 import org.opensearch.cluster.block.ClusterBlocks;
 import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.common.CheckedRunnable;
+import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.TestCapturingListener;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
 import org.opensearch.core.common.io.stream.StreamInput;
@@ -28,6 +29,7 @@
 import org.opensearch.core.compress.NoneCompressor;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
+import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
 import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
 import org.opensearch.gateway.remote.model.RemoteReadResult;
 import org.opensearch.index.translog.transfer.BlobStoreTransferService;
@@ -36,46 +38,63 @@
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
 
-import static java.util.Collections.emptyList;
+import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT;
+import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE;
+import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
 import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
 import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS;
 import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT;
 import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks;
+import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM;
+import static org.opensearch.gateway.remote.model.RemoteClusterStateCustomsTests.getClusterStateCustom;
 import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT;
 import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes;
+import static org.opensearch.index.remote.RemoteStoreUtils.invertLong;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyIterable;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase {
     private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager;
     private BlobStoreTransferService blobStoreTransferService;
-    private BlobStoreRepository blobStoreRepository;
     private Compressor compressor;
-    private ThreadPool threadPool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName());
+    private final ThreadPool threadPool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName());
+    private final long VERSION = 7331L;
+    private NamedWriteableRegistry namedWriteableRegistry;
+    private final String CLUSTER_NAME = "test-cluster";
+    private final String CLUSTER_UUID = "test-cluster-uuid";
 
     @Before
     public void setup() throws Exception {
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(emptyList());
-        blobStoreRepository = mock(BlobStoreRepository.class);
+        namedWriteableRegistry = writableRegistry();
+        BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
+        when(blobStoreRepository.basePath()).thenReturn(new BlobPath());
         blobStoreTransferService = mock(BlobStoreTransferService.class);
         compressor = new NoneCompressor();
         remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(
-            "test-cluster",
+            CLUSTER_NAME,
             blobStoreRepository,
             blobStoreTransferService,
             writableRegistry(),
@@ -89,7 +108,40 @@ public void tearDown() throws Exception {
         threadPool.shutdown();
     }
 
-    public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException {
+    public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, InterruptedException {
+        DiscoveryNodes discoveryNodes = getDiscoveryNodes();
+        RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor);
+        doAnswer(invocationOnMock -> {
+            invocationOnMock.getArgument(4, ActionListener.class).onResponse(null);
+            return null;
+        }).when(blobStoreTransferService)
+            .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
+        remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
+            DISCOVERY_NODES,
+            remoteDiscoveryNodes,
+            new LatchedActionListener<>(listener, latch)
+        ).run();
+        latch.await();
+        assertNull(listener.getFailure());
+        assertNotNull(listener.getResult());
+        assertEquals(DISCOVERY_NODES, listener.getResult().getComponent());
+        String uploadedFileName = listener.getResult().getUploadedFilename();
+        String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
+        assertEquals(5, pathTokens.length);
+        assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]);
+        assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]);
+        assertEquals(CLUSTER_UUID, pathTokens[2]);
+        assertEquals(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN, pathTokens[3]);
+        String[] splitFileName = pathTokens[4].split(DELIMITER);
+        assertEquals(4, splitFileName.length);
+        assertEquals(DISCOVERY_NODES, splitFileName[0]);
+        assertEquals(invertLong(VERSION), splitFileName[1]);
+        assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3]));
+    }
+
+    public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, InterruptedException {
         DiscoveryNodes discoveryNodes = getDiscoveryNodes();
         String fileName = randomAlphaOfLength(10);
         when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
@@ -97,29 +149,57 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException {
         );
         RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor);
         CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<DiscoveryNodes> readDiscoveryNodes = new AtomicReference<>();
-        LatchedActionListener<RemoteReadResult> assertingListener = new LatchedActionListener<>(
-            ActionListener.wrap(response -> readDiscoveryNodes.set((DiscoveryNodes) response.getObj()), Assert::assertNull),
-            latch
-        );
-        CheckedRunnable<IOException> runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
+        TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();
+        remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
             DISCOVERY_NODES,
             remoteObjForDownload,
-            assertingListener
-        );
+            new LatchedActionListener<>(listener, latch)
+        ).run();
+        latch.await();
+        assertNull(listener.getFailure());
+        assertNotNull(listener.getResult());
+        assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.getResult().getComponent());
+        assertEquals(DISCOVERY_NODES, listener.getResult().getComponentName());
+        DiscoveryNodes readDiscoveryNodes = (DiscoveryNodes) listener.getResult().getObj();
+        assertEquals(discoveryNodes.getSize(), readDiscoveryNodes.getSize());
+        discoveryNodes.getNodes().forEach((nodeId, node) -> assertEquals(readDiscoveryNodes.get(nodeId), node));
+        assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.getClusterManagerNodeId());
+    }
 
-        try {
-            runnable.run();
-            latch.await();
-            assertEquals(discoveryNodes.getSize(), readDiscoveryNodes.get().getSize());
-            discoveryNodes.getNodes().forEach((nodeId, node) -> assertEquals(readDiscoveryNodes.get().get(nodeId), node));
-            assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.get().getClusterManagerNodeId());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, InterruptedException {
+        ClusterBlocks clusterBlocks = randomClusterBlocks();
+        RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(clusterBlocks, VERSION, CLUSTER_UUID, compressor);
+        doAnswer(invocationOnMock -> {
+            invocationOnMock.getArgument(4, ActionListener.class).onResponse(null);
+            return null;
+        }).when(blobStoreTransferService)
+            .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
+        remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
+            CLUSTER_BLOCKS,
+            remoteClusterBlocks,
+            new LatchedActionListener<>(listener, latch)
+        ).run();
+        latch.await();
+        assertNull(listener.getFailure());
+        assertNotNull(listener.getResult());
+        assertEquals(CLUSTER_BLOCKS, listener.getResult().getComponent());
+        String uploadedFileName = listener.getResult().getUploadedFilename();
+        String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
+        assertEquals(5, pathTokens.length);
+        assertEquals(encodeString(CLUSTER_NAME), pathTokens[0]);
+        assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]);
+        assertEquals(CLUSTER_UUID, pathTokens[2]);
+        assertEquals(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN, pathTokens[3]);
+        String[] splitFileName = pathTokens[4].split(DELIMITER);
+        assertEquals(4, splitFileName.length);
+        assertEquals(CLUSTER_BLOCKS, splitFileName[0]);
+        assertEquals(invertLong(VERSION), splitFileName[1]);
+        assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3]));
     }
 
-    public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException {
+    public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, InterruptedException {
         ClusterBlocks clusterBlocks = randomClusterBlocks();
         String fileName = randomAlphaOfLength(10);
         when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
@@ -127,29 +207,133 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException {
         );
         RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor);
         CountDownLatch latch = new CountDownLatch(1);
-        AtomicReference<ClusterBlocks> readClusterBlocks = new AtomicReference<>();
-        LatchedActionListener<RemoteReadResult> assertingListener = new LatchedActionListener<>(
-            ActionListener.wrap(response -> readClusterBlocks.set((ClusterBlocks) response.getObj()), Assert::assertNull),
-            latch
-        );
+        TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();
 
-        CheckedRunnable<IOException> runnable = remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
+        remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
             CLUSTER_BLOCKS,
             remoteClusterBlocks,
-            assertingListener
+            new LatchedActionListener<>(listener, latch)
+        ).run();
+        latch.await();
+        assertNull(listener.getFailure());
+        assertNotNull(listener.getResult());
+        assertEquals(CLUSTER_STATE_ATTRIBUTE, listener.getResult().getComponent());
+        assertEquals(CLUSTER_BLOCKS, listener.getResult().getComponentName());
+        ClusterBlocks readClusterBlocks = (ClusterBlocks) listener.getResult().getObj();
+        assertEquals(clusterBlocks.global(), readClusterBlocks.global());
+        assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.indices().keySet());
+        for (String index : clusterBlocks.indices().keySet()) {
+            assertEquals(clusterBlocks.indices().get(index), readClusterBlocks.indices().get(index));
+        }
+    }
+
+    public void testGetAsyncMetadataWriteAction_Custom() throws IOException, InterruptedException {
+        Custom custom = getClusterStateCustom();
+        RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(
+            custom,
+            custom.getWriteableName(),
+            VERSION,
+            CLUSTER_UUID,
+            compressor,
+            namedWriteableRegistry
         );
+        doAnswer(invocationOnMock -> {
+            invocationOnMock.getArgument(4, ActionListener.class).onResponse(null);
+            return null;
+        }).when(blobStoreTransferService)
+            .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
+        final TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
+            CLUSTER_STATE_CUSTOM,
+            remoteClusterStateCustoms,
+            new LatchedActionListener<>(listener, latch)
+        ).run();
+        latch.await();
+        assertNull(listener.getFailure());
+        assertNotNull(listener.getResult());
+        assertEquals(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, custom.getWriteableName()), listener.getResult().getComponent());
+        String uploadedFileName = listener.getResult().getUploadedFilename();
+        String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
+        assertEquals(5, pathTokens.length);
+        assertEquals(encodeString(CLUSTER_NAME), pathTokens[0]);
+        assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]);
+        assertEquals(CLUSTER_UUID, pathTokens[2]);
+        assertEquals(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN, pathTokens[3]);
+        String[] splitFileName = pathTokens[4].split(DELIMITER);
+        assertEquals(4, splitFileName.length);
+        assertEquals(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, custom.getWriteableName()), splitFileName[0]);
+        assertEquals(invertLong(VERSION), splitFileName[1]);
+        assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3]));
+    }
 
-        try {
-            runnable.run();
-            latch.await();
-            assertEquals(clusterBlocks.global(), readClusterBlocks.get().global());
-            assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.get().indices().keySet());
-            for (String index : clusterBlocks.indices().keySet()) {
-                assertEquals(clusterBlocks.indices().get(index), readClusterBlocks.get().indices().get(index));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+    public void testGetAsyncMetadataReadAction_Custom() throws IOException, InterruptedException {
+        Custom custom = getClusterStateCustom();
+        String fileName = randomAlphaOfLength(10);
+        RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(
+            fileName,
+            custom.getWriteableName(),
+            CLUSTER_UUID,
+            compressor,
+            namedWriteableRegistry
+        );
+        when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
+            remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput()
+        );
+        TestCapturingListener<RemoteReadResult> capturingListener = new TestCapturingListener<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
+            CLUSTER_STATE_CUSTOM,
+            remoteClusterStateCustoms,
+            new LatchedActionListener<>(capturingListener, latch)
+        ).run();
+        latch.await();
+        assertNull(capturingListener.getFailure());
+        assertNotNull(capturingListener.getResult());
+        assertEquals(custom, capturingListener.getResult().getObj());
+        assertEquals(CLUSTER_STATE_ATTRIBUTE, capturingListener.getResult().getComponent());
+        assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName());
+    }
+
+    public void testGetAsyncMetadataWriteAction_Exception() throws IOException, InterruptedException {
+        DiscoveryNodes discoveryNodes = getDiscoveryNodes();
+        RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor);
+
+        IOException ioException = new IOException("mock test exception");
+        doAnswer(invocationOnMock -> {
+            invocationOnMock.getArgument(4, ActionListener.class).onFailure(ioException);
+            return null;
+        }).when(blobStoreTransferService)
+            .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class));
+
+        TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> capturingListener = new TestCapturingListener<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        remoteClusterStateAttributesManager.getAsyncMetadataWriteAction(
+            DISCOVERY_NODES,
+            remoteDiscoveryNodes,
+            new LatchedActionListener<>(capturingListener, latch)
+        ).run();
+        latch.await();
+        assertNull(capturingListener.getResult());
+        assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException);
+        assertEquals(ioException, capturingListener.getFailure().getCause());
+    }
+
+    public void testGetAsyncMetadataReadAction_Exception() throws IOException, InterruptedException {
+        String fileName = randomAlphaOfLength(10);
+        RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor);
+        Exception ioException = new IOException("mock test exception");
+        when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException);
+        CountDownLatch latch = new CountDownLatch(1);
+        TestCapturingListener<RemoteReadResult> capturingListener = new TestCapturingListener<>();
+        remoteClusterStateAttributesManager.getAsyncMetadataReadAction(
+            DISCOVERY_NODES,
+            remoteDiscoveryNodes,
+            new LatchedActionListener<>(capturingListener, latch)
+        ).run();
+        latch.await();
+        assertNull(capturingListener.getResult());
+        assertEquals(ioException, capturingListener.getFailure());
     }
 
     public void testGetUpdatedCustoms() {
diff --git a/server/src/test/java/org/opensearch/index/get/GetResultTests.java b/server/src/test/java/org/opensearch/index/get/GetResultTests.java
index 64b14744a40d2..2001bb84454cd 100644
--- a/server/src/test/java/org/opensearch/index/get/GetResultTests.java
+++ b/server/src/test/java/org/opensearch/index/get/GetResultTests.java
@@ -35,12 +35,16 @@
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.document.DocumentField;
 import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.xcontent.LoggingDeprecationHandler;
 import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.common.ParsingException;
 import org.opensearch.core.common.Strings;
 import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.xcontent.MediaType;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.core.xcontent.ToXContent;
 import org.opensearch.core.xcontent.XContentParser;
 import org.opensearch.index.mapper.IdFieldMapper;
@@ -220,6 +224,22 @@ public void testEqualsAndHashcode() {
         );
     }
 
+    public void testFromXContentEmbeddedFoundParsingException() throws IOException {
+        String json = "{\"_index\":\"foo\",\"_id\":\"bar\"}";
+        try (
+            XContentParser parser = JsonXContent.jsonXContent.createParser(
+                NamedXContentRegistry.EMPTY,
+                LoggingDeprecationHandler.INSTANCE,
+                json
+            )
+        ) {
+            ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
+            ParsingException parsingException = assertThrows(ParsingException.class, () -> GetResult.fromXContentEmbedded(parser));
+            assertEquals("Missing required field [found]", parsingException.getMessage());
+        }
+    }
+
     public static GetResult copyGetResult(GetResult getResult) {
         return new GetResult(
             getResult.getIndex(),
diff --git a/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java
new file mode 100644
index 0000000000000..54fc30cb5befa
--- /dev/null
+++ b/server/src/test/java/org/opensearch/ingest/AbstractBatchingProcessorTests.java
@@ -0,0 +1,160 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest;
+
+import org.opensearch.OpenSearchParseException;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+public class AbstractBatchingProcessorTests extends OpenSearchTestCase {
+
+    public void testBatchExecute_emptyInput() {
+        DummyProcessor processor = new DummyProcessor(3);
+        Consumer<List<IngestDocumentWrapper>> handler = (results) -> assertTrue(results.isEmpty());
+        processor.batchExecute(Collections.emptyList(), handler);
+        assertTrue(processor.getSubBatches().isEmpty());
+    }
+
+    public void testBatchExecute_singleBatchSize() {
+        DummyProcessor processor = new DummyProcessor(3);
+        List<IngestDocumentWrapper> wrapperList = Arrays.asList(
+            IngestDocumentPreparer.createIngestDocumentWrapper(1),
+            IngestDocumentPreparer.createIngestDocumentWrapper(2),
+            IngestDocumentPreparer.createIngestDocumentWrapper(3)
+        );
+        List<IngestDocumentWrapper> resultList = new ArrayList<>();
+        processor.batchExecute(wrapperList, resultList::addAll);
+        assertEquals(wrapperList, resultList);
+        assertEquals(1, processor.getSubBatches().size());
+        assertEquals(wrapperList, processor.getSubBatches().get(0));
+    }
+
+    public void testBatchExecute_multipleBatches() {
+        DummyProcessor processor = new DummyProcessor(2);
+        List<IngestDocumentWrapper> wrapperList = Arrays.asList(
+            IngestDocumentPreparer.createIngestDocumentWrapper(1),
+            IngestDocumentPreparer.createIngestDocumentWrapper(2),
+            IngestDocumentPreparer.createIngestDocumentWrapper(3),
+            IngestDocumentPreparer.createIngestDocumentWrapper(4),
+            IngestDocumentPreparer.createIngestDocumentWrapper(5)
+        );
+        List<IngestDocumentWrapper> resultList = new ArrayList<>();
+        processor.batchExecute(wrapperList, resultList::addAll);
+        assertEquals(wrapperList, resultList);
+        assertEquals(3, processor.getSubBatches().size());
+        assertEquals(wrapperList.subList(0, 2), processor.getSubBatches().get(0));
+        assertEquals(wrapperList.subList(2, 4), processor.getSubBatches().get(1));
+        assertEquals(wrapperList.subList(4, 5), processor.getSubBatches().get(2));
+    }
+
+    public void testBatchExecute_randomBatches() {
+        int batchSize = randomIntBetween(2, 32);
+        int docCount = randomIntBetween(2, 32);
+        DummyProcessor processor = new DummyProcessor(batchSize);
+        List<IngestDocumentWrapper> wrapperList = new ArrayList<>();
+        for (int i = 0; i < docCount; ++i) {
+            wrapperList.add(IngestDocumentPreparer.createIngestDocumentWrapper(i));
+        }
+        List<IngestDocumentWrapper> resultList = new ArrayList<>();
+        processor.batchExecute(wrapperList, resultList::addAll);
+        assertEquals(wrapperList, resultList);
+        assertEquals(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1), processor.getSubBatches().size());
+    }
+
+    public void testBatchExecute_defaultBatchSize() {
+        DummyProcessor processor = new DummyProcessor(1);
+        List<IngestDocumentWrapper> wrapperList = Arrays.asList(
+            IngestDocumentPreparer.createIngestDocumentWrapper(1),
+            IngestDocumentPreparer.createIngestDocumentWrapper(2),
+            IngestDocumentPreparer.createIngestDocumentWrapper(3)
+        );
+        List<IngestDocumentWrapper> resultList = new ArrayList<>();
+        processor.batchExecute(wrapperList, resultList::addAll);
+        assertEquals(wrapperList, resultList);
+        assertEquals(3, processor.getSubBatches().size());
+        assertEquals(wrapperList.subList(0, 1), processor.getSubBatches().get(0));
+        assertEquals(wrapperList.subList(1, 2), processor.getSubBatches().get(1));
+        assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2));
+    }
+
+    public void testFactory_invalidBatchSize() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("batch_size", 0);
+        DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
+        OpenSearchParseException exception = assertThrows(OpenSearchParseException.class, () -> factory.create(config));
+        assertEquals("[batch_size] batch size must be a positive integer", exception.getMessage());
+    }
+
+    public void testFactory_defaultBatchSize() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
+        DummyProcessor processor = (DummyProcessor) factory.create(config);
+        assertEquals(1, processor.batchSize);
+    }
+
+    public void testFactory_callNewProcessor() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("batch_size", 3);
+        DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
+        DummyProcessor processor = (DummyProcessor) factory.create(config);
+        assertEquals(3, processor.batchSize);
+    }
+
+    static class DummyProcessor extends AbstractBatchingProcessor {
+        private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>();
+
+        public List<List<IngestDocumentWrapper>> getSubBatches() {
+            return subBatches;
+        }
+
+        protected DummyProcessor(int batchSize) {
+            super("tag", "description", batchSize);
+        }
+
+        @Override
+        public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
+            subBatches.add(ingestDocumentWrappers);
+            handler.accept(ingestDocumentWrappers);
+        }
+
+        @Override
+        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+            return ingestDocument;
+        }
+
+        @Override
+        public String getType() {
+            return null;
+        }
+
+        public static class DummyProcessorFactory extends Factory {
+
+            protected DummyProcessorFactory(String processorType) {
+                super(processorType);
+            }
+
+            public AbstractBatchingProcessor create(Map<String, Object> config) throws Exception {
+                final Map<String, Processor.Factory> processorFactories = new HashMap<>();
+                return super.create(processorFactories, "tag", "description", config);
+            }
+
+            @Override
+            protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config) {
+                return new DummyProcessor(batchSize);
+            }
+        }
+    }
+}
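
Usage sketch: the subclass below illustrates how a concrete processor might build on the AbstractBatchingProcessor introduced in this diff. The processor type uppercase_batch, the message field, and the uppercasing logic are hypothetical illustrations only; the base class, IngestDocumentWrapper, and the batch_size reading/validation in Factory.create are the parts that come from the change itself.

// Hypothetical example; not part of the diff above.
package org.opensearch.example.ingest;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

import org.opensearch.ingest.AbstractBatchingProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestDocumentWrapper;

public class UppercaseBatchProcessor extends AbstractBatchingProcessor {

    public static final String TYPE = "uppercase_batch";

    protected UppercaseBatchProcessor(String tag, String description, int batchSize) {
        super(tag, description, batchSize);
    }

    // Called by batchExecute() with at most batchSize documents at a time. A real
    // implementation would typically issue one bulk call (e.g. to an inference
    // endpoint) per sub-batch rather than looping document by document.
    @Override
    protected void subBatchExecute(List<IngestDocumentWrapper> wrappers, Consumer<List<IngestDocumentWrapper>> handler) {
        for (IngestDocumentWrapper wrapper : wrappers) {
            IngestDocument doc = wrapper.getIngestDocument();
            doc.setFieldValue("message", doc.getFieldValue("message", String.class).toUpperCase(Locale.ROOT));
        }
        handler.accept(wrappers);
    }

    // Single-document path, still required by the Processor contract.
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) {
        ingestDocument.setFieldValue("message", ingestDocument.getFieldValue("message", String.class).toUpperCase(Locale.ROOT));
        return ingestDocument;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static class Factory extends AbstractBatchingProcessor.Factory {
        public Factory() {
            super(TYPE);
        }

        // batch_size has already been read from the config and validated (>= 1)
        // by the base Factory before this hook is invoked.
        @Override
        protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config) {
            return new UppercaseBatchProcessor(tag, description, batchSize);
        }
    }
}

With such a factory registered, a pipeline processor configured as {"uppercase_batch": {"batch_size": 8}} would have the base class split bulk ingest requests into sub-batches of at most 8 documents per subBatchExecute call; batch_size defaults to 1, and any value below 1 is rejected by the base factory with a configuration exception, as exercised in testFactory_invalidBatchSize above.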