diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java index e76eae44..74884545 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java @@ -317,13 +317,21 @@ private void execScheduleTask() { metricsMonitorExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { - replicateLagMetric(); + try { + replicateLagMetric(); + } catch (Throwable e) { + log.error("replicate log metric error", e); + } } }, period, period, TimeUnit.MILLISECONDS); commitOffsetScheduleService.scheduleAtFixedRate(new Runnable() { @Override public void run() { - commitOffsetSchedule(); + try { + commitOffsetSchedule(); + } catch (Throwable e) { + log.error("commit offset error", e); + } } }, connectorConfig.getCommitOffsetIntervalMs(), connectorConfig.getCommitOffsetIntervalMs(), TimeUnit.MILLISECONDS); } @@ -366,14 +374,14 @@ public void accept(MessageQueue messageQueue, OffsetWrapper offsetWrapper) { metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_NUMS, delayNumsKeys); metricsItem2KeyMap.put(ReplicatorTaskStats.REPLICATOR_SOURCE_TASK_DELAY_MS, delayMsKeys); } catch (RemotingException | MQClientException e) { - log.error(" occur remoting or mqclient exception, retry build mqadminclient,", e); + log.error("occur remoting or mqclient exception, retry build mqadminclient", e); try { buildMqAdminClient(); } catch (MQClientException mqClientException) { - log.error(" rebuild mqadminclient error,", e); + log.error("rebuild mqadmin client error", e); } } catch (Exception e) { - log.error(" occur unknow exception,", e); + log.error(" occur unknown exception", e); } }