From 98231308bdaca462bf8e3232b6f0f3e6e4742320 Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Fri, 29 Dec 2023 10:53:45 +0800
Subject: [PATCH] Spark etl supports doris (#5058)
* Spark etl supports doris
* Modify known-dependencies.txt
* Optimized code
---
.../spark/scala-2.12/pom.xml | 10 ++
.../spark/datacalc/TestDorisCala.scala | 126 ++++++++++++++++++
.../spark/datacalc/sink/DorisSinkConfig.java | 91 +++++++++++++
.../datacalc/source/DorisSourceConfig.java | 74 ++++++++++
.../spark/datacalc/util/PluginUtil.java | 2 +
.../spark/datacalc/sink/DorisSink.scala | 57 ++++++++
.../spark/datacalc/source/DorisSource.scala | 49 +++++++
tool/dependencies/known-dependencies.txt | 1 +
8 files changed, 410 insertions(+)
create mode 100644 linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
create mode 100644 linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
create mode 100644 linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
index 1ba1a8ad5f..840b02dc87 100644
--- a/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
+++ b/linkis-engineconn-plugins/spark/scala-2.12/pom.xml
@@ -31,6 +31,10 @@
2.0.2
0.13.0
8.11.0
+
+
+ 3.2
+ 1.2.0
@@ -185,6 +189,12 @@
+
+ org.apache.doris
+ spark-doris-connector-${spark.doris.version}_${scala.binary.version}
+ ${spark.doris.connector.version}
+
+
com.lucidworks.spark
spark-solr
diff --git a/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
new file mode 100644
index 0000000000..4ddba1d2fd
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/scala-2.12/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDorisCala.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.common.io.FsPath
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+class TestDorisCala {
+
+ val filePath = this.getClass.getResource("/").getFile
+
+ @Test
+ def testDorisWrite: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+ val data = DataCalcGroupData.getData(dorisWriteConfigJson.replace("{filePath}", filePath))
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+ }
+
+ @Test
+ def testDorisReader: Unit = {
+ // skip os: windows
+ if (!FsPath.WINDOWS) {
+ val data =
+ DataCalcGroupData.getData(dorisReaderConfigJson.replace("{filePath}", filePath))
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+ }
+
+ val dorisWriteConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "file",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "path": "file://{filePath}/etltest.dolphin",
+ | "serializer": "csv",
+ | "options": {
+ | "header":"true",
+ | "delimiter":";"
+ | },
+ | "columnNames": ["name", "age"]
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "doris",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "url": "localhost:8030",
+ | "user": "root",
+ | "password": "",
+ | "targetDatabase": "test",
+ | "targetTable": "test"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+ val dorisReaderConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "doris",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "url": "localhost:8030",
+ | "user": "root",
+ | "password": "",
+ | "sourceDatabase": "test",
+ | "sourceTable": "test"
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "file",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "file://{filePath}/json",
+ | "saveMode": "overwrite",
+ | "serializer": "json"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
new file mode 100644
index 0000000000..3c8227faed
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSinkConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+public class DorisSinkConfig extends SinkConfig {
+
+ @NotBlank private String url;
+
+ @NotBlank private String user;
+
+ private String password;
+
+ @NotBlank private String targetDatabase;
+
+ @NotBlank private String targetTable;
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getTargetDatabase() {
+ return targetDatabase;
+ }
+
+ public void setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public void setTargetTable(String targetTable) {
+ this.targetTable = targetTable;
+ }
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
new file mode 100644
index 0000000000..95a11d89df
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSourceConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+public class DorisSourceConfig extends SourceConfig {
+
+ @NotBlank private String url;
+
+ @NotBlank private String user;
+ private String password;
+
+ @NotBlank private String sourceDatabase;
+
+ @NotBlank private String sourceTable;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getSourceDatabase() {
+ return sourceDatabase;
+ }
+
+ public void setSourceDatabase(String sourceDatabase) {
+ this.sourceDatabase = sourceDatabase;
+ }
+
+ public String getSourceTable() {
+ return sourceTable;
+ }
+
+ public void setSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
index e27d110c32..2d29c1b551 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -52,6 +52,7 @@ private static Map> getSourcePlugins() {
classMap.put("solr", SolrSource.class);
classMap.put("kafka", KafkaSource.class);
classMap.put("starrocks", StarrocksSource.class);
+ classMap.put("doris", DorisSource.class);
return classMap;
}
@@ -75,6 +76,7 @@ private static Map> getSinkPlugins() {
classMap.put("solr", SolrSink.class);
classMap.put("kafka", KafkaSink.class);
classMap.put("starrocks", StarrocksSink.class);
+ classMap.put("doris", DorisSink.class);
return classMap;
}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
new file mode 100644
index 0000000000..9d5301ced9
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/DorisSink.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+class DorisSink extends DataCalcSink[DorisSinkConfig] with Logging {
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ var options = Map(
+ "doris.fenodes" -> config.getUrl,
+ "user" -> config.getUser,
+ "password" -> config.getPassword,
+ "doris.table.identifier" -> String.format(
+ "%s.%s",
+ config.getTargetDatabase,
+ config.getTargetTable
+ )
+ )
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ options = config.getOptions.asScala.toMap ++ options
+ }
+
+ val writer = ds.write.format("doris")
+ if (StringUtils.isNotBlank(config.getSaveMode)) {
+ writer.mode(config.getSaveMode)
+ }
+
+ logger.info(
+ s"Save data from doris url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
+ )
+ writer.options(options).save()
+ }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
new file mode 100644
index 0000000000..a4819f2181
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/DorisSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+class DorisSource extends DataCalcSource[DorisSourceConfig] with Logging {
+
+ override def getData(spark: SparkSession): Dataset[Row] = {
+ val reader = spark.read.format("doris")
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ reader.options(config.getOptions)
+ }
+
+ logger.info(
+ s"Load data from Doris url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
+ )
+
+ reader
+ .option(
+ "doris.table.identifier",
+ String.format("%s.%s", config.getSourceDatabase, config.getSourceTable)
+ )
+ .option("doris.fenodes", config.getUrl)
+ .option("user", config.getUser)
+ .option("password", config.getPassword)
+ .load()
+ }
+
+}
diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt
index 73f1abede8..b826a67495 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -578,6 +578,7 @@ snakeyaml-1.33.jar
snappy-java-1.1.4.jar
snappy-java-1.1.7.7.jar
snappy-java-1.1.8.2.jar
+spark-doris-connector-3.2_2.12-1.2.0.jar
spark-redis_2.12-2.6.0.jar
spring-aop-5.2.23.RELEASE.jar
spring-beans-5.2.23.RELEASE.jar