Skip to content

Commit

Permalink
Added offset index based parquet reading support (deephaven#4844)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Nov 23, 2023
1 parent eab615f commit 2c5ecbd
Show file tree
Hide file tree
Showing 19 changed files with 576 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
package io.deephaven.parquet.base;

import org.apache.parquet.column.Dictionary;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Supplier;

public interface ColumnChunkReader {
/**
* @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size
*/
int getPageFixedSize();

/**
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
*/
Expand All @@ -32,15 +29,28 @@ public interface ColumnChunkReader {
*/
int getMaxRl();

interface ColumnPageReaderIterator extends Iterator<ColumnPageReader>, AutoCloseable {
@Override
void close() throws IOException;
}
/**
* @return The offset index for this column chunk, or null if it not found in the metadata.
*/
@Nullable
OffsetIndex getOffsetIndex();

/**
* @return An iterator over individual parquet pages
*/
ColumnPageReaderIterator getPageIterator() throws IOException;
Iterator<ColumnPageReader> getPageIterator() throws IOException;

interface ColumnPageDirectAccessor {
/**
* Directly access a page reader for a given page number.
*/
ColumnPageReader getPageReader(final int pageNum);
}

/**
* @return An accessor for individual parquet pages
*/
ColumnPageDirectAccessor getPageAccessor();

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page
Expand Down Expand Up @@ -69,4 +79,10 @@ public int getMaxId() {
}

PrimitiveType getType();

/**
* @return The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
@Nullable
String getVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.format.*;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
Expand All @@ -27,7 +28,9 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
Expand All @@ -46,10 +49,18 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
private final PageMaterializer.Factory nullMaterializerFactory;

private Path filePath;

ColumnChunkReaderImpl(
ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider,
Path rootPath, MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes) {
/**
* Number of rows in the row group of this column chunk.
*/
private final long numRows;
/**
* Version string from deephaven specific parquet metadata, or null if it's not present.
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, Path rootPath,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootPath = rootPath;
Expand All @@ -65,16 +76,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new LazyCachingSupplier<>(this::getDictionary);
this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
}

@Override
public int getPageFixedSize() {
return -1;
this.numRows = numRows;
this.version = version;
}

@Override
public long numRows() {
return numValues();
return numRows;
}

@Override
Expand All @@ -87,15 +95,26 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public final OffsetIndex getOffsetIndex() {
return offsetIndex;
}

@Override
public ColumnPageReaderIterator getPageIterator() {
public Iterator<ColumnPageReader> getPageIterator() {
final long dataPageOffset = columnChunk.meta_data.getData_page_offset();
if (offsetIndex == null) {
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values(),
path, channelsProvider);
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values());
} else {
return new ColumnPageReaderIteratorIndexImpl(path, channelsProvider);
return new ColumnPageReaderIteratorIndexImpl();
}
}

@Override
public final ColumnPageDirectAccessor getPageAccessor() {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl();
}

private Path getFilePath() {
Expand Down Expand Up @@ -166,6 +185,11 @@ public PrimitiveType getType() {
return path.getPrimitiveType();
}

@Override
public String getVersion() {
return version;
}

@NotNull
private Dictionary readDictionary(ReadableByteChannel file) throws IOException {
// explicitly not closing this, caller is responsible
Expand All @@ -192,21 +216,13 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException {
return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
}

class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private final SeekableChannelsProvider channelsProvider;
private final class ColumnPageReaderIteratorImpl implements Iterator<ColumnPageReader> {
private long currentOffset;
private final ColumnDescriptor path;

private long remainingValues;

ColumnPageReaderIteratorImpl(final long startOffset,
final long numValues,
@NotNull final ColumnDescriptor path,
@NotNull final SeekableChannelsProvider channelsProvider) {
ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) {
this.remainingValues = numValues;
this.currentOffset = startOffset;
this.path = path;
this.channelsProvider = channelsProvider;
}

@Override
Expand All @@ -217,7 +233,7 @@ public boolean hasNext() {
@Override
public ColumnPageReader next() {
if (!hasNext()) {
throw new RuntimeException("No next element");
throw new NoSuchElementException("No next element");
}
// NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(getFilePath())) {
Expand Down Expand Up @@ -254,28 +270,19 @@ public ColumnPageReader next() {
(encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
? dictionarySupplier
: () -> NULL_DICTIONARY;
return new ColumnPageReaderImpl(
channelsProvider, decompressor, pageDictionarySupplier,
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(), pageHeader,
-1);
ColumnPageReaderImpl.NULL_NUM_VALUES);
} catch (IOException e) {
throw new RuntimeException("Error reading page header", e);
throw new UncheckedDeephavenException("Error reading page header", e);
}
}

@Override
public void close() {}
}

class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
private final SeekableChannelsProvider channelsProvider;
private final class ColumnPageReaderIteratorIndexImpl implements Iterator<ColumnPageReader> {
private int pos;
private final ColumnDescriptor path;

ColumnPageReaderIteratorIndexImpl(ColumnDescriptor path,
SeekableChannelsProvider channelsProvider) {
this.path = path;
this.channelsProvider = channelsProvider;
ColumnPageReaderIteratorIndexImpl() {
pos = 0;
}

Expand All @@ -287,20 +294,37 @@ public boolean hasNext() {
@Override
public ColumnPageReader next() {
if (!hasNext()) {
throw new RuntimeException("No next element");
throw new NoSuchElementException("No next element");
}
int rowCount =
(int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
// Following logic assumes that offsetIndex will store the number of values for a page instead of number
// of rows (which can be different for array and vector columns). This behavior is because of a bug on
// parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading
// parquet files written before deephaven-core/pull/4844.
final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
ColumnPageReaderImpl columnPageReader =
new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null,
rowCount);
numValues);
pos++;
return columnPageReader;
}
}

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}

@Override
public void close() throws IOException {}
public ColumnPageReader getPageReader(final int pageNum) {
if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
// Page header and number of values will be populated later when we read the page header from the file
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null,
ColumnPageReaderImpl.NULL_NUM_VALUES);
}
}
}
Loading

0 comments on commit 2c5ecbd

Please sign in to comment.