Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] When replica enter decommission, transaction will nerver complete. #49349

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,8 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
int healthReplicaNum = 0;
boolean isDependencyReplicasNotCommited = transactionState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum);
for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
if (transactionState.isVersionOverwrite()) {
++healthReplicaNum;
Expand All @@ -1093,7 +1095,9 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
// if replica not commit yet, skip it. This may happen when it's just create by clone.
if (!transactionState.tabletCommitInfosContainsReplica(tablet.getId(),
replica.getBackendId(), replica.getState())) {
continue;
if (!isDependencyReplicasNotCommited) {
continue;
}
}
// this means the replica is a healthy replica,
// it is healthy in the past and does not have error in current load
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -107,7 +108,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
LOG.warn("partition {} is dropped, ignore", partitionId);
continue;
}
short replicationNum = table.getPartitionInfo().getReplicationNum(partitionId);
PartitionInfo partitionInfo = table.getPartitionInfo();
short replicationNum = partitionInfo.getReplicationNum(partitionId);
int quorumReplicaNum = partitionInfo.getQuorumNum(partitionId, table.writeQuorum());
long version = partitionCommitInfo.getVersion();
List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
Expand All @@ -123,8 +126,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
long lastFailedVersion = replica.getLastFailedVersion();
long newVersion = version;
long lastSucessVersion = replica.getLastSuccessVersion();
if (!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(),
replica.getState())
if ((!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(),
replica.getState()) && !txnState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum))
zhangheihei marked this conversation as resolved.
Show resolved Hide resolved
|| errorReplicaIds.contains(replica.getId())) {
// There are 2 cases that we can't update version to visible version and need to
// set lastFailedVersion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Replica.ReplicaState;
import com.starrocks.common.Config;
import com.starrocks.common.TraceManager;
Expand Down Expand Up @@ -77,6 +79,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -448,6 +451,37 @@ public boolean tabletCommitInfosContainsReplica(long tabletId, long backendId, R
return this.tabletCommitInfos.contains(info);
}

public boolean checkTransactionDependencyReplicasNotCommited(LocalTablet tablet, int quorumReplicaNum) {
// In order for the transaction to complete in time for this scenario: the server machine is not recovered.
// 1. Transaction TA writes to a two-replicas tablet and enters the committed state.
// The tablet's repliace are replicaA, replicaB.
// 2. replicaA, replicaB generate tasks: PublishVersionTaskA, PublishVersionTaskB.
// PublishVersionTaskA/PublishVersionTaskB successfully submitted to the beA/beB via RPC.
// 3. The machine where beB is located hangs and is not recoverable.
// Therefore PublishVersionTaskA is finished,PublishVersionTaskB is unfinished.
// 4. FE clone replicaC from replicaA, BE report replicaC info.
// So transactions must rely on replicaA and replicaC to accomplish visible state.
List<Replica> immutableReplicas = tablet.getImmutableReplicas();
if (tabletCommitInfos != null) {
List<Replica> replicaCommitedDecommission = immutableReplicas.stream().filter(replica -> {
if (replica.getState() == ReplicaState.DECOMMISSION) {
TabletCommitInfo info = new TabletCommitInfo(tablet.getId(), replica.getBackendId());
if (this.tabletCommitInfos.contains(info)) {
return true;
}
return false;
}
return false;
}).collect(Collectors.toList());

if (tabletCommitInfos.size() - replicaCommitedDecommission.size() < quorumReplicaNum) {
LOG.warn("transaction {} has dependency replicas not commited, tablet {}", this, tablet.getId());
return true;
}
}
return false;
}

// Only for OlapTable
public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
this.publishVersionTasks.put(backendId, task);
Expand Down
Loading