Skip to content

Commit

Permalink
Implement uniquelyOccurring to constrain stream to elements that occu…
Browse files Browse the repository at this point in the history
…r a single time
  • Loading branch information
tginsberg committed Jan 21, 2025
1 parent 960e109 commit b105f5f
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### 0.8.0
+ Add support for `orElse()` and `orElseEmpty()` on size-based gatherers to provide a non-exceptional output stream
+ Implement `everyNth()` to get every `n`<sup>th</sup> element from the stream
+ Implement `uniquelyOccurring()` to emit stream elements that occur a single time

### 0.7.0
+ Use greedy integrators where possible (Fixes #57)
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ implementation("com.ginsberg:gatherers4j:0.8.0")
| `sizeLessThan(n)` | Ensure the stream is less than `n` elements long, or throw an `IllegalStateException` |
| `sizeLessThanOrEqualTo(n)` | Ensure the stream is less than or equal to `n` elements long, or throw an `IllegalStateException` |
| `throttle(amount, duration)` | Limit stream elements to `amount` elements over `duration`, pausing until a new `duration` period starts |
| `uniquelyOccurring()` | Emit elements that occur a single time, dropping all others |
| `withIndex()` | Maps all elements of the stream as-is along with their 0-based index |
| `zipWith(iterable)` | Creates a stream of `Pair` objects whose values come from the input stream and argument iterable |
| `zipWith(iterator)` | Creates a stream of `Pair` objects whose values come from the input stream and argument iterator |
Expand Down Expand Up @@ -400,6 +401,16 @@ Stream
+----------- Pause
```

#### Limit stream to elements that occur a single time

```java
Stream
.of("A", "B", "C", "A")
.gather(Gatherers4j.uniquelyOccurring())
.toList();

// ["B", "C"]
```

#### Zip two streams of together into a `Stream<Pair>`

Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,15 @@ public static <INPUT> LastGatherer<INPUT> last(final int count) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Pause, amount, duration);
}


/// Emit only those elements that occur in the input stream a single time.
///
/// @param <INPUT> Type of elements in the input stream
/// @return A non-null `UniquelyOccurringGatherer`
public static <INPUT extends @Nullable Object> UniquelyOccurringGatherer<INPUT> uniquelyOccurring() {
return new UniquelyOccurringGatherer<>();
}

/// Maps all elements of the stream as-is along with their 0-based index.
///
/// @param <INPUT> Type of elements in the input stream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.jspecify.annotations.Nullable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

public class UniquelyOccurringGatherer<INPUT extends @Nullable Object>
implements Gatherer<INPUT, UniquelyOccurringGatherer.State<INPUT>, INPUT> {

UniquelyOccurringGatherer() {
// Nothing to do
}

@Override
public Supplier<State<INPUT>> initializer() {
return State::new;
}

@Override
public Integrator<State<INPUT>, INPUT, INPUT> integrator() {
return Integrator.ofGreedy((state, element, downstream) -> {
if (!state.knownBad.contains(element)) {
if (state.found.containsKey(element)) {
state.knownBad.add(element);
state.found.remove(element);
} else {
state.found.put(element, state.iteration++);
}
}
return !downstream.isRejecting();
});
}

@Override
public BiConsumer<State<INPUT>, Downstream<? super INPUT>> finisher() {
return (inputState, downstream) ->
inputState.found
.entrySet()
.stream()
.sorted((a, b) -> (int) (a.getValue() - b.getValue()))
.forEach(it -> downstream.push(it.getKey()));
}

public static class State<INPUT> {
long iteration = 0;
final Set<INPUT> knownBad = new HashSet<>();
final Map<INPUT, Long> found = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

class UniquelyOccurringGathererTest {

@Test
void allowsUniqueNull() {
// Arrange
final Stream<String> input = Stream.of(null, "B", "B");

//Act
final List<String> output = input.gather(Gatherers4j.uniquelyOccurring()).toList();

// Assert
assertThat(output).hasSize(1).containsNull();
}

@Test
void emitsEmptyOnNoUniqueItems() {
// Arrange
final Stream<String> input = Stream.of("A", "A");

//Act
final List<String> output = input.gather(Gatherers4j.uniquelyOccurring()).toList();

// Assert
assertThat(output).isEmpty();
}

@Test
void emitsInEncounterOrder() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C", "D", "B", "C");

//Act
final List<String> output = input.gather(Gatherers4j.uniquelyOccurring()).toList();

// Assert
assertThat(output).containsExactly("A", "D");
}

@Test
void filtersOutNonUnique() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "A", "A");

//Act
final List<String> output = input.gather(Gatherers4j.uniquelyOccurring()).toList();

// Assert
assertThat(output).containsExactly("B");
}

@Test
void filtersOutNonUniqueNull() {
// Arrange
final Stream<String> input = Stream.of(null, "B", null, null);

//Act
final List<String> output = input.gather(Gatherers4j.uniquelyOccurring()).toList();

// Assert
assertThat(output).containsExactly("B");
}

}

0 comments on commit b105f5f

Please sign in to comment.