Merge 'main' into lucene_snapshot_9_9
ChrisHegarty committed Dec 4, 2023
2 parents 237db90 + d3fefde commit 11185d0
Showing 111 changed files with 3,077 additions and 537 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102840.yaml
@@ -0,0 +1,5 @@
pr: 102840
summary: Fail S3 repository analysis on partial reads
area: Snapshot/Restore
type: enhancement
issues: []
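
The changelog entry above describes the behavioural change made further down in this commit, in the retrying S3 read path: when a blob is read for repository analysis, a partial read is reported as a failure rather than being silently retried. Below is a minimal sketch of that guard; the class and the reduced OperationPurpose enum are simplified stand-ins for illustration, not the actual Elasticsearch types.

import java.io.IOException;

// Sketch of the fail-fast guard described by PR 102840 (simplified, not the real class).
class RetryingReadSketch {

    enum OperationPurpose { SNAPSHOT_DATA, SNAPSHOT_METADATA, REPOSITORY_ANALYSIS }

    private final OperationPurpose purpose;
    private final int maxRetries;
    private int attempts;

    RetryingReadSketch(OperationPurpose purpose, int maxRetries) {
        this.purpose = purpose;
        this.maxRetries = maxRetries;
    }

    void reopenStreamOrFail(IOException e) throws IOException {
        if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
            // Repository analysis verifies blob contents, so a short read must surface as an error.
            throw e;
        }
        if (++attempts > maxRetries) {
            throw e; // retries exhausted
        }
        // otherwise: reopen the stream at the current offset and continue reading
    }
}
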
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_boolean.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_datetime.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_degrees.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_double.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_integer.svg
2 changes: 1 addition & 1 deletion docs/reference/esql/functions/signature/to_ip.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_long.svg
1 change: 1 addition & 0 deletions docs/reference/esql/functions/signature/to_radians.svg
2 changes: 2 additions & 0 deletions docs/reference/esql/functions/types/mv_count.asciidoc
@@ -2,8 +2,10 @@
|===
v | result
boolean | integer
cartesian_point | integer
datetime | integer
double | integer
geo_point | integer
integer | integer
ip | integer
keyword | integer
11 changes: 11 additions & 0 deletions docs/reference/esql/functions/types/to_boolean.asciidoc
@@ -0,0 +1,11 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
boolean | boolean
double | boolean
integer | boolean
keyword | boolean
long | boolean
text | boolean
unsigned_long | boolean
|===
11 changes: 11 additions & 0 deletions docs/reference/esql/functions/types/to_datetime.asciidoc
@@ -0,0 +1,11 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
datetime | datetime
double | datetime
integer | datetime
keyword | datetime
long | datetime
text | datetime
unsigned_long | datetime
|===
8 changes: 8 additions & 0 deletions docs/reference/esql/functions/types/to_degrees.asciidoc
@@ -0,0 +1,8 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
double | double
integer | double
long | double
unsigned_long | double
|===
12 changes: 12 additions & 0 deletions docs/reference/esql/functions/types/to_double.asciidoc
@@ -0,0 +1,12 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
boolean | double
datetime | double
double | double
integer | double
keyword | double
long | double
text | double
unsigned_long | double
|===
12 changes: 12 additions & 0 deletions docs/reference/esql/functions/types/to_integer.asciidoc
@@ -0,0 +1,12 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
boolean | integer
datetime | integer
double | integer
integer | integer
keyword | integer
long | integer
text | integer
unsigned_long | integer
|===
3 changes: 2 additions & 1 deletion docs/reference/esql/functions/types/to_ip.asciidoc
@@ -1,6 +1,7 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
arg1 | result
v | result
ip | ip
keyword | ip
text | ip
|===
14 changes: 14 additions & 0 deletions docs/reference/esql/functions/types/to_long.asciidoc
@@ -0,0 +1,14 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
boolean | long
cartesian_point | long
datetime | long
double | long
geo_point | long
integer | long
keyword | long
long | long
text | long
unsigned_long | long
|===
8 changes: 8 additions & 0 deletions docs/reference/esql/functions/types/to_radians.asciidoc
@@ -0,0 +1,8 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
double | double
integer | double
long | double
unsigned_long | double
|===
2 changes: 2 additions & 0 deletions docs/reference/esql/functions/types/to_string.asciidoc
@@ -2,8 +2,10 @@
|===
v | result
boolean | keyword
cartesian_point | keyword
datetime | keyword
double | keyword
geo_point | keyword
integer | keyword
ip | keyword
keyword | keyword
12 changes: 12 additions & 0 deletions docs/reference/esql/functions/types/to_unsigned_long.asciidoc
@@ -0,0 +1,12 @@
[%header.monospaced.styled,format=dsv,separator=|]
|===
v | result
boolean | unsigned_long
datetime | unsigned_long
double | unsigned_long
integer | unsigned_long
keyword | unsigned_long
long | unsigned_long
text | unsigned_long
unsigned_long | unsigned_long
|===
1 change: 1 addition & 0 deletions gradle.properties
@@ -1,3 +1,4 @@
org.gradle.welcome=never
org.gradle.warning.mode=none
org.gradle.parallel=true
# We need to declare --add-exports to make spotless working seamlessly with jdk16
@@ -75,7 +75,7 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.repositories.RepositoriesModule.METRIC_REQUESTS_COUNT;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
@@ -85,8 +85,6 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
@@ -271,8 +269,12 @@ public void testMetrics() throws Exception {
final List<Measurement> metrics = Measurement.combine(plugins.get(0).getLongCounterMeasurement(METRIC_REQUESTS_COUNT));

assertThat(
statsCollectors.size(),
equalTo(metrics.stream().map(m -> m.attributes().get("operation")).collect(Collectors.toSet()).size())
statsCollectors.keySet().stream().map(S3BlobStore.StatsKey::operation).collect(Collectors.toSet()),
equalTo(
metrics.stream()
.map(m -> S3BlobStore.Operation.parse((String) m.attributes().get("operation")))
.collect(Collectors.toSet())
)
);
metrics.forEach(metric -> {
assertThat(
@@ -303,23 +305,24 @@ public void testRequestStatsWithOperationPurposes() throws IOException {
final String repoName = createRepository(randomRepositoryName());
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final BlobStore blobStore = repository.blobStore();
assertThat(blobStore, instanceOf(BlobStoreWrapper.class));
final BlobStore delegateBlobStore = ((BlobStoreWrapper) blobStore).delegate();
assertThat(delegateBlobStore, instanceOf(S3BlobStore.class));
final S3BlobStore.StatsCollectors statsCollectors = ((S3BlobStore) delegateBlobStore).getStatsCollectors();
final BlobStoreWrapper blobStore = asInstanceOf(BlobStoreWrapper.class, repository.blobStore());
final S3BlobStore delegateBlobStore = asInstanceOf(S3BlobStore.class, blobStore.delegate());
final S3BlobStore.StatsCollectors statsCollectors = delegateBlobStore.getStatsCollectors();

// Initial stats are collected with the default operation purpose
// Initial stats are collected for repository verification, which counts as SNAPSHOT_METADATA
final Set<String> allOperations = EnumSet.allOf(S3BlobStore.Operation.class)
.stream()
.map(S3BlobStore.Operation::getKey)
.collect(Collectors.toUnmodifiableSet());
statsCollectors.collectors.keySet().forEach(statsKey -> assertThat(statsKey.purpose(), is(OperationPurpose.SNAPSHOT)));
assertThat(
statsCollectors.collectors.keySet().stream().map(S3BlobStore.StatsKey::purpose).collect(Collectors.toUnmodifiableSet()),
equalTo(Set.of(OperationPurpose.SNAPSHOT_METADATA))
);
final Map<String, Long> initialStats = blobStore.stats();
assertThat(initialStats.keySet(), equalTo(allOperations));

// Collect more stats with an operation purpose other than the default
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, BlobStoreTestUtil::randomPurpose);
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT_METADATA, BlobStoreTestUtil::randomPurpose);
final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
@@ -332,7 +335,7 @@ public void testRequestStatsWithOperationPurposes() throws IOException {
// Internal stats collection is fine-grained and records different purposes
assertThat(
statsCollectors.collectors.keySet().stream().map(S3BlobStore.StatsKey::purpose).collect(Collectors.toUnmodifiableSet()),
equalTo(Set.of(OperationPurpose.SNAPSHOT, purpose))
equalTo(Set.of(OperationPurpose.SNAPSHOT_METADATA, purpose))
);
// The stats report aggregates over different purposes
final Map<String, Long> newStats = blobStore.stats();
Expand All @@ -341,7 +344,7 @@ public void testRequestStatsWithOperationPurposes() throws IOException {

final Set<String> operationsSeenForTheNewPurpose = statsCollectors.collectors.keySet()
.stream()
.filter(sk -> sk.purpose() != OperationPurpose.SNAPSHOT)
.filter(sk -> sk.purpose() != OperationPurpose.SNAPSHOT_METADATA)
.map(sk -> sk.operation().getKey())
.collect(Collectors.toUnmodifiableSet());

@@ -396,7 +399,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
() -> repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(
randomPurpose(),
randomNonDataPurpose(),
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(),
serialized,
true
@@ -129,6 +129,7 @@ public long readBlobPreferredLength() {
@Override
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
@@ -148,6 +149,7 @@ public void writeMetadataBlob(
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer
) throws IOException {
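// metadata blobs are never written with the SNAPSHOT_DATA purpose, and the purpose must be consistent with the blob name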
assert purpose != OperationPurpose.SNAPSHOT_DATA && BlobContainer.assertPurposeConsistency(purpose, blobName) : purpose;
final String absoluteBlobKey = buildKey(blobName);
try (
AmazonS3Reference clientReference = blobStore.clientReference();
@@ -273,6 +275,7 @@ long getLargeBlobThresholdInBytes() {
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
writeBlob(purpose, blobName, bytes, failIfAlreadyExists);
}

@@ -166,6 +166,12 @@ private void ensureOpen() {
}

private void reopenStreamOrFail(IOException e) throws IOException {
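// reads performed for repository analysis fail on a partial read instead of retrying (see docs/changelog/102840.yaml above)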
if (purpose == OperationPurpose.REPOSITORY_ANALYSIS) {
logger.warn(() -> format("""
failed reading [%s/%s] at offset [%s]""", blobStore.bucket(), blobKey, start + currentOffset), e);
throw e;
}

final int maxAttempts = blobStore.getMaxRetries() + 1;

final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L);