Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored and yihua committed May 3, 2024
1 parent 40642d5 commit 34711f0
Showing 1 changed file with 22 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand Down Expand Up @@ -55,43 +54,10 @@ public class TestAverageRecordSizeUtils {
private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
private static final String PARTITION1 = "partition1";
private static final String TEST_WRITE_TOKEN = "1-0-1";
private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();

@ParameterizedTest
@MethodSource("testCasesCOW")
public void testAverageRecordSizeForCOW(List<Pair<HoodieInstant, Pair<Long, Long>>> instantSizePairs, long expectedSize) {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.build();
HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
List<HoodieInstant> instants = new ArrayList<>();
instantSizePairs.forEach(entry -> {
HoodieInstant hoodieInstant = entry.getKey();
Pair<Long, Long> recordCountSizePair = entry.getValue();
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setNumWrites(recordCountSizePair.getKey());
writeStat.setTotalWriteBytes(recordCountSizePair.getValue() * recordCountSizePair.getKey());
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.addWriteStat(PARTITION1, writeStat);
instants.add(hoodieInstant);
try {
when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
} catch (IOException e) {
throw new RuntimeException("Should not have failed", e);
}
});

List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
Collections.reverse(reverseOrderInstants);
when(mockTimeline.getInstants()).thenReturn(instants);
when(mockTimeline.getReverseOrderedInstants()).then(i -> reverseOrderInstants.stream());
commitsTimeline.setInstants(instants);

assertEquals(expectedSize, AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig));
}

@ParameterizedTest
@MethodSource("testCasesMOR")
public void testAverageRecordSizeForMOR(List<Pair<HoodieInstant, List<HWriteStat>>> instantSizePairs, long expectedSize) {
@MethodSource("testCases")
public void testAverageRecordSize(List<Pair<HoodieInstant, List<HWriteStat>>> instantSizePairs, long expectedSize) {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.build();
HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
Expand All @@ -101,8 +67,8 @@ public void testAverageRecordSizeForMOR(List<Pair<HoodieInstant, List<HWriteStat
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
entry.getValue().forEach(hWriteStat -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setNumWrites(hWriteStat.totalRecordsWritten);
writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * hWriteStat.totalRecordsWritten);
writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * hWriteStat.getTotalRecordsWritten());
writeStat.setPath(hWriteStat.getPath());
commitMetadata.addWriteStat(PARTITION1, writeStat);
});
Expand Down Expand Up @@ -135,37 +101,37 @@ private static String getLogFileName(String instantTime) {
return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN);
}

static Stream<Arguments> testCasesCOW() {
static Stream<Arguments> testCases() {
Long baseInstant = 20231204194919610L;
List<Arguments> arguments = new ArrayList<>();
// COW
// straight forward. just 1 instant.
arguments.add(Arguments.of(Collections.singletonList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)), Pair.of(10000000L, 100L))), 100L));
arguments.add(Arguments.of(
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 100L));

// two instants. latest instant should be honored
arguments.add(Arguments.of(
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(10000000L, 200L))), 200L));
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L));

// two instants, while 2nd one is smaller in size so as to not meet the threshold. So, 1st one should be honored
arguments.add(Arguments.of(
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(100000L, 200L))), 100L));
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000L, 200L)))), 100L));

// 2nd instance is replace commit and should be honored.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), Pair.of(10000000L, 100L)),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)), Pair.of(10000000L, 300L))), 300L));
return arguments.stream();
}

static Stream<Arguments> testCasesMOR() {
Long baseInstant = 20231204194919610L;
List<Arguments> arguments = new ArrayList<>();
// straight forward. just 1 compaction instant.
arguments.add(Arguments.of(Collections.singletonList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)), Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 100L));
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L));

// MOR
// for delta commits, only parquet files should be accounted for.
arguments.add(Arguments.of(
Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
Expand Down

0 comments on commit 34711f0

Please sign in to comment.