From 59e526def1247f87cf70e48fb3d0175bf4c40d32 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 29 Dec 2023 10:56:36 +0800 Subject: [PATCH] Doris ec supports dataSources (#5060) * Doris ec supports dataSources * Optimized code --- .../doris/conf/DorisConfiguration.java | 6 + .../doris/constant/DorisConstant.java | 14 ++ .../doris/executor/DorisDatasourceParser.java | 117 +++++++++++++++++ .../executor/DorisEngineConnExecutor.java | 120 ++++++++++++++---- 4 files changed, 234 insertions(+), 23 deletions(-) create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java index 24b80f7345..4d691a9c30 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java @@ -72,6 +72,12 @@ public class DorisConfiguration { public static final CommonVars DORIS_PASSWORD = CommonVars.apply("linkis.ec.doris.password", ""); + public static final CommonVars DORIS_DATASOURCE = + CommonVars.apply("linkis.ec.doris.datasource", ""); + + public static final CommonVars DORIS_DATASOURCE_SYSTEM_QUERY_PARAM = + CommonVars.apply("linkis.ec.doris.datasource.systemQueryParam", ""); + public static final CommonVars DORIS_RECONNECT_ENABLED = CommonVars.apply("linkis.ec.doris.2pc.enabled", false, "two phase commit Whether to enable"); diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java index 771d51a67c..6eb556c888 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java @@ -56,4 +56,18 @@ public class DorisConstant { public static final String TXN_OPERATION = "txn_operation"; public static final Integer HTTP_SUCCEED = 200; + + public static final String DS_JDBC_HOST = "host"; + + public static final String DS_JDBC_PORT = "port"; + + public static final String DS_JDBC_DB_NAME = "databaseName"; + + public static final String DS_JDBC_DB_INSTANCE = "instance"; + + public static final String DS_JDBC_USERNAME = "username"; + + public static final String DS_JDBC_PASSWORD = "password"; + + public static final String DS_JDBC_PASSWORD_HIDE_VALUE = "******"; } diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java new file mode 100644 index 0000000000..b9a0986d21 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisDatasourceParser.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.executor; + +import org.apache.linkis.common.utils.JsonUtils; +import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient; +import org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction; +import org.apache.linkis.datasourcemanager.common.domain.DataSource; +import org.apache.linkis.engineplugin.doris.constant.DorisConstant; + +import org.apache.commons.collections.MapUtils; + +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DorisDatasourceParser { + private static final Logger logger = LoggerFactory.getLogger(DorisDatasourceParser.class); + + public static Map queryDatasourceInfoByName( + String datasourceName, String username, String system) { + logger.info( + "Starting query [" + + system + + ", " + + username + + ", " + + datasourceName + + "] datasource info ......"); + LinkisDataSourceRemoteClient dataSourceClient = new LinkisDataSourceRemoteClient(); + DataSource dataSource = + dataSourceClient + .getInfoPublishedByDataSourceName( + GetInfoPublishedByDataSourceNameAction.builder() + .setSystem(system) + .setDataSourceName(datasourceName) + .setUser(username) + .build()) + .getDataSource(); + + return queryDatasourceParam(datasourceName, dataSource); + } + + private static Map queryDatasourceParam( + String datasourceName, DataSource dataSource) { + Map paramMap = new HashMap<>(); + + if (dataSource == null) { + logger.warn("Doris dataSource is null: {}", datasourceName); + return paramMap; + } + + if (dataSource.isExpire()) { + logger.warn("Doris dataSource of datasource name: {} is expired", datasourceName); + return paramMap; + } + + Map connectParams = dataSource.getConnectParams(); + if (MapUtils.isEmpty(connectParams)) { + logger.warn("Doris dataSource connectParams is empty: {}", datasourceName); + return paramMap; + } + + paramMap.put( + DorisConstant.DS_JDBC_HOST, + String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_HOST, ""))); + paramMap.put( + DorisConstant.DS_JDBC_PORT, + String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PORT, ""))); + paramMap.put( + DorisConstant.DS_JDBC_USERNAME, + String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_USERNAME, ""))); + paramMap.put( + DorisConstant.DS_JDBC_PASSWORD, + String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PASSWORD, ""))); + paramMap.put( + DorisConstant.DS_JDBC_DB_NAME, + String.valueOf( + connectParams.getOrDefault( + DorisConstant.DS_JDBC_DB_NAME, + connectParams.getOrDefault(DorisConstant.DS_JDBC_DB_INSTANCE, "")))); + + try { + HashMap printMap = new HashMap<>(); + printMap.putAll(paramMap); + + // To hide the password and prevent leaks + if (printMap.containsKey(DorisConstant.DS_JDBC_PASSWORD)) { + printMap.put(DorisConstant.DS_JDBC_PASSWORD, DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE); + } + String printMapString = JsonUtils.jackson().writeValueAsString(printMap); + logger.info("Load dataSource param: {}", printMapString); + } catch (JsonProcessingException e) { + + } + + return paramMap; + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java index 8d6a494560..dd0a88dddf 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -29,6 +29,7 @@ import org.apache.linkis.engineconn.core.EngineConnObject; import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration; import org.apache.linkis.engineplugin.doris.conf.DorisEngineConf; +import org.apache.linkis.engineplugin.doris.constant.DorisConstant; import org.apache.linkis.engineplugin.doris.errorcode.DorisErrorCodeSummary; import org.apache.linkis.engineplugin.doris.exception.DorisException; import org.apache.linkis.engineplugin.doris.exception.DorisParameterException; @@ -109,12 +110,17 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor { private String dorisHost; private String dorisDatabase; + + private String datasourceDatabase; private String dorisTable; private String dorisUsername; private String dorisPassword; private String dorisStreamLoadFilePath; private Integer dorisHttpPort; + + private Integer dorisJdbcPort; + private CloseableHttpClient client; public DorisEngineConnExecutor(int outputPrintLimit, int id) { @@ -157,8 +163,6 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) { .forEach(entry -> configMap.put(entry.getKey(), String.valueOf(entry.getValue()))); } - checkParameter(); - this.client = HttpClients.custom() .setRedirectStrategy( @@ -174,6 +178,8 @@ protected boolean isRedirectable(String method) { @Override public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + checkParameter(engineExecutorContext); + String realCode; if (StringUtils.isBlank(code)) { throw new DorisException( @@ -292,7 +298,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo String dorisColumns = DorisConfiguration.DORIS_COLUMNS.getValue(configMap); if (StringUtils.isBlank(dorisColumns)) { - Integer dorisJdbcPort = DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap); List dorisCloumns = DorisUtils.getDorisCloumns( dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable); @@ -385,24 +390,90 @@ private boolean isSupportedType(String fileExtension) { return false; } - private void checkParameter() { - String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap); - String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap); - Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap); + private void checkParameter(EngineExecutionContext engineExecutorContext) { + this.dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap); + this.dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap); + this.dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap); + this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap); + this.dorisJdbcPort = DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap); - if (StringUtils.isBlank(dorisHost) - || StringUtils.isBlank(dorisUsername) - || dorisHttpPort == null) { + String dorisDatasourceName = DorisConfiguration.DORIS_DATASOURCE.getValue(configMap); + + // Data source parameters fail to be obtained, and task running cannot be affected + // The datasource param overwrites the DorisConfiguration param + try { + if (StringUtils.isNotBlank(dorisDatasourceName)) { + String dorisSystemQueryParam = + DorisConfiguration.DORIS_DATASOURCE_SYSTEM_QUERY_PARAM.getValue(configMap); + String execSqlUser = getExecSqlUser(engineExecutorContext); + + Map dataSourceParamMap = + DorisDatasourceParser.queryDatasourceInfoByName( + dorisDatasourceName, execSqlUser, dorisSystemQueryParam); + + if (MapUtils.isNotEmpty(dataSourceParamMap)) { + if (dataSourceParamMap.containsKey(DS_JDBC_HOST) + && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_HOST))) { + this.dorisHost = dataSourceParamMap.get(DS_JDBC_HOST); + } + + if (dataSourceParamMap.containsKey(DS_JDBC_USERNAME) + && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_USERNAME))) { + this.dorisUsername = dataSourceParamMap.get(DS_JDBC_USERNAME); + } + + if (dataSourceParamMap.containsKey(DS_JDBC_PASSWORD)) { + this.dorisPassword = dataSourceParamMap.get(DS_JDBC_PASSWORD); + } + + if (dataSourceParamMap.containsKey(DS_JDBC_PORT) + && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_PORT))) { + this.dorisJdbcPort = Integer.valueOf(dataSourceParamMap.get(DS_JDBC_PORT)); + } + + if (dataSourceParamMap.containsKey(DS_JDBC_DB_NAME) + && StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_DB_NAME))) { + this.datasourceDatabase = dataSourceParamMap.get(DS_JDBC_DB_NAME); + } + } else { + logger.warn( + "Doris dataSource {} param is null, Skip get doris dataSource parameters", + dorisDatasourceName); + } + } + } catch (Exception e) { + logger.error("get doris dataSource {} param failed", dorisDatasourceName, e); + } + + if (StringUtils.isBlank(this.dorisHost) + || StringUtils.isBlank(this.dorisUsername) + || this.dorisHttpPort == null) { logger.error("Doris check param failed."); throw new DorisParameterException( DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorCode(), DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorDesc()); } - this.dorisHost = dorisHost; - this.dorisUsername = dorisUsername; - this.dorisHttpPort = dorisHttpPort; - this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap); + logger.info( + "Doris parameter dorisHost: {}, dorisUsername: {}, dorisPassword: {}, dorisHttpPort: {}, dorisJdbcPort: {}.", + this.dorisHost, + this.dorisUsername, + DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE, + this.dorisHttpPort, + this.dorisJdbcPort); + } + + private String getExecSqlUser(EngineExecutionContext engineExecutionContext) { + UserCreatorLabel userCreatorLabel = + (UserCreatorLabel) + Arrays.stream(engineExecutionContext.getLabels()) + .filter(label -> label instanceof UserCreatorLabel) + .findFirst() + .orElse(null); + if (userCreatorLabel != null) { + return userCreatorLabel.getUser(); + } + return null; } private void checkRequiredParameter(String code) { @@ -417,14 +488,20 @@ private void checkRequiredParameter(String code) { DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc()); } - String dorisStreamLoadFilePath = + this.dorisStreamLoadFilePath = codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), ""); - String dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), ""); - String dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), ""); + this.dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), ""); + this.dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), ""); + + String dorisDatasourceName = DorisConfiguration.DORIS_DATASOURCE.getValue(configMap); + // The datasource param overwrites the DorisConfiguration param + if (StringUtils.isNotBlank(dorisDatasourceName) && StringUtils.isNotBlank(datasourceDatabase)) { + this.dorisDatabase = datasourceDatabase; + } - if (StringUtils.isBlank(dorisStreamLoadFilePath) - || StringUtils.isBlank(dorisDatabase) - || StringUtils.isBlank(dorisTable)) { + if (StringUtils.isBlank(this.dorisStreamLoadFilePath) + || StringUtils.isBlank(this.dorisDatabase) + || StringUtils.isBlank(this.dorisTable)) { logger.error( "Check whether `{}`, `{}`, and `{}` are included in code json", DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), @@ -435,9 +512,6 @@ private void checkRequiredParameter(String code) { DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc()); } - this.dorisStreamLoadFilePath = dorisStreamLoadFilePath; - this.dorisDatabase = dorisDatabase; - this.dorisTable = dorisTable; logger.info( "Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, dorisTable: {}.", this.dorisStreamLoadFilePath,