diff --git a/docs/changelog/98915.yaml b/docs/changelog/98915.yaml new file mode 100644 index 0000000000000..c23ddcc55d98e --- /dev/null +++ b/docs/changelog/98915.yaml @@ -0,0 +1,5 @@ +pr: 98915 +summary: Avoid risk of OOM in datafeeds when memory is constrained +area: Machine Learning +type: bug +issues: [89769] diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index d70d34126fe27..e7aba2211b2df 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -30,8 +31,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; import org.elasticsearch.xpack.ml.extractor.ExtractedField; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.NoSuchElementException; @@ -169,18 +168,6 @@ private SearchRequestBuilder buildSearchRequest(long start) { return searchRequestBuilder; } - /** - * Utility class to convert ByteArrayOutputStream to ByteArrayInputStream without copying the underlying buffer. - */ - private static class ConvertableByteArrayOutputStream extends ByteArrayOutputStream { - public ByteArrayInputStream resetThisAndGetByteArrayInputStream() { - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf, 0, count); - buf = new byte[0]; - count = 0; - return inputStream; - } - } - /** * IMPORTANT: This is not an idempotent method. This method changes the input array by setting each element to null. */ @@ -192,7 +179,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep return null; } - ConvertableByteArrayOutputStream outputStream = new ConvertableByteArrayOutputStream(); + BytesStreamOutput outputStream = new BytesStreamOutput(); SearchHit lastHit = hits[hits.length - 1]; lastTimestamp = context.extractedFields.timeFieldValue(lastHit); @@ -217,7 +204,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep hits[i] = null; } } - return outputStream.resetThisAndGetByteArrayInputStream(); + return outputStream.bytes().streamInput(); } private InputStream continueScroll() throws IOException {