Skip to content

Commit

Permalink
Restored handling for multi operations in IndexResolverReplacer as it…
Browse files Browse the repository at this point in the history
… is necessary for MT

Signed-off-by: Nils Bandener <[email protected]>
  • Loading branch information
nibix committed Jun 24, 2024
1 parent 02d0f66 commit 98dd626
Showing 1 changed file with 112 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -43,6 +44,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.IndicesRequest.Replaceable;
Expand All @@ -61,15 +63,20 @@
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.fieldcaps.FieldCapabilitiesIndexRequest;
import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetRequest.Item;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.main.MainRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.action.support.replication.ReplicationRequest;
import org.opensearch.action.support.single.shard.SingleShardRequest;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
import org.opensearch.action.termvectors.TermVectorsRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexAbstraction;
Expand Down Expand Up @@ -169,9 +176,9 @@ private AlreadyResolvedKey(final IndicesOptions indicesOptions, final boolean en
}

private AlreadyResolvedKey(
final IndicesOptions indicesOptions,
final boolean enableCrossClusterResolution,
final String[] original
final IndicesOptions indicesOptions,
final boolean enableCrossClusterResolution,
final String[] original
) {
this.indicesOptions = indicesOptions;
this.enableCrossClusterResolution = enableCrossClusterResolution;
Expand All @@ -184,8 +191,8 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
AlreadyResolvedKey that = (AlreadyResolvedKey) o;
return enableCrossClusterResolution == that.enableCrossClusterResolution
&& Objects.equals(indicesOptions, that.indicesOptions)
&& Arrays.equals(original, that.original);
&& Objects.equals(indicesOptions, that.indicesOptions)
&& Arrays.equals(original, that.original);
}

@Override
Expand All @@ -206,10 +213,10 @@ public int hashCode() {
}

