Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add tracing instrumentation for indexing paths #10962

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -134,6 +139,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;
private final Tracer tracer;

@Inject
public TransportBulkAction(
Expand All @@ -148,7 +154,8 @@ public TransportBulkAction(
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
this(
threadPool,
Expand All @@ -163,7 +170,8 @@ public TransportBulkAction(
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
System::nanoTime,
tracer
);
}

Expand All @@ -180,7 +188,8 @@ public TransportBulkAction(
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
Tracer tracer
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
Expand All @@ -197,6 +206,7 @@ public TransportBulkAction(
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
this.tracer = tracer;
}

/**
Expand Down Expand Up @@ -647,52 +657,66 @@ protected void doRun() {
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
shardBulkAction.execute(
bulkShardRequest,
TraceableActionListener.create(ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}, releasable::close));
private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)
)
);
}
}, releasable::close), span, tracer)
);
} catch (Exception e) {
span.setError(e);
span.endSpan();
throw e;
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -161,7 +162,8 @@ public TransportShardBulkAction(
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -177,7 +179,8 @@ public TransportShardBulkAction(
EXECUTOR_NAME_FUNCTION,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -93,7 +94,8 @@ public TransportResyncReplicationAction(
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -109,7 +111,8 @@ public TransportResyncReplicationAction(
EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.opensearch.index.translog.Translog.Location;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -82,6 +87,7 @@ public abstract class TransportWriteAction<
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
private final Tracer tracer;

protected TransportWriteAction(
Settings settings,
Expand All @@ -97,7 +103,8 @@ protected TransportWriteAction(
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -119,6 +126,7 @@ protected TransportWriteAction(
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
this.tracer = tracer;
}

protected String executor(IndexShard shard) {
Expand Down Expand Up @@ -220,7 +228,12 @@ protected void shardOperationOnPrimary(
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnPrimary(request, primary, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down Expand Up @@ -248,7 +261,12 @@ protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replic
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnReplica(request, replica, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -99,7 +100,8 @@ public RetentionLeaseSyncAction(
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexingPressureService indexingPressureService,
final SystemIndices systemIndices
final SystemIndices systemIndices,
final Tracer tracer
) {
super(
settings,
Expand All @@ -115,7 +117,8 @@ public RetentionLeaseSyncAction(
ignore -> ThreadPool.Names.MANAGEMENT,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,29 @@ private AttributeNames() {
* Action Name.
*/
public static final String TRANSPORT_ACTION = "action";

/**
* Index Name
*/
public static final String INDEX = "index";

/**
* Shard ID
*/
public static final String SHARD_ID = "shard_id";

/**
* Number of request items in bulk request
*/
public static final String BULK_REQUEST_ITEMS = "bulk_request_items";

/**
* Node ID
*/
public static final String NODE_ID = "node_id";

/**
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.ReplicatedWriteRequest;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
Expand Down Expand Up @@ -68,6 +70,10 @@ public static SpanCreationContext from(String action, Transport.Connection conne
return SpanCreationContext.server().name(createSpanName(action, connection)).attributes(buildSpanAttributes(action, connection));
}

public static SpanCreationContext from(String spanName, String nodeId, ReplicatedWriteRequest request) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand Down Expand Up @@ -150,4 +156,18 @@ private static Attributes buildSpanAttributes(String action, TcpChannel tcpChann
return attributes;
}

private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequest request) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.NODE_ID, nodeId)
.addAttribute(AttributeNames.REFRESH_POLICY, request.getRefreshPolicy().getValue());
if (request.shardId() != null) {
attributes.addAttribute(AttributeNames.INDEX, request.shardId().getIndexName())
.addAttribute(AttributeNames.SHARD_ID, request.shardId().getId());
}
if (request instanceof BulkShardRequest) {
attributes.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, ((BulkShardRequest) request).items().length);
}
return attributes;
}

}
Loading
Loading