Skip to content

Commit

Permalink
Create SystemIndexRegistry with helper method matchesSystemIndex (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#14415)

* Create new extension point in SystemIndexPlugin for a single plugin to get registered system indices

Signed-off-by: Craig Perkins <[email protected]>

* Add to CHANGELOG

Signed-off-by: Craig Perkins <[email protected]>

* WIP on system indices from IndexNameExpressionResolver

Signed-off-by: Craig Perkins <[email protected]>

* Add test in IndexNameExpressionResolverTests

Signed-off-by: Craig Perkins <[email protected]>

* Remove changes in SystemIndexPlugin

Signed-off-by: Craig Perkins <[email protected]>

* Add method in IndexNameExpressionResolver to get matching system indices

Signed-off-by: Craig Perkins <[email protected]>

* Show how resolver can be chained to get system indices

Signed-off-by: Craig Perkins <[email protected]>

* Fix forbiddenApis check

Signed-off-by: Craig Perkins <[email protected]>

* Update CHANGELOG

Signed-off-by: Craig Perkins <[email protected]>

* Make SystemIndices internal

Signed-off-by: Craig Perkins <[email protected]>

* Remove unneeded changes

Signed-off-by: Craig Perkins <[email protected]>

* Fix CI failures

Signed-off-by: Craig Perkins <[email protected]>

* Fix precommit errors

Signed-off-by: Craig Perkins <[email protected]>

* Use Regex instead of WildcardMatcher

Signed-off-by: Craig Perkins <[email protected]>

* Address code review feedback

Signed-off-by: Craig Perkins <[email protected]>

* Allow caller to pass index expressions

Signed-off-by: Craig Perkins <[email protected]>

* Create SystemIndexRegistry

Signed-off-by: Craig Perkins <[email protected]>

* Update CHANGELOG

Signed-off-by: Craig Perkins <[email protected]>

* Remove singleton limitation

Signed-off-by: Craig Perkins <[email protected]>

* Add javadoc

Signed-off-by: Craig Perkins <[email protected]>

* Add @experimentalapi annotation

Signed-off-by: Craig Perkins <[email protected]>

---------

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks authored and kaushalmahi12 committed Jul 23, 2024
1 parent bd28d1a commit b7d6e79
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
- Create SystemIndexRegistry with helper method matchesSystemIndex ([#14415](https://github.com/opensearch-project/OpenSearch/pull/14415))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@
package org.opensearch.indices;

import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.regex.Regex;

import java.util.Objects;

/**
* Describes a system index. Provides the information required to create and maintain the system index.
*
* @opensearch.internal
* @opensearch.api
*/
@PublicApi(since = "2.16.0")
public class SystemIndexDescriptor {
private final String indexPattern;
private final String description;
Expand Down
130 changes: 130 additions & 0 deletions server/src/main/java/org/opensearch/indices/SystemIndexRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.Operations;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.tasks.TaskResultsService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.tasks.TaskResultsService.TASK_INDEX;

/**
* This class holds the {@link SystemIndexDescriptor} objects that represent system indices the
* node knows about. This class also contains static methods that identify if index expressions match
* registered system index patterns
*
* @opensearch.api
*/
@ExperimentalApi
public class SystemIndexRegistry {
private static final SystemIndexDescriptor TASK_INDEX_DESCRIPTOR = new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index");
private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(),
singletonList(TASK_INDEX_DESCRIPTOR)
);

private volatile static String[] SYSTEM_INDEX_PATTERNS = new String[0];
volatile static Collection<SystemIndexDescriptor> SYSTEM_INDEX_DESCRIPTORS = Collections.emptyList();

static void register(Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesDescriptors) {
final Map<String, Collection<SystemIndexDescriptor>> descriptorsMap = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
checkForOverlappingPatterns(descriptorsMap);
List<SystemIndexDescriptor> descriptors = pluginAndModulesDescriptors.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
descriptors.add(TASK_INDEX_DESCRIPTOR);

SYSTEM_INDEX_DESCRIPTORS = descriptors.stream().collect(Collectors.toUnmodifiableList());
SYSTEM_INDEX_PATTERNS = descriptors.stream().map(SystemIndexDescriptor::getIndexPattern).toArray(String[]::new);
}

public static List<String> matchesSystemIndexPattern(String... indexExpressions) {
return Arrays.stream(indexExpressions)
.filter(pattern -> Regex.simpleMatch(SYSTEM_INDEX_PATTERNS, pattern))
.collect(Collectors.toList());
}

/**
* Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed
* descriptors overlap with any of the other patterns. If any do, throws an exception.
*
* @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide.
* @throws IllegalStateException Thrown if any of the index patterns overlaps with another.
*/
static void checkForOverlappingPatterns(Map<String, Collection<SystemIndexDescriptor>> sourceToDescriptors) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = sourceToDescriptors.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message
.collect(Collectors.toList());

// This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the
// automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states
// per pattern should be low as well. If these assumptions change, this might need to be reworked.
sourceDescriptorPair.forEach(descriptorToCheck -> {
List<Tuple<String, SystemIndexDescriptor>> descriptorsMatchingThisPattern = sourceDescriptorPair.stream()

.filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked
.filter(d -> overlaps(descriptorToCheck.v2(), d.v2()))
.collect(Collectors.toList());
if (descriptorsMatchingThisPattern.isEmpty() == false) {
throw new IllegalStateException(
"a system index descriptor ["
+ descriptorToCheck.v2()
+ "] from ["
+ descriptorToCheck.v1()
+ "] overlaps with other system index descriptors: ["
+ descriptorsMatchingThisPattern.stream()
.map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]")
.collect(Collectors.joining(", "))
);
}
});
}

private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) {
Automaton a1Automaton = Regex.simpleMatchToAutomaton(a1.getIndexPattern());
Automaton a2Automaton = Regex.simpleMatchToAutomaton(a2.getIndexPattern());
return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false;
}

private static Map<String, Collection<SystemIndexDescriptor>> buildSystemIndexDescriptorMap(
Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesMap
) {
final Map<String, Collection<SystemIndexDescriptor>> map = new HashMap<>(
pluginAndModulesMap.size() + SERVER_SYSTEM_INDEX_DESCRIPTORS.size()
);
map.putAll(pluginAndModulesMap);
// put the server items last since we expect less of them
SERVER_SYSTEM_INDEX_DESCRIPTORS.forEach((source, descriptors) -> {
if (map.putIfAbsent(source, descriptors) != null) {
throw new IllegalArgumentException(
"plugin or module attempted to define the same source [" + source + "] as a built-in system index"
);
}
});
return unmodifiableMap(map);
}
}
88 changes: 3 additions & 85 deletions server/src/main/java/org/opensearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,15 @@
import org.apache.lucene.util.automaton.MinimizationOperations;
import org.apache.lucene.util.automaton.Operations;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.core.index.Index;
import org.opensearch.tasks.TaskResultsService;

