From 7ce781860e19402f831201a5334f68297023d9a5 Mon Sep 17 00:00:00 2001 From: Chase <62891993+engechas@users.noreply.github.com> Date: Wed, 8 Feb 2023 18:09:17 -0600 Subject: [PATCH] Refactor to remove stream in RemotePeerForwarder as micro optimization (#2250) * Refactor to remove stream in RemotePeerForwarder as micro optimization Signed-off-by: Chase Engelbrecht * Remove unused import Signed-off-by: Chase Engelbrecht --------- Signed-off-by: Chase Engelbrecht --- .../dataprepper/peerforwarder/RemotePeerForwarder.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/RemotePeerForwarder.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/RemotePeerForwarder.java index 5d55d4309d..f7fa299018 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/RemotePeerForwarder.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/RemotePeerForwarder.java @@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; class RemotePeerForwarder implements PeerForwarder { private static final Logger LOG = LoggerFactory.getLogger(RemotePeerForwarder.class); @@ -227,10 +226,10 @@ private List> populateBatchingQueue(final String destinationIp, fi } private void forwardBatchedRecords() { - final Map, List>> futuresMap = peerBatchingQueueMap.keySet().stream() - .map(this::forwardRecordsForIp) - .flatMap(map -> map.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + final Map, List>> futuresMap = new HashMap<>(); + peerBatchingQueueMap.forEach((ipAddress, records) -> { + futuresMap.putAll(forwardRecordsForIp(ipAddress)); + }); final CompletableFuture compositeFuture = CompletableFuture.allOf(futuresMap.keySet().toArray(CompletableFuture[]::new)); try {