diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index 784e792fb7f99..774336cf1cb96 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -919,17 +919,7 @@ public boolean canTxnFinished(TransactionState txn, Set errReplicas, Set errorReplicaIds) thr for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { int healthReplicaNum = 0; + boolean isDependencyReplicasNotCommited = transactionState. + checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum); for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) { if (transactionState.isVersionOverwrite()) { ++healthReplicaNum; @@ -1103,7 +1095,9 @@ public void finishTransaction(long transactionId, Set errorReplicaIds) thr // if replica not commit yet, skip it. This may happen when it's just create by clone. if (!transactionState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(), replica.getState())) { - continue; + if (!isDependencyReplicasNotCommited) { + continue; + } } // this means the replica is a healthy replica, // it is healthy in the past and does not have error in current load diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java index 3f989aa9c4deb..47de06100f9fc 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/OlapTableTxnLogApplier.java @@ -20,6 +20,7 @@ import com.starrocks.catalog.LocalTablet; import com.starrocks.catalog.MaterializedIndex; import com.starrocks.catalog.OlapTable; +import com.starrocks.catalog.PartitionInfo; import com.starrocks.catalog.PhysicalPartition; import com.starrocks.catalog.Replica; import com.starrocks.catalog.Tablet; @@ -107,14 +108,16 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf LOG.warn("partition {} is dropped, ignore", partitionId); continue; } - short replicationNum = table.getPartitionInfo().getReplicationNum(partitionId); + PartitionInfo partitionInfo = table.getPartitionInfo(); + short replicationNum = partitionInfo.getReplicationNum(partitionId); + int quorumReplicaNum = partitionInfo.getQuorumNum(partitionId, table.writeQuorum()); long version = partitionCommitInfo.getVersion(); List allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { boolean hasFailedVersion = false; - List replicas = ((LocalTablet) tablet).getAllReplicas(); + List replicas = ((LocalTablet) tablet).getImmutableReplicas(); for (Replica replica : replicas) { if (txnState.isNewFinish()) { updateReplicaVersion(version, replica, txnState.getFinishState()); @@ -123,8 +126,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf long lastFailedVersion = replica.getLastFailedVersion(); long newVersion = version; long lastSucessVersion = replica.getLastSuccessVersion(); - if (!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(), - replica.getState()) + if ((!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(), + replica.getState()) && !txnState. + checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum)) || errorReplicaIds.contains(replica.getId())) { // There are 2 cases that we can't update version to visible version and need to // set lastFailedVersion. diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java index 43f5f7974cde6..dca302c701f8b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionState.java @@ -41,9 +41,11 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import com.starrocks.catalog.Database; +import com.starrocks.catalog.LocalTablet; import com.starrocks.catalog.MaterializedIndex; import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.PhysicalPartition; +import com.starrocks.catalog.Replica; import com.starrocks.catalog.Replica.ReplicaState; import com.starrocks.common.Config; import com.starrocks.common.TraceManager; @@ -77,6 +79,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; @@ -448,6 +451,37 @@ public boolean tabletCommitInfosContainsReplica(long tabletId, long backendId, R return this.tabletCommitInfos.contains(info); } + public boolean checkTransactionDependencyReplicasNotCommited(LocalTablet tablet, int quorumReplicaNum) { + // In order for the transaction to complete in time for this scenario: the server machine is not recovered. + // 1. Transaction TA writes to a two-replicas tablet and enters the committed state. + // The tablet's repliace are replicaA, replicaB. + // 2. replicaA, replicaB generate tasks: PublishVersionTaskA, PublishVersionTaskB. + // PublishVersionTaskA/PublishVersionTaskB successfully submitted to the beA/beB via RPC. + // 3. The machine where beB is located hangs and is not recoverable. + // Therefore PublishVersionTaskA is finished,PublishVersionTaskB is unfinished. + // 4. FE clone replicaC from replicaA, BE report replicaC info. + // So transactions must rely on replicaA and replicaC to accomplish visible state. + List immutableReplicas = tablet.getImmutableReplicas(); + if (tabletCommitInfos != null) { + List replicaCommitedDecommission = immutableReplicas.stream().filter(replica -> { + if (replica.getState() == ReplicaState.DECOMMISSION) { + TabletCommitInfo info = new TabletCommitInfo(tablet.getId(), replica.getBackendId()); + if (this.tabletCommitInfos.contains(info)) { + return true; + } + return false; + } + return false; + }).collect(Collectors.toList()); + + if (tabletCommitInfos.size() - replicaCommitedDecommission.size() < quorumReplicaNum) { + LOG.warn("transaction {} has dependency replicas not commited, tablet {}", this, tablet.getId()); + return true; + } + } + return false; + } + // Only for OlapTable public void addPublishVersionTask(Long backendId, PublishVersionTask task) { this.publishVersionTasks.put(backendId, task);