From 26355eb569a003adcb61e0b8ea4a62796d69073e Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 18 Dec 2023 17:59:14 +0000 Subject: [PATCH] [ML] Exclude quantiles when fetching model snapshots where possible As a followup to #70376 this change further reduces the number of places where we fetch the `quantiles` field of model snapshot documents. The quantiles can be very large and can cause out-of-memory errors on small nodes, especially if more than one document containing quantiles is loaded into memory at one time. The method `JobManager.validateModelSnapshotIdUpdate` was a place where two model snapshot documents were being loaded simultaneously, both with their quantiles unnecessarily included. Following this change there should be no risk of that method causing an out-of-memory exception. --- .../TransportRevertModelSnapshotAction.java | 2 +- .../TransportUpdateModelSnapshotAction.java | 6 +-- ...ransportUpgradeJobModelSnapshotAction.java | 1 + .../xpack/ml/job/JobManager.java | 37 ++++++++++--------- .../job/persistence/JobResultsProvider.java | 7 +++- .../upgrader/SnapshotUpgradeTaskExecutor.java | 2 +- 6 files changed, 32 insertions(+), 23 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index 5450b2752ab97..c01c1f46b3d13 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -272,7 +272,7 @@ private static void getModelSnapshot( return; } - provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> { + provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), true, modelSnapshot -> { if (modelSnapshot == null) { throw missingSnapshotException(request); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java index f0872bccc8378..097be745996ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java @@ -71,7 +71,8 @@ protected void doExecute( ActionListener listener ) { logger.debug("Received request to update model snapshot [{}] for job [{}]", request.getSnapshotId(), request.getJobId()); - jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> { + // Even though the quantiles can be large we have to fetch them initially so that the updated document is complete + jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), true, modelSnapshot -> { if (modelSnapshot == null) { listener.onFailure( new ResourceNotFoundException( @@ -81,8 +82,7 @@ protected void doExecute( } else { Result updatedSnapshot = applyUpdate(request, modelSnapshot); indexModelSnapshot(updatedSnapshot, b -> { - // The quantiles can be large, and totally dominate the output - - // it's clearer to remove them + // The quantiles can be large, and totally dominate the output - it's clearer to remove them at this stage listener.onResponse( new UpdateModelSnapshotAction.Response(new ModelSnapshot.Builder(updatedSnapshot.result).setQuantiles(null).build()) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java index 3f6193c124a9a..15c1d53f7bdf8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpgradeJobModelSnapshotAction.java @@ -223,6 +223,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A jobResultsProvider.getModelSnapshot( request.getJobId(), request.getSnapshotId(), + false, getSnapshotHandler::onResponse, getSnapshotHandler::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 035f4864ebace..7532ae4317830 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -478,24 +478,27 @@ private void validate(Job job, JobUpdate jobUpdate, ActionListener handler private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, VoidChainTaskExecutor voidChainTaskExecutor) { if (modelSnapshotId != null && ModelSnapshot.isTheEmptySnapshot(modelSnapshotId) == false) { - voidChainTaskExecutor.add(listener -> jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> { - if (newModelSnapshot == null) { - String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, job.getId()); - listener.onFailure(new ResourceNotFoundException(message)); - return; - } - jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> { - if (oldModelSnapshot != null && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) { - String message = "Job [" - + job.getId() - + "] has a more recent model snapshot [" - + oldModelSnapshot.result.getSnapshotId() - + "]"; - listener.onFailure(new IllegalArgumentException(message)); + voidChainTaskExecutor.add( + listener -> jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, false, newModelSnapshot -> { + if (newModelSnapshot == null) { + String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, job.getId()); + listener.onFailure(new ResourceNotFoundException(message)); + return; } - listener.onResponse(null); - }, listener::onFailure); - }, listener::onFailure)); + jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), false, oldModelSnapshot -> { + if (oldModelSnapshot != null + && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) { + String message = "Job [" + + job.getId() + + "] has a more recent model snapshot [" + + oldModelSnapshot.result.getSnapshotId() + + "]"; + listener.onFailure(new IllegalArgumentException(message)); + } + listener.onResponse(null); + }, listener::onFailure); + }, listener::onFailure) + ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java index 7b41f3e055874..b661f6294d89a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java @@ -1257,11 +1257,13 @@ public BatchedResultsIterator newBatchedInfluencersIterator(String j } /** - * Get a job's model snapshot by its id + * Get a job's model snapshot by its id. + * Quantiles should only be included when strictly required, because they can be very large and consume a lot of heap. */ public void getModelSnapshot( String jobId, @Nullable String modelSnapshotId, + boolean includeQuantiles, Consumer> handler, Consumer errorHandler ) { @@ -1271,6 +1273,9 @@ public void getModelSnapshot( } String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, modelSnapshotId)); + if (includeQuantiles == false) { + search.setFetchSource(null, ModelSnapshot.QUANTILES.getPreferredName()); + } searchSingleResult( jobId, ModelSnapshot.TYPE.getPreferredName(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 69b926876302a..cc3f8f0dd1e67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -329,6 +329,6 @@ private void deleteSnapshotAndFailTask(AllocatedPersistentTask task, String jobI ); }); - jobResultsProvider.getModelSnapshot(jobId, snapshotId, modelSnapshotListener::onResponse, modelSnapshotListener::onFailure); + jobResultsProvider.getModelSnapshot(jobId, snapshotId, false, modelSnapshotListener::onResponse, modelSnapshotListener::onFailure); } }