Skip to content

Commit

Permalink
Refactor to remove stream in RemotePeerForwarder as micro optimization (
Browse files Browse the repository at this point in the history
opensearch-project#2250)

* Refactor to remove stream in RemotePeerForwarder as micro optimization

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove unused import

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas authored Feb 9, 2023
1 parent d27afa9 commit 7ce7818
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

class RemotePeerForwarder implements PeerForwarder {
private static final Logger LOG = LoggerFactory.getLogger(RemotePeerForwarder.class);
Expand Down Expand Up @@ -227,10 +226,10 @@ private List<Record<Event>> populateBatchingQueue(final String destinationIp, fi
}

private void forwardBatchedRecords() {
final Map<CompletableFuture<AggregatedHttpResponse>, List<Record<Event>>> futuresMap = peerBatchingQueueMap.keySet().stream()
.map(this::forwardRecordsForIp)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<CompletableFuture<AggregatedHttpResponse>, List<Record<Event>>> futuresMap = new HashMap<>();
peerBatchingQueueMap.forEach((ipAddress, records) -> {
futuresMap.putAll(forwardRecordsForIp(ipAddress));
});

final CompletableFuture<Void> compositeFuture = CompletableFuture.allOf(futuresMap.keySet().toArray(CompletableFuture[]::new));
try {
Expand Down

0 comments on commit 7ce7818

Please sign in to comment.