Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix online optimize conflict with expression partition #52074

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ Status NodeChannel::_wait_request(ReusableClosure<PTabletWriterAddBatchResult>*
}
}

std::vector<int64_t> tablet_ids;
std::set<int64_t> tablet_ids;
for (auto& tablet : closure->result.tablet_vec()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
Expand All @@ -818,7 +818,7 @@ Status NodeChannel::_wait_request(ReusableClosure<PTabletWriterAddBatchResult>*
_tablet_commit_infos.emplace_back(std::move(commit_info));

if (tablet_ids.size() < 128) {
tablet_ids.emplace_back(commit_info.tabletId);
tablet_ids.insert(commit_info.tabletId);
}
}
for (auto& log : *(closure->result.mutable_lake_tablet_data()->mutable_txn_logs())) {
Expand Down
23 changes: 18 additions & 5 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ void LocalTabletsChannel::_abort_replica_tablets(
cancel_request.set_txn_id(_txn_id);
cancel_request.set_index_id(_index_id);
cancel_request.set_reason(abort_reason);
cancel_request.set_sink_id(request.sink_id());

auto closure = new ReusableClosure<PTabletWriterCancelResult>();

Expand Down Expand Up @@ -589,8 +590,9 @@ void LocalTabletsChannel::_commit_tablets(const PTabletWriterAddChunkRequest& re
}
string commit_tablet_id_list_str;
JoinInts(commit_tablet_ids, ",", &commit_tablet_id_list_str);
LOG(INFO) << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id()) << " commit "
<< commit_tablet_ids.size() << " tablets: " << commit_tablet_id_list_str;
LOG(INFO) << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
<< " sink_id: " << request.sink_id() << " commit " << commit_tablet_ids.size()
<< " tablets: " << commit_tablet_id_list_str;

// abort seconary replicas located on other nodes which have no data
_abort_replica_tablets(request, "", node_id_to_abort_tablets);
Expand Down Expand Up @@ -713,8 +715,16 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
}
if (_is_replicated_storage) {
std::stringstream ss;
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id()) << " open "
<< _delta_writers.size() << " delta writers, " << failed_tablet_ids.size() << " failed_tablets: ";
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
<< " sink_id: " << params.sink_id() << " open " << _delta_writers.size() << " delta writer: ";
int i = 0;
for (auto& [tablet_id, delta_writer] : _delta_writers) {
ss << "[" << tablet_id << ":" << delta_writer->replica_state() << "]";
if (++i > 128) {
break;
}
}
ss << failed_tablet_ids.size() << " failed_tablets: ";
for (auto& tablet_id : failed_tablet_ids) {
ss << tablet_id << ",";
}
Expand Down Expand Up @@ -759,6 +769,9 @@ void LocalTabletsChannel::abort(const std::vector<int64_t>& tablet_ids, const st
if (it != _delta_writers.end()) {
it->second->cancel(Status::Cancelled(reason));
it->second->abort(abort_with_exception);
} else {
LOG(WARNING) << "tablet_id: " << tablet_id << " not found in LocalTabletsChannel txn_id: " << _txn_id
<< " load_id: " << _key.id << " index_id: " << _key.index_id;
}
}
string tablet_id_list_str;
Expand Down Expand Up @@ -842,7 +855,7 @@ Status LocalTabletsChannel::incremental_open(const PTabletWriterOpenRequest& par
size_t incremental_tablet_num = 0;
std::stringstream ss;
ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
<< " incremental open delta writer: ";
<< " sink_id: " << params.sink_id() << " incremental open delta writer: ";

for (const PTabletWithPartition& tablet : params.tablets()) {
if (_delta_writers.count(tablet.tablet_id()) != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,13 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {
} else {
// table is stable, set is to ROLLUP and begin altering.
LOG.info("table {} is stable, start job{}, type {}", tableId, jobId, type);
tbl.setState(type == JobType.ROLLUP ? OlapTableState.ROLLUP : OlapTableState.SCHEMA_CHANGE);
if (type == JobType.ROLLUP) {
tbl.setState(OlapTableState.ROLLUP);
} else if (type == JobType.OPTIMIZE) {
tbl.setState(OlapTableState.OPTIMIZE);
} else {
tbl.setState(OlapTableState.SCHEMA_CHANGE);
}
return true;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public enum AlterOpType {
}

public boolean needCheckCapacity() {
return this == ADD_ROLLUP || this == SCHEMA_CHANGE || this == ADD_PARTITION;
return this == ADD_ROLLUP || this == SCHEMA_CHANGE || this == ADD_PARTITION || this == OPTIMIZE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourc
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.WRITE);
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
Preconditions.checkState(tbl.getState() == OlapTableState.OPTIMIZE);
tbl.addDoubleWritePartition(sourcePartitionName, tmpPartitionName);
LOG.info("job {} add double write partition {} to {}", jobId, tmpPartitionName, sourcePartitionName);
} finally {
Expand All @@ -306,7 +306,6 @@ private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourc
private void disableDoubleWritePartition(Database db, OlapTable tbl) {
try (AutoCloseableLock ignored =
new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(tbl.getId()), LockType.WRITE)) {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
tbl.clearDoubleWritePartition();
LOG.info("job {} clear double write partitions", jobId);
}
Expand Down Expand Up @@ -596,7 +595,7 @@ private void replayPending(OnlineOptimizeJobV2 replayedJob) {
return;
}
// set table state
tbl.setState(OlapTableState.SCHEMA_CHANGE);
tbl.setState(OlapTableState.OPTIMIZE);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ private void replayPending(OptimizeJobV2 replayedJob) {
try (AutoCloseableLock ignore =
new AutoCloseableLock(new Locker(), db.getId(), Lists.newArrayList(tbl.getId()), LockType.WRITE)) {
// set table state
tbl.setState(OlapTableState.SCHEMA_CHANGE);
tbl.setState(OlapTableState.OPTIMIZE);
}

this.jobState = JobState.PENDING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public AlterJobV2 build() throws UserException {
OptimizeJobV2 optimizeJob = new OptimizeJobV2(jobId, dbId, tableId, table.getName(), timeoutMs, optimizeClause);
return optimizeJob;
} else {
LOG.info("Online optimize job is created, table: {}", table.getName());
LOG.info("Online optimize job {} is created, table: {}", jobId, table.getName());
OnlineOptimizeJobV2 onlineOptimizeJob = new OnlineOptimizeJobV2(
jobId, dbId, tableId, table.getName(), timeoutMs, optimizeClause);
return onlineOptimizeJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,11 @@ public ShowResultSet process(List<AlterClause> alterClauses, Database db, OlapTa
}

// set table state
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
if (schemaChangeJob.getType() == AlterJobV2.JobType.OPTIMIZE) {
olapTable.setState(OlapTableState.OPTIMIZE);
} else {
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
}

// 2. add schemaChangeJob
addAlterJobV2(schemaChangeJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void checkNativeTable(Database db, Table table) throws DdlExceptio

// check table state
public static void checkTableState(OlapTable olapTable, String tableName) throws DdlException {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL
&& olapTable.getState() != OlapTable.OlapTableState.OPTIMIZE) {
throw InvalidOlapTableStateException.of(olapTable.getState(), tableName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ public enum OlapTableState {
* The query plan which is generate during this state is invalid because the meta
* during the creation of the logical plan and the physical plan might be inconsistent.
*/
UPDATING_META
UPDATING_META,
OPTIMIZE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there any compatibility issues with upgrade or rollback when optimize job is running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be compatibility with upgrade or downgrade since replay will update table state.

}

@SerializedName(value = "state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void complete() throws UserException {
tSink2.unsetPartition();
tSink2.unsetLocation();
TOlapTablePartitionParam partitionParam2 = createPartition(tSink2.getDb_id(), dstTable, tupleDescriptor,
enableAutomaticPartition, automaticBucketSize, doubleWritePartitionIds);
false, automaticBucketSize, doubleWritePartitionIds);
tSink2.setPartition(partitionParam2);
tSink2.setLocation(createLocation(dstTable, partitionParam2, enableReplicatedStorage, warehouseId));
tSink2.setIgnore_out_of_partition(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4774,11 +4774,10 @@ public OlapTable getCopiedTable(Database db, OlapTable olapTable, List<Long> sou
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
if (!isOptimize || olapTable.getState() != OlapTable.OlapTableState.SCHEMA_CHANGE) {
throw new RuntimeException("Table' state is not NORMAL: " + olapTable.getState()
+ ", tableId:" + olapTable.getId() + ", tabletName:" + olapTable.getName());
}
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL
&& olapTable.getState() != OlapTable.OlapTableState.OPTIMIZE) {
throw new RuntimeException("Table' state is not NORMAL: " + olapTable.getState()
+ ", tableId:" + olapTable.getId() + ", tabletName:" + olapTable.getName());
}
for (Long id : sourcePartitionIds) {
origPartitions.put(id, olapTable.getPartition(id).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testOptimizeTable() throws Exception {
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
Assert.assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
Assert.assertEquals(OlapTableState.OPTIMIZE, olapTable.getState());
}

// start a schema change, then finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testOptimizeTable() throws Exception {
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assert.assertEquals(1, alterJobsV2.size());
Assert.assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
Assert.assertEquals(OlapTableState.OPTIMIZE, olapTable.getState());
}

// start a schema change, then finished
Expand Down
Loading
Loading