Skip to content

Commit

Permalink
[ML] Use more efficient buffering strategy for job input created by d…
Browse files Browse the repository at this point in the history
…atafeeds (elastic#98915)

Java's own `ByteArrayOutputStream` doubles the size of its
buffer when it needs to grow. This can be inefficient and lead
to risk of OOMs with small JVM heaps. For example, suppose
we need to build `autodetect` input 16.1MB in size. When the
input grows past 16MB the buffer size will increase to 32MB,
with a temporary requirement for 48MB at the point where
both buffers exist.

This change switches to use Elasticsearch's `BytesStreamOutput`
class, which manages memory more efficiently.
  • Loading branch information
i-plusplus authored Sep 8, 2023
1 parent 88e2508 commit 34711e7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 16 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/98915.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98915
summary: Avoid risk of OOM in datafeeds when memory is constrained
area: Machine Learning
type: bug
issues: [89769]
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand All @@ -30,8 +31,6 @@
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.extractor.ExtractedField;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -169,18 +168,6 @@ private SearchRequestBuilder buildSearchRequest(long start) {
return searchRequestBuilder;
}

/**
* Utility class to convert ByteArrayOutputStream to ByteArrayInputStream without copying the underlying buffer.
*/
private static class ConvertableByteArrayOutputStream extends ByteArrayOutputStream {
public ByteArrayInputStream resetThisAndGetByteArrayInputStream() {
ByteArrayInputStream inputStream = new ByteArrayInputStream(buf, 0, count);
buf = new byte[0];
count = 0;
return inputStream;
}
}

/**
* IMPORTANT: This is not an idempotent method. This method changes the input array by setting each element to <code>null</code>.
*/
Expand All @@ -192,7 +179,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep
return null;
}

ConvertableByteArrayOutputStream outputStream = new ConvertableByteArrayOutputStream();
BytesStreamOutput outputStream = new BytesStreamOutput();

SearchHit lastHit = hits[hits.length - 1];
lastTimestamp = context.extractedFields.timeFieldValue(lastHit);
Expand All @@ -217,7 +204,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep
hits[i] = null;
}
}
return outputStream.resetThisAndGetByteArrayInputStream();
return outputStream.bytes().streamInput();
}

private InputStream continueScroll() throws IOException {
Expand Down

0 comments on commit 34711e7

Please sign in to comment.