diff --git a/.ci/scripts/resolve-dra-manifest.sh b/.ci/scripts/resolve-dra-manifest.sh index bd7a9bbbdafe..4ac94122351a 100755 --- a/.ci/scripts/resolve-dra-manifest.sh +++ b/.ci/scripts/resolve-dra-manifest.sh @@ -23,7 +23,14 @@ LATEST_VERSION=$(strip_version $LATEST_BUILD) if [ "$LATEST_VERSION" != "$ES_VERSION" ]; then echo "Latest build for '$ARTIFACT' is version $LATEST_VERSION but expected version $ES_VERSION." 1>&2 NEW_BRANCH=$(echo $ES_VERSION | sed -E "s/([0-9]+\.[0-9]+)\.[0-9]/\1/g") + + # Temporary + if [[ "$ES_VERSION" == "8.16.0" ]]; then + NEW_BRANCH="8.x" + fi + echo "Using branch $NEW_BRANCH instead of $BRANCH." 1>&2 + echo "https://artifacts-$WORKFLOW.elastic.co/$ARTIFACT/latest/$NEW_BRANCH.json" LATEST_BUILD=$(fetch_build $WORKFLOW $ARTIFACT $NEW_BRANCH) fi diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index c39bc0dcd287..575a6457804a 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -1307,6 +1307,142 @@ Each repository type may also include other statistics about the repositories of ==== +`ccs`:: +(object) Contains information relating to <> settings and activity in the cluster. ++ +.Properties of `ccs` +[%collapsible%open] +===== + + +`_search`::: +(object) Contains the telemetry information about the <> usage in the cluster. ++ +.Properties of `_search` +[%collapsible%open] +====== +`total`::: +(integer) The total number of {ccs} requests that have been executed by the cluster. + +`success`::: +(integer) The total number of {ccs} requests that have been successfully executed by the cluster. + +`skipped`::: +(integer) The total number of {ccs} requests (successful or failed) that had at least one remote cluster skipped. + +`took`::: +(object) Contains statistics about the time taken to execute {ccs} requests. ++ +.Properties of `took` +[%collapsible%open] +======= +`max`::: +(integer) The maximum time taken to execute a {ccs} request, in milliseconds. + +`avg`::: +(integer) The median time taken to execute a {ccs} request, in milliseconds. + +`p90`::: +(integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. +======= + +`took_mrt_true`:: +(object) Contains statistics about the time taken to execute {ccs} requests for which the +<> setting was set to `true`. ++ +.Properties of `took_mrt_true` +[%collapsible%open] +======= +`max`::: +(integer) The maximum time taken to execute a {ccs} request, in milliseconds. + +`avg`::: +(integer) The median time taken to execute a {ccs} request, in milliseconds. + +`p90`::: +(integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. +======= + +`took_mrt_false`:: +(object) Contains statistics about the time taken to execute {ccs} requests for which the +<> setting was set to `false`. ++ +.Properties of `took_mrt_false` +[%collapsible%open] +======= +`max`::: +(integer) The maximum time taken to execute a {ccs} request, in milliseconds. + +`avg`::: +(integer) The median time taken to execute a {ccs} request, in milliseconds. + +`p90`::: +(integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. +======= + +`remotes_per_search_max`:: +(integer) The maximum number of remote clusters that were queried in a single {ccs} request. + +`remotes_per_search_avg`:: +(float) The average number of remote clusters that were queried in a single {ccs} request. + +`failure_reasons`:: +(object) Contains statistics about the reasons for {ccs} request failures. +The keys are the failure reason names and the values are the number of requests that failed for that reason. + +`features`:: +(object) Contains statistics about the features used in {ccs} requests. The keys are the names of the search feature, +and the values are the number of requests that used that feature. Single request can use more than one feature +(e.g. both `async` and `wildcard`). Known features are: + +* `async` - <> + +* `mrt` - <> setting was set to `true`. + +* `wildcard` - <> for indices with wildcards was used in the search request. + +`clients`:: +(object) Contains statistics about the clients that executed {ccs} requests. +The keys are the names of the clients, and the values are the number of requests that were executed by that client. +Only known clients (such as `kibana` or `elasticsearch`) are counted. + +`clusters`:: +(object) Contains statistics about the clusters that were queried in {ccs} requests. +The keys are cluster names, and the values are per-cluster telemetry data. +This also includes the local cluster itself, which uses the name `(local)`. ++ +.Properties of per-cluster data: +[%collapsible%open] +======= +`total`::: +(integer) The total number of successful (not skipped) {ccs} requests that were executed against this cluster. +This may include requests where partial results were returned, but not requests in which the cluster has been skipped entirely. + +`skipped`::: +(integer) The total number of {ccs} requests for which this cluster was skipped. + +`took`::: +(object) Contains statistics about the time taken to execute requests against this cluster. ++ +.Properties of `took` +[%collapsible%open] +======== +`max`::: +(integer) The maximum time taken to execute a {ccs} request, in milliseconds. + +`avg`::: +(integer) The median time taken to execute a {ccs} request, in milliseconds. + +`p90`::: +(integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. +======== + +======= + +====== + +===== + [[cluster-stats-api-example]] ==== {api-examples-title} @@ -1607,7 +1743,35 @@ The API returns the following response: }, "repositories": { ... - } + }, + "ccs": { + "_search": { + "total": 7, + "success": 7, + "skipped": 0, + "took": { + "max": 36, + "avg": 20, + "p90": 33 + }, + "took_mrt_true": { + "max": 33, + "avg": 15, + "p90": 33 + }, + "took_mrt_false": { + "max": 36, + "avg": 26, + "p90": 36 + }, + "remotes_per_search_max": 3, + "remotes_per_search_avg": 2.0, + "failure_reasons": { ... }, + "features": { ... }, + "clients": { ... }, + "clusters": { ... } + } + } } -------------------------------------------------- // TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/] @@ -1618,10 +1782,15 @@ The API returns the following response: // TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/] // TESTRESPONSE[s/"snapshots": \{[^\}]*\}/"snapshots": $body.$_path/] // TESTRESPONSE[s/"repositories": \{[^\}]*\}/"repositories": $body.$_path/] +// TESTRESPONSE[s/"clusters": \{[^\}]*\}/"clusters": $body.$_path/] +// TESTRESPONSE[s/"features": \{[^\}]*\}/"features": $body.$_path/] +// TESTRESPONSE[s/"clients": \{[^\}]*\}/"clients": $body.$_path/] +// TESTRESPONSE[s/"failure_reasons": \{[^\}]*\}/"failure_reasons": $body.$_path/] // TESTRESPONSE[s/"field_types": \[[^\]]*\]/"field_types": $body.$_path/] // TESTRESPONSE[s/"runtime_field_types": \[[^\]]*\]/"runtime_field_types": $body.$_path/] // TESTRESPONSE[s/"search": \{[^\}]*\}/"search": $body.$_path/] -// TESTRESPONSE[s/: true|false/: $body.$_path/] +// TESTRESPONSE[s/"remotes_per_search_avg": [.0-9]+/"remotes_per_search_avg": $body.$_path/] +// TESTRESPONSE[s/: (true|false)/: $body.$_path/] // TESTRESPONSE[s/: (\-)?[0-9]+/: $body.$_path/] // TESTRESPONSE[s/: "[^"]*"/: $body.$_path/] // These replacements do a few things: diff --git a/muted-tests.yml b/muted-tests.yml index 12183e64bc53..cf4e519d78a1 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -154,9 +154,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/112471 - class: org.elasticsearch.ingest.geoip.IngestGeoIpClientYamlTestSuiteIT issue: https://github.com/elastic/elasticsearch/issues/111497 -- class: org.elasticsearch.smoketest.SmokeTestIngestWithAllDepsClientYamlTestSuiteIT - method: test {yaml=ingest/80_ingest_simulate/Test ingest simulate with reroute and mapping validation from templates} - issue: https://github.com/elastic/elasticsearch/issues/112575 - class: org.elasticsearch.script.mustache.LangMustacheClientYamlTestSuiteIT method: test {yaml=lang_mustache/50_multi_search_template/Multi-search template with errors} issue: https://github.com/elastic/elasticsearch/issues/112580 diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml b/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml index 1a7701991428..35ec9979c325 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml +++ b/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml @@ -217,7 +217,9 @@ setup: "Test ingest simulate with reroute and mapping validation from templates": - skip: - features: headers + features: + - headers + - allowed_warnings - requires: cluster_features: ["simulate.mapping.validation.templates"] @@ -241,6 +243,8 @@ setup: - match: { acknowledged: true } - do: + allowed_warnings: + - "index template [first-index-template] has index patterns [first-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [first-index-template] will take precedence during new index creation" indices.put_index_template: name: first-index-template body: @@ -255,6 +259,8 @@ setup: type: text - do: + allowed_warnings: + - "index template [second-index-template] has index patterns [second-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [second-index-template] will take precedence during new index creation" indices.put_index_template: name: second-index-template body: diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 858d4b8e7649..bc52649b58a3 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -207,6 +207,7 @@ static TransportVersion def(int id) { public static final TransportVersion UNASSIGNED_PRIMARY_COUNT_ON_CLUSTER_HEALTH = def(8_737_00_0); public static final TransportVersion ESQL_AGGREGATE_EXEC_TRACKS_INTERMEDIATE_ATTRS = def(8_738_00_0); + public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java index fe1da86dd54c..68fd4c2a1529 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java @@ -277,7 +277,7 @@ public int hashCode() { */ public void add(CCSTelemetrySnapshot stats) { // This should be called in ClusterStatsResponse ctor only, so we don't need to worry about concurrency - if (stats.totalCount == 0) { + if (stats == null || stats.totalCount == 0) { // Just ignore the empty stats. // This could happen if the node is brand new or if the stats are not available, e.g. because it runs an old version. return; @@ -315,7 +315,7 @@ public void add(CCSTelemetrySnapshot stats) { * "p90": 2570 * } */ - public static void publishLatency(XContentBuilder builder, String name, LongMetricValue took) throws IOException { + private static void publishLatency(XContentBuilder builder, String name, LongMetricValue took) throws IOException { builder.startObject(name); { builder.field("max", took.max()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java index 60766bd4068e..6016378aa886 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java @@ -175,7 +175,7 @@ public static class PerClusterCCSTelemetry { // The number of successful (not skipped) requests to this cluster. private final LongAdder count; private final LongAdder skippedCount; - // This is only over the successful requetss, skipped ones do not count here. + // This is only over the successful requests, skipped ones do not count here. private final LongMetric took; PerClusterCCSTelemetry(String clusterAlias) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java index b48295dc8b3e..732eb2ec2dcc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java @@ -30,6 +30,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse { private final ClusterHealthStatus clusterStatus; private final SearchUsageStats searchUsageStats; private final RepositoryUsageStats repositoryUsageStats; + private final CCSTelemetrySnapshot ccsMetrics; public ClusterStatsNodeResponse(StreamInput in) throws IOException { super(in); @@ -47,6 +48,11 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException { } else { repositoryUsageStats = RepositoryUsageStats.EMPTY; } + if (in.getTransportVersion().onOrAfter(TransportVersions.CCS_TELEMETRY_STATS)) { + ccsMetrics = new CCSTelemetrySnapshot(in); + } else { + ccsMetrics = new CCSTelemetrySnapshot(); + } } public ClusterStatsNodeResponse( @@ -56,7 +62,8 @@ public ClusterStatsNodeResponse( NodeStats nodeStats, ShardStats[] shardsStats, SearchUsageStats searchUsageStats, - RepositoryUsageStats repositoryUsageStats + RepositoryUsageStats repositoryUsageStats, + CCSTelemetrySnapshot ccsTelemetrySnapshot ) { super(node); this.nodeInfo = nodeInfo; @@ -65,6 +72,7 @@ public ClusterStatsNodeResponse( this.clusterStatus = clusterStatus; this.searchUsageStats = Objects.requireNonNull(searchUsageStats); this.repositoryUsageStats = Objects.requireNonNull(repositoryUsageStats); + this.ccsMetrics = ccsTelemetrySnapshot; } public NodeInfo nodeInfo() { @@ -95,6 +103,10 @@ public RepositoryUsageStats repositoryUsageStats() { return repositoryUsageStats; } + public CCSTelemetrySnapshot getCcsMetrics() { + return ccsMetrics; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -108,5 +120,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.REPOSITORIES_TELEMETRY)) { repositoryUsageStats.writeTo(out); } // else just drop these stats, ok for bwc + if (out.getTransportVersion().onOrAfter(TransportVersions.CCS_TELEMETRY_STATS)) { + ccsMetrics.writeTo(out); + } } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index b6dd40e8c8b7..267db92496f7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Locale; +import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG; + public class ClusterStatsResponse extends BaseNodesResponse implements ToXContentFragment { final ClusterStatsNodes nodesStats; @@ -31,6 +33,8 @@ public class ClusterStatsResponse extends BaseNodesResponse ccsMetrics.add(node.getCcsMetrics())); this.status = status; this.clusterSnapshotStats = clusterSnapshotStats; @@ -90,6 +96,10 @@ public ClusterStatsIndices getIndicesStats() { return indicesStats; } + public CCSTelemetrySnapshot getCcsMetrics() { + return ccsMetrics; + } + @Override public void writeTo(StreamOutput out) throws IOException { TransportAction.localOnly(); @@ -125,6 +135,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("repositories"); repositoryUsageStats.toXContent(builder, params); + if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) { + builder.startObject("ccs"); + ccsMetrics.toXContent(builder, params); + builder.endObject(); + } + return builder; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 1912de3cfa4d..66cf627ce066 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -81,6 +81,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final IndicesService indicesService; private final RepositoriesService repositoriesService; private final SearchUsageHolder searchUsageHolder; + private final CCSUsageTelemetry ccsUsageHolder; private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; @@ -108,6 +109,7 @@ public TransportClusterStatsAction( this.indicesService = indicesService; this.repositoriesService = repositoriesService; this.searchUsageHolder = usageService.getSearchUsageHolder(); + this.ccsUsageHolder = usageService.getCcsUsageHolder(); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); } @@ -249,6 +251,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq final SearchUsageStats searchUsageStats = searchUsageHolder.getSearchUsageStats(); final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats(); + final CCSTelemetrySnapshot ccsTelemetry = ccsUsageHolder.getCCSTelemetrySnapshot(); return new ClusterStatsNodeResponse( nodeInfo.getNode(), @@ -257,7 +260,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq nodeStats, shardsStats.toArray(new ShardStats[shardsStats.size()]), searchUsageStats, - repositoryUsageStats + repositoryUsageStats, + ccsTelemetry ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 23ff692da488..30faae9c1a5f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -127,7 +127,7 @@ public class TransportSearchAction extends HandledTransportAction SHARD_COUNT_LIMIT_SETTING = Setting.longSetting( diff --git a/server/src/main/java/org/elasticsearch/common/time/DateFormatters.java b/server/src/main/java/org/elasticsearch/common/time/DateFormatters.java index ae8f8cb28da1..c1081f3d6285 100644 --- a/server/src/main/java/org/elasticsearch/common/time/DateFormatters.java +++ b/server/src/main/java/org/elasticsearch/common/time/DateFormatters.java @@ -2323,6 +2323,8 @@ static DateFormatter forPattern(String input) { } else if (FormatNames.STRICT_YEAR_MONTH_DAY.matches(input)) { return STRICT_YEAR_MONTH_DAY; } else { + DateUtils.checkTextualDateFormats(input); + try { return newDateFormatter( input, diff --git a/server/src/main/java/org/elasticsearch/common/time/DateUtils.java b/server/src/main/java/org/elasticsearch/common/time/DateUtils.java index 8e98adc18336..e312ce78ea15 100644 --- a/server/src/main/java/org/elasticsearch/common/time/DateUtils.java +++ b/server/src/main/java/org/elasticsearch/common/time/DateUtils.java @@ -10,6 +10,9 @@ import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.core.Predicates; +import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.logging.LogManager; import java.time.Clock; import java.time.Duration; @@ -19,6 +22,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Pattern; import static java.util.Map.entry; import static org.elasticsearch.common.time.DateUtilsRounding.getMonthOfYear; @@ -382,4 +387,16 @@ public static ZonedDateTime nowWithMillisResolution(Clock clock) { Clock millisResolutionClock = Clock.tick(clock, Duration.ofMillis(1)); return ZonedDateTime.now(millisResolutionClock); } + + // check for all textual fields, and localized zone offset + private static final Predicate CONTAINS_CHANGING_TEXT_SPECIFIERS = System.getProperty("java.locale.providers", "") + .contains("COMPAT") ? Pattern.compile("[EcGaO]|MMM|LLL|eee|ccc|QQQ|ZZZZ").asPredicate() : Predicates.never(); + + @UpdateForV9 // this can be removed, we will only use CLDR on v9 + static void checkTextualDateFormats(String format) { + if (CONTAINS_CHANGING_TEXT_SPECIFIERS.test(format)) { + LogManager.getLogger(DateFormatter.class) + .warn("Date format [{}] contains textual field specifiers that could change in JDK 23", format); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 22bab1742589..43b0c27d3058 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; private final Map acquiredIndexCommits; // Number of references held against each commit point. + // Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them + // when checking for externally acquired index commits that haven't been released + private final Set internallyAcquiredIndexCommits; interface CommitsListener { @@ -72,6 +75,7 @@ interface CommitsListener { this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; this.acquiredIndexCommits = new HashMap<>(); + this.internallyAcquiredIndexCommits = new HashSet<>(); } @Override @@ -114,7 +118,7 @@ public void onCommit(List commits) throws IOException { this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); } if (commitsListener != null && previousLastCommit != this.lastCommit) { - newCommit = acquireIndexCommit(false); + newCommit = acquireIndexCommit(false, true); } else { newCommit = null; } @@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() { * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. */ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + return acquireIndexCommit(acquiringSafeCommit, false); + } + + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - return wrapCommit(snapshotting); + assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting) + : "commit [" + snapshotting + "] already added"; + return wrapCommit(snapshotting, acquiredInternally); } protected IndexCommit wrapCommit(IndexCommit indexCommit) { - return new SnapshotIndexCommit(indexCommit); + return wrapCommit(indexCommit, false); + } + + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) { + return new SnapshotIndexCommit(indexCommit, acquiredInternally); } /** @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) { * @return true if the acquired commit can be clean up. */ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { - final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit(); + final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; + final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) : "Release non-acquired commit;" + "acquired commits [" @@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); + assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit) + : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. @@ -296,10 +313,16 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit } /** - * Checks whether the deletion policy is holding on to acquired index commits + * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasAcquiredIndexCommits() { - return acquiredIndexCommits.isEmpty() == false; + synchronized boolean hasAcquiredIndexCommitsForTesting() { + // We explicitly check only external commits and disregard internal commits acquired by the commits listener + for (var e : acquiredIndexCommits.entrySet()) { + if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { + return true; + } + } + return false; } /** @@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException { * A wrapper of an index commit that prevents it from being deleted. */ private static class SnapshotIndexCommit extends FilterIndexCommit { - SnapshotIndexCommit(IndexCommit delegate) { + + private final boolean acquiredInternally; + + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) { super(delegate); + this.acquiredInternally = acquiredInternally; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9743ee977a8c..7c456f55ac8a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -669,8 +669,8 @@ Translog getTranslog() { } // Package private for testing purposes only - boolean hasAcquiredIndexCommits() { - return combinedDeletionPolicy.hasAcquiredIndexCommits(); + boolean hasAcquiredIndexCommitsForTesting() { + return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting(); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java index 4bf5bfc26988..4ff99d17195a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java @@ -128,7 +128,8 @@ public void testCreation() { null, new ShardStats[] { shardStats }, new SearchUsageStats(), - RepositoryUsageStats.EMPTY + RepositoryUsageStats.EMPTY, + null ); stats = VersionStats.of(metadata, Collections.singletonList(nodeResponse)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8412e9e25088..5387108592b1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo))); } - public static boolean hasAcquiredIndexCommits(Engine engine) { + public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.hasAcquiredIndexCommits(); + return internalEngine.hasAcquiredIndexCommitsForTesting(); } public static final class PrimaryTermSupplier implements LongSupplier { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 77762544c471..823f5084f5d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1369,7 +1369,7 @@ private void assertNoAcquiredIndexCommit() throws Exception { if (engine instanceof InternalEngine) { assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", - EngineTestCase.hasAcquiredIndexCommits(engine) + EngineTestCase.hasAcquiredIndexCommitsForTesting(engine) ); } } catch (AlreadyClosedException ignored) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 99344f22bae3..f577ccd4e5a4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false)); } - assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); - assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); closeShards(indexShard); // Exception will be thrown if file is not closed. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/ClusterStateLicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/ClusterStateLicenseService.java index f5123b9352fe..a38170d87f9a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/ClusterStateLicenseService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/ClusterStateLicenseService.java @@ -140,7 +140,7 @@ CharSequence buildExpirationMessage(long expirationMillis, boolean expired) { License [{}] on [{}]. # If you have a new license, please update it. Otherwise, please reach out to # your support contact. - #\s""", expiredMsg, LicenseUtils.DATE_FORMATTER.formatMillis(expirationMillis)); + #\s""", expiredMsg, LicenseUtils.formatMillis(expirationMillis)); if (expired) { general = general.toUpperCase(Locale.ROOT); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java index b27c1bb9d449..7e67ee892043 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java @@ -8,7 +8,6 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.common.hash.MessageDigests; -import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.license.License.LicenseType; import org.elasticsearch.license.internal.XPackLicenseStatus; import org.elasticsearch.protocol.xpack.license.LicenseStatus; @@ -16,6 +15,9 @@ import java.nio.charset.StandardCharsets; import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Locale; import java.util.Map; @@ -25,7 +27,13 @@ public class LicenseUtils { public static final String EXPIRED_FEATURE_METADATA = "es.license.expired.feature"; - public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("EEEE, MMMM dd, yyyy").withLocale(Locale.ENGLISH); + + public static String formatMillis(long millis) { + // DateFormatters logs a warning about the pattern on COMPAT + // this will be confusing to users, so call DateTimeFormatter directly instead + return DateTimeFormatter.ofPattern("EEEE, MMMM dd, yyyy", Locale.ENGLISH) + .format(Instant.ofEpochMilli(millis).atOffset(ZoneOffset.UTC)); + } /** * Exception to be thrown when a feature action requires a valid license, but license @@ -155,7 +163,7 @@ public static String getExpiryWarning(long licenseExpiryDate, long currentTime) ? "expires today" : (diff > 0 ? String.format(Locale.ROOT, "will expire in [%d] days", days) - : String.format(Locale.ROOT, "expired on [%s]", LicenseUtils.DATE_FORMATTER.formatMillis(licenseExpiryDate))); + : String.format(Locale.ROOT, "expired on [%s]", formatMillis(licenseExpiryDate))); return "Your license " + expiryMessage + ". " diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index 4a695f7c51e4..279fec8cc99a 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -754,7 +754,38 @@ public void testToXContent() throws IOException { }, "repositories": {} }, - "repositories": {} + "repositories": {}, + "ccs": { + "_search": { + "total": 0, + "success": 0, + "skipped": 0, + "took": { + "max": 0, + "avg": 0, + "p90": 0 + }, + "took_mrt_true": { + "max": 0, + "avg": 0, + "p90": 0 + }, + "took_mrt_false": { + "max": 0, + "avg": 0, + "p90": 0 + }, + "remotes_per_search_max": 0, + "remotes_per_search_avg": 0.0, + "failure_reasons": { + }, + "features": { + }, + "clients": { + }, + "clusters": {} + } + } }, "cluster_state": { "nodes_hash": 1314980060,