Skip to content

Commit

Permalink
[FLINK-33208] Support the writable metadata timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang committed Nov 10, 2023
1 parent 4b33c32 commit fed3c91
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 28 deletions.
27 changes: 27 additions & 0 deletions docs/content.zh/docs/connectors/table/hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;
```

可用的元数据
------------------

以下的连接器元数据可以在表定义中通过元数据列的形式获取。

`R/W` 列定义了一个元数据是可读的(`R`)还是可写的(`W`)。
只读列必须声明为 `VIRTUAL` 以在 `INSERT INTO` 操作中排除它们。

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Key</th>
<th class="text-center" style="width: 30%">Data Type</th>
<th class="text-center" style="width: 40%">Description</th>
<th class="text-center" style="width: 5%">R/W</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>timestamp</code></td>
<td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
<td>HBase 记录的时间戳。</td>
<td><code>W</code></td>
</tr>
</tbody>
</table>

连接器参数
----------------

Expand Down
27 changes: 27 additions & 0 deletions docs/content/docs/connectors/table/hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = hTable.rowkey;
```

Available Metadata
------------------

The following connector metadata can be accessed as metadata columns in a table definition.

The `R/W` column defines whether a metadata field is readable (`R`) and/or writable (`W`).
Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT INTO` operation.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Key</th>
<th class="text-center" style="width: 30%">Data Type</th>
<th class="text-center" style="width: 40%">Description</th>
<th class="text-center" style="width: 5%">R/W</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>timestamp</code></td>
<td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
<td>Timestamp for the HBase mutation.</td>
<td><code>W</code></td>
</tr>
</tbody>
</table>

Connector Options
----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

return new HBaseDynamicTableSink(
tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
tableName,
context.getPhysicalRowDataType(),
hbaseConf,
hBaseWriteOptions,
nullStringLiteral);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,66 @@
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
import org.apache.flink.connector.hbase.sink.WritableMetadata;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

import org.apache.hadoop.conf.Configuration;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/** HBase table sink implementation. */
@Internal
public class HBaseDynamicTableSink implements DynamicTableSink {
public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {

private final HBaseTableSchema hbaseTableSchema;
private final String nullStringLiteral;
private final Configuration hbaseConf;
private final HBaseWriteOptions writeOptions;
private final String tableName;
private final DataType physicalDataType;

/** Metadata that is appended at the end of a physical sink row. */
private List<String> metadataKeys;

/**
 * Creates the sink. The HBase table schema is derived from {@code physicalDataType} via
 * {@link HBaseTableSchema#fromDataType}, and the list of metadata keys starts out empty.
 */
public HBaseDynamicTableSink(
String tableName,
HBaseTableSchema hbaseTableSchema,
DataType physicalDataType,
Configuration hbaseConf,
HBaseWriteOptions writeOptions,
String nullStringLiteral) {
this.hbaseTableSchema = hbaseTableSchema;
this.nullStringLiteral = nullStringLiteral;
this.tableName = tableName;
this.physicalDataType = physicalDataType;
this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
// No writable metadata is selected until applyWritableMetadata is called.
this.metadataKeys = Collections.emptyList();
this.hbaseConf = hbaseConf;
this.writeOptions = writeOptions;
this.tableName = tableName;
this.nullStringLiteral = nullStringLiteral;
}

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
int[] metadataPositions =
getMetadataPositions(physicalDataType.getLogicalType().getChildren());
HBaseSinkFunction<RowData> sinkFunction =
new HBaseSinkFunction<>(
tableName,
hbaseConf,
new RowDataToMutationConverter(
hbaseTableSchema,
metadataKeys.size() > 0,
metadataPositions,
nullStringLiteral,
writeOptions.isIgnoreNullValue()),
writeOptions.getBufferFlushMaxSizeInBytes(),
Expand All @@ -83,10 +103,23 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return builder.build();
}

/**
 * Returns the writable metadata of this sink as a map from metadata key to data type, with
 * entries inserted in the order returned by {@code WritableMetadata.values()}.
 */
@Override
public Map<String, DataType> listWritableMetadata() {
    final Map<String, DataType> writableMetadata = new LinkedHashMap<>();
    for (WritableMetadata metadata : WritableMetadata.values()) {
        writableMetadata.put(metadata.getKey(), metadata.getDataType());
    }
    return writableMetadata;
}

/**
 * Stores the given metadata keys on this sink; they are later used to decide whether metadata
 * is present and at which row positions it is found. {@code consumedDataType} is not used.
 */
@Override
public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
this.metadataKeys = metadataKeys;
}

@Override
public DynamicTableSink copy() {
return new HBaseDynamicTableSink(
tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
}

@Override
Expand Down Expand Up @@ -115,4 +148,17 @@ public Configuration getConfiguration() {
public String getTableName() {
return this.tableName;
}

/**
 * Computes one row position per {@code WritableMetadata} value, in the order returned by
 * {@code WritableMetadata.values()}.
 *
 * <p>Metadata fields are appended after the physical fields of a sink row, so the position of
 * a requested key is the number of physical children plus the key's index in
 * {@code metadataKeys}. A key that was not requested yields {@code -1}.
 *
 * @param physicalChildren the field types of the physical row; only its size is used
 * @return the metadata positions, {@code -1} for every key that is absent
 */
private int[] getMetadataPositions(List<LogicalType> physicalChildren) {
    return Stream.of(WritableMetadata.values())
            .map(WritableMetadata::getKey)
            .mapToInt(metadataKeys::indexOf)
            .map(keyIndex -> keyIndex < 0 ? -1 : physicalChildren.size() + keyIndex)
            .toArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,56 @@ public void testTableSinkWithChangelog() throws Exception {
assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3"))));
}

/**
 * Writes rows through a sink table that maps the {@code timestamp} metadata to a column, then
 * reads the table back without the metadata column and checks which values are visible.
 */
@Test
public void testTableSinkWithTimestampMetadata() throws Exception {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tableEnv =
            StreamTableEnvironment.create(environment, streamSettings);

    // Sink table: the 'version' column carries the writable 'timestamp' metadata.
    final String sinkDdl =
            "CREATE TABLE hTableForSink ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>,"
                    + " version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_5
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    tableEnv.executeSql(sinkDdl);

    // Row keys 1 and 2 are written twice: key 1 gets a second value with a smaller timestamp
    // (...269), key 2 a second value with a larger timestamp (...271). The expected result
    // below keeps, per row key, the value that was written with the larger timestamp.
    tableEnv.executeSql(
                    "INSERT INTO hTableForSink VALUES"
                            + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                            + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                            + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
                            + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
                            + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))")
            .await();

    // Query table over the same HBase table, declared without the metadata column.
    final String queryDdl =
            "CREATE TABLE hTableForQuery ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-1.4',"
                    + " 'table-name' = '"
                    + TEST_TABLE_5
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")";
    tableEnv.executeSql(queryDdl);

    final TableResult queryResult =
            tableEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
    final List<Row> actualRows = CollectionUtil.iteratorToList(queryResult.collect());

    TestBaseUtils.compareResultAsText(actualRows, "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n");
}

@Test
public void testTableSourceSinkWithDDL() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -554,7 +604,8 @@ public void testHBaseSinkFunctionTableExistence() throws Exception {
new HBaseSinkFunction<>(
TEST_NOT_EXISTS_TABLE,
hbaseConf,
new RowDataToMutationConverter(tableSchema, "null", false),
new RowDataToMutationConverter(
tableSchema, false, new int[] {}, "null", false),
2 * 1024 * 1024,
1000,
1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
protected static final String TEST_TABLE_2 = "testTable2";
protected static final String TEST_TABLE_3 = "testTable3";
protected static final String TEST_TABLE_4 = "testTable4";
protected static final String TEST_TABLE_5 = "testTable5";
protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";

Expand Down Expand Up @@ -96,6 +97,7 @@ private static void prepareTables() throws IOException {
createHBaseTable2();
createHBaseTable3();
createHBaseTable4();
createHBaseTable5();
createEmptyHBaseTable();
}

Expand Down Expand Up @@ -244,6 +246,13 @@ private static void createHBaseTable4() {
createTable(tableName, families, SPLIT_KEYS);
}

/**
 * Creates the HBase table named by {@code TEST_TABLE_5} with the single column family
 * {@code FAMILY1}, passing {@code SPLIT_KEYS} to {@code createTable}.
 */
private static void createHBaseTable5() {
    final byte[][] columnFamilies = {Bytes.toBytes(FAMILY1)};
    createTable(TableName.valueOf(TEST_TABLE_5), columnFamilies, SPLIT_KEYS);
}

private static void createEmptyHBaseTable() {
// create a table
byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

return new HBaseDynamicTableSink(
tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
tableName,
context.getPhysicalRowDataType(),
hbaseConf,
hBaseWriteOptions,
nullStringLiteral);
}

@Override
Expand Down
Loading

0 comments on commit fed3c91

Please sign in to comment.