From bf65c384bef08aefebc84cf02df5f976ba1d4abe Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 22 Dec 2023 16:26:24 +0800 Subject: [PATCH 1/5] [Feature] Add doris engine to linkis --- .../ujes/jdbc/LinkisSQLConnection.scala | 1 + .../manager/am/conf/AMConfiguration.java | 6 +- .../manager/label/conf/LabelCommonConfig.java | 3 + .../label/entity/engine/EngineType.scala | 3 + .../manager/label/entity/engine/RunType.scala | 1 + .../label/utils/EngineTypeLabelCreator.java | 2 + linkis-engineconn-plugins/doris/pom.xml | 110 ++++ .../doris/src/main/assembly/distribution.xml | 71 ++ .../doris/DorisEngineConnPlugin.java | 72 ++ .../DorisProcessEngineConnLaunchBuilder.java | 22 + .../doris/conf/DorisConfiguration.java | 81 +++ .../doris/conf/DorisEngineConf.java | 53 ++ .../doris/constant/DorisConstant.java | 59 ++ .../errorcode/DorisErrorCodeSummary.java | 54 ++ .../doris/exception/DorisException.java | 27 + .../exception/DorisParameterException.java | 27 + .../DorisStreamLoadFileException.java | 27 + .../executor/DorisEngineConnExecutor.java | 616 ++++++++++++++++++ .../engineplugin/doris/util/DorisUtils.java | 107 +++ .../resources/linkis-engineconn.properties | 23 + .../doris/src/main/resources/log4j2.xml | 91 +++ .../factory/DorisEngineConnFactory.scala | 44 ++ linkis-engineconn-plugins/pom.xml | 1 + pom.xml | 1 + 24 files changed, 1499 insertions(+), 3 deletions(-) create mode 100644 linkis-engineconn-plugins/doris/pom.xml create mode 100644 linkis-engineconn-plugins/doris/src/main/assembly/distribution.xml create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/DorisEngineConnPlugin.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/builder/DorisProcessEngineConnLaunchBuilder.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisEngineConf.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisException.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisParameterException.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisStreamLoadFileException.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java create mode 100644 linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/util/DorisUtils.java create mode 100644 linkis-engineconn-plugins/doris/src/main/resources/linkis-engineconn.properties create mode 100644 linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml create mode 100644 linkis-engineconn-plugins/doris/src/main/scala/org/apache/linkis/engineplugin/doris/factory/DorisEngineConnFactory.scala diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index 7b3e2ada8c..80298bcb1b 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -451,6 +451,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope case EngineType.SPARK => RunType.SQL case EngineType.HIVE => RunType.HIVE case EngineType.REPL => RunType.REPL + case EngineType.DORIS => RunType.DORIS case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL case EngineType.NEBULA => RunType.NEBULA_SQL diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index 0f018ca9de..80b433b3f2 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -69,7 +69,7 @@ public class AMConfiguration { public static final CommonVars MULTI_USER_ENGINE_TYPES = CommonVars.apply( "wds.linkis.multi.user.engine.types", - "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula,hbase"); + "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula,hbase,doris"); public static final CommonVars ALLOW_BATCH_KILL_ENGINE_TYPES = CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python"); @@ -105,8 +105,8 @@ public class AMConfiguration { public static String getDefaultMultiEngineUser() { String jvmUser = Utils.getJvmUser(); return String.format( - "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\", hbase:\"%s\",io_file:\"root\"}", - jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); + "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",doris:\"%s\", hbase:\"%s\",io_file:\"root\"}", + jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); } public static boolean isMultiUserEngine(String engineType) { diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java index 16a23c773b..960a54b09b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java @@ -75,6 +75,9 @@ public class LabelCommonConfig { public static final CommonVars NEBULA_ENGINE_VERSION = CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0"); + public static final CommonVars DORIS_ENGINE_VERSION = + CommonVars.apply("wds.linkis.doris.engine.version", "1.2.6"); + public static final CommonVars PRESTO_ENGINE_VERSION = CommonVars.apply("wds.linkis.presto.engine.version", "0.234"); diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala index c5c44017e6..c58f6421be 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala @@ -35,6 +35,8 @@ object EngineType extends Enumeration with Logging { val REPL = Value("repl") + val DORIS = Value("doris") + val SHELL = Value("shell") val JDBC = Value("jdbc") @@ -98,6 +100,7 @@ object EngineType extends Enumeration with Logging { case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA case _ if REPL.toString.equalsIgnoreCase(str) => REPL + case _ if DORIS.toString.equalsIgnoreCase(str) => DORIS case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK case _ if APPCONN.toString.equals(str) => APPCONN case _ if SQOOP.toString.equalsIgnoreCase(str) => SQOOP diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala index 645c15dbdb..df17b6f2d3 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala @@ -25,6 +25,7 @@ object RunType extends Enumeration { val SCALA = Value("scala") val PYTHON = Value("python") val REPL = Value("repl") + val DORIS = Value("doris") val JAVA = Value("java") val PYSPARK = Value("py") val R = Value("r") diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java index d7911c030e..88cc9139ec 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java @@ -76,6 +76,8 @@ private static void init() { EngineType.HBASE().toString(), LabelCommonConfig.HBASE_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.NEBULA().toString(), LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue()); + defaultVersion.put( + EngineType.DORIS().toString(), LabelCommonConfig.DORIS_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.SQOOP().toString(), LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue()); defaultVersion.put( diff --git a/linkis-engineconn-plugins/doris/pom.xml b/linkis-engineconn-plugins/doris/pom.xml new file mode 100644 index 0000000000..c108c9c3ea --- /dev/null +++ b/linkis-engineconn-plugins/doris/pom.xml @@ -0,0 +1,110 @@ + + + + 4.0.0 + + org.apache.linkis + linkis + ${revision} + ../../pom.xml + + + linkis-engineplugin-doris + + + + org.apache.linkis + linkis-engineconn-plugin-core + ${project.version} + + + + org.apache.linkis + linkis-computation-engineconn + ${project.version} + + + + org.apache.linkis + linkis-storage + ${project.version} + provided + + + + org.apache.linkis + linkis-rpc + ${project.version} + provided + + + + org.apache.linkis + linkis-common + ${project.version} + provided + + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + org.apache.maven.plugins + maven-assembly-plugin + false + + false + out + false + false + + src/main/assembly/distribution.xml + + + + + make-assembly + + single + + package + + + src/main/assembly/distribution.xml + + + + + + + + + diff --git a/linkis-engineconn-plugins/doris/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/doris/src/main/assembly/distribution.xml new file mode 100644 index 0000000000..37dac510e4 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/assembly/distribution.xml @@ -0,0 +1,71 @@ + + + + + linkis-engineplugin-doris + + dir + zip + + true + doris + + + + + + /dist/${doris.version}/lib + true + true + false + false + true + + + + + + + + ${basedir}/src/main/resources + + linkis-engineconn.properties + log4j2.xml + + 0777 + dist/${doris.version}/conf + unix + + + + ${basedir}/target + + *.jar + + + *doc.jar + + 0777 + plugin/${doris.version} + + + + + + diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/DorisEngineConnPlugin.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/DorisEngineConnPlugin.java new file mode 100644 index 0000000000..68864e13a3 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/DorisEngineConnPlugin.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris; + +import org.apache.linkis.engineplugin.doris.builder.DorisProcessEngineConnLaunchBuilder; +import org.apache.linkis.engineplugin.doris.factory.DorisEngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin; +import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder; +import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory; +import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory; +import org.apache.linkis.manager.label.entity.Label; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class DorisEngineConnPlugin implements EngineConnPlugin { + private Object resourceLocker = new Object(); + private Object engineFactoryLocker = new Object(); + private volatile EngineResourceFactory engineResourceFactory; + private volatile EngineConnFactory engineFactory; + private List> defaultLabels = new ArrayList<>(); + + @Override + public void init(Map params) {} + + @Override + public EngineResourceFactory getEngineResourceFactory() { + if (null == engineResourceFactory) { + synchronized (resourceLocker) { + engineResourceFactory = new GenericEngineResourceFactory(); + } + } + return engineResourceFactory; + } + + @Override + public EngineConnLaunchBuilder getEngineConnLaunchBuilder() { + return new DorisProcessEngineConnLaunchBuilder(); + } + + @Override + public EngineConnFactory getEngineConnFactory() { + if (null == engineFactory) { + synchronized (engineFactoryLocker) { + engineFactory = new DorisEngineConnFactory(); + } + } + return engineFactory; + } + + @Override + public List> getDefaultLabels() { + return defaultLabels; + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/builder/DorisProcessEngineConnLaunchBuilder.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/builder/DorisProcessEngineConnLaunchBuilder.java new file mode 100644 index 0000000000..73c9f6458a --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/builder/DorisProcessEngineConnLaunchBuilder.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.builder; + +import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder; + +public class DorisProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java new file mode 100644 index 0000000000..acba5a4dc6 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.conf; + +import org.apache.linkis.common.conf.CommonVars; + +public class DorisConfiguration { + + public static final CommonVars ENGINE_CONCURRENT_LIMIT = + CommonVars.apply("linkis.engineconn.concurrent.limit", 100); + + public static final CommonVars ENGINE_DEFAULT_LIMIT = + CommonVars.apply("linkis.doris.default.limit", 5000); + + public static final CommonVars DORIS_COLUMN_SEPARATOR = + CommonVars.apply("linkis.doris.column.separator", ","); + + public static final CommonVars DORIS_LINE_DELIMITER = + CommonVars.apply("linkis.doris.line.delimiter", "\\n"); + + public static final CommonVars DORIS_STREAM_LOAD_FILE_PATH = + CommonVars.apply( + "linkis.doris.stream.load.file.path", + "", + "A file path, for example: /test/test.csv, currently supports csv、json、parquet、orc format"); + + public static final CommonVars DORIS_COLUMNS = + CommonVars.apply("linkis.doris.columns", ""); + + public static final CommonVars DORIS_LABEL = CommonVars.apply("linkis.doris.label", ""); + + public static final CommonVars DORIS_CONF = + CommonVars.apply( + "linkis.doris.conf", + "", + "The doris parameter, separated by commas, for example: timeout:600,label:123"); + + public static final CommonVars DORIS_HOST = + CommonVars.apply("linkis.doris.host", "127.0.0.1"); + + public static final CommonVars DORIS_HTTP_PORT = + CommonVars.apply("linkis.doris.http.port", 8030); + + public static final CommonVars DORIS_JDBC_PORT = + CommonVars.apply("linkis.doris.jdcb.port", 9030); + + public static final CommonVars DORIS_DATABASE = + CommonVars.apply("linkis.doris.database", ""); + + public static final CommonVars DORIS_TABLE = CommonVars.apply("linkis.doris.table", ""); + + public static final CommonVars DORIS_USER_NAME = + CommonVars.apply("linkis.doris.username", "root"); + + public static final CommonVars DORIS_PASSWORD = + CommonVars.apply("linkis.doris.password", ""); + + public static final CommonVars DORIS_RECONNECT_ENABLED = + CommonVars.apply("linkis.doris.2pc.enabled", false, "two phase commit Whether to enable"); + + public static final CommonVars DORIS_STRIP_OUTER_ARRAY = + CommonVars.apply( + "linkis.doris.strip.outer.array", + true, + "true indicates that the json data starts with an array object and flattens the array object, the default value is true, Refer to doris for strip_outer_array"); +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisEngineConf.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisEngineConf.java new file mode 100644 index 0000000000..04b0c386b3 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisEngineConf.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.conf; + +import org.apache.linkis.common.conf.Configuration; +import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig; +import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.CacheableProtocol; +import org.apache.linkis.rpc.RPCMapCache; + +import java.util.Map; + +import scala.Tuple2; + +public class DorisEngineConf + extends RPCMapCache, String, String> { + + public DorisEngineConf() { + super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue()); + } + + @Override + public CacheableProtocol createRequest(Tuple2 labelTuple) { + return new RequestQueryEngineConfigWithGlobalConfig(labelTuple._1(), labelTuple._2(), null); + } + + @Override + public Map createMap(Object obj) { + if (obj instanceof ResponseQueryConfig) { + ResponseQueryConfig response = (ResponseQueryConfig) obj; + return response.getKeyAndValue(); + } else { + return null; + } + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java new file mode 100644 index 0000000000..771d51a67c --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/constant/DorisConstant.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.constant; + +public class DorisConstant { + + public static final String STATUS = "Status"; + + public static final String TXN_ID = "TxnId"; + + public static final String TXN_ID_LOWER = "txn_id"; + + public static final String SUCCESS = "Success"; + + public static final String COMMIT = "commit"; + + public static final String ABORT = "abort"; + + public static final String COLUMNS = "columns"; + + public static final String LABEL = "label"; + + public static final String FORMAT = "format"; + + public static final String CSV = "csv"; + + public static final String JSON = "json"; + + public static final String PARQUET = "parquet"; + + public static final String ORC = "orc"; + + public static final String TWO_PHASE_COMMIT = "two_phase_commit"; + + public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; + + public static final String COLUMN_SEPARATOR = "column_separator"; + + public static final String LINE_DELIMITER = "line_delimiter"; + + public static final String TXN_OPERATION = "txn_operation"; + + public static final Integer HTTP_SUCCEED = 200; +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java new file mode 100644 index 0000000000..b9a956dd01 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.errorcode; + +import org.apache.linkis.common.errorcode.ErrorCodeUtils; +import org.apache.linkis.common.errorcode.LinkisErrorCode; + +public enum DorisErrorCodeSummary implements LinkisErrorCode { + CHECK_DORIS_PARAMETER_FAILED(23001, "Failed to check the doris parameter(doris参数检查失败)"), + DORIS_TEST_CONNECTION_FAILED(23002, "The doris test connection failed(doris测试连接失败)"), + + DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK( + 23003, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"), + DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE( + 23004, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE( + 23005, + "The doris stream load file path This file type is not currently supported(doris stream load file path目前不支持该文件类型)"); + + private final int errorCode; + + private final String errorDesc; + + DorisErrorCodeSummary(int errorCode, String errorDesc) { + ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999); + this.errorCode = errorCode; + this.errorDesc = errorDesc; + } + + @Override + public int getErrorCode() { + return errorCode; + } + + @Override + public String getErrorDesc() { + return errorDesc; + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisException.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisException.java new file mode 100644 index 0000000000..fa4d9b0989 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class DorisException extends ErrorException { + + public DorisException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisParameterException.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisParameterException.java new file mode 100644 index 0000000000..3a91e4d172 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisParameterException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class DorisParameterException extends ErrorException { + + public DorisParameterException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisStreamLoadFileException.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisStreamLoadFileException.java new file mode 100644 index 0000000000..a5b04061bf --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/exception/DorisStreamLoadFileException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class DorisStreamLoadFileException extends ErrorException { + + public DorisStreamLoadFileException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java new file mode 100644 index 0000000000..52fb0b5f56 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.executor; + +import org.apache.linkis.common.io.MetaData; +import org.apache.linkis.common.io.Record; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.JsonUtils; +import org.apache.linkis.common.utils.OverloadUtils; +import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask; +import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor; +import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext; +import org.apache.linkis.engineconn.core.EngineConnObject; +import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration; +import org.apache.linkis.engineplugin.doris.conf.DorisEngineConf; +import org.apache.linkis.engineplugin.doris.errorcode.DorisErrorCodeSummary; +import org.apache.linkis.engineplugin.doris.exception.DorisException; +import org.apache.linkis.engineplugin.doris.exception.DorisParameterException; +import org.apache.linkis.engineplugin.doris.exception.DorisStreamLoadFileException; +import org.apache.linkis.engineplugin.doris.util.DorisUtils; +import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; +import org.apache.linkis.manager.common.entity.resource.LoadResource; +import org.apache.linkis.manager.common.entity.resource.NodeResource; +import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; +import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.engine.JobProgressInfo; +import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.executer.ErrorExecuteResponse; +import org.apache.linkis.scheduler.executer.ExecuteResponse; +import org.apache.linkis.scheduler.executer.SuccessExecuteResponse; +import org.apache.linkis.storage.LineMetaData; +import org.apache.linkis.storage.LineRecord; +import org.apache.linkis.storage.resultset.ResultSetFactory; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.FileEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import org.springframework.util.CollectionUtils; + +import java.io.File; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.linkis.engineplugin.doris.constant.DorisConstant.*; + +/** + * Inspired by: + * https://github.com/apache/doris/blob/master/samples/stream_load/java/DorisStreamLoad.java + */ +public class DorisEngineConnExecutor extends ConcurrentComputationExecutor { + + private static final Logger logger = LoggerFactory.getLogger(DorisEngineConnExecutor.class); + private int id; + + private List> executorLabels = new ArrayList<>(2); + private Map threadCache = new ConcurrentHashMap<>(); + + private Map configMap = new HashMap<>(); + + private static final String DORIS_LABEL_PREFIX = "linkis_doris_"; + + public static final String DORIS_URL_BOOTSTRAP = "http://%s:%s/api/bootstrap"; + + public static final String DORIS_URL_STREAM_LOAD = "http://%s:%s/api/%s/%s/_stream_load"; + + public static final String DORIS_URL_STREAM_LOAD_2PC = "http://%s:%s/api/%s/%s/_stream_load_2pc"; + + private String dorisHost; + private String dorisDatabase; + private String dorisTable; + private String dorisUsername; + private String dorisPassword; + private Integer dorisHttpPort; + private CloseableHttpClient client; + + public DorisEngineConnExecutor(int outputPrintLimit, int id) { + super(outputPrintLimit); + this.id = id; + } + + @Override + public void init() { + super.init(); + } + + @Override + public ExecuteResponse execute(EngineConnTask engineConnTask) { + Optional> userCreatorLabelOp = + Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof UserCreatorLabel) + .findFirst(); + Optional> engineTypeLabelOp = + Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof EngineTypeLabel) + .findFirst(); + + if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) { + UserCreatorLabel userCreatorLabel = (UserCreatorLabel) userCreatorLabelOp.get(); + EngineTypeLabel engineTypeLabel = (EngineTypeLabel) engineTypeLabelOp.get(); + + Map cacheMap = + new DorisEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); + if (MapUtils.isNotEmpty(cacheMap)) { + configMap.putAll(cacheMap); + } + } + + Map taskParams = engineConnTask.getProperties(); + + if (MapUtils.isNotEmpty(taskParams)) { + taskParams.entrySet().stream() + .filter(entry -> entry.getValue() != null) + .forEach(entry -> configMap.put(entry.getKey(), String.valueOf(entry.getValue()))); + } + + checkParameter(); + + this.client = + HttpClients.custom() + .setRedirectStrategy( + new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }) + .build(); + return super.execute(engineConnTask); + } + + @Override + public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort); + + if (!testConnection(testConnectionUrl)) { + logger.error("Test connection failed: {}", testConnectionUrl); + throw new DorisException( + DorisErrorCodeSummary.DORIS_TEST_CONNECTION_FAILED.getErrorCode(), + DorisErrorCodeSummary.DORIS_TEST_CONNECTION_FAILED.getErrorDesc()); + } + + String taskId = engineExecutorContext.getJobId().get(); + + initialStatusUpdates(taskId, engineExecutorContext); + + threadCache.put(taskId, Thread.currentThread()); + + CloseableHttpResponse response = null; + Boolean executeResponse = false; + try { + response = streamLoad(engineExecutorContext); + } catch (Exception e) { + String errorMessage = ExceptionUtils.getStackTrace(e); + logger.error("Doris engine execute failed : {}", errorMessage); + engineExecutorContext.appendStdout(LogUtils.generateERROR(errorMessage)); + return new ErrorExecuteResponse(errorMessage, null); + } + + String resultMessage = entitytoString(response); + StringBuilder resultMessageStringBuilder = new StringBuilder(); + resultMessageStringBuilder.append(resultMessage); + + logger.info("Doris stream load execution result: {}", resultMessage); + Map resultMap = responseToMap(resultMessage); + int statusCode = response.getStatusLine().getStatusCode(); + Boolean enabled2PC = DorisConfiguration.DORIS_RECONNECT_ENABLED.getValue(configMap); + + // If two phase commit is enabled, commit if executed successfully, abort otherwise + if (statusCode == HTTP_SUCCEED && isSuccess(resultMap)) { + executeResponse = true; + if (enabled2PC && resultMap.containsKey(TXN_ID)) { + String commitMessage = "doris begin commit"; + logger.info(commitMessage); + engineExecutorContext.appendStdout(commitMessage); + executeResponse = + dorisCommitOrAbort(resultMap.get(TXN_ID), COMMIT, resultMessageStringBuilder); + } + } else { + executeResponse = false; + if (enabled2PC && resultMap.containsKey(TXN_ID)) { + String abortMessage = "doris stream load failed, begin abort"; + logger.error(abortMessage); + engineExecutorContext.appendStdout(abortMessage); + dorisCommitOrAbort(resultMap.get(TXN_ID), ABORT, resultMessageStringBuilder); + } + } + + engineExecutorContext.appendStdout(resultMessageStringBuilder.toString()); + ResultSetWriter resultSetWriter = + engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE); + try { + resultSetWriter.addMetaData(new LineMetaData()); + resultSetWriter.addRecord(new LineRecord(resultMessageStringBuilder.toString())); + } catch (IOException e) { + logger.error("Failed to get the task result"); + } finally { + IOUtils.closeQuietly(resultSetWriter); + } + + if (executeResponse) { + return new SuccessExecuteResponse(); + } else { + return new ErrorExecuteResponse(resultMessageStringBuilder.toString(), null); + } + } + + private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorContext) + throws Exception { + String loadUrl = + String.format(DORIS_URL_STREAM_LOAD, dorisHost, dorisHttpPort, dorisDatabase, dorisTable); + + logger.info("Doris engine stream load begins to run loadUrl:\n {}", loadUrl); + engineExecutorContext.appendStdout( + String.format("Doris engine stream load begins to run loadUrl:\n %s", loadUrl)); + + HttpPut httpPut = new HttpPut(loadUrl); + + // Set the doris configuration, which has a low priority and will be overwritten by other + // configurations + String dorisConf = DorisConfiguration.DORIS_CONF.getValue(configMap); + if (StringUtils.isNotBlank(dorisConf)) { + String[] confs = dorisConf.split(","); + for (String conf : confs) { + String[] keyValue = conf.split(":"); + if (keyValue.length == 2) { + String key = keyValue[0]; + String value = keyValue[1]; + httpPut.setHeader(key, value); + logger.info("doris set param {} : {}", key, value); + } + } + } + + addCommonHeader(httpPut); + + String dorisColumns = DorisConfiguration.DORIS_COLUMNS.getValue(configMap); + + if (StringUtils.isBlank(dorisColumns)) { + Integer dorisJdbcPort = DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap); + List dorisCloumns = + DorisUtils.getDorisCloumns( + dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable); + if (org.apache.commons.collections.CollectionUtils.isNotEmpty(dorisCloumns)) { + // httpPut.setHeader(COLUMNS, String.join(",", dorisCloumns.stream().map(f -> + // String.format("`%s`", f)).collect(Collectors.toList()))); + dorisColumns = + String.join( + ",", + dorisCloumns.stream() + .map(f -> String.format("`%s`", f)) + .collect(Collectors.toList())); + } + } + + // dorisColumns may fail to obtain metadata and is empty + if (StringUtils.isNotBlank(dorisColumns)) { + httpPut.setHeader(COLUMNS, dorisColumns); + logger.info("doris set param {} : {}", COLUMNS, dorisColumns); + } + + // the label header is optional, not necessary + // use label header can ensure at most once semantics + String dorisLabel = DorisConfiguration.DORIS_LABEL.getValue(configMap); + if (StringUtils.isBlank(dorisLabel)) { + dorisLabel = DORIS_LABEL_PREFIX + UUID.randomUUID(); + } + httpPut.setHeader(LABEL, dorisLabel); + logger.info("doris set param {} : {}", LABEL, dorisLabel); + + String dorisStreamLoadFilePath = + DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.getValue(configMap); + + if (StringUtils.isBlank(dorisStreamLoadFilePath)) { + throw new DorisStreamLoadFileException( + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorCode(), + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorDesc()); + } + + File dorisStreamLoadFile = new File(dorisStreamLoadFilePath); + if (!dorisStreamLoadFile.isFile()) { + throw new DorisStreamLoadFileException( + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE.getErrorCode(), + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE.getErrorDesc()); + } + + String fileExtension = FilenameUtils.getExtension(dorisStreamLoadFilePath); + + // Currently only csv、json、parquet、orc format are supported + if (!isSupportedType(fileExtension)) { + logger.error( + "The supported types are csv, json, parquet, and orc,This file type is not currently supported: {}", + fileExtension); + throw new DorisStreamLoadFileException( + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE.getErrorCode(), + DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE.getErrorDesc()); + } + + httpPut.setHeader(FORMAT, fileExtension); + + Boolean enabled2PC = DorisConfiguration.DORIS_RECONNECT_ENABLED.getValue(configMap); + httpPut.setHeader(TWO_PHASE_COMMIT, String.valueOf(enabled2PC)); + logger.info("doris set param {} : {}", TWO_PHASE_COMMIT, enabled2PC); + + if (fileExtension.equals(JSON)) { + Boolean stripOuterArray = DorisConfiguration.DORIS_STRIP_OUTER_ARRAY.getValue(configMap); + httpPut.setHeader(STRIP_OUTER_ARRAY, String.valueOf(stripOuterArray)); + logger.info("doris set param {} : {}", STRIP_OUTER_ARRAY, stripOuterArray); + } + + String dorisColumnSeparator = DorisConfiguration.DORIS_COLUMN_SEPARATOR.getValue(configMap); + httpPut.setHeader(COLUMN_SEPARATOR, dorisColumnSeparator); + logger.info("doris set param {} : {}", COLUMN_SEPARATOR, dorisColumnSeparator); + + String dorisLineDelimiter = DorisConfiguration.DORIS_LINE_DELIMITER.getValue(configMap); + httpPut.setHeader(LINE_DELIMITER, dorisLineDelimiter); + logger.info("doris set param {} : {}", LINE_DELIMITER, dorisLineDelimiter); + + FileEntity entity = new FileEntity(dorisStreamLoadFile); + httpPut.setEntity(entity); + engineExecutorContext.appendStdout( + String.format("doris stread load file path: %s", dorisStreamLoadFile.getAbsolutePath())); + + String allHeaders = Arrays.toString(httpPut.getAllHeaders()); + logger.info("doris param: {}", allHeaders); + engineExecutorContext.appendStdout(String.format("doris param: %s", allHeaders)); + + return client.execute(httpPut); + } + + private boolean isSupportedType(String fileExtension) { + if (StringUtils.isBlank(fileExtension)) { + return false; + } + if (fileExtension.equals(CSV) + || fileExtension.equals(JSON) + || fileExtension.equals(PARQUET) + || fileExtension.equals(ORC)) { + return true; + } + return false; + } + + private void checkParameter() { + String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap); + String dorisDatabase = DorisConfiguration.DORIS_DATABASE.getValue(configMap); + String dorisTable = DorisConfiguration.DORIS_TABLE.getValue(configMap); + String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap); + Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap); + + if (StringUtils.isBlank(dorisHost) + || StringUtils.isBlank(dorisDatabase) + || StringUtils.isBlank(dorisTable) + || StringUtils.isBlank(dorisUsername) + || dorisHttpPort == null) { + logger.error("Doris check param failed."); + throw new DorisParameterException( + DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorCode(), + DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorDesc()); + } + + this.dorisHost = dorisHost; + this.dorisDatabase = dorisDatabase; + this.dorisTable = dorisTable; + this.dorisUsername = dorisUsername; + this.dorisHttpPort = dorisHttpPort; + this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap); + } + + private boolean isSuccess(Map map) { + if (org.apache.commons.collections.MapUtils.isEmpty(map)) { + return false; + } + + if (map.containsKey(STATUS) && map.get(STATUS).equalsIgnoreCase(SUCCESS)) { + return true; + } + + // Sometimes Status is uppercase and sometimes lowercase + if (map.containsKey(STATUS.toLowerCase()) + && map.get(STATUS.toLowerCase()).equalsIgnoreCase(SUCCESS)) { + return true; + } + + return false; + } + + /** After the two phase commit is enabled, you can use it to commit or abort */ + private boolean dorisCommitOrAbort( + String id, String type, StringBuilder resultMessageStringBuilder) { + String load2PCUrl = + String.format( + DORIS_URL_STREAM_LOAD_2PC, dorisHost, dorisHttpPort, dorisDatabase, dorisTable); + HttpPut commmitHttpPut = new HttpPut(load2PCUrl); + addCommonHeader(commmitHttpPut); + commmitHttpPut.setHeader(TXN_ID_LOWER, id); + commmitHttpPut.setHeader(TXN_OPERATION, type); + + CloseableHttpResponse commmitResponse = null; + try { + commmitResponse = client.execute(commmitHttpPut); + } catch (IOException e) { + logger.error("doris {} failed", type, e); + return false; + } + String commmitLoadResult = entitytoString(commmitResponse); + logger.info("Doris stream load {} execution result: {}", type, commmitLoadResult); + resultMessageStringBuilder.append("\r\n").append(commmitLoadResult); + + Map commmitResultMap = responseToMap(commmitLoadResult); + int statusCode = commmitResponse.getStatusLine().getStatusCode(); + + if (statusCode == HTTP_SUCCEED && isSuccess(commmitResultMap)) { + return true; + } + return false; + } + + private static String entitytoString(CloseableHttpResponse response) { + String loadResult = ""; + if (response.getEntity() != null) { + try { + loadResult = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + logger.error("Doris httpResponse entity conversion to string failed", e); + } + } + return loadResult; + } + + private void addCommonHeader(HttpPut httpPut) { + if (httpPut == null) return; + httpPut.setHeader(HttpHeaders.EXPECT, "100-continue"); + httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(dorisUsername, dorisPassword)); + } + + private Map responseToMap(String response) { + Map resultMap = new HashMap<>(); + + if (StringUtils.isBlank(response)) { + return resultMap; + } + + try { + resultMap = + JsonUtils.jackson().readValue(response, new TypeReference>() {}); + } catch (JsonProcessingException e) { + logger.error("doris response to map failed", e); + return resultMap; + } + + return resultMap; + } + + private boolean testConnection(String testUrl) { + try { + URL url = new URL(testUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setConnectTimeout(5000); + connection.setRequestMethod("GET"); + connection.connect(); + int responseCode = connection.getResponseCode(); + if (responseCode == HTTP_SUCCEED) { + return true; + } + } catch (Exception e) { + return false; + } + return false; + } + + private String basicAuthHeader(String username, String password) { + String tobeEncode = username + ":" + password; + byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } + + @Override + public ExecuteResponse executeCompletely( + EngineExecutionContext engineExecutorContext, String code, String completedLine) { + return null; + } + + @Override + public float progress(String taskID) { + return 0.0f; + } + + @Override + public JobProgressInfo[] getProgressInfo(String taskID) { + return new JobProgressInfo[0]; + } + + @Override + public void killTask(String taskId) { + Thread thread = threadCache.remove(taskId); + if (null != thread) { + thread.interrupt(); + } + super.killTask(taskId); + } + + @Override + public List> getExecutorLabels() { + return executorLabels; + } + + @Override + public void setExecutorLabels(List> labels) { + if (!CollectionUtils.isEmpty(labels)) { + executorLabels.clear(); + executorLabels.addAll(labels); + } + } + + @Override + public boolean supportCallBackLogs() { + return false; + } + + @Override + public NodeResource requestExpectedResource(NodeResource expectedResource) { + return null; + } + + @Override + public NodeResource getCurrentNodeResource() { + NodeResourceUtils.appendMemoryUnitIfMissing( + EngineConnObject.getEngineCreationContext().getOptions()); + + CommonNodeResource resource = new CommonNodeResource(); + LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1); + resource.setUsedResource(usedResource); + return resource; + } + + @Override + public String getId() { + return Sender.getThisServiceInstance().getInstance() + "_" + id; + } + + @Override + public int getConcurrentLimit() { + return DorisConfiguration.ENGINE_CONCURRENT_LIMIT.getValue(); + } + + private void initialStatusUpdates(String taskId, EngineExecutionContext engineExecutorContext) { + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + } + + @Override + public void killAll() { + Iterator iterator = threadCache.values().iterator(); + while (iterator.hasNext()) { + Thread thread = iterator.next(); + if (thread != null) { + thread.interrupt(); + } + } + threadCache.clear(); + } + + @Override + public void close() { + killAll(); + try { + if (client != null) { + this.client.close(); + } + } catch (IOException e) { + logger.warn("close doris HttpClient failed"); + } + super.close(); + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/util/DorisUtils.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/util/DorisUtils.java new file mode 100644 index 0000000000..2d6d560d77 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/util/DorisUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.util; + +import java.sql.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DorisUtils { + + private static final Logger logger = LoggerFactory.getLogger(DorisUtils.class); + + private static final String JDBC_URL = "jdbc:mysql://%s:%s/%s"; + + public static List getDorisCloumns( + String host, Integer port, String username, String password, String database, String table) { + String url = String.format(JDBC_URL, host, port, database); + + Connection connecion = getConnecion(username, password, url); + if (connecion == null) { + return Collections.emptyList(); + } + + String columnSql = "SELECT * FROM `" + database + "`.`" + table + "` WHERE 1 = 2"; + PreparedStatement ps = null; + ResultSet rs = null; + ResultSetMetaData meta = null; + List columns = new ArrayList<>(); + + try { + ps = connecion.prepareStatement(columnSql); + rs = ps.executeQuery(); + meta = rs.getMetaData(); + int columnCount = meta.getColumnCount(); + for (int i = 1; i < columnCount + 1; i++) { + columns.add(meta.getColumnName(i)); + } + } catch (SQLException e) { + logger.error("getDorisCloumns failed", e); + return columns; + } finally { + closeResource(connecion, ps, rs); + } + return columns; + } + + public static void closeResource( + Connection connection, Statement statement, ResultSet resultSet) { + try { + if (null != resultSet && !resultSet.isClosed()) { + resultSet.close(); + } + if (null != statement && !statement.isClosed()) { + statement.close(); + } + if (null != connection && !connection.isClosed()) { + connection.close(); + } + } catch (SQLException e) { + logger.warn("Fail to release resource [" + e.getMessage() + "]", e); + } + } + + private static Connection getConnecion(String username, String password, String url) { + Connection connection = null; + + try { + Class.forName("com.mysql.jdbc.Driver"); + } catch (ClassNotFoundException e) { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException ex) { + logger.warn( + "The mysql driver does not exist, mysql driver is used to fetch the table column. If you need to use this feature, you need to place the mysql jar into the doris ec lib. ClassNotFoundException: {}", + ex.getMessage()); + return connection; + } + } + + try { + connection = DriverManager.getConnection(url, username, password); + } catch (Exception e) { + logger.warn( + "getConnecion failed,please check whether the connection parameters are correct.", e); + } + return connection; + } +} diff --git a/linkis-engineconn-plugins/doris/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/doris/src/main/resources/linkis-engineconn.properties new file mode 100644 index 0000000000..fd897c71fd --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/resources/linkis-engineconn.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +wds.linkis.server.version=v1 +#wds.linkis.engineconn.debug.enable=true +#wds.linkis.keytab.enable=true +wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.doris.DorisEngineConnPlugin + +wds.linkis.engineconn.support.parallelism=true + +wds.linkis.engineconn.max.free.time=0 \ No newline at end of file diff --git a/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..2cd3e264c3 --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml @@ -0,0 +1,91 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/linkis-engineconn-plugins/doris/src/main/scala/org/apache/linkis/engineplugin/doris/factory/DorisEngineConnFactory.scala b/linkis-engineconn-plugins/doris/src/main/scala/org/apache/linkis/engineplugin/doris/factory/DorisEngineConnFactory.scala new file mode 100644 index 0000000000..cbc75dfd7e --- /dev/null +++ b/linkis-engineconn-plugins/doris/src/main/scala/org/apache/linkis/engineplugin/doris/factory/DorisEngineConnFactory.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.doris.factory + +import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.common.engineconn.EngineConn +import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory +import org.apache.linkis.engineconn.executor.entity.LabelExecutor +import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration +import org.apache.linkis.engineplugin.doris.executor.DorisEngineConnExecutor +import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} +import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.manager.label.entity.engine.RunType.RunType + +class DorisEngineConnFactory extends ComputationSingleExecutorEngineConnFactory { + + override def newExecutor( + id: Int, + engineCreationContext: EngineCreationContext, + engineConn: EngineConn + ): LabelExecutor = { + new DorisEngineConnExecutor(DorisConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id) + } + + override protected def getEngineConnType: EngineType = EngineType.DORIS + + override protected def getRunType: RunType = RunType.DORIS + +} diff --git a/linkis-engineconn-plugins/pom.xml b/linkis-engineconn-plugins/pom.xml index cbee4fa0fe..054d300ed0 100644 --- a/linkis-engineconn-plugins/pom.xml +++ b/linkis-engineconn-plugins/pom.xml @@ -43,6 +43,7 @@ seatunnel hbase nebula + doris repl diff --git a/pom.xml b/pom.xml index ad8a6e4d62..5af89dfaca 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,7 @@ 1 0.234 3.0.0 + 1.2.6 1 python2 2.1.2 From c381145ac9bb54c19b8668c3c5f81b4e92f78746 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 22 Dec 2023 17:19:57 +0800 Subject: [PATCH 2/5] Modify error codes --- .../doris/errorcode/DorisErrorCodeSummary.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java index b9a956dd01..80a3febb47 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java @@ -21,15 +21,15 @@ import org.apache.linkis.common.errorcode.LinkisErrorCode; public enum DorisErrorCodeSummary implements LinkisErrorCode { - CHECK_DORIS_PARAMETER_FAILED(23001, "Failed to check the doris parameter(doris参数检查失败)"), - DORIS_TEST_CONNECTION_FAILED(23002, "The doris test connection failed(doris测试连接失败)"), + CHECK_DORIS_PARAMETER_FAILED(28501, "Failed to check the doris parameter(doris参数检查失败)"), + DORIS_TEST_CONNECTION_FAILED(28502, "The doris test connection failed(doris测试连接失败)"), DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK( - 23003, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"), + 28503, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"), DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE( - 23004, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + 28504, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE( - 23005, + 28505, "The doris stream load file path This file type is not currently supported(doris stream load file path目前不支持该文件类型)"); private final int errorCode; From 8a78929752b0a4d366f1e1953244e94e77dc5459 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Mon, 25 Dec 2023 15:51:06 +0800 Subject: [PATCH 3/5] Optimized code --- .../manager/label/conf/LabelCommonConfig.java | 2 +- .../doris/conf/DorisConfiguration.java | 2 +- .../errorcode/DorisErrorCodeSummary.java | 15 ++-- .../executor/DorisEngineConnExecutor.java | 71 ++++++++++++++----- .../doris/src/main/resources/log4j2.xml | 24 ++++--- 5 files changed, 80 insertions(+), 34 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java index 960a54b09b..2d180c496e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java @@ -76,7 +76,7 @@ public class LabelCommonConfig { CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0"); public static final CommonVars DORIS_ENGINE_VERSION = - CommonVars.apply("wds.linkis.doris.engine.version", "1.2.6"); + CommonVars.apply("linkis.doris.engine.version", "1.2.6"); public static final CommonVars PRESTO_ENGINE_VERSION = CommonVars.apply("wds.linkis.presto.engine.version", "0.234"); diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java index acba5a4dc6..2f454903a3 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java @@ -22,7 +22,7 @@ public class DorisConfiguration { public static final CommonVars ENGINE_CONCURRENT_LIMIT = - CommonVars.apply("linkis.engineconn.concurrent.limit", 100); + CommonVars.apply("linkis.engineconn.doris.concurrent.limit", 100); public static final CommonVars ENGINE_DEFAULT_LIMIT = CommonVars.apply("linkis.doris.default.limit", 5000); diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java index 80a3febb47..6bd991c77b 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/errorcode/DorisErrorCodeSummary.java @@ -23,13 +23,18 @@ public enum DorisErrorCodeSummary implements LinkisErrorCode { CHECK_DORIS_PARAMETER_FAILED(28501, "Failed to check the doris parameter(doris参数检查失败)"), DORIS_TEST_CONNECTION_FAILED(28502, "The doris test connection failed(doris测试连接失败)"), - - DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK( - 28503, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"), DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE( - 28504, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + 28503, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"), + DORIS_CODE_IS_NOT_BLANK(28504, "Doris engine code cannot be empty(Doris引擎代码不能为空)"), + + DORIS_CODE_FAILED_TO_CONVERT_JSON( + 28505, "Doris code Failed to convert json(Doris code 转换json失败)"), + + DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK( + 28506, "Doris required Parameter cannot be empty(Doris必填参数不能为空)"), + DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE( - 28505, + 28507, "The doris stream load file path This file type is not currently supported(doris stream load file path目前不支持该文件类型)"); private final int errorCode; diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java index 52fb0b5f56..078b638367 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -112,6 +112,8 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor { private String dorisTable; private String dorisUsername; private String dorisPassword; + + private String dorisStreamLoadFilePath; private Integer dorisHttpPort; private CloseableHttpClient client; @@ -172,6 +174,18 @@ protected boolean isRedirectable(String method) { @Override public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + String realCode; + if (StringUtils.isBlank(code)) { + throw new DorisException( + DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorCode(), + DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorDesc()); + } else { + realCode = code.trim(); + } + logger.info("Doris engine begins to run code:\n {}", realCode); + + checkRequiredParameter(code); + String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort); if (!testConnection(testConnectionUrl)) { @@ -283,8 +297,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo DorisUtils.getDorisCloumns( dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(dorisCloumns)) { - // httpPut.setHeader(COLUMNS, String.join(",", dorisCloumns.stream().map(f -> - // String.format("`%s`", f)).collect(Collectors.toList()))); dorisColumns = String.join( ",", @@ -309,15 +321,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo httpPut.setHeader(LABEL, dorisLabel); logger.info("doris set param {} : {}", LABEL, dorisLabel); - String dorisStreamLoadFilePath = - DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.getValue(configMap); - - if (StringUtils.isBlank(dorisStreamLoadFilePath)) { - throw new DorisStreamLoadFileException( - DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorCode(), - DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorDesc()); - } - File dorisStreamLoadFile = new File(dorisStreamLoadFilePath); if (!dorisStreamLoadFile.isFile()) { throw new DorisStreamLoadFileException( @@ -384,14 +387,10 @@ private boolean isSupportedType(String fileExtension) { private void checkParameter() { String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap); - String dorisDatabase = DorisConfiguration.DORIS_DATABASE.getValue(configMap); - String dorisTable = DorisConfiguration.DORIS_TABLE.getValue(configMap); String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap); Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap); if (StringUtils.isBlank(dorisHost) - || StringUtils.isBlank(dorisDatabase) - || StringUtils.isBlank(dorisTable) || StringUtils.isBlank(dorisUsername) || dorisHttpPort == null) { logger.error("Doris check param failed."); @@ -401,13 +400,51 @@ private void checkParameter() { } this.dorisHost = dorisHost; - this.dorisDatabase = dorisDatabase; - this.dorisTable = dorisTable; this.dorisUsername = dorisUsername; this.dorisHttpPort = dorisHttpPort; this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap); } + private void checkRequiredParameter(String code) { + Map codeMap = new HashMap<>(); + + try { + codeMap = + JsonUtils.jackson().readValue(code, new TypeReference>() {}); + } catch (JsonProcessingException e) { + throw new DorisException( + DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorCode(), + DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc()); + } + + String dorisStreamLoadFilePath = + codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), ""); + String dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), ""); + String dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), ""); + + if (StringUtils.isBlank(dorisStreamLoadFilePath) + || StringUtils.isBlank(dorisDatabase) + || StringUtils.isBlank(dorisTable)) { + logger.error( + "Check whether `{}`, `{}`, and `{}` are included in code json", + DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), + DorisConfiguration.DORIS_DATABASE.key(), + DorisConfiguration.DORIS_TABLE.key()); + throw new DorisException( + DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorCode(), + DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc()); + } + + this.dorisStreamLoadFilePath = dorisStreamLoadFilePath; + this.dorisDatabase = dorisDatabase; + this.dorisTable = dorisTable; + logger.info( + "Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, dorisTable: {}.", + this.dorisStreamLoadFilePath, + this.dorisDatabase, + this.dorisTable); + } + private boolean isSuccess(Map map) { if (org.apache.commons.collections.MapUtils.isEmpty(map)) { return false; diff --git a/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml index 2cd3e264c3..b6f2fc895c 100644 --- a/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml +++ b/linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml @@ -6,22 +6,26 @@ ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at - ~ + ~ ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ + ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - + - - - - + + + + + + + @@ -41,9 +45,9 @@ - + - + @@ -87,5 +91,5 @@ - + From 41bbe0adcc7fdc4dd0d49201dcb991bedcebc626 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 26 Dec 2023 09:38:11 +0800 Subject: [PATCH 4/5] Optimized code --- .../engineplugin/doris/executor/DorisEngineConnExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java index 078b638367..8d6a494560 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/executor/DorisEngineConnExecutor.java @@ -184,7 +184,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, } logger.info("Doris engine begins to run code:\n {}", realCode); - checkRequiredParameter(code); + checkRequiredParameter(realCode); String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort); From 97d1f69df9f2b4bac7f5b7583348fcca1ce89166 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 26 Dec 2023 09:53:04 +0800 Subject: [PATCH 5/5] Optimized code --- .../doris/conf/DorisConfiguration.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java index 2f454903a3..24b80f7345 100644 --- a/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java +++ b/linkis-engineconn-plugins/doris/src/main/java/org/apache/linkis/engineplugin/doris/conf/DorisConfiguration.java @@ -22,60 +22,62 @@ public class DorisConfiguration { public static final CommonVars ENGINE_CONCURRENT_LIMIT = - CommonVars.apply("linkis.engineconn.doris.concurrent.limit", 100); + CommonVars.apply("linkis.engineconn.concurrent.limit", 100); public static final CommonVars ENGINE_DEFAULT_LIMIT = - CommonVars.apply("linkis.doris.default.limit", 5000); + CommonVars.apply("linkis.ec.doris.default.limit", 5000); public static final CommonVars DORIS_COLUMN_SEPARATOR = - CommonVars.apply("linkis.doris.column.separator", ","); + CommonVars.apply("linkis.ec.doris.column.separator", ","); public static final CommonVars DORIS_LINE_DELIMITER = - CommonVars.apply("linkis.doris.line.delimiter", "\\n"); + CommonVars.apply("linkis.ec.doris.line.delimiter", "\\n"); public static final CommonVars DORIS_STREAM_LOAD_FILE_PATH = CommonVars.apply( - "linkis.doris.stream.load.file.path", + "linkis.ec.doris.stream.load.file.path", "", "A file path, for example: /test/test.csv, currently supports csv、json、parquet、orc format"); public static final CommonVars DORIS_COLUMNS = - CommonVars.apply("linkis.doris.columns", ""); + CommonVars.apply("linkis.ec.doris.columns", ""); - public static final CommonVars DORIS_LABEL = CommonVars.apply("linkis.doris.label", ""); + public static final CommonVars DORIS_LABEL = + CommonVars.apply("linkis.ec.doris.label", ""); public static final CommonVars DORIS_CONF = CommonVars.apply( - "linkis.doris.conf", + "linkis.ec.doris.conf", "", "The doris parameter, separated by commas, for example: timeout:600,label:123"); public static final CommonVars DORIS_HOST = - CommonVars.apply("linkis.doris.host", "127.0.0.1"); + CommonVars.apply("linkis.ec.doris.host", "127.0.0.1"); public static final CommonVars DORIS_HTTP_PORT = - CommonVars.apply("linkis.doris.http.port", 8030); + CommonVars.apply("linkis.ec.doris.http.port", 8030); public static final CommonVars DORIS_JDBC_PORT = - CommonVars.apply("linkis.doris.jdcb.port", 9030); + CommonVars.apply("linkis.ec.doris.jdcb.port", 9030); public static final CommonVars DORIS_DATABASE = - CommonVars.apply("linkis.doris.database", ""); + CommonVars.apply("linkis.ec.doris.database", ""); - public static final CommonVars DORIS_TABLE = CommonVars.apply("linkis.doris.table", ""); + public static final CommonVars DORIS_TABLE = + CommonVars.apply("linkis.ec.doris.table", ""); public static final CommonVars DORIS_USER_NAME = - CommonVars.apply("linkis.doris.username", "root"); + CommonVars.apply("linkis.ec.doris.username", "root"); public static final CommonVars DORIS_PASSWORD = - CommonVars.apply("linkis.doris.password", ""); + CommonVars.apply("linkis.ec.doris.password", ""); public static final CommonVars DORIS_RECONNECT_ENABLED = - CommonVars.apply("linkis.doris.2pc.enabled", false, "two phase commit Whether to enable"); + CommonVars.apply("linkis.ec.doris.2pc.enabled", false, "two phase commit Whether to enable"); public static final CommonVars DORIS_STRIP_OUTER_ARRAY = CommonVars.apply( - "linkis.doris.strip.outer.array", + "linkis.ec.doris.strip.outer.array", true, "true indicates that the json data starts with an array object and flattens the array object, the default value is true, Refer to doris for strip_outer_array"); }