-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3080: HadoopStreams to support ByteBufferPositionedReadable #3096
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.parquet.hadoop.util; | ||
|
||
import java.io.EOFException; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import org.apache.hadoop.fs.FSDataInputStream; | ||
|
||
/** | ||
* Class which implements {@link #readFully(ByteBuffer)} through | ||
* {@code ByteBufferPositionedReadable.readFully()}. | ||
* <p>This is implemented by HDFS and possibly other clients, | ||
*/ | ||
class H3ByteBufferInputStream extends H2SeekableInputStream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine with the inheritance to reduce code duplication. |
||
public H3ByteBufferInputStream(final FSDataInputStream stream) { | ||
super(stream); | ||
} | ||
|
||
@Override | ||
public FSDataInputStream getStream() { | ||
return (FSDataInputStream) super.getStream(); | ||
} | ||
|
||
/** | ||
* Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()} | ||
* at the current location. | ||
* <p>That operation is designed to not use the current reading position, rather | ||
* an absolute position is passed in. | ||
* In the use here the original read position is saved, and | ||
* after the read is finished a {@code seek()} call made to move the | ||
* cursor on. | ||
* | ||
* @param buf a byte buffer to fill with data from the stream | ||
* | ||
* @throws EOFException the buffer length is greater than the file length | ||
* @throws IOException other IO problems. | ||
*/ | ||
@Override | ||
public void readFully(final ByteBuffer buf) throws EOFException, IOException { | ||
performRead(getStream(), buf); | ||
} | ||
|
||
/** | ||
* Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()} | ||
* from the current location. | ||
* That is it reads from stream[pos] to stream[pos + buf.remaining() -1] | ||
* | ||
* @param buf a byte buffer to fill with data from the stream | ||
* @return number of bytes read. | ||
* | ||
* @throws EOFException the buffer length is greater than the file length | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add |
||
* @throws IOException other IO problems. | ||
*/ | ||
// Visible for testing | ||
static int performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException { | ||
// remember the current position | ||
final long pos = stream.getPos(); | ||
final int size = buf.remaining(); | ||
stream.readFully(pos, buf); | ||
// then move read position on afterwards. | ||
stream.seek(pos + size); | ||
return size; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,15 +19,16 @@ | |
|
||
package org.apache.parquet.hadoop.util; | ||
|
||
import static org.apache.hadoop.fs.StreamCapabilities.PREADBYTEBUFFER; | ||
import static org.apache.hadoop.fs.StreamCapabilities.READBYTEBUFFER; | ||
|
||
import java.io.InputStream; | ||
import java.util.Objects; | ||
import java.util.function.Function; | ||
import org.apache.hadoop.fs.ByteBufferReadable; | ||
import org.apache.hadoop.fs.FSDataInputStream; | ||
import org.apache.hadoop.fs.FSDataOutputStream; | ||
import org.apache.parquet.io.PositionOutputStream; | ||
import org.apache.parquet.io.SeekableInputStream; | ||
import org.apache.parquet.util.DynMethods; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -38,11 +39,6 @@ public class HadoopStreams { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class); | ||
|
||
private static final DynMethods.UnboundMethod hasCapabilitiesMethod = new DynMethods.Builder("hasCapabilities") | ||
.impl(FSDataInputStream.class, "hasCapabilities", String.class) | ||
.orNoop() | ||
.build(); | ||
|
||
/** | ||
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream} | ||
* implementation for Parquet readers. | ||
|
@@ -53,42 +49,14 @@ public class HadoopStreams { | |
public static SeekableInputStream wrap(FSDataInputStream stream) { | ||
Objects.requireNonNull(stream, "Cannot wrap a null input stream"); | ||
|
||
// Try to check using hasCapabilities(str) | ||
Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream); | ||
|
||
// If it is null, then fall back to the old method | ||
if (hasCapabilitiesResult != null) { | ||
if (hasCapabilitiesResult) { | ||
return new H2SeekableInputStream(stream); | ||
} else { | ||
return new H1SeekableInputStream(stream); | ||
} | ||
} | ||
|
||
return unwrapByteBufferReadableLegacy(stream).apply(stream); | ||
} | ||
|
||
/** | ||
* Is the inner stream byte buffer readable? | ||
* The test is 'the stream is not FSDataInputStream | ||
* and implements ByteBufferReadable' | ||
* <p> | ||
* This logic is only used for Hadoop <2.9.x, and <3.x.x | ||
* | ||
* @param stream stream to probe | ||
* @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable | ||
*/ | ||
private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBufferReadableLegacy( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any behavior change of a wrapped stream after removing |
||
FSDataInputStream stream) { | ||
InputStream wrapped = stream.getWrappedStream(); | ||
if (wrapped instanceof FSDataInputStream) { | ||
LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream); | ||
return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped)); | ||
} | ||
if (stream.getWrappedStream() instanceof ByteBufferReadable) { | ||
return H2SeekableInputStream::new; | ||
// Check using hasCapabilities(str) | ||
if (stream.hasCapability(PREADBYTEBUFFER)) { | ||
LOG.debug("Using ByteBufferPositionedReadable to read {}", stream); | ||
return new H3ByteBufferInputStream(stream); | ||
} else if (isWrappedStreamByteBufferReadable(stream)) { | ||
return new H2SeekableInputStream(stream); | ||
} else { | ||
return H1SeekableInputStream::new; | ||
return new H1SeekableInputStream(stream); | ||
} | ||
} | ||
|
||
|
@@ -111,14 +79,8 @@ private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBuffer | |
* the data, null when it cannot be determined because of missing hasCapabilities | ||
*/ | ||
private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) { | ||
if (hasCapabilitiesMethod.isNoop()) { | ||
// When the method is not available, just return a null | ||
return null; | ||
} | ||
|
||
boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer"); | ||
|
||
if (isByteBufferReadable) { | ||
if (stream.hasCapability(READBYTEBUFFER)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can avoid any reflection here because of the Hadoop version bump? |
||
// stream is issuing the guarantee that it implements the | ||
// API. Holds for all implementations in hadoop-* | ||
// since Hadoop 3.3.0 (HDFS-14111). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why adding this but not used elsewhere?