diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java index 51adc5eb8b..9f2ff56210 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java @@ -17,8 +17,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.entity.ByteArrayEntity; @@ -50,10 +48,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -61,6 +57,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -426,28 +423,41 @@ class RequestBytes { @VisibleForTesting int getSerializedSize() { int serializedSize = this.requestBytes.length; - serializedSize+= 1; //For follow-up NEW_LINE_BYTES + serializedSize += 1; //For follow-up NEW_LINE_BYTES if (this.requestSource != null) { serializedSize += this.requestSource.length; - serializedSize+= 1; //For follow-up NEW_LINE_BYTES + serializedSize += 1; //For follow-up NEW_LINE_BYTES } return serializedSize; } - private void writeTo(OutputStream outputStream) throws IOException { - outputStream.write(this.requestBytes); - outputStream.write(NEW_LINE_BYTES); + private int writeTo(byte[] target, int initialOffset) { + int offset = initialOffset; + 
System.arraycopy(this.requestBytes, 0, target, offset, this.requestBytes.length); + offset += this.requestBytes.length; + System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length); + offset += NEW_LINE_BYTES.length; if (this.requestSource != null) { - outputStream.write(requestSource); - outputStream.write(NEW_LINE_BYTES); + System.arraycopy(this.requestSource, 0, target, offset, this.requestSource.length); + offset += this.requestSource.length; + System.arraycopy(NEW_LINE_BYTES, 0, target, offset, NEW_LINE_BYTES.length); + offset += NEW_LINE_BYTES.length; } + return offset; } } - private Pair buildBulkRequestInput(List requests, String ingestPipeline) throws IOException { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + private Pair buildBulkRequestInput(List requests, String ingestPipeline) { + int totalBytes = requests.stream().mapToInt(RequestBytes::getSerializedSize).sum(); + //Copying into a single pre-sized array avoids the dynamic growth of a resizable buffer that may overshoot + //how much memory we actually need; it also avoids the final copy at the end normally done by + //ByteArrayOutputStream's toByteArray() + byte[] bytes = new byte[totalBytes]; + int offset = 0; for (final RequestBytes request : requests) { - request.writeTo(outputStream); + //We can't remove the element from the collection like we do elsewhere, because we need to retain the + //serialized form in case of an error so the error can be paired to the originating request based on index + offset = request.writeTo(bytes, offset); } final StringBuilder bulkRequestQueryParameters = new StringBuilder(); @@ -458,7 +468,7 @@ private Pair buildBulkRequestInput(List requests, APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh); } final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters; - return Pair.with(bulkRequestPath, outputStream.toByteArray()); + return
Pair.with(bulkRequestPath, bytes); } private List> pairErrorsWithSubmittedMutation( @@ -490,14 +500,20 @@ class BulkRequestChunker implements Iterator> { //There is no "correct" number of actions to perform in a single bulk request. Experiment with different // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum // size of a HTTP request to 100mb by default - private final PeekingIterator requestIterator; + private final ListIterator requestIterator; private final int[] exceptionallyLargeRequests; + private RequestBytes peeked; @VisibleForTesting BulkRequestChunker(List requests) throws JsonProcessingException { List serializedRequests = new ArrayList<>(requests.size()); List requestSizesThatWereTooLarge = new ArrayList<>(); - for (ElasticSearchMutation request : requests) { + ListIterator requestsIter = requests.listIterator(); + while (requestsIter.hasNext()) { + ElasticSearchMutation request = requestsIter.next(); + //Remove the element from the collection so the collection's reference to it doesn't hold it from being + //GC'ed after it has been converted to its serialized form + requestsIter.set(null); RequestBytes requestBytes = new RequestBytes(request); int requestSerializedSize = requestBytes.getSerializedSize(); if (requestSerializedSize <= bulkChunkSerializedLimitBytes) { @@ -507,7 +523,7 @@ class BulkRequestChunker implements Iterator> { requestSizesThatWereTooLarge.add(requestSerializedSize); } } - this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator()); + this.requestIterator = serializedRequests.listIterator(); //Condense request sizes that are too large into an int array to remove Boxed & List memory overhead this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.isEmpty() ? 
null : requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray(); @@ -517,20 +533,31 @@ class BulkRequestChunker implements Iterator> { public boolean hasNext() { //Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted //This allows next() to throw after all well sized requests have been chunked for submission - return requestIterator.hasNext() || exceptionallyLargeRequests != null; + return peeked != null || requestIterator.hasNext() || exceptionallyLargeRequests != null; } @Override public List next() { List serializedRequests = new ArrayList<>(); int chunkSerializedTotal = 0; - while (requestIterator.hasNext()) { - RequestBytes peeked = requestIterator.peek(); + //If we peeked at something but stopped on it, then add it to this list + if (peeked != null) { chunkSerializedTotal += peeked.getSerializedSize(); + serializedRequests.add(peeked); + peeked = null; + } + + while (requestIterator.hasNext()) { + RequestBytes next = requestIterator.next(); + //Remove the element from the collection, so the iterator doesn't prevent it from being GC'ed + //due to the reference to it in the collection + requestIterator.set(null); + chunkSerializedTotal += next.getSerializedSize(); if (chunkSerializedTotal <= bulkChunkSerializedLimitBytes) { - serializedRequests.add(requestIterator.next()); + serializedRequests.add(next); } else { //Adding this element would exceed the limit, so return the chunk + this.peeked = next; return serializedRequests; } } diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java index dae07870d5..e70477d83e 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java @@ -35,7 +35,9 @@ import 
java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -152,10 +154,11 @@ public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException { //This payload is too large to send given the set limit, since it is a single item we can't split it IntStream.range(0, bulkLimit * 10).forEach(value -> payloadBuilder.append("a")); Assertions.assertThrows(IllegalArgumentException.class, () -> restClientUnderTest.bulkRequest( - Collections.singletonList( - ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id", - Collections.singletonMap("someKey", payloadBuilder.toString())) - ), null), "Should have thrown due to bulk request item being too large"); + Stream.of( + ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id", + Collections.singletonMap("someKey", payloadBuilder.toString()))) + .collect(Collectors.toList()), + null), "Should have thrown due to bulk request item being too large"); } } } diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java index 9c9e101977..3acb743a3b 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientRetryTest.java @@ -143,8 +143,10 @@ public void testRetryOnConfiguredErrorStatus() throws IOException { when(restClientMock.performRequest(any())) .thenThrow(responseException) .thenThrow(expectedFinalException); - restClientUnderTest.bulkRequest(Collections.singletonList( - ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + restClientUnderTest.bulkRequest( + Stream.of( + 
ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")) + .collect(Collectors.toList()), null); Assertions.fail("Should have thrown the expected exception after retry"); } catch (Exception actualException) { @@ -173,8 +175,10 @@ public void testRetriesExhaustedReturnsLastRetryException() throws IOException { .thenThrow(responseException); - restClientUnderTest.bulkRequest(Collections.singletonList( - ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + restClientUnderTest.bulkRequest( + Stream.of( + ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")) + .collect(Collectors.toList()), null); Assertions.fail("Should have thrown the expected exception after retry"); } catch (Exception e) { @@ -194,8 +198,9 @@ public void testNonRetryErrorCodeException() throws IOException { //prime the restClientMock again after it's reset after creation when(restClientMock.performRequest(any())) .thenThrow(responseException); - restClientUnderTest.bulkRequest(Collections.singletonList( - ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")), + restClientUnderTest.bulkRequest( + Stream.of(ElasticSearchMutation.createDeleteRequest("some_index", "some_type", "some_doc_id")). + collect(Collectors.toList()), null); Assertions.fail("Should have thrown the expected exception"); } catch (Exception e) {