From 018d8855e85d6dc3ed01b3e5b45b3babf622b4b2 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 15 Oct 2024 20:00:27 +0800 Subject: [PATCH] [BugFix] Fix null exception when insert overwrite job run concurrecy (backport #50628) (#50986) Signed-off-by: sevev Co-authored-by: zhangqiang --- .../starrocks/load/InsertOverwriteJob.java | 12 ++++++ .../load/InsertOverwriteJobRunner.java | 42 ++++++++++++++++--- .../InsertOverwriteStateChangeInfo.java | 12 +++++- .../load/InsertOverwriteJobManagerTest.java | 4 +- .../load/InsertOverwriteJobRunnerTest.java | 23 +++++++++- .../InsertOverwriteStateChangeInfoTest.java | 2 +- 6 files changed, 84 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java index b259ec9de5086..1a11ef55a439e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJob.java @@ -39,6 +39,10 @@ public class InsertOverwriteJob { @SerializedName(value = "targetTableId") private long targetTableId; + @SerializedName(value = "sourcePartitionNames") + private List sourcePartitionNames; + + private transient InsertStmt insertStmt; public InsertOverwriteJob(long jobId, InsertStmt insertStmt, long targetDbId, long targetTableId) { @@ -83,6 +87,14 @@ public void setSourcePartitionIds(List sourcePartitionIds) { this.sourcePartitionIds = sourcePartitionIds; } + public List getSourcePartitionNames() { + return sourcePartitionNames; + } + + public void setSourcePartitionNames(List sourcePartitionNames) { + this.sourcePartitionNames = sourcePartitionNames; + } + public List getTmpPartitionIds() { return tmpPartitionIds; } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java index c2a1bf5b70bfe..1509c5e776910 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobRunner.java @@ -51,6 +51,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -180,6 +181,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) { case OVERWRITE_RUNNING: job.setSourcePartitionIds(info.getSourcePartitionIds()); job.setTmpPartitionIds(info.getTmpPartitionIds()); + job.setSourcePartitionNames(info.getSourcePartitionNames()); job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING); break; case OVERWRITE_SUCCESS: @@ -213,8 +215,21 @@ private void prepare() throws Exception { job.setTmpPartitionIds(tmpPartitionIds); Database db = getAndWriteLockDatabase(dbId); try { + OlapTable targetTable; + targetTable = checkAndGetTable(db, tableId); + List sourcePartitionNames = Lists.newArrayList(); + for (Long partitionId : job.getSourcePartitionIds()) { + Partition partition = targetTable.getPartition(partitionId); + if (partition == null) { + throw new DmlException("partition id:%s does not exist in table id:%s", partitionId, tableId); + } + sourcePartitionNames.add(partition.getName()); + } + job.setSourcePartitionNames(sourcePartitionNames); + InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + InsertOverwriteJobState.OVERWRITE_RUNNING, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); } finally { db.writeUnlock(); @@ -375,7 +390,8 @@ private void gc(boolean isReplay) { sourceTablets.forEach(GlobalStateMgr.getCurrentInvertedIndex()::markTabletForceDelete); InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + OVERWRITE_FAILED, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); } } catch (Exception e) { @@ -392,9 +408,18 @@ private void doCommit(boolean isReplay) { // try exception to release write lock finally final OlapTable targetTable = checkAndGetTable(db, tableId); tmpTargetTable = targetTable; - List sourcePartitionNames = job.getSourcePartitionIds().stream() - .map(partitionId -> targetTable.getPartition(partitionId).getName()) - .collect(Collectors.toList()); + List sourcePartitionNames = job.getSourcePartitionNames(); + if (sourcePartitionNames == null || sourcePartitionNames.isEmpty()) { + sourcePartitionNames = new ArrayList<>(); + for (Long partitionId : job.getSourcePartitionIds()) { + Partition partition = targetTable.getPartition(partitionId); + if (partition == null) { + throw new DmlException("Partition id:%s does not exist in table id:%s", partitionId, tableId); + } else { + sourcePartitionNames.add(partition.getName()); + } + } + } List tmpPartitionNames = job.getTmpPartitionIds().stream() .map(partitionId -> targetTable.getPartition(partitionId).getName()) .collect(Collectors.toList()); @@ -420,7 +445,8 @@ private void doCommit(boolean isReplay) { sourceTablets.forEach(GlobalStateMgr.getCurrentInvertedIndex()::markTabletForceDelete); InsertOverwriteStateChangeInfo info = new InsertOverwriteStateChangeInfo(job.getJobId(), job.getJobState(), - InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getTmpPartitionIds()); + InsertOverwriteJobState.OVERWRITE_SUCCESS, job.getSourcePartitionIds(), job.getSourcePartitionNames(), + job.getTmpPartitionIds()); GlobalStateMgr.getCurrentState().getEditLog().logInsertOverwriteStateChange(info); try { @@ -509,4 +535,8 @@ private OlapTable checkAndGetTable(Database db, long tableId) { Preconditions.checkState(table instanceof OlapTable); return (OlapTable) table; } + + protected void testDoCommit(boolean isReplay) { + doCommit(isReplay); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java b/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java index 63a41a1bbb0bf..b64a8773ffe25 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/InsertOverwriteStateChangeInfo.java @@ -42,13 +42,19 @@ public class InsertOverwriteStateChangeInfo implements Writable { @SerializedName(value = "tmpPartitionIds") private List tmpPartitionIds; + @SerializedName(value = "sourcePartitionNames") + private List sourcePartitionNames = null; + public InsertOverwriteStateChangeInfo(long jobId, InsertOverwriteJobState fromState, InsertOverwriteJobState toState, - List sourcePartitionIds, List tmpPartitionIds) { + List sourcePartitionIds, + List sourcePartitionNames, + List tmpPartitionIds) { this.jobId = jobId; this.fromState = fromState; this.toState = toState; this.sourcePartitionIds = sourcePartitionIds; + this.sourcePartitionNames = sourcePartitionNames; this.tmpPartitionIds = tmpPartitionIds; } @@ -72,6 +78,10 @@ public List getTmpPartitionIds() { return tmpPartitionIds; } + public List getSourcePartitionNames() { + return sourcePartitionNames; + } + @Override public String toString() { return "InsertOverwriteStateChangeInfo{" + diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java index 99afec42dec20..c84e4eabe08bd 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobManagerTest.java @@ -132,12 +132,12 @@ public void testReplay() throws Exception { List newPartitionNames = Lists.newArrayList(10001L); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(1100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo); InsertOverwriteStateChangeInfo stateChangeInfo2 = new InsertOverwriteStateChangeInfo(1100L, InsertOverwriteJobState.OVERWRITE_RUNNING, InsertOverwriteJobState.OVERWRITE_SUCCESS, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); insertOverwriteJobManager.replayInsertOverwriteStateChange(stateChangeInfo2); } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java index 2743a55c4e2c5..c45e0a59b86bd 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/InsertOverwriteJobRunnerTest.java @@ -27,11 +27,13 @@ import com.starrocks.qe.StmtExecutor; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.ast.InsertStmt; +import com.starrocks.sql.common.DmlException; import com.starrocks.utframe.StarRocksAssert; import com.starrocks.utframe.UtFrameUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.sql.SQLException; @@ -97,7 +99,7 @@ public void testReplayInsertOverwrite() { Lists.newArrayList(olapTable.getPartition("t1").getId())); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - Lists.newArrayList(2000L), Lists.newArrayList(2001L)); + Lists.newArrayList(2000L), null, Lists.newArrayList(2001L)); Assert.assertEquals(100L, stateChangeInfo.getJobId()); Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_PENDING, stateChangeInfo.getFromState()); Assert.assertEquals(InsertOverwriteJobState.OVERWRITE_RUNNING, stateChangeInfo.getToState()); @@ -138,4 +140,23 @@ public void testInsertOverwriteWithDuplicatePartitions() throws SQLException { String sql = "insert overwrite t3 partitions(p1, p1) select * from t4"; cluster.runSql("insert_overwrite_test", sql); } + + @Test + public void testInsertOverwriteConcurrencyWithSamePartitions() throws Exception { + Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("insert_overwrite_test"); + Table table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), "t1"); + Assert.assertTrue(table instanceof OlapTable); + OlapTable olapTable = (OlapTable) table; + InsertOverwriteJob insertOverwriteJob = new InsertOverwriteJob(100L, database.getId(), olapTable.getId(), + Lists.newArrayList(olapTable.getPartition("t1").getId())); + InsertOverwriteJobRunner runner = new InsertOverwriteJobRunner(insertOverwriteJob); + + connectContext.getSessionVariable().setOptimizerExecuteTimeout(300000000); + String sql = "insert overwrite t1 partitions(t1) select * from t2"; + cluster.runSql("insert_overwrite_test", sql); + + Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false)); + insertOverwriteJob.setSourcePartitionNames(Lists.newArrayList("t1")); + Assertions.assertThrows(DmlException.class, () -> runner.testDoCommit(false)); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java index 9cc37af4c72f1..2f5c31cfa05d3 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/InsertOverwriteStateChangeInfoTest.java @@ -35,7 +35,7 @@ public void testBasic() throws IOException { List newPartitionNames = Lists.newArrayList(1000L, 1001L); InsertOverwriteStateChangeInfo stateChangeInfo = new InsertOverwriteStateChangeInfo(100L, InsertOverwriteJobState.OVERWRITE_PENDING, InsertOverwriteJobState.OVERWRITE_RUNNING, - sourcePartitionNames, newPartitionNames); + sourcePartitionNames, null, newPartitionNames); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(outputStream); stateChangeInfo.write(dataOutputStream);