Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangheihei committed Nov 10, 2024
1 parent edaa5e5 commit 0ef1c8a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -919,17 +919,7 @@ public boolean canTxnFinished(TransactionState txn, Set<Long> errReplicas, Set<L
int successHealthyReplicaNum = 0;
// if most replica's version have been updated to version published
// which means publish version task finished in replica
for (Replica replica : ((LocalTablet) tablet).getAllReplicas()) {
// Using getAllReplicas() instead of getImmutableReplicas
// In order for the transaction to complete in time for this scenario: the server machine is not recovered.
// 1. Transaction TA writes to a two-replicas tablet and enters the committed state.
// The tablet's repliace are replicaA, replicaB.
// 2. replicaA, replicaB generate tasks: PublishVersionTaskA, PublishVersionTaskB.
// PublishVersionTaskA/PublishVersionTaskB successfully submitted to the beA/beB via RPC.
// 3. The machine where beB is located hangs and is not recoverable.
// Therefore PublishVersionTaskA is finished,PublishVersionTaskB is unfinished.
// 4. FE clone replicaC from replicaA, BE report replicaC info.
// However, the update of immutableReplicas must wait for the checkpoint
for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
if (!errReplicas.contains(replica.getId())) {
// success healthy replica condition:
// 1. version is equal to partition's visible version
Expand Down Expand Up @@ -1089,6 +1079,8 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
int healthReplicaNum = 0;
boolean isDependencyReplicasNotCommited = transactionState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum);
for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
if (transactionState.isVersionOverwrite()) {
++healthReplicaNum;
Expand All @@ -1103,7 +1095,9 @@ public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) thr
// if replica not commit yet, skip it. This may happen when it's just create by clone.
if (!transactionState.tabletCommitInfosContainsReplica(tablet.getId(),
replica.getBackendId(), replica.getState())) {
continue;
if (!isDependencyReplicasNotCommited) {
continue;
}
}
// this means the replica is a healthy replica,
// it is healthy in the past and does not have error in current load
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -107,14 +108,16 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
LOG.warn("partition {} is dropped, ignore", partitionId);
continue;
}
short replicationNum = table.getPartitionInfo().getReplicationNum(partitionId);
PartitionInfo partitionInfo = table.getPartitionInfo();
short replicationNum = partitionInfo.getReplicationNum(partitionId);
int quorumReplicaNum = partitionInfo.getQuorumNum(partitionId, table.writeQuorum());
long version = partitionCommitInfo.getVersion();
List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
boolean hasFailedVersion = false;
List<Replica> replicas = ((LocalTablet) tablet).getAllReplicas();
List<Replica> replicas = ((LocalTablet) tablet).getImmutableReplicas();
for (Replica replica : replicas) {
if (txnState.isNewFinish()) {
updateReplicaVersion(version, replica, txnState.getFinishState());
Expand All @@ -123,8 +126,9 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf
long lastFailedVersion = replica.getLastFailedVersion();
long newVersion = version;
long lastSucessVersion = replica.getLastSuccessVersion();
if (!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(),
replica.getState())
if ((!txnState.tabletCommitInfosContainsReplica(tablet.getId(), replica.getBackendId(),
replica.getState()) && !txnState.
checkTransactionDependencyReplicasNotCommited((LocalTablet) tablet, quorumReplicaNum))
|| errorReplicaIds.contains(replica.getId())) {
// There are 2 cases that we can't update version to visible version and need to
// set lastFailedVersion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Replica.ReplicaState;
import com.starrocks.common.Config;
import com.starrocks.common.TraceManager;
Expand Down Expand Up @@ -77,6 +79,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -448,6 +451,37 @@ public boolean tabletCommitInfosContainsReplica(long tabletId, long backendId, R
return this.tabletCommitInfos.contains(info);
}

public boolean checkTransactionDependencyReplicasNotCommited(LocalTablet tablet, int quorumReplicaNum) {
// In order for the transaction to complete in time for this scenario: the server machine is not recovered.
// 1. Transaction TA writes to a two-replicas tablet and enters the committed state.
// The tablet's repliace are replicaA, replicaB.
// 2. replicaA, replicaB generate tasks: PublishVersionTaskA, PublishVersionTaskB.
// PublishVersionTaskA/PublishVersionTaskB successfully submitted to the beA/beB via RPC.
// 3. The machine where beB is located hangs and is not recoverable.
// Therefore PublishVersionTaskA is finished,PublishVersionTaskB is unfinished.
// 4. FE clone replicaC from replicaA, BE report replicaC info.
// So transactions must rely on replicaA and replicaC to accomplish visible state.
List<Replica> immutableReplicas = tablet.getImmutableReplicas();
if (tabletCommitInfos != null) {
List<Replica> replicaCommitedDecommission = immutableReplicas.stream().filter(replica -> {
if (replica.getState() == ReplicaState.DECOMMISSION) {
TabletCommitInfo info = new TabletCommitInfo(tablet.getId(), replica.getBackendId());
if (this.tabletCommitInfos.contains(info)) {
return true;
}
return false;
}
return false;
}).collect(Collectors.toList());

if (tabletCommitInfos.size() - replicaCommitedDecommission.size() < quorumReplicaNum) {
LOG.warn("transaction {} has dependency replicas not commited, tablet {}", this, tablet.getId());
return true;
}
}
return false;
}

// Only for OlapTable
public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
this.publishVersionTasks.put(backendId, task);
Expand Down

0 comments on commit 0ef1c8a

Please sign in to comment.