Skip to content

Commit

Permalink
代码格式化
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Oct 30, 2024
1 parent 58275c9 commit 7057bb5
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
import org.slf4j.LoggerFactory;

/**
* Log retrieval logic:
* 1. LogRetriever polls to obtain real-time logs, and if the task is completed, it retrieves persistent logs
* 2. Organized by org.apache.inkis.cli.application. interactor.job. com LogRetriever # sendLogFin decides whether to continue polling logs
* 3. getNextLogLine is the FromLine returned by the log interface
* 4. The return of persistent logs is OpenLogResult2
* Log retrieval logic: 1. LogRetriever polls to obtain real-time logs, and if the task is
* completed, it retrieves persistent logs 2. Organized by org.apache.inkis.cli.application.
* interactor.job. com LogRetriever # sendLogFin decides whether to continue polling logs 3.
* getNextLogLine is the FromLine returned by the log interface 4. The return of persistent logs is
* OpenLogResult2
*/
public class LogRetriever {
private static final Logger logger = LoggerFactory.getLogger(LogRetriever.class);
Expand Down Expand Up @@ -137,7 +137,8 @@ public void queryLogLoop(LogData data) {
if (curLogIdx >= nextLogIdx) {
String msg =
MessageFormat.format(
"Retrieving log, curLogIdx={}, hasNext={0}, nextLogIdx={1}", curLogIdx, hasNext, nextLogIdx);
"Retrieving log, curLogIdx={}, hasNext={0}, nextLogIdx={1}",
curLogIdx, hasNext, nextLogIdx);
logger.info(msg);
}
CliUtils.doSleepQuietly(CliConstants.JOB_QUERY_SLEEP_MILLS);
Expand All @@ -152,16 +153,16 @@ public void queryLogLoop(LogData data) {
private void queryJobLogFromLine(LogData data, int fromLine) throws LinkisClientRuntimeException {

LinkisOperResultAdapter jobInfoResult =
linkisJobOperator.queryJobInfo(data.getUser(), data.getJobID());
linkisJobOperator.queryJobInfo(data.getUser(), data.getJobID());
data.updateLog(jobInfoResult);
if (!jobInfoResult.getJobStatus().isJobFinishedState()) {
data.updateLog(
linkisJobOperator.queryRunTimeLogFromLine(
data.getUser(), data.getJobID(), data.getExecID(), fromLine));
linkisJobOperator.queryRunTimeLogFromLine(
data.getUser(), data.getJobID(), data.getExecID(), fromLine));
} else {
data.updateLog(
linkisJobOperator.queryPersistedLogFromLine(
data.getLogPath(), data.getUser(), data.getJobID(), fromLine));
linkisJobOperator.queryPersistedLogFromLine(
data.getLogPath(), data.getUser(), data.getJobID(), fromLine));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ public JobResult run() {

// get log while running
LogRetriever logRetriever =
new LogRetriever(
jobInfoResult.getUser(),
jobInfoResult.getJobID(),
submitResult.getStrongerExecId(),
true,
oper,
new LogPresenter());
new LogRetriever(
jobInfoResult.getUser(),
jobInfoResult.getJobID(),
submitResult.getStrongerExecId(),
true,
oper,
new LogPresenter());
// async because we need to query job status
logRetriever.retrieveLogAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,35 @@

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.computation.executor.execute.{
ComputationExecutor,
EngineExecutionContext
}
import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.hadoop.common.conf.HadoopConf
import org.apache.linkis.hadoop.common.utils.HDFSUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.rpc.Sender
import org.apache.linkis.udf.UDFClientConfiguration
import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol}
import org.apache.linkis.udf.entity.PythonModuleInfoVO

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.util
import java.util.{Collections, Comparator}

import scala.collection.JavaConverters._
import scala.collection.mutable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,25 @@ import org.apache.linkis.engineconn.acessible.executor.service.LockService
import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
import org.apache.linkis.engineconn.computation.executor.async.AsyncConcurrentComputationExecutor
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.entity.{CommonEngineConnTask, EngineConnTask}
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, ConcurrentComputationExecutor}
import org.apache.linkis.engineconn.computation.executor.entity.{
CommonEngineConnTask,
EngineConnTask
}
import org.apache.linkis.engineconn.computation.executor.execute.{
ComputationExecutor,
ConcurrentComputationExecutor
}
import org.apache.linkis.engineconn.computation.executor.hook.ExecutorLabelsRestHook
import org.apache.linkis.engineconn.computation.executor.listener.{ResultSetListener, TaskProgressListener, TaskStatusListener}
import org.apache.linkis.engineconn.computation.executor.listener.{
ResultSetListener,
TaskProgressListener,
TaskStatusListener
}
import org.apache.linkis.engineconn.computation.executor.upstream.event.TaskStatusChangedForUpstreamMonitorEvent
import org.apache.linkis.engineconn.computation.executor.utlis.{ComputationEngineConstant, ComputationEngineUtils}
import org.apache.linkis.engineconn.computation.executor.utlis.{
ComputationEngineConstant,
ComputationEngineUtils
}
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
Expand All @@ -40,34 +53,50 @@ import org.apache.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.governance.common.constant.ec.ECConstants
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.exception.engineconn.{EngineConnExecutorErrorCode, EngineConnExecutorErrorException}
import org.apache.linkis.governance.common.exception.engineconn.{
EngineConnExecutorErrorCode,
EngineConnExecutorErrorException
}
import org.apache.linkis.governance.common.protocol.task._
import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
import org.apache.linkis.hadoop.common.utils.KerberosUtils
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.protocol.resource.{ResponseTaskRunningInfo, ResponseTaskYarnResource}
import org.apache.linkis.manager.common.protocol.resource.{
ResponseTaskRunningInfo,
ResponseTaskYarnResource
}
import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.message.RequestProtocol
import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.message.annotation.Receiver
import org.apache.linkis.rpc.utils.RPCUtils
import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, IncompleteExecuteResponse, SubmitResponse}
import org.apache.linkis.scheduler.executer.{
ErrorExecuteResponse,
ExecuteResponse,
IncompleteExecuteResponse,
SubmitResponse
}
import org.apache.linkis.server.BDPJettyServerHelper

