From 8339e14ce4a4a4f23489be0efe726256b69cb0f7 Mon Sep 17 00:00:00 2001 From: rayshrey <121871912+rayshrey@users.noreply.github.com> Date: Fri, 20 Oct 2023 20:36:20 +0530 Subject: [PATCH] Add tracing instrumentation for indexing paths (#10273) * Add tracing instrumentation for indexing paths Signed-off-by: Shreyansh Ray * Fix failing tests and review changes Signed-off-by: Shreyansh Ray * Fix test failures due to Span not being properly closed Signed-off-by: Shreyansh Ray * Changes to spans in primary and replica actions Signed-off-by: Shreyansh Ray * Review comments fixes and refactoring Signed-off-by: Shreyansh Ray * Precommit auto-changes Signed-off-by: Shreyansh Ray * Add refresh policy as attribute Signed-off-by: Shreyansh Ray * Fix changelog entry Signed-off-by: Shreyansh Ray * Instrument primary/replica write in TransportWriteAction instead of TransportShardBulkAction Signed-off-by: Shreyansh Ray * Modify SpanBuilder Signed-off-by: Shreyansh Ray * spotlessApply and precommit Signed-off-by: Shreyansh Ray * Change span names Signed-off-by: Shreyansh Ray * Pass Noop Tracer instead of injected tracer Signed-off-by: Shreyansh Ray * Reverting previous changes Signed-off-by: Shreyansh Ray * Remove tracer variable from TransportShardBulkAction Signed-off-by: Shreyansh Ray --------- Signed-off-by: Shreyansh Ray --- CHANGELOG.md | 1 + .../action/bulk/TransportBulkAction.java | 110 +++++++++++------- .../action/bulk/TransportShardBulkAction.java | 7 +- .../TransportResyncReplicationAction.java | 7 +- .../replication/TransportWriteAction.java | 24 +++- .../index/seqno/RetentionLeaseSyncAction.java | 7 +- .../telemetry/tracing/AttributeNames.java | 25 ++++ .../telemetry/tracing/SpanBuilder.java | 20 ++++ ...ActionIndicesThatCannotBeCreatedTests.java | 4 +- .../bulk/TransportBulkActionIngestTests.java | 4 +- .../action/bulk/TransportBulkActionTests.java | 3 +- .../bulk/TransportBulkActionTookTests.java | 3 +- .../bulk/TransportShardBulkActionTests.java | 13 ++- ...TransportResyncReplicationActionTests.java | 6 +- ...rtWriteActionForIndexingPressureTests.java | 3 +- .../TransportWriteActionTests.java | 6 +- .../seqno/RetentionLeaseSyncActionTests.java | 12 +- .../snapshots/SnapshotResiliencyTests.java | 9 +- 18 files changed, 192 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76d1f57d7badb..5fdb62041bccd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642)) - Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395)) - Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558)) +- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 387aff7194813..9796afe28f8a8 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -86,6 +86,11 @@ import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportService; @@ -134,6 +139,7 @@ public class TransportBulkAction extends HandledTransportAction() { - @Override - public void onResponse(BulkShardResponse bulkShardResponse) { - for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { - // we may have no response if item failed - if (bulkItemResponse.getResponse() != null) { - bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); - } - docStatusStats.inc(bulkItemResponse.status()); - responses.set(bulkItemResponse.getItemId(), bulkItemResponse); - } + final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + shardBulkAction.execute( + bulkShardRequest, + TraceableActionListener.create(ActionListener.runBefore(new ActionListener() { + @Override + public void onResponse(BulkShardResponse bulkShardResponse) { + for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + // we may have no response if item failed + if (bulkItemResponse.getResponse() != null) { + bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); + } - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + docStatusStats.inc(bulkItemResponse.status()); + responses.set(bulkItemResponse.getItemId(), bulkItemResponse); + } - @Override - public void onFailure(Exception e) { - // create failures for all relevant requests - for (BulkItemRequest request : requests) { - final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - final DocWriteRequest docWriteRequest = request.request(); - final BulkItemResponse bulkItemResponse = new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } - docStatusStats.inc(bulkItemResponse.status()); - responses.set(request.id(), bulkItemResponse); - } + @Override + public void onFailure(Exception e) { + // create failures for all relevant requests + for (BulkItemRequest request : requests) { + final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( + request.id(), + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) + ); + + docStatusStats.inc(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); + } - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } - private void finishHim() { - indicesService.addDocStatusStats(docStatusStats); - listener.onResponse( - new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) - ); - } - }, releasable::close)); + private void finishHim() { + indicesService.addDocStatusStats(docStatusStats); + listener.onResponse( + new BulkResponse( + responses.toArray(new BulkItemResponse[responses.length()]), + buildTookInMillis(startTimeNanos) + ) + ); + } + }, releasable::close), span, tracer) + ); + } catch (Exception e) { + span.setError(e); + span.endSpan(); + throw e; + } } bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index fddda0ef1f9a7..268a6ed6f85b8 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -99,6 +99,7 @@ import org.opensearch.indices.SystemIndices; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportChannel; @@ -161,7 +162,8 @@ public TransportShardBulkAction( IndexingPressureService indexingPressureService, SegmentReplicationPressureService segmentReplicationPressureService, RemoteStorePressureService remoteStorePressureService, - SystemIndices systemIndices + SystemIndices systemIndices, + Tracer tracer ) { super( settings, @@ -177,7 +179,8 @@ public TransportShardBulkAction( EXECUTOR_NAME_FUNCTION, false, indexingPressureService, - systemIndices + systemIndices, + tracer ); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java index 032fe83e2220b..9d60706d1f100 100644 --- a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java @@ -54,6 +54,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportException; @@ -93,7 +94,8 @@ public TransportResyncReplicationAction( ShardStateAction shardStateAction, ActionFilters actionFilters, IndexingPressureService indexingPressureService, - SystemIndices systemIndices + SystemIndices systemIndices, + Tracer tracer ) { super( settings, @@ -109,7 +111,8 @@ public TransportResyncReplicationAction( EXECUTOR_NAME_FUNCTION, true, /* we should never reject resync because of thread pool capacity on primary */ indexingPressureService, - systemIndices + systemIndices, + tracer ); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index a0b5299805868..9ebfa8cfd0df8 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -59,6 +59,11 @@ import org.opensearch.index.translog.Translog.Location; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -82,6 +87,7 @@ public abstract class TransportWriteAction< protected final SystemIndices systemIndices; private final Function executorFunction; + private final Tracer tracer; protected TransportWriteAction( Settings settings, @@ -97,7 +103,8 @@ protected TransportWriteAction( Function executorFunction, boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService, - SystemIndices systemIndices + SystemIndices systemIndices, + Tracer tracer ) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. @@ -119,6 +126,7 @@ protected TransportWriteAction( this.executorFunction = executorFunction; this.indexingPressureService = indexingPressureService; this.systemIndices = systemIndices; + this.tracer = tracer; } protected String executor(IndexShard shard) { @@ -220,7 +228,12 @@ protected void shardOperationOnPrimary( threadPool.executor(executor).execute(new ActionRunnable>(listener) { @Override protected void doRun() { - dispatchedShardOperationOnPrimary(request, primary, listener); + Span span = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request) + ); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer)); + } } @Override @@ -248,7 +261,12 @@ protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replic threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable(listener) { @Override protected void doRun() { - dispatchedShardOperationOnReplica(request, replica, listener); + Span span = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request) + ); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer)); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index f74fc7eefe65c..ca3c7e1d49700 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -62,6 +62,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; @@ -99,7 +100,8 @@ public RetentionLeaseSyncAction( final ShardStateAction shardStateAction, final ActionFilters actionFilters, final IndexingPressureService indexingPressureService, - final SystemIndices systemIndices + final SystemIndices systemIndices, + final Tracer tracer ) { super( settings, @@ -115,7 +117,8 @@ public RetentionLeaseSyncAction( ignore -> ThreadPool.Names.MANAGEMENT, false, indexingPressureService, - systemIndices + systemIndices, + tracer ); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index a9514c298ef88..b6b2cf360d1c5 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -69,4 +69,29 @@ private AttributeNames() { * Action Name. */ public static final String TRANSPORT_ACTION = "action"; + + /** + * Index Name + */ + public static final String INDEX = "index"; + + /** + * Shard ID + */ + public static final String SHARD_ID = "shard_id"; + + /** + * Number of request items in bulk request + */ + public static final String BULK_REQUEST_ITEMS = "bulk_request_items"; + + /** + * Node ID + */ + public static final String NODE_ID = "node_id"; + + /** + * Refresh Policy + */ + public static final String REFRESH_POLICY = "refresh_policy"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index d97fbd371ab2a..1dce422943b7a 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -8,6 +8,8 @@ package org.opensearch.telemetry.tracing; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.ReplicatedWriteRequest; import org.opensearch.common.annotation.InternalApi; import org.opensearch.core.common.Strings; import org.opensearch.http.HttpRequest; @@ -68,6 +70,10 @@ public static SpanCreationContext from(String action, Transport.Connection conne return SpanCreationContext.server().name(createSpanName(action, connection)).attributes(buildSpanAttributes(action, connection)); } + public static SpanCreationContext from(String spanName, String nodeId, ReplicatedWriteRequest request) { + return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request)); + } + private static String createSpanName(HttpRequest httpRequest) { return httpRequest.method().name() + SEPARATOR + httpRequest.uri(); } @@ -150,4 +156,18 @@ private static Attributes buildSpanAttributes(String action, TcpChannel tcpChann return attributes; } + private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequest request) { + Attributes attributes = Attributes.create() + .addAttribute(AttributeNames.NODE_ID, nodeId) + .addAttribute(AttributeNames.REFRESH_POLICY, request.getRefreshPolicy().getValue()); + if (request.shardId() != null) { + attributes.addAttribute(AttributeNames.INDEX, request.shardId().getIndexName()) + .addAttribute(AttributeNames.SHARD_ID, request.shardId().getId()); + } + if (request instanceof BulkShardRequest) { + attributes.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, ((BulkShardRequest) request).items().length); + } + return attributes; + } + } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 0f67eff26cbde..cf7080ab2fc06 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -54,6 +54,7 @@ import org.opensearch.index.VersionType; import org.opensearch.indices.SystemIndices; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -155,7 +156,8 @@ private void indicesThatCannotBeCreatedTestCase( new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), null, - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ) { @Override void executeBulk( diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 515f6eae28a34..141c630b94020 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -70,6 +70,7 @@ import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -172,7 +173,8 @@ class TestTransportBulkAction extends TransportBulkAction { new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), null, - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 10cad6fb147a2..6bbd740df7f9c 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -118,7 +118,8 @@ class TestTransportBulkAction extends TransportBulkAction { new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), mock(IndicesService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 852e3837e1e7a..9d5b4430ea395 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -282,7 +282,8 @@ static class TestTransportBulkAction extends TransportBulkAction { new IndexingPressureService(Settings.EMPTY, clusterService), null, new SystemIndices(emptyMap()), - relativeTimeProvider + relativeTimeProvider, + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index fe0fdd07025d9..b325cfa197933 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -88,6 +88,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; @@ -1074,7 +1075,8 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId + "-1", 1, shardId), @@ -1105,7 +1107,8 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), @@ -1136,7 +1139,8 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); action.handlePrimaryTermValidationRequest( new TransportShardBulkAction.PrimaryTermValidationRequest(aId, 1, shardId), @@ -1178,7 +1182,8 @@ private TransportShardBulkAction createAction() { mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), mock(RemoteStorePressureService.class), - mock(SystemIndices.class) + mock(SystemIndices.class), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index 3bd8930064563..da87a0a967f53 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -203,7 +203,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { shardStateAction, new ActionFilters(new HashSet<>()), new IndexingPressureService(Settings.EMPTY, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); assertThat(action.globalBlockLevel(), nullValue()); @@ -256,7 +257,8 @@ private TransportResyncReplicationAction createAction() { mock(ShardStateAction.class), new ActionFilters(new HashSet<>()), mock(IndexingPressureService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java index 4a2185d1558f7..7212b1f5efe13 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java @@ -392,7 +392,8 @@ protected TestAction( ignore -> ThreadPool.Names.SAME, false, TransportWriteActionForIndexingPressureTests.this.indexingPressureService, - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java index 9d2069ac16190..b4549f82230bf 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java @@ -477,7 +477,8 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF ignore -> ThreadPool.Names.SAME, false, new IndexingPressureService(Settings.EMPTY, TransportWriteActionTests.this.clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; @@ -505,7 +506,8 @@ protected TestAction( ignore -> ThreadPool.Names.SAME, false, new IndexingPressureService(settings, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; diff --git a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java index d9bca55a208c2..63a9ac2f2e8ec 100644 --- a/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -125,7 +125,8 @@ public void testRetentionLeaseSyncActionOnPrimary() { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -162,7 +163,8 @@ public void testRetentionLeaseSyncActionOnReplica() throws Exception { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); @@ -203,7 +205,8 @@ public void testBlocks() { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); assertNull(action.indexBlockLevel()); @@ -233,7 +236,8 @@ private RetentionLeaseSyncAction createAction() { shardStateAction, new ActionFilters(Collections.emptySet()), new IndexingPressureService(Settings.EMPTY, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 03ebc222a6779..b7a2baacba611 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2125,7 +2125,8 @@ public void onFailure(final Exception e) { shardStateAction, actionFilters, new IndexingPressureService(settings, clusterService), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ) ), new GlobalCheckpointSyncAction( @@ -2192,7 +2193,8 @@ public void onFailure(final Exception e) { mock(ThreadPool.class) ), mock(RemoteStorePressureService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ); actions.put( BulkAction.INSTANCE, @@ -2216,7 +2218,8 @@ public void onFailure(final Exception e) { new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), mock(IndicesService.class), - new SystemIndices(emptyMap()) + new SystemIndices(emptyMap()), + NoopTracer.INSTANCE ) ); final RestoreService restoreService = new RestoreService(