Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Fix Reader leak by removing useless copy #12079

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

zizon
Copy link

@zizon zizon commented Jan 24, 2025

The ReadConf copy constructor will nullify the reader of source, leaving the reader of original unclosed

The ReadConf copy constructor will nullify the reader of source, leaving the reader of original unclosed
@zizon
Copy link
Author

zizon commented Jan 24, 2025

leak stacktrace looks like(iceberg 1.2.x)

2025-01-24T10:39:58.963+0800	WARN	Finalizer	org.apache.iceberg.hadoop.HadoopStreams	Unclosed input stream created by:
	org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:91)
	org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:55)
	org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
	com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
	com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:80)
	com.facebook.presto.iceberg.HdfsInputFile.newStream(HdfsInputFile.java:58)
	org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:179)
	org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:774)
	org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
	org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:231)
	org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:80)
	org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
	org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
	org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
	org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
	org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:257)
	java.lang.Iterable.forEach(Iterable.java:74)
	org.apache.iceberg.deletes.Deletes.toPositionIndex(Deletes.java:138)

@aokolnychyi
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar self-requested a review January 24, 2025 04:48
@Fokko Fokko changed the title Fix Reader leak by removing useless copy Parquet: Fix Reader leak by removing useless copy Jan 24, 2025
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the history, this copy has been in there since the beginning. I don't think it is needed since the conf doesn't go outside of the class.

@zizon
Copy link
Author

zizon commented Jan 24, 2025 via email

@zizon
Copy link
Author

zizon commented Jan 24, 2025

I think I found the root cause.

2025-01-24T10:39:58.963+0800	WARN	Finalizer	org.apache.iceberg.hadoop.HadoopStreams	Unclosed input stream created by:
	org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:91)
	org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:55)
	org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
	com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
	com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:80)
	com.facebook.presto.iceberg.HdfsInputFile.newStream(HdfsInputFile.java:58)
	org.apache.iceberg.parquet.ParquetIO$ParquetInputFile.newStream(ParquetIO.java:179). <-- this line 
	org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:774)
	org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:658)
	org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:231)
	org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:80)
	org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
	org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
	org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:188)
	org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:187)
	org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:257)
	java.lang.Iterable.forEach(Iterable.java:74)
	org.apache.iceberg.deletes.Deletes.toPositionIndex(Deletes.java:138)

comments inline.
https://github.com/apache/iceberg/blob/main/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java#L96

  static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stream) {
    if (stream instanceof DelegatingInputStream) {
      // for somehow, it tries to optimistic steal the underlying stream, creating a new one by wrapping it.
      // the caller close the wrapper and underlying stream but not this intermediate one.
      InputStream wrapped = ((DelegatingInputStream) stream).getDelegate();
      if (wrapped instanceof FSDataInputStream) {
        return HadoopStreams.wrap((FSDataInputStream) wrapped);
      }
    }
    return new ParquetInputStreamAdapter(stream);
  }

A similar approach for the output version.

Can we just go through the Adapter path?

zizon added 2 commits January 25, 2025 00:56
Wrapping required a clear way to call close of the provided stream,
without also closing the underlying stream.
@zizon
Copy link
Author

zizon commented Jan 24, 2025

cc @nastra @Fokko

@@ -82,22 +82,10 @@ static OutputFile file(org.apache.iceberg.io.OutputFile file, Configuration conf
}

static SeekableInputStream stream(org.apache.iceberg.io.SeekableInputStream stream) {
if (stream instanceof DelegatingInputStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please add a test that tries to reproduce the original problem?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants