Skip to content

Commit

Permalink
Merge pull request ESQL-915 from elastic/main
Browse files Browse the repository at this point in the history
🤖 ESQL: Merge upstream
  • Loading branch information
elasticsearchmachine authored Mar 23, 2023
2 parents ff37737 + 5dc2541 commit 546ff53
Show file tree
Hide file tree
Showing 38 changed files with 864 additions and 513 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ out/
!.idea/eclipseCodeFormatter.xml
!.idea/externalDependencies.xml
!.idea/inspectionProfiles/Project_Default.xml
!.idea/runConfigurations/Debug_Elasticsearch.xml
!.idea/runConfigurations/
!.idea/scopes/x_pack.xml

# These files are generated in the main tree by IntelliJ
Expand Down
11 changes: 11 additions & 0 deletions .idea/runConfigurations/Debug_Elasticsearch__node_2_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/runConfigurations/Debug_Elasticsearch__node_3_.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion TESTING.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ port of `5007`.
NOTE: In the case of test clusters using multiple nodes, multiple debuggers
will need to be attached on incrementing ports. For example, for a 3 node
cluster ports `5007`, `5008`, and `5009` will attempt to attach to a listening
debugger.
debugger. You can use the "Debug Elasticsearch (node 2)" and "(node 3)" run
configurations should you need to debug a multi-node cluster.

