diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java index 960a54b09b..2d180c496e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java @@ -76,7 +76,7 @@ public class LabelCommonConfig { CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0"); public static final CommonVars DORIS_ENGINE_VERSION = - CommonVars.apply("wds.linkis.doris.engine.version", "1.2.6"); + CommonVars.apply("linkis.doris.engine.version", "1.2.6"); public static final CommonVars PRESTO_ENGINE_VERSION = CommonVars.apply("wds.linkis.presto.engine.version", "0.234"); diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java index acba5a4dc6..2f454903a3 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java @@ -22,7 +22,7 @@ public class DorisConfiguration { public static final CommonVars ENGINE_CONCURRENT_LIMIT = - CommonVars.apply("linkis.engineconn.concurrent.limit", 100); + CommonVars.apply("linkis.engineconn.doris.concurrent.limit", 100); public static final CommonVars ENGINE_DEFAULT_LIMIT = CommonVars.apply("linkis.doris.default.limit", 5000); diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java index 80a3febb47..6bd991c77b 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java @@ -23,13 +23,18 @@ public enum DorisErrorCodeSummary implements LinkisErrorCode { CHECK_DORIS_PARAMETER_FAILED(28501, "Failed to check the doris parameter(doris参数检查失败)"), DORIS_TEST_CONNECTION_FAILED(28502, "The doris test connection failed(doris测试连接失败)"), - - DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK( - 28503, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"), DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE( - 28504, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + 28503, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + DORIS_CODE_IS_NOT_BLANK(28504, "Doris engine code cannot be empty(Doris引擎代码不能为空)"), + + DORIS_CODE_FAILED_TO_CONVERT_JSON( + 28505, "Doris code Failed to convert json(Doris code 转换json失败)"), + + DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK( + 28506, "Doris required Parameter cannot be empty(Doris必填参数不能为空)"), + DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE( - 28505, + 28507, "The doris stream load file path This file type is not currently supported(doris stream load file path目前不支持该文件类型)"); private final int errorCode; diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java index 52fb0b5f56..078b638367 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -112,6 +112,8 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor { private String dorisTable; private String dorisUsername; private String dorisPassword; + + private String dorisStreamLoadFilePath; private Integer dorisHttpPort; private CloseableHttpClient client; @@ -172,6 +174,18 @@ protected boolean isRedirectable(String method) { @Override public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + String realCode; + if (StringUtils.isBlank(code)) { + throw new DorisException( + DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorCode(), + DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorDesc()); + } else { + realCode = code.trim(); + } + logger.info("Doris engine begins to run code:\n {}", realCode); + + checkRequiredParameter(code); + String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort); if (!testConnection(testConnectionUrl)) { @@ -283,8 +297,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo DorisUtils.getDorisCloumns( dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(dorisCloumns)) { - // httpPut.setHeader(COLUMNS, String.join(",", dorisCloumns.stream().map(f -> - // String.format("`%s`", f)).collect(Collectors.toList()))); dorisColumns = String.join( ",", @@ -309,15 +321,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo httpPut.setHeader(LABEL, dorisLabel); logger.info("doris set param {} : {}", LABEL, dorisLabel); - String dorisStreamLoadFilePath = - DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.getValue(configMap); - - if (StringUtils.isBlank(dorisStreamLoadFilePath)) { - throw new DorisStreamLoadFileException( - DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorCode(), - DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorDesc()); - } - File dorisStreamLoadFile = new File(dorisStreamLoadFilePath); if (!dorisStreamLoadFile.isFile()) { throw new DorisStreamLoadFileException( @@ -384,14 +387,10 @@ private boolean isSupportedType(String fileExtension) { private void checkParameter() { String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap); - String dorisDatabase = DorisConfiguration.DORIS_DATABASE.getValue(configMap); - String dorisTable = DorisConfiguration.DORIS_TABLE.getValue(configMap); String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap); Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap); if (StringUtils.isBlank(dorisHost) - || StringUtils.isBlank(dorisDatabase) - || StringUtils.isBlank(dorisTable) || StringUtils.isBlank(dorisUsername) || dorisHttpPort == null) { logger.error("Doris check param failed."); @@ -401,13 +400,51 @@ private void checkParameter() { } this.dorisHost = dorisHost; - this.dorisDatabase = dorisDatabase; - this.dorisTable = dorisTable; this.dorisUsername = dorisUsername; this.dorisHttpPort = dorisHttpPort; this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap); } + private void checkRequiredParameter(String code) { + Map codeMap = new HashMap<>(); + + try { + codeMap = + JsonUtils.jackson().readValue(code, new TypeReference>() {}); + } catch (JsonProcessingException e) { + throw new DorisException( + DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorCode(), + DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc()); + } + + String dorisStreamLoadFilePath = + codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), ""); + String dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), ""); + String dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), ""); + + if (StringUtils.isBlank(dorisStreamLoadFilePath) + || StringUtils.isBlank(dorisDatabase) + || StringUtils.isBlank(dorisTable)) { + logger.error( + "Check whether `{}`, `{}`, and `{}` are included in code json", + DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), + DorisConfiguration.DORIS_DATABASE.key(), + DorisConfiguration.DORIS_TABLE.key()); + throw new DorisException( + DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorCode(), + DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc()); + } + + this.dorisStreamLoadFilePath = dorisStreamLoadFilePath; + this.dorisDatabase = dorisDatabase; + this.dorisTable = dorisTable; + logger.info( + "Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, dorisTable: {}.", + this.dorisStreamLoadFilePath, + this.dorisDatabase, + this.dorisTable); + } + private boolean isSuccess(Map map) { if (org.apache.commons.collections.MapUtils.isEmpty(map)) { return false; diff --git a/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml index 2cd3e264c3..b6f2fc895c 100644 --- a/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml +++ b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml @@ -6,22 +6,26 @@ ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at - ~ + ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ + ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - + - - - - + + + + + + + @@ -41,9 +45,9 @@ - + - + @@ -87,5 +91,5 @@ - +