Skip to content

Commit

Permalink
Create SystemIndexRegistry
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Jul 3, 2024
1 parent 884fa4c commit a75e167
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.persistent.PersistentTasksCustomMetadata;
import org.opensearch.persistent.PersistentTasksNodeService;
Expand Down Expand Up @@ -147,15 +146,14 @@ public ClusterModule(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext,
ClusterManagerMetrics clusterManagerMetrics,
SystemIndices systemIndices
ClusterManagerMetrics clusterManagerMetrics
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices);
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.DeprecationLogger;
Expand All @@ -54,7 +53,6 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.SystemIndices;

import java.time.Instant;
import java.time.ZoneId;
Expand Down Expand Up @@ -93,18 +91,9 @@ public class IndexNameExpressionResolver {
private final List<ExpressionResolver> expressionResolvers = List.of(dateMathExpressionResolver, wildcardExpressionResolver);

private final ThreadContext threadContext;
private final String[] systemIndices;

public IndexNameExpressionResolver(ThreadContext threadContext) {
this.threadContext = Objects.requireNonNull(threadContext, "Thread Context must not be null");
this.systemIndices = new String[0];
}

@InternalApi
public IndexNameExpressionResolver(ThreadContext threadContext, SystemIndices systemIndices) {
this.threadContext = Objects.requireNonNull(threadContext, "Thread Context must not be null");
List<String> allSystemIndexPatterns = systemIndices.getAllSystemIndexPatterns();
this.systemIndices = allSystemIndexPatterns.toArray(new String[0]);
}

/**
Expand Down Expand Up @@ -175,10 +164,6 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, I
return concreteIndexNames(context, request.indices());
}

public List<String> matchesSystemIndexPattern(String... indexExpressions) {
return Arrays.stream(indexExpressions).filter(pattern -> Regex.simpleMatch(systemIndices, pattern)).collect(Collectors.toList());
}

public List<String> dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) {
// Allow system index access - they'll be filtered out below as there's no such thing (yet) as system data streams
Context context = new Context(state, options, false, false, true, true, true);
Expand Down
131 changes: 131 additions & 0 deletions server/src/main/java/org/opensearch/indices/SystemIndexRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.Operations;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.tasks.TaskResultsService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.tasks.TaskResultsService.TASK_INDEX;

class SystemIndexRegistry {
private static SystemIndexRegistry INSTANCE = null;
private static final SystemIndexDescriptor TASK_INDEX_DESCRIPTOR = new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index");
static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(),
singletonList(TASK_INDEX_DESCRIPTOR)
);
private static String[] SYSTEM_INDEX_PATTERNS = new String[0];
static Collection<SystemIndexDescriptor> SYSTEM_INDEX_DESCRIPTORS;

private SystemIndexRegistry(Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesDescriptors) {
final Map<String, Collection<SystemIndexDescriptor>> descriptorsMap = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
checkForOverlappingPatterns(descriptorsMap);
List<SystemIndexDescriptor> descriptors = pluginAndModulesDescriptors.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
descriptors.add(TASK_INDEX_DESCRIPTOR);
SYSTEM_INDEX_DESCRIPTORS = descriptors.stream().collect(Collectors.toUnmodifiableList());
SYSTEM_INDEX_PATTERNS = descriptors.stream().map(SystemIndexDescriptor::getIndexPattern).toArray(String[]::new);
}

public static synchronized SystemIndexRegistry initialize(Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesDescriptors) {
if (INSTANCE == null) {
INSTANCE = new SystemIndexRegistry(pluginAndModulesDescriptors);
}
return INSTANCE;
}

public static List<String> matchesSystemIndexPattern(String... indexExpressions) {
return Arrays.stream(indexExpressions)
.filter(pattern -> Regex.simpleMatch(SYSTEM_INDEX_PATTERNS, pattern))
.collect(Collectors.toList());
}

/**
* Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed
* descriptors overlap with any of the other patterns. If any do, throws an exception.
*
* @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide.
* @throws IllegalStateException Thrown if any of the index patterns overlaps with another.
*/
static void checkForOverlappingPatterns(Map<String, Collection<SystemIndexDescriptor>> sourceToDescriptors) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = sourceToDescriptors.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message
.collect(Collectors.toList());

// This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the
// automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states
// per pattern should be low as well. If these assumptions change, this might need to be reworked.
sourceDescriptorPair.forEach(descriptorToCheck -> {
List<Tuple<String, SystemIndexDescriptor>> descriptorsMatchingThisPattern = sourceDescriptorPair.stream()

.filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked
.filter(d -> overlaps(descriptorToCheck.v2(), d.v2()))
.collect(Collectors.toList());
if (descriptorsMatchingThisPattern.isEmpty() == false) {
throw new IllegalStateException(
"a system index descriptor ["
+ descriptorToCheck.v2()
+ "] from ["
+ descriptorToCheck.v1()
+ "] overlaps with other system index descriptors: ["
+ descriptorsMatchingThisPattern.stream()
.map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]")
.collect(Collectors.joining(", "))
);
}
});
}

private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) {
Automaton a1Automaton = Regex.simpleMatchToAutomaton(a1.getIndexPattern());
Automaton a2Automaton = Regex.simpleMatchToAutomaton(a2.getIndexPattern());
return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false;
}

private static Map<String, Collection<SystemIndexDescriptor>> buildSystemIndexDescriptorMap(
Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesMap
) {
final Map<String, Collection<SystemIndexDescriptor>> map = new HashMap<>(
pluginAndModulesMap.size() + SERVER_SYSTEM_INDEX_DESCRIPTORS.size()
);
map.putAll(pluginAndModulesMap);
// put the server items last since we expect less of them
SERVER_SYSTEM_INDEX_DESCRIPTORS.forEach((source, descriptors) -> {
if (map.putIfAbsent(source, descriptors) != null) {
throw new IllegalArgumentException(
"plugin or module attempted to define the same source [" + source + "] as a built-in system index"
);
}
});
return unmodifiableMap(map);
}

