Skip to content

Commit

Permalink
Merge branch 'main' into lib-toml
Browse files Browse the repository at this point in the history
  • Loading branch information
cwperks committed Oct 18, 2024
2 parents eafb25e + 9096aee commit 4ad21d2
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265))
- [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures [#16337](https://github.com/opensearch-project/OpenSearch/pull/16337))
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- Fix disk usage exceeds threshold cluster can't spin up issue ([#15258](https://github.com/opensearch-project/OpenSearch/pull/15258)))
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)

- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@

package org.opensearch.bootstrap;

import org.opensearch.common.logging.LogConfigurator;
import org.opensearch.common.settings.KeyStoreCommandTestCase;
import org.opensearch.common.settings.KeyStoreWrapper;
import org.opensearch.common.settings.SecureSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.settings.SecureString;
import org.opensearch.env.Environment;
import org.opensearch.node.Node;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Before;
Expand All @@ -53,14 +51,8 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class BootstrapTests extends OpenSearchTestCase {
Environment env;
Expand Down Expand Up @@ -139,38 +131,4 @@ private void assertPassphraseRead(String source, String expected) {
}
}

public void testInitExecutionOrder() throws Exception {
AtomicInteger order = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread mockThread = new Thread(() -> {
assertEquals(0, order.getAndIncrement());
countDownLatch.countDown();
});

Node mockNode = mock(Node.class);
doAnswer(invocation -> {
try {
boolean threadStarted = countDownLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(
"Waited for one second but the keepAliveThread isn't started, please check the execution order of"
+ "keepAliveThread.start and node.start",
threadStarted
);
} catch (InterruptedException e) {
fail("Thread interrupted");
}
assertEquals(1, order.getAndIncrement());
return null;
}).when(mockNode).start();

LogConfigurator.registerErrorListener();
Bootstrap testBootstrap = new Bootstrap(mockThread, mockNode);
Bootstrap.setInstance(testBootstrap);

Bootstrap.startInstance(testBootstrap);

verify(mockNode).start();
assertEquals(2, order.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static <R extends Reader<?>> void registerReader(final byte ordinal, fina

public static void registerClassAlias(final Class<?> classInstance, final Class<?> classGeneric) {
if (WRITER_CUSTOM_CLASS_MAP.putIfAbsent(classInstance, classGeneric) != null) {
throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getClass() + "]");
throw new IllegalArgumentException("Streamable custom class already registered [" + classInstance.getName() + "]");
}
}

Expand All @@ -96,7 +96,7 @@ public static <W extends Writer<?>> W getWriter(final Class<?> clazz) {
}

/**
* Returns the ristered reader keyed by the unique ordinal
* Returns the registered reader keyed by the unique ordinal
*/
@SuppressWarnings("unchecked")
public static <R extends Reader<?>> R getReader(final byte b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
public final class MediaTypeRegistry {
private static Map<String, MediaType> formatToMediaType = Map.of();
private static Map<String, MediaType> typeWithSubtypeToMediaType = Map.of();
private static Map<String, MediaType> knownStringsToMediaType = Map.of();

// Default mediaType singleton
private static MediaType DEFAULT_MEDIA_TYPE;
Expand Down Expand Up @@ -84,6 +85,8 @@ private static void register(MediaType[] acceptedMediaTypes, Map<String, MediaTy
// ensures the map is not overwritten:
Map<String, MediaType> typeMap = new HashMap<>(typeWithSubtypeToMediaType);
Map<String, MediaType> formatMap = new HashMap<>(formatToMediaType);
Map<String, MediaType> knownStringMap = new HashMap<>(knownStringsToMediaType);

for (MediaType mediaType : acceptedMediaTypes) {
if (formatMap.containsKey(mediaType.format())) {
throw new IllegalArgumentException("unable to register mediaType: [" + mediaType.format() + "]. Type already exists.");
Expand All @@ -107,13 +110,24 @@ private static void register(MediaType[] acceptedMediaTypes, Map<String, MediaTy
MediaType mediaType = entry.getValue();
typeMap.put(typeWithSubtype, mediaType);
formatMap.putIfAbsent(mediaType.format(), mediaType); // ignore if the additional type mapping already exists
knownStringMap.put(mediaType.mediaType(), mediaType);
knownStringMap.put(mediaType.mediaTypeWithoutParameters(), mediaType);
}

formatToMediaType = Map.copyOf(formatMap);
typeWithSubtypeToMediaType = Map.copyOf(typeMap);
knownStringsToMediaType = Map.copyOf(knownStringMap);
}

public static MediaType fromMediaType(String mediaType) {
if (mediaType == null) {
return null;
}
// Skip parsing if the string is an exact match for any known string value
final MediaType knownMediaType = knownStringsToMediaType.get(mediaType);
if (knownMediaType != null) {
return knownMediaType;
}
ParsedMediaType parsedMediaType = parseMediaType(mediaType);
return parsedMediaType != null ? parsedMediaType.getMediaType() : null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

import java.util.concurrent.atomic.AtomicInteger;

public class WriteableTests extends OpenSearchTestCase {

public void testRegisterClassAlias() {
Writeable.WriteableRegistry.registerClassAlias(StringBuilder.class, AtomicInteger.class);
try {
Writeable.WriteableRegistry.registerClassAlias(StringBuilder.class, AtomicInteger.class);
Assert.fail("expected exception not thrown");
} catch (IllegalArgumentException illegalArgumentException) {
Assert.assertEquals(
"Streamable custom class already registered [java.lang.StringBuilder]",
illegalArgumentException.getMessage()
);
}
}
}
21 changes: 2 additions & 19 deletions server/src/main/java/org/opensearch/bootstrap/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,6 @@ final class Bootstrap {
private final Thread keepAliveThread;
private final Spawner spawner = new Spawner();

// For testing purpose
static void setInstance(Bootstrap bootstrap) {
INSTANCE = bootstrap;
}

// For testing purpose
Bootstrap(Thread keepAliveThread, Node node) {
this.keepAliveThread = keepAliveThread;
this.node = node;
}

/** creates a new instance */
Bootstrap() {
keepAliveThread = new Thread(new Runnable() {
Expand Down Expand Up @@ -347,10 +336,8 @@ private static Environment createEnvironment(
}

private void start() throws NodeValidationException {
// keepAliveThread should start first than node to ensure the cluster can spin up successfully in edge cases:
// https://github.com/opensearch-project/OpenSearch/issues/14791
keepAliveThread.start();
node.start();
keepAliveThread.start();
}

static void stop() throws IOException {
Expand Down Expand Up @@ -423,7 +410,7 @@ static void init(final boolean foreground, final Path pidFile, final boolean qui
throw new BootstrapException(e);
}

startInstance(INSTANCE);
INSTANCE.start();

// We don't close stderr if `--quiet` is passed, because that
// hides fatal startup errors. For example, if OpenSearch is
Expand Down Expand Up @@ -475,10 +462,6 @@ static void init(final boolean foreground, final Path pidFile, final boolean qui
}
}

static void startInstance(Bootstrap instance) throws NodeValidationException {
instance.start();
}

@SuppressForbidden(reason = "System#out")
private static void closeSystOut() {
System.out.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
DecommissionAttributeMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(QueryGroupMetadata.TYPE),
QueryGroupMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -729,7 +730,7 @@ public static void validateRepositoryMetadataSettings(
+ " in the name as this delimiter is used to create pinning entity"
);
}
if (repositoryWithShallowV2Exists(repositories)) {
if (repositoryWithShallowV2Exists(repositories, repositoryName)) {
throw new RepositoryException(
repositoryName,
"setting "
Expand Down Expand Up @@ -763,8 +764,13 @@ public static void validateRepositoryMetadataSettings(
}
}

private static boolean repositoryWithShallowV2Exists(Map<String, Repository> repositories) {
return repositories.values().stream().anyMatch(repo -> SHALLOW_SNAPSHOT_V2.get(repo.getMetadata().settings()));
private static boolean repositoryWithShallowV2Exists(Map<String, Repository> repositories, String repositoryName) {
return repositories.values()
.stream()
.anyMatch(
repository -> SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings())
&& !Objects.equals(repository.getMetadata().name(), repositoryName)
);
}

private static boolean pinnedTimestampExistsWithDifferentRepository(
Expand Down
15 changes: 8 additions & 7 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -721,14 +721,15 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
}
} catch (Exception e) {
// execution exception can happen while loading the cache, strip it
if (e instanceof ExecutionException) {
e = (e.getCause() == null || e.getCause() instanceof Exception)
? (Exception) e.getCause()
: new OpenSearchException(e.getCause());
Exception exception = e;
if (exception instanceof ExecutionException) {
exception = (exception.getCause() == null || exception.getCause() instanceof Exception)
? (Exception) exception.getCause()
: new OpenSearchException(exception.getCause());
}
logger.trace("Query phase failed", e);
processFailure(readerContext, e);
throw e;
logger.trace("Query phase failed", exception);
processFailure(readerContext, exception);
throw exception;
} finally {
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
Expand Down Expand Up @@ -331,10 +332,18 @@ public void testRejectsDuplicateExistingShardsAllocatorName() {

public void testQueryGroupMetadataRegister() {
List<NamedWriteableRegistry.Entry> customEntries = ClusterModule.getNamedWriteables();
List<NamedXContentRegistry.Entry> customXEntries = ClusterModule.getNamedXWriteables();
assertTrue(
customEntries.stream()
.anyMatch(entry -> entry.categoryClass == Metadata.Custom.class && entry.name.equals(QueryGroupMetadata.TYPE))
);

assertTrue(
customXEntries.stream()
.anyMatch(
entry -> entry.categoryClass == Metadata.Custom.class && entry.name.getPreferredName().equals(QueryGroupMetadata.TYPE)
)
);
}

public void testRerouteServiceSetForBalancedShardsAllocator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,13 @@ public void testRepositoryCreationShallowV2() throws Exception {
);
}

// Modify repo-1 settings. This should go through
updateRepository(
client,
"test-repo-1",
Settings.builder().put(snapshotRepoSettings1).put("max_snapshot_bytes_per_sec", "10k").build()
);

// Disable shallow snapshot V2 setting on test-repo-1
updateRepository(
client,
Expand Down

0 comments on commit 4ad21d2

Please sign in to comment.