Skip to content

Commit

Permalink
[Enhancement] Loading task should abort transaction if fail because o…
Browse files Browse the repository at this point in the history
…f timeout (backport #51939) (#52054)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Oct 17, 2024
1 parent 409cde1 commit 3d50e33
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
.build());
return;
}
if (retryTime <= 0 || !txnStatusChangeReason.contains("timeout") || !isTimeout()) {
boolean shouldRetry = retryTime > 0 && txnStatusChangeReason.contains("timeout")
&& (LoadErrorUtils.isTimeoutFromLoadingTaskExecution(txnStatusChangeReason) || isTimeout());
if (!shouldRetry) {
// record attachment in load job
unprotectUpdateLoadingStatus(txnState);
// cancel load job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.common.io.Text;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
Expand Down Expand Up @@ -247,6 +248,7 @@ protected List<TabletFailInfo> getTabletFailInfos() {

@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
boolean timeoutFailure = false;
writeLock();
try {
// check if job has been completed
Expand All @@ -261,10 +263,26 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
if (!failMsg.getMsg().contains("timeout")) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
} else {
timeoutFailure = true;
}
} finally {
writeUnlock();
}

// For timeout failure, should abort the transaction and retry as soon as possible
if (timeoutFailure) {
try {
LOG.debug("Loading task with timeout failure try to abort transaction, " +
"job_id: {}, task_id: {}, txn_id: {}, task fail message: {}",
id, taskId, transactionId, failMsg.getMsg());
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().abortTransaction(
dbId, transactionId, failMsg.getMsg());
} catch (UserException e) {
LOG.warn("Loading task failed to abort transaction, job_id: {}, task_id: {}, txn_id: {}, " +
"task fail message: {}, abort exception:", id, taskId, transactionId, failMsg.getMsg(), e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.loadv2;

public class LoadErrorUtils {

public static class ErrorMeta {
final String keywords;
final String description;

public ErrorMeta(String keywords, String description) {
this.keywords = keywords;
this.description = description;
}
}

public static final ErrorMeta BACKEND_BRPC_TIMEOUT =
new ErrorMeta("[E1008]Reached timeout", "Backend BRPC timeout");

private static final ErrorMeta[] LOADING_TASK_TIMEOUT_ERRORS = new ErrorMeta[] {BACKEND_BRPC_TIMEOUT};

public static boolean isTimeoutFromLoadingTaskExecution(String errorMsg) {
for (ErrorMeta errorMeta : LOADING_TASK_TIMEOUT_ERRORS) {
if (errorMsg.contains(errorMeta.keywords)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,22 @@ public void unprotectUpdateLoadingStatus(TransactionState txnState) {
brokerLoadJob4.afterAborted(txnState, txnOperated, txnStatusChangeReason);
idToTasks = Deencapsulation.getField(brokerLoadJob4, "idToTasks");
Assert.assertEquals(1, idToTasks.size());

// test that timeout happens in loadin task before the job timeout
BrokerLoadJob brokerLoadJob5 = new BrokerLoadJob();
new Expectations() {
{
brokerLoadJob5.isTimeout();
result = false;
}
};
brokerLoadJob5.retryTime = 1;
brokerLoadJob5.unprotectedExecuteJob();
txnOperated = true;
txnStatusChangeReason = LoadErrorUtils.BACKEND_BRPC_TIMEOUT.keywords;
brokerLoadJob5.afterAborted(txnState, txnOperated, txnStatusChangeReason);
idToTasks = Deencapsulation.getField(brokerLoadJob5, "idToTasks");
Assert.assertEquals(1, idToTasks.size());
}

@Test
Expand All @@ -433,6 +449,37 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
Assert.assertEquals(0, idToTasks.size());
}

@Test
public void testTaskAbortTransactionOnTimeoutFailure(@Mocked GlobalTransactionMgr globalTransactionMgr,
@Injectable long taskId, @Injectable FailMsg failMsg) throws UserException {
new Expectations() {
{
globalTransactionMgr.abortTransaction(anyLong, anyLong, anyString);
times = 1;
}
};

BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
failMsg = new FailMsg(FailMsg.CancelType.UNKNOWN, "[E1008]Reached timeout=7200000ms @127.0.0.1:8060");
brokerLoadJob.onTaskFailed(taskId, failMsg);

new Expectations() {
{
globalTransactionMgr.abortTransaction(anyLong, anyLong, anyString);
times = 1;
result = new UserException("Artificial exception");
}
};

try {
BrokerLoadJob brokerLoadJob1 = new BrokerLoadJob();
failMsg = new FailMsg(FailMsg.CancelType.UNKNOWN, "[E1008]Reached timeout=7200000ms @127.0.0.1:8060");
brokerLoadJob1.onTaskFailed(taskId, failMsg);
} catch (Exception e) {
Assert.fail("should not throw exception");
}
}

@Test
public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Expand Down

0 comments on commit 3d50e33

Please sign in to comment.