[Flink] fix timestamp hash. add type validation (lakesoul-io#529)
Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Aug 16, 2024
1 parent 02057c7 commit 793c353
Showing 4 changed files with 76 additions and 28 deletions.
@@ -40,19 +40,14 @@
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.*;

import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_HASH_PARTITION_SPLITTER;
import static com.dmetasoul.lakesoul.meta.DBConfig.LAKESOUL_PARTITION_SPLITTER_OF_RANGE_AND_HASH;
@@ -275,7 +270,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
checkNotNull(tablePath);
checkNotNull(table);
TableSchema schema = table.getSchema();
Optional<UniqueConstraint> primaryKeyColumns = schema.getPrimaryKey();
schema.getTableColumns().forEach(this::validateType);

if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(CATALOG_NAME, tablePath.getDatabaseName());
}
@@ -284,6 +280,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
throw new TableAlreadyExistException(CATALOG_NAME, tablePath);
} else return;
}

Optional<UniqueConstraint> primaryKeyColumns = schema.getPrimaryKey();
String primaryKeys = primaryKeyColumns.map(
uniqueConstraint -> String.join(LAKESOUL_HASH_PARTITION_SPLITTER,
uniqueConstraint.getColumns()))
@@ -317,7 +315,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
List<String> partitionKeys = Collections.emptyList();
if (table instanceof ResolvedCatalogTable) {
partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys();
validatePrimaryAndPartitionKeys(primaryKeyColumns, partitionKeys);
validatePrimaryAndPartitionKeys(primaryKeyColumns, partitionKeys, schema);
String path = null;
if (tableOptions.containsKey(TABLE_PATH)) {
path = tableOptions.get(TABLE_PATH);
@@ -593,18 +591,44 @@ public void cleanForTest() {
}

private void validatePrimaryAndPartitionKeys(Optional<UniqueConstraint> primaryKeyColumns,
List<String> partitionKeys) {
List<String> partitionKeys,
TableSchema tableSchema) {
primaryKeyColumns.map(uniqueConstraint -> {
Set<String> result = uniqueConstraint.getColumns().stream()
.distinct()
.filter(partitionKeys::contains)
.collect(Collectors.toSet());
if (!result.isEmpty()) {
throw new RuntimeException(
String.format("Primray columns (%s) and partition columns (%s) cannot overlap",
uniqueConstraint.getColumns(), partitionKeys));
}
uniqueConstraint.getColumns().forEach(column -> {
if (partitionKeys.contains(column)) {
throw new CatalogException(
String.format("Primray columns (%s) and partition columns (%s) cannot overlap",
uniqueConstraint.getColumns(), partitionKeys));
}
validatePrimaryKeyType(tableSchema.getTableColumn(column).get());
});
return 0;
});
}

private void validateType(TableColumn tableColumn) {
if (tableColumn.getType().getLogicalType() instanceof TimestampType) {
TimestampType timestampType = (TimestampType) tableColumn.getType().getLogicalType();
if (timestampType.getPrecision() > 6) {
throw new CatalogException("LakeSoul does not support column `" +
tableColumn.getName() +
"` with timestamp precision > 6");
}
}
}

private void validatePrimaryKeyType(TableColumn tableColumn) {
LogicalType type = tableColumn.getType().getLogicalType();
switch (type.getTypeRoot()) {
case TIMESTAMP_WITH_TIME_ZONE:
case MAP:
case MULTISET:
case ARRAY:
case ROW:
throw new CatalogException("LakeSoul does not support primary key `" +
tableColumn.getName() +
"` with type: " +
type.asSerializableString());
}
}
}
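For reference, a minimal standalone sketch of what the two new checks enforce, using Flink's LogicalType classes directly; the class name and helper methods below are illustrative and not part of this commit:

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;

public class LakeSoulTypeCheckSketch {
    public static void main(String[] args) {
        // validateType: a TIMESTAMP column with precision > 6 (finer than microseconds) is rejected
        System.out.println(isSupportedColumnType(new TimestampType(9))); // false
        System.out.println(isSupportedColumnType(new TimestampType(6))); // true

        // validatePrimaryKeyType: composite types such as MAP cannot serve as primary key columns
        System.out.println(isSupportedPrimaryKeyType(new MapType(new VarCharType(), new VarCharType()))); // false
        System.out.println(isSupportedPrimaryKeyType(new VarCharType())); // true
    }

    static boolean isSupportedColumnType(LogicalType type) {
        // mirrors validateType above: only TIMESTAMP precision is restricted
        return !(type instanceof TimestampType) || ((TimestampType) type).getPrecision() <= 6;
    }

    static boolean isSupportedPrimaryKeyType(LogicalType type) {
        // mirrors validatePrimaryKeyType above: zoned timestamps and composite types are rejected
        switch (type.getTypeRoot()) {
            case TIMESTAMP_WITH_TIME_ZONE:
            case MAP:
            case MULTISET:
            case ARRAY:
            case ROW:
                return false;
            default:
                return true;
        }
    }
}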
@@ -58,7 +58,6 @@ public long getRePartitionHash(RowData rowData) {
}

public static long getHash(LogicalType type, Object field, long seed) {

switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
@@ -106,7 +105,7 @@ public static long getHash(LogicalType type, Object field, long seed) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
TimestampData timestampData = (TimestampData) field;
long value = timestampData.getMillisecond() + timestampData.getNanoOfMillisecond() / 1000;
long value = timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
seed = Murmur3HashFunction.hash(value, LongType, seed);
break;
default:
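The hash change above converts a TimestampData to epoch microseconds before hashing; the previous formula mixed units and could make distinct timestamps hash to the same value. A minimal sketch of the difference (class name and sample values are illustrative):

import org.apache.flink.table.data.TimestampData;

public class TimestampHashValueSketch {
    public static void main(String[] args) {
        // roughly 2024-08-16T00:00:00.123456789 UTC, expressed as epoch millis plus nanos-of-millisecond
        TimestampData ts = TimestampData.fromEpochMillis(1_723_766_400_123L, 456_789);

        // old formula: millis + micros-of-millisecond mixes units, e.g. 00:00:00.001000
        // and 00:00:00.000001 would both map to 1 and hash identically
        long oldValue = ts.getMillisecond() + ts.getNanoOfMillisecond() / 1000;

        // fixed formula: epoch microseconds = millis * 1000 + micros-of-millisecond
        long newValue = ts.getMillisecond() * 1000 + ts.getNanoOfMillisecond() / 1000;

        System.out.println("old (mixed units)  = " + oldValue);  // 1723766400123 + 456
        System.out.println("new (microseconds) = " + newValue);  // 1723766400123456
    }
}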
@@ -13,7 +13,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.table.data.TimestampData;

import java.util.ArrayList;
import java.util.Collections;
@@ -87,7 +86,7 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
if (sourceField != null && source != null) {
if (sourceField.schema().field("file") != null) {
String fileName = (String)source.getWithoutDefault("file");
String fileName = (String) source.getWithoutDefault("file");
if (StringUtils.isNotBlank(fileName)) {
binlogFileIndex = Long.parseLong(fileName.substring(fileName.lastIndexOf(".") + 1));
}
@@ -102,12 +101,13 @@
long sortField = (binlogFileIndex << 32) + binlogPosition;
LakeSoulRowDataWrapper data = convert.toLakeSoulDataType(valueSchema, value, tableId, tsMs, sortField);
String tablePath;
if (tableId.schema()==null){
if (tableId.schema() == null) {
tablePath = new Path(new Path(basePath, tableId.catalog()), tableId.table()).toString();
}else {
} else {
tablePath = new Path(new Path(basePath, tableId.schema()), tableId.table()).toString();
}
return new BinarySourceRecord(sourceRecord.topic(), primaryKeys, tableId, FlinkUtil.makeQualifiedPath(tablePath).toString(),
return new BinarySourceRecord(sourceRecord.topic(), primaryKeys, tableId,
FlinkUtil.makeQualifiedPath(tablePath).toString(),
Collections.emptyList(), false, data, null);
}
}
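The sortField in the hunk above packs the binlog file index (parsed from the numeric suffix of a file name such as mysql-bin.000003) into the high 32 bits and the binlog position into the low bits, so events order first by file and then by position within the file. A small illustrative sketch (class name and values are made up):

public class BinlogSortFieldSketch {
    public static void main(String[] args) {
        long fileIndex1 = 3L, position1 = 1_500L;   // e.g. mysql-bin.000003, position 1500
        long fileIndex2 = 4L, position2 = 10L;      // e.g. mysql-bin.000004, position 10

        long sortField1 = (fileIndex1 << 32) + position1;
        long sortField2 = (fileIndex2 << 32) + position2;

        // a record in a later binlog file always sorts after one in an earlier file,
        // regardless of their positions within the files
        System.out.println(sortField1 < sortField2); // true
    }
}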
@@ -147,6 +147,31 @@ public void primaryKeyAndPartitionKeyOverlapTest() {
});
}

@Test
public void typeValidationTest() {
validateTablePKType("TIMESTAMP(9)", true);
validateTablePKType("TIMESTAMP(9)", false);
validateTablePKType("ARRAY<STRING>", true);
validateTablePKType("MAP<STRING, STRING>", true);
validateTablePKType("ROW<n0 STRING, n1 STRING>", true);
}

private void validateTablePKType(String type, boolean isPK) {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
Assert.assertThrows("type validation failed", TableException.class, () -> {
streamTableEnv.executeSql("drop table if exists test_table");
streamTableEnv.executeSql("create table test_table (\n" +
String.format(" `pk` %s %s \n", type, isPK ? "PRIMARY KEY NOT ENFORCED" : "") +
")\n" +
"with (\n" +
" 'connector' = 'lakesoul',\n" +
" 'path' = 'file:///tmp/test_table',\n" +
" 'hashBucketNum' = '1'\n" +
");"
);
});
}

private void createLakeSoulSourceTableUser(TableEnvironment tEnvs) throws ExecutionException, InterruptedException {
String createUserSql = "create table user_info (" +
" order_id INT," +
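For contrast with the DDLs rejected in typeValidationTest, a sketch of a definition that passes both checks, assuming the same streamTableEnv set up by TestUtils.createStreamTableEnv as in the test above; the table name and path are illustrative:

StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
streamTableEnv.executeSql("drop table if exists valid_table");
// TIMESTAMP(6) stays within the precision limit and INT is an allowed primary key type
streamTableEnv.executeSql("create table valid_table (\n" +
        "  `id` INT PRIMARY KEY NOT ENFORCED,\n" +
        "  `ts` TIMESTAMP(6)\n" +
        ")\n" +
        "with (\n" +
        "  'connector' = 'lakesoul',\n" +
        "  'path' = 'file:///tmp/valid_table',\n" +
        "  'hashBucketNum' = '1'\n" +
        ");"
);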