private void resolveIndexPatterns(
final String name,
final IndicesOptions indicesOptions,
final boolean enableCrossClusterResolution,
final String[] original
final String name,
final IndicesOptions indicesOptions,
final boolean enableCrossClusterResolution,
final String[] original
) {
final boolean isTraceEnabled = log.isTraceEnabled();
if (isTraceEnabled) {
Expand All @@ -232,11 +239,11 @@ private void resolveIndexPatterns(
if (remoteClusterService.isCrossClusterSearchEnabled() && enableCrossClusterResolution) {
remoteIndices = new HashSet<>();
final Map<String, OriginalIndices> remoteClusterIndices = OpenSearchSecurityPlugin.GuiceHolder.getRemoteClusterService()
.groupIndices(indicesOptions, original, idx -> resolver.hasIndexAbstraction(idx, clusterService.state()));
.groupIndices(indicesOptions, original, idx -> resolver.hasIndexAbstraction(idx, clusterService.state()));
final Set<String> remoteClusters = remoteClusterIndices.keySet()
.stream()
.filter(k -> !RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(k))
.collect(Collectors.toSet());
.stream()
.filter(k -> !RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(k))
.collect(Collectors.toSet());
for (String remoteCluster : remoteClusters) {
for (String remoteIndex : remoteClusterIndices.get(remoteCluster).indices()) {
remoteIndices.add(RemoteClusterService.buildRemoteIndexName(remoteCluster, remoteIndex));
Expand All @@ -254,10 +261,10 @@ private void resolveIndexPatterns(

if (isTraceEnabled) {
log.trace(
"CCS is enabled, we found this local patterns "
+ localRequestedPatterns
+ " and this remote patterns: "
+ remoteIndices
"CCS is enabled, we found this local patterns "
+ localRequestedPatterns
+ " and this remote patterns: "
+ remoteIndices
);
}

Expand Down Expand Up @@ -287,31 +294,31 @@ private void resolveIndexPatterns(
else {
final ClusterState state = clusterService.state();
final Set<String> dateResolvedLocalRequestedPatterns = localRequestedPatterns.stream()
.map(resolver::resolveDateMathExpression)
.collect(Collectors.toSet());
.map(resolver::resolveDateMathExpression)
.collect(Collectors.toSet());
final WildcardMatcher dateResolvedMatcher = WildcardMatcher.from(dateResolvedLocalRequestedPatterns);
// fill matchingAliases
final Map<String, IndexAbstraction> lookup = state.metadata().getIndicesLookup();
matchingAliases = lookup.entrySet()
.stream()
.filter(e -> e.getValue().getType() == ALIAS)
.map(Map.Entry::getKey)
.filter(dateResolvedMatcher)
.collect(Collectors.toSet());
.stream()
.filter(e -> e.getValue().getType() == ALIAS)
.map(Map.Entry::getKey)
.filter(dateResolvedMatcher)
.collect(Collectors.toSet());

final boolean isDebugEnabled = log.isDebugEnabled();
try {
matchingAllIndices = Arrays.asList(
resolver.concreteIndexNames(state, indicesOptions, localRequestedPatterns.toArray(new String[0]))
resolver.concreteIndexNames(state, indicesOptions, localRequestedPatterns.toArray(new String[0]))
);
matchingDataStreams = resolver.dataStreamNames(state, indicesOptions, localRequestedPatterns.toArray(new String[0]));

if (isDebugEnabled) {
log.debug(
"Resolved pattern {} to indices: {} and data-streams: {}",
localRequestedPatterns,
matchingAllIndices,
matchingDataStreams
"Resolved pattern {} to indices: {} and data-streams: {}",
localRequestedPatterns,
matchingAllIndices,
matchingDataStreams
);
}
} catch (IndexNotFoundException e1) {
Expand All @@ -329,15 +336,15 @@ private void resolveIndexPatterns(

if (isTraceEnabled) {
log.trace(
"Resolved patterns {} for {} ({}) to [aliases {}, allIndices {}, dataStreams {}, originalRequested{}, remote indices {}]",
original,
name,
this.name,
matchingAliases,
matchingAllIndices,
matchingDataStreams,
Arrays.toString(original),
remoteIndices
"Resolved patterns {} for {} ({}) to [aliases {}, allIndices {}, dataStreams {}, originalRequested{}, remote indices {}]",
original,
name,
this.name,
matchingAliases,
matchingAllIndices,
matchingDataStreams,
Arrays.toString(original),
remoteIndices
);
}

Expand All @@ -351,11 +358,11 @@ private void resolveToLocalAll() {
}

private void resolveTo(
Iterable<String> matchingAliases,
Iterable<String> matchingAllIndices,
Iterable<String> matchingDataStreams,
String[] original,
Iterable<String> remoteIndices
Iterable<String> matchingAliases,
Iterable<String> matchingAllIndices,
Iterable<String> matchingDataStreams,
String[] original,
Iterable<String> remoteIndices
) {
aliases.addAll(matchingAliases);
allIndices.addAll(matchingAllIndices);
Expand All @@ -368,8 +375,8 @@ private void resolveTo(
public String[] provide(String[] original, Object localRequest, boolean supportsReplace) {
final IndicesOptions indicesOptions = indicesOptionsFrom(localRequest);
final boolean enableCrossClusterResolution = localRequest instanceof FieldCapabilitiesRequest
|| localRequest instanceof SearchRequest
|| localRequest instanceof ResolveIndexAction.Request;
|| localRequest instanceof SearchRequest
|| localRequest instanceof ResolveIndexAction.Request;
// skip the whole thing if we have seen this exact resolveIndexPatterns request
final AlreadyResolvedKey alreadyResolvedKey;
if (original != null) {
Expand All @@ -385,8 +392,8 @@ public String[] provide(String[] original, Object localRequest, boolean supports

Resolved resolved(IndicesOptions indicesOptions) {
final Resolved resolved = alreadyResolved.isEmpty()
? Resolved._LOCAL_ALL
: new Resolved(aliases.build(), allIndices.build(), originalRequested.build(), remoteIndices.build(), indicesOptions);
? Resolved._LOCAL_ALL
: new Resolved(aliases.build(), allIndices.build(), originalRequested.build(), remoteIndices.build(), indicesOptions);

if (log.isTraceEnabled()) {
log.trace("Finally resolved for {}: {}", name, resolved);
Expand All @@ -406,7 +413,7 @@ public String[] provide(String[] original, Object request, boolean supportsRepla
if (retainMode && !isAllWithNoRemote(original)) {
final Resolved resolved = resolveRequest(request);
final List<String> retained = WildcardMatcher.from(resolved.getAllIndices())
.getMatchAny(replacements, Collectors.toList());
.getMatchAny(replacements, Collectors.toList());
retained.addAll(resolved.getRemoteIndices());
return retained.toArray(new String[0]);
}
Expand Down Expand Up @@ -435,11 +442,11 @@ public final static class Resolved {
private static final ImmutableSet<String> All_SET = ImmutableSet.of(ANY);
private static final Set<String> types = All_SET;
public static final Resolved _LOCAL_ALL = new Resolved(
All_SET,
All_SET,
All_SET,
ImmutableSet.of(),
SearchRequest.DEFAULT_INDICES_OPTIONS
All_SET,
All_SET,
All_SET,
ImmutableSet.of(),
SearchRequest.DEFAULT_INDICES_OPTIONS
);

private final Set<String> aliases;
Expand All @@ -450,18 +457,18 @@ public final static class Resolved {
private final IndicesOptions indicesOptions;

public Resolved(
final ImmutableSet<String> aliases,
final ImmutableSet<String> allIndices,
final ImmutableSet<String> originalRequested,
final ImmutableSet<String> remoteIndices,
IndicesOptions indicesOptions
final ImmutableSet<String> aliases,
final ImmutableSet<String> allIndices,
final ImmutableSet<String> originalRequested,
final ImmutableSet<String> remoteIndices,
IndicesOptions indicesOptions
) {
this.aliases = aliases;
this.allIndices = allIndices;
this.originalRequested = originalRequested;
this.remoteIndices = remoteIndices;
this.isLocalAll = IndexResolverReplacer.isLocalAll(originalRequested.toArray(new String[0]))
|| (aliases.contains("*") && allIndices.contains("*"));
|| (aliases.contains("*") && allIndices.contains("*"));
this.indicesOptions = indicesOptions;
}

Expand Down Expand Up @@ -500,16 +507,16 @@ public Set<String> getRemoteIndices() {
@Override
public String toString() {
return "Resolved [aliases="
+ aliases
+ ", allIndices="
+ allIndices
+ ", types="
+ types
+ ", originalRequested="
+ originalRequested
+ ", remoteIndices="
+ remoteIndices
+ "]";
+ aliases
+ ", allIndices="
+ allIndices
+ ", types="
+ types
+ ", originalRequested="
+ originalRequested
+ ", remoteIndices="
+ remoteIndices
+ "]";
}

@Override
Expand Down Expand Up @@ -628,6 +635,32 @@ private boolean getOrReplaceAllIndices(final Object request, final IndicesProvid
result = getOrReplaceAllIndices(ar, provider, false) && result;
}

} else if (request instanceof MultiGetRequest) {

for (ListIterator<Item> it = ((MultiGetRequest) request).getItems().listIterator(); it.hasNext();) {
Item item = it.next();
result = getOrReplaceAllIndices(item, provider, false) && result;
/*if(item.index() == null || item.indices() == null || item.indices().length == 0) {
it.remove();
}*/
}

} else if (request instanceof MultiSearchRequest) {

for (ListIterator<SearchRequest> it = ((MultiSearchRequest) request).requests().listIterator(); it.hasNext();) {
SearchRequest ar = it.next();
result = getOrReplaceAllIndices(ar, provider, false) && result;
/*if(ar.indices() == null || ar.indices().length == 0) {
it.remove();
}*/
}

} else if (request instanceof MultiTermVectorsRequest) {

for (ActionRequest ar : (Iterable<TermVectorsRequest>) () -> ((MultiTermVectorsRequest) request).iterator()) {
result = getOrReplaceAllIndices(ar, provider, false) && result;
}

} else if (request instanceof PutMappingRequest) {
PutMappingRequest pmr = (PutMappingRequest) request;
Index concreteIndex = pmr.getConcreteIndex();
Expand Down Expand Up @@ -657,14 +690,14 @@ private boolean getOrReplaceAllIndices(final Object request, final IndicesProvid

if (snapshotInfo == null) {
log.warn(
"snapshot repository '" + restoreRequest.repository() + "', snapshot '" + restoreRequest.snapshot() + "' not found"
"snapshot repository '" + restoreRequest.repository() + "', snapshot '" + restoreRequest.snapshot() + "' not found"
);
provider.provide(new String[] { "*" }, request, false);
} else {
final List<String> requestedResolvedIndices = IndexUtils.filterIndices(
snapshotInfo.indices(),
restoreRequest.indices(),
restoreRequest.indicesOptions()
snapshotInfo.indices(),
restoreRequest.indices(),
restoreRequest.indicesOptions()
);
final List<String> renamedTargetIndices = renamedIndices(restoreRequest, requestedResolvedIndices);
// final Set<String> indices = new HashSet<>(requestedResolvedIndices);
Expand Down Expand Up @@ -733,6 +766,12 @@ private boolean getOrReplaceAllIndices(final Object request, final IndicesProvid
return false;
}
((ReplicationRequest) request).index(newIndices.length != 1 ? null : newIndices[0]);
} else if (request instanceof MultiGetRequest.Item) {
String[] newIndices = provider.provide(((MultiGetRequest.Item) request).indices(), request, true);
if (checkIndices(request, newIndices, true, allowEmptyIndices) == false) {
return false;
}
((MultiGetRequest.Item) request).index(newIndices.length != 1 ? null : newIndices[0]);
} else if (request instanceof CreateIndexRequest) {
String[] newIndices = provider.provide(((CreateIndexRequest) request).indices(), request, true);
if (checkIndices(request, newIndices, true, allowEmptyIndices) == false) {
Expand Down

0 comments on commit 98dd626

Please sign in to comment.