Skip to content

Commit

Permalink
Spark etl supports doris (#5058)
Browse files Browse the repository at this point in the history
* Spark etl supports doris

* Modify known-dependencies.txt

* Optimized code
  • Loading branch information
ChengJie1053 authored Dec 29, 2023
1 parent 292cd90 commit 9823130
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 0 deletions.
10 changes: 10 additions & 0 deletions linkis-engineconn-plugins/spark/scala-2.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
<delta.version>2.0.2</delta.version>
<hudi.version>0.13.0</hudi.version>
<solr.version>8.11.0</solr.version>

<!-- doris-spark -->
<spark.doris.version>3.2</spark.doris.version>
<spark.doris.connector.version>1.2.0</spark.doris.connector.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -185,6 +189,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-${spark.doris.version}_${scala.binary.version}</artifactId>
<version>${spark.doris.connector.version}</version>
</dependency>

<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc

import org.apache.linkis.common.io.FsPath
import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData

import org.junit.jupiter.api.{Assertions, Test};

class TestDorisCala {

  // Root of the test classpath; substituted for the "{filePath}" placeholder
  // in the JSON pipeline configs below.
  val filePath = this.getClass.getResource("/").getFile

  /**
   * Verifies that a csv-file -> doris pipeline config parses and that its
   * source/transform/sink plugins can be resolved.
   */
  @Test
  def testDorisWrite(): Unit = assertPluginsResolvable(dorisWriteConfigJson)

  /**
   * Verifies that a doris -> json-file pipeline config parses and that its
   * source/transform/sink plugins can be resolved.
   */
  @Test
  def testDorisReader(): Unit = assertPluginsResolvable(dorisReaderConfigJson)

  /**
   * Shared body of both tests: expand the {filePath} placeholder, parse the
   * pipeline config and assert that the execution plan pieces are produced.
   */
  private def assertPluginsResolvable(configJson: String): Unit = {
    // skip os: windows — the file:// paths in the configs are POSIX style
    if (!FsPath.WINDOWS) {
      val data = DataCalcGroupData.getData(configJson.replace("{filePath}", filePath))
      Assertions.assertNotNull(data)

      val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
      Assertions.assertNotNull(sources)
      Assertions.assertNotNull(transforms)
      Assertions.assertNotNull(sinks)
    }
  }

  // Pipeline config: read a csv file and write it into a Doris table.
  val dorisWriteConfigJson =
    """
      |{
      |    "sources": [
      |        {
      |            "name": "file",
      |            "type": "source",
      |            "config": {
      |                "resultTable": "T1654611700631",
      |                "path": "file://{filePath}/etltest.dolphin",
      |                "serializer": "csv",
      |                "options": {
      |                "header":"true",
      |                "delimiter":";"
      |                },
      |                "columnNames": ["name", "age"]
      |            }
      |        }
      |    ],
      |    "sinks": [
      |        {
      |            "name": "doris",
      |            "type": "sink",
      |            "config": {
      |                "sourceTable": "T1654611700631",
      |                "url": "localhost:8030",
      |                "user": "root",
      |                "password": "",
      |                "targetDatabase": "test",
      |                "targetTable": "test"
      |            }
      |        }
      |    ]
      |}
      |""".stripMargin

  // Pipeline config: read a Doris table and write it out as json files.
  val dorisReaderConfigJson =
    """
      |{
      |    "sources": [
      |        {
      |            "name": "doris",
      |            "type": "source",
      |            "config": {
      |                "resultTable": "T1654611700631",
      |                "url": "localhost:8030",
      |                "user": "root",
      |                "password": "",
      |                "sourceDatabase": "test",
      |                "sourceTable": "test"
      |            }
      |        }
      |    ],
      |    "sinks": [
      |        {
      |            "name": "file",
      |            "type": "sink",
      |            "config": {
      |                "sourceTable": "T1654611700631",
      |                "path": "file://{filePath}/json",
      |                "saveMode": "overwrite",
      |                "serializer": "json"
      |            }
      |        }
      |    ]
      |}
      |""".stripMargin

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.sink;

import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;

/**
 * Validated configuration for the Doris data-calc sink: Doris FE endpoint,
 * credentials, target table coordinates and the Spark save mode.
 */
public class DorisSinkConfig extends SinkConfig {

  // Doris FE (frontend) HTTP address, e.g. "localhost:8030".
  @NotBlank private String url;

  @NotBlank private String user;

  // May be blank/null; Doris allows an empty root password.
  private String password;

  @NotBlank private String targetDatabase;

  @NotBlank private String targetTable;

  // Spark DataFrameWriter save mode; ${validatedValue} interpolates the
  // rejected value into the constraint-violation message (Bean Validation EL).
  @NotBlank
  @Pattern(
      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
      message =
          "Unknown save mode: ${validatedValue}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
  private String saveMode = "overwrite";

  public String getUrl() {
    return url;
  }

  public void setUrl(String url) {
    this.url = url;
  }

  public String getUser() {
    return user;
  }

  public void setUser(String user) {
    this.user = user;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }

  public String getTargetDatabase() {
    return targetDatabase;
  }

  public void setTargetDatabase(String targetDatabase) {
    this.targetDatabase = targetDatabase;
  }

  public String getTargetTable() {
    return targetTable;
  }

  public void setTargetTable(String targetTable) {
    this.targetTable = targetTable;
  }

  public String getSaveMode() {
    return saveMode;
  }

  public void setSaveMode(String saveMode) {
    this.saveMode = saveMode;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.source;

import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;

import javax.validation.constraints.NotBlank;

/**
 * Configuration for the Doris data-calc source: Doris FE endpoint,
 * credentials, and the database/table to read from.
 */
public class DorisSourceConfig extends SourceConfig {

  /** Doris FE (frontend) HTTP address, e.g. "localhost:8030". */
  @NotBlank private String url;

  /** Doris login user. */
  @NotBlank private String user;

  /** Doris login password; may be blank. */
  private String password;

  /** Database to read from. */
  @NotBlank private String sourceDatabase;

  /** Table to read from. */
  @NotBlank private String sourceTable;

  public String getUrl() {
    return url;
  }

  public String getUser() {
    return user;
  }

  public String getPassword() {
    return password;
  }

  public String getSourceDatabase() {
    return sourceDatabase;
  }

  public String getSourceTable() {
    return sourceTable;
  }

  public void setUrl(String url) {
    this.url = url;
  }

  public void setUser(String user) {
    this.user = user;
  }

  public void setPassword(String password) {
    this.password = password;
  }

  public void setSourceDatabase(String sourceDatabase) {
    this.sourceDatabase = sourceDatabase;
  }

  public void setSourceTable(String sourceTable) {
    this.sourceTable = sourceTable;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private static Map<String, Class<?>> getSourcePlugins() {
classMap.put("solr", SolrSource.class);
classMap.put("kafka", KafkaSource.class);
classMap.put("starrocks", StarrocksSource.class);
classMap.put("doris", DorisSource.class);
return classMap;
}

Expand All @@ -75,6 +76,7 @@ private static Map<String, Class<?>> getSinkPlugins() {
classMap.put("solr", SolrSink.class);
classMap.put("kafka", KafkaSink.class);
classMap.put("starrocks", StarrocksSink.class);
classMap.put("doris", DorisSink.class);
return classMap;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.sink

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

class DorisSink extends DataCalcSink[DorisSinkConfig] with Logging {

  /**
   * Writes the dataset into Apache Doris via the spark-doris-connector
   * ("doris" data source format).
   */
  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
    // Options derived from the validated config; these intentionally take
    // precedence over any user-supplied extra options on key conflicts.
    val requiredOptions = Map(
      "doris.fenodes" -> config.getUrl,
      "user" -> config.getUser,
      "password" -> config.getPassword,
      "doris.table.identifier" -> s"${config.getTargetDatabase}.${config.getTargetTable}"
    )

    // Optional passthrough options from the job config (may be null/empty).
    val userOptions =
      if (config.getOptions != null && !config.getOptions.isEmpty) {
        config.getOptions.asScala.toMap
      } else {
        Map.empty[String, String]
      }

    val writer = ds.write.format("doris")
    // saveMode is validated by DorisSinkConfig but may still be blank-skipped here.
    if (StringUtils.isNotBlank(config.getSaveMode)) {
      writer.mode(config.getSaveMode)
    }

    logger.info(
      s"Save data to doris url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
    )
    writer.options(userOptions ++ requiredOptions).save()
  }

}
Loading

0 comments on commit 9823130

Please sign in to comment.