Skip to content

Commit

Permalink
Doris ec supports dataSources
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 28, 2023
1 parent 292cd90 commit 69efc46
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public class DorisConfiguration {
public static final CommonVars<String> DORIS_PASSWORD =
CommonVars.apply("linkis.ec.doris.password", "");

public static final CommonVars<String> DORIS_DATASOURCE =
CommonVars.apply("linkis.ec.doris.datasource", "");

public static final CommonVars<String> DORIS_DATASOURCE_SYSTEM_QUERY_PARAM =
CommonVars.apply("linkis.ec.doris.datasource.systemQueryParam", "");

public static final CommonVars<Boolean> DORIS_RECONNECT_ENABLED =
CommonVars.apply("linkis.ec.doris.2pc.enabled", false, "two phase commit Whether to enable");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,18 @@ public class DorisConstant {
public static final String TXN_OPERATION = "txn_operation";

public static final Integer HTTP_SUCCEED = 200;

public static final String DS_JDBC_HOST = "host";

public static final String DS_JDBC_PORT = "port";

public static final String DS_JDBC_DB_NAME = "databaseName";

public static final String DS_JDBC_DB_INSTANCE = "instance";

public static final String DS_JDBC_USERNAME = "username";

public static final String DS_JDBC_PASSWORD = "password";

public static final String DS_JDBC_PASSWORD_HIDE_VALUE = "******";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineplugin.doris.executor;

import org.apache.linkis.common.utils.JsonUtils;
import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
import org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction;
import org.apache.linkis.datasourcemanager.common.domain.DataSource;
import org.apache.linkis.engineplugin.doris.constant.DorisConstant;

import org.apache.commons.collections.MapUtils;

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisDatasourceParser {
private static final Logger logger = LoggerFactory.getLogger(DorisDatasourceParser.class);

public static Map<String, String> queryDatasourceInfoByName(
String datasourceName, String username, String system) {
logger.info(
"Starting query ["
+ system
+ ", "
+ username
+ ", "
+ datasourceName
+ "] datasource info ......");
LinkisDataSourceRemoteClient dataSourceClient = new LinkisDataSourceRemoteClient();
DataSource dataSource = null;

dataSource =
dataSourceClient
.getInfoPublishedByDataSourceName(
GetInfoPublishedByDataSourceNameAction.builder()
.setSystem(system)
.setDataSourceName(datasourceName)
.setUser(username)
.build())
.getDataSource();

return queryDatasourceParam(datasourceName, dataSource);
}

private static Map<String, String> queryDatasourceParam(
String datasourceName, DataSource dataSource) {
Map<String, String> paramMap = new HashMap<>();

if (dataSource == null) {
logger.warn("Doris dataSource is null: {}", datasourceName);
return paramMap;
}

if (dataSource.isExpire()) {
logger.warn("Doris dataSource of datasource name: {} is expired", datasourceName);
return paramMap;
}

Map<String, Object> connectParams = dataSource.getConnectParams();
if (MapUtils.isEmpty(connectParams)) {
logger.warn("Doris dataSource connectParams is empty: {}", datasourceName);
return paramMap;
}

paramMap.put(
DorisConstant.DS_JDBC_HOST,
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_HOST, "")));
paramMap.put(
DorisConstant.DS_JDBC_PORT,
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PORT, "")));
paramMap.put(
DorisConstant.DS_JDBC_USERNAME,
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_USERNAME, "")));
paramMap.put(
DorisConstant.DS_JDBC_PASSWORD,
String.valueOf(connectParams.getOrDefault(DorisConstant.DS_JDBC_PASSWORD, "")));
paramMap.put(
DorisConstant.DS_JDBC_DB_NAME,
String.valueOf(
connectParams.getOrDefault(
DorisConstant.DS_JDBC_DB_NAME,
connectParams.getOrDefault(DorisConstant.DS_JDBC_DB_INSTANCE, ""))));

