From 87e246486fa1ee28ca01da12ab25a5936d27f3d2 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 26 Nov 2024 20:16:21 +0000 Subject: [PATCH 1/3] GH-3080: HadoopStreams to support ByteBufferPositionedReadable input streams --- .../hadoop/util/H2SeekableInputStream.java | 4 + .../hadoop/util/H3ByteBufferInputStream.java | 66 +++ .../parquet/hadoop/util/HadoopStreams.java | 60 +-- .../hadoop/util/MockHadoopInputStream.java | 26 ++ .../util/TestHadoop2ByteBufferReads.java | 13 +- .../util/TestHadoop3ByteBufferReadFully.java | 427 ++++++++++++++++++ 6 files changed, 545 insertions(+), 51 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java index 6b65bdbafa..63565645d8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java @@ -100,6 +100,10 @@ public void readVectored(List ranges, ByteBufferAllocator allo VectorIoBridge.instance().readVectoredRanges(stream, ranges, allocator); } + protected Reader getReader() { + return reader; + } + public static void readFully(Reader reader, ByteBuffer buf) throws IOException { // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java new file mode 100644 index 0000000000..c9bed63a6f --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.fs.FSDataInputStream; + +/** + * Class which implements {@link #readFully(ByteBuffer)} through + * {@code ByteBufferPositionedReadable.readFully()}. + *

This is implemented by HDFS and possibly other clients.
+ */
+class H3ByteBufferInputStream extends H2SeekableInputStream {
+  public H3ByteBufferInputStream(final FSDataInputStream stream) {
+    super(stream);
+  }
+
+  @Override
+  public FSDataInputStream getStream() {
+    return (FSDataInputStream) super.getStream();
+  }
+
+  /**
+   * Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()}
+   * at the current location.
+   *

That operation is designed to not use the current reading position, rather + * an absolute position is passed in. + * In the use here the original read position is saved, and + * after the read is finished a {@code seek()} call made to move the + * cursor on. + * + * @param buf a byte buffer to fill with data from the stream + * + * @throws EOFException the buffer length is greater than the file length + * @throws IOException other IO problems. + */ + @Override + public void readFully(final ByteBuffer buf) throws EOFException, IOException { + // use ByteBufferPositionedReadable.readFully() + final FSDataInputStream stream = getStream(); + // remember the current position + final long pos = stream.getPos(); + final int size = buf.remaining(); + stream.readFully(pos, buf); + // then move read position on afterwards. + stream.seek(pos + size); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java index eebc59987b..107bf17e8f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java @@ -19,15 +19,16 @@ package org.apache.parquet.hadoop.util; +import static org.apache.hadoop.fs.StreamCapabilities.PREADBYTEBUFFER; +import static org.apache.hadoop.fs.StreamCapabilities.READBYTEBUFFER; + import java.io.InputStream; import java.util.Objects; -import java.util.function.Function; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.SeekableInputStream; -import org.apache.parquet.util.DynMethods; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,11 +39,6 @@ public class HadoopStreams { private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); - private static final DynMethods.UnboundMethod hasCapabilitiesMethod = new DynMethods.Builder("hasCapabilities") - .impl(FSDataInputStream.class, "hasCapabilities", String.class) - .orNoop() - .build(); - /** * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} * implementation for Parquet readers. @@ -53,42 +49,14 @@ public class HadoopStreams { public static SeekableInputStream wrap(FSDataInputStream stream) { Objects.requireNonNull(stream, "Cannot wrap a null input stream"); - // Try to check using hasCapabilities(str) - Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream); - - // If it is null, then fall back to the old method - if (hasCapabilitiesResult != null) { - if (hasCapabilitiesResult) { - return new H2SeekableInputStream(stream); - } else { - return new H1SeekableInputStream(stream); - } - } - - return unwrapByteBufferReadableLegacy(stream).apply(stream); - } - - /** - * Is the inner stream byte buffer readable? - * The test is 'the stream is not FSDataInputStream - * and implements ByteBufferReadable' - *

- * This logic is only used for Hadoop <2.9.x, and <3.x.x - * - * @param stream stream to probe - * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable - */ - private static Function unwrapByteBufferReadableLegacy( - FSDataInputStream stream) { - InputStream wrapped = stream.getWrappedStream(); - if (wrapped instanceof FSDataInputStream) { - LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); - return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped)); - } - if (stream.getWrappedStream() instanceof ByteBufferReadable) { - return H2SeekableInputStream::new; + // Check using hasCapabilities(str) + if (stream.hasCapability(PREADBYTEBUFFER)) { + LOG.debug("Using ByteBufferPositionedReadable to read {}", stream); + return new H3ByteBufferInputStream(stream); + } else if (isWrappedStreamByteBufferReadable(stream)) { + return new H2SeekableInputStream(stream); } else { - return H1SeekableInputStream::new; + return new H1SeekableInputStream(stream); } } @@ -111,14 +79,8 @@ private static Function unwrapByteBuffer * the data, null when it cannot be determined because of missing hasCapabilities */ private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { - if (hasCapabilitiesMethod.isNoop()) { - // When the method is not available, just return a null - return null; - } - - boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); - if (isByteBufferReadable) { + if (stream.hasCapability(READBYTEBUFFER)) { // stream is issuing the guarantee that it implements the // API. Holds for all implementations in hadoop-* // since Hadoop 3.3.0 (HDFS-14111). diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java index 0e0c8f0db1..6c342fd5d1 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java @@ -20,6 +20,7 @@ package org.apache.parquet.hadoop.util; import java.io.ByteArrayInputStream; +import java.io.EOFException; import java.io.IOException; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; @@ -71,6 +72,7 @@ public void readFully(long position, byte[] buffer) throws IOException { @Override public void seek(long pos) throws IOException { + rejectNegativePosition(pos); this.pos = (int) pos; } @@ -84,4 +86,28 @@ public boolean seekToNewSource(long targetPos) throws IOException { seek(targetPos); return true; } + + /** + * How long is the actual test data. + * @return the test data + */ + int length() { + return TEST_ARRAY.length; + } + + byte[] data() { + return TEST_ARRAY; + } + + /** + * For consistency with real Hadoop streams: reject negative positions. + * @param pos position to read/seek to. 
+ * @throws EOFException if pos is negative + */ + static void rejectNegativePosition(final long pos) throws EOFException { + if (pos < 0) { + throw new EOFException("Seek before file start: " + pos); + } + } + } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java index 0232ccf984..e833b64f39 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.util.StringUtils; import org.apache.parquet.hadoop.TestUtils; import org.apache.parquet.io.SeekableInputStream; import org.junit.Assert; @@ -407,13 +409,20 @@ public void testDoubleWrapByteBufferReadable() { } /** - * Input stream which claims to implement ByteBufferReadable. + * Input stream which claims to implement ByteBufferReadable in both interfaces and + * in {@code hasCapability()}. */ - private static final class MockByteBufferInputStream extends MockHadoopInputStream implements ByteBufferReadable { + private static final class MockByteBufferInputStream extends MockHadoopInputStream + implements ByteBufferReadable, StreamCapabilities { @Override public int read(final ByteBuffer buf) { return 0; } + + @Override + public boolean hasCapability(final String capability) { + return StringUtils.toLowerCase(capability).equals(READBYTEBUFFER); + } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java new file mode 100644 index 0000000000..c4b5c947b9 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
+import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.parquet.hadoop.TestUtils;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test {@code ByteBufferPositionedReadable.readFully()} reads.
+ */
+public class TestHadoop3ByteBufferReadFully {
+
+  @Test
+  public void testHeapReadFullySmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(8);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream());
+
+    hadoopStream.readFully(0, readBuffer);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    hadoopStream.readFully(0, readBuffer);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyLargeBuffer() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream());
+
+    assertThrowsEOFException(hadoopStream, 0, readBuffer);
+
+    // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
+    // several read operations that will read up to the end of the input. This
+    // is a correct value because the bytes in the buffer are valid. This
+    // behavior can't be implemented for the heap buffer without using the read
+    // method instead of the readFully method on the underlying
+    // FSDataInputStream.
+ assertPositionAndLimit(readBuffer, 10, 20); + } + + private static void assertPositionAndLimit(ByteBuffer readBuffer, int pos, int limit) { + assertPosition(readBuffer, pos); + assertLimit(readBuffer, limit); + } + + private static void assertPosition(final ByteBuffer readBuffer, final int pos) { + Assert.assertEquals("Buffer Position", pos, readBuffer.position()); + } + + private static void assertLimit(final ByteBuffer readBuffer, final int limit) { + Assert.assertEquals("Buffer Limit", limit, readBuffer.limit()); + } + + @Test + public void testHeapReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + // reads all of the bytes available without EOFException + hadoopStream.readFully(0, readBuffer); + assertPosition(readBuffer, 10); + + // trying to read 0 more bytes doesn't result in EOFException + hadoopStream.readFully(11, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullySmallReads() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullyPosition() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.position(3); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testHeapReadFullyLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.limit(7); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testHeapReadFullyPositionAndLimit() throws 
Exception { + ByteBuffer readBuffer = ByteBuffer.allocate(10); + readBuffer.position(3); + readBuffer.limit(7); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testDirectReadFullySmallBuffer() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(8); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(8, readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(8, readBuffer.position()); + Assert.assertEquals(8, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); + } + + @Test + public void testDirectReadFullyLargeBuffer() throws Exception { + final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + final int position = 0; + assertThrowsEOFException(hadoopStream, position, readBuffer); + + // NOTE: This behavior differs from readFullyHeapBuffer because direct uses + // several read operations that will read up to the end of the input. This + // is a correct value because the bytes in the buffer are valid. This + // behavior can't be implemented for the heap buffer without using the read + // method instead of the readFully method on the underlying + // FSDataInputStream. 
+ Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(20, readBuffer.limit()); + } + + private static void assertThrowsEOFException( + final FSDataInputStream hadoopStream, final int position, final ByteBuffer readBuffer) { + TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> { + hadoopStream.readFully(position, readBuffer); + return null; + }); + } + + @Test + public void testDirectReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + // reads all of the bytes available without EOFException + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + // trying to read 0 more bytes doesn't result in EOFException + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullySmallReads() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullyPosition() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + readBuffer.position(3); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testDirectReadFullyLimit() throws Exception { + ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); + readBuffer.limit(7); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.flip(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + } + + @Test + public void testDirectReadFullyPositionAndLimit() throws Exception { + ByteBuffer readBuffer = 
ByteBuffer.allocateDirect(10); + readBuffer.position(3); + readBuffer.limit(7); + readBuffer.mark(); + + FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(7, readBuffer.position()); + Assert.assertEquals(7, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + + readBuffer.position(7); + readBuffer.limit(10); + hadoopStream.readFully(0, readBuffer); + Assert.assertEquals(10, readBuffer.position()); + Assert.assertEquals(10, readBuffer.limit()); + + readBuffer.reset(); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + } + + @Test + public void testCreateStreamNoByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + } + + @Test + public void testDoubleWrapNoByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + } + + @Test + public void testCreateStreamWithByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(new FSDataInputStream(new MockByteBufferReadFullyInputStream())); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + } + + @Test + public void testDoubleWrapByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + } + + /** + * Input stream which claims to implement ByteBufferPositionedReadable + */ + private static final class MockByteBufferReadFullyInputStream extends MockHadoopInputStream + implements ByteBufferPositionedReadable, StreamCapabilities { + + @Override + public int read(final long position, final ByteBuffer buf) throws IOException { + rejectNegativePosition(position); + return 0; + } + + @Override + public void readFully(final long position, final ByteBuffer buf) throws IOException { + + // validation + rejectNegativePosition(position); + final int toRead = buf.remaining(); + if (getPos() + length() > toRead) { + throw new EOFException("Read past " + length()); + } + // return the subset of the data + byte[] result = new byte[toRead]; + System.arraycopy(data(), 0, result, 0, toRead); + buf.put(result); + } + + public boolean hasCapability(final String capability) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: + return true; + default: + return false; + } + } + } +} From cccde45645b6bd56a612ad843ca60e23ece6e270 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 27 Nov 2024 20:36:38 +0000 Subject: [PATCH 2/3] GH-3080: Tests of the new input stream. Based of the H2 stream test suite but * parameterized for on/off heap * expect no changes in buffer contents on out of range reads. Still one test failure. 
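In outline, the on/off-heap split follows the standard JUnit 4
parameterization pattern. The following is a sketch of the shape of the
suite (class and test names here are illustrative, not the exact code):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class ByteBufferReadTestSketch {
      @Parameterized.Parameters(name = "{0}")
      public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {{"heap", true}, {"direct", false}});
      }

      private final boolean useHeap;

      public ByteBufferReadTestSketch(String type, boolean useHeap) {
        this.useHeap = useHeap;
      }

      // every test allocates buffers through this, so each case
      // runs once on-heap and once off-heap.
      private ByteBuffer allocate(int capacity) {
        return useHeap ? ByteBuffer.allocate(capacity) : ByteBuffer.allocateDirect(capacity);
      }

      @Test
      public void testAllocate() {
        Assert.assertEquals(8, allocate(8).capacity());
      }
    }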
--- .../hadoop/util/H3ByteBufferInputStream.java | 15 +- .../hadoop/util/MockHadoopInputStream.java | 1 - .../util/TestHadoop3ByteBufferReadFully.java | 536 +++++++++--------- 3 files changed, 281 insertions(+), 271 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java index c9bed63a6f..bef6a7c1e5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java @@ -54,8 +54,19 @@ public FSDataInputStream getStream() { */ @Override public void readFully(final ByteBuffer buf) throws EOFException, IOException { - // use ByteBufferPositionedReadable.readFully() - final FSDataInputStream stream = getStream(); + performRead(getStream(), buf); + } + + /** + * Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()} + * at the current location. + * + * @param buf a byte buffer to fill with data from the stream + * @throws EOFException the buffer length is greater than the file length + * @throws IOException other IO problems. + */ + // Visible for testing + static void performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException { // remember the current position final long pos = stream.getPos(); final int size = buf.remaining(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java index 6c342fd5d1..58d494a208 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java @@ -109,5 +109,4 @@ static void rejectNegativePosition(final long pos) throws EOFException { throw new EOFException("Seek before file start: " + pos); } } - } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java index c4b5c947b9..c9e8a8249d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java @@ -19,378 +19,373 @@ package org.apache.parquet.hadoop.util; +import static org.apache.parquet.hadoop.util.H3ByteBufferInputStream.performRead; import static org.apache.parquet.hadoop.util.HadoopStreams.wrap; import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.util.StringUtils; import org.apache.parquet.hadoop.TestUtils; import org.apache.parquet.io.SeekableInputStream; +import org.jetbrains.annotations.NotNull; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test {@code ByteBufferPositionedReadable.readFully()} reads. + * Parameterized on heap vs. direct buffers. 
*/ +@RunWith(Parameterized.class) public class TestHadoop3ByteBufferReadFully { - @Test - public void testHeapReadFullySmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(8); + public static final int LEN = 10; - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + Object[][] data = new Object[][] {{"heap", true}, {"direct", false}}; + return Arrays.asList(data); + } - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); + /** + * Use a heap buffer? + */ + private final boolean useHeap; - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); + public TestHadoop3ByteBufferReadFully(final String type, final boolean useHeap) { + this.useHeap = useHeap; + } - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); + /** + * Allocate a buffer; choice of on/off heap depends on test suite options. + * @param capacity buffer capacity. + * @return the buffer. + */ + private ByteBuffer allocate(int capacity) { + return useHeap ? ByteBuffer.allocate(capacity) : ByteBuffer.allocateDirect(capacity); } + /** + * Read a buffer smaller than the source file. + */ @Test - public void testHeapReadFullyLargeBuffer() throws Exception { - final ByteBuffer readBuffer = ByteBuffer.allocate(20); + public void testReadFullySmallBuffer() throws Exception { + ByteBuffer readBuffer = allocate(8); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - assertThrowsEOFException(hadoopStream, 0, readBuffer); - - // NOTE: This behavior differs from readFullyHeapBuffer because direct uses - // several read operations that will read up to the end of the input. This - // is a correct value because the bytes in the buffer are valid. This - // behavior can't be implemented for the heap buffer without using the read - // method instead of the readFully method on the underlying - // FSDataInputStream. - assertPositionAndLimit(readBuffer, 10, 20); + assertBufferRead(hadoopStream, readBuffer, 8, 8); + assertPositionAndLimit(readBuffer, 8, 8); + // buffer is full so no more data is read. + assertBufferRead(hadoopStream, readBuffer, 8, 8); + assertBufferMatches(readBuffer, 0); } - private static void assertPositionAndLimit(ByteBuffer readBuffer, int pos, int limit) { - assertPosition(readBuffer, pos); - assertLimit(readBuffer, limit); + /** + * Read more than the file size, require EOF exceptions to be raised. + */ + @Test + public void testReadFullyLargeBuffer() throws Exception { + final ByteBuffer readBuffer = allocate(20); + + FSDataInputStream hadoopStream = stream(); + + assertThrowsEOFException(hadoopStream, readBuffer); + + // EOF check happened before the read -at least with this test stream. + assertPositionAndLimit(readBuffer, 0, 20); } - private static void assertPosition(final ByteBuffer readBuffer, final int pos) { - Assert.assertEquals("Buffer Position", pos, readBuffer.position()); + /** + * Seek to the file, try to read a buffer more than allowed. 
+ */ + @Test + public void testReadFullyFromOffset() throws Exception { + final int size = 5; + final ByteBuffer readBuffer = allocate(size); + + FSDataInputStream hadoopStream = stream(); + hadoopStream.seek(6); + + // read past EOF is a failure + assertThrowsEOFException(hadoopStream, readBuffer); + // stream does not change position + assertStreamAt(hadoopStream, 6); + + // reduce buffer limit + readBuffer.limit(4); + // now the read works. + assertBufferRead(hadoopStream, readBuffer, 4, 4); } - private static void assertLimit(final ByteBuffer readBuffer, final int limit) { - Assert.assertEquals("Buffer Limit", limit, readBuffer.limit()); + @NotNull private static FSDataInputStream stream() { + return new FSDataInputStream(new MockByteBufferReadFullyInputStream()); } + /** + * Read exactly the size of the file. + */ @Test - public void testHeapReadFullyJustRight() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = allocate(LEN); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); // reads all of the bytes available without EOFException - hadoopStream.readFully(0, readBuffer); - assertPosition(readBuffer, 10); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); // trying to read 0 more bytes doesn't result in EOFException hadoopStream.readFully(11, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + assertBufferMatches(readBuffer, 0); } + /** + * Read with the buffer position set to a value within the buffer. + */ @Test - public void testHeapReadFullySmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testHeapReadFullyPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyPosition() throws Exception { + ByteBuffer readBuffer = allocate(LEN); readBuffer.position(3); readBuffer.mark(); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + FSDataInputStream hadoopStream = stream(); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + // reset to where the mark is. 
readBuffer.reset(); Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); } + /** + * Limit the buffer size, read with it + * @throws Exception + */ @Test - public void testHeapReadFullyLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); - readBuffer.limit(7); + public void testReadFullyLimit() throws Exception { + ByteBuffer readBuffer = allocate(LEN); + final int smallLimit = 7; + readBuffer.limit(smallLimit); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); + hadoopStream.seek(0); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferMatches(readBuffer, 0); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); - - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + // recycle the buffer with a larger value and continue + // reading from the end of the last read. + readBuffer.position(smallLimit); + readBuffer.limit(LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + assertBufferMatches(readBuffer, 0); } @Test - public void testHeapReadFullyPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyPositionAndLimit() throws Exception { + ByteBuffer readBuffer = allocate(LEN); readBuffer.position(3); - readBuffer.limit(7); + final int smallLimit = 7; + readBuffer.limit(smallLimit); readBuffer.mark(); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); readBuffer.reset(); Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + readBuffer.position(smallLimit); + readBuffer.limit(LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, smallLimit), readBuffer); + // assertBufferMatches(readBuffer, 0); } - @Test - public void testDirectReadFullySmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(8); - - 
FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); - } - - @Test - public void testDirectReadFullyLargeBuffer() throws Exception { - final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - final int position = 0; - assertThrowsEOFException(hadoopStream, position, readBuffer); - - // NOTE: This behavior differs from readFullyHeapBuffer because direct uses - // several read operations that will read up to the end of the input. This - // is a correct value because the bytes in the buffer are valid. This - // behavior can't be implemented for the heap buffer without using the read - // method instead of the readFully method on the underlying - // FSDataInputStream. - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - } - - private static void assertThrowsEOFException( - final FSDataInputStream hadoopStream, final int position, final ByteBuffer readBuffer) { + private static void assertThrowsEOFException(final FSDataInputStream hadoopStream, final ByteBuffer readBuffer) { TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> { - hadoopStream.readFully(position, readBuffer); + performRead(hadoopStream, readBuffer); return null; }); } @Test - public void testDirectReadFullyJustRight() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - // reads all of the bytes available without EOFException - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - // trying to read 0 more bytes doesn't result in EOFException - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + public void testCreateStreamNoByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); } @Test - public void testDirectReadFullySmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + public void testDoubleWrapNoByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", 
ByteBuffer.wrap(TEST_ARRAY), readBuffer); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } @Test - public void testDirectReadFullyPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.position(3); - readBuffer.mark(); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + public void testCreateStreamWithByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(stream()); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } @Test - public void testDirectReadFullyLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.limit(7); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); - - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + public void testDoubleWrapByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } + /** + * The buffer reading stream is only selected if the stream declares support; + * implementing the interface is not enough. + */ @Test - public void testDirectReadFullyPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.position(3); - readBuffer.limit(7); - readBuffer.mark(); + public void testPositionedReadableNoCapability() { + class IncapableStream extends MockByteBufferReadFullyInputStream { + @Override + public boolean hasCapability(final String capability) { + return false; + } + } + final InputStream in = new IncapableStream(); + final SeekableInputStream s = wrap(new FSDataInputStream(in)); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + } - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + /** + * What happens if a stream declares support for the interface, + * but doesn't actually do it? + * The check is based on trust: if the stream lied -it doesn't work. 
+ */ + @Test + public void testCapabilityWithoutInterface() { + class InconsistentStream extends MockHadoopInputStream + implements ByteBufferPositionedReadable, StreamCapabilities { + @Override + public boolean hasCapability(final String capability) { + return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER); + } - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + @Override + public int read(final long position, final ByteBuffer buf) throws IOException { + return 0; + } - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + @Override + public void readFully(final long position, final ByteBuffer buf) throws IOException {} + } - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + final InputStream in = new InconsistentStream(); + final SeekableInputStream s = wrap(new FSDataInputStream(in)); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + } - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + public static void assertBufferMatches(ByteBuffer readBuffer, int filePosition) { + readBuffer.flip(); + final int remaining = readBuffer.remaining(); + byte[] actual = getBytes(readBuffer); + byte[] expected = Arrays.copyOfRange(TEST_ARRAY, filePosition, remaining); + Assert.assertEquals( + "Buffer contents from data offset " + filePosition + " with length " + remaining, + stringify(expected), + stringify(actual)); + } - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + /** + * Gets the bytes of the buffer. This sets the buffer remaining + * value to 0. + * @param buffer buffer. + * @return buffer contents as bytes. + */ + public static byte[] getBytes(ByteBuffer buffer) { + byte[] byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + return byteArray; } - @Test - public void testCreateStreamNoByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + /** + * Map a byte array to hex values. + * Of limited value once the byte value is greater than 15 + * as the string is hard to read. + * @param array source data + * @return string list. + */ + private static String stringify(byte[] array) { + // convert to offset of lower case A, to make those assertions meaningful + final int l = array.length; + StringBuilder chars = new StringBuilder(l); + for (byte b : array) { + chars.append(Integer.toHexString(b)); + } + return chars.toString(); } - @Test - public void testDoubleWrapNoByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + /** + * Assert the current buffer position and limit. + * @param readBuffer buffer + * @param bufferPosition buffer position. 
+ * @param limit buffer limit + */ + private static void assertPositionAndLimit(ByteBuffer readBuffer, int bufferPosition, int limit) { + Assert.assertEquals("Buffer Position", bufferPosition, readBuffer.position()); + Assert.assertEquals("Buffer Limit", limit, readBuffer.limit()); } - @Test - public void testCreateStreamWithByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockByteBufferReadFullyInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + /** + * Assert the stream position is at the expected value. + * @param hadoopStream stream + * @param pos expected position + * @throws IOException exception raised on getPos() + */ + private static void assertStreamAt(final FSDataInputStream hadoopStream, long pos) throws IOException { + Assert.assertEquals("Read position of stream", pos, hadoopStream.getPos()); } - @Test - public void testDoubleWrapByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + /** + * Read a buffer at the current position through {@link H3ByteBufferInputStream#performRead(FSDataInputStream, ByteBuffer)}. + * Assert that the stream buffer position and limit are what is expected + * + * @param hadoopStream stream + * @param readBuffer buffer to fill + * @param bufferPosition final buffer position + * @param limit final buffer limit + * + * @throws IOException read failure + */ + private static void assertBufferRead( + final FSDataInputStream hadoopStream, + final ByteBuffer readBuffer, + final int bufferPosition, + final int limit) + throws IOException { + final long pos = hadoopStream.getPos(); + final int remaining = readBuffer.remaining(); + performRead(hadoopStream, readBuffer); + assertPositionAndLimit(readBuffer, bufferPosition, limit); + assertStreamAt(hadoopStream, pos + remaining); } /** * Input stream which claims to implement ByteBufferPositionedReadable */ - private static final class MockByteBufferReadFullyInputStream extends MockHadoopInputStream + private static class MockByteBufferReadFullyInputStream extends MockHadoopInputStream implements ByteBufferPositionedReadable, StreamCapabilities { @Override @@ -405,23 +400,28 @@ public void readFully(final long position, final ByteBuffer buf) throws IOExcept // validation rejectNegativePosition(position); final int toRead = buf.remaining(); - if (getPos() + length() > toRead) { - throw new EOFException("Read past " + length()); + if (toRead == 0) { + return; + } + if (toRead + position > length()) { + throw new EOFException("ByteBuffer.readFully(" + position + + ") buffer size: " + toRead + + " reads past file length: " + length()); } // return the subset of the data byte[] result = new byte[toRead]; - System.arraycopy(data(), 0, result, 0, toRead); + System.arraycopy(data(), (int) position, result, 0, toRead); buf.put(result); } + /** + * Declare support for ByteBufferPositionedReadable. + * This is the only way that an implementation wil be picked up. + * @param capability string to query the stream support for. 
+     * @return true if the probed capability is {@code in:preadbytebuffer}.
+     */
    public boolean hasCapability(final String capability) {
-      switch (StringUtils.toLowerCase(capability)) {
-        case StreamCapabilities.READBYTEBUFFER:
-        case StreamCapabilities.PREADBYTEBUFFER:
-          return true;
-        default:
-          return false;
-      }
+      return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER);
    }
  }
}

From 455403ef83dc155cc51e9c21df3ccf32aeb92645 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 3 Dec 2024 19:03:56 +0000
Subject: [PATCH 3/3] GH-3080: lots of work on the tests.

* changing how stream capabilities are set up and queried, making it easy
  to generate streams with different declared behaviours.
* pull out common assertions
* lots of javadoc of what each test case is trying to do.

+ all the tests are happy.
---
 .../hadoop/util/H3ByteBufferInputStream.java  |   8 +-
 .../util/TestHadoop3ByteBufferReadFully.java  | 236 +++++++++++++-----
 2 files changed, 178 insertions(+), 66 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java
index bef6a7c1e5..61b9f3aa85 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java
@@ -59,19 +59,23 @@ public void readFully(final ByteBuffer buf) throws EOFException, IOException {
 
   /**
    * Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()}
-   * at the current location.
+   * from the current location.
+   * That is, it reads from stream[pos] to stream[pos + buf.remaining() - 1].
    *
    * @param buf a byte buffer to fill with data from the stream
+   * @return number of bytes read.
+   *
    * @throws EOFException the buffer length is greater than the file length
    * @throws IOException other IO problems.
    */
   // Visible for testing
-  static void performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException {
+  static int performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException {
     // remember the current position
     final long pos = stream.getPos();
     final int size = buf.remaining();
     stream.readFully(pos, buf);
     // then move read position on afterwards.
stream.seek(pos + size); + return size; } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java index c9e8a8249d..0481a1fdf7 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java @@ -19,23 +19,23 @@ package org.apache.parquet.hadoop.util; +import static org.apache.hadoop.fs.StreamCapabilities.READBYTEBUFFER; import static org.apache.parquet.hadoop.util.H3ByteBufferInputStream.performRead; import static org.apache.parquet.hadoop.util.HadoopStreams.wrap; import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.util.StringUtils; import org.apache.parquet.hadoop.TestUtils; import org.apache.parquet.io.SeekableInputStream; -import org.jetbrains.annotations.NotNull; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,11 +48,14 @@ @RunWith(Parameterized.class) public class TestHadoop3ByteBufferReadFully { - public static final int LEN = 10; + /** + * The size of the stream. + */ + private static final int LEN = TEST_ARRAY.length; - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "heap={0}") public static Collection data() { - Object[][] data = new Object[][] {{"heap", true}, {"direct", false}}; + Object[][] data = new Object[][] {{true}, {false}}; return Arrays.asList(data); } @@ -61,13 +64,20 @@ public static Collection data() { */ private final boolean useHeap; - public TestHadoop3ByteBufferReadFully(final String type, final boolean useHeap) { + /** + * Instantiate test suite. + * + * @param useHeap use a heap buffer? + */ + public TestHadoop3ByteBufferReadFully(final boolean useHeap) { this.useHeap = useHeap; } /** * Allocate a buffer; choice of on/off heap depends on test suite options. + * * @param capacity buffer capacity. + * * @return the buffer. */ private ByteBuffer allocate(int capacity) { @@ -87,11 +97,11 @@ public void testReadFullySmallBuffer() throws Exception { assertPositionAndLimit(readBuffer, 8, 8); // buffer is full so no more data is read. assertBufferRead(hadoopStream, readBuffer, 8, 8); - assertBufferMatches(readBuffer, 0); + verifyBufferMatches(readBuffer, 0); } /** - * Read more than the file size, require EOF exceptions to be raised. + * Read more than the file size, require an EOF exception to be raised. */ @Test public void testReadFullyLargeBuffer() throws Exception { @@ -107,6 +117,8 @@ public void testReadFullyLargeBuffer() throws Exception { /** * Seek to the file, try to read a buffer more than allowed. + * This fails because readFully() requires the whole buffer to be filled. + * When the buffer limit is reduced it will work. 
@@ -87,11 +97,11 @@ public void testReadFullySmallBuffer() throws Exception {
     assertPositionAndLimit(readBuffer, 8, 8);
     // buffer is full so no more data is read.
     assertBufferRead(hadoopStream, readBuffer, 8, 8);
-    assertBufferMatches(readBuffer, 0);
+    verifyBufferMatches(readBuffer, 0);
   }
 
   /**
-   * Read more than the file size, require EOF exceptions to be raised.
+   * Read more than the file size, require an EOF exception to be raised.
    */
   @Test
   public void testReadFullyLargeBuffer() throws Exception {
@@ -107,6 +117,8 @@ public void testReadFullyLargeBuffer() throws Exception {
 
   /**
    * Seek to the file, try to read a buffer more than allowed.
+   * This fails because readFully() requires the whole buffer to be filled.
+   * When the buffer limit is reduced it will work.
    */
   @Test
   public void testReadFullyFromOffset() throws Exception {
@@ -127,7 +139,12 @@ public void testReadFullyFromOffset() throws Exception {
     assertBufferRead(hadoopStream, readBuffer, 4, 4);
   }
 
-  @NotNull private static FSDataInputStream stream() {
+  /**
+   * Create a data input stream wrapping a {@link MockByteBufferReadFullyInputStream}.
+   *
+   * @return an input stream.
+   */
+  private static FSDataInputStream stream() {
     return new FSDataInputStream(new MockByteBufferReadFullyInputStream());
   }
 
@@ -146,7 +163,8 @@ public void testReadFullyJustRight() throws Exception {
 
     // trying to read 0 more bytes doesn't result in EOFException
     hadoopStream.readFully(11, readBuffer);
-    assertBufferMatches(readBuffer, 0);
+    // buffer unchanged
+    verifyBufferMatches(readBuffer, 0);
   }
 
   /**
@@ -168,8 +186,7 @@ public void testReadFullyPosition() throws Exception {
   }
 
   /**
-   * Limit the buffer size, read with it
-   * @throws Exception
+   * Limit the buffer size, read it.
    */
   @Test
   public void testReadFullyLimit() throws Exception {
@@ -177,20 +194,27 @@ public void testReadFullyLimit() throws Exception {
     final int smallLimit = 7;
     readBuffer.limit(smallLimit);
 
+    // read up to the limit, twice.
     FSDataInputStream hadoopStream = stream();
-
     assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit);
     hadoopStream.seek(0);
+    // the buffer is now full, so no bytes are read;
+    // the position and the limit are unchanged.
    assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit);
+    // and the stream is still at position zero.
+    assertStreamAt(hadoopStream, 0);
 
-    assertBufferMatches(readBuffer, 0);
+    verifyBufferMatches(readBuffer, 0);
 
     // recycle the buffer with a larger value and continue
     // reading from the end of the last read.
     readBuffer.position(smallLimit);
     readBuffer.limit(LEN);
+    hadoopStream.seek(smallLimit);
+
+    assertStreamAt(hadoopStream, smallLimit);
     assertBufferRead(hadoopStream, readBuffer, LEN, LEN);
-    assertBufferMatches(readBuffer, 0);
+    verifyBufferMatches(readBuffer, 0);
   }
 
   @Test
@@ -215,41 +239,78 @@ public void testReadFullyPositionAndLimit() throws Exception {
     assertBufferRead(hadoopStream, readBuffer, LEN, LEN);
     readBuffer.reset();
     Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, smallLimit), readBuffer);
-    // assertBufferMatches(readBuffer, 0);
   }
 
+  /**
+   * Assert that a buffer read raises EOFException.
+   *
+   * @param hadoopStream stream to read
+   * @param readBuffer target buffer.
+   */
   private static void assertThrowsEOFException(final FSDataInputStream hadoopStream, final ByteBuffer readBuffer) {
-    TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> {
+    TestUtils.assertThrows("Must throw EOFException", EOFException.class, () -> {
       performRead(hadoopStream, readBuffer);
       return null;
     });
   }
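The limit-recycling sequence in testReadFullyLimit() above is pure java.nio bookkeeping: a buffer whose position has reached its limit reads zero bytes, and widening the limit re-opens the window without disturbing the data already written. The same sequence in isolation:

    import java.nio.ByteBuffer;

    ByteBuffer buf = ByteBuffer.allocate(10);
    buf.limit(7);                        // first window: bytes 0..6
    while (buf.hasRemaining()) {
      buf.put((byte) buf.position());    // position ends at 7 == limit
    }
    buf.limit(10);                       // widen: position 7, limit 10
    while (buf.hasRemaining()) {
      buf.put((byte) buf.position());    // fill bytes 7..9
    }
    buf.flip();                          // position 0, limit 10: ready to verify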
 
+  /**
+   * Regression test: verify that creating a stream for {@link MockHadoopInputStream}
+   * still generates an {@link H1SeekableInputStream}.
+   */
   @Test
-  public void testCreateStreamNoByteBufferPositionedReadable() {
-    final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream()));
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream);
+  public void testCreateH1Stream() {
+    assertStreamClass(H1SeekableInputStream.class, wrap(new FSDataInputStream(new MockHadoopInputStream())));
   }
 
+  /**
+   * Regression test: verify that creating a stream which implements
+   * ByteBufferReadable but doesn't declare the capability generates {@link H2SeekableInputStream}.
+   */
   @Test
-  public void testDoubleWrapNoByteBufferPositionedReadable() {
-    final SeekableInputStream s =
-        wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream())));
+  public void testDoubleWrapByteBufferStream() {
+    assertStreamClass(
+        H2SeekableInputStream.class,
+        wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferInputStream()))));
+  }
 
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream);
+  /**
+   * Regression test: verify that creating a stream which implements
+   * ByteBufferReadable and declares the capability also generates {@link H2SeekableInputStream}.
+   */
+  @Test
+  public void testDoubleWrapByteBufferStreamWithCapability() {
+    assertStreamClass(
+        H2SeekableInputStream.class,
+        wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferInputStream(READBYTEBUFFER)))));
+  }
+
+  /**
+   * Assert that an instantiated stream class matches the expected class.
+   *
+   * @param expected expected class
+   * @param stream stream to validate
+   */
+  private static void assertStreamClass(
+      final Class<? extends SeekableInputStream> expected, final SeekableInputStream stream) {
+    Assert.assertEquals("Wrong stream class: " + stream, expected, stream.getClass());
   }
 
+  /**
+   * If a stream implements "in:preadbytebuffer" it gets bound to an {@link H3ByteBufferInputStream}.
+   */
   @Test
   public void testCreateStreamWithByteBufferPositionedReadable() {
-    final SeekableInputStream s = wrap(stream());
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream);
+    assertStreamClass(H3ByteBufferInputStream.class, wrap(stream()));
   }
 
+  /**
+   * A double-wrapped ByteBufferPositionedReadable stream is still bound to
+   * {@link H3ByteBufferInputStream}.
+   */
   @Test
   public void testDoubleWrapByteBufferPositionedReadable() {
-    final SeekableInputStream s =
-        wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream())));
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream);
+    assertStreamClass(
+        H3ByteBufferInputStream.class,
+        wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))));
   }
 
   /**
@@ -258,15 +319,9 @@ public void testDoubleWrapByteBufferPositionedReadable() {
    */
   @Test
   public void testPositionedReadableNoCapability() {
-    class IncapableStream extends MockByteBufferReadFullyInputStream {
-      @Override
-      public boolean hasCapability(final String capability) {
-        return false;
-      }
-    }
-    final InputStream in = new IncapableStream();
-    final SeekableInputStream s = wrap(new FSDataInputStream(in));
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream);
+    assertStreamClass(
+        H2SeekableInputStream.class,
+        wrap(new FSDataInputStream(new MockByteBufferReadFullyInputStream(READBYTEBUFFER))));
   }
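The wrapper-selection tests above all pin down one rule: binding to H3ByteBufferInputStream is decided by the "in:preadbytebuffer" capability probe rather than by the Java interface alone, since FSDataInputStream exposes readFully(long, ByteBuffer) regardless and only fails at read time. A sketch of that kind of probe, using the Hadoop 3.3+ constants the test imports; the actual selection logic in HadoopStreams may differ in detail:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    static String chooseReader(FSDataInputStream in) {
      if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {       // "in:preadbytebuffer"
        return "positioned ByteBuffer reads";    // H3ByteBufferInputStream
      } else if (in.hasCapability(StreamCapabilities.READBYTEBUFFER)) { // "in:readbytebuffer"
        return "sequential ByteBuffer reads";    // H2SeekableInputStream
      } else {
        return "byte[] reads";                   // H1SeekableInputStream
      }
    }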
 
   /**
@@ -278,9 +333,10 @@ public void testCapabilityWithoutInterface() {
     class InconsistentStream extends MockHadoopInputStream
         implements ByteBufferPositionedReadable, StreamCapabilities {
+
       @Override
       public boolean hasCapability(final String capability) {
-        return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER);
+        return StringUtils.toLowerCase(capability).equals(PREADBYTEBUFFER);
       }
 
       @Override
       public int read(final long position, final ByteBuffer buf) throws IOException {
@@ -292,12 +348,19 @@ public int read(final long position, final ByteBuffer buf) throws IOException {
 
       public void readFully(final long position, final ByteBuffer buf) throws IOException {}
     }
 
-    final InputStream in = new InconsistentStream();
-    final SeekableInputStream s = wrap(new FSDataInputStream(in));
-    Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream);
+    assertStreamClass(H3ByteBufferInputStream.class, wrap(new FSDataInputStream(new InconsistentStream())));
   }
 
-  public static void assertBufferMatches(ByteBuffer readBuffer, int filePosition) {
+  /**
+   * Assert that the buffer contents match those of the input data from
+   * the offset filePosition.
+   * This operation reads the buffer data, so it must be used after any other
+   * assertions about the buffer: size, position, etc.
+   *
+   * @param readBuffer buffer to examine
+   * @param filePosition file position.
+   */
+  public static void verifyBufferMatches(ByteBuffer readBuffer, int filePosition) {
     readBuffer.flip();
     final int remaining = readBuffer.remaining();
     byte[] actual = getBytes(readBuffer);
@@ -309,9 +372,11 @@ public static void verifyBufferMatches(ByteBuffer readBuffer, int filePosition)
   }
 
   /**
-   * Gets the bytes of the buffer. This sets the buffer remaining
+   * Gets the bytes of the buffer. This sets the buffer.remaining()
    * value to 0.
+   *
    * @param buffer buffer.
+   *
    * @return buffer contents as bytes.
    */
   public static byte[] getBytes(ByteBuffer buffer) {
@@ -324,7 +389,9 @@ public static byte[] getBytes(ByteBuffer buffer) {
    * Map a byte array to hex values.
    * Of limited value once the byte value is greater than 15
    * as the string is hard to read.
+   *
    * @param array source data
+   *
    * @return string list.
    */
   private static String stringify(byte[] array) {
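stringify()'s javadoc concedes that its output is hard to read once byte values pass 15. If that ever becomes a problem, a fixed-width hex formatter is a drop-in alternative; the helper below is hypothetical, not part of this patch:

    // Hypothetical alternative to stringify(): two hex digits per byte.
    private static String hexify(byte[] array) {
      StringBuilder sb = new StringBuilder(array.length * 3);
      for (byte b : array) {
        sb.append(String.format("%02x ", b));
      }
      return sb.toString().trim();
    }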
@@ -338,7 +405,8 @@ private static String stringify(byte[] array) {
   }
 
   /**
-   * Assert the current buffer position and limit.
+   * Assert the current buffer position and limit are as expected.
+   *
    * @param readBuffer buffer
    * @param bufferPosition buffer position.
    * @param limit buffer limit
@@ -350,8 +418,10 @@ private static void assertPositionAndLimit(ByteBuffer readBuffer, int bufferPosi
 
   /**
    * Assert the stream position is at the expected value.
+   *
    * @param hadoopStream stream
    * @param pos expected position
+   *
    * @throws IOException exception raised on getPos()
    */
   private static void assertStreamAt(final FSDataInputStream hadoopStream, long pos) throws IOException {
@@ -360,34 +430,82 @@ private static void assertStreamAt(final FSDataInputStream hadoopStream, long po
 
   /**
    * Read a buffer at the current position through {@link H3ByteBufferInputStream#performRead(FSDataInputStream, ByteBuffer)}.
-   * Assert that the stream buffer position and limit are what is expected
+   * Assert that the stream buffer position and limit are as expected.
+   * That is: the stream position has been moved forwards by the
+   * size of the buffer.
    *
    * @param hadoopStream stream
    * @param readBuffer buffer to fill
-   * @param bufferPosition final buffer position
-   * @param limit final buffer limit
+   * @param expectedBufferPosition final buffer position
+   * @param expectedLimit final buffer limit
   *
    * @throws IOException read failure
    */
   private static void assertBufferRead(
       final FSDataInputStream hadoopStream,
       final ByteBuffer readBuffer,
-      final int bufferPosition,
-      final int limit)
+      final int expectedBufferPosition,
+      final int expectedLimit)
       throws IOException {
     final long pos = hadoopStream.getPos();
     final int remaining = readBuffer.remaining();
-    performRead(hadoopStream, readBuffer);
-    assertPositionAndLimit(readBuffer, bufferPosition, limit);
+    final int read = performRead(hadoopStream, readBuffer);
+    // the bytes read MUST match the buffer size, as this is a full buffer read.
+    Assert.assertEquals("bytes read from stream", remaining, read);
+    // the buffer position and limit match what was expected.
+    assertPositionAndLimit(readBuffer, expectedBufferPosition, expectedLimit);
+    // the stream has moved forwards.
     assertStreamAt(hadoopStream, pos + remaining);
   }
 
   /**
-   * Input stream which claims to implement ByteBufferPositionedReadable
+   * Input stream which implements the ByteBufferReadable interface and, optionally,
+   * declares it through {@code hasCapability()}.
+   */
+  private static class MockByteBufferInputStream extends MockHadoopInputStream
+      implements ByteBufferReadable, StreamCapabilities {
+
+    private final String[] capabilities;
+
+    /**
+     * Constructor.
+     *
+     * @param capabilities an array of capabilities to declare support for.
+     */
+    private MockByteBufferInputStream(String... capabilities) {
+      this.capabilities = capabilities;
+    }
+
+    @Override
+    public int read(final ByteBuffer buf) {
+      return 0;
+    }
+
+    /**
+     * Does the stream declare the given capability?
+     *
+     * @param capability string to query the stream support for.
+     *
+     * @return true if there is an entry in the capability list matching the argument.
+     */
+    @Override
+    public boolean hasCapability(final String capability) {
+      return Arrays.stream(capabilities).anyMatch(c -> c.equals(capability));
+    }
+  }
+
+  /**
+   * Input stream which claims to implement ByteBufferPositionedReadable,
+   * unless constructed with a capability list that excludes it.
    */
-  private static class MockByteBufferReadFullyInputStream extends MockHadoopInputStream
+  private static class MockByteBufferReadFullyInputStream extends MockByteBufferInputStream
       implements ByteBufferPositionedReadable, StreamCapabilities {
 
+    public MockByteBufferReadFullyInputStream() {
+      this(READBYTEBUFFER, PREADBYTEBUFFER);
+    }
+
+    public MockByteBufferReadFullyInputStream(final String... capabilities) {
+      super(capabilities);
+    }
+
     @Override
     public int read(final long position, final ByteBuffer buf) throws IOException {
       rejectNegativePosition(position);
@@ -413,15 +531,5 @@ public void readFully(final long position, final ByteBuffer buf) throws IOExcept
       System.arraycopy(data(), (int) position, result, 0, toRead);
       buf.put(result);
     }
-
-    /**
-     * Declare support for ByteBufferPositionedReadable.
-     * This is the only way that an implementation wil be picked up.
-     * @param capability string to query the stream support for.
-     * @return
-     */
-    public boolean hasCapability(final String capability) {
-      return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER);
-    }
   }
 }
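End to end, the series means a Parquet caller gets the upgraded read path without code changes. A usage sketch, assuming `in` is an already-open FSDataInputStream on a filesystem whose client declares "in:preadbytebuffer" (HDFS does):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.parquet.hadoop.util.HadoopStreams;
    import org.apache.parquet.io.SeekableInputStream;

    SeekableInputStream stream = HadoopStreams.wrap(in);  // selects the H3 wrapper here
    ByteBuffer buf = ByteBuffer.allocateDirect(1024);
    stream.readFully(buf);  // fills all 1024 bytes or throws EOFException,
                            // leaving the stream positioned after them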