Skip to content

Commit

Permalink
Implement shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
tginsberg committed Sep 24, 2024
1 parent 1fc1be7 commit 7f5aeb8
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### 0.4.0
+ Remove `concat` implementation (the JDK has this)
+ Implement `suffle()` and `shuffle(RandomGenerator)`

### 0.3.0
+ Move minimum Java version from 22 to 23
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/com/ginsberg/gatherers4j/Gatherers4j.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.random.RandomGenerator;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

Expand All @@ -31,10 +32,10 @@ public class Gatherers4j {
* Limit the number of elements in the stream to some number per period, dropping anything over the
* limit during the period.
*
* @param amount A positive number of elements to allow per period
* @param amount A positive number of elements to allow per period
* @param duration A positive duration for the length of the period
* @param <INPUT> Type of elements in the stream
* @return A non-null ThrottlingGatherer<INPUT>
* @param <INPUT> Type of elements in the stream
*/
public static <INPUT> ThrottlingGatherer<INPUT> debounce(final int amount, final Duration duration) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Drop, amount, duration);
Expand Down Expand Up @@ -148,6 +149,29 @@ public static BigDecimalSimpleAverageGatherer<BigDecimal> simpleRunningAverage()
return simpleRunningAverageBy(Function.identity());
}

/**
* Shuffle the input stream into a random order. This consumes the entire stream and
* holds it in memory, so it will not work on infinite streams and may cause memory
* pressure on very large streams.
*
* @return A non-null <code>ShufflingGatherer</code>ShufflingGatherer`
*/
public static <INPUT> ShufflingGatherer<INPUT> shuffle() {
return new ShufflingGatherer<>(RandomGenerator.getDefault());
}

/**
* Shuffle the input stream into a random order. This consumes the entire stream and
* holds it in memory, so it will not work on infinite streams and may cause memory
* pressure on very large streams.
*
* @param randomGenerator A non-null <code>RandomGenerator</code> to use as a random source for the shuffle
* @return A non-null <code>ShufflingGatherer</code>ShufflingGatherer`
*/
public static <INPUT> ShufflingGatherer<INPUT> shuffle(final RandomGenerator randomGenerator) {
return new ShufflingGatherer<>(randomGenerator);
}

/**
* Create a Stream that is the running average of <code>BigDecimal</code> objects as mapped by
* the given function. This is useful when paired with the <code>withOriginal</code> function.
Expand Down Expand Up @@ -190,10 +214,10 @@ public static <INPUT> BigDecimalSimpleMovingAverageGatherer<INPUT> simpleMovingA
* Limit the number of elements in the stream to some number per period. When the limit is reached,
* consumption is paused until a new period starts and the count resets.
*
* @param amount A positive number of elements to allow per period
* @param amount A positive number of elements to allow per period
* @param duration A positive duration for the length of the period
* @param <INPUT> Type of elements in the stream
* @return A non-null ThrottlingGatherer<INPUT>
* @param <INPUT> Type of elements in the stream
*/
public static <INPUT> ThrottlingGatherer<INPUT> throttle(final int amount, final Duration duration) {
return new ThrottlingGatherer<>(ThrottlingGatherer.LimitRule.Pause, amount, duration);
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/com/ginsberg/gatherers4j/ShufflingGatherer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.random.RandomGenerator;
import java.util.stream.Gatherer;

import static com.ginsberg.gatherers4j.GathererUtils.mustNotBeNull;

public class ShufflingGatherer<INPUT> implements Gatherer<INPUT, ShufflingGatherer.State<INPUT>, INPUT> {

private final RandomGenerator randomGenerator;

ShufflingGatherer(final RandomGenerator randomGenerator) {
mustNotBeNull(randomGenerator, "RandomGenerator must not be null");
this.randomGenerator = randomGenerator;
}

@Override
public Supplier<ShufflingGatherer.State<INPUT>> initializer() {
return State::new;
}

@Override
public Integrator<ShufflingGatherer.State<INPUT>, INPUT, INPUT> integrator() {
return (state, element, _) -> state.inputs.add(element);
}

@Override
public BiConsumer<ShufflingGatherer.State<INPUT>, Downstream<? super INPUT>> finisher() {
return (state, downstream) -> {
while (!state.inputs.isEmpty() && !downstream.isRejecting()) {
int randomSlot = randomGenerator.nextInt(state.inputs.size());
INPUT last = state.inputs.removeLast();
if (randomSlot < state.inputs.size()) {
downstream.push(state.inputs.set(randomSlot, last));
} else {
downstream.push(last);
}
}
};
}

public static class State<INPUT> {
List<INPUT> inputs = new ArrayList<>();
}
}
73 changes: 73 additions & 0 deletions src/test/java/com/ginsberg/gatherers4j/ShufflingGathererTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Todd Ginsberg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ginsberg.gatherers4j;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ShufflingGathererTest {

@Test
void testKnownShuffle() {
// Arrange
final Stream<String> input = Stream.of("A", "B", "C", "D", "E", "F", "G");

// Act
final String output = input
.gather(Gatherers4j.shuffle(new Random(42)))
.collect(Collectors.joining());

// Assert
assertThat(output).isEqualTo("BDFAEGC");
}

@Test
void testRandomShuffles() {
// Arrange
final List<String> input = List.of("A", "B", "C", "D", "E");

// Act
final Set<String> output = Set.of(
input.stream().gather(Gatherers4j.shuffle()).collect(Collectors.joining()),
input.stream().gather(Gatherers4j.shuffle()).collect(Collectors.joining()),
input.stream().gather(Gatherers4j.shuffle()).collect(Collectors.joining()),
input.stream().gather(Gatherers4j.shuffle()).collect(Collectors.joining()),
input.stream().gather(Gatherers4j.shuffle()).collect(Collectors.joining())
);

// Assert
assertThat(output).hasSizeGreaterThan(1);
}

@Test
void withNullRandomGenerator() {
// Arrange
final Stream<String> input = Stream.of("A");

// Act/Assert
assertThatThrownBy(() -> input.gather(Gatherers4j.shuffle(null)).toList())
.isExactlyInstanceOf(IllegalArgumentException.class);
}
}

0 comments on commit 7f5aeb8

Please sign in to comment.