// visible for testing
static void clear() {
INSTANCE = null;
}
}
97 changes: 7 additions & 90 deletions server/src/main/java/org/opensearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,50 +40,33 @@
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.regex.Regex;
import org.opensearch.core.index.Index;
import org.opensearch.tasks.TaskResultsService;

import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.tasks.TaskResultsService.TASK_INDEX;

/**
* This class holds the {@link SystemIndexDescriptor} objects that represent system indices the
* node knows about. Methods for determining if an index should be a system index are also provided
* to reduce the locations within the code that need to deal with {@link SystemIndexDescriptor}s.
*
* @opensearch.internal
* @opensearch.api
*/
@PublicApi(since = "2.16.0")
public class SystemIndices {
private static final Logger logger = LogManager.getLogger(SystemIndices.class);

private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(),
singletonList(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index"))
);

private final CharacterRunAutomaton runAutomaton;
private final Collection<SystemIndexDescriptor> systemIndexDescriptors;

public SystemIndices(Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesDescriptors) {
final Map<String, Collection<SystemIndexDescriptor>> descriptorsMap = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
checkForOverlappingPatterns(descriptorsMap);
this.systemIndexDescriptors = unmodifiableList(
descriptorsMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList())
);
this.runAutomaton = buildCharacterRunAutomaton(systemIndexDescriptors);
SystemIndexRegistry.initialize(pluginAndModulesDescriptors);
SystemIndexRegistry.initialize(pluginAndModulesDescriptors);
this.runAutomaton = buildCharacterRunAutomaton(SystemIndexRegistry.SYSTEM_INDEX_DESCRIPTORS);
}

/**
Expand All @@ -104,18 +87,14 @@ public boolean isSystemIndex(String indexName) {
return runAutomaton.run(indexName);
}

public List<String> getAllSystemIndexPatterns() {
return systemIndexDescriptors.stream().map(SystemIndexDescriptor::getIndexPattern).collect(Collectors.toList());
}

/**
* Finds a single matching {@link SystemIndexDescriptor}, if any, for the given index name.
* @param name the name of the index
* @return The matching {@link SystemIndexDescriptor} or {@code null} if no descriptor is found
* @throws IllegalStateException if multiple descriptors match the name
*/
public @Nullable SystemIndexDescriptor findMatchingDescriptor(String name) {
final List<SystemIndexDescriptor> matchingDescriptors = systemIndexDescriptors.stream()
final List<SystemIndexDescriptor> matchingDescriptors = SystemIndexRegistry.SYSTEM_INDEX_DESCRIPTORS.stream()
.filter(descriptor -> descriptor.matchesIndexPattern(name))
.collect(Collectors.toList());

Expand Down Expand Up @@ -172,66 +151,4 @@ private static CharacterRunAutomaton buildCharacterRunAutomaton(Collection<Syste
.reduce(Operations::union);
return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(Automata.makeEmpty()), Integer.MAX_VALUE));
}

/**
* Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed
* descriptors overlap with any of the other patterns. If any do, throws an exception.
*
* @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide.
* @throws IllegalStateException Thrown if any of the index patterns overlaps with another.
*/
static void checkForOverlappingPatterns(Map<String, Collection<SystemIndexDescriptor>> sourceToDescriptors) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = sourceToDescriptors.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message
.collect(Collectors.toList());

// This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the
// automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states
// per pattern should be low as well. If these assumptions change, this might need to be reworked.
sourceDescriptorPair.forEach(descriptorToCheck -> {
List<Tuple<String, SystemIndexDescriptor>> descriptorsMatchingThisPattern = sourceDescriptorPair.stream()

.filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked
.filter(d -> overlaps(descriptorToCheck.v2(), d.v2()))
.collect(Collectors.toList());
if (descriptorsMatchingThisPattern.isEmpty() == false) {
throw new IllegalStateException(
"a system index descriptor ["
+ descriptorToCheck.v2()
+ "] from ["
+ descriptorToCheck.v1()
+ "] overlaps with other system index descriptors: ["
+ descriptorsMatchingThisPattern.stream()
.map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]")
.collect(Collectors.joining(", "))
);
}
});
}

private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) {
Automaton a1Automaton = Regex.simpleMatchToAutomaton(a1.getIndexPattern());
Automaton a2Automaton = Regex.simpleMatchToAutomaton(a2.getIndexPattern());
return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false;
}

private static Map<String, Collection<SystemIndexDescriptor>> buildSystemIndexDescriptorMap(
Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesMap
) {
final Map<String, Collection<SystemIndexDescriptor>> map = new HashMap<>(
pluginAndModulesMap.size() + SERVER_SYSTEM_INDEX_DESCRIPTORS.size()
);
map.putAll(pluginAndModulesMap);
// put the server items last since we expect less of them
SERVER_SYSTEM_INDEX_DESCRIPTORS.forEach((source, descriptors) -> {
if (map.putIfAbsent(source, descriptors) != null) {
throw new IllegalArgumentException(
"plugin or module attempted to define the same source [" + source + "] as a built-in system index"
);
}
});
return unmodifiableMap(map);
}
}
3 changes: 1 addition & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,7 @@ protected Node(
clusterInfoService,
snapshotsInfoService,
threadPool.getThreadContext(),
clusterManagerMetrics,
systemIndices
clusterManagerMetrics
);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
Expand Down
Loading

0 comments on commit a75e167

Please sign in to comment.