From 418b7dcb82932c8ae2b8bf017b1fce5a19c57df6 Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Fri, 3 Nov 2023 19:43:08 +0530 Subject: [PATCH] Indexing instrumentation changes for auto-create indices and threadpool queue waiting --- .../action/bulk/TransportBulkAction.java | 6 ++ .../replication/TransportWriteAction.java | 97 +++++++++++++------ .../telemetry/tracing/AttributeNames.java | 5 + .../telemetry/tracing/SpanBuilder.java | 6 ++ 4 files changed, 86 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 4a9b07c12821d..27494c1d3664c 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -325,15 +325,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); } else { final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + Span span = tracer.startSpan(SpanBuilder.from("autoCreateIndex", autoCreateIndices.size())); + SpanScope spanScope = tracer.withSpanInScope(span); for (String index : autoCreateIndices) { createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { + span.endSpan(); threadPool.executor(executorName).execute(new ActionRunnable(listener) { @Override protected void doRun() { + spanScope.close(); executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); } }); @@ -352,6 +356,7 @@ public void onFailure(Exception e) { } } if (counter.decrementAndGet() == 0) { + span.endSpan(); final ActionListener wrappedListener = ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); @@ -359,6 +364,7 @@ public void onFailure(Exception e) { threadPool.executor(executorName).execute(new ActionRunnable(wrappedListener) { @Override protected void doRun() { + spanScope.close(); executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 27f9e6dee83de..eaabfb1dfbadf 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -265,22 +265,43 @@ protected void shardOperationOnPrimary( ActionListener> listener ) { final String executor = executorFunction.apply(primary); - threadPool.executor(executor).execute(new ActionRunnable>(listener) { - @Override - protected void doRun() { - Span span = tracer.startSpan( - SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request) - ); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer)); + Span queueTimeSpan = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnPrimaryQueued", clusterService.localNode().getId(), request) + ); + + try (SpanScope spanScope = tracer.withSpanInScope(queueTimeSpan)) { + threadPool.executor(executor).execute(new ActionRunnable>(listener) { + @Override + public void onFailure(Exception e) { + queueTimeSpan.setError(e); + queueTimeSpan.endSpan(); + super.onFailure(e); } - } - @Override - public boolean isForceExecution() { - return force(request); - } - }); + @Override + public void onRejection(Exception e) { + queueTimeSpan.setError(e); + queueTimeSpan.endSpan(); + super.onRejection(e); + } + + @Override + protected void doRun() { + queueTimeSpan.endSpan(); + Span span = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request) + ); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer)); + } + } + + @Override + public boolean isForceExecution() { + return force(request); + } + }); + } } protected abstract void dispatchedShardOperationOnPrimary( @@ -298,22 +319,42 @@ protected abstract void dispatchedShardOperationOnPrimary( */ @Override protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener listener) { - threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - Span span = tracer.startSpan( - SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request) - ); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer)); + Span queueTimeSpan = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnReplicaQueued", clusterService.localNode().getId(), request) + ); + try (SpanScope spanScope = tracer.withSpanInScope(queueTimeSpan)) { + threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable(listener) { + @Override + public void onFailure(Exception e) { + queueTimeSpan.setError(e); + queueTimeSpan.endSpan(); + super.onFailure(e); } - } - @Override - public boolean isForceExecution() { - return true; - } - }); + @Override + public void onRejection(Exception e) { + queueTimeSpan.setError(e); + queueTimeSpan.endSpan(); + super.onRejection(e); + } + + @Override + protected void doRun() { + queueTimeSpan.endSpan(); + Span span = tracer.startSpan( + SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request) + ); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer)); + } + } + + @Override + public boolean isForceExecution() { + return true; + } + }); + } } protected abstract void dispatchedShardOperationOnReplica( diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index 212ef3c713d8e..b5105b34a2fb5 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -114,4 +114,9 @@ private AttributeNames() { * Search Response Total Hits */ public static final String TOTAL_HITS = "total_hits"; + + /** + * Number of Indices + */ + public static final String NUM_INDICES = "num_indices"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 42e64109b72fd..d37cb5989a21e 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -76,6 +76,12 @@ public static SpanCreationContext from(String spanName, String nodeId, Replicate return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request)); } + public static SpanCreationContext from(String spanName, int numIndices) { + return SpanCreationContext.server() + .name(spanName) + .attributes(Attributes.create().addAttribute(AttributeNames.NUM_INDICES, numIndices)); + } + private static String createSpanName(HttpRequest httpRequest) { Tuple uriParts = splitUri(httpRequest.uri()); String path = uriParts.v1();