You can also use a combination of both flags to debug both tests and server.
This is only applicable to Java REST tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ configure(allprojects) {
project.getTasks().withType(ThirdPartyAuditTask.class).configureEach {
if (BuildParams.getIsRuntimeJavaHomeSet() == false) {
javaHome.set(providers.provider(() -> "${project.jdks.provisioned_runtime.javaHomePath}"))
targetCompatibility.set(providers.provider(() -> JavaVersion.toVersion(project.jdks.provisioned_runtime.major)))
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions docs/changelog/94000.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 94000
summary: Introduce redirect method on IngestDocument
area: Ingest Node
type: enhancement
issues:
- 83653
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -240,11 +239,8 @@ public void testErrorRecordingOnRetention() throws Exception {
String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1L);

// mark the first generation index as read-only so deletion fails when we enable the retention configuration
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(firstGenerationIndex);
updateSettingsRequest.settings(Settings.builder().put(READ_ONLY.settingName(), true));
updateIndexSettings(Settings.builder().put(READ_ONLY.settingName(), true), firstGenerationIndex);
try {
client().admin().indices().updateSettings(updateSettingsRequest);

// TODO replace this with an API call to update the lifecycle for the data stream once available
PlainActionFuture.get(
fut -> internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Nullable;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -23,38 +24,45 @@
*/
public class DataLifecycleErrorStore {

private final ConcurrentMap<String, String> targetToError = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> indexNameToError = new ConcurrentHashMap<>();

/**
* Records a string representation of the provided exception for the provided target.
* If an error was already recorded for the provided target this will override that error.
* Records a string representation of the provided exception for the provided index.
* If an error was already recorded for the provided index this will override that error.
*/
public void recordError(String target, Exception e) {
targetToError.put(target, org.elasticsearch.common.Strings.toString(((builder, params) -> {
public void recordError(String indexName, Exception e) {
indexNameToError.put(indexName, org.elasticsearch.common.Strings.toString(((builder, params) -> {
ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
return builder;
})));
}

/**
* Clears the recorded error for the provided target (if any exists)
* Clears the recorded error for the provided index (if any exists)
*/
public void clearRecordedError(String target) {
targetToError.remove(target);
public void clearRecordedError(String indexName) {
indexNameToError.remove(indexName);
}

/**
* Clears all the errors recorded in the store.
*/
public void clearStore() {
targetToError.clear();
indexNameToError.clear();
}

/**
* Retrieves the recorderd error for the provided target.
* Retrieves the recorderd error for the provided index.
*/
@Nullable
public String getError(String target) {
return targetToError.get(target);
public String getError(String indexName) {
return indexNameToError.get(indexName);
}

/**
* Return an immutable view (a snapshot) of the tracked indices at the moment this method is called.
*/
public List<String> getAllIndices() {
return List.copyOf(indexNameToError.keySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -136,13 +134,6 @@ public void clusterChanged(ClusterChangedEvent event) {
errorStore.clearStore();
}
}
if (event.localNodeMaster()) {
// only execute if we're the master
List<Index> indicesDeleted = event.indicesDeleted();
for (Index deleted : indicesDeleted) {
errorStore.clearRecordedError(deleted.getName());
}
}
}

@Override
Expand Down Expand Up @@ -210,9 +201,12 @@ void run(ClusterState state) {
*/
private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
Metadata metadata = clusterService.state().metadata();
for (Index index : dataStream.getIndices()) {
if (dataStream.isIndexManagedByDLM(index, metadata::index) == false) {
errorStore.clearRecordedError(index.getName());
for (String indexName : errorStore.getAllIndices()) {
IndexMetadata indexMeta = metadata.index(indexName);
if (indexMeta == null) {
errorStore.clearRecordedError(indexName);
} else if (dataStream.isIndexManagedByDLM(indexMeta.getIndex(), metadata::index) == false) {
errorStore.clearRecordedError(indexName);
}
}
}
Expand All @@ -223,7 +217,7 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
RolloverRequest rolloverRequest = getDefaultRolloverRequest(dataStream.getName());
transportActionsDeduplicator.executeOnce(
rolloverRequest,
new ErrorRecordingActionListener(writeIndex.getName(), errorStore::recordError, errorStore::clearRecordedError),
new ErrorRecordingActionListener(writeIndex.getName(), errorStore),
(req, reqListener) -> rolloverDataStream(writeIndex.getName(), rolloverRequest, reqListener)
);
}
Expand All @@ -247,7 +241,7 @@ private void maybeExecuteRetention(ClusterState state, DataStream dataStream) {
// time to delete the index
transportActionsDeduplicator.executeOnce(
deleteRequest,
new ErrorRecordingActionListener(indexName, errorStore::recordError, errorStore::clearRecordedError),
new ErrorRecordingActionListener(indexName, errorStore),
(req, reqListener) -> deleteIndex(deleteRequest, retention, reqListener)
);
}
Expand Down Expand Up @@ -348,23 +342,21 @@ static TimeValue getRetentionConfiguration(DataStream dataStream) {
*/
static class ErrorRecordingActionListener implements ActionListener<Void> {
private final String targetIndex;
private final BiConsumer<String, Exception> recordError;
private final Consumer<String> clearErrorRecord;
private final DataLifecycleErrorStore errorStore;

ErrorRecordingActionListener(String targetIndex, BiConsumer<String, Exception> recordError, Consumer<String> clearErrorRecord) {
ErrorRecordingActionListener(String targetIndex, DataLifecycleErrorStore errorStore) {
this.targetIndex = targetIndex;
this.recordError = recordError;
this.clearErrorRecord = clearErrorRecord;
this.errorStore = errorStore;
}

@Override
public void onResponse(Void unused) {
clearErrorRecord.accept(targetIndex);
errorStore.clearRecordedError(targetIndex);
}

@Override
public void onFailure(Exception e) {
recordError.accept(targetIndex, e);
errorStore.recordError(targetIndex, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.List;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -27,11 +31,41 @@ public void setupServices() {
public void testRecordAndRetrieveError() {
    // Recording an error must make the index visible via both the per-index
    // lookup and the snapshot of all tracked indices.
    errorStore.recordError("test", new NullPointerException("testing"));
    assertThat(errorStore.getError("test"), is(notNullValue()));
    List<String> trackedIndices = errorStore.getAllIndices();
    assertThat(trackedIndices.size(), is(1));
    assertThat(trackedIndices.get(0), is("test"));
}

// Verifies that clearing the whole store removes previously recorded errors.
public void testRetrieveAfterClear() {
    errorStore.recordError("test", new NullPointerException("testing"));
    errorStore.clearStore();
    // The error recorded before the clear must no longer be retrievable.
    assertThat(errorStore.getError("test"), is(nullValue()));
}

public void testGetAllIndicesIsASnapshotViewOfTheStore() {
    // Seed the store with five indices: test0 .. test4.
    for (int i = 0; i < 5; i++) {
        errorStore.recordError("test" + i, new NullPointerException("testing"));
    }
    List<String> initialSnapshot = errorStore.getAllIndices();
    assertThat(initialSnapshot.size(), is(5));
    assertThat(initialSnapshot, containsInAnyOrder("test0", "test1", "test2", "test3", "test4"));

    // Mutate the store: add test5 .. test9 and drop two of the original entries.
    for (int i = 5; i < 10; i++) {
        errorStore.recordError("test" + i, new NullPointerException("testing"));
    }
    errorStore.clearRecordedError("test0");
    errorStore.clearRecordedError("test1");

    // The snapshot taken earlier must not observe any of those mutations.
    assertThat(initialSnapshot.size(), is(5));
    assertThat(initialSnapshot, containsInAnyOrder("test0", "test1", "test2", "test3", "test4"));

    // A fresh call reflects the latest state: test2 .. test9.
    List<String> latestSnapshot = errorStore.getAllIndices();
    assertThat(latestSnapshot.size(), is(8));
    assertThat(latestSnapshot, containsInAnyOrder("test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9"));
}
}
Loading

0 comments on commit 546ff53

Please sign in to comment.