diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java index 70dc0509e..c80c3ed7b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/metadata/LakeSoulCatalog.java @@ -40,19 +40,14 @@ import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.ZonedTimestampType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; +import java.util.*; import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_HASH_PARTITION_SPLITTER; import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH; @@ -275,7 +270,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig checkNotNull(tablePath); checkNotNull(table); TableSchema schema = table.getSchema(); - Optional primaryKeyColumns = schema.getPrimaryKey(); + schema.getTableColumns().forEach(this::validateType); + if (!databaseExists(tablePath.getDatabaseName())) { throw new DatabaseNotExistException(CATALOG_NAME, tablePath.getDatabaseName()); } @@ -284,6 +280,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig throw new TableAlreadyExistException(CATALOG_NAME, tablePath); } else return; } + + Optional primaryKeyColumns = schema.getPrimaryKey(); String primaryKeys = primaryKeyColumns.map( uniqueConstraint -> String.join(LAKESOUL_HASH_PARTITION_SPLITTER, uniqueConstraint.getColumns())) @@ -317,7 +315,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig List partitionKeys = Collections.emptyList(); if (table instanceof ResolvedCatalogTable) { partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys(); - validatePrimaryAndPartitionKeys(primaryKeyColumns, partitionKeys); + validatePrimaryAndPartitionKeys(primaryKeyColumns, partitionKeys, schema); String path = null; if (tableOptions.containsKey(TABLE_PATH)) { path = tableOptions.get(TABLE_PATH); @@ -593,18 +591,44 @@ public void cleanForTest() { } private void validatePrimaryAndPartitionKeys(Optional primaryKeyColumns, - List partitionKeys) { + List partitionKeys, + TableSchema tableSchema) { primaryKeyColumns.map(uniqueConstraint -> { - Set result = uniqueConstraint.getColumns().stream() - .distinct() - .filter(partitionKeys::contains) - .collect(Collectors.toSet()); - if (!result.isEmpty()) { - throw new RuntimeException( - String.format("Primray columns (%s) and partition columns (%s) cannot overlap", - uniqueConstraint.getColumns(), partitionKeys)); - } + uniqueConstraint.getColumns().forEach(column -> { + if (partitionKeys.contains(column)) { + throw new CatalogException( + String.format("Primray columns (%s) and partition columns (%s) cannot overlap", + uniqueConstraint.getColumns(), partitionKeys)); + } + validatePrimaryKeyType(tableSchema.getTableColumn(column).get()); + }); return 0; }); } + + private void validateType(TableColumn tableColumn) { + if (tableColumn.getType().getLogicalType() instanceof TimestampType) { + TimestampType timestampType = (TimestampType) tableColumn.getType().getLogicalType(); + if (timestampType.getPrecision() > 6) { + throw new CatalogException("LakeSoul does not support column `" + + tableColumn.getName() + + "` with timestamp precision > 6"); + } + } + } + + private void validatePrimaryKeyType(TableColumn tableColumn) { + LogicalType type = tableColumn.getType().getLogicalType(); + switch (type.getTypeRoot()) { + case TIMESTAMP_WITH_TIME_ZONE: + case MAP: + case MULTISET: + case ARRAY: + case ROW: + throw new CatalogException("LakeSoul does not support primary key `" + + tableColumn.getName() + + "` with type: " + + type.asSerializableString()); + } + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulKeyGen.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulKeyGen.java index bd48651cc..799b0f5dd 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulKeyGen.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulKeyGen.java @@ -58,7 +58,6 @@ public long getRePartitionHash(RowData rowData) { } public static long getHash(LogicalType type, Object field, long seed) { - switch (type.getTypeRoot()) { case CHAR: case VARCHAR: @@ -106,7 +105,7 @@ public static long getHash(LogicalType type, Object field, long seed) { case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: TimestampData timestampData = (TimestampData) field; - long value = timestampData.getMillisecond() + timestampData.getNanoOfMillisecond() / 1000; + long value = timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000; seed = Murmur3HashFunction.hash(value, LongType, seed); break; default: diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java index 0db4354d5..ef5aca659 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java @@ -13,7 +13,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.core.fs.Path; import org.apache.flink.lakesoul.tool.FlinkUtil; -import org.apache.flink.table.data.TimestampData; import java.util.ArrayList; import java.util.Collections; @@ -87,7 +86,7 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord Struct source = value.getStruct(Envelope.FieldName.SOURCE); if (sourceField != null && source != null) { if (sourceField.schema().field("file") != null) { - String fileName = (String)source.getWithoutDefault("file"); + String fileName = (String) source.getWithoutDefault("file"); if (StringUtils.isNotBlank(fileName)) { binlogFileIndex = Long.parseLong(fileName.substring(fileName.lastIndexOf(".") + 1)); } @@ -102,12 +101,13 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord long sortField = (binlogFileIndex << 32) + binlogPosition; LakeSoulRowDataWrapper data = convert.toLakeSoulDataType(valueSchema, value, tableId, tsMs, sortField); String tablePath; - if (tableId.schema()==null){ + if (tableId.schema() == null) { tablePath = new Path(new Path(basePath, tableId.catalog()), tableId.table()).toString(); - }else { + } else { tablePath = new Path(new Path(basePath, tableId.schema()), tableId.table()).toString(); } - return new BinarySourceRecord(sourceRecord.topic(), primaryKeys, tableId, FlinkUtil.makeQualifiedPath(tablePath).toString(), + return new BinarySourceRecord(sourceRecord.topic(), primaryKeys, tableId, + FlinkUtil.makeQualifiedPath(tablePath).toString(), Collections.emptyList(), false, data, null); } } diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java index a0e49fa3b..13e408910 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DDLSuite.java @@ -147,6 +147,31 @@ public void primaryKeyAndPartitionKeyOverlapTest() { }); } + @Test + public void typeValidationTest() { + validateTablePKType("TIMESTAMP(9)", true); + validateTablePKType("TIMESTAMP(9)", false); + validateTablePKType("ARRAY", true); + validateTablePKType("MAP", true); + validateTablePKType("ROW", true); + } + + private void validateTablePKType(String type, boolean isPK) { + StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE); + Assert.assertThrows("type validation failed", TableException.class, () -> { + streamTableEnv.executeSql("drop table if exists test_table"); + streamTableEnv.executeSql("create table test_table (\n" + + String.format(" `pk` %s %s \n", type, isPK ? "PRIMARY KEY NOT ENFORCED" : "") + + ")\n" + + "with (\n" + + " 'connector' = 'lakesoul',\n" + + " 'path' = 'file:///tmp/test_table',\n" + + " 'hashBucketNum' = '1'\n" + + ");" + ); + }); + } + private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { String createUserSql = "create table user_info (" + " order_id INT," +