Skip to content

Commit

Permalink
Implement takeUntil (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
tginsberg authored Jan 24, 2025
1 parent 588fc50 commit 7f4fcaa
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
+ Add support for `orElse()` and `orElseEmpty()` on size-based gatherers to provide a non-exceptional output stream
+ Implement `everyNth()` to get every `n`<sup>th</sup> element from the stream
+ Implement `uniquelyOccurring()` to emit stream elements that occur a single time
+ Implement `takeUntil()` to take from a stream until a predicate is met, including the first element that matches the predicate

### 0.7.0
+ Use greedy integrators where possible (Fixes #57)
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ implementation("com.ginsberg:gatherers4j:0.8.0")
| `sizeGreaterThanOrEqualTo(n)` | Ensure the stream is greater than or equal to `n` elements long, or throw an `IllegalStateException` |
| `sizeLessThan(n)` | Ensure the stream is less than `n` elements long, or throw an `IllegalStateException` |
| `sizeLessThanOrEqualTo(n)` | Ensure the stream is less than or equal to `n` elements long, or throw an `IllegalStateException` |
| `takeUntil(predicate)` | Take elements from the input stream until the `predicate` is met, including the first element that matches the `preciate` |
| `throttle(amount, duration)` | Limit stream elements to `amount` elements over `duration`, pausing until a new `duration` period starts |
| `uniquelyOccurring()` | Emit elements that occur a single time, dropping all others |
| `withIndex()` | Maps all elements of the stream as-is along with their 0-based index |
Expand Down Expand Up @@ -176,6 +177,16 @@ Stream.of("A", "B", "C", "D", "E", "F", "G")
// ["A", "D", "G"]
```

#### Take from a stream until a predicate is met, inclusive

```java
Stream.of("A", "B", "C", "D", "E", "F", "G")
.gather(Gatherers4j.takeUntil(it -> it.equals("C")))
.toList()

// ["A", "B", "C"]
```

#### Ensure the stream is exactly `n` elements long

```java
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.random.RandomGenerator;
import java.util.stream.Stream;

Expand Down Expand Up @@ -485,6 +486,18 @@ public static <INPUT> LastGatherer<INPUT> last(final int count) {
return new SizeGatherer<>(SizeGatherer.Operation.LessThanOrEqualTo, size);
}

/// Take elements from the input stream until the `predicate` is met, including the first element that
/// matches the `predicate`.
///
/// @param predicate A non-null predicate function
/// @param <INPUT> Type of elements in both the input and output streams
/// @return A non-null `TakeUntilGatherer`
public static <INPUT extends @Nullable Object> TakeUntilGatherer<INPUT> takeUntil(
final Predicate<INPUT> predicate
) {
return new TakeUntilGatherer<>(predicate);
}

/// Limit the number of elements in the stream to some number per period. When the limit is reached,
/// consumption is paused until a new period starts and the count resets.
///
Expand All @@ -499,7 +512,6 @@ public static <INPUT> LastGatherer<INPUT> last(final int count) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Pause, amount, duration);
}


/// Emit only those elements that occur in the input stream a single time.
///
/// @param <INPUT> Type of elements in the input stream
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/TakeUntilGatherer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.jspecify.annotations.Nullable;

import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

import static com.ginsberg.gatherers4j.GathererUtils.mustNotBeNull;

public class TakeUntilGatherer<INPUT extends @Nullable Object>
implements Gatherer<INPUT, TakeUntilGatherer.State, INPUT> {

private final Predicate<INPUT> predicate;

TakeUntilGatherer(final Predicate<INPUT> predicate) {
mustNotBeNull(predicate, "Predicate must not be null");
this.predicate = predicate;
}

@Override
public Supplier<State> initializer() {
return State::new;
}

@Override
public Integrator<State, INPUT, INPUT> integrator() {
return (state, element, downstream) -> {
if(state.done) {
return false;
}
state.done = predicate.test(element);
return downstream.push(element);
};
}

public static class State {
boolean done;
}
}
46 changes: 46 additions & 0 deletions src/test/java/com/ginsberg/gatherers4j/TakeUntilGathererTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class TakeUntilGathererTest {

@Test
@SuppressWarnings("DataFlowIssue")
void predicateMustNotBeNull() {
assertThatThrownBy(() -> Gatherers4j.takeUntil(null)).isInstanceOf(IllegalArgumentException.class);
}

@Test
void takeUntilIncludesTriggeringElement() {
// Arrange
final Stream<String> input = Stream.of("A", "BB", "CCC", "DDDD", "EEEEE");

// Act
final List<String> output = input.gather(Gatherers4j.takeUntil(it -> it.equals("CCC"))).toList();

// Assert
assertThat(output).containsExactly("A", "BB", "CCC");
}
}

0 comments on commit 7f4fcaa

Please sign in to comment.