Address sorting for Parquet metadata and persistent data indexes (deephaven#5423)

* Sort Parquet persistent index tables before writing them
* Correct Parquet metadata (TableInfo) for sort columns; address missing documentation and incorrect serialization
* Populate TableInfo.sortingColumns() for primary tables and index tables
rcaudy authored Apr 26, 2024
1 parent 1ab63d5 commit c71506e
Showing 7 changed files with 182 additions and 24 deletions.
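
Taken together, the sketch below illustrates the intended round trip; it is not code from this commit, and the table contents, the /tmp destination, and the exact writeTable overload are illustrative assumptions. A sort applied in the engine is recorded on the table's attributes, the writer copies it into TableInfo.sortingColumns() in the Parquet key-value metadata, and the location-reading code returns it again; the testSortingMetadata test at the end of this diff exercises both directions, including the ascending sort now applied to persistent index tables.

import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.SortedColumnsAttribute;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetTools;

import java.util.List;

public class SortingMetadataSketch {
    public static void main(String[] args) {
        // Sorting in the engine records the order on Table.SORTED_COLUMNS_ATTRIBUTE
        final Table sorted = TableTools.emptyTable(100)
                .update("Key = ii % 10", "Value = ii")
                .sortDescending("Value");

        // The writer now propagates that attribute into TableInfo.sortingColumns(), and also
        // sorts any persistent data-index tables and annotates them as ascending
        ParquetTools.writeTable(sorted, "/tmp/sorted.parquet"); // illustrative destination path

        // Engine-side view of the attribute the writer consumes (helper added in this commit)
        final List<SortColumn> sortColumns = SortedColumnsAttribute.getSortedColumns(sorted);
        System.out.println(sortColumns); // expect a single descending SortColumn on "Value"
    }
}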

SortedColumnsAttribute.java
@@ -3,6 +3,8 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.Table;

import java.util.*;
@@ -95,6 +97,25 @@ public static Table withOrderForColumn(Table table, String columnName, SortingOr
return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute));
}

/**
* Get the columns a {@link Table} is sorted by.
*
* @param table The table to interrogate
*
* @return A (possibly-empty) list of {@link SortColumn SortColumns} representing columns the table is sorted on and
* their associated sort order
*/
public static List<SortColumn> getSortedColumns(Table table) {
final String attribute = (String) table.getAttribute(Table.SORTED_COLUMNS_ATTRIBUTE);
if (attribute == null || attribute.isEmpty()) {
return Collections.emptyList();
}
return stringToMap(attribute, true).entrySet().stream().map(e -> {
final ColumnName columnName = ColumnName.of(e.getKey());
return e.getValue().isAscending() ? SortColumn.asc(columnName) : SortColumn.desc(columnName);
}).collect(Collectors.toList());
}

private static Map<String, SortingOrder> stringToMap(String attribute, boolean writable) {
if (attribute == null || attribute.isEmpty()) {
return writable ? new HashMap<>() : Collections.emptyMap();
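
A minimal usage sketch of this class's two public helpers follows; it is illustrative rather than part of the commit, and assumes SortingOrder is the engine's io.deephaven.engine.table.impl.SortingOrder enum already referenced by this class and that TableTools is available for a throwaway table.

import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.SortedColumnsAttribute;
import io.deephaven.engine.table.impl.SortingOrder;
import io.deephaven.engine.util.TableTools;

import java.util.List;

public class SortedColumnsAttributeSketch {
    public static void main(String[] args) {
        final Table base = TableTools.emptyTable(5).update("A = ii");

        // Record on the table's attributes that it is ordered ascending by "A"
        final Table annotated = SortedColumnsAttribute.withOrderForColumn(base, "A", SortingOrder.Ascending);

        // Read the attribute back as engine SortColumns; a table without the attribute yields an empty list
        final List<SortColumn> cols = SortedColumnsAttribute.getSortedColumns(annotated);
        System.out.println(cols); // expect a single ascending SortColumn on "A"
    }
}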

ParquetTableWriter.java
@@ -3,10 +3,12 @@
//
package io.deephaven.parquet.table;

import io.deephaven.api.SortColumn;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.SortedColumnsAttribute;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.table.impl.select.FormulaColumn;
import io.deephaven.engine.table.impl.select.NullSelectColumn;
@@ -18,10 +20,7 @@
import io.deephaven.parquet.base.ParquetFileWriter;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.parquet.base.RowGroupWriter;
import io.deephaven.parquet.table.metadata.CodecInfo;
import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
import io.deephaven.parquet.table.metadata.DataIndexInfo;
import io.deephaven.parquet.table.metadata.TableInfo;
import io.deephaven.parquet.table.metadata.*;
import io.deephaven.parquet.table.transfer.ArrayAndVectorTransfer;
import io.deephaven.parquet.table.transfer.StringDictionary;
import io.deephaven.parquet.table.transfer.TransferObject;
@@ -44,7 +43,6 @@
import java.io.IOException;
import java.nio.IntBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;

import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
@@ -152,7 +150,11 @@ static void write(
.or(() -> Optional.of(DataIndexer.getOrCreateDataIndex(t, info.indexColumnNames)))
.get()
.transform(DataIndexTransformer.builder().invertRowSet(t.getRowSet()).build());
final Table indexTable = dataIndex.table();
final Table indexTable = dataIndex.table().sort(info.indexColumnNames.toArray(new String[0]));
final TableInfo.Builder indexTableInfoBuilder = TableInfo.builder().addSortingColumns(
info.indexColumnNames.stream()
.map(cn -> SortColumnInfo.of(cn, SortColumnInfo.SortDirection.Ascending))
.toArray(SortColumnInfo[]::new));

cleanupFiles.add(info.destFile);
tableInfoBuilder.addDataIndexes(DataIndexInfo.of(
@@ -168,11 +170,19 @@
}
write(indexTable, indexTable.getDefinition(), writeInstructionsToUse,
info.destFile.getAbsolutePath(), info.destFileForMetadata.getAbsolutePath(),
Collections.emptyMap(), TableInfo.builder(), NullParquetMetadataFileWriter.INSTANCE,
Collections.emptyMap(), indexTableInfoBuilder, NullParquetMetadataFileWriter.INSTANCE,
computedCache);
}
}
}

// SortedColumnsAttribute effectively only stores (zero or more) individual columns by which the table is
// sorted, rather than ordered sets expressing multi-column sorts. Given that mismatch, we can only reflect
// a single column sort in the metadata at this time.
final List<SortColumn> sortedColumns = SortedColumnsAttribute.getSortedColumns(t);
if (!sortedColumns.isEmpty()) {
tableInfoBuilder.addSortingColumns(SortColumnInfo.of(sortedColumns.get(0)));
}
write(t, definition, writeInstructions, destFilePath, destFilePathForMetadata, incomingMeta,
tableInfoBuilder, metadataFileWriter, computedCache);
} catch (Exception e) {

ParquetTools.java
@@ -386,7 +386,8 @@ private static String minusParquetSuffix(@NotNull final String s) {
* indexing column {@code "IndexingColName"}, the method will return
* {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"} on unix systems.
*/
private static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String[] columnNames) {
@VisibleForTesting
static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String... columnNames) {
final String columns = String.join(",", columnNames);
return String.format(".dh_metadata%sindexes%s%s%sindex_%s_%s", File.separator, File.separator, columns,
File.separator, columns, tableDest.getName());

ParquetTableLocation.java
@@ -31,8 +31,8 @@
import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
import io.deephaven.parquet.table.metadata.DataIndexInfo;
import io.deephaven.parquet.table.metadata.GroupingColumnInfo;
import io.deephaven.parquet.table.metadata.SortColumnInfo;
import io.deephaven.parquet.table.metadata.TableInfo;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.RowGroup;
@@ -57,7 +57,6 @@ public class ParquetTableLocation extends AbstractTableLocation {
private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName();

private final ParquetInstructions readInstructions;
private final List<SortColumn> sortingColumns;
private final ParquetFileReader parquetFileReader;
private final int[] rowGroupIndices;

@@ -69,6 +68,7 @@ public class ParquetTableLocation extends AbstractTableLocation {
private final Map<String, GroupingColumnInfo> groupingColumns;
private final List<DataIndexInfo> dataIndexes;
private final Map<String, ColumnTypeInfo> columnTypes;
private final List<SortColumn> sortingColumns;

private final String version;

@@ -116,7 +116,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey,
groupingColumns = tableInfo.groupingColumnMap();
dataIndexes = tableInfo.dataIndexes();
columnTypes = tableInfo.columnTypeMap();
sortingColumns = tableInfo.sortingColumns();
sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns());

if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) {
// We do not have the last modified time for non-file URIs

SortColumnInfo.java (new file)
@@ -0,0 +1,61 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table.metadata;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.stream.Collectors;

@Immutable
@SimpleStyle
@JsonSerialize(as = ImmutableSortColumnInfo.class)
@JsonDeserialize(as = ImmutableSortColumnInfo.class)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public abstract class SortColumnInfo {

public enum SortDirection {
Ascending, Descending
}

@Parameter
public abstract String columnName();

@Parameter
public abstract SortDirection sortDirection();

@Check
final void checkColumnName() {
ColumnName.of(columnName());
}

public final SortColumn sortColumn() {
return sortDirection() == SortDirection.Ascending
? SortColumn.asc(ColumnName.of(columnName()))
: SortColumn.desc(ColumnName.of(columnName()));
}

public static List<SortColumn> sortColumns(@NotNull final List<SortColumnInfo> sortColumnInfos) {
return sortColumnInfos.stream().map(SortColumnInfo::sortColumn).collect(Collectors.toList());
}

public static SortColumnInfo of(@NotNull final SortColumn sortColumn) {
return of(sortColumn.column().name(), sortColumn.order() == SortColumn.Order.ASCENDING
? SortDirection.Ascending
: SortDirection.Descending);
}

public static SortColumnInfo of(@NotNull final String columnName, @NotNull final SortDirection sortDirection) {
return ImmutableSortColumnInfo.of(columnName, sortDirection);
}
}
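
For reference, a brief example of the conversion helpers defined by this new class; this is assumed usage for illustration, not part of the commit.

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.parquet.table.metadata.SortColumnInfo;

import java.util.List;

public class SortColumnInfoSketch {
    public static void main(String[] args) {
        // Metadata representation of a descending sort on "Timestamp"
        final SortColumnInfo info = SortColumnInfo.of(SortColumn.desc(ColumnName.of("Timestamp")));

        // Convert back to the engine's SortColumn form, singly or as a list
        final SortColumn single = info.sortColumn();
        final List<SortColumn> all = SortColumnInfo.sortColumns(List.of(info));
        System.out.println(single.order() + " on " + info.columnName()); // DESCENDING on Timestamp
        System.out.println(all.size()); // 1
    }
}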

TableInfo.java
@@ -10,8 +10,9 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.SortColumn;
import org.immutables.value.Value;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
@@ -23,7 +24,7 @@
/**
* Representation class for per-table information stored in key-value metadata for Deephaven-written Parquet files.
*/
@Value.Immutable
@Immutable
@BuildableStyle
@JsonSerialize(as = ImmutableTableInfo.class)
@JsonDeserialize(as = ImmutableTableInfo.class)
@@ -59,7 +60,7 @@ public final Map<String, ColumnTypeInfo> columnTypeMap() {
/**
* @return The Deephaven release version used to write the parquet file
*/
@Value.Default
@Default
public String version() {
final String version = TableInfo.class.getPackage().getImplementationVersion();
// noinspection ReplaceNullCheck
@@ -86,10 +87,13 @@ public String version() {
*/
public abstract List<ColumnTypeInfo> columnTypes();

public abstract List<SortColumn> sortingColumns();

/**
* @return List of {@link SortColumnInfo sort columns} representing the sort order of the table. Note that these are
* ordered by precedence, representing a multi-column sort.
*/
public abstract List<SortColumnInfo> sortingColumns();

@Value.Check
@Check
final void checkVersion() {
if (version().isEmpty()) {
throw new IllegalArgumentException("Empty version");
@@ -110,23 +114,23 @@ public interface Builder {

Builder addAllGroupingColumns(Iterable<? extends GroupingColumnInfo> groupingColumns);

Builder addDataIndexes(DataIndexInfo info);
Builder addDataIndexes(DataIndexInfo dataIndex);

Builder addDataIndexes(DataIndexInfo... infos);
Builder addDataIndexes(DataIndexInfo... dataIndexes);

Builder addAllDataIndexes(Iterable<? extends DataIndexInfo> infos);
Builder addAllDataIndexes(Iterable<? extends DataIndexInfo> dataIndexes);

Builder addColumnTypes(ColumnTypeInfo columnType);

Builder addColumnTypes(ColumnTypeInfo... columnTypes);

Builder addAllColumnTypes(Iterable<? extends ColumnTypeInfo> columnTypes);

Builder addSortingColumns(SortColumn sortPair);
Builder addSortingColumns(SortColumnInfo sortColumns);

Builder addSortingColumns(SortColumn... sortPairs);
Builder addSortingColumns(SortColumnInfo... sortColumns);

Builder addAllSortingColumns(Iterable<? extends SortColumn> sortPairs);
Builder addAllSortingColumns(Iterable<? extends SortColumnInfo> sortColumns);

TableInfo build();
}

ParquetTableReadWriteTest.java
@@ -4,7 +4,9 @@
package io.deephaven.parquet.table;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.api.Selectable;
import io.deephaven.api.SortColumn;
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
@@ -24,6 +26,7 @@
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.dataindex.DataIndexUtils;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey;
import io.deephaven.engine.table.impl.select.FormulaEvaluationException;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.SelectColumn;
@@ -39,6 +42,7 @@
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocation;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.parquet.table.pagestore.ColumnChunkPageStore;
import io.deephaven.parquet.table.transfer.StringDictionary;
@@ -396,6 +400,63 @@ public void indexByBigInt() {
verifyIndexingInfoExists(fromDisk, "someBigInt", "someInt");
}

@Test
public void testSortingMetadata() {
final TableDefinition definition = TableDefinition.of(
ColumnDefinition.ofInt("someInt"),
ColumnDefinition.ofString("someString"));
final Table testTable =
((QueryTable) TableTools.emptyTable(10).select("someInt = i", "someString = `foo`")
.where("i % 2 == 0").groupBy("someString").ungroup("someInt")
.sortDescending("someInt"))
.withDefinitionUnsafe(definition);

DataIndexer.getOrCreateDataIndex(testTable, "someString");
DataIndexer.getOrCreateDataIndex(testTable, "someInt", "someString");

final File dest = new File(rootFile, "ParquetTest_sortingMetadata_test.parquet");
writeTable(testTable, dest);

final Table fromDisk = checkSingleTable(testTable, dest);

// Validate the indexes and lookup functions.
verifyIndexingInfoExists(fromDisk, "someString");
verifyIndexingInfoExists(fromDisk, "someInt", "someString");
verifyIndexingInfoExists(fromDisk, "someString", "someInt");

final ParquetTableLocation tableLocation = new ParquetTableLocation(
StandaloneTableKey.getInstance(),
new ParquetTableLocationKey(
convertToURI(dest, false),
0, Map.of(), EMPTY),
EMPTY);
assertEquals(tableLocation.getSortedColumns(), List.of(SortColumn.desc(ColumnName.of("someInt"))));

final ParquetTableLocation index1Location = new ParquetTableLocation(
StandaloneTableKey.getInstance(),
new ParquetTableLocationKey(
convertToURI(new File(rootFile,
ParquetTools.getRelativeIndexFilePath(dest, "someString")), false),
0, Map.of(), EMPTY),
EMPTY);
assertEquals(index1Location.getSortedColumns(), List.of(SortColumn.asc(ColumnName.of("someString"))));
final Table index1Table = DataIndexer.getDataIndex(fromDisk, "someString").table();
assertTableEquals(index1Table, index1Table.sort("someString"));

final ParquetTableLocation index2Location = new ParquetTableLocation(
StandaloneTableKey.getInstance(),
new ParquetTableLocationKey(
convertToURI(new File(rootFile,
ParquetTools.getRelativeIndexFilePath(dest, "someInt", "someString")), false),
0, Map.of(), EMPTY),
EMPTY);
assertEquals(index2Location.getSortedColumns(), List.of(
SortColumn.asc(ColumnName.of("someInt")),
SortColumn.asc(ColumnName.of("someString"))));
final Table index2Table = DataIndexer.getDataIndex(fromDisk, "someInt", "someString").table();
assertTableEquals(index2Table, index2Table.sort("someInt", "someString"));
}

private static void verifyIndexingInfoExists(final Table table, final String... columnNames) {
assertTrue(DataIndexer.hasDataIndex(table, columnNames));
final DataIndex fullIndex = DataIndexer.getDataIndex(table, columnNames);
