Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] When replica enter decommission, transaction will nerver complete. #49349

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,8 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
int healthReplicaNum = 0;
boolean isDependencyReplicasNotCommited = transactionState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum);
for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
if (transactionState.isVersionOverwrite()) {
++healthReplicaNum;
Expand All @@ -1095,7 +1097,11 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
// if replica not commit yet, skip it. This may happen when it's just create by clone.
if (!transactionState.tabletCommitInfosContainsReplica(tablet.getId(),
replica.getBackendId(), replica.getState())) {
continue;
if (!(isDependencyReplicasNotCommited
&& replica.getVersion() >= partitionCommitInfo.getVersion())) {
continue;
}
//this means the replica is normal
}
// this means the replica is a healthy replica,
// it is healthy in the past and does not have error in current load
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -107,7 +108,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
LOG.warn("partition {} is dropped, ignore", partitionId);
continue;
}
short replicationNum = table.getPartitionInfo().getReplicationNum(partitionId);
PartitionInfo partitionInfo = table.getPartitionInfo();
short replicationNum = partitionInfo.getReplicationNum(partitionId);
int quorumReplicaNum = partitionInfo.getQuorumNum(partitionId, table.writeQuorum());
long version = partitionCommitInfo.getVersion();
List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
Expand All @@ -116,6 +119,8 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
for (Tablet tablet : index.getTablets()) {
boolean hasFailedVersion = false;
List<Replica> replicas = ((LocalTablet) tablet).getImmutableReplicas();
boolean isDependencyReplicasNotCommited = txnState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum);
for (Replica replica : replicas) {
if (txnState.isNewFinish()) {
updateReplicaVersion(version, replica, txnState.getFinishState());
Expand All @@ -127,25 +132,31 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
if (!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(),
replica.getState())
|| errorReplicaIds.contains(replica.getId())) {
// There are 2 cases that we can't update version to visible version and need to
// set lastFailedVersion.
// 1. this replica doesn't have version publish yet. This maybe happen when clone concurrent
// with data loading.
// 2. this replica has data loading failure.
//
// for example, A,B,C 3 replicas, B,C failed during publish version (Or never publish),
// then B C will be set abnormal and all loadings will be failed, B,C will have to recover
// by clone, it is very inefficient and may lose data.
// Using this method, B,C will publish failed, and fe will publish again,
// not update their last failed version.
// if B is published successfully in next turn, then B is normal and C will be set
// abnormal so that quorum is maintained and loading will go on.
String combinedId = String.format("%d_%d", tablet.getId(), replica.getBackendId());
skipUpdateReplicas.add(combinedId);
newVersion = replica.getVersion();
if (version > lastFailedVersion) {
lastFailedVersion = version;
hasFailedVersion = true;
if (isDependencyReplicasNotCommited && replica.getVersion() >= version) {
// this means the replica is a normal replica
// success version always move forward
lastSucessVersion = version;
} else {
// There are 2 cases that we can't update version to visible version and need to
// set lastFailedVersion.
// 1. this replica doesn't have version publish yet. This maybe happen when clone concurrent
// with data loading.
// 2. this replica has data loading failure.
//
// for example, A,B,C 3 replicas, B,C failed during publish version (Or never publish),
// then B C will be set abnormal and all loadings will be failed, B,C will have to recover
// by clone, it is very inefficient and may lose data.
// Using this method, B,C will publish failed, and fe will publish again,
// not update their last failed version.
// if B is published successfully in next turn, then B is normal and C will be set
// abnormal so that quorum is maintained and loading will go on.
String combinedId = String.format("%d_%d", tablet.getId(), replica.getBackendId());
skipUpdateReplicas.add(combinedId);
newVersion = replica.getVersion();
if (version > lastFailedVersion) {
lastFailedVersion = version;
hasFailedVersion = true;
}
}
} else {
if (replica.getLastFailedVersion() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Replica.ReplicaState;
import com.starrocks.common.Config;
import com.starrocks.common.TraceManager;
Expand Down Expand Up @@ -451,6 +453,37 @@ public boolean tabletCommitInfosContainsReplica(long tabletId, long backendId, R
return this.tabletCommitInfos.contains(info);
}

public boolean checkTransactionDependencyReplicasNotCommited(LocalTablet tablet, int quorumReplicaNum) {
// In order for the transaction to complete in time for this scenario: the server machine is not recovered.
// 1. Transaction TA writes to a two-replicas tablet and enters the committed state.
// The tablet's repliace are replicaA, replicaB.
// 2. replicaA, replicaB generate tasks: PublishVersionTaskA, PublishVersionTaskB.
// PublishVersionTaskA/PublishVersionTaskB successfully submitted to the beA/beB via RPC.
// 3. The machine where beB is located hangs and is not recoverable.
// Therefore PublishVersionTaskA is finished,PublishVersionTaskB is unfinished.
// 4. FE clone replicaC from replicaA, BE report replicaC info.
// So transactions must rely on replicaA and replicaC to accomplish visible state.
List<Replica> immutableReplicas = tablet.getImmutableReplicas();
if (tabletCommitInfos != null) {
List<Replica> replicaCommitedDecommission = immutableReplicas.stream().filter(replica -> {
if (replica.getState() == ReplicaState.DECOMMISSION) {
TabletCommitInfo info = new TabletCommitInfo(tablet.getId(), replica.getBackendId());
if (this.tabletCommitInfos.contains(info)) {
return true;
}
return false;
}
return false;
}).collect(Collectors.toList());

if (tabletCommitInfos.size() - replicaCommitedDecommission.size() < quorumReplicaNum) {
LOG.warn("transaction {} has dependency replicas not commited, tablet {}", this, tablet.getId());
return true;
}
}
return false;
}

// Only for OlapTable
public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
this.publishVersionTasks.put(backendId, task);
Expand Down
Loading