diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala index 23d6ff2586..6b4fc64fe6 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala @@ -631,7 +631,7 @@ class TaskExecutionServiceImpl if (null != task) { sendToEntrance(task, ResponseTaskLog(logUpdateEvent.taskId, logUpdateEvent.log)) } else { - logger.error("Task cannot null! logupdateEvent: " + logUpdateEvent.taskId) + logger.warn("Task cannot null! logupdateEvent: " + logUpdateEvent.taskId) } } else if (null != lastTask) { val executor = executorManager.getReportExecutor @@ -733,7 +733,7 @@ class TaskExecutionServiceImpl if (null != executor) { executor.getTaskById(taskId) } else { - logger.error(s"Executor of taskId : $taskId is not cached.") + logger.warn(s"Executor of taskId : $taskId is not cached.") null } } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java index 6930a43f05..a5ac102033 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java @@ -91,6 +91,15 @@ public ClusterDescriptorAdapter(ExecutionContext executionContext) { /** Returns the status of the flink job. */ public JobStatus getJobStatus() throws JobExecutionException { + if (jobId == null) { + try { + LOG.info("flink getJobStatus jobId is null,sleep three seconds"); + Thread.sleep(3000); + } catch (InterruptedException e) { + + } + } + LOG.info("flink getJobStatus jobId:{}", jobId); if (jobId == null) { throw new JobExecutionException(NO_JOB_SUBMITTED.getErrorDesc()); } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 2e3a2bd07a..60f1d9088b 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -236,6 +236,11 @@ class FlinkSQLComputationExecutor( super.close() } + override def tryShutdown(): Boolean = { + Utils.tryAndWarn(close()) + super.tryShutdown() + } + } class FlinkSQLStatusListener( diff --git a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml index 3e790b6dad..c75bf80fb3 100644 --- a/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml +++ b/linkis-engineconn-plugins/repl/src/main/resources/log4j2.xml @@ -47,7 +47,7 @@ - +