import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.tasks.TaskResultsService.TASK_INDEX;

/**
* This class holds the {@link SystemIndexDescriptor} objects that represent system indices the
* node knows about. Methods for determining if an index should be a system index are also provided
Expand All @@ -69,21 +59,11 @@
public class SystemIndices {
private static final Logger logger = LogManager.getLogger(SystemIndices.class);

private static final Map<String, Collection<SystemIndexDescriptor>> SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap(
TaskResultsService.class.getName(),
singletonList(new SystemIndexDescriptor(TASK_INDEX + "*", "Task Result Index"))
);

private final CharacterRunAutomaton runAutomaton;
private final Collection<SystemIndexDescriptor> systemIndexDescriptors;

public SystemIndices(Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesDescriptors) {
final Map<String, Collection<SystemIndexDescriptor>> descriptorsMap = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
checkForOverlappingPatterns(descriptorsMap);
this.systemIndexDescriptors = unmodifiableList(
descriptorsMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList())
);
this.runAutomaton = buildCharacterRunAutomaton(systemIndexDescriptors);
SystemIndexRegistry.register(pluginAndModulesDescriptors);
this.runAutomaton = buildCharacterRunAutomaton(SystemIndexRegistry.SYSTEM_INDEX_DESCRIPTORS);
}

/**
Expand Down Expand Up @@ -111,7 +91,7 @@ public boolean isSystemIndex(String indexName) {
* @throws IllegalStateException if multiple descriptors match the name
*/
public @Nullable SystemIndexDescriptor findMatchingDescriptor(String name) {
final List<SystemIndexDescriptor> matchingDescriptors = systemIndexDescriptors.stream()
final List<SystemIndexDescriptor> matchingDescriptors = SystemIndexRegistry.SYSTEM_INDEX_DESCRIPTORS.stream()
.filter(descriptor -> descriptor.matchesIndexPattern(name))
.collect(Collectors.toList());

Expand Down Expand Up @@ -168,66 +148,4 @@ private static CharacterRunAutomaton buildCharacterRunAutomaton(Collection<Syste
.reduce(Operations::union);
return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(Automata.makeEmpty()), Integer.MAX_VALUE));
}

/**
* Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed
* descriptors overlap with any of the other patterns. If any do, throws an exception.
*
* @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide.
* @throws IllegalStateException Thrown if any of the index patterns overlaps with another.
*/
static void checkForOverlappingPatterns(Map<String, Collection<SystemIndexDescriptor>> sourceToDescriptors) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = sourceToDescriptors.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message
.collect(Collectors.toList());

// This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the
// automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states
// per pattern should be low as well. If these assumptions change, this might need to be reworked.
sourceDescriptorPair.forEach(descriptorToCheck -> {
List<Tuple<String, SystemIndexDescriptor>> descriptorsMatchingThisPattern = sourceDescriptorPair.stream()

.filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked
.filter(d -> overlaps(descriptorToCheck.v2(), d.v2()))
.collect(Collectors.toList());
if (descriptorsMatchingThisPattern.isEmpty() == false) {
throw new IllegalStateException(
"a system index descriptor ["
+ descriptorToCheck.v2()
+ "] from ["
+ descriptorToCheck.v1()
+ "] overlaps with other system index descriptors: ["
+ descriptorsMatchingThisPattern.stream()
.map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]")
.collect(Collectors.joining(", "))
);
}
});
}

private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) {
Automaton a1Automaton = Regex.simpleMatchToAutomaton(a1.getIndexPattern());
Automaton a2Automaton = Regex.simpleMatchToAutomaton(a2.getIndexPattern());
return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false;
}

private static Map<String, Collection<SystemIndexDescriptor>> buildSystemIndexDescriptorMap(
Map<String, Collection<SystemIndexDescriptor>> pluginAndModulesMap
) {
final Map<String, Collection<SystemIndexDescriptor>> map = new HashMap<>(
pluginAndModulesMap.size() + SERVER_SYSTEM_INDEX_DESCRIPTORS.size()
);
map.putAll(pluginAndModulesMap);
// put the server items last since we expect less of them
SERVER_SYSTEM_INDEX_DESCRIPTORS.forEach((source, descriptors) -> {
if (map.putIfAbsent(source, descriptors) != null) {
throw new IllegalArgumentException(
"plugin or module attempted to define the same source [" + source + "] as a built-in system index"
);
}
});
return unmodifiableMap(map);
}
}
17 changes: 8 additions & 9 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,14 @@ protected Node(
repositoriesServiceReference::get,
rerouteServiceReference::get
);
final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = Collections.unmodifiableMap(
pluginsService.filterPlugins(SystemIndexPlugin.class)
.stream()
.collect(
Collectors.toMap(plugin -> plugin.getClass().getSimpleName(), plugin -> plugin.getSystemIndexDescriptors(settings))
)
);
final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap);
final ClusterModule clusterModule = new ClusterModule(
settings,
clusterService,
Expand Down Expand Up @@ -819,15 +827,6 @@ protected Node(
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = Collections.unmodifiableMap(
pluginsService.filterPlugins(SystemIndexPlugin.class)
.stream()
.collect(
Collectors.toMap(plugin -> plugin.getClass().getSimpleName(), plugin -> plugin.getSystemIndexDescriptors(settings))
)
);
final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap);

final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
Expand Down
Loading

0 comments on commit b7d6e79

Please sign in to comment.