[Flink] Support flink watermark spec and computed column (lakesoul-io#472)

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
Ceng23333 and zenghua authored Apr 23, 2024
1 parent e5cb9d6 commit a351fb4
Showing 17 changed files with 221 additions and 42 deletions.
@@ -16,6 +16,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.table.LakeSoulDynamicTableFactory;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.*;
@@ -302,6 +303,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
tableOptions.put(VIEW_ORIGINAL_QUERY, ((ResolvedCatalogView) table).getOriginalQuery());
tableOptions.put(VIEW_EXPANDED_QUERY, ((ResolvedCatalogView) table).getExpandedQuery());
}
if (!schema.getWatermarkSpecs().isEmpty()) {
tableOptions.put(WATERMARK_SPEC_JSON, FlinkUtil.serializeWatermarkSpec(schema.getWatermarkSpecs()));
}

Map<String, String> computedColumns = new HashMap<>();
schema.getTableColumns().forEach(tableColumn -> {
if (tableColumn instanceof TableColumn.ComputedColumn) {
computedColumns.put(tableColumn.getName(), ((TableColumn.ComputedColumn) tableColumn).getExpression());
}
});
if (!computedColumns.isEmpty()) {
tableOptions.put(COMPUTE_COLUMN_JSON, JSON.toJSONString(computedColumns));
}

String json = JSON.toJSONString(tableOptions);
JSONObject properties = JSON.parseObject(json);
String tableName = tablePath.getObjectName();
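The block above flattens the table's watermark specs and computed columns into two JSON strings before writing them into the LakeSoul table options. A minimal sketch of what those two payloads could look like, using the same fastjson calls; the column names and expressions below are illustrative, not taken from the commit:

import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.Map;

public class TableOptionJsonSketch {
    public static void main(String[] args) {
        // Watermark specs flattened to rowtime-column -> watermark-expression,
        // stored under the "flink:watermark_spec_json" option.
        Map<String, String> watermarks = new HashMap<>();
        watermarks.put("order_time", "`order_time` - INTERVAL '5' SECOND");

        // Computed columns flattened to column-name -> generating-expression,
        // stored under the "flink:compute_column_json" option.
        Map<String, String> computedColumns = new HashMap<>();
        computedColumns.put("ts", "`order_time`");

        System.out.println(JSON.toJSONString(watermarks));
        System.out.println(JSON.toJSONString(computedColumns));
    }
}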
@@ -28,19 +28,19 @@ public class LakeSoulDynamicSplitEnumerator implements SplitEnumerator<LakeSoulS
private final long discoveryInterval;
private final String parDesc;
private final Set<Integer> taskIdsAwaitingSplit;
String tid;
String tableId;
private long startTime;
private long nextStartTime;
private int hashBucketNum = -1;


public LakeSoulDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulSplit> context,
LakeSoulDynSplitAssigner splitAssigner, long discoveryInterval,
long startTime, String tid, String parDesc, String hashBucketNum) {
long startTime, String tableId, String parDesc, String hashBucketNum) {
this.context = context;
this.splitAssigner = splitAssigner;
this.discoveryInterval = discoveryInterval;
this.tid = tid;
this.tableId = tableId;
this.startTime = startTime;
this.parDesc = parDesc;
this.hashBucketNum = Integer.parseInt(hashBucketNum);
@@ -49,7 +49,7 @@ public LakeSoulDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulSplit> cont

@Override
public void start() {
context.callAsync(() -> this.enumerateSplits(tid), this::processDiscoveredSplits, discoveryInterval,
context.callAsync(() -> this.enumerateSplits(tableId), this::processDiscoveredSplits, discoveryInterval,
discoveryInterval);
}

@@ -60,9 +60,9 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
return;
}
int tasksSize = context.registeredReaders().size();
Optional<LakeSoulSplit> al = this.splitAssigner.getNext(subtaskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), subtaskId);
Optional<LakeSoulSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
taskIdsAwaitingSplit.remove(subtaskId);
} else {
taskIdsAwaitingSplit.add(subtaskId);
@@ -83,7 +83,7 @@ public void addReader(int subtaskId) {
@Override
public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
LakeSoulPendingSplits pendingSplits =
new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tid, this.parDesc,
new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, this.parDesc,
this.discoveryInterval, this.hashBucketNum);
LOG.info("LakeSoulDynamicSplitEnumerator snapshotState {}", pendingSplits);
return pendingSplits;
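Apart from the tid → tableId rename, the enumerator keeps its pull-based assignment: a reader asks for a split, gets one immediately if the assigner has any, and is otherwise parked until the periodic discovery callback produces new splits. A generic, self-contained sketch of that pattern (not LakeSoul code):

import java.util.*;

public class SplitRequestSketch {
    private final Deque<String> splits = new ArrayDeque<>(List.of("split-1", "split-2"));
    private final Set<Integer> awaiting = new HashSet<>();

    void handleSplitRequest(int subtaskId) {
        Optional<String> next = Optional.ofNullable(splits.poll());
        if (next.isPresent()) {
            System.out.println("assign " + next.get() + " to reader " + subtaskId);
            awaiting.remove(subtaskId);
        } else {
            awaiting.add(subtaskId); // served later, once discovery finds new splits
        }
    }

    public static void main(String[] args) {
        SplitRequestSketch enumerator = new SplitRequestSketch();
        enumerator.handleSplitRequest(0);
        enumerator.handleSplitRequest(1);
        enumerator.handleSplitRequest(2); // no split left -> reader 2 is parked
    }
}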
@@ -122,7 +122,7 @@ private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Co
Path path = FlinkUtil.makeQualifiedPath(new Path(flinkConf.getString(CATALOG_PATH)));
int bucketParallelism = flinkConf.getInteger(HASH_BUCKET_NUM);
//rowData key tools
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();
RowType rowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
//bucket file name config
OutputFileConfig fileNameConfig = OutputFileConfig.builder().withPartSuffix(".parquet").build();
//file rolling rule
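Switching from toSourceRowDataType() to toPhysicalRowDataType() keeps computed columns out of the sink's RowType, since they carry no stored data. A small sketch of how the physical row type is derived, assuming the schema is a ResolvedSchema and using placeholder column names:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.RowType;

public class PhysicalRowTypeSketch {
    public static void main(String[] args) {
        // Only physical columns contribute to the row type written by the sink;
        // a computed column such as "ts AS order_time" would not appear here.
        ResolvedSchema schema = ResolvedSchema.of(
                Column.physical("order_id", DataTypes.INT()),
                Column.physical("order_time", DataTypes.TIMESTAMP(3)));
        RowType rowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
        System.out.println(rowType);
    }
}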
@@ -96,7 +96,7 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(RowType ro
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchema tsc, Optional<String> cdcColumn) throws CatalogException {
public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchema tableSchema, Optional<String> cdcColumn) throws CatalogException {
List<Field> fields = new ArrayList<>();
String cdcColName = null;
if (cdcColumn.isPresent()) {
@@ -105,12 +105,15 @@ public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchem
fields.add(cdcField);
}

for (int i = 0; i < tsc.getFieldCount(); i++) {
String name = tsc.getFieldName(i).get();
DataType dt = tsc.getFieldDataType(i).get();
for (int i = 0; i < tableSchema.getFieldCount(); i++) {
if (tableSchema.getTableColumn(i).get() instanceof TableColumn.ComputedColumn) {
continue;
}
String name = tableSchema.getFieldName(i).get();
DataType dataType = tableSchema.getFieldDataType(i).get();
if (name.equals(SORT_FIELD)) continue;

LogicalType logicalType = dt.getLogicalType();
LogicalType logicalType = dataType.getLogicalType();
Field arrowField = ArrowUtils.toArrowField(name, logicalType);
if (name.equals(cdcColName)) {
if (!arrowField.toString().equals(fields.get(0).toString())) {
@@ -213,10 +216,22 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
}
bd.column(field.getName(), field.getType().asSerializableString());
}
if (properties.getString(COMPUTE_COLUMN_JSON) != null) {
JSONObject computeColumnJson = JSONObject.parseObject(properties.getString(COMPUTE_COLUMN_JSON));
computeColumnJson.forEach((column, columnExpr) -> bd.columnByExpression(column, (String) columnExpr));
}

DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
if (!partitionKeys.primaryKeys.isEmpty()) {
bd.primaryKey(partitionKeys.primaryKeys);
}

if (properties.getString(WATERMARK_SPEC_JSON) != null) {
JSONObject watermarkJson = JSONObject.parseObject(properties.getString(WATERMARK_SPEC_JSON));
watermarkJson.forEach((column, watermarkExpr) -> bd.watermark(column, (String) watermarkExpr));
}


List<String> parKeys = partitionKeys.rangeKeys;
HashMap<String, String> conf = new HashMap<>();
properties.forEach((key, value) -> conf.put(key, (String) value));
@@ -551,4 +566,15 @@ public static void createAndSetTableDirPermission(Path p) throws IOException {
hdfs.setPermission(tbDir, new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE));
}
}

public static String serializeWatermarkSpec(List<WatermarkSpec> watermarkSpecs) {
Map<String, String> map = new HashMap<>();
for (WatermarkSpec watermarkSpec : watermarkSpecs) {
// On deserialization, watermarkSpec.getWatermarkExprOutputType() is not persisted; it is inferred from the LakeSoul TableSchema
map.put(watermarkSpec.getRowtimeAttribute(), watermarkSpec.getWatermarkExpr());
}
return JSON.toJSONString(map);
}


}
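On the read path, toFlinkCatalog() replays the stored JSON onto the schema builder via columnByExpression() and watermark(), as shown in the hunk above. A hedged sketch of the resulting builder calls, with illustrative column names and expressions:

import org.apache.flink.table.api.Schema;

public class SchemaRestoreSketch {
    public static void main(String[] args) {
        Schema schema = Schema.newBuilder()
                .column("order_id", "INT")
                .column("order_time", "TIMESTAMP(3)")
                // replayed from the flink:compute_column_json option
                .columnByExpression("ts", "`order_time`")
                // replayed from the flink:watermark_spec_json option
                .watermark("order_time", "`order_time` - INTERVAL '5' SECOND")
                .build();
        System.out.println(schema);
    }
}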
@@ -26,6 +26,10 @@ public class LakeSoulSinkOptions {

public static final String CDC_CHANGE_COLUMN = "lakesoul_cdc_change_column";

public static final String WATERMARK_SPEC_JSON = "flink:watermark_spec_json";

public static final String COMPUTE_COLUMN_JSON = "flink:compute_column_json";

public static final String CDC_CHANGE_COLUMN_DEFAULT = "rowKinds";

public static final String SORT_FIELD = "__sort_filed__";
@@ -746,7 +746,9 @@ public ArrowType visit(LocalZonedTimestampType localZonedTimestampType) {
public ArrowType visit(TimestampType timestampType) {
if (timestampType.getPrecision() == 0) {
return new ArrowType.Timestamp(SECOND, null);
} else if (timestampType.getPrecision() >= 1 && timestampType.getPrecision() <= 6) {
} else if (timestampType.getPrecision() >= 1 && timestampType.getPrecision() <= 3) {
return new ArrowType.Timestamp(MILLISECOND, null);
} else if (timestampType.getPrecision() >= 4 && timestampType.getPrecision() <= 6) {
return new ArrowType.Timestamp(MICROSECOND, null);
} else {
return new ArrowType.Timestamp(NANOSECOND, null);
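The change above routes precisions 1-3 to Arrow's MILLISECOND unit and 4-6 to MICROSECOND, instead of collapsing 1-6 into microseconds. A standalone sketch of that mapping (mirroring, not reusing, the visitor above):

import org.apache.arrow.vector.types.TimeUnit;

public class TimestampUnitSketch {
    static TimeUnit unitFor(int precision) {
        if (precision == 0) return TimeUnit.SECOND;          // TIMESTAMP(0)
        if (precision <= 3) return TimeUnit.MILLISECOND;     // TIMESTAMP(1..3)
        if (precision <= 6) return TimeUnit.MICROSECOND;     // TIMESTAMP(4..6)
        return TimeUnit.NANOSECOND;                          // TIMESTAMP(7..9)
    }

    public static void main(String[] args) {
        System.out.println(unitFor(3)); // MILLISECOND
        System.out.println(unitFor(6)); // MICROSECOND
    }
}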
@@ -9,13 +9,11 @@
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;

import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -136,10 +134,13 @@ public static void checkStreamingQueryAnswer(StreamTableEnvironment tableEnv, St
throw new RuntimeException(e);
}
List<String> results = TestValuesTableFactory.getResults(String.format("%s_sink", sourceTable));
results.sort(Comparator.comparing(
row -> Integer.valueOf(row.substring(3, (row.contains(",")) ? row.indexOf(",") : row.length() - 1))));
assertThat(results.toString()).isEqualTo(expectedAnswer);

if (expectedAnswer.isEmpty()) {
System.out.println(results);
} else {
results.sort(Comparator.comparing(
row -> Integer.valueOf(row.substring(3, (row.contains(",")) ? row.indexOf(",") : row.length() - 1))));
assertThat(results.toString()).isEqualTo(expectedAnswer);
}
}

}
@@ -12,12 +12,11 @@
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.BATCH_TYPE;
import static org.assertj.core.api.Assertions.assertThat;

public class SubstraitTest extends AbstractTestBase {

private final String BATCH_TYPE = "batch";

@Test
public void dateTypeTest() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
@@ -16,8 +16,9 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.BATCH_TYPE;

public class BatchReadSuite extends AbstractTestBase {
private final String BATCH_TYPE = "batch";
private String startTime;
private String endTime;

@@ -13,9 +13,10 @@

import java.util.concurrent.ExecutionException;

import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.BATCH_TYPE;
import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.STREAMING_TYPE;

public class DDLSuite extends AbstractTestBase {
private String BATCH_TYPE = "batch";
private String STREAMING_TYPE = "streaming";

@Test
public void dropTable() throws ExecutionException, InterruptedException {
@@ -55,14 +56,13 @@ public void dropView() throws ExecutionException, InterruptedException {
// tEnv.executeSql("select * from user_info").print();
// tEnv.executeSql("alter table user_info drop partition `date`='1995-10-01'");
// }

@Test
public void alterTableNotSupported() throws ExecutionException, InterruptedException {
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
createLakeSoulSourceTableUser(tEnv);
try {
tEnv.executeSql("ALTER TABLE user_info RENAME TO NewUsers");
}catch (TableException e) {
} catch (TableException e) {
System.out.println("Rename lakesoul table not supported now");
}
}
@@ -84,22 +84,22 @@ public void explainTable() throws ExecutionException, InterruptedException {
}

@Test
public void loadLakeSoulModuleNotSupported(){
public void loadLakeSoulModuleNotSupported() {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
try {
streamTableEnv.executeSql("LOAD MODULE lakesoul WITH ('format'='lakesoul')");
}catch (ValidationException e) {
} catch (ValidationException e) {
System.out.println("LOAD lakesoul module not supported now");
}
}

@Test
public void unloadModuleTest(){
public void unloadModuleTest() {
StreamTableEnvironment streamTableEnv = TestUtils.createStreamTableEnv(STREAMING_TYPE);
try {
streamTableEnv.executeSql("UNLOAD MODULE core");
streamTableEnv.executeSql("SHOW MODULES");
}catch (ValidationException e) {
} catch (ValidationException e) {
System.out.println("UNLOAD lakesoul module not supported now");
}
}
@@ -16,8 +16,9 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.BATCH_TYPE;

public class DMLSuite extends AbstractTestBase {
private final String BATCH_TYPE = "batch";

@Test
public void testInsertSQL() throws ExecutionException, InterruptedException {
@@ -16,10 +16,10 @@
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.lakesoul.test.flinkSource.TestUtils.BATCH_TYPE;
import static org.assertj.core.api.Assertions.assertThat;

public class DataTypeSupportTest extends AbstractTestBase {
private final String BATCH_TYPE = "batch";

@Test
public void testTimeStampLTZ() throws ExecutionException, InterruptedException {