diff --git a/CHANGELOG.md b/CHANGELOG.md
index b44fefb..f0fee07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,5 @@
### 0.7.0
-+ TBD
++ Use greedy integrators where possible (Fixes #57)
### 0.6.0
+ Implement `dropLast(n)`
diff --git a/src/main/java/com/ginsberg/gatherers4j/BigDecimalGatherer.java b/src/main/java/com/ginsberg/gatherers4j/BigDecimalGatherer.java
index 9b669ec..4c9d063 100644
--- a/src/main/java/com/ginsberg/gatherers4j/BigDecimalGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/BigDecimalGatherer.java
@@ -36,7 +36,7 @@ abstract public class BigDecimalGatherer
@Override
public Integrator integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
final BigDecimal mappedElement = element == null ? nullReplacement : mappingFunction.apply(element);
if (mappedElement != null) {
state.add(mappedElement, mathContext);
@@ -45,7 +45,7 @@ public Integrator integrator() {
}
}
return !downstream.isRejecting();
- };
+ });
}
/// When encountering a `null` value in a stream, treat it as `BigDecimal.ZERO` instead.
diff --git a/src/main/java/com/ginsberg/gatherers4j/DedupeConsecutiveGatherer.java b/src/main/java/com/ginsberg/gatherers4j/DedupeConsecutiveGatherer.java
index 0ba8861..5eb183c 100644
--- a/src/main/java/com/ginsberg/gatherers4j/DedupeConsecutiveGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/DedupeConsecutiveGatherer.java
@@ -37,7 +37,7 @@ public Supplier initializer() {
@Override
public Integrator integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
final Object mapped = mappingFunction == null ? element : mappingFunction.apply(element);
if (!state.hasValue) {
state.hasValue = true;
@@ -48,7 +48,7 @@ public Integrator integrator() {
return downstream.push(element);
}
return !downstream.isRejecting();
- };
+ });
}
public static class State {
diff --git a/src/main/java/com/ginsberg/gatherers4j/DistinctGatherer.java b/src/main/java/com/ginsberg/gatherers4j/DistinctGatherer.java
index 2e31241..d1ec6f4 100644
--- a/src/main/java/com/ginsberg/gatherers4j/DistinctGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/DistinctGatherer.java
@@ -40,12 +40,12 @@ public Supplier initializer() {
@Override
public Integrator integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
if (state.knownObjects.add(byFunction.apply(element))) {
downstream.push(element);
}
return !downstream.isRejecting();
- };
+ });
}
public static class State {
diff --git a/src/main/java/com/ginsberg/gatherers4j/DropLastGatherer.java b/src/main/java/com/ginsberg/gatherers4j/DropLastGatherer.java
index 05bf1c2..36af319 100644
--- a/src/main/java/com/ginsberg/gatherers4j/DropLastGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/DropLastGatherer.java
@@ -40,10 +40,10 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, INPUT> integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
state.elements.add(element);
return !downstream.isRejecting();
- };
+ });
}
@Override
diff --git a/src/main/java/com/ginsberg/gatherers4j/FilteringWithIndexGatherer.java b/src/main/java/com/ginsberg/gatherers4j/FilteringWithIndexGatherer.java
index 1eec56e..d88cc9b 100644
--- a/src/main/java/com/ginsberg/gatherers4j/FilteringWithIndexGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/FilteringWithIndexGatherer.java
@@ -39,12 +39,12 @@ public Supplier initializer() {
@Override
public Integrator integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
if (predicate.test(state.index++, element)) {
downstream.push(element);
}
return !downstream.isRejecting();
- };
+ });
}
public static class State {
diff --git a/src/main/java/com/ginsberg/gatherers4j/IndexingGatherer.java b/src/main/java/com/ginsberg/gatherers4j/IndexingGatherer.java
index d401ee7..5500c23 100644
--- a/src/main/java/com/ginsberg/gatherers4j/IndexingGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/IndexingGatherer.java
@@ -32,7 +32,9 @@ public Supplier initializer() {
@Override
public Integrator> integrator() {
- return (state, element, downstream) -> downstream.push(new IndexedValue<>(state.index++, element));
+ return Integrator.ofGreedy((state, element, downstream) ->
+ downstream.push(new IndexedValue<>(state.index++, element))
+ );
}
public static class State {
diff --git a/src/main/java/com/ginsberg/gatherers4j/LastGatherer.java b/src/main/java/com/ginsberg/gatherers4j/LastGatherer.java
index 4c83a55..af1b686 100644
--- a/src/main/java/com/ginsberg/gatherers4j/LastGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/LastGatherer.java
@@ -51,12 +51,12 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, INPUT> integrator() {
- return Integrator.ofGreedy((state, element, _) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
if (state.elements.size() == lastCount) {
state.elements.removeFirst();
}
state.elements.add(element);
- return true;
+ return !downstream.isRejecting();
});
}
diff --git a/src/main/java/com/ginsberg/gatherers4j/MinMaxGatherer.java b/src/main/java/com/ginsberg/gatherers4j/MinMaxGatherer.java
index 25fa83d..80ed70e 100644
--- a/src/main/java/com/ginsberg/gatherers4j/MinMaxGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/MinMaxGatherer.java
@@ -39,7 +39,7 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, INPUT> integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
final MAPPED mapped = element == null ? null : mappingFunction.apply(element);
if (mapped == null) {
return !downstream.isRejecting();
@@ -55,7 +55,7 @@ public Integrator, INPUT, INPUT> integrator() {
}
}
return !downstream.isRejecting();
- };
+ });
}
@Override
diff --git a/src/main/java/com/ginsberg/gatherers4j/ReversingGatherer.java b/src/main/java/com/ginsberg/gatherers4j/ReversingGatherer.java
index 57d33a7..e3df562 100644
--- a/src/main/java/com/ginsberg/gatherers4j/ReversingGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/ReversingGatherer.java
@@ -31,7 +31,10 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, INPUT> integrator() {
- return (state, element, _) -> state.inputs.add(element);
+ return Integrator.ofGreedy((state, element, downstream) -> {
+ state.inputs.add(element);
+ return !downstream.isRejecting();
+ });
}
@Override
diff --git a/src/main/java/com/ginsberg/gatherers4j/ShufflingGatherer.java b/src/main/java/com/ginsberg/gatherers4j/ShufflingGatherer.java
index 6598ea1..3555b39 100644
--- a/src/main/java/com/ginsberg/gatherers4j/ShufflingGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/ShufflingGatherer.java
@@ -41,7 +41,10 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, INPUT> integrator() {
- return (state, element, _) -> state.inputs.add(element);
+ return Integrator.ofGreedy((state, element, downstream) -> {
+ state.inputs.add(element);
+ return !downstream.isRejecting();
+ });
}
@Override
diff --git a/src/main/java/com/ginsberg/gatherers4j/ThrottlingGatherer.java b/src/main/java/com/ginsberg/gatherers4j/ThrottlingGatherer.java
index cc75422..f69a798 100644
--- a/src/main/java/com/ginsberg/gatherers4j/ThrottlingGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/ThrottlingGatherer.java
@@ -63,12 +63,12 @@ public Supplier initializer() {
@Override
public Integrator integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
if (!downstream.isRejecting() && state.attempt()) {
downstream.push(element);
}
return !downstream.isRejecting();
- };
+ });
}
public static class State {
diff --git a/src/main/java/com/ginsberg/gatherers4j/ZipWithGatherer.java b/src/main/java/com/ginsberg/gatherers4j/ZipWithGatherer.java
index 2b0ab49..4d00691 100644
--- a/src/main/java/com/ginsberg/gatherers4j/ZipWithGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/ZipWithGatherer.java
@@ -106,9 +106,9 @@ public Integrator> integrator() {
return (_, element, downstream) -> {
boolean advanced = otherSpliterator.tryAdvance(it -> downstream.push(new Pair<>(element, it)));
if (!advanced && argumentWhenSourceLonger != null) {
- downstream.push(new Pair<>(element, argumentWhenSourceLonger.apply(element)));
+ return downstream.push(new Pair<>(element, argumentWhenSourceLonger.apply(element)));
}
- return !downstream.isRejecting();
+ return advanced && !downstream.isRejecting();
};
}
diff --git a/src/main/java/com/ginsberg/gatherers4j/ZipWithNextGatherer.java b/src/main/java/com/ginsberg/gatherers4j/ZipWithNextGatherer.java
index b59d41c..13df1cb 100644
--- a/src/main/java/com/ginsberg/gatherers4j/ZipWithNextGatherer.java
+++ b/src/main/java/com/ginsberg/gatherers4j/ZipWithNextGatherer.java
@@ -33,7 +33,7 @@ public Supplier> initializer() {
@Override
public Integrator, INPUT, List> integrator() {
- return (state, element, downstream) -> {
+ return Integrator.ofGreedy((state, element, downstream) -> {
if (!state.hasValue) {
state.hasValue = true;
} else {
@@ -41,7 +41,7 @@ public Integrator, INPUT, List> integrator() {
}
state.value = element;
return !downstream.isRejecting();
- };
+ });
}