import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import javax.annotation.PostConstruct

import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutorService

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.linkis.manager.label.utils.LabelUtil

@Component
class TaskExecutionServiceImpl
Expand Down Expand Up @@ -199,9 +228,9 @@ class TaskExecutionServiceImpl
// only sql can use udf check
val udfNames: String = ComputationExecutorConf.SPECIAL_UDF_NAMES.getValue
if (
ComputationExecutorConf.SPECIAL_UDF_CHECK_ENABLED.getValue && StringUtils.isNotBlank(
udfNames
)
ComputationExecutorConf.SPECIAL_UDF_CHECK_ENABLED.getValue && StringUtils.isNotBlank(
udfNames
)
) {
System.getProperties.put(ComputationExecutorConf.ONLY_SQL_USE_UDF_KEY, udfNames)
val codeType: String = LabelUtil.getCodeType(requestTask.getLabels)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.engineplugin.spark.factory
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils}
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.engineplugin.spark.client.context.{ExecutionContext, SparkConfig}
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
Expand All @@ -28,19 +29,25 @@ import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
import org.apache.linkis.engineplugin.spark.context.{EnvironmentContext, SparkEngineConnContext}
import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
import org.apache.linkis.engineplugin.spark.exception.{SparkCreateFileException, SparkSessionNullException}
import org.apache.linkis.engineplugin.spark.exception.{
SparkCreateFileException,
SparkSessionNullException
}
import org.apache.linkis.engineplugin.spark.extension.SparkUDFCheckRule
import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration
import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory, MultiExecutorEngineConnFactory}
import org.apache.linkis.manager.engineplugin.common.creation.{
ExecutorFactory,
MultiExecutorEngineConnFactory
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment.variable
import org.apache.linkis.manager.label.entity.engine.EngineType
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.server.JMap

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineplugin.spark.extension.SparkUDFCheckRule
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.util.SparkUtils

import java.io.File
Expand Down

0 comments on commit 7057bb5

Please sign in to comment.