Skip to content

Commit

Permalink
Optimized code
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Dec 25, 2023
1 parent c381145 commit 8a78929
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class LabelCommonConfig {
CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0");

public static final CommonVars<String> DORIS_ENGINE_VERSION =
CommonVars.apply("wds.linkis.doris.engine.version", "1.2.6");
CommonVars.apply("linkis.doris.engine.version", "1.2.6");

public static final CommonVars<String> PRESTO_ENGINE_VERSION =
CommonVars.apply("wds.linkis.presto.engine.version", "0.234");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class DorisConfiguration {

public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
CommonVars.apply("linkis.engineconn.concurrent.limit", 100);
CommonVars.apply("linkis.engineconn.doris.concurrent.limit", 100);

public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
CommonVars.apply("linkis.doris.default.limit", 5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@
public enum DorisErrorCodeSummary implements LinkisErrorCode {
CHECK_DORIS_PARAMETER_FAILED(28501, "Failed to check the doris parameter(doris参数检查失败)"),
DORIS_TEST_CONNECTION_FAILED(28502, "The doris test connection failed(doris测试连接失败)"),

DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK(
28503, "The doris stream load file path cannot be empty(doris stream load file path不能为空)"),
DORIS_STREAM_LOAD_FILE_PATH_NOT_FILE(
28504, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"),
28503, "The doris stream load file path must be a file(doris stream load file path必须是一个文件)"),
DORIS_CODE_IS_NOT_BLANK(28504, "Doris engine code cannot be empty(Doris引擎代码不能为空)"),

DORIS_CODE_FAILED_TO_CONVERT_JSON(
28505, "Doris code Failed to convert json(Doris code 转换json失败)"),

DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK(
28506, "Doris required Parameter cannot be empty(Doris必填参数不能为空)"),

DORIS_STREAM_LOAD_FILE_PATH_NOT_SUPPORTED_TYPE_FILE(
28505,
28507,
"The doris stream load file path This file type is not currently supported(doris stream load file path目前不支持该文件类型)");

private final int errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class DorisEngineConnExecutor extends ConcurrentComputationExecutor {
private String dorisTable;
private String dorisUsername;
private String dorisPassword;

private String dorisStreamLoadFilePath;
private Integer dorisHttpPort;
private CloseableHttpClient client;

Expand Down Expand Up @@ -172,6 +174,18 @@ protected boolean isRedirectable(String method) {

@Override
public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) {
String realCode;
if (StringUtils.isBlank(code)) {
throw new DorisException(
DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorCode(),
DorisErrorCodeSummary.DORIS_CODE_IS_NOT_BLANK.getErrorDesc());
} else {
realCode = code.trim();
}
logger.info("Doris engine begins to run code:\n {}", realCode);

checkRequiredParameter(code);

String testConnectionUrl = String.format(DORIS_URL_BOOTSTRAP, dorisHost, dorisHttpPort);

if (!testConnection(testConnectionUrl)) {
Expand Down Expand Up @@ -283,8 +297,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo
DorisUtils.getDorisCloumns(
dorisHost, dorisJdbcPort, dorisUsername, dorisPassword, dorisDatabase, dorisTable);
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(dorisCloumns)) {
// httpPut.setHeader(COLUMNS, String.join(",", dorisCloumns.stream().map(f ->
// String.format("`%s`", f)).collect(Collectors.toList())));
dorisColumns =
String.join(
",",
Expand All @@ -309,15 +321,6 @@ private CloseableHttpResponse streamLoad(EngineExecutionContext engineExecutorCo
httpPut.setHeader(LABEL, dorisLabel);
logger.info("doris set param {} : {}", LABEL, dorisLabel);

String dorisStreamLoadFilePath =
DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.getValue(configMap);

if (StringUtils.isBlank(dorisStreamLoadFilePath)) {
throw new DorisStreamLoadFileException(
DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorCode(),
DorisErrorCodeSummary.DORIS_STREAM_LOAD_FILE_PATH_NOT_BLANK.getErrorDesc());
}

File dorisStreamLoadFile = new File(dorisStreamLoadFilePath);
if (!dorisStreamLoadFile.isFile()) {
throw new DorisStreamLoadFileException(
Expand Down Expand Up @@ -384,14 +387,10 @@ private boolean isSupportedType(String fileExtension) {

private void checkParameter() {
String dorisHost = DorisConfiguration.DORIS_HOST.getValue(configMap);
String dorisDatabase = DorisConfiguration.DORIS_DATABASE.getValue(configMap);
String dorisTable = DorisConfiguration.DORIS_TABLE.getValue(configMap);
String dorisUsername = DorisConfiguration.DORIS_USER_NAME.getValue(configMap);
Integer dorisHttpPort = DorisConfiguration.DORIS_HTTP_PORT.getValue(configMap);

if (StringUtils.isBlank(dorisHost)
|| StringUtils.isBlank(dorisDatabase)
|| StringUtils.isBlank(dorisTable)
|| StringUtils.isBlank(dorisUsername)
|| dorisHttpPort == null) {
logger.error("Doris check param failed.");
Expand All @@ -401,13 +400,51 @@ private void checkParameter() {
}

this.dorisHost = dorisHost;
this.dorisDatabase = dorisDatabase;
this.dorisTable = dorisTable;
this.dorisUsername = dorisUsername;
this.dorisHttpPort = dorisHttpPort;
this.dorisPassword = DorisConfiguration.DORIS_PASSWORD.getValue(configMap);
}

private void checkRequiredParameter(String code) {
Map<String, String> codeMap = new HashMap<>();

try {
codeMap =
JsonUtils.jackson().readValue(code, new TypeReference<HashMap<String, String>>() {});
} catch (JsonProcessingException e) {
throw new DorisException(
DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorCode(),
DorisErrorCodeSummary.DORIS_CODE_FAILED_TO_CONVERT_JSON.getErrorDesc());
}

String dorisStreamLoadFilePath =
codeMap.getOrDefault(DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(), "");
String dorisDatabase = codeMap.getOrDefault(DorisConfiguration.DORIS_DATABASE.key(), "");
String dorisTable = codeMap.getOrDefault(DorisConfiguration.DORIS_TABLE.key(), "");

if (StringUtils.isBlank(dorisStreamLoadFilePath)
|| StringUtils.isBlank(dorisDatabase)
|| StringUtils.isBlank(dorisTable)) {
logger.error(
"Check whether `{}`, `{}`, and `{}` are included in code json",
DorisConfiguration.DORIS_STREAM_LOAD_FILE_PATH.key(),
DorisConfiguration.DORIS_DATABASE.key(),
DorisConfiguration.DORIS_TABLE.key());
throw new DorisException(
DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorCode(),
DorisErrorCodeSummary.DORIS_REQUIRED_PARAMETER_IS_NOT_BLANK.getErrorDesc());
}

this.dorisStreamLoadFilePath = dorisStreamLoadFilePath;
this.dorisDatabase = dorisDatabase;
this.dorisTable = dorisTable;
logger.info(
"Doris parameter dorisStreamLoadFilePath: {}, dorisDatabase: {}, dorisTable: {}.",
this.dorisStreamLoadFilePath,
this.dorisDatabase,
this.dorisTable);
}

private boolean isSuccess(Map<String, String> map) {
if (org.apache.commons.collections.MapUtils.isEmpty(map)) {
return false;
Expand Down
24 changes: 14 additions & 10 deletions linkis-engineconn-plugins/doris/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration status="error" monitorInterval="30">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
</Console>
<RollingFile name="RollingFile" append="true" fileName="${env:LOG_DIRS:-logs}/stdout"
filePattern="${env:LOG_DIRS}/$${date:yyyy-MM}/linkis-log-%d{yyyy-MM-dd-hh}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] [JobId-%X{jobId}] - %msg%xEx%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100MB"/>
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>

<Send name="Send" >
<Filters>
Expand All @@ -41,9 +45,9 @@
</appenders>

<loggers>
<root level="INFO">
<root level="INFO">
<appender-ref ref="stderr"/>
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
<appender-ref ref="Send"/>
</root>
<logger name="org.apache.hadoop.hive.ql.exec.StatsTask" level="info" additivity="true">
Expand Down Expand Up @@ -87,5 +91,5 @@
<appender-ref ref="Send"/>
</logger>

</loggers>
</loggers>
</configuration>

0 comments on commit 8a78929

Please sign in to comment.