Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into sparse_vector_store
Browse files Browse the repository at this point in the history
  • Loading branch information
jimczi committed Dec 3, 2024
2 parents b25003e + d582b82 commit 167d686
Show file tree
Hide file tree
Showing 19 changed files with 251 additions and 193 deletions.
6 changes: 6 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ tests:
- class: org.elasticsearch.xpack.core.ml.search.SparseVectorQueryBuilderTests
method: testToQuery
issue: https://github.com/elastic/elasticsearch/issues/117904
- class: org.elasticsearch.packaging.test.ArchiveGenerateInitialCredentialsTests
method: test20NoAutoGenerationWhenAutoConfigurationDisabled
issue: https://github.com/elastic/elasticsearch/issues/117891
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
method: test20RunWithBootstrapChecks
issue: https://github.com/elastic/elasticsearch/issues/117890

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.test.cluster.util.ProcessUtils;
import org.elasticsearch.test.cluster.util.Retry;
import org.elasticsearch.test.cluster.util.Version;
import org.elasticsearch.test.cluster.util.resource.MutableResource;
import org.elasticsearch.test.cluster.util.resource.Resource;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
Expand Down Expand Up @@ -115,6 +117,9 @@ public static class Node {
private Version currentVersion;
private Process process = null;
private DistributionDescriptor distributionDescriptor;
private Set<String> extraConfigListeners = new HashSet<>();
private Set<String> keystoreFileListeners = new HashSet<>();
private Set<Resource> roleFileListeners = new HashSet<>();

public Node(Path baseWorkingDir, DistributionResolver distributionResolver, LocalNodeSpec spec) {
this(baseWorkingDir, distributionResolver, spec, null, false);
Expand Down Expand Up @@ -436,6 +441,10 @@ private void writeConfiguration() {

private void copyExtraConfigFiles() {
spec.getExtraConfigFiles().forEach((fileName, resource) -> {
if (fileName.equals("roles.yml")) {
throw new IllegalArgumentException("Security roles should be configured via 'rolesFile()' method.");
}

final Path target = configDir.resolve(fileName);
final Path directory = target.getParent();
if (Files.exists(directory) == false) {
Expand All @@ -446,6 +455,14 @@ private void copyExtraConfigFiles() {
}
}
resource.writeTo(target);

// Register and update listener for this config file
if (resource instanceof MutableResource && extraConfigListeners.add(fileName)) {
((MutableResource) resource).addUpdateListener(updated -> {
LOGGER.info("Updating config file '{}'", fileName);
updated.writeTo(target);
});
}
});
}

Expand Down Expand Up @@ -485,29 +502,39 @@ private void addKeystoreSettings() {

private void addKeystoreFiles() {
spec.getKeystoreFiles().forEach((key, file) -> {
try {
Path path = Files.createTempFile(tempDir, key, null);
file.writeTo(path);

ProcessUtils.exec(
spec.getKeystorePassword(),
workingDir,
OS.conditional(
c -> c.onWindows(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore.bat"))
.onUnix(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore"))
),
getEnvironmentVariables(),
false,
"add-file",
key,
path.toString()
).waitFor();
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
addKeystoreFile(key, file);
if (file instanceof MutableResource && keystoreFileListeners.add(key)) {
((MutableResource) file).addUpdateListener(updated -> {
LOGGER.info("Updating keystore file '{}'", key);
addKeystoreFile(key, updated);
});
}
});
}

private void addKeystoreFile(String key, Resource file) {
try {
Path path = Files.createTempFile(tempDir, key, null);
file.writeTo(path);

ProcessUtils.exec(
spec.getKeystorePassword(),
workingDir,
OS.conditional(
c -> c.onWindows(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore.bat"))
.onUnix(() -> distributionDir.resolve("bin").resolve("elasticsearch-keystore"))
),
getEnvironmentVariables(),
false,
"add-file",
key,
path.toString()
).waitFor();
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}

private void writeSecureSecretsFile() {
if (spec.getKeystoreFiles().isEmpty() == false) {
throw new IllegalStateException(
Expand Down Expand Up @@ -535,16 +562,20 @@ private void configureSecurity() {
if (spec.isSecurityEnabled()) {
if (spec.getUsers().isEmpty() == false) {
LOGGER.info("Setting up roles.yml for node '{}'", name);

Path destination = workingDir.resolve("config").resolve("roles.yml");
spec.getRolesFiles().forEach(rolesFile -> {
try (
Writer writer = Files.newBufferedWriter(destination, StandardOpenOption.APPEND);
Reader reader = new BufferedReader(new InputStreamReader(rolesFile.asStream()))
) {
reader.transferTo(writer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to append roles file " + rolesFile + " to " + destination, e);
writeRolesFile();
spec.getRolesFiles().forEach(resource -> {
if (resource instanceof MutableResource && roleFileListeners.add(resource)) {
((MutableResource) resource).addUpdateListener(updated -> {
LOGGER.info("Updating roles.yml for node '{}'", name);
Path rolesFile = workingDir.resolve("config").resolve("roles.yml");
try {
Files.delete(rolesFile);
Files.copy(distributionDir.resolve("config").resolve("roles.yml"), rolesFile);
writeRolesFile();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
});
}
Expand Down Expand Up @@ -596,6 +627,20 @@ private void configureSecurity() {
}
}

private void writeRolesFile() {
Path destination = workingDir.resolve("config").resolve("roles.yml");
spec.getRolesFiles().forEach(rolesFile -> {
try (
Writer writer = Files.newBufferedWriter(destination, StandardOpenOption.APPEND);
Reader reader = new BufferedReader(new InputStreamReader(rolesFile.asStream()))
) {
reader.transferTo(writer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to append roles file " + rolesFile + " to " + destination, e);
}
});
}

private void installPlugins() {
if (spec.getPlugins().isEmpty() == false) {
Pattern pattern = Pattern.compile("(.+)(?:-\\d+\\.\\d+\\.\\d+(-SNAPSHOT)?\\.zip)");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.test.cluster.util.resource;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A mutable version of {@link Resource}. Anywhere a {@link Resource} is accepted in the test clusters API a {@link MutableResource} can
* be supplied instead. Unless otherwise specified, when the {@link #update(Resource)} method is called, the backing configuration will
* be updated in-place.
*/
public class MutableResource implements Resource {
private final List<Consumer<? super Resource>> listeners = new ArrayList<>();
private Resource delegate;

private MutableResource(Resource delegate) {
this.delegate = delegate;
}

@Override
public InputStream asStream() {
return delegate.asStream();
}

public static MutableResource from(Resource delegate) {
return new MutableResource(delegate);
}

public void update(Resource delegate) {
this.delegate = delegate;
this.listeners.forEach(listener -> listener.accept(this));
}

/**
* Registers a listener that will be notified when any updates are made to this resource. This listener will receive a reference to
* the resource with the updated value.
*
* @param listener action to be called on update
*/
public synchronized void addUpdateListener(Consumer<? super Resource> listener) {
listeners.add(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static ElasticsearchCluster getCluster() {
.setting("xpack.license.self_generated.type", "basic")
.setting("xpack.monitoring.collection.enabled", "true")
.setting("xpack.security.enabled", "true")
.configFile("roles.yml", Resource.fromClasspath("roles.yml"))
.rolesFile(Resource.fromClasspath("roles.yml"))
.user("test-admin", "x-pack-test-password", "test-admin", false)
.user("user1", "x-pack-test-password", "user1", false)
.user("user2", "x-pack-test-password", "user2", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public static ZonedDateTime asDateTime(long millis) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), UTC);
}

public static ZonedDateTime asDateTime(Instant instant) {
return ZonedDateTime.ofInstant(instant, UTC);
}

public static long asMillis(ZonedDateTime zonedDateTime) {
return zonedDateTime.toInstant().toEpochMilli();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,70 +620,6 @@ public static void forUnaryBoolean(
unary(suppliers, expectedEvaluatorToString, booleanCases(), expectedType, v -> expectedValue.apply((Boolean) v), warnings);
}

/**
* Generate positive test cases for a unary function operating on an {@link DataType#DATETIME}.
* This variant defaults to maximum range of possible values
*/
public static void forUnaryDatetime(
List<TestCaseSupplier> suppliers,
String expectedEvaluatorToString,
DataType expectedType,
Function<Instant, Object> expectedValue,
List<String> warnings
) {
unaryNumeric(
suppliers,
expectedEvaluatorToString,
dateCases(),
expectedType,
n -> expectedValue.apply(Instant.ofEpochMilli(n.longValue())),
warnings
);
}

/**
* Generate positive test cases for a unary function operating on an {@link DataType#DATETIME}.
* This variant accepts a range of values
*/
public static void forUnaryDatetime(
List<TestCaseSupplier> suppliers,
String expectedEvaluatorToString,
DataType expectedType,
long min,
long max,
Function<Instant, Object> expectedValue,
List<String> warnings
) {
unaryNumeric(
suppliers,
expectedEvaluatorToString,
dateCases(min, max),
expectedType,
n -> expectedValue.apply(Instant.ofEpochMilli(n.longValue())),
warnings
);
}

/**
* Generate positive test cases for a unary function operating on an {@link DataType#DATE_NANOS}.
*/
public static void forUnaryDateNanos(
List<TestCaseSupplier> suppliers,
String expectedEvaluatorToString,
DataType expectedType,
Function<Instant, Object> expectedValue,
List<String> warnings
) {
unaryNumeric(
suppliers,
expectedEvaluatorToString,
dateNanosCases(),
expectedType,
n -> expectedValue.apply(DateUtils.toInstant((long) n)),
warnings
);
}

/**
* Generate positive test cases for a unary function operating on an {@link DataType#GEO_POINT}.
*/
Expand Down Expand Up @@ -1912,11 +1848,19 @@ public List<Object> multiRowData() {
}

/**
* @return the data value being supplied, casting unsigned longs into BigIntegers correctly
* @return the data value being supplied, casting to java objects when appropriate
*/
public Object getValue() {
if (type == DataType.UNSIGNED_LONG && data instanceof Long l) {
return NumericUtils.unsignedLongAsBigInteger(l);
if (data instanceof Long l) {
if (type == DataType.UNSIGNED_LONG) {
return NumericUtils.unsignedLongAsBigInteger(l);
}
if (type == DataType.DATETIME) {
return Instant.ofEpochMilli(l);
}
if (type == DataType.DATE_NANOS) {
return DateUtils.toInstant(l);
}
}
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
Expand All @@ -36,14 +37,20 @@ public static Iterable<Object[]> parameters() {
final String read = "Attribute[channel=0]";
final List<TestCaseSupplier> suppliers = new ArrayList<>();

TestCaseSupplier.forUnaryDateNanos(suppliers, read, DataType.DATE_NANOS, DateUtils::toLong, List.of());
TestCaseSupplier.forUnaryDatetime(
TestCaseSupplier.unary(
suppliers,
read,
TestCaseSupplier.dateNanosCases(),
DataType.DATE_NANOS,
v -> DateUtils.toLong((Instant) v),
List.of()
);
TestCaseSupplier.unary(
suppliers,
"ToDateNanosFromDatetimeEvaluator[field=" + read + "]",
TestCaseSupplier.dateCases(0, DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli()),
DataType.DATE_NANOS,
0,
DateUtils.MAX_NANOSECOND_INSTANT.toEpochMilli(),
i -> DateUtils.toNanoSeconds(i.toEpochMilli()),
i -> DateUtils.toNanoSeconds(((Instant) i).toEpochMilli()),
List.of()
);
TestCaseSupplier.forUnaryLong(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@ public static Iterable<Object[]> parameters() {
final String read = "Attribute[channel=0]";
final List<TestCaseSupplier> suppliers = new ArrayList<>();

TestCaseSupplier.forUnaryDatetime(suppliers, read, DataType.DATETIME, Instant::toEpochMilli, emptyList());
TestCaseSupplier.forUnaryDateNanos(
TestCaseSupplier.unary(
suppliers,
read,
TestCaseSupplier.dateCases(),
DataType.DATETIME,
v -> ((Instant) v).toEpochMilli(),
emptyList()
);
TestCaseSupplier.unary(
suppliers,
"ToDatetimeFromDateNanosEvaluator[field=" + read + "]",
TestCaseSupplier.dateNanosCases(),
DataType.DATETIME,
i -> DateUtils.toMilliSeconds(DateUtils.toLong(i)),
i -> DateUtils.toMilliSeconds(DateUtils.toLong((Instant) i)),
emptyList()
);

Expand Down
Loading

0 comments on commit 167d686

Please sign in to comment.