Skip to content

Commit

Permalink
Add tracing instrumentation for indexing paths (#10273)
Browse files Browse the repository at this point in the history
* Add tracing instrumentation for indexing paths

Signed-off-by: Shreyansh Ray <[email protected]>

* Fix failing tests and review changes

Signed-off-by: Shreyansh Ray <[email protected]>

* Fix test failures due to Span not being properly closed

Signed-off-by: Shreyansh Ray <[email protected]>

* Changes to spans in primary and replica actions

Signed-off-by: Shreyansh Ray <[email protected]>

* Review comments fixes and refactoring

Signed-off-by: Shreyansh Ray <[email protected]>

* Precommit auto-changes

Signed-off-by: Shreyansh Ray <[email protected]>

* Add refresh policy as attribute

Signed-off-by: Shreyansh Ray <[email protected]>

* Fix changelog entry

Signed-off-by: Shreyansh Ray <[email protected]>

* Instrument primary/replica write in TransportWriteAction instead of TransportShardBulkAction

Signed-off-by: Shreyansh Ray <[email protected]>

* Modify SpanBuilder

Signed-off-by: Shreyansh Ray <[email protected]>

* spotlessApply and precommit

Signed-off-by: Shreyansh Ray <[email protected]>

* Change span names

Signed-off-by: Shreyansh Ray <[email protected]>

* Pass Noop Tracer instead of injected tracer

Signed-off-by: Shreyansh Ray <[email protected]>

* Reverting previous changes

Signed-off-by: Shreyansh Ray <[email protected]>

* Remove tracer variable from TransportShardBulkAction

Signed-off-by: Shreyansh Ray <[email protected]>

---------

Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey authored Oct 20, 2023
1 parent 5093cc7 commit e691df0
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
- Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -133,6 +138,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;
private final Tracer tracer;

@Inject
public TransportBulkAction(
Expand All @@ -147,7 +153,8 @@ public TransportBulkAction(
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
this(
threadPool,
Expand All @@ -162,7 +169,8 @@ public TransportBulkAction(
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
System::nanoTime,
tracer
);
}

Expand All @@ -179,7 +187,8 @@ public TransportBulkAction(
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
LongSupplier relativeTimeProvider,
Tracer tracer
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
Expand All @@ -196,6 +205,7 @@ public TransportBulkAction(
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
this.tracer = tracer;
}

/**
Expand Down Expand Up @@ -642,52 +652,66 @@ protected void doRun() {
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
final Span span = tracer.startSpan(SpanBuilder.from("bulkShardAction", nodeId, bulkShardRequest));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
shardBulkAction.execute(
bulkShardRequest,
TraceableActionListener.create(ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
docStatusStats.inc(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusStats.inc(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
}
}, releasable::close));
private void finishHim() {
indicesService.addDocStatusStats(docStatusStats);
listener.onResponse(
new BulkResponse(
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)
)
);
}
}, releasable::close), span, tracer)
);
} catch (Exception e) {
span.setError(e);
span.endSpan();
throw e;
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -161,7 +162,8 @@ public TransportShardBulkAction(
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -177,7 +179,8 @@ public TransportShardBulkAction(
EXECUTOR_NAME_FUNCTION,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -93,7 +94,8 @@ public TransportResyncReplicationAction(
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -109,7 +111,8 @@ public TransportResyncReplicationAction(
EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.opensearch.index.translog.Translog.Location;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -82,6 +87,7 @@ public abstract class TransportWriteAction<
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
private final Tracer tracer;

protected TransportWriteAction(
Settings settings,
Expand All @@ -97,7 +103,8 @@ protected TransportWriteAction(
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -119,6 +126,7 @@ protected TransportWriteAction(
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
this.tracer = tracer;
}

protected String executor(IndexShard shard) {
Expand Down Expand Up @@ -220,7 +228,12 @@ protected void shardOperationOnPrimary(
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnPrimary(request, primary, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down Expand Up @@ -248,7 +261,12 @@ protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replic
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnReplica(request, replica, listener);
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -99,7 +100,8 @@ public RetentionLeaseSyncAction(
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexingPressureService indexingPressureService,
final SystemIndices systemIndices
final SystemIndices systemIndices,
final Tracer tracer
) {
super(
settings,
Expand All @@ -115,7 +117,8 @@ public RetentionLeaseSyncAction(
ignore -> ThreadPool.Names.MANAGEMENT,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,29 @@ private AttributeNames() {
* Action Name.
*/
public static final String TRANSPORT_ACTION = "action";

/**
* Index Name
*/
public static final String INDEX = "index";

/**
* Shard ID
*/
public static final String SHARD_ID = "shard_id";

/**
* Number of request items in bulk request
*/
public static final String BULK_REQUEST_ITEMS = "bulk_request_items";

/**
* Node ID
*/
public static final String NODE_ID = "node_id";

/**
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.telemetry.tracing;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.ReplicatedWriteRequest;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
Expand Down Expand Up @@ -68,6 +70,10 @@ public static SpanCreationContext from(String action, Transport.Connection conne
return SpanCreationContext.server().name(createSpanName(action, connection)).attributes(buildSpanAttributes(action, connection));
}

public static SpanCreationContext from(String spanName, String nodeId, ReplicatedWriteRequest request) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand Down Expand Up @@ -150,4 +156,18 @@ private static Attributes buildSpanAttributes(String action, TcpChannel tcpChann
return attributes;
}

private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequest request) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.NODE_ID, nodeId)
.addAttribute(AttributeNames.REFRESH_POLICY, request.getRefreshPolicy().getValue());
if (request.shardId() != null) {
attributes.addAttribute(AttributeNames.INDEX, request.shardId().getIndexName())
.addAttribute(AttributeNames.SHARD_ID, request.shardId().getId());
}
if (request instanceof BulkShardRequest) {
attributes.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, ((BulkShardRequest) request).items().length);
}
return attributes;
}

}
Loading

0 comments on commit e691df0

Please sign in to comment.