Skip to content

Commit

Permalink
[ML] Exclude quantiles when fetching model snapshots where possible
Browse files Browse the repository at this point in the history
As a followup to elastic#70376 this change further reduces the number of
places where we fetch the `quantiles` field of model snapshot
documents.

The quantiles can be very large and can cause out-of-memory errors
on small nodes, especially if more than one document containing
quantiles is loaded into memory at one time. The method
`JobManager.validateModelSnapshotIdUpdate` was a place where
two model snapshot documents were being loaded simultaneously,
both with their quantiles unnecessarily included. Following this
change there should be no risk of that method causing an
out-of-memory exception.
  • Loading branch information
droberts195 committed Dec 18, 2023
1 parent 31145f7 commit 26355eb
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private static void getModelSnapshot(
return;
}

provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> {
provider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), true, modelSnapshot -> {
if (modelSnapshot == null) {
throw missingSnapshotException(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ protected void doExecute(
ActionListener<UpdateModelSnapshotAction.Response> listener
) {
logger.debug("Received request to update model snapshot [{}] for job [{}]", request.getSnapshotId(), request.getJobId());
jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> {
// Even though the quantiles can be large we have to fetch them initially so that the updated document is complete
jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), true, modelSnapshot -> {
if (modelSnapshot == null) {
listener.onFailure(
new ResourceNotFoundException(
Expand All @@ -81,8 +82,7 @@ protected void doExecute(
} else {
Result<ModelSnapshot> updatedSnapshot = applyUpdate(request, modelSnapshot);
indexModelSnapshot(updatedSnapshot, b -> {
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them
// The quantiles can be large, and totally dominate the output - it's clearer to remove them at this stage
listener.onResponse(
new UpdateModelSnapshotAction.Response(new ModelSnapshot.Builder(updatedSnapshot.result).setQuantiles(null).build())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
jobResultsProvider.getModelSnapshot(
request.getJobId(),
request.getSnapshotId(),
false,
getSnapshotHandler::onResponse,
getSnapshotHandler::onFailure
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,24 +478,27 @@ private void validate(Job job, JobUpdate jobUpdate, ActionListener<Void> handler

private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, VoidChainTaskExecutor voidChainTaskExecutor) {
if (modelSnapshotId != null && ModelSnapshot.isTheEmptySnapshot(modelSnapshotId) == false) {
voidChainTaskExecutor.add(listener -> jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> {
if (newModelSnapshot == null) {
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, job.getId());
listener.onFailure(new ResourceNotFoundException(message));
return;
}
jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
if (oldModelSnapshot != null && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
String message = "Job ["
+ job.getId()
+ "] has a more recent model snapshot ["
+ oldModelSnapshot.result.getSnapshotId()
+ "]";
listener.onFailure(new IllegalArgumentException(message));
voidChainTaskExecutor.add(
listener -> jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, false, newModelSnapshot -> {
if (newModelSnapshot == null) {
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId, job.getId());
listener.onFailure(new ResourceNotFoundException(message));
return;
}
listener.onResponse(null);
}, listener::onFailure);
}, listener::onFailure));
jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), false, oldModelSnapshot -> {
if (oldModelSnapshot != null
&& newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
String message = "Job ["
+ job.getId()
+ "] has a more recent model snapshot ["
+ oldModelSnapshot.result.getSnapshotId()
+ "]";
listener.onFailure(new IllegalArgumentException(message));
}
listener.onResponse(null);
}, listener::onFailure);
}, listener::onFailure)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,11 +1257,13 @@ public BatchedResultsIterator<Influencer> newBatchedInfluencersIterator(String j
}

/**
* Get a job's model snapshot by its id
* Get a job's model snapshot by its id.
* Quantiles should only be included when strictly required, because they can be very large and consume a lot of heap.
*/
public void getModelSnapshot(
String jobId,
@Nullable String modelSnapshotId,
boolean includeQuantiles,
Consumer<Result<ModelSnapshot>> handler,
Consumer<Exception> errorHandler
) {
Expand All @@ -1271,6 +1273,9 @@ public void getModelSnapshot(
}
String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
SearchRequestBuilder search = createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, modelSnapshotId));
if (includeQuantiles == false) {
search.setFetchSource(null, ModelSnapshot.QUANTILES.getPreferredName());
}
searchSingleResult(
jobId,
ModelSnapshot.TYPE.getPreferredName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,6 @@ private void deleteSnapshotAndFailTask(AllocatedPersistentTask task, String jobI
);

});
jobResultsProvider.getModelSnapshot(jobId, snapshotId, modelSnapshotListener::onResponse, modelSnapshotListener::onFailure);
jobResultsProvider.getModelSnapshot(jobId, snapshotId, false, modelSnapshotListener::onResponse, modelSnapshotListener::onFailure);
}
}

0 comments on commit 26355eb

Please sign in to comment.