From dac4601c042b40bd4a8e3bf9738f28355b596e0c Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Tue, 26 Dec 2023 11:18:35 +0800
Subject: [PATCH 1/3] Spark etl supports doris
---
linkis-engineconn-plugins/spark/pom.xml | 6 +
.../spark/datacalc/sink/DorisSinkConfig.java | 91 +++++++++++++
.../datacalc/source/DorisSourceConfig.java | 74 ++++++++++
.../spark/datacalc/util/PluginUtil.java | 2 +
.../spark/datacalc/sink/DorisSink.scala | 57 ++++++++
.../spark/datacalc/source/DorisSource.scala | 49 +++++++
.../spark/datacalc/TestDorisCala.scala | 126 ++++++++++++++++++
7 files changed, 405 insertions(+)
create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
create mode 100644 linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 78232472ac..cf5a2e5b3f 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -418,6 +418,12 @@
kubernetes-model-core
${kubernetes-client.version}
+
+
+ org.apache.doris
+ spark-doris-connector-3.2_2.12
+ 1.2.0
+
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
new file mode 100644
index 0000000000..3c8227faed
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+/** Configuration bean of the Doris sink: FE nodes, credentials, target table and save mode. */
+public class DorisSinkConfig extends SinkConfig {
+
+  @NotBlank private String url;
+  @NotBlank private String user;
+  // Optional: carries no validation constraint, so it may be null when the job omits it.
+  private String password;
+  @NotBlank private String targetDatabase;
+  @NotBlank private String targetTable;
+
+  // ${validatedValue} is expanded by Bean Validation message interpolation to the rejected value,
+  // so the violation message names the save mode that was actually supplied.
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: ${validatedValue}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
new file mode 100644
index 0000000000..95a11d89df
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+/** Configuration bean of the Doris source: FE nodes, credentials and the table to read. */
+public class DorisSourceConfig extends SourceConfig {
+
+  // Handed to the reader as the "doris.fenodes" option by DorisSource.
+  @NotBlank private String url;
+  @NotBlank private String user;
+  // Optional: carries no validation constraint.
+  private String password;
+  @NotBlank private String sourceDatabase;
+  @NotBlank private String sourceTable;
+
+  public String getUrl() {
+    return this.url;
+  }
+
+  public String getUser() {
+    return this.user;
+  }
+
+  public String getPassword() {
+    return this.password;
+  }
+
+  public String getSourceDatabase() {
+    return this.sourceDatabase;
+  }
+
+  public String getSourceTable() {
+    return this.sourceTable;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public void setSourceDatabase(String sourceDatabase) {
+    this.sourceDatabase = sourceDatabase;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index e27d110c32..2d29c1b551 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -52,6 +52,7 @@ private static Map> getSourcePlugins() {
classMap.put("solr", SolrSource.class);
classMap.put("kafka", KafkaSource.class);
classMap.put("starrocks", StarrocksSource.class);
+ classMap.put("doris", DorisSource.class);
return classMap;
}
@@ -75,6 +76,7 @@ private static Map> getSinkPlugins() {
classMap.put("solr", SolrSink.class);
classMap.put("kafka", KafkaSink.class);
classMap.put("starrocks", StarrocksSink.class);
+ classMap.put("doris", DorisSink.class);
return classMap;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
new file mode 100644
index 0000000000..9d5301ced9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+/** Sink that writes a dataset to a Doris table through the "doris" Spark data source. */
+class DorisSink extends DataCalcSink[DorisSinkConfig] with Logging {
+
+  /**
+   * Writes `ds` to `targetDatabase.targetTable` using the configured save mode.
+   *
+   * @param spark session of the running job; not used by this sink
+   * @param ds dataset to persist
+   */
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val connection = Map(
+      "doris.fenodes" -> config.getUrl,
+      "user" -> config.getUser,
+      // password has no validation constraint and may be null; pass "" rather than a null value
+      "password" -> Option(config.getPassword).getOrElse(""),
+      "doris.table.identifier" -> s"${config.getTargetDatabase}.${config.getTargetTable}"
+    )
+    // The right operand of ++ wins, so connection settings override same-named user options.
+    val options = Option(config.getOptions).fold(connection)(_.asScala.toMap ++ connection)
+
+    val writer = ds.write.format("doris")
+    if (StringUtils.isNotBlank(config.getSaveMode)) writer.mode(config.getSaveMode)
+
+    logger.info(
+      s"Save data to doris url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
+    )
+    writer.options(options).save()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
new file mode 100644
index 0000000000..a4819f2181
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+/** Source that loads a Doris table as a dataset through the "doris" Spark data source. */
+class DorisSource extends DataCalcSource[DorisSourceConfig] with Logging {
+
+  /**
+   * Reads `sourceDatabase.sourceTable` from the configured Doris FE nodes.
+   * @param spark session used to build the reader
+   * @return the dataset produced by the reader's `load()`
+   */
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("doris")
+    // User-supplied options are applied first, the fixed connection options afterwards.
+    Option(config.getOptions).filterNot(_.isEmpty).foreach(opts => reader.options(opts))
+
+    logger.info(
+      s"Load data from Doris url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
+    )
+    reader
+      .option("doris.table.identifier", s"${config.getSourceDatabase}.${config.getSourceTable}")
+      .option("doris.fenodes", config.getUrl)
+      .option("user", config.getUser)
+      // password has no validation constraint and may be null; pass "" rather than a null value
+      .option("password", Option(config.getPassword).getOrElse(""))
+      .load()
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
new file mode 100644
index 0000000000..4ddba1d2fd
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+/** Checks that job descriptions using the Doris source and sink are accepted by data-calc. */
+class TestDorisCala {
+
+  // Location returned by getResource("/"); substituted for the {filePath} placeholder below.
+  val filePath = this.getClass.getResource("/").getFile
+
+  /** Job with a file source feeding a Doris sink. */
+  @Test
+  def testDorisWrite: Unit = assertPluginsResolved(dorisWriteConfigJson)
+
+  /** Job with a Doris source feeding a file sink. */
+  @Test
+  def testDorisReader: Unit = assertPluginsResolved(dorisReaderConfigJson)
+
+  /**
+   * Feeds `configJson` (with `{filePath}` substituted) through `DataCalcGroupData.getData` and
+   * `DataCalcExecution.getPlugins` and asserts that no result is null. Does nothing on Windows.
+   */
+  private def assertPluginsResolved(configJson: String): Unit = {
+    // skip os: windows
+    if (!FsPath.WINDOWS) {
+      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
+      Assertions.assertNotNull(data)
+
+      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+      Assertions.assertNotNull(sources)
+      Assertions.assertNotNull(transforms)
+      Assertions.assertNotNull(sinks)
+    }
+  }
+
+  // Job description: csv file source -> doris sink.
+  val dorisWriteConfigJson =
+    """
+      |{
+      | "sources": [
+      | {
+      | "name": "file",
+      | "type": "source",
+      | "config": {
+      | "resultTable": "T1654611700631",
+      | "path": "file://{filePath}/etltest.dolphin",
+      | "serializer": "csv",
+      | "options": {
+      | "header":"true",
+      | "delimiter":";"
+      | },
+      | "columnNames": ["name", "age"]
+      | }
+      | }
+      | ],
+      | "sinks": [
+      | {
+      | "name": "doris",
+      | "type": "sink",
+      | "config": {
+      | "sourceTable": "T1654611700631",
+      | "url": "localhost:8030",
+      | "user": "root",
+      | "password": "",
+      | "targetDatabase": "test",
+      | "targetTable": "test"
+      | }
+      | }
+      | ]
+      |}
+      |""".stripMargin
+
+  // Job description: doris source -> json file sink.
+  val dorisReaderConfigJson =
+    """
+      |{
+      | "sources": [
+      | {
+      | "name": "doris",
+      | "type": "source",
+      | "config": {
+      | "resultTable": "T1654611700631",
+      | "url": "localhost:8030",
+      | "user": "root",
+      | "password": "",
+      | "sourceDatabase": "test",
+      | "sourceTable": "test"
+      | }
+      | }
+      | ],
+      | "sinks": [
+      | {
+      | "name": "file",
+      | "type": "sink",
+      | "config": {
+      | "sourceTable": "T1654611700631",
+      | "path": "file://{filePath}/json",
+      | "saveMode": "overwrite",
+      | "serializer": "json"
+      | }
+      | }
+      | ]
+      |}
+      |""".stripMargin
+
+}
From 79a2b98fd659fd5ba8086c98aa4cf330e6ab4617 Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Tue, 26 Dec 2023 11:39:14 +0800
Subject: [PATCH 2/3] Modify known-dependencies.txt
---
tool/dependencies/known-dependencies.txt | 1 +
1 file changed, 1 insertion(+)
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 73f1abede8..b826a67495 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -578,6 +578,7 @@ snakeyaml-1.33.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.7.jar
snappy-java-1.1.8.2.jar
+spark-doris-connector-3.2_2.12-1.2.0.jar
spark-redis_2.12-2.6.0.jar
spring-aop-5.2.23.RELEASE.jar
spring-beans-5.2.23.RELEASE.jar
From ccb0cdbd73c3f862e40b2eea25d643f00be0d35a Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Wed, 27 Dec 2023 11:13:37 +0800
Subject: [PATCH 3/3] Optimized code
---
linkis-engineconn-plugins/spark/pom.xml | 6 ------
linkis-engineconn-plugins/spark/scala-2.12/pom.xml | 10 ++++++++++
.../engineplugin/spark/datacalc/TestDorisCala.scala | 0
3 files changed, 10 insertions(+), 6 deletions(-)
rename linkis-engineconn-plugins/spark/{ => scala-2.12}/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala (100%)
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index cf5a2e5b3f..78232472ac 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -418,12 +418,6 @@
kubernetes-model-core
${kubernetes-client.version}
-
-
- org.apache.doris
- spark-doris-connector-3.2_2.12
- 1.2.0
-
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
index 1ba1a8ad5f..840b02dc87 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
+++ b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
@@ -31,6 +31,10 @@
2.0.2
0.13.0
8.11.0
+
+
+ 3.2
+ 1.2.0
@@ -185,6 +189,12 @@
+
+ org.apache.doris
+ spark-doris-connector-${spark.doris.version}_${scala.binary.version}
+ ${spark.doris.connector.version}
+
+
com.lucidworks.spark
spark-solr
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
similarity index 100%
rename from linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
rename to linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala