Skip to content

Commit

Permalink
Addressing Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Nov 12, 2023
1 parent 5ad9f77 commit 07b4141
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -180,7 +181,8 @@ public TransportShardBulkAction(
false,
indexingPressureService,
systemIndices,
tracer
tracer,
AdmissionControlActionType.INDEXING
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -203,6 +202,40 @@ protected TransportReplicationAction(
String executor,
boolean syncGlobalCheckpointAfterOperation,
boolean forceExecutionOnPrimary
) {
this(
settings,
actionName,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
requestReader,
replicaRequestReader,
executor,
syncGlobalCheckpointAfterOperation,
forceExecutionOnPrimary,
null
);
}

protected TransportReplicationAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader,
String executor,
boolean syncGlobalCheckpointAfterOperation,
boolean forceExecutionOnPrimary,
AdmissionControlActionType admissionControlActionType
) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
Expand All @@ -221,27 +254,8 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

// Register only TransportShardBulkAction for admission control ( primary indexing action )
if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
AdmissionControlActionType.INDEXING,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
} else {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
}
// This method will register Primary Request Handler Based on AdmissionControlActionType
registerPrimaryRequestHandler(requestReader, admissionControlActionType);

// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
Expand All @@ -262,6 +276,38 @@ protected TransportReplicationAction(
clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v);
}

/**
* This method will register handler as based on admissionControlActionType and AdmissionControlHandler will be
* invoked for registered action
* @param requestReader instance of the request reader
* @param admissionControlActionType type of AdmissionControlActionType
*/
private void registerPrimaryRequestHandler(
Writeable.Reader<Request> requestReader,
AdmissionControlActionType admissionControlActionType
) {
if (admissionControlActionType != null) {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
admissionControlActionType,
in -> new ConcreteShardRequest<>(requestReader, in),

Check warning on line 296 in server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java#L296

Added line #L296 was not covered by tests
this::handlePrimaryRequest
);
} else {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),

Check warning on line 305 in server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java#L305

Added line #L305 was not covered by tests
this::handlePrimaryRequest
);
}
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.index.translog.Translog.Location;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
Expand Down Expand Up @@ -104,7 +105,8 @@ protected TransportWriteAction(
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices,
Tracer tracer
Tracer tracer,
AdmissionControlActionType admissionControlActionType
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -121,14 +123,52 @@ protected TransportWriteAction(
replicaRequest,
ThreadPool.Names.SAME,
true,
forceExecutionOnPrimary
forceExecutionOnPrimary,
admissionControlActionType
);
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
this.tracer = tracer;
}

protected TransportWriteAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest,
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices,
Tracer tracer
) {
this(
settings,
actionName,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
request,
replicaRequest,
executorFunction,
forceExecutionOnPrimary,
indexingPressureService,
systemIndices,
tracer,
null
);
}

protected String executor(IndexShard shard) {
return executorFunction.apply(shard);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ public NetworkModule(
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -193,6 +190,10 @@ public NetworkModule(
registerTransportInterceptor(interceptor);
}
}
// Adding last because interceptors are triggered from last to first order from the list
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
}

/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,7 @@ public void apply(Settings value, Settings current, Settings previous) {
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,9 @@ public List<AdmissionControllerStats> getAdmissionControllerStatsList() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("admission_control");
this.admissionControllerStatsList.forEach(stats -> {
try {
builder.field(stats.getAdmissionControllerName(), stats);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return builder.endObject();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
for (AdmissionControllerStats admissionControllerStats : this.admissionControllerStatsList) {
builder.field(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
}
if (obj == null || getClass() != obj.getClass()) return false;
AdmissionControlStats admissionControlStats = (AdmissionControlStats) obj;
return this.getAdmissionControllerStatsList().size() == admissionControlStats.getAdmissionControllerStatsList().size();
}

@Override
public int hashCode() {
return super.hashCode();
return builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
* Class for admission controller ( such as CPU ) stats which includes rejection count for each action type
Expand Down Expand Up @@ -64,48 +62,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
{
builder.startObject("rejection_count");
{
this.rejectionCount.forEach((actionType, count) -> {
try {
builder.field(actionType, count);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
for (Map.Entry<String, Long> rejectionCountEntry : this.rejectionCount.entrySet()) {
builder.field(rejectionCountEntry.getKey(), rejectionCountEntry.getValue());
}
}
builder.endObject();
}
builder.endObject();
return builder.endObject();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) return false;
AdmissionControllerStats admissionControllerStats = (AdmissionControllerStats) obj;
return Objects.equals(this.getAdmissionControllerName(), admissionControllerStats.getAdmissionControllerName())
&& Objects.equals(
this.rejectionCount.containsKey(AdmissionControlActionType.SEARCH.getType()),
admissionControllerStats.rejectionCount.containsKey(AdmissionControlActionType.SEARCH.getType())
)
&& Objects.equals(
this.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()),
admissionControllerStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType())
)
&& Objects.equals(
this.rejectionCount.containsKey(AdmissionControlActionType.INDEXING.getType()),
admissionControllerStats.rejectionCount.containsKey(AdmissionControlActionType.INDEXING.getType())
)
&& Objects.equals(
this.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()),
admissionControllerStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType())
);
}

@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
log.warn(openSearchRejectedExecutionException.getMessage());
channel.sendResponse(openSearchRejectedExecutionException);
return;
} catch (final Exception e) {
throw e;
}
}
actualHandler.messageReceived(request, channel, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,33 @@ public void testSerialization() throws IOException {
}
AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats();
AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats();
assertEquals(admissionControlStats, deserializedAdmissionControlStats);
AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0);
AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats
.getAdmissionControllerStatsList()
.get(0);
assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()));
assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()));
assertEquals(admissionControllerStats, deserializedAdmissionControllerStats);
if (admissionControlStats == null) {
assertNull(deserializedAdmissionControlStats);
} else {
assertEquals(
admissionControlStats.getAdmissionControllerStatsList().size(),
deserializedAdmissionControlStats.getAdmissionControllerStatsList().size()
);
AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0);
AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats
.getAdmissionControllerStatsList()
.get(0);
assertEquals(
admissionControllerStats.getAdmissionControllerName(),
deserializedAdmissionControllerStats.getAdmissionControllerName()
);
assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()));
assertEquals(
admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()),
deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())
);

assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()));
assertEquals(
admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()),
deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())
);
}
}
}
}
Expand Down
Loading

0 comments on commit 07b4141

Please sign in to comment.