From 88508989ad853a26686841ef86947fc104aae884 Mon Sep 17 00:00:00 2001 From: samcchen Date: Thu, 21 Mar 2024 20:00:08 +0800 Subject: [PATCH] [Feature] Support clickhouse jdbc datasource(#40894) Signed-off-by: samcchen --- be/src/exec/jdbc_scanner.cpp | 51 +++- fe/fe-core/pom.xml | 7 + .../java/com/starrocks/catalog/JDBCTable.java | 4 +- .../jdbc/ClickhouseSchemaResolver.java | 151 +++++++++++ .../connector/jdbc/JDBCMetadata.java | 2 + .../jdbc/ClickhouseSchemaResolverTest.java | 245 ++++++++++++++++++ fe/pom.xml | 8 + java-extensions/jdbc-bridge/pom.xml | 11 + .../com/starrocks/jdbcbridge/JDBCScanner.java | 55 ++-- .../java/com/starrocks/udf/UDFHelper.java | 17 ++ 10 files changed, 525 insertions(+), 26 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java diff --git a/be/src/exec/jdbc_scanner.cpp b/be/src/exec/jdbc_scanner.cpp index 8bf5ecbe22c68..5546ee45ed8cc 100644 --- a/be/src/exec/jdbc_scanner.cpp +++ b/be/src/exec/jdbc_scanner.cpp @@ -195,7 +195,27 @@ void JDBCScanner::_init_profile() { StatusOr JDBCScanner::_precheck_data_type(const std::string& java_class, SlotDescriptor* slot_desc) { auto type = slot_desc->type().type; - if (java_class == "java.lang.Short") { + if (java_class == "java.lang.Byte") { + if (type != TYPE_BOOLEAN && type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && + type != TYPE_BIGINT) { + return Status::NotSupported( + fmt::format("Type mismatches on column[{}], JDBC result type is Byte, please set the type to " + "one of boolean,tinyint,smallint,int,bigint", + slot_desc->col_name())); + } + if (type == TYPE_BOOLEAN) { + return TYPE_BOOLEAN; + } + return TYPE_TINYINT; + } else if (java_class == "com.clickhouse.data.value.UnsignedByte") { + if (type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is UnsignedByte, please set the type to " + "one of smallint,int,bigint", + slot_desc->col_name())); + } + return TYPE_SMALLINT; + } else if (java_class == "java.lang.Short") { if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) { return Status::NotSupported( fmt::format("Type mismatches on column[{}], JDBC result type is Short, please set the type to " @@ -203,6 +223,14 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_SMALLINT; + } else if (java_class == "com.clickhouse.data.value.UnsignedShort") { + if (type != TYPE_INT && type != TYPE_BIGINT) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is UnsignedShort, please set the type to " + "one of int,bigint", + slot_desc->col_name())); + } + return TYPE_INT; } else if (java_class == "java.lang.Integer") { if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) { return Status::NotSupported( @@ -218,6 +246,13 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; + } else if (java_class == "com.clickhouse.data.value.UnsignedInteger") { + if (type != TYPE_BIGINT) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is UnsignedInteger, please set the type to bigint", + slot_desc->col_name())); + } + return TYPE_BIGINT; } else if (java_class == "java.lang.Long") { if (type != TYPE_BIGINT) { return Status::NotSupported(fmt::format( @@ -232,6 +267,13 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; + } else if (java_class == "com.clickhouse.data.value.UnsignedLong") { + if (type != TYPE_LARGEINT) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is UnsignedLong, please set the type to largeint", + slot_desc->col_name())); + } + return TYPE_VARCHAR; } else if (java_class == "java.lang.Boolean") { if (type != TYPE_BOOLEAN && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) { return Status::NotSupported( @@ -282,6 +324,13 @@ StatusOr JDBCScanner::_precheck_data_type(const std::string& java_c slot_desc->col_name())); } return TYPE_VARCHAR; + } else if (java_class == "java.time.LocalDate") { + if (type != TYPE_DATE) { + return Status::NotSupported(fmt::format( + "Type mismatches on column[{}], JDBC result type is LocalDate, please set the type to date", + slot_desc->col_name())); + } + return TYPE_VARCHAR; } else if (java_class == "java.math.BigDecimal") { if (type != TYPE_DECIMAL32 && type != TYPE_DECIMAL64 && type != TYPE_DECIMAL128 && type != TYPE_VARCHAR) { return Status::NotSupported( diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 5413e1dfcba67..d242f405a9967 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -793,6 +793,13 @@ under the License. postgresql + + + + com.clickhouse + clickhouse-jdbc + + com.mockrunner diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java index 2940963887fdd..1edbf5ed66f17 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/JDBCTable.java @@ -280,6 +280,8 @@ public enum ProtocolType { MYSQL, POSTGRES, ORACLE, - MARIADB + MARIADB, + + CLICKHOUSE } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java new file mode 100644 index 0000000000000..d30c828d58e97 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolver.java @@ -0,0 +1,151 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.jdbc; + +import com.google.common.collect.ImmutableSet; +import com.starrocks.catalog.PrimitiveType; +import com.starrocks.catalog.ScalarType; +import com.starrocks.catalog.Type; +import com.starrocks.connector.exception.StarRocksConnectorException; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class ClickhouseSchemaResolver extends JDBCSchemaResolver { + Map properties; + + public static final Set SUPPORTED_TABLE_TYPES = new HashSet<>( + Arrays.asList("LOG TABLE", "MEMORY TABLE", "TEMPORARY TABLE", "VIEW", "DICTIONARY", "SYSTEM TABLE", + "REMOTE TABLE", "TABLE")); + + public ClickhouseSchemaResolver(Map properties) { + this.properties = properties; + } + + @Override + public Collection listSchemas(Connection connection) { + try (ResultSet resultSet = connection.getMetaData().getSchemas()) { + ImmutableSet.Builder schemaNames = ImmutableSet.builder(); + while (resultSet.next()) { + String schemaName = resultSet.getString("TABLE_SCHEM"); + // skip internal schemas + if (!schemaName.equalsIgnoreCase("INFORMATION_SCHEMA") && !schemaName.equalsIgnoreCase("system")) { + schemaNames.add(schemaName); + } + } + return schemaNames.build(); + } catch (SQLException e) { + throw new StarRocksConnectorException(e.getMessage()); + } + } + + + @Override + public ResultSet getTables(Connection connection, String dbName) throws SQLException { + String tableTypes = properties.get("table_types"); + if (null != tableTypes) { + String[] tableTypesArray = tableTypes.split(","); + if (tableTypesArray.length == 0) { + throw new StarRocksConnectorException("table_types should be populated with table types separated by " + + "comma, e.g. 'TABLE,VIEW'. Currently supported type includes:" + + String.join(",", SUPPORTED_TABLE_TYPES)); + } + + for (String tt : tableTypesArray) { + if (!SUPPORTED_TABLE_TYPES.contains(tt)) { + throw new StarRocksConnectorException("Unsupported table type found: " + tt, + ",Currently supported table types includes:" + String.join(",", SUPPORTED_TABLE_TYPES)); + } + } + return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, tableTypesArray); + } + return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, + SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()])); + + } + + @Override + public ResultSet getColumns(Connection connection, String dbName, String tblName) throws SQLException { + return connection.getMetaData().getColumns(connection.getCatalog(), dbName, tblName, "%"); + } + + + @Override + public Type convertColumnType(int dataType, String typeName, int columnSize, int digits) { + PrimitiveType primitiveType; + switch (dataType) { + case Types.TINYINT: + primitiveType = PrimitiveType.TINYINT; + break; + case Types.SMALLINT: + primitiveType = PrimitiveType.SMALLINT; + break; + case Types.INTEGER: + primitiveType = PrimitiveType.INT; + break; + case Types.BIGINT: + primitiveType = PrimitiveType.BIGINT; + break; + case Types.NUMERIC: + primitiveType = PrimitiveType.LARGEINT; + break; + case Types.FLOAT: + primitiveType = PrimitiveType.FLOAT; + break; + case Types.DOUBLE: + primitiveType = PrimitiveType.DOUBLE; + break; + case Types.BOOLEAN: + primitiveType = PrimitiveType.BOOLEAN; + break; + case Types.VARCHAR: + return ScalarType.createVarcharType(65533); + case Types.DATE: + primitiveType = PrimitiveType.DATE; + break; + case Types.TIMESTAMP: + primitiveType = PrimitiveType.DATETIME; + break; + case Types.DECIMAL: + // Decimal(9,9), first 9 is precision, second 9 is scale + String[] precisionAndScale = + typeName.replace("Decimal", "").replace("(", "") + .replace(")", "").replace(" ", "") + .split(","); + if (precisionAndScale.length != 2) { + // should not go here, but if it does, we make it DECIMALV2. + throw new StarRocksConnectorException( + "Cannot extract precision and scale from Decimal typename:" + typeName); + } else { + int precision = Integer.parseInt(precisionAndScale[0]); + int scale = Integer.parseInt(precisionAndScale[1]); + return ScalarType.createUnifiedDecimalType(precision, scale); + } + default: + primitiveType = PrimitiveType.UNKNOWN_TYPE; + break; + } + return ScalarType.createType(primitiveType); + } + + +} diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java index 5b190d49d7877..1240b35cbe225 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCMetadata.java @@ -80,6 +80,8 @@ public JDBCMetadata(Map properties, String catalogName, HikariDa schemaResolver = new PostgresSchemaResolver(); } else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("mariadb")) { schemaResolver = new MysqlSchemaResolver(); + } else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("clickhouse")) { + schemaResolver = new ClickhouseSchemaResolver(properties); } else { LOG.warn("{} not support yet", properties.get(JDBCResource.DRIVER_CLASS)); throw new StarRocksConnectorException(properties.get(JDBCResource.DRIVER_CLASS) + " not support yet"); diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java new file mode 100644 index 0000000000000..7fbdf000bdcc8 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/connector/jdbc/ClickhouseSchemaResolverTest.java @@ -0,0 +1,245 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.jdbc; + +import com.google.common.collect.Lists; +import com.mockrunner.mock.jdbc.MockResultSet; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.Database; +import com.starrocks.catalog.JDBCResource; +import com.starrocks.catalog.JDBCTable; +import com.starrocks.catalog.Table; +import com.zaxxer.hikari.HikariDataSource; +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.starrocks.catalog.JDBCResource.DRIVER_CLASS; +import static com.starrocks.connector.jdbc.ClickhouseSchemaResolver.SUPPORTED_TABLE_TYPES; + +public class ClickhouseSchemaResolverTest { + + @Mocked + HikariDataSource dataSource; + + @Mocked + Connection connection; + + private Map properties; + private MockResultSet dbResult; + private MockResultSet tableResult; + private MockResultSet columnResult; + + @Before + public void setUp() throws SQLException { + dbResult = new MockResultSet("catalog"); + dbResult.addColumn("TABLE_SCHEM", Arrays.asList("clickhouse", "template1", "test")); + tableResult = new MockResultSet("tables"); + tableResult.addColumn("TABLE_NAME", Arrays.asList("tbl1", "tbl2", "tbl3")); + columnResult = new MockResultSet("columns"); + columnResult.addColumn("DATA_TYPE", + Arrays.asList(Types.TINYINT, Types.SMALLINT, Types.SMALLINT, Types.INTEGER, Types.INTEGER, Types.BIGINT, + Types.BIGINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, + Types.FLOAT, + Types.DOUBLE, Types.BOOLEAN, Types.DATE, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.DECIMAL + )); + columnResult.addColumn("TYPE_NAME", Arrays.asList("Int8", "UInt8", "Int16", "UInt16", "Int32", "Int64", + "UInt32", "UInt64", "Int128", "UInt128", "Int256", "UInt256", "Float32", "Float64", "Bool", "Date", + "DateTime", + "String", "Nullable(String)", "Decimal(9,9)")); + columnResult.addColumn("COLUMN_SIZE", + Arrays.asList(3, 3, 5, 5, 10, 19, 10, 20, 39, 39, 77, 78, 12, 22, 1, 10, 29, 0, 0, 9)); + columnResult.addColumn("DECIMAL_DIGITS", + Arrays.asList(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, 0, 0, null, null, 9)); + columnResult.addColumn("COLUMN_NAME", Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", + "m", "n", "o", "p", "q", "r", "s", "t")); + columnResult.addColumn("IS_NULLABLE", + Arrays.asList("NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", + "NO", "NO", "NO", "YES", "NO")); + properties = new HashMap<>(); + properties.put(DRIVER_CLASS, "com.clickhouse.jdbc.ClickHouseDriver"); + properties.put(JDBCResource.URI, "jdbc:clickhouse://127.0.0.1:8123"); + properties.put(JDBCResource.USER, "root"); + properties.put(JDBCResource.PASSWORD, "123456"); + properties.put(JDBCResource.CHECK_SUM, "xxxx"); + properties.put(JDBCResource.DRIVER_URL, "xxxx"); + } + + @Test + public void testListDatabaseNames() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getMetaData().getSchemas(); + result = dbResult; + minTimes = 0; + } + }; + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List result = jdbcMetadata.listDbNames(); + List expectResult = Lists.newArrayList("clickhouse", "template1", "test"); + Assert.assertEquals(expectResult, result); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGetDb() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getMetaData().getSchemas(); + result = dbResult; + minTimes = 0; + } + }; + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + Database db = jdbcMetadata.getDb("test"); + Assert.assertEquals("test", db.getOriginName()); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testListTableNames() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getCatalog(); + result = "t1"; + minTimes = 0; + + connection.getMetaData().getTables("t1", "test", null, + SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()])); + result = tableResult; + minTimes = 0; + } + }; + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "t1", dataSource); + List result = jdbcMetadata.listTableNames("test"); + List expectResult = Lists.newArrayList("tbl1", "tbl2", "tbl3"); + Assert.assertEquals(expectResult, result); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testGetTables() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getCatalog(); + result = "catalog"; + minTimes = 0; + + connection.getMetaData().getTables("catalog", "test", null, + SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()])); + result = tableResult; + minTimes = 0; + } + }; + + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List result = jdbcMetadata.listTableNames("test"); + List expectResult = Lists.newArrayList("tbl1", "tbl2", "tbl3"); + Assert.assertEquals(expectResult, result); + + } + + @Test + public void testGetTable() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getCatalog(); + result = "t1"; + minTimes = 0; + + connection.getMetaData().getColumns("t1", "test", "tbl1", "%"); + result = columnResult; + minTimes = 0; + } + }; + try { + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + Table table = jdbcMetadata.getTable("test", "tbl1"); + Assert.assertTrue(table instanceof JDBCTable); + Assert.assertEquals("catalog.test.tbl1", table.getUUID()); + Assert.assertEquals("tbl1", table.getName()); + Assert.assertNull(properties.get(JDBCTable.JDBC_TABLENAME)); + ClickhouseSchemaResolver clickhouseSchemaResolver = new ClickhouseSchemaResolver(properties); + ResultSet columnSet = clickhouseSchemaResolver.getColumns(connection, "test", "tbl1"); + List fullSchema = clickhouseSchemaResolver.convertToSRTable(columnSet); + Table table1 = clickhouseSchemaResolver.getTable(1, "tbl1", fullSchema, "test", "catalog", properties); + Assert.assertTrue(table1 instanceof JDBCTable); + Assert.assertNull(properties.get(JDBCTable.JDBC_TABLENAME)); + } catch (Exception e) { + System.out.println(e.getMessage()); + Assert.fail(); + } + } + + @Test + public void testListSchemas() throws SQLException { + new Expectations() { + { + dataSource.getConnection(); + result = connection; + minTimes = 0; + + connection.getMetaData().getSchemas(); + result = dbResult; + minTimes = 0; + } + }; + JDBCMetadata jdbcMetadata = new JDBCMetadata(properties, "catalog", dataSource); + List result = jdbcMetadata.listDbNames(); + List expectResult = Lists.newArrayList("clickhouse", "template1", "test"); + Assert.assertEquals(expectResult, result); + } +} diff --git a/fe/pom.xml b/fe/pom.xml index e7393f60042e7..b76e1ba661e1a 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -717,6 +717,14 @@ under the License. 42.4.4 + + + + com.clickhouse + clickhouse-jdbc + 0.4.6 + + com.mockrunner diff --git a/java-extensions/jdbc-bridge/pom.xml b/java-extensions/jdbc-bridge/pom.xml index af739c4e7822e..127ca0a079433 100644 --- a/java-extensions/jdbc-bridge/pom.xml +++ b/java-extensions/jdbc-bridge/pom.xml @@ -36,6 +36,17 @@ log4j-slf4j-impl ${log4j.version} + + + + + org.lz4 + lz4-java + 1.8.0 + + + + diff --git a/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java b/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java index 77904b6d5b0e8..c2522465ecaa2 100644 --- a/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java +++ b/java-extensions/jdbc-bridge/src/main/java/com/starrocks/jdbcbridge/JDBCScanner.java @@ -29,15 +29,17 @@ import java.sql.ResultSetMetaData; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; - public class JDBCScanner { private String driverLocation; private HikariDataSource dataSource; @@ -51,7 +53,6 @@ public class JDBCScanner { private int resultNumRows = 0; ClassLoader classLoader; - public JDBCScanner(String driverLocation, JDBCScanContext scanContext) { this.driverLocation = driverLocation; this.scanContext = scanContext; @@ -61,9 +62,7 @@ public void open() throws Exception { String key = scanContext.getUser() + "/" + scanContext.getJdbcURL(); URL driverURL = new File(driverLocation).toURI().toURL(); DataSourceCache.DataSourceCacheItem cacheItem = DataSourceCache.getInstance().getSource(key, () -> { - ClassLoader classLoader = URLClassLoader.newInstance(new URL[] { - driverURL, - }); + ClassLoader classLoader = URLClassLoader.newInstance(new URL[] {driverURL}); Thread.currentThread().setContextClassLoader(classLoader); HikariConfig config = new HikariConfig(); config.setDriverClassName(scanContext.getDriverClassName()); @@ -84,7 +83,8 @@ public void open() throws Exception { connection = dataSource.getConnection(); connection.setAutoCommit(false); - statement = connection.prepareStatement(scanContext.getSql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement = connection.prepareStatement(scanContext.getSql(), ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); if (scanContext.getDriverClassName().toLowerCase(Locale.ROOT).contains("mysql")) { statement.setFetchSize(Integer.MIN_VALUE); } else { @@ -100,32 +100,36 @@ public void open() throws Exception { Class clazz = classLoader.loadClass(resultSetMetaData.getColumnClassName(i)); if (isGeneralJDBCClassType(clazz)) { resultChunk.add((Object[]) Array.newInstance(clazz, scanContext.getStatementFetchSize())); + } else if (null != mapEngineSpecificClassType(clazz)) { + Class targetClass = mapEngineSpecificClassType(clazz); + resultChunk.add((Object[]) Array.newInstance(targetClass, scanContext.getStatementFetchSize())); } else { resultChunk.add((Object[]) Array.newInstance(String.class, scanContext.getStatementFetchSize())); } } } - private static final Set> GENERAL_JDBC_CLASS_SET = new HashSet<>(Arrays.asList( - Boolean.class, - Short.class, - Integer.class, - Long.class, - Float.class, - Double.class, - BigInteger.class, - BigDecimal.class, - java.sql.Date.class, - Timestamp.class, - LocalDateTime.class, - Time.class, - String.class - )); + private static final Set> GENERAL_JDBC_CLASS_SET = new HashSet<>( + Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, + BigInteger.class, BigDecimal.class, java.sql.Date.class, Timestamp.class, LocalDate.class, + LocalDateTime.class, Time.class, String.class)); private boolean isGeneralJDBCClassType(Class clazz) { return GENERAL_JDBC_CLASS_SET.contains(clazz); } + private static final Map ENGINE_SPECIFIC_CLASS_MAPPING = new HashMap() {{ + put("com.clickhouse.data.value.UnsignedByte", Short.class); + put("com.clickhouse.data.value.UnsignedShort", Integer.class); + put("com.clickhouse.data.value.UnsignedInteger", Long.class); + put("com.clickhouse.data.value.UnsignedLong", BigInteger.class); + }}; + + private Class mapEngineSpecificClassType(Class clazz) { + String className = clazz.getName(); + return ENGINE_SPECIFIC_CLASS_MAPPING.get(className); + } + // used for cpp interface public List getResultColumnClassNames() { return resultColumnClassNames; @@ -165,8 +169,12 @@ public List getNextChunk() throws Exception { // if both sides are String, assign value directly to avoid additional calls to getString dataColumn[resultNumRows] = resultObject; } else if (!(dataColumn instanceof String[])) { - // for other general class type, assign value directly - dataColumn[resultNumRows] = resultObject; + if (dataColumn instanceof BigInteger[] && resultObject instanceof Number) { + dataColumn[resultNumRows] = new BigInteger(resultObject.toString()); + } else { + // for other general class type, assign value directly + dataColumn[resultNumRows] = resultObject; + } } else { // for non-general class type, use string representation dataColumn[resultNumRows] = resultSet.getString(i + 1); @@ -181,7 +189,6 @@ public int getResultNumRows() { return resultNumRows; } - public void close() throws Exception { if (resultSet != null) { resultSet.close(); diff --git a/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java b/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java index da28ad1567cb4..1273f8530c359 100644 --- a/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java +++ b/java-extensions/udf-extensions/src/main/java/com/starrocks/udf/UDFHelper.java @@ -30,6 +30,7 @@ import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -49,6 +50,7 @@ public class UDFHelper { public static final int TYPE_ARRAY = 19; public static final int TYPE_BOOLEAN = 24; public static final int TYPE_TIME = 44; + public static final int TYPE_DATE = 50; public static final int TYPE_DATETIME = 51; private static final byte[] emptyBytes = new byte[0]; @@ -56,6 +58,8 @@ public class UDFHelper { private static final ThreadLocal formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final TimeZone timeZone = TimeZone.getDefault(); private static void getBooleanBoxedResult(int numRows, Boolean[] boxedArr, long columnAddr) { @@ -226,6 +230,17 @@ private static void getStringDateResult(int numRows, Date[] column, long columnA getStringBoxedResult(numRows, results, columnAddr); } + private static void getStringLocalDateResult(int numRows, LocalDate[] column, long columnAddr) { + // TODO: return timestamp + String[] results = new String[numRows]; + for (int i = 0; i < numRows; i++) { + if (column[i] != null) { + results[i] = dateFormatter.format(column[i]); + } + } + getStringBoxedResult(numRows, results, columnAddr); + } + private static void getStringTimeStampResult(int numRows, Timestamp[] column, long columnAddr) { // TODO: return timestamp String[] results = new String[numRows]; @@ -326,6 +341,8 @@ public static void getResultFromBoxedArray(int type, int numRows, Object boxedRe case TYPE_VARCHAR: { if (boxedResult instanceof Date[]) { getStringDateResult(numRows, (Date[]) boxedResult, columnAddr); + } else if (boxedResult instanceof LocalDate[]) { + getStringLocalDateResult(numRows, (LocalDate[]) boxedResult, columnAddr); } else if (boxedResult instanceof LocalDateTime[]) { getStringDateTimeResult(numRows, (LocalDateTime[]) boxedResult, columnAddr); } else if (boxedResult instanceof Timestamp[]) {