Skip to content

Commit

Permalink
Spark etl supports doris
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 26, 2023
1 parent adb7f8e commit dac4601
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 0 deletions.
6 changes: 6 additions & 0 deletions linkis-engineconn-plugins/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,12 @@
<artifactId>kubernetes-model-core</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.2_2.12</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.sink;

import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;

public class DorisSinkConfig extends SinkConfig {

@NotBlank private String url;

@NotBlank private String user;

private String password;

@NotBlank private String targetDatabase;

@NotBlank private String targetTable;

@NotBlank
@Pattern(
regexp = "^(overwrite|append|ignore|error|errorifexists)$",
message =
"Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
private String saveMode = "overwrite";

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getTargetDatabase() {
return targetDatabase;
}

public void setTargetDatabase(String targetDatabase) {
this.targetDatabase = targetDatabase;
}

public String getTargetTable() {
return targetTable;
}

public void setTargetTable(String targetTable) {
this.targetTable = targetTable;
}

public String getSaveMode() {
return saveMode;
}

public void setSaveMode(String saveMode) {
this.saveMode = saveMode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.source;

import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;

import javax.validation.constraints.NotBlank;

public class DorisSourceConfig extends SourceConfig {

@NotBlank private String url;

@NotBlank private String user;
private String password;

@NotBlank private String sourceDatabase;

@NotBlank private String sourceTable;

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getSourceDatabase() {
return sourceDatabase;
}

public void setSourceDatabase(String sourceDatabase) {
this.sourceDatabase = sourceDatabase;
}

public String getSourceTable() {
return sourceTable;
}

public void setSourceTable(String sourceTable) {
this.sourceTable = sourceTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private static Map<String, Class<?>> getSourcePlugins() {
classMap.put("solr", SolrSource.class);
classMap.put("kafka", KafkaSource.class);
classMap.put("starrocks", StarrocksSource.class);
classMap.put("doris", DorisSource.class);
return classMap;
}

Expand All @@ -75,6 +76,7 @@ private static Map<String, Class<?>> getSinkPlugins() {
classMap.put("solr", SolrSink.class);
classMap.put("kafka", KafkaSink.class);
classMap.put("starrocks", StarrocksSink.class);
classMap.put("doris", DorisSink.class);
return classMap;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.sink

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

class DorisSink extends DataCalcSink[DorisSinkConfig] with Logging {

def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
var options = Map(
"doris.fenodes" -> config.getUrl,
"user" -> config.getUser,
"password" -> config.getPassword,
"doris.table.identifier" -> String.format(
"%s.%s",
config.getTargetDatabase,
config.getTargetTable
)
)

if (config.getOptions != null && !config.getOptions.isEmpty) {
options = config.getOptions.asScala.toMap ++ options
}

val writer = ds.write.format("doris")
if (StringUtils.isNotBlank(config.getSaveMode)) {
writer.mode(config.getSaveMode)
}

logger.info(
s"Save data from doris url: ${config.getUrl}, targetDatabase: ${config.getTargetDatabase}, targetTable: ${config.getTargetTable}"
)
writer.options(options).save()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.spark.datacalc.source

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource

import org.apache.spark.sql.{Dataset, Row, SparkSession}

class DorisSource extends DataCalcSource[DorisSourceConfig] with Logging {

override def getData(spark: SparkSession): Dataset[Row] = {
val reader = spark.read.format("doris")

if (config.getOptions != null && !config.getOptions.isEmpty) {
reader.options(config.getOptions)
}

logger.info(
s"Load data from Doris url: ${config.getUrl}, sourceDatabase: ${config.getSourceDatabase}, sourceTable: ${config.getSourceTable}"
)

reader
.option(
"doris.table.identifier",
String.format("%s.%s", config.getSourceDatabase, config.getSourceTable)
)
.option("doris.fenodes", config.getUrl)
.option("user", config.getUser)
.option("password", config.getPassword)
.load()
}

}
Loading

0 comments on commit dac4601

Please sign in to comment.