diff --git a/docs/content.zh/docs/connectors/table/hbase.md b/docs/content.zh/docs/connectors/table/hbase.md
index 79aae7b5..db68d379 100644
--- a/docs/content.zh/docs/connectors/table/hbase.md
+++ b/docs/content.zh/docs/connectors/table/hbase.md
@@ -75,6 +75,33 @@ LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
 ON myTopic.key = hTable.rowkey;
 ```
 
+可用的元数据
+------------------
+
+以下的连接器元数据可以在表定义中通过元数据列的形式获取。
+
+`R/W` 列定义了一个元数据是可读的(`R`)还是可写的(`W`)。
+只读列必须声明为 `VIRTUAL` 以在 `INSERT INTO` 操作中排除它们。
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+      <th class="text-center" style="width: 5%">R/W</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>HBase记录的时间戳。</td>
+      <td><code>W</code></td>
+    </tr>
+    </tbody>
+</table>
+
 连接器参数
 ----------------
 
diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md
index 31f6895f..8c723daf 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -77,6 +77,33 @@ LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
 ON myTopic.key = hTable.rowkey;
 ```
 
+Available Metadata
+------------------
+
+The following connector metadata can be accessed as metadata columns in a table definition.
+
+The `R/W` column defines whether a metadata field is readable (`R`) and/or writable (`W`).
+Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT INTO` operation.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+      <th class="text-center" style="width: 5%">R/W</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>timestamp</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>Timestamp for the HBase mutation.</td>
+      <td><code>W</code></td>
+    </tr>
+    </tbody>
+</table>
+
 Connector Options
 ----------------
 
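For reference, a minimal sketch of how the `timestamp` metadata column documented above is declared and written from a user's point of view. This snippet is not part of the patch; the table name, column names, and connector options are placeholders only.

```java
// Sketch only: declare the writable 'timestamp' metadata column and give each row an
// explicit HBase cell timestamp. Names and connection options below are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimestampMetadataUsageSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE hTable ("
                        + "  rowkey INT PRIMARY KEY NOT ENFORCED,"
                        + "  family1 ROW<col1 INT>,"
                        + "  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
                        + ") WITH ("
                        + "  'connector' = 'hbase-2.2',"
                        + "  'table-name' = 'mytable',"
                        + "  'zookeeper.quorum' = 'localhost:2181'"
                        + ")");
        // The metadata column is written like any other column; the sink turns it into the
        // cell timestamp of the generated Put.
        tEnv.executeSql(
                        "INSERT INTO hTable VALUES (1, ROW(42), TO_TIMESTAMP_LTZ(1696767943270, 3))")
                .await();
    }
}
```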
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 5321bf27..19e4d1ac 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -122,11 +122,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema =
-                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+                tableName,
+                context.getPhysicalRowDataType(),
+                hbaseConf,
+                hBaseWriteOptions,
+                nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
index 0dec9375..456948fe 100644
--- a/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseDynamicTableSink.java
@@ -23,36 +23,49 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
 import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /** HBase table sink implementation. */
 @Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
 
     private final HBaseTableSchema hbaseTableSchema;
     private final String nullStringLiteral;
     private final Configuration hbaseConf;
     private final HBaseWriteOptions writeOptions;
     private final String tableName;
+    private final DataType physicalDataType;
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    private List<String> metadataKeys;
 
     public HBaseDynamicTableSink(
             String tableName,
-            HBaseTableSchema hbaseTableSchema,
+            DataType physicalDataType,
             Configuration hbaseConf,
             HBaseWriteOptions writeOptions,
             String nullStringLiteral) {
-        this.hbaseTableSchema = hbaseTableSchema;
-        this.nullStringLiteral = nullStringLiteral;
+        this.tableName = tableName;
+        this.physicalDataType = physicalDataType;
+        this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+        this.metadataKeys = Collections.emptyList();
         this.hbaseConf = hbaseConf;
         this.writeOptions = writeOptions;
-        this.tableName = tableName;
+        this.nullStringLiteral = nullStringLiteral;
     }
 
     @Override
@@ -63,6 +76,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         hbaseConf,
                         new RowDataToMutationConverter(
                                 hbaseTableSchema,
+                                physicalDataType,
+                                metadataKeys,
                                 nullStringLiteral,
                                 writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -83,10 +98,20 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
         return builder.build();
     }
 
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        return WritableMetadata.list();
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
     @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+                tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
     }
 
     @Override
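The `SupportsWritingMetadata` contract implemented above is worth spelling out: the planner first calls `listWritableMetadata()`, keeps only the keys that the table's DDL actually maps via `METADATA FROM '...'`, and then calls `applyWritableMetadata(...)` with those keys and the consumed data type, in which metadata columns are appended after the physical columns of each sink row. A hedged sketch of that sequence, with the consumed row type assembled by hand purely for illustration (it mirrors the test schema used further below):

```java
// Sketch only: mimics the planner-side call sequence for a sink that implements
// SupportsWritingMetadata. The row type below is assembled by hand for illustration.
import org.apache.flink.connector.hbase.sink.WritableMetadata;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WritableMetadataContractSketch {
    public static void main(String[] args) {
        // 1) The planner asks which metadata keys the sink can write.
        Map<String, DataType> writable = WritableMetadata.list(); // {"timestamp" -> TIMESTAMP_LTZ(3)}

        // 2) Only the keys actually mapped via `METADATA FROM '...'` in the DDL are kept.
        List<String> acceptedKeys = new ArrayList<>(writable.keySet());

        // 3) The consumed row type is the physical schema plus the metadata columns at the end;
        //    applyWritableMetadata(acceptedKeys, consumedType) stores the keys so the converter
        //    can later locate the timestamp by position.
        DataType consumedType =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.INT()),
                        DataTypes.FIELD(
                                "family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT()))),
                        DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)));

        System.out.println(acceptedKeys + " -> " + consumedType);
    }
}
```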
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
index 31290a77..1c531873 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseConnectorITCase.java
@@ -372,6 +372,56 @@ public void testTableSinkWithChangelog() throws Exception {
         assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3"))));
     }
 
+    @Test
+    public void testTableSinkWithTimestampMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " version TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+                        + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-1.4',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+        List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
+        String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -554,7 +604,12 @@ public void testHBaseSinkFunctionTableExistence() throws Exception {
                 new HBaseSinkFunction<>(
                         TEST_NOT_EXISTS_TABLE,
                         hbaseConf,
-                        new RowDataToMutationConverter(tableSchema, "null", false),
+                        new RowDataToMutationConverter(
+                                tableSchema,
+                                tableSchema.convertToDataType(),
+                                Collections.emptyList(),
+                                "null",
+                                false),
                         2 * 1024 * 1024,
                         1000,
                         1000);
diff --git a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
index 530285d4..c3512a96 100644
--- a/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
+++ b/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_TABLE_5 = "testTable5";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -96,6 +97,7 @@ private static void prepareTables() throws IOException {
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createHBaseTable5();
         createEmptyHBaseTable();
     }
 
@@ -244,6 +246,13 @@ private static void createHBaseTable4() {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable5() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_5);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 8a10a1c3..07c324d1 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -124,11 +124,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema =
-                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
-                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
+                tableName,
+                context.getPhysicalRowDataType(),
+                hbaseConf,
+                hBaseWriteOptions,
+                nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
index 299a4579..fa8ab78c 100644
--- a/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
+++ b/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java
@@ -23,34 +23,46 @@
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseSinkFunction;
 import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter;
+import org.apache.flink.connector.hbase.sink.WritableMetadata;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /** HBase table sink implementation. */
 @Internal
-public class HBaseDynamicTableSink implements DynamicTableSink {
+public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata {
 
     private final String tableName;
     private final HBaseTableSchema hbaseTableSchema;
     private final Configuration hbaseConf;
     private final HBaseWriteOptions writeOptions;
     private final String nullStringLiteral;
+    private final DataType physicalDataType;
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    private List<String> metadataKeys;
 
     public HBaseDynamicTableSink(
             String tableName,
-            HBaseTableSchema hbaseTableSchema,
+            DataType physicalDataType,
             Configuration hbaseConf,
             HBaseWriteOptions writeOptions,
             String nullStringLiteral) {
         this.tableName = tableName;
-        this.hbaseTableSchema = hbaseTableSchema;
+        this.physicalDataType = physicalDataType;
+        this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType);
+        this.metadataKeys = Collections.emptyList();
         this.hbaseConf = hbaseConf;
         this.writeOptions = writeOptions;
         this.nullStringLiteral = nullStringLiteral;
@@ -64,6 +76,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         hbaseConf,
                         new RowDataToMutationConverter(
                                 hbaseTableSchema,
+                                physicalDataType,
+                                metadataKeys,
                                 nullStringLiteral,
                                 writeOptions.isIgnoreNullValue()),
                         writeOptions.getBufferFlushMaxSizeInBytes(),
@@ -84,10 +98,20 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
         return builder.build();
     }
 
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        return WritableMetadata.list();
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
     @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
+                tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
     }
 
     @Override
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
index df72146d..c73bbc3c 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
@@ -401,6 +401,56 @@ public void testTableSinkWithChangelog() throws Exception {
         assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3"))));
     }
 
+    @Test
+    public void testTableSinkWithTimestampMetadata() throws Exception {
+        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForSink ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>,"
+                        + " version TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp'"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+
+        String insert =
+                "INSERT INTO hTableForSink VALUES"
+                        + "(1, ROW(1), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(2, ROW(2), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(3, ROW(3), TO_TIMESTAMP_LTZ(1696767943270, 3)),"
+                        + "(1, ROW(10), TO_TIMESTAMP_LTZ(1696767943269, 3)),"
+                        + "(2, ROW(20), TO_TIMESTAMP_LTZ(1696767943271, 3))";
+        tEnv.executeSql(insert).await();
+
+        tEnv.executeSql(
+                "CREATE TABLE hTableForQuery ("
+                        + " rowkey INT PRIMARY KEY NOT ENFORCED,"
+                        + " family1 ROW<col1 INT>"
+                        + ") WITH ("
+                        + " 'connector' = 'hbase-2.2',"
+                        + " 'table-name' = '"
+                        + TEST_TABLE_5
+                        + "',"
+                        + " 'zookeeper.quorum' = '"
+                        + getZookeeperQuorum()
+                        + "'"
+                        + ")");
+        TableResult result = tEnv.executeSql("SELECT rowkey, family1.col1 FROM hTableForQuery");
+        List<Row> results = CollectionUtil.iteratorToList(result.collect());
+
+        String expected = "+I[1, 1]\n+I[2, 20]\n+I[3, 3]\n";
+
+        TestBaseUtils.compareResultAsText(results, expected);
+    }
+
     @Test
     public void testTableSourceSinkWithDDL() throws Exception {
         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -538,7 +588,12 @@ public void testHBaseSinkFunctionTableExistence() throws Exception {
                 new HBaseSinkFunction<>(
                         TEST_NOT_EXISTS_TABLE,
                         hbaseConf,
-                        new RowDataToMutationConverter(tableSchema, "null", false),
+                        new RowDataToMutationConverter(
+                                tableSchema,
+                                tableSchema.convertToDataType(),
+                                Collections.emptyList(),
+                                "null",
+                                false),
                         2 * 1024 * 1024,
                         1000,
                         1000);
diff --git a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
index 1bf60d50..d6a0a9e3 100644
--- a/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
+++ b/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestBase.java
@@ -44,6 +44,7 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
     protected static final String TEST_TABLE_2 = "testTable2";
     protected static final String TEST_TABLE_3 = "testTable3";
     protected static final String TEST_TABLE_4 = "testTable4";
+    protected static final String TEST_TABLE_5 = "testTable5";
     protected static final String TEST_EMPTY_TABLE = "testEmptyTable";
     protected static final String TEST_NOT_EXISTS_TABLE = "notExistsTable";
 
@@ -96,6 +97,7 @@ private static void prepareTables() throws IOException {
         createHBaseTable2();
         createHBaseTable3();
         createHBaseTable4();
+        createHBaseTable5();
         createEmptyHBaseTable();
     }
 
@@ -244,6 +246,13 @@ private static void createHBaseTable4() {
         createTable(tableName, families, SPLIT_KEYS);
     }
 
+    private static void createHBaseTable5() {
+        // create a table
+        byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
+        TableName tableName = TableName.valueOf(TEST_TABLE_5);
+        createTable(tableName, families, SPLIT_KEYS);
+    }
+
     private static void createEmptyHBaseTable() {
         // create a table
         byte[][] families = new byte[][] {Bytes.toBytes(FAMILY1)};
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
index f07377c9..f9a13c88 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.connector.hbase.sink;
 
+import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
 import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.hbase.client.Mutation;
 
+import java.util.List;
+
 /**
  * An implementation of {@link HBaseMutationConverter} which converts {@link RowData} into {@link
  * Mutation}.
@@ -35,13 +39,19 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowData> {
     private final HBaseTableSchema schema;
     private final String nullStringLiteral;
     private final boolean ignoreNullValue;
+    private final TimestampMetadata timestampMetadata;
     private transient HBaseSerde serde;
 
     public RowDataToMutationConverter(
-            HBaseTableSchema schema, String nullStringLiteral, boolean ignoreNullValue) {
+            HBaseTableSchema schema,
+            DataType physicalDataType,
+            List<String> metadataKeys,
+            String nullStringLiteral,
+            boolean ignoreNullValue) {
         this.schema = schema;
         this.nullStringLiteral = nullStringLiteral;
         this.ignoreNullValue = ignoreNullValue;
+        this.timestampMetadata = TimestampMetadata.of(metadataKeys, physicalDataType);
     }
 
     @Override
@@ -51,11 +61,12 @@ public void open() {
 
     @Override
     public Mutation convertToMutation(RowData record) {
+        Long timestamp = timestampMetadata.read(record);
         RowKind kind = record.getRowKind();
         if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
-            return serde.createPutMutation(record);
+            return serde.createPutMutation(record, timestamp);
         } else {
-            return serde.createDeleteMutation(record);
+            return serde.createDeleteMutation(record, timestamp);
         }
     }
 }
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
new file mode 100644
index 00000000..c7e9e980
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WritableMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Writable metadata for HBase. */
+public abstract class WritableMetadata<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns the map of metadata keys and their corresponding data types that can be consumed by
+     * HBase sink for writing.
+     *
+     * <p>Note: All the supported writable metadata should be manually registered in it.
+     */
+    public static Map<String, DataType> list() {
+        Map<String, DataType> metadataMap = new HashMap<>();
+        metadataMap.put(TimestampMetadata.KEY, TimestampMetadata.DATA_TYPE);
+        return Collections.unmodifiableMap(metadataMap);
+    }
+
+    public abstract T read(RowData row);
+
+    /** Timestamp metadata for HBase. */
+    public static class TimestampMetadata extends WritableMetadata<Long> {
+
+        public static final String KEY = "timestamp";
+        public static final DataType DATA_TYPE =
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable();
+
+        private final int pos;
+
+        public TimestampMetadata(int pos) {
+            this.pos = pos;
+        }
+
+        @Override
+        public Long read(RowData row) {
+            if (pos < 0) {
+                return HConstants.LATEST_TIMESTAMP;
+            }
+            if (row.isNullAt(pos)) {
+                throw new IllegalArgumentException(
+                        String.format("Writable metadata '%s' can not accept null value", KEY));
+            }
+            return row.getTimestamp(pos, 3).getMillisecond();
+        }
+
+        public static TimestampMetadata of(List<String> metadataKeys, DataType physicalDataType) {
+            int pos = metadataKeys.indexOf(TimestampMetadata.KEY);
+            if (pos < 0) {
+                return new TimestampMetadata(-1);
+            }
+            return new TimestampMetadata(
+                    pos + physicalDataType.getLogicalType().getChildren().size());
+        }
+    }
+}
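To make the position arithmetic in `TimestampMetadata.of` concrete, here is a small sketch that assumes the class above is on the classpath: with two physical columns and `timestamp` as the only requested metadata key, the timestamp is read from index 2 of each sink row, and when no metadata column is declared the converter falls back to `HConstants.LATEST_TIMESTAMP`.

```java
// Sketch only: demonstrates how TimestampMetadata resolves its read position and what it
// returns. The row layout mirrors the test schema (rowkey, family1, timestamp).
import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;

import java.util.Collections;

public class TimestampMetadataSketch {
    public static void main(String[] args) {
        DataType physical =
                DataTypes.ROW(
                        DataTypes.FIELD("rowkey", DataTypes.INT()),
                        DataTypes.FIELD(
                                "family1", DataTypes.ROW(DataTypes.FIELD("col1", DataTypes.INT()))));

        // Requested metadata keys are appended after the 2 physical fields -> position 2.
        TimestampMetadata withTimestamp =
                TimestampMetadata.of(Collections.singletonList("timestamp"), physical);
        GenericRowData row =
                GenericRowData.of(
                        1,
                        GenericRowData.of(42),
                        TimestampData.fromEpochMillis(1696767943270L));
        System.out.println(withTimestamp.read(row)); // 1696767943270

        // No metadata column declared -> HConstants.LATEST_TIMESTAMP (Long.MAX_VALUE).
        TimestampMetadata withoutTimestamp =
                TimestampMetadata.of(Collections.emptyList(), physical);
        System.out.println(withoutTimestamp.read(row));
    }
}
```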
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index 458b25d2..d381033a 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -135,7 +135,7 @@ public HBaseSerde(
      *
      * @return The appropriate instance of Put for this use case.
      */
-    public @Nullable Put createPutMutation(RowData row) {
+    public @Nullable Put createPutMutation(RowData row, long timestamp) {
         checkArgument(keyEncoder != null, "row key is not set.");
         byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
         if (rowkey.length == 0) {
@@ -143,7 +143,7 @@ public HBaseSerde(
             return null;
         }
         // upsert
-        Put put = new Put(rowkey);
+        Put put = new Put(rowkey, timestamp);
         for (int i = 0; i < fieldLength; i++) {
             if (i != rowkeyIndex) {
                 int f = i > rowkeyIndex ? i - 1 : i;
@@ -172,7 +172,7 @@ public HBaseSerde(
      *
      * @return The appropriate instance of Delete for this use case.
      */
-    public @Nullable Delete createDeleteMutation(RowData row) {
+    public @Nullable Delete createDeleteMutation(RowData row, long timestamp) {
         checkArgument(keyEncoder != null, "row key is not set.");
         byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
         if (rowkey.length == 0) {
@@ -180,7 +180,7 @@ public HBaseSerde(
             return null;
         }
         // delete
-        Delete delete = new Delete(rowkey);
+        Delete delete = new Delete(rowkey, timestamp);
         for (int i = 0; i < fieldLength; i++) {
             if (i != rowkeyIndex) {
                 int f = i > rowkeyIndex ? i - 1 : i;
diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index e370809d..85de7c8b 100644
--- a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -105,7 +106,7 @@ void convertToReusedRowTest() {
     @Test
     public void writeIgnoreNullValueTest() {
         HBaseSerde serde = createHBaseSerde(false);
-        Put m1 = serde.createPutMutation(prepareRowData());
+        Put m1 = serde.createPutMutation(prepareRowData(), HConstants.LATEST_TIMESTAMP);
         assert m1 != null;
         assertThat(m1.getRow()).isNotEmpty();
         assertThat(m1.get(FAMILY1.getBytes(), F1COL1.getBytes())).isNotEmpty();
@@ -116,7 +117,9 @@ public void writeIgnoreNullValueTest() {
         assertThat(m1.get(FAMILY3.getBytes(), F3COL3.getBytes())).isNotEmpty();
 
         HBaseSerde writeIgnoreNullValueSerde = createHBaseSerde(true);
-        Put m2 = writeIgnoreNullValueSerde.createPutMutation(prepareRowData());
+        Put m2 =
+                writeIgnoreNullValueSerde.createPutMutation(
+                        prepareRowData(), HConstants.LATEST_TIMESTAMP);
         assert m2 != null;
         assertThat(m2.getRow()).isNotEmpty();
         assertThat(m2.get(FAMILY1.getBytes(), F1COL1.getBytes())).isEmpty();
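A closing note on why the integration tests above expect `+I[1, 1]` rather than `+I[1, 10]`: HBase resolves reads by cell timestamp, so a later `Put` that carries an older explicit timestamp does not shadow the already stored newer cell. A minimal, hedged sketch of that behavior using the plain HBase client API (table and connection setup omitted; the values mirror the test data):

```java
// Sketch only: two Puts for the same rowkey with explicit timestamps, as produced by the
// patched HBaseSerde. On read, HBase serves the cell with the greater timestamp, which is
// why rowkey 1 keeps col1=1 and rowkey 2 ends up with col1=20 in the tests above.
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CellTimestampSketch {
    public static void main(String[] args) {
        byte[] family = Bytes.toBytes("family1");
        byte[] qualifier = Bytes.toBytes("col1");

        long newer = 1696767943270L;
        long older = 1696767943269L;

        Put first = new Put(Bytes.toBytes(1), newer);   // value 1, newer cell timestamp
        first.addColumn(family, qualifier, Bytes.toBytes(1));

        Put second = new Put(Bytes.toBytes(1), older);  // value 10, older cell timestamp
        second.addColumn(family, qualifier, Bytes.toBytes(10));

        // Applying both Puts and then reading 'col1' returns 1: the cell written with the
        // greater timestamp wins, regardless of the order in which the Puts arrived.
        System.out.println("newer wins: " + (newer > older));
    }
}
```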