try {
HashMap<String, String> printMap = new HashMap<>();
printMap.putAll(paramMap);

// To hide the password and prevent leaks
if (printMap.containsKey(DorisConstant.DS_JDBC_PASSWORD)) {
printMap.put(DorisConstant.DS_JDBC_PASSWORD, DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE);
}
String printMapString = JsonUtils.jackson().writeValueAsString(printMap);
logger.info("Load dataSource param: {}", printMapString);
} catch (JsonProcessingException e) {

}

return paramMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.doris.conf.DorisConfiguration;
import org.apache.linkis.engineplugin.doris.conf.DorisEngineConf;
import org.apache.linkis.engineplugin.doris.constant.DorisConstant;
import org.apache.linkis.engineplugin.doris.errorcode.DorisErrorCodeSummary;
import org.apache.linkis.engineplugin.doris.exception.DorisException;
import org.apache.linkis.engineplugin.doris.exception.DorisParameterException;
Expand Down Expand Up @@ -109,12 +110,17 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {

private String dorisHost;
private String dorisDatabase;

private String datasourceDatabase;
private String dorisTable;
private String dorisUsername;
private String dorisPassword;

private String dorisStreamLoadFilePath;
private Integer dorisHttpPort;

private Integer dorisJdbcPort;

private CloseableHttpClient client;

public DorisEngineConnExecutor(int outputPrintLimit, int id) {
Expand Down Expand Up @@ -157,8 +163,6 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) {
.forEach(entry -> configMap.put(entry.getKey(), String.valueOf(entry.getValue())));
}

checkParameter();

this.client =
HttpClients.custom()
.setRedirectStrategy(
Expand All @@ -174,6 +178,8 @@ protected boolean isRedirectable(String method) {

@Override
public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
checkParameter(engineExecutorContext);

String realCode;
if (StringUtils.isBlank(code)) {
throw new DorisException(
Expand All @@ -184,7 +190,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,
}
logger.info("Doris engine begins to run code:\n {}", realCode);

checkRequiredParameter(realCode);
checkRequiredParameter(code);

String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort);

Expand Down Expand Up @@ -292,7 +298,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo
String dorisColumns = DorisConfiguration.DORIS_COLUMNS.getValue(configMap);

if (StringUtils.isBlank(dorisColumns)) {
Integer dorisJdbcPort = DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);
List<String> dorisCloumns =
DorisUtils.getDorisCloumns(
dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable);
Expand Down Expand Up @@ -385,24 +390,90 @@ private boolean isSupportedType(String fileExtension) {
return false;
}

private void checkParameter() {
String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
private void checkParameter(EngineExecutionContext engineExecutorContext) {
this.dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
this.dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
this.dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);
this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
this.dorisJdbcPort = DorisConfiguration.DORIS_JDBC_PORT.getValue(configMap);

if (StringUtils.isBlank(dorisHost)
|| StringUtils.isBlank(dorisUsername)
|| dorisHttpPort == null) {
String dorisDatasourceName = DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);

// Data source parameters fail to be obtained, and task running cannot be affected
// The datasource param overwrites the DorisConfiguration param
try {
if (StringUtils.isNotBlank(dorisDatasourceName)) {
String dorisSystemQueryParam =
DorisConfiguration.DORIS_DATASOURCE_SYSTEM_QUERY_PARAM.getValue(configMap);
String execSqlUser = getExecSqlUser(engineExecutorContext);

Map<String, String> dataSourceParamMap =
DorisDatasourceParser.queryDatasourceInfoByName(
dorisDatasourceName, execSqlUser, dorisSystemQueryParam);

if (MapUtils.isNotEmpty(dataSourceParamMap)) {
if (dataSourceParamMap.containsKey(DS_JDBC_HOST)
&& StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_HOST))) {
this.dorisHost = dataSourceParamMap.get(DS_JDBC_HOST);
}

if (dataSourceParamMap.containsKey(DS_JDBC_USERNAME)
&& StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_USERNAME))) {
this.dorisUsername = dataSourceParamMap.get(DS_JDBC_USERNAME);
}

if (dataSourceParamMap.containsKey(DS_JDBC_PASSWORD)) {
this.dorisPassword = dataSourceParamMap.get(DS_JDBC_PASSWORD);
}

if (dataSourceParamMap.containsKey(DS_JDBC_PORT)
&& StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_PORT))) {
this.dorisJdbcPort = Integer.valueOf(dataSourceParamMap.get(DS_JDBC_PORT));
}

if (dataSourceParamMap.containsKey(DS_JDBC_DB_NAME)
&& StringUtils.isNotBlank(dataSourceParamMap.get(DS_JDBC_DB_NAME))) {
this.datasourceDatabase = dataSourceParamMap.get(DS_JDBC_DB_NAME);
}
} else {
logger.warn(
"Doris dataSource {} param is null, Skip get doris dataSource parameters",
dorisDatasourceName);
}
}
} catch (Exception e) {
logger.error("get doris dataSource {} param failed", dorisDatasourceName, e);
}

