Skip to content

Commit

Permalink
[FLINK-36530][state] Fix S3 performance issue with uncompressed state…
Browse files Browse the repository at this point in the history
… restore
  • Loading branch information
gaborgsomogyi authored Oct 18, 2024
1 parent 8c13611 commit 21f79d1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,21 @@ public class CompressibleFSDataInputStream extends FSDataInputStream {

private final FSDataInputStream delegate;
private final InputStream compressingDelegate;
private final boolean compressed;

public CompressibleFSDataInputStream(
FSDataInputStream delegate, StreamCompressionDecorator compressionDecorator)
throws IOException {
this.delegate = delegate;
this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
this.compressed = compressionDecorator != UncompressedStreamCompressionDecorator.INSTANCE;
}

@Override
public void seek(long desired) throws IOException {
final int available = compressingDelegate.available();
if (available > 0) {
if (available != compressingDelegate.skip(available)) {
if (compressed) {
final int available = compressingDelegate.available();
if (available > 0 && available != compressingDelegate.skip(available)) {
throw new IOException("Unable to skip buffered data.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -33,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -99,16 +102,23 @@ private static void verifyRecordPrefix(
assertThat(readBuffer).asString(StandardCharsets.UTF_8).isEqualTo(prefix);
}

@Test
void testSeek() throws IOException {
private static Stream<Arguments> testSeekParameters() {
return Stream.of(
Arguments.of(new UncompressedStreamCompressionDecorator()),
Arguments.of(new SnappyStreamCompressionDecorator()));
}

@ParameterizedTest
@MethodSource("testSeekParameters")
void testSeek(StreamCompressionDecorator streamCompressionDecorator) throws IOException {
final List<String> records = Arrays.asList("first", "second", "third", "fourth", "fifth");
final Map<String, Long> positions = new HashMap<>();

byte[] compressedBytes;
try (final TestingOutputStream outputStream = new TestingOutputStream();
final CompressibleFSDataOutputStream compressibleOutputStream =
new CompressibleFSDataOutputStream(
outputStream, new SnappyStreamCompressionDecorator())) {
outputStream, streamCompressionDecorator)) {
for (String record : records) {
positions.put(record, compressibleOutputStream.getPos());
compressibleOutputStream.write(record.getBytes(StandardCharsets.UTF_8));
Expand All @@ -121,7 +131,7 @@ outputStream, new SnappyStreamCompressionDecorator())) {
new InputStreamFSInputWrapper(new ByteArrayInputStream(compressedBytes));
final FSDataInputStream compressibleInputStream =
new CompressibleFSDataInputStream(
inputStream, new SnappyStreamCompressionDecorator())) {
inputStream, streamCompressionDecorator)) {
verifyRecord(compressibleInputStream, positions, "first");
verifyRecord(compressibleInputStream, positions, "third");
verifyRecord(compressibleInputStream, positions, "fifth");
Expand All @@ -133,7 +143,7 @@ inputStream, new SnappyStreamCompressionDecorator())) {
new InputStreamFSInputWrapper(new ByteArrayInputStream(compressedBytes));
final FSDataInputStream compressibleInputStream =
new CompressibleFSDataInputStream(
inputStream, new SnappyStreamCompressionDecorator())) {
inputStream, streamCompressionDecorator)) {
verifyRecordPrefix(compressibleInputStream, positions, "first", "fir");
verifyRecordPrefix(compressibleInputStream, positions, "third", "thi");
verifyRecord(compressibleInputStream, positions, "fifth");
Expand Down

0 comments on commit 21f79d1

Please sign in to comment.