diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java index bb82fd4cc9100..b44fcf444c4d2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java @@ -335,7 +335,9 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String .build()); return; } - if (retryTime <= 0 || !txnStatusChangeReason.contains("timeout") || !isTimeout()) { + boolean shouldRetry = retryTime > 0 && txnStatusChangeReason.contains("timeout") + && (LoadErrorUtils.isTimeoutFromLoadingTaskExecution(txnStatusChangeReason) || isTimeout()); + if (!shouldRetry) { // record attachment in load job unprotectUpdateLoadingStatus(txnState); // cancel load job diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java index c02467bda6637..1eaa6ca1c9bae 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java @@ -45,6 +45,7 @@ import com.starrocks.catalog.Table; import com.starrocks.common.DdlException; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.UserException; import com.starrocks.common.io.Text; import com.starrocks.common.util.LogBuilder; import com.starrocks.common.util.LogKey; @@ -247,6 +248,7 @@ protected List getTabletFailInfos() { @Override public void onTaskFailed(long taskId, FailMsg failMsg) { + boolean timeoutFailure = false; writeLock(); try { // check if job has been completed @@ -261,10 +263,26 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { if (!failMsg.getMsg().contains("timeout")) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); + } else { + timeoutFailure = true; } } finally { writeUnlock(); } + + // For timeout failure, should abort the transaction and retry as soon as possible + if (timeoutFailure) { + try { + LOG.debug("Loading task with timeout failure try to abort transaction, " + + "job_id: {}, task_id: {}, txn_id: {}, task fail message: {}", + id, taskId, transactionId, failMsg.getMsg()); + GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction( + dbId, transactionId, failMsg.getMsg()); + } catch (UserException e) { + LOG.warn("Loading task failed to abort transaction, job_id: {}, task_id: {}, txn_id: {}, " + + "task fail message: {}, abort exception:", id, taskId, transactionId, failMsg.getMsg(), e); + } + } } /** diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadErrorUtils.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadErrorUtils.java new file mode 100644 index 0000000000000..19721cc3f8428 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadErrorUtils.java @@ -0,0 +1,42 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.load.loadv2; + +public class LoadErrorUtils { + + public static class ErrorMeta { + final String keywords; + final String description; + + public ErrorMeta(String keywords, String description) { + this.keywords = keywords; + this.description = description; + } + } + + public static final ErrorMeta BACKEND_BRPC_TIMEOUT = + new ErrorMeta("[E1008]Reached timeout", "Backend BRPC timeout"); + + private static final ErrorMeta[] LOADING_TASK_TIMEOUT_ERRORS = new ErrorMeta[] {BACKEND_BRPC_TIMEOUT}; + + public static boolean isTimeoutFromLoadingTaskExecution(String errorMsg) { + for (ErrorMeta errorMeta : LOADING_TASK_TIMEOUT_ERRORS) { + if (errorMsg.contains(errorMeta.keywords)) { + return true; + } + } + return false; + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java index b4e413d0359a0..a1fdfea2b8b3c 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java @@ -413,6 +413,22 @@ public void unprotectUpdateLoadingStatus(TransactionState txnState) { brokerLoadJob4.afterAborted(txnState, txnOperated, txnStatusChangeReason); idToTasks = Deencapsulation.getField(brokerLoadJob4, "idToTasks"); Assert.assertEquals(1, idToTasks.size()); + + // test that timeout happens in loadin task before the job timeout + BrokerLoadJob brokerLoadJob5 = new BrokerLoadJob(); + new Expectations() { + { + brokerLoadJob5.isTimeout(); + result = false; + } + }; + brokerLoadJob5.retryTime = 1; + brokerLoadJob5.unprotectedExecuteJob(); + txnOperated = true; + txnStatusChangeReason = LoadErrorUtils.BACKEND_BRPC_TIMEOUT.keywords; + brokerLoadJob5.afterAborted(txnState, txnOperated, txnStatusChangeReason); + idToTasks = Deencapsulation.getField(brokerLoadJob5, "idToTasks"); + Assert.assertEquals(1, idToTasks.size()); } @Test @@ -433,6 +449,37 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) { Assert.assertEquals(0, idToTasks.size()); } + @Test + public void testTaskAbortTransactionOnTimeoutFailure(@Mocked GlobalTransactionMgr globalTransactionMgr, + @Injectable long taskId, @Injectable FailMsg failMsg) throws UserException { + new Expectations() { + { + globalTransactionMgr.abortTransaction(anyLong, anyLong, anyString); + times = 1; + } + }; + + BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); + failMsg = new FailMsg(FailMsg.CancelType.UNKNOWN, "[E1008]Reached timeout=7200000ms @127.0.0.1:8060"); + brokerLoadJob.onTaskFailed(taskId, failMsg); + + new Expectations() { + { + globalTransactionMgr.abortTransaction(anyLong, anyLong, anyString); + times = 1; + result = new UserException("Artificial exception"); + } + }; + + try { + BrokerLoadJob brokerLoadJob1 = new BrokerLoadJob(); + failMsg = new FailMsg(FailMsg.CancelType.UNKNOWN, "[E1008]Reached timeout=7200000ms @127.0.0.1:8060"); + brokerLoadJob1.onTaskFailed(taskId, failMsg); + } catch (Exception e) { + Assert.fail("should not throw exception"); + } + } + @Test public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob();