Merge branch 'main' into cleanup_guava_dependencies
jakelandis authored Sep 18, 2023
2 parents 90b0368 + 3691312 commit dc8a72e
Showing 41 changed files with 601 additions and 327 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99470.yaml
@@ -0,0 +1,5 @@
+pr: 99470
+summary: "ESQL: Improve log messages"
+area: ES|QL
+type: enhancement
+issues: []
10 changes: 5 additions & 5 deletions docs/reference/snapshot-restore/repository-s3.asciidoc
@@ -226,16 +226,16 @@ emulate S3's behaviour in full. The `repository-s3` type requires full
compatibility with S3. In particular it must support the same set of API
endpoints, return the same errors in case of failures, and offer consistency
and performance at least as good as S3 even when accessed concurrently by
-multiple nodes. Incompatible error codes, consistency or performance may be
-particularly hard to track down since errors, consistency failures, and
-performance issues are usually rare and hard to reproduce.
+multiple nodes. You will need to work with the supplier of your storage system
+to address any incompatibilities you encounter.

You can perform some basic checks of the suitability of your storage system
using the {ref}/repo-analysis-api.html[repository analysis API]. If this API
does not complete successfully, or indicates poor performance, then your
storage system is not fully compatible with AWS S3 and therefore unsuitable for
-use as a snapshot repository. You will need to work with the supplier of your
-storage system to address any incompatibilities you encounter.
+use as a snapshot repository. However, these checks do not guarantee full
+compatibility. Incompatible error codes and consistency or performance issues
+may be rare and hard to reproduce.

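For illustration, here is one way to drive the repository analysis API from Java using the low-level REST client (the same `Request`/`Response` classes used elsewhere in this commit). This is a minimal sketch, not part of the change: the repository name `my_s3_repo` is a placeholder, and only two of the API's optional parameters are shown.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RepositoryAnalysisSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // POST /_snapshot/<repository>/_analyze; "my_s3_repo" is a placeholder name.
            Request request = new Request("POST", "/_snapshot/my_s3_repo/_analyze");
            request.addParameter("blob_count", "100");     // number of test blobs to write
            request.addParameter("max_blob_size", "10mb"); // cap on individual blob size
            Response response = client.performRequest(request);
            // A 200 response means the analysis completed; a failure (or a very slow run)
            // points at an incompatibility to raise with the storage supplier.
            System.out.println(response.getStatusLine());
        }
    }
}
```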
[[repository-s3-repository]]
==== Repository settings
@@ -923,7 +923,7 @@ public void testDownsampling() throws Exception {
DataStreamLifecycle.newBuilder()
.downsampling(
new Downsampling(
-List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))))
+List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))))
)
)
.dataRetention(TimeValue.MAX_VALUE)
@@ -977,7 +977,7 @@ public void testDownsampling() throws Exception {
String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
DOWNSAMPLED_INDEX_PREFIX,
state.metadata().index(firstGenIndex),
new DateHistogramInterval("1s")
new DateHistogramInterval("5m")
);
{
// let's simulate the in-progress downsampling
@@ -1100,7 +1100,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exception {
DataStreamLifecycle.newBuilder()
.downsampling(
new Downsampling(
-List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))))
+List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))))
)
)
.dataRetention(TimeValue.MAX_VALUE)
@@ -1129,7 +1129,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exception {
String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
DOWNSAMPLED_INDEX_PREFIX,
state.metadata().index(firstGenIndexName),
new DateHistogramInterval("1s")
new DateHistogramInterval("5m")
);
Metadata.Builder newMetadata = Metadata.builder(state.metadata())
.put(
@@ -1,8 +1,8 @@
---
"Explain backing index lifecycle":
- skip:
version: " - 8.9.99"
reason: "Explain data stream lifecycle API was updated in 8.10"
version: " - 8.10.99"
reason: "Data stream lifecycle was GA in 8.11"
features: allowed_warnings
- do:
allowed_warnings:
@@ -1,8 +1,8 @@
---
"Create data stream with lifecycle":
- skip:
version: " - 8.9.99"
reason: "data stream lifecycle in index templates was added after 8.10"
version: " - 8.10.99"
reason: "Data stream lifecycle was GA in 8.11"
features: allowed_warnings
- do:
allowed_warnings:
@@ -1,8 +1,8 @@
setup:
- skip:
features: allowed_warnings
version: " - 8.9.99"
reason: "Data stream lifecycles only supported in 8.10+"
version: " - 8.10.99"
reason: "Data stream lifecycle was GA in 8.11"
- do:
allowed_warnings:
- "index template [my-lifecycle] has index patterns [my-data-stream-1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation"
@@ -12,24 +12,30 @@
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TestCaseOrdering;

+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.util.Version;
import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.test.rest.ObjectPath;
import org.junit.AfterClass;
import org.junit.Before;

import java.util.Arrays;
import java.util.Locale;
+import java.util.Map;

import static org.elasticsearch.upgrades.FullClusterRestartUpgradeStatus.OLD;
import static org.elasticsearch.upgrades.FullClusterRestartUpgradeStatus.UPGRADED;
-import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;

@TestCaseOrdering(FullClusterRestartTestOrdering.class)
public abstract class ParameterizedFullClusterRestartTestCase extends ESRestTestCase {
private static final Version MINIMUM_WIRE_COMPATIBLE_VERSION = Version.fromString("7.17.0");
private static final Version OLD_CLUSTER_VERSION = Version.fromString(System.getProperty("tests.old_cluster_version"));
+private static IndexVersion oldIndexVersion;
private static boolean upgradeFailed = false;
private static boolean upgraded = false;
private final FullClusterRestartUpgradeStatus requestedUpgradeStatus;
@@ -43,6 +49,39 @@ public static Iterable<Object[]> parameters() throws Exception {
return Arrays.stream(FullClusterRestartUpgradeStatus.values()).map(v -> new Object[] { v }).toList();
}

+@Before
+public void extractOldIndexVersion() throws Exception {
+    if (upgraded == false) {
+        IndexVersion indexVersion = null; // these should all be the same version
+
+        Request request = new Request("GET", "_nodes");
+        request.addParameter("filter_path", "nodes.*.index_version,nodes.*.name");
+        Response response = client().performRequest(request);
+        ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        Map<String, Object> nodeMap = objectPath.evaluate("nodes");
+        for (String id : nodeMap.keySet()) {
+            Number ix = objectPath.evaluate("nodes." + id + ".index_version");
+            IndexVersion version;
+            if (ix != null) {
+                version = IndexVersion.fromId(ix.intValue());
+            } else {
+                // it doesn't have index version (pre 8.11) - just infer it from the release version
+                version = IndexVersion.fromId(getOldClusterVersion().id);
+            }
+
+            if (indexVersion == null) {
+                indexVersion = version;
+            } else {
+                String name = objectPath.evaluate("nodes." + id + ".name");
+                assertThat("Node " + name + " has a different index version to other nodes", version, equalTo(indexVersion));
+            }
+        }
+
+        assertThat("Index version could not be read", indexVersion, notNullValue());
+        oldIndexVersion = indexVersion;
+    }
+}
+
@Before
public void maybeUpgrade() throws Exception {
if (upgraded == false && requestedUpgradeStatus == UPGRADED) {
@@ -81,13 +120,8 @@ public static org.elasticsearch.Version getOldClusterVersion() {
}

public static IndexVersion getOldClusterIndexVersion() {
-    var version = getOldClusterVersion();
-    if (version.equals(org.elasticsearch.Version.CURRENT)) {
-        return IndexVersion.current();
-    } else {
-        assertThat("Index version needs to be added to restart test parameters", version, lessThan(org.elasticsearch.Version.V_8_11_0));
-        return IndexVersion.fromId(version.id);
-    }
+    assert oldIndexVersion != null;
+    return oldIndexVersion;
}

public static Version getOldClusterTestVersion() {
@@ -117,8 +117,8 @@
---
"Add data stream lifecycle":
- skip:
version: " - 8.9.99"
reason: "Data stream lifecycle template was updated after 8.9"
version: " - 8.10.99"
reason: "Data stream lifecycle was GA in 8.11"

- do:
cluster.put_component_template:
@@ -145,8 +145,8 @@
---
"Get data stream lifecycle with default rollover":
- skip:
version: " - 8.9.99"
reason: "Data stream lifecycle template was added after 8.9"
version: " - 8.10.99"
reason: "Data stream lifecycle was GA in 8.11"

- do:
cluster.put_component_template:
24 changes: 6 additions & 18 deletions server/src/main/java/org/elasticsearch/action/ActionModule.java
@@ -510,6 +510,7 @@ public class ActionModule extends AbstractModule {
private final ThreadPool threadPool;
private final ReservedClusterStateService reservedClusterStateService;
private final boolean serverlessEnabled;
+private final RestExtension restExtension;

public ActionModule(
Settings settings,
@@ -525,7 +526,8 @@ public ActionModule(
SystemIndices systemIndices,
Tracer tracer,
ClusterService clusterService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
RestExtension restExtension
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -572,6 +574,7 @@ public ActionModule(
restController = new RestController(restInterceptor, nodeClient, circuitBreakerService, usageService, tracer);
}
reservedClusterStateService = new ReservedClusterStateService(clusterService, reservedStateHandlers);
+this.restExtension = restExtension;
}

private static <T> T getRestServerComponent(
@@ -851,15 +854,10 @@ private static ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
-var restExtension = RestExtension.load(() -> new RestExtension() {
-    @Override
-    public Predicate<AbstractCatAction> getCatActionsFilter() {
-        return action -> true;
-    }
-});
Predicate<AbstractCatAction> catActionsFilter = restExtension.getCatActionsFilter();
+Predicate<RestHandler> restFilter = restExtension.getActionsFilter();
Consumer<RestHandler> registerHandler = handler -> {
-    if (shouldKeepRestHandler(handler)) {
+    if (restFilter.test(handler)) {
if (handler instanceof AbstractCatAction catAction && catActionsFilter.test(catAction)) {
catActions.add(catAction);
}
@@ -1066,16 +1064,6 @@ public Predicate<AbstractCatAction> getCatActionsFilter() {
registerHandler.accept(new RestDeleteSynonymRuleAction());
}

-/**
- * This method is used to determine whether a RestHandler ought to be kept in memory or not. Returns true if serverless mode is
- * disabled, or if there is any ServlerlessScope annotation on the RestHandler.
- * @param handler
- * @return
- */
-private boolean shouldKeepRestHandler(final RestHandler handler) {
-    return serverlessEnabled == false || handler.getServerlessScope() != null;
-}
-
@Override
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
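For context, the fallback extension that `initRestHandlers` previously constructed inline can now be written as a standalone class. A minimal sketch, assuming only the `RestExtension` methods visible in this diff (`getCatActionsFilter` and `getActionsFilter`); the interface's own import is omitted since its package does not appear here:

```java
import java.util.function.Predicate;

import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.action.cat.AbstractCatAction;

// Permissive equivalent of the fallback that initRestHandlers used to build
// inline: every REST handler and every cat action is kept.
// NOTE: the RestExtension interface shape is assumed from its uses in this diff.
public class KeepEverythingRestExtension implements RestExtension {
    @Override
    public Predicate<AbstractCatAction> getCatActionsFilter() {
        return action -> true; // keep all cat actions
    }

    @Override
    public Predicate<RestHandler> getActionsFilter() {
        return handler -> true; // keep all REST handlers
    }
}
```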
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -333,6 +334,53 @@ public void complete(ActionListener<?> listener) {
}
}

+/**
+ * Creates and returns a new {@link SubscribableListener} {@code L} and subscribes {@code nextStep} to this listener such that if this
+ * listener is completed successfully with result {@code R} then {@code nextStep} is invoked with arguments {@code L} and {@code R}. If
+ * this listener is completed with exception {@code E} then so is {@code L}.
+ * <p>
+ * This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
+ * <pre>
+ * l.andThen((l1, o1) -> forkAction1(o1, args1, l1)).andThen((l2, o2) -> forkAction2(o2, args2, l2)).addListener(finalListener);
+ * </pre>
+ * After creating this chain, completing {@code l} with a successful response will pass the response to {@code forkAction1}, which will
+ * on completion pass its response to {@code forkAction2}, which will in turn pass its response to {@code finalListener}. A failure of
+ * any step will bypass the remaining steps and ultimately fail {@code finalListener}.
+ * <p>
+ * The threading of the {@code nextStep} callback is the same as for listeners added with {@link #addListener}: if this listener is
+ * already complete then {@code nextStep} is invoked on the thread calling {@link #andThen} and in its thread context, but if this
+ * listener is incomplete then {@code nextStep} is invoked on the completing thread and in its thread context.
+ */
+public <U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) {
+    return andThen(EsExecutors.DIRECT_EXECUTOR_SERVICE, null, nextStep);
+}
+
+/**
+ * Creates and returns a new {@link SubscribableListener} {@code L} and subscribes {@code nextStep} to this listener such that if this
+ * listener is completed successfully with result {@code R} then {@code nextStep} is invoked with arguments {@code L} and {@code R}. If
+ * this listener is completed with exception {@code E} then so is {@code L}.
+ * <p>
+ * This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
+ * <pre>
+ * l.andThen(x, t, (l1,o1) -> forkAction1(o1,args1,l1)).andThen(x, t, (l2,o2) -> forkAction2(o2,args2,l2)).addListener(finalListener);
+ * </pre>
+ * After creating this chain, completing {@code l} with a successful response will pass the response to {@code forkAction1}, which will
+ * on completion pass its response to {@code forkAction2}, which will in turn pass its response to {@code finalListener}. A failure of
+ * any step will bypass the remaining steps and ultimately fail {@code finalListener}.
+ * <p>
+ * The threading of the {@code nextStep} callback is the same as for listeners added with {@link #addListener}: if this listener is
+ * already complete then {@code nextStep} is invoked on the thread calling {@link #andThen} and in its thread context, but if this
+ * listener is incomplete then {@code nextStep} is invoked using {@code executor}, in a thread context captured when {@link #andThen}
+ * was called.
+ */
+public <U> SubscribableListener<U> andThen(
+    Executor executor,
+    @Nullable ThreadContext threadContext,
+    CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep
+) {
+    return newForked(l -> addListener(l.delegateFailureAndWrap(nextStep), executor, threadContext));
+}
+
/**
* Adds a timeout to this listener, such that if the timeout elapses before the listener is completed then it will be completed with an
* {@link ElasticsearchTimeoutException}.
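To make the `andThen` javadoc concrete, here is a self-contained usage sketch. The string/length steps are invented for illustration; it assumes the class's public no-arg constructor and that completing the head of the chain via `onResponse` drives the steps, as the javadoc above describes:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;

public class AndThenExample {
    public static void main(String[] args) {
        SubscribableListener<String> first = new SubscribableListener<>();

        first
            // step 2: runs only once `first` completes successfully
            .<Integer>andThen((l, s) -> l.onResponse(s.length()))
            // step 3: runs only once step 2 completes successfully
            .<String>andThen((l, n) -> l.onResponse("length=" + n))
            // the terminal listener sees the final result, or the first failure
            .addListener(ActionListener.wrap(
                result -> System.out.println("success: " + result),
                e -> System.out.println("failure: " + e)
            ));

        // Completing the head of the chain triggers the steps in order.
        first.onResponse("hello"); // prints "success: length=5"
    }
}
```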
@@ -322,6 +322,8 @@ public void writeTo(StreamOutput out) throws IOException {
*/
public record Downsampling(@Nullable List<Round> rounds) implements Writeable, ToXContentFragment {

+public static final long FIVE_MINUTES_MILLIS = TimeValue.timeValueMinutes(5).getMillis();
+
/**
* A round represents the configuration for when and how elasticsearch will downsample a backing index.
* @param after is a TimeValue configuring how old (based on generation age) should a backing index be before downsampling
@@ -356,6 +358,14 @@ public static Round read(StreamInput in) throws IOException {
return new Round(in.readTimeValue(), new DownsampleConfig(in));
}

+public Round {
+    if (config.getFixedInterval().estimateMillis() < FIVE_MINUTES_MILLIS) {
+        throw new IllegalArgumentException(
+            "A downsampling round must have a fixed interval of at least five minutes but found: " + config.getFixedInterval()
+        );
+    }
+}
+
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeTimeValue(after);
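The effect of the new guard, and the reason the test intervals above moved from `1s` to `5m`, in a short sketch (import locations are assumed from upstream; construction with a sub-five-minute fixed interval now throws):

```java
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

public class RoundValidationExample {
    public static void main(String[] args) {
        // Accepted: "5m" meets the five-minute minimum.
        Round ok = new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")));
        System.out.println("created: " + ok);

        try {
            // Rejected: "1s" is below FIVE_MINUTES_MILLIS, so the compact constructor throws.
            new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s")));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```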
