diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java index d470c72ec51..31f2828bcb4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java @@ -3,6 +3,8 @@ // package io.deephaven.engine.table.impl; +import io.deephaven.api.ColumnName; +import io.deephaven.api.SortColumn; import io.deephaven.engine.table.Table; import java.util.*; @@ -95,6 +97,25 @@ public static Table withOrderForColumn(Table table, String columnName, SortingOr return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute)); } + /** + * Get the columns a {@link Table} is sorted by. + * + * @param table The table to interrogate + * + * @return A (possibly-empty) list of {@link SortColumn SortColumns} representing columns the table is sorted on and + * their associated sort order + */ + public static List getSortedColumns(Table table) { + final String attribute = (String) table.getAttribute(Table.SORTED_COLUMNS_ATTRIBUTE); + if (attribute == null || attribute.isEmpty()) { + return Collections.emptyList(); + } + return stringToMap(attribute, true).entrySet().stream().map(e -> { + final ColumnName columnName = ColumnName.of(e.getKey()); + return e.getValue().isAscending() ? SortColumn.asc(columnName) : SortColumn.desc(columnName); + }).collect(Collectors.toList()); + } + private static Map stringToMap(String attribute, boolean writable) { if (attribute == null || attribute.isEmpty()) { return writable ? new HashMap<>() : Collections.emptyMap(); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index 8b69a67cd7e..a0c63614a82 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -3,10 +3,12 @@ // package io.deephaven.parquet.table; +import io.deephaven.api.SortColumn; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.TrackingRowSet; import io.deephaven.engine.table.*; +import io.deephaven.engine.table.impl.SortedColumnsAttribute; import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.select.FormulaColumn; import io.deephaven.engine.table.impl.select.NullSelectColumn; @@ -18,10 +20,7 @@ import io.deephaven.parquet.base.ParquetFileWriter; import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.parquet.base.RowGroupWriter; -import io.deephaven.parquet.table.metadata.CodecInfo; -import io.deephaven.parquet.table.metadata.ColumnTypeInfo; -import io.deephaven.parquet.table.metadata.DataIndexInfo; -import io.deephaven.parquet.table.metadata.TableInfo; +import io.deephaven.parquet.table.metadata.*; import io.deephaven.parquet.table.transfer.ArrayAndVectorTransfer; import io.deephaven.parquet.table.transfer.StringDictionary; import io.deephaven.parquet.table.transfer.TransferObject; @@ -44,7 +43,6 @@ import java.io.IOException; import java.nio.IntBuffer; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.*; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; @@ -152,7 +150,11 @@ static void write( .or(() -> Optional.of(DataIndexer.getOrCreateDataIndex(t, info.indexColumnNames))) .get() .transform(DataIndexTransformer.builder().invertRowSet(t.getRowSet()).build()); - final Table indexTable = dataIndex.table(); + final Table indexTable = dataIndex.table().sort(info.indexColumnNames.toArray(new String[0])); + final TableInfo.Builder indexTableInfoBuilder = TableInfo.builder().addSortingColumns( + info.indexColumnNames.stream() + .map(cn -> SortColumnInfo.of(cn, SortColumnInfo.SortDirection.Ascending)) + .toArray(SortColumnInfo[]::new)); cleanupFiles.add(info.destFile); tableInfoBuilder.addDataIndexes(DataIndexInfo.of( @@ -168,11 +170,19 @@ static void write( } write(indexTable, indexTable.getDefinition(), writeInstructionsToUse, info.destFile.getAbsolutePath(), info.destFileForMetadata.getAbsolutePath(), - Collections.emptyMap(), TableInfo.builder(), NullParquetMetadataFileWriter.INSTANCE, + Collections.emptyMap(), indexTableInfoBuilder, NullParquetMetadataFileWriter.INSTANCE, computedCache); } } } + + // SortedColumnsAttribute effectively only stores (zero or more) individual columns by which the table is + // sorted, rather than ordered sets expressing multi-column sorts. Given that mismatch, we can only reflect + // a single column sort in the metadata at this time. + final List sortedColumns = SortedColumnsAttribute.getSortedColumns(t); + if (!sortedColumns.isEmpty()) { + tableInfoBuilder.addSortingColumns(SortColumnInfo.of(sortedColumns.get(0))); + } write(t, definition, writeInstructions, destFilePath, destFilePathForMetadata, incomingMeta, tableInfoBuilder, metadataFileWriter, computedCache); } catch (Exception e) { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index e5b0d96f3a5..9b56ae15bb8 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -386,7 +386,8 @@ private static String minusParquetSuffix(@NotNull final String s) { * indexing column {@code "IndexingColName"}, the method will return * {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"} on unix systems. */ - private static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String[] columnNames) { + @VisibleForTesting + static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String... columnNames) { final String columns = String.join(",", columnNames); return String.format(".dh_metadata%sindexes%s%s%sindex_%s_%s", File.separator, File.separator, columns, File.separator, columns, tableDest.getName()); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 1cb61e20872..561d6f32076 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -31,8 +31,8 @@ import io.deephaven.parquet.table.metadata.ColumnTypeInfo; import io.deephaven.parquet.table.metadata.DataIndexInfo; import io.deephaven.parquet.table.metadata.GroupingColumnInfo; +import io.deephaven.parquet.table.metadata.SortColumnInfo; import io.deephaven.parquet.table.metadata.TableInfo; -import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.RowGroup; @@ -57,7 +57,6 @@ public class ParquetTableLocation extends AbstractTableLocation { private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName(); private final ParquetInstructions readInstructions; - private final List sortingColumns; private final ParquetFileReader parquetFileReader; private final int[] rowGroupIndices; @@ -69,6 +68,7 @@ public class ParquetTableLocation extends AbstractTableLocation { private final Map groupingColumns; private final List dataIndexes; private final Map columnTypes; + private final List sortingColumns; private final String version; @@ -116,7 +116,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, groupingColumns = tableInfo.groupingColumnMap(); dataIndexes = tableInfo.dataIndexes(); columnTypes = tableInfo.columnTypeMap(); - sortingColumns = tableInfo.sortingColumns(); + sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/SortColumnInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/SortColumnInfo.java new file mode 100644 index 00000000000..a1c9b1dc712 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/SortColumnInfo.java @@ -0,0 +1,61 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.parquet.table.metadata; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.deephaven.annotations.SimpleStyle; +import io.deephaven.api.ColumnName; +import io.deephaven.api.SortColumn; +import org.immutables.value.Value.Check; +import org.immutables.value.Value.Immutable; +import org.immutables.value.Value.Parameter; +import org.jetbrains.annotations.NotNull; + +import java.util.List; +import java.util.stream.Collectors; + +@Immutable +@SimpleStyle +@JsonSerialize(as = ImmutableSortColumnInfo.class) +@JsonDeserialize(as = ImmutableSortColumnInfo.class) +@JsonInclude(JsonInclude.Include.NON_EMPTY) +public abstract class SortColumnInfo { + + public enum SortDirection { + Ascending, Descending + } + + @Parameter + public abstract String columnName(); + + @Parameter + public abstract SortDirection sortDirection(); + + @Check + final void checkColumnName() { + ColumnName.of(columnName()); + } + + public final SortColumn sortColumn() { + return sortDirection() == SortDirection.Ascending + ? SortColumn.asc(ColumnName.of(columnName())) + : SortColumn.desc(ColumnName.of(columnName())); + } + + public static List sortColumns(@NotNull final List sortColumnInfos) { + return sortColumnInfos.stream().map(SortColumnInfo::sortColumn).collect(Collectors.toList()); + } + + public static SortColumnInfo of(@NotNull final SortColumn sortColumn) { + return of(sortColumn.column().name(), sortColumn.order() == SortColumn.Order.ASCENDING + ? SortDirection.Ascending + : SortDirection.Descending); + } + + public static SortColumnInfo of(@NotNull final String columnName, @NotNull final SortDirection sortDirection) { + return ImmutableSortColumnInfo.of(columnName, sortDirection); + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java index a506cd8e91f..b6a437083de 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java @@ -10,8 +10,9 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import io.deephaven.annotations.BuildableStyle; -import io.deephaven.api.SortColumn; -import org.immutables.value.Value; +import org.immutables.value.Value.Check; +import org.immutables.value.Value.Default; +import org.immutables.value.Value.Immutable; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -23,7 +24,7 @@ /** * Representation class for per-table information stored in key-value metadata for Deephaven-written Parquet files. */ -@Value.Immutable +@Immutable @BuildableStyle @JsonSerialize(as = ImmutableTableInfo.class) @JsonDeserialize(as = ImmutableTableInfo.class) @@ -59,7 +60,7 @@ public final Map columnTypeMap() { /** * @return The Deephaven release version used to write the parquet file */ - @Value.Default + @Default public String version() { final String version = TableInfo.class.getPackage().getImplementationVersion(); // noinspection ReplaceNullCheck @@ -86,10 +87,13 @@ public String version() { */ public abstract List columnTypes(); - public abstract List sortingColumns(); - + /** + * @return List of {@link SortColumnInfo sort columns} representing the sort order of the table. Note that these are + * ordered by precedence, representing a multi-column sort. + */ + public abstract List sortingColumns(); - @Value.Check + @Check final void checkVersion() { if (version().isEmpty()) { throw new IllegalArgumentException("Empty version"); @@ -110,11 +114,11 @@ public interface Builder { Builder addAllGroupingColumns(Iterable groupingColumns); - Builder addDataIndexes(DataIndexInfo info); + Builder addDataIndexes(DataIndexInfo dataIndex); - Builder addDataIndexes(DataIndexInfo... infos); + Builder addDataIndexes(DataIndexInfo... dataIndexes); - Builder addAllDataIndexes(Iterable infos); + Builder addAllDataIndexes(Iterable dataIndexes); Builder addColumnTypes(ColumnTypeInfo columnType); @@ -122,11 +126,11 @@ public interface Builder { Builder addAllColumnTypes(Iterable columnTypes); - Builder addSortingColumns(SortColumn sortPair); + Builder addSortingColumns(SortColumnInfo sortColumns); - Builder addSortingColumns(SortColumn... sortPairs); + Builder addSortingColumns(SortColumnInfo... sortColumns); - Builder addAllSortingColumns(Iterable sortPairs); + Builder addAllSortingColumns(Iterable sortColumns); TableInfo build(); } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index d8005ef598e..44ed9586a61 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -4,7 +4,9 @@ package io.deephaven.parquet.table; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.api.ColumnName; import io.deephaven.api.Selectable; +import io.deephaven.api.SortColumn; import io.deephaven.base.FileUtils; import io.deephaven.base.verify.Assert; import io.deephaven.configuration.Configuration; @@ -24,6 +26,7 @@ import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.dataindex.DataIndexUtils; import io.deephaven.engine.table.impl.indexer.DataIndexer; +import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.table.impl.select.SelectColumn; @@ -39,6 +42,7 @@ import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.base.NullStatistics; +import io.deephaven.parquet.table.location.ParquetTableLocation; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.parquet.table.pagestore.ColumnChunkPageStore; import io.deephaven.parquet.table.transfer.StringDictionary; @@ -396,6 +400,63 @@ public void indexByBigInt() { verifyIndexingInfoExists(fromDisk, "someBigInt", "someInt"); } + @Test + public void testSortingMetadata() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("someInt"), + ColumnDefinition.ofString("someString")); + final Table testTable = + ((QueryTable) TableTools.emptyTable(10).select("someInt = i", "someString = `foo`") + .where("i % 2 == 0").groupBy("someString").ungroup("someInt") + .sortDescending("someInt")) + .withDefinitionUnsafe(definition); + + DataIndexer.getOrCreateDataIndex(testTable, "someString"); + DataIndexer.getOrCreateDataIndex(testTable, "someInt", "someString"); + + final File dest = new File(rootFile, "ParquetTest_sortingMetadata_test.parquet"); + writeTable(testTable, dest); + + final Table fromDisk = checkSingleTable(testTable, dest); + + // Validate the indexes and lookup functions. + verifyIndexingInfoExists(fromDisk, "someString"); + verifyIndexingInfoExists(fromDisk, "someInt", "someString"); + verifyIndexingInfoExists(fromDisk, "someString", "someInt"); + + final ParquetTableLocation tableLocation = new ParquetTableLocation( + StandaloneTableKey.getInstance(), + new ParquetTableLocationKey( + convertToURI(dest, false), + 0, Map.of(), EMPTY), + EMPTY); + assertEquals(tableLocation.getSortedColumns(), List.of(SortColumn.desc(ColumnName.of("someInt")))); + + final ParquetTableLocation index1Location = new ParquetTableLocation( + StandaloneTableKey.getInstance(), + new ParquetTableLocationKey( + convertToURI(new File(rootFile, + ParquetTools.getRelativeIndexFilePath(dest, "someString")), false), + 0, Map.of(), EMPTY), + EMPTY); + assertEquals(index1Location.getSortedColumns(), List.of(SortColumn.asc(ColumnName.of("someString")))); + final Table index1Table = DataIndexer.getDataIndex(fromDisk, "someString").table(); + assertTableEquals(index1Table, index1Table.sort("someString")); + + final ParquetTableLocation index2Location = new ParquetTableLocation( + StandaloneTableKey.getInstance(), + new ParquetTableLocationKey( + convertToURI(new File(rootFile, + ParquetTools.getRelativeIndexFilePath(dest, "someInt", "someString")), false), + 0, Map.of(), EMPTY), + EMPTY); + assertEquals(index2Location.getSortedColumns(), List.of( + SortColumn.asc(ColumnName.of("someInt")), + SortColumn.asc(ColumnName.of("someString")))); + final Table index2Table = DataIndexer.getDataIndex(fromDisk, "someInt", "someString").table(); + assertTableEquals(index2Table, index2Table.sort("someInt", "someString")); + } + private static void verifyIndexingInfoExists(final Table table, final String... columnNames) { assertTrue(DataIndexer.hasDataIndex(table, columnNames)); final DataIndex fullIndex = DataIndexer.getDataIndex(table, columnNames);