From 7228a862b026ca984959969087d5fec854f834f9 Mon Sep 17 00:00:00 2001
From: samcchen
Date: Thu, 21 Mar 2024 20:00:08 +0800
Subject: [PATCH] [Feature] Support clickhouse jdbc datasource (#40894)

Signed-off-by: samcchen
---
 be/src/exec/jdbc_scanner.cpp                  |  52 ++-
 fe/fe-core/pom.xml                            |   7 +
 .../java/com/starrocks/catalog/JDBCTable.java |   4 +-
 .../jdbc/ClickhouseSchemaResolver.java        | 387 ++++++++++++++++++
 .../connector/jdbc/JDBCMetadata.java          |   2 +
 .../jdbc/ClickhouseSchemaResolverTest.java    | 358 ++++++++++++++++
 fe/pom.xml                                    |   8 +
 java-extensions/jdbc-bridge/pom.xml           |  11 +
 .../com/starrocks/jdbcbridge/JDBCScanner.java |  55 +--
 .../java/com/starrocks/udf/UDFHelper.java     |  17 +
 10 files changed, 874 insertions(+), 27 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java
 create mode 100644 fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java

diff --git a/be/src/exec/jdbc_scanner.cpp b/be/src/exec/jdbc_scanner.cpp
index 8bf5ecbe22c68b..bce60f34efd477 100644
--- a/be/src/exec/jdbc_scanner.cpp
+++ b/be/src/exec/jdbc_scanner.cpp
@@ -195,7 +195,26 @@ void JDBCScanner::_init_profile() {
 StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_class, SlotDescriptor* slot_desc) {
     auto type = slot_desc->type().type;
-    if (java_class == "java.lang.Short") {
+    if (java_class == "java.lang.Byte") {
+        if (type != TYPE_BOOLEAN && type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT &&
+            type != TYPE_BIGINT) {
+            return Status::NotSupported(
+                    fmt::format("Type mismatches on column[{}], JDBC result type is Byte, please set the type to "
+                                "one of boolean,tinyint,smallint,int,bigint",
+                                slot_desc->col_name()));
+        }
+        if (type == TYPE_BOOLEAN) {
+            return TYPE_BOOLEAN;
+        }
+        return TYPE_TINYINT;
+    } else if (java_class == "com.clickhouse.data.value.UnsignedByte") {
+        if (type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
+            return Status::NotSupported(
+                    fmt::format("Type mismatches on column[{}], JDBC result type is UnsignedByte, please set the type to "
+                                "one of smallint,int,bigint",
+                                slot_desc->col_name()));
+        }
+        return TYPE_SMALLINT;
+    } else if (java_class == "java.lang.Short") {
         if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
             return Status::NotSupported(
                     fmt::format("Type mismatches on column[{}], JDBC result type is Short, please set the type to "
@@ -203,6 +222,14 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
                                 slot_desc->col_name()));
         }
         return TYPE_SMALLINT;
+    } else if (java_class == "com.clickhouse.data.value.UnsignedShort") {
+        if (type != TYPE_INT && type != TYPE_BIGINT) {
+            return Status::NotSupported(
+                    fmt::format("Type mismatches on column[{}], JDBC result type is UnsignedShort, please set the type to "
+                                "one of int,bigint",
+                                slot_desc->col_name()));
+        }
+        return TYPE_INT;
     } else if (java_class == "java.lang.Integer") {
         if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
             return Status::NotSupported(
@@ -218,6 +245,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
                                 slot_desc->col_name()));
         }
         return TYPE_VARCHAR;
+    } else if (java_class == "com.clickhouse.data.value.UnsignedInteger") {
+        if (type != TYPE_BIGINT) {
+            return Status::NotSupported(fmt::format(
+                    "Type mismatches on column[{}], JDBC result type is UnsignedInteger, please set the type to bigint",
+                    slot_desc->col_name()));
+        }
+        return TYPE_BIGINT;
     } else if (java_class == "java.lang.Long") {
"java.lang.Long") { if (type != TYPE_BIGINT) { return Status::NotSupported(fmt::format( @@ -232,6 +266,13 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; + } else if (java_class == "com.clickhouse.data.value.UnsignedLong") { + if (type != TYPE_LARGEINT) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is UnsignedLong, please set the type to largeint", + slot_desc->col_name())); + } + return TYPE_VARCHAR; } else if (java_class == "java.lang.Boolean") { if (type != TYPE_BOOLEAN && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) { return Status::NotSupported( @@ -261,7 +302,7 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; - } else if (java_class == "java.sql.Date") { + } else if (java_class == "java.sql.Date" ) { if (type != TYPE_DATE) { return Status::NotSupported( fmt::format("Type mismatches on column[{}], JDBC result type is Date, please set the type to date", @@ -282,6 +323,13 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; + } else if (java_class == "java.time.LocalDate") { + if (type != TYPE_DATE) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is LocalDate, please set the type to date", + slot_desc->col_name())); + } + return TYPE_VARCHAR; } else if (java_class == "java.math.BigDecimal") { if (type != TYPE_DECIMAL32 && type != TYPE_DECIMAL64 && type != TYPE_DECIMAL128 && type != TYPE_VARCHAR) { return Status::NotSupported( diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 5413e1dfcba67e..d242f405a99671 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -793,6 +793,13 @@ under the License. postgresql + + + + com.clickhouse + clickhouse-jdbc + + com.mockrunner diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java index 2940963887fdd3..1edbf5ed66f17c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java @@ -280,6 +280,8 @@ public enum ProtocolType { MYSQL, POSTGRES, ORACLE, - MARIADB + MARIADB, + + CLICKHOUSE } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java new file mode 100644 index 00000000000000..d4fe7fbd94029e --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java @@ -0,0 +1,387 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package com.starrocks.connector.jdbc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.starrocks.catalog.JDBCTable;
+import com.starrocks.catalog.PrimitiveType;
+import com.starrocks.catalog.ScalarType;
+import com.starrocks.catalog.Table;
+import com.starrocks.catalog.Type;
+import com.starrocks.connector.exception.StarRocksConnectorException;
+import org.jetbrains.annotations.NotNull;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ClickhouseSchemaResolver extends JDBCSchemaResolver {
+    Map<String, String> properties;
+
+    public static final String KEY_FOR_TABLE_NAME_FOR_PARTITION_INFO = "table_name_for_partition_info";
+    public static final String KEY_FOR_TABLE_NAME_FOR_TABLE_INFO = "table_name_for_table_info";
+    private static final String DEFAULT_TABLE_NAME_FOR_PARTITION_INFO = "system.parts";
+    private static final String DEFAULT_TABLE_NAME_FOR_TABLE_INFO = "system.tables";
+    private static final Set<String> SUPPORTED_TABLE_TYPES = new HashSet<>(
+            Arrays.asList("LOG TABLE", "MEMORY TABLE", "TEMPORARY TABLE", "VIEW", "DICTIONARY", "SYSTEM TABLE",
+                    "REMOTE TABLE", "TABLE"));
+
+    public ClickhouseSchemaResolver(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public Collection<String> listSchemas(Connection connection) {
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {
+            ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
+            while (resultSet.next()) {
+                String schemaName = resultSet.getString("NAME");
+                // skip internal schemas
+                if (!schemaName.equalsIgnoreCase("INFORMATION_SCHEMA") && !schemaName.equalsIgnoreCase("system")) {
+                    schemaNames.add(schemaName);
+                }
+            }
+            return schemaNames.build();
+        } catch (SQLException e) {
+            throw new StarRocksConnectorException(e.getMessage());
+        }
+    }
+
+    @Override
+    public ResultSet getTables(Connection connection, String dbName) throws SQLException {
+        String tableTypes = properties.get("table_types");
+        if (null != tableTypes) {
+            String[] tableTypesArray = tableTypes.split(",");
+            if (tableTypesArray.length == 0) {
+                throw new StarRocksConnectorException("table_types should be populated with table types separated " +
+                        "by comma, e.g. 'TABLE,VIEW'. Currently supported types include: " +
+                        String.join(",", SUPPORTED_TABLE_TYPES));
+            }
+
+            for (String tt : tableTypesArray) {
+                if (!SUPPORTED_TABLE_TYPES.contains(tt)) {
+                    throw new StarRocksConnectorException("Unsupported table type found: " + tt +
+                            ". Currently supported table types include: " + String.join(",", SUPPORTED_TABLE_TYPES));
+                }
+            }
+            return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, tableTypesArray);
+        }
+        return connection.getMetaData().getTables(connection.getCatalog(), dbName, null,
+                new String[] {"TABLE", "VIEW"});
+    }
+
+    @Override
+    public ResultSet getColumns(Connection connection, String dbName, String tblName) throws SQLException {
+        return connection.getMetaData().getColumns(connection.getCatalog(), dbName, tblName, "%");
+    }
+
+    @Override
+    public boolean checkAndSetSupportPartitionInformation(Connection connection) {
+        // The layout of the user's ClickHouse deployment is not known in advance, so the table holding
+        // partition information may be specified explicitly; otherwise we fall back to ClickHouse's
+        // default system table, i.e. `system.parts`.
+        String tableNameForPartInfo = getTableNameForPartInfo();
+        String[] schemaAndTable = tableNameForPartInfo.split("\\.");
+        if (schemaAndTable.length != 2) {
+            throw new StarRocksConnectorException(String.format("Invalid table name for partition information: %s. " +
+                    "Please specify the fully qualified table name, e.g. system.parts.", tableNameForPartInfo));
+        }
+        String catalogSchema = schemaAndTable[0];
+        String partitionInfoTable = schemaAndTable[1];
+        // Schema and table names may be reported with different cases depending on the database,
+        // so compare case-insensitively.
+        try (ResultSet catalogSet = connection.getMetaData().getCatalogs()) {
+            while (catalogSet.next()) {
+                String schemaName = catalogSet.getString("TABLE_CAT");
+                if (schemaName.equalsIgnoreCase(catalogSchema)) {
+                    try (ResultSet tableSet = connection.getMetaData().getTables(catalogSchema, null, null, null)) {
+                        while (tableSet.next()) {
+                            String tableName = tableSet.getString("TABLE_NAME");
+                            if (tableName.equalsIgnoreCase(partitionInfoTable)) {
+                                return this.supportPartitionInformation = true;
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (SQLException e) {
+            throw new StarRocksConnectorException(e.getMessage());
+        }
+        return this.supportPartitionInformation = false;
+    }
+
+    private String getTableNameForPartInfo() {
+        return properties.getOrDefault(KEY_FOR_TABLE_NAME_FOR_PARTITION_INFO, DEFAULT_TABLE_NAME_FOR_PARTITION_INFO);
+    }
+
+    private String getTableNameForTableInfo() {
+        return properties.getOrDefault(KEY_FOR_TABLE_NAME_FOR_TABLE_INFO, DEFAULT_TABLE_NAME_FOR_TABLE_INFO);
+    }
+
+    @Override
+    public Type convertColumnType(int dataType, String typeName, int columnSize, int digits) {
+        PrimitiveType primitiveType;
+        // Unsigned ClickHouse integer types are widened by one level so their full value range fits.
+        switch (typeName) {
+            case "Int8":
+                primitiveType = PrimitiveType.TINYINT;
+                break;
+            case "UInt8":
+            case "Int16":
+                primitiveType = PrimitiveType.SMALLINT;
+                break;
+            case "UInt16":
+            case "Int32":
+                primitiveType = PrimitiveType.INT;
+                break;
+            case "UInt32":
+            case "Int64":
+                primitiveType = PrimitiveType.BIGINT;
+                break;
+            case "UInt64":
+            case "Int128":
+            case "UInt128":
+            case "Int256":
+            case "UInt256":
+                primitiveType = PrimitiveType.LARGEINT;
+                break;
+            case "Float32":
+                primitiveType = PrimitiveType.FLOAT;
+                break;
+            case "Float64":
+                primitiveType = PrimitiveType.DOUBLE;
+                break;
+            case "Bool":
+                primitiveType = PrimitiveType.BOOLEAN;
+                break;
+            case "String":
+                return ScalarType.createVarcharType(65533);
+            case "Date":
+                primitiveType = PrimitiveType.DATE;
+                break;
+            case "DateTime":
+                primitiveType = PrimitiveType.DATETIME;
+                break;
+            default:
+                // Decimal(9,9): the first 9 is the precision, the second the scale
+                if (typeName.startsWith("Decimal")) {
+                    String[] precisionAndScale =
+                            typeName.replace("Decimal", "").replace("(", "")
+                                    .replace(")", "").replace(" ", "")
+                                    .split(",");
+                    if (precisionAndScale.length != 2) {
+                        // Unexpected Decimal type name: fail fast rather than guess precision and scale.
+                        throw new StarRocksConnectorException(
+                                "Cannot extract precision and scale from Decimal typename: " + typeName);
+                    } else {
+                        int precision = Integer.parseInt(precisionAndScale[0]);
+                        int scale = Integer.parseInt(precisionAndScale[1]);
+                        return ScalarType.createUnifiedDecimalType(precision, scale);
+                    }
+                }
+                primitiveType = PrimitiveType.UNKNOWN_TYPE;
+                break;
+        }
+        return ScalarType.createType(primitiveType);
+    }
+
+    @Override
+    public List<String> listPartitionNames(Connection connection, String databaseName, String tableName) {
+        String tableNameForPartInfo = getTableNameForPartInfo();
+        // Alias `partition` to `name` so the ORDER BY and the read below both match the selected column.
+        String partitionNamesQuery = "SELECT DISTINCT partition AS name FROM " + tableNameForPartInfo +
+                " WHERE database = ? AND table = ? AND partition IS NOT NULL ORDER BY name";
+        try (PreparedStatement ps = connection.prepareStatement(partitionNamesQuery)) {
+            ps.setString(1, databaseName);
+            ps.setString(2, tableName);
+            try (ResultSet rs = ps.executeQuery()) {
+                ImmutableList.Builder<String> list = ImmutableList.builder();
+                if (null != rs) {
+                    while (rs.next()) {
+                        String partitionName = rs.getString("name");
+                        list.add(partitionName);
+                    }
+                    return list.build();
+                } else {
+                    return Lists.newArrayList();
+                }
+            }
+        } catch (SQLException | NullPointerException e) {
+            throw new StarRocksConnectorException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public List<String> listPartitionColumns(Connection connection, String databaseName, String tableName) {
+        String tableNameForTableInfo = getTableNameForTableInfo();
+        String localTableName = getLocalTableNameFromDistributedTableName(tableName);
+        // Alias `partition_key` to PARTITION_EXPRESSION so the column label matches the read below.
+        String partitionColumnsQuery = "SELECT DISTINCT partition_key AS PARTITION_EXPRESSION FROM " +
+                tableNameForTableInfo + " WHERE database = ? AND name = ? AND partition_key IS NOT NULL";
+        try (PreparedStatement ps = connection.prepareStatement(partitionColumnsQuery)) {
+            ps.setString(1, databaseName);
+            ps.setString(2, localTableName);
+            // try-with-resources closes the ResultSet; the mocked tests may hand back null here.
+            try (ResultSet rs = ps.executeQuery()) {
+                ImmutableList.Builder<String> list = ImmutableList.builder();
+                if (null != rs) {
+                    while (rs.next()) {
+                        String partitionColumn = rs.getString("PARTITION_EXPRESSION").replace("`", "");
+                        list.add(partitionColumn);
+                    }
+                    return list.build();
+                } else {
+                    return Lists.newArrayList();
+                }
+            }
+        } catch (SQLException | NullPointerException e) {
+            throw new StarRocksConnectorException(e.getMessage(), e);
+        }
+    }
+
+    @NotNull
+    private String getLocalTableNameFromDistributedTableName(String tableName) {
+        String prefix = properties.getOrDefault("prefix_for_local_table", "");
+        String suffix = properties.getOrDefault("suffix_for_local_table", "");
+        return prefix + tableName + suffix;
+    }
+
+    public List<Partition> getPartitions(Connection connection, Table table) {
+        JDBCTable jdbcTable = (JDBCTable) table;
+        String query = getPartitionQuery(table);
+        String localTableName = getLocalTableNameFromDistributedTableName(jdbcTable.getJdbcTable());
+        try (PreparedStatement ps = connection.prepareStatement(query)) {
+            ps.setString(1, jdbcTable.getDbName());
+            ps.setString(2, localTableName);
+            try (ResultSet rs = ps.executeQuery()) {
+                ImmutableList.Builder<Partition> list = ImmutableList.builder();
+                if (null != rs) {
+                    while (rs.next()) {
+                        String partitionName = rs.getString("NAME");
+                        long modifiedTime = rs.getTimestamp("MODIFIED_TIME").getTime();
+                        list.add(new Partition(partitionName, modifiedTime));
+                    }
+                    return list.build();
+                } else {
+                    return Lists.newArrayList();
+                }
+            }
+        } catch (SQLException | NullPointerException e) {
+            throw new StarRocksConnectorException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Fetch the jdbc table's partition info from `system.parts` or a user-specified table with the same structure.
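+     * <p>
+     * A sketch of the partitioned-table form of the query this method runs (built by
+     * getPartitionQuery below, assuming the default `system.parts` source):
+     * <pre>
+     *   SELECT partition AS NAME, max(modification_time) AS MODIFIED_TIME
+     *   FROM system.parts
+     *   WHERE database = ? AND table = ? AND name IS NOT NULL
+     *   GROUP BY partition ORDER BY partition
+     * </pre>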
+     * e.g.:
+     * clickhouse> desc system.parts;
+     * ┌─name───────────────────────────────────┬─type────────────┬─default_type─┬─default_expression─┬
+     * │ partition                              │ String          │              │                    │
+     * │ name                                   │ String          │              │                    │
+     * │ uuid                                   │ UUID            │              │                    │
+     * │ part_type                              │ String          │              │                    │
+     * │ active                                 │ UInt8           │              │                    │
+     * │ marks                                  │ UInt64          │              │                    │
+     * │ rows                                   │ UInt64          │              │                    │
+     * │ bytes_on_disk                          │ UInt64          │              │                    │
+     * │ data_compressed_bytes                  │ UInt64          │              │                    │
+     * │ data_uncompressed_bytes                │ UInt64          │              │                    │
+     * │ marks_bytes                            │ UInt64          │              │                    │
+     * │ secondary_indices_compressed_bytes     │ UInt64          │              │                    │
+     * │ secondary_indices_uncompressed_bytes   │ UInt64          │              │                    │
+     * │ secondary_indices_marks_bytes          │ UInt64          │              │                    │
+     * │ modification_time                      │ DateTime        │              │                    │
+     * │ remove_time                            │ DateTime        │              │                    │
+     * │ refcount                               │ UInt32          │              │                    │
+     * │ min_date                               │ Date            │              │                    │
+     * │ max_date                               │ Date            │              │                    │
+     * │ min_time                               │ DateTime        │              │                    │
+     * │ max_time                               │ DateTime        │              │                    │
+     * │ partition_id                           │ String          │              │                    │
+     * │ min_block_number                       │ Int64           │              │                    │
+     * │ max_block_number                       │ Int64           │              │                    │
+     * │ level                                  │ UInt32          │              │                    │
+     * │ data_version                           │ UInt64          │              │                    │
+     * │ primary_key_bytes_in_memory            │ UInt64          │              │                    │
+     * │ primary_key_bytes_in_memory_allocated  │ UInt64          │              │                    │
+     * │ is_frozen                              │ UInt8           │              │                    │
+     * │ database                               │ String          │              │                    │
+     * │ table                                  │ String          │              │                    │
+     * │ engine                                 │ String          │              │                    │
+     * │ disk_name                              │ String          │              │                    │
+     * │ path                                   │ String          │              │                    │
+     * │ hash_of_all_files                      │ String          │              │                    │
+     * │ hash_of_uncompressed_files             │ String          │              │                    │
+     * │ uncompressed_hash_of_compressed_files  │ String          │              │                    │
+     * │ delete_ttl_info_min                    │ DateTime        │              │                    │
+     * │ delete_ttl_info_max                    │ DateTime        │              │                    │
+     * │ move_ttl_info.expression               │ Array(String)   │              │                    │
+     * │ move_ttl_info.min                      │ Array(DateTime) │              │                    │
+     * │ move_ttl_info.max                      │ Array(DateTime) │              │                    │
+     * │ default_compression_codec              │ String          │              │                    │
+     * │ recompression_ttl_info.expression      │ Array(String)   │              │                    │
+     * │ recompression_ttl_info.min             │ Array(DateTime) │              │                    │
+     * │ recompression_ttl_info.max             │ Array(DateTime) │              │                    │
+     * │ group_by_ttl_info.expression           │ Array(String)   │              │                    │
+     * │ group_by_ttl_info.min                  │ Array(DateTime) │              │                    │
+     * │ group_by_ttl_info.max                  │ Array(DateTime) │              │                    │
+     * │ rows_where_ttl_info.expression         │ Array(String)   │              │                    │
+     * │ rows_where_ttl_info.min                │ Array(DateTime) │              │                    │
+     * │ rows_where_ttl_info.max                │ Array(DateTime) │              │                    │
+     * │ projections                            │ Array(String)   │              │                    │
+     * │ bytes                                  │ UInt64          │ ALIAS        │ bytes_on_disk      │
+     * │ marks_size                             │ UInt64          │ ALIAS        │ marks_bytes        │
+     * └────────────────────────────────────────┴─────────────────┴──────────────┴────────────────────┴
+     *
+     * @param table the JDBC table whose partitions are being fetched
+     * @return the SQL used to fetch partition names and their modification times
+     */
+    @NotNull
+    private String getPartitionQuery(Table table) {
+        if (table.isPartitioned()) {
+            String tableNameForPartInfo = getTableNameForPartInfo();
+            final String partitionQuery =
+                    "SELECT partition AS NAME, max(modification_time) AS MODIFIED_TIME FROM " + tableNameForPartInfo +
+                            " WHERE database = ? AND table = ? AND name IS NOT NULL" +
+                            " GROUP BY partition ORDER BY partition";
+            return partitionQuery;
+        } else {
+            String tableNameForTableInfo = getTableNameForTableInfo();
+            final String nonPartitionQuery =
+                    "SELECT name AS NAME, max(metadata_modification_time) AS MODIFIED_TIME FROM " +
+                            tableNameForTableInfo +
+                            " WHERE database = ? AND name = ? AND name IS NOT NULL GROUP BY name";
+            return nonPartitionQuery;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java
index 5b190d49d78771..1240b35cbe2253 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java
@@ -80,6 +80,8 @@ public JDBCMetadata(Map<String, String> properties, String catalogName, HikariDa
             schemaResolver = new PostgresSchemaResolver();
         } else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("mariadb")) {
             schemaResolver = new MysqlSchemaResolver();
+        } else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("clickhouse")) {
+            schemaResolver = new ClickhouseSchemaResolver(properties);
         } else {
             LOG.warn("{} not support yet", properties.get(JDBCResource.DRIVER_CLASS));
             throw new StarRocksConnectorException(properties.get(JDBCResource.DRIVER_CLASS) + " not support yet");
diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java
new file mode 100644
index 00000000000000..3bd5bd18ce6082
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java
@@ -0,0 +1,358 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
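+
+// These tests drive ClickhouseSchemaResolver through mocked JDBC plumbing (JMockit
+// expectations over the HikariDataSource/Connection plus mockrunner's MockResultSet),
+// so no running ClickHouse instance is required.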
+
+package com.starrocks.connector.jdbc;
+
+import com.google.common.collect.Lists;
+import com.mockrunner.mock.jdbc.MockResultSet;
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.JDBCResource;
+import com.starrocks.catalog.JDBCTable;
+import com.starrocks.catalog.Type;
+import com.starrocks.common.DdlException;
+import com.starrocks.connector.PartitionUtil;
+import com.starrocks.qe.ConnectContext;
+import com.starrocks.utframe.UtFrameUtils;
+import com.zaxxer.hikari.HikariDataSource;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.starrocks.catalog.JDBCResource.DRIVER_CLASS;
+
+public class ClickhouseSchemaResolverTest {
+
+    private static ConnectContext connectContext;
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    @Mocked
+    HikariDataSource dataSource;
+
+    @Mocked
+    Connection connection;
+
+    @Mocked
+    PreparedStatement preparedStatement;
+
+    @Mocked
+    Statement statement;
+
+    private Map<String, String> properties;
+    private MockResultSet dbResult;
+    private MockResultSet tableResult;
+    private MockResultSet partitionsResult;
+    private Map<JDBCTableName, Integer> tableIdCache;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        UtFrameUtils.createMinStarRocksCluster();
+
+        // create connect context
+        connectContext = UtFrameUtils.createDefaultCtx();
+    }
+
+    @Before
+    public void setUp() throws SQLException {
+        partitionsResult = new MockResultSet("partitions");
+        partitionsResult.addColumn("NAME", Arrays.asList("20230810"));
+        partitionsResult.addColumn("PARTITION_EXPRESSION", Arrays.asList("`d`"));
+        partitionsResult.addColumn("MODIFIED_TIME", Arrays.asList("2023-08-01 00:00:00"));
+
+        properties = new HashMap<>();
+        properties.put(DRIVER_CLASS, "com.clickhouse.jdbc.ClickHouseDriver");
+        properties.put(JDBCResource.URI, "jdbc:clickhouse://127.0.0.1:8123");
+        properties.put(JDBCResource.USER, "root");
+        properties.put(JDBCResource.PASSWORD, "123456");
+        properties.put(JDBCResource.CHECK_SUM, "xxxx");
+        properties.put(JDBCResource.DRIVER_URL, "xxxx");
+        tableIdCache = new ConcurrentHashMap<>();
+        tableIdCache.put(JDBCTableName.of("catalog", "test", "tbl1"), 100000);
+
+        new Expectations() {
+            {
+                dataSource.getConnection();
+                result = connection;
+                minTimes = 0;
+
+                preparedStatement.executeQuery();
+                result = partitionsResult;
+                minTimes = 0;
+            }
+        };
+    }
+
+    @Test
+    public void testCheckPartitionWithoutPartitionsTable() {
+        try {
+            JDBCSchemaResolver schemaResolver = new ClickhouseSchemaResolver(properties);
+            Assert.assertFalse(schemaResolver.checkAndSetSupportPartitionInformation(connection));
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testCheckPartitionWithPartitionsTable() throws SQLException {
+        new Expectations() {
+            {
+                dbResult = new MockResultSet("catalog");
+                dbResult.addColumn("TABLE_CAT", Arrays.asList("system"));
+
+                connection.getMetaData().getCatalogs();
+                result = dbResult;
+                minTimes = 0;
+
+                MockResultSet piResult = new MockResultSet("partitions");
+                piResult.addColumn("TABLE_NAME", Arrays.asList("parts"));
Arrays.asList("parts")); + connection.getMetaData().getTables(anyString, null, null, null); + result = piResult; + minTimes = 0; + } + }; + try { + JDBCSchemaResolver schemaResolver = new ClickhouseSchemaResolver(properties); + Assert.assertTrue(schemaResolver.checkAndSetSupportPartitionInformation(connection)); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testCheckPartitionWithUserSpecifiedPartitionsTable() throws SQLException { + new Expectations() { + { + dbResult = new MockResultSet("catalog"); + dbResult.addColumn("TABLE_CAT", Arrays.asList("clickhouse_manager")); + + connection.getMetaData().getCatalogs(); + result = dbResult; + minTimes = 0; + + MockResultSet piResult = new MockResultSet("partitions"); + piResult.addColumn("TABLE_NAME", Arrays.asList("parts")); + connection.getMetaData().getTables(anyString, null, null, null); + result = piResult; + minTimes = 0; + } + }; + try { + Map propertiesWithUserDefinedPartitionTable = new HashMap<>(properties); + propertiesWithUserDefinedPartitionTable.put(ClickhouseSchemaResolver.KEY_FOR_TABLE_NAME_FOR_PARTITION_INFO, + "clickhouse_manager.parts"); + JDBCSchemaResolver schemaResolver = new ClickhouseSchemaResolver(propertiesWithUserDefinedPartitionTable); + Assert.assertTrue(schemaResolver.checkAndSetSupportPartitionInformation(connection)); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testListPartitionNames() { + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List partitionNames = jdbcMetadata.listPartitionNames("test", "tbl1"); + Assert.assertFalse(partitionNames.isEmpty()); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testListPartitionNamesWithCache() { + try { + JDBCCacheTestUtil.openCacheEnable(connectContext); + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List partitionNames = jdbcMetadata.listPartitionNames("test", "tbl1"); + Assert.assertFalse(partitionNames.isEmpty()); + List partitionNamesWithCache = jdbcMetadata.listPartitionNames("test", "tbl1"); + Assert.assertFalse(partitionNamesWithCache.isEmpty()); + JDBCCacheTestUtil.closeCacheEnable(connectContext); + Map properties = new HashMap<>(); + jdbcMetadata.refreshCache(properties); + List partitionNamesWithOutCache = jdbcMetadata.listPartitionNames("test", "tbl1"); + Assert.assertTrue(partitionNamesWithOutCache.isEmpty()); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testListPartitionNamesRsNull() { + try { + new Expectations() { + { + preparedStatement.executeQuery(); + result = null; + minTimes = 0; + } + }; + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List partitionNames = jdbcMetadata.listPartitionNames("test", "tbl1"); + Assert.assertTrue(partitionNames.size() == 0); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testListPartitionColumns() { + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + Integer size = jdbcMetadata.listPartitionColumns("test", "tbl1", + Arrays.asList(new Column("d", Type.VARCHAR))).size(); + Assert.assertTrue(size > 0); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void 
+    public void testListPartitionColumnsRsNull() {
+        try {
+            new Expectations() {
+                {
+                    preparedStatement.executeQuery();
+                    result = null;
+                    minTimes = 0;
+                }
+            };
+            JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource);
+            int size = jdbcMetadata.listPartitionColumns("test", "tbl1",
+                    Arrays.asList(new Column("d", Type.VARCHAR))).size();
+            Assert.assertEquals(0, size);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testGetPartitions() {
+        try {
+            JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource);
+            JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", Arrays.asList(new Column("d", Type.VARCHAR)),
+                    Arrays.asList(new Column("d", Type.VARCHAR)), "test", "catalog", properties);
+            int size = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+            Assert.assertTrue(size > 0);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testGetPartitionsWithCache() {
+        try {
+            JDBCCacheTestUtil.openCacheEnable(connectContext);
+            JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource);
+            JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", Arrays.asList(new Column("d", Type.VARCHAR)),
+                    Arrays.asList(new Column("d", Type.VARCHAR)), "test", "catalog", properties);
+            int size = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+            Assert.assertTrue(size > 0);
+            int sizeWithCache = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+            Assert.assertTrue(sizeWithCache > 0);
+            JDBCCacheTestUtil.closeCacheEnable(connectContext);
+            Map<String, String> properties = new HashMap<>();
+            jdbcMetadata.refreshCache(properties);
+            int sizeWithOutCache = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+            Assert.assertEquals(0, sizeWithOutCache);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testGetPartitions_NonPartitioned() throws DdlException { // TODO: fix
+        JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource);
+        List<Column> columns = Arrays.asList(new Column("d", Type.VARCHAR));
+        JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", columns, Lists.newArrayList(),
+                "test", "catalog", properties);
+        int size = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+        Assert.assertEquals(1, size);
+        List<String> partitionNames = PartitionUtil.getPartitionNames(jdbcTable);
+        Assert.assertEquals(Arrays.asList("tbl1"), partitionNames);
+    }
+
+    @Test
+    public void testGetPartitionsRsNull() {
+        try {
+            new Expectations() {
+                {
+                    preparedStatement.executeQuery();
+                    result = null;
+                    minTimes = 0;
+                }
+            };
+            JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource);
+            JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", Arrays.asList(new Column("d", Type.VARCHAR)),
+                    Arrays.asList(new Column("d", Type.VARCHAR)), "test", "catalog", properties);
+            int size = jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size();
+            Assert.assertEquals(0, size);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testClickHouseInvalidPartition() {
+        try {
+            MockResultSet invalidPartition = new MockResultSet("partitions");
+            invalidPartition.addColumn("NAME", Arrays.asList("'20230810'"));
+            invalidPartition.addColumn("PARTITION_EXPRESSION", Arrays.asList("`d`"));
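+            // MODIFIED_TIME deliberately carries only a date: getTimestamp() needs the full
+            // "yyyy-mm-dd hh:mm:ss" form, so getPartitions is expected to fail with the
+            // parse error asserted below.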
invalidPartition.addColumn("MODIFIED_TIME", Arrays.asList("2023-08-01")); + + new Expectations() { + { + preparedStatement.executeQuery(); + result = invalidPartition; + minTimes = 0; + } + }; + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List columns = Arrays.asList(new Column("d", Type.VARCHAR)); + JDBCTable jdbcTable = new JDBCTable(100000, "tbl1", columns, Lists.newArrayList(), + "test", "catalog", properties); + jdbcMetadata.getPartitions(jdbcTable, Arrays.asList("20230810")).size(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Timestamp format must be yyyy-mm-dd hh:mm:ss")); + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index e7393f60042e7b..b76e1ba661e1a4 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -717,6 +717,14 @@ under the License. 42.4.4 + + + + com.clickhouse + clickhouse-jdbc + 0.4.6 + + com.mockrunner diff --git a/java-extensions/jdbc-bridge/pom.xml b/java-extensions/jdbc-bridge/pom.xml index af739c4e7822e3..127ca0a079433c 100644 --- a/java-extensions/jdbc-bridge/pom.xml +++ b/java-extensions/jdbc-bridge/pom.xml @@ -36,6 +36,17 @@ log4j-slf4j-impl ${log4j.version} + + + + + org.lz4 + lz4-java + 1.8.0 + + + + diff --git a/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java b/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java index 77904b6d5b0e89..c2522465ecaa21 100644 --- a/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java +++ b/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java @@ -29,15 +29,17 @@ import java.sql.ResultSetMetaData; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; - public class JDBCScanner { private String driverLocation; private HikariDataSource dataSource; @@ -51,7 +53,6 @@ public class JDBCScanner { private int resultNumRows = 0; ClassLoader classLoader; - public JDBCScanner(String driverLocation, JDBCScanContext scanContext) { this.driverLocation = driverLocation; this.scanContext = scanContext; @@ -61,9 +62,7 @@ public void open() throws Exception { String key = scanContext.getUser() + "/" + scanContext.getJdbcURL(); URL driverURL = new File(driverLocation).toURI().toURL(); DataSourceCache.DataSourceCacheItem cacheItem = DataSourceCache.getInstance().getSource(key, () -> { - ClassLoader classLoader = URLClassLoader.newInstance(new URL[] { - driverURL, - }); + ClassLoader classLoader = URLClassLoader.newInstance(new URL[] {driverURL}); Thread.currentThread().setContextClassLoader(classLoader); HikariConfig config = new HikariConfig(); config.setDriverClassName(scanContext.getDriverClassName()); @@ -84,7 +83,8 @@ public void open() throws Exception { connection = dataSource.getConnection(); connection.setAutoCommit(false); - statement = connection.prepareStatement(scanContext.getSql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement = connection.prepareStatement(scanContext.getSql(), ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); if (scanContext.getDriverClassName().toLowerCase(Locale.ROOT).contains("mysql")) { statement.setFetchSize(Integer.MIN_VALUE); } else { @@ -100,32 +100,36 @@ public void open() throws Exception { Class clazz = 
             if (isGeneralJDBCClassType(clazz)) {
                 resultChunk.add((Object[]) Array.newInstance(clazz, scanContext.getStatementFetchSize()));
+            } else if (null != mapEngineSpecificClassType(clazz)) {
+                Class<?> targetClass = mapEngineSpecificClassType(clazz);
+                resultChunk.add((Object[]) Array.newInstance(targetClass, scanContext.getStatementFetchSize()));
             } else {
                 resultChunk.add((Object[]) Array.newInstance(String.class, scanContext.getStatementFetchSize()));
             }
         }
     }
-    private static final Set<Class<?>> GENERAL_JDBC_CLASS_SET = new HashSet<>(Arrays.asList(
-            Boolean.class,
-            Short.class,
-            Integer.class,
-            Long.class,
-            Float.class,
-            Double.class,
-            BigInteger.class,
-            BigDecimal.class,
-            java.sql.Date.class,
-            Timestamp.class,
-            LocalDateTime.class,
-            Time.class,
-            String.class
-    ));
+    private static final Set<Class<?>> GENERAL_JDBC_CLASS_SET = new HashSet<>(
+            Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class,
+                    BigInteger.class, BigDecimal.class, java.sql.Date.class, Timestamp.class, LocalDate.class,
+                    LocalDateTime.class, Time.class, String.class));
     private boolean isGeneralJDBCClassType(Class<?> clazz) {
         return GENERAL_JDBC_CLASS_SET.contains(clazz);
     }
+    // ClickHouse surfaces unsigned integer columns as driver-specific value classes;
+    // map each one to the narrowest general JDBC class that can hold its full range.
+    private static final Map<String, Class<?>> ENGINE_SPECIFIC_CLASS_MAPPING = new HashMap<String, Class<?>>() {{
+            put("com.clickhouse.data.value.UnsignedByte", Short.class);
+            put("com.clickhouse.data.value.UnsignedShort", Integer.class);
+            put("com.clickhouse.data.value.UnsignedInteger", Long.class);
+            put("com.clickhouse.data.value.UnsignedLong", BigInteger.class);
+    }};
+
+    private Class<?> mapEngineSpecificClassType(Class<?> clazz) {
+        String className = clazz.getName();
+        return ENGINE_SPECIFIC_CLASS_MAPPING.get(className);
+    }
+
     // used for cpp interface
     public List<String> getResultColumnClassNames() {
         return resultColumnClassNames;
@@ -165,8 +169,12 @@ public List<Object[]> getNextChunk() throws Exception {
                     // if both sides are String, assign value directly to avoid additional calls to getString
                     dataColumn[resultNumRows] = resultObject;
                 } else if (!(dataColumn instanceof String[])) {
-                    // for other general class type, assign value directly
-                    dataColumn[resultNumRows] = resultObject;
+                    if (dataColumn instanceof BigInteger[] && resultObject instanceof Number) {
+                        // ClickHouse UnsignedLong values arrive as a driver-specific Number;
+                        // normalize through the decimal string form into BigInteger.
+                        dataColumn[resultNumRows] = new BigInteger(resultObject.toString());
+                    } else {
+                        // for other general class type, assign value directly
+                        dataColumn[resultNumRows] = resultObject;
+                    }
                 } else {
                     // for non-general class type, use string representation
                     dataColumn[resultNumRows] = resultSet.getString(i + 1);
@@ -181,7 +189,6 @@ public int getResultNumRows() {
         return resultNumRows;
     }
-
    public void close() throws Exception {
        if (resultSet != null) {
            resultSet.close();
diff --git a/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java b/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java
index da28ad1567cb4a..1273f8530c359c 100644
--- a/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java
+++ b/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java
@@ -30,6 +30,7 @@
 import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
@@ -49,6 +50,7 @@ public class UDFHelper {
     public static final int TYPE_ARRAY = 19;
     public static final int TYPE_BOOLEAN = 24;
     public static final int TYPE_TIME = 44;
+    public static final int TYPE_DATE = 50;
     public static final int TYPE_DATETIME = 51;
     private static final byte[] emptyBytes = new byte[0];
@@ -56,6 +58,8 @@ public class UDFHelper {
     private static final ThreadLocal<DateFormat> formatter =
             ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
     private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
     private static final TimeZone timeZone = TimeZone.getDefault();
 
     private static void getBooleanBoxedResult(int numRows, Boolean[] boxedArr, long columnAddr) {
@@ -226,6 +230,17 @@ private static void getStringDateResult(int numRows, Date[] column, long columnA
         getStringBoxedResult(numRows, results, columnAddr);
     }
 
+    private static void getStringLocalDateResult(int numRows, LocalDate[] column, long columnAddr) {
+        // TODO: return the date value directly instead of its string form
+        String[] results = new String[numRows];
+        for (int i = 0; i < numRows; i++) {
+            if (column[i] != null) {
+                results[i] = dateFormatter.format(column[i]);
+            }
+        }
+        getStringBoxedResult(numRows, results, columnAddr);
+    }
+
     private static void getStringTimeStampResult(int numRows, Timestamp[] column, long columnAddr) {
         // TODO: return timestamp
         String[] results = new String[numRows];
@@ -326,6 +341,8 @@ public static void getResultFromBoxedArray(int type, int numRows, Object boxedRe
             case TYPE_VARCHAR: {
                 if (boxedResult instanceof Date[]) {
                     getStringDateResult(numRows, (Date[]) boxedResult, columnAddr);
+                } else if (boxedResult instanceof LocalDate[]) {
+                    getStringLocalDateResult(numRows, (LocalDate[]) boxedResult, columnAddr);
                 } else if (boxedResult instanceof LocalDateTime[]) {
                     getStringDateTimeResult(numRows, (LocalDateTime[]) boxedResult, columnAddr);
                 } else if (boxedResult instanceof Timestamp[]) {