if (StringUtils.isBlank(this.dorisHost)
|| StringUtils.isBlank(this.dorisUsername)
|| this.dorisHttpPort == null) {
logger.error("Doris check param failed.");
throw new DorisParameterException(
DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorCode(),
DorisErrorCodeSummary.CHECK_DORIS_PARAMETER_FAILED.getErrorDesc());
}

this.dorisHost = dorisHost;
this.dorisUsername = dorisUsername;
this.dorisHttpPort = dorisHttpPort;
this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
logger.info(
"Doris parameter dorisHost: {}, dorisUsername: {}, dorisPassword: {}, dorisHttpPort: {}, dorisJdbcPort: {}.",
this.dorisHost,
this.dorisUsername,
DorisConstant.DS_JDBC_PASSWORD_HIDE_VALUE,
this.dorisHttpPort,
this.dorisJdbcPort);
}

private String getExecSqlUser(EngineExecutionContext engineExecutionContext) {
UserCreatorLabel userCreatorLabel =
(UserCreatorLabel)
Arrays.stream(engineExecutionContext.getLabels())
.filter(label -> label instanceof UserCreatorLabel)
.findFirst()
.orElse(null);
if (userCreatorLabel != null) {
return userCreatorLabel.getUser();
}
return null;
}

private void checkRequiredParameter(String code) {
Expand All @@ -417,14 +488,20 @@ private void checkRequiredParameter(String code) {
DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc());
}

String dorisStreamLoadFilePath =
this.dorisStreamLoadFilePath =
codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), "");
String dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
String dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
this.dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");
this.dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");

String dorisDatasourceName = DorisConfiguration.DORIS_DATASOURCE.getValue(configMap);
// The datasource param overwrites the DorisConfiguration param
if (StringUtils.isNotBlank(dorisDatasourceName) && StringUtils.isNotBlank(datasourceDatabase)) {
this.dorisDatabase = datasourceDatabase;
}

if (StringUtils.isBlank(dorisStreamLoadFilePath)
|| StringUtils.isBlank(dorisDatabase)
|| StringUtils.isBlank(dorisTable)) {
if (StringUtils.isBlank(this.dorisStreamLoadFilePath)
|| StringUtils.isBlank(this.dorisDatabase)
|| StringUtils.isBlank(this.dorisTable)) {
logger.error(
"Check whether `{}`, `{}`, and `{}` are included in code json",
DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(),
Expand All @@ -435,9 +512,6 @@ private void checkRequiredParameter(String code) {
DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc());
}

this.dorisStreamLoadFilePath = dorisStreamLoadFilePath;
this.dorisDatabase = dorisDatabase;
this.dorisTable = dorisTable;
logger.info(
"Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, dorisTable: {}.",
this.dorisStreamLoadFilePath,
Expand Down

0 comments on commit 69efc46

Please sign in to comment.