[FLINK-33971]Specifies whether to use HBase table that supports dynamic columns. #36

Closed
wants to merge 5 commits into from


@MOBIN-F MOBIN-F commented Jan 3, 2024

Specifies whether to use an HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv

Sample code for a result table that supports dynamic columns:

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='<yourTableName>',
  'zookeeper.quorum'='<yourZookeeperQuorum>',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;

If dynamic.table is set to true, an HBase table that supports dynamic columns is used.
Two fields must be declared in the ROW that corresponds to each column family. The value of the first field is used as the dynamic column (the qualifier), and the value of the second field is the value of that dynamic column.

For example, suppose the datagen_source table contains a row of data indicating that the ID of the commodity is 1, the transaction amount of the commodity between 10:00 and 11:00 is 100, and the transaction amount of the commodity on July 26, 2020 is 10000. In this case, a row whose rowkey is 1 is inserted into the HBase table, with f1:10 set to 100 and f2:2020-7-26 set to 10000.
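
To make the difference between the two mappings concrete, here is a minimal Java sketch. It is not the connector's code: the class `DynamicColumnSketch` and its methods are hypothetical, and HBase cells are modeled as plain `family:qualifier` map entries so that the example runs without any Flink or HBase dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: models HBase cells as "family:qualifier" -> value entries. */
final class DynamicColumnSketch {

    /** Static mapping: each ROW field name becomes a qualifier. */
    static Map<String, Object> staticCells(String family, String[] fieldNames, Object[] values) {
        Map<String, Object> cells = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            cells.put(family + ":" + fieldNames[i], values[i]);
        }
        return cells;
    }

    /** Dynamic mapping: the first field's value is the qualifier, the second is the cell value. */
    static Map<String, Object> dynamicCells(String family, Object[] values) {
        if (values.length != 2) {
            throw new IllegalArgumentException(
                    "dynamic mode expects exactly two fields per column family");
        }
        Map<String, Object> cells = new LinkedHashMap<>();
        cells.put(family + ":" + values[0], values[1]);
        return cells;
    }

    public static void main(String[] args) {
        // Values taken from the example row: f1 = ("10", 100), f2 = ("2020-7-26", 10000).
        System.out.println(staticCells("f1", new String[] {"hour", "deal"}, new Object[] {"10", 100L}));
        System.out.println(dynamicCells("f1", new Object[] {"10", 100L}));
        System.out.println(dynamicCells("f2", new Object[] {"2020-7-26", 10000L}));
    }
}
```

With the static mapping the row would produce the cells f1:hour and f1:deal, whereas the dynamic mapping produces the single cell f1:10, which matches the description above.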


boring-cyborg bot commented Jan 3, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@MOBIN-F MOBIN-F changed the title from "Specifies whether to use HBase table that supports dynamic columns." to "[FLINK-33971]Specifies whether to use HBase table that supports dynamic columns." Jan 3, 2024
@MOBIN-F MOBIN-F changed the title from "[FLINK-33971]Specifies whether to use HBase table that supports dynamic columns." to "Specifies whether to use HBase table that supports dynamic columns." Jan 3, 2024
@MOBIN-F MOBIN-F changed the title from "Specifies whether to use HBase table that supports dynamic columns." to "[FLINK-33971]Specifies whether to use HBase table that supports dynamic columns." Jan 3, 2024

MOBIN-F commented Jan 8, 2024

@MartijnVisser What do you think of this PR? Thanks.

MartijnVisser commented

@ferenc-csaky Want to take a look?

ferenc-csaky commented

> @ferenc-csaky Want to take a look?

I'll review it in the next 2 days. Thanks for your contribution @MOBIN-F!


MOBIN-F commented Jan 12, 2024

> > @ferenc-csaky Want to take a look?
>
> I'll review it in the next 2 days. Thanks for your contribution @MOBIN-F!

I added some test cases.


@ferenc-csaky ferenc-csaky left a comment


I added some comments to the implementation, but my understanding is that this feature is ApsaraDB-specific, which means it is Alibaba Cloud-specific.

Correct me if I'm wrong, but in that case the main question is whether it is a good idea to introduce platform-specific features and flags into what is, IMO, a common upstream connector. I am open to discussing this. I don't know whether we have other examples of this kind of situation or any rules of thumb to follow here. @MartijnVisser WDYT?

@@ -60,6 +60,12 @@ public class HBaseConnectorOptions {
"Representation for null values for string fields. HBase source and "
+ "sink encodes/decodes empty bytes as null values for all types except string type.");

public static final ConfigOption<Boolean> DYNAMIC_TABLE =
ConfigOptions.key("dynamic.table")

Contributor:

I think dynamic-table would be more consistent with other similar options, such as null-string-literal or table-name.

Contributor Author:

yep

@@ -43,23 +43,26 @@ public class RowDataToMutationConverter implements HBaseMutationConverter<RowDat
private final TimestampMetadata timestampMetadata;
private final TimeToLiveMetadata timeToLiveMetadata;
private transient HBaseSerde serde;
private final boolean dynamicTable;

Contributor:

nit: Can we move the new field above serde to keep the grouping of the private final fields?

Contributor Author:

ok

@@ -121,6 +122,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

String tableName = tableOptions.get(TABLE_NAME);
boolean dynamicTable = tableOptions.get(DYNAMIC_TABLE);

Contributor:

This variable is unused and can be removed.

Contributor:

At this point I think it would be worth turning HBaseSerde into an abstract class, e.g. BaseHBaseSerde (or AbstractHBaseSerde, because "base HBase" sounds a bit weird), which contains the common logic, and introducing two child classes (for example DefaultHBaseSerde and DynamicHBaseSerde) that contain the differing implementations where it makes sense.

Furthermore, I would add an error message constant and use it in the thrown UnsupportedOperationExceptions to make them more meaningful and explicit.
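
The refactoring suggested here could look roughly like the following sketch. It is a heavily simplified illustration, not the real HBaseSerde API: the class names (`AbstractSerdeSketch`, `DefaultSerdeSketch`, `DynamicSerdeSketch`), the method signatures, and the wording of the error message are all assumptions, and cells are again modeled as `family:qualifier` map entries.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the connector's serde. */
abstract class AbstractSerdeSketch {
    /** Shared message for operations the dynamic variant cannot support. */
    static final String DYNAMIC_READ_UNSUPPORTED =
            "Reading is not supported when dynamic columns are enabled.";

    /** Common logic shared by both variants. */
    protected static byte[] encode(Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    abstract Map<String, byte[]> toCells(String family, String[] fieldNames, Object[] values);

    abstract String[] fromCells(String family, String[] fieldNames, Map<String, byte[]> cells);

    public static void main(String[] args) {
        AbstractSerdeSketch serde = new DynamicSerdeSketch();
        System.out.println(
                serde.toCells("f1", new String[] {"hour", "deal"}, new Object[] {"10", 100L}).keySet());
    }
}

/** Default variant: field names are qualifiers; both directions are supported. */
final class DefaultSerdeSketch extends AbstractSerdeSketch {
    @Override
    Map<String, byte[]> toCells(String family, String[] fieldNames, Object[] values) {
        Map<String, byte[]> cells = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            cells.put(family + ":" + fieldNames[i], encode(values[i]));
        }
        return cells;
    }

    @Override
    String[] fromCells(String family, String[] fieldNames, Map<String, byte[]> cells) {
        String[] out = new String[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            byte[] raw = cells.get(family + ":" + fieldNames[i]);
            out[i] = raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        }
        return out;
    }
}

/** Dynamic variant: first value is the qualifier; reads fail with the shared message. */
final class DynamicSerdeSketch extends AbstractSerdeSketch {
    @Override
    Map<String, byte[]> toCells(String family, String[] fieldNames, Object[] values) {
        Map<String, byte[]> cells = new LinkedHashMap<>();
        cells.put(family + ":" + values[0], encode(values[1]));
        return cells;
    }

    @Override
    String[] fromCells(String family, String[] fieldNames, Map<String, byte[]> cells) {
        throw new UnsupportedOperationException(DYNAMIC_READ_UNSUPPORTED);
    }
}
```

Keeping the message in one constant means every unsupported path reports the same explicit reason, and the split keeps the `dynamicTable` branching out of the shared code.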

Contributor Author:

I agree with this point of view. If you have no objection to the dynamic-table parameter, I am willing to do this part of the refactoring work.

MartijnVisser commented

> the main question is whether it is a good idea to introduce platform-specific features and flags into what is, IMO, a common upstream connector.

If this is something that isn't supported in open source HBase itself, then at least we should first have a discussion on this topic on the Dev mailing list.


MOBIN-F commented Jan 16, 2024

> > the main question is whether it is a good idea to introduce platform-specific features and flags into what is, IMO, a common upstream connector.
>
> If this is something that isn't supported in open source HBase itself, then at least we should first have a discussion on this topic on the Dev mailing list.

@MartijnVisser @ferenc-csaky
This is supported by open source HBase. We already use this feature internally in production. The version we run is also open source HBase, so it has nothing to do with the cloud platform.

Tan-JiaLiang commented

@MOBIN-F @ferenc-csaky @MartijnVisser Hi, sorry to jump into this discussion.

IMO this option will make Flink SQL behave strangely: we declare a ROW type, but the actual behavior is that of a MAP type, so why not just implement it as a MAP type? Also, looking at the current implementation, as soon as a user sets this option on a table, scan and get will throw an exception. As far as I know, no other connector behaves this way.

So I think it is better to have a discussion on this topic on the dev mailing list.
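
One way to picture the MAP-based alternative raised in this comment is the sketch below. It is purely illustrative and does not describe any existing connector feature: `MapFamilySketch` and its methods are hypothetical, the second entry (`"11"`, 250) is made-up sample data, and cells are modeled as `family:qualifier` map entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a column family modeled as a map of qualifier -> value. */
final class MapFamilySketch {

    /** Write path: every map entry becomes one cell under the given family. */
    static Map<String, Long> toCells(String family, Map<String, Long> columns) {
        Map<String, Long> cells = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : columns.entrySet()) {
            cells.put(family + ":" + e.getKey(), e.getValue());
        }
        return cells;
    }

    /** Read path: cells of the family are collected back into a map, so reads stay possible. */
    static Map<String, Long> fromCells(String family, Map<String, Long> cells) {
        String prefix = family + ":";
        Map<String, Long> columns = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : cells.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                columns.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        Map<String, Long> f1 = new LinkedHashMap<>();
        f1.put("10", 100L); // value from the PR description's example
        f1.put("11", 250L); // made-up second column for illustration
        Map<String, Long> cells = toCells("f1", f1);
        System.out.println(cells);
        System.out.println(fromCells("f1", cells).equals(f1));
    }
}
```

Because a map has a natural inverse, the declared type and the observed behavior agree in both directions, which is the asymmetry this comment points out in the ROW-based approach.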

MartijnVisser commented

> So I think it is better to have a discussion on this topic on the dev mailing list.

+1

ferenc-csaky commented

> So I think it is better to have a discussion on this topic on the dev mailing list.

Agreed!

@MOBIN-F MOBIN-F closed this Jan 22, 2024