Skip to content

Commit

Permalink
[BugFix] Fix reset ids for restore (#51630)
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
(cherry picked from commit 47314a6)
  • Loading branch information
xiangguangyxg authored and mergify[bot] committed Oct 16, 2024
1 parent cc63afe commit b7496d8
Showing 1 changed file with 53 additions and 28 deletions.
81 changes: 53 additions & 28 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -821,13 +821,15 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
setId(globalStateMgr.getNextId());

// reset all 'indexIdToXXX' map
Map<Long, MaterializedIndexMeta> origIndexIdToMeta = Maps.newHashMap(indexIdToMeta);
indexIdToMeta.clear();
for (Map.Entry<Long, String> entry : origIdxIdToName.entrySet()) {
long newIdxId = globalStateMgr.getNextId();
if (entry.getValue().equals(name)) {
// base index
baseIndexId = newIdxId;
}
indexIdToMeta.put(newIdxId, indexIdToMeta.remove(entry.getKey()));
indexIdToMeta.put(newIdxId, origIndexIdToMeta.get(entry.getKey()));
indexIdToMeta.get(newIdxId).setIndexIdForRestore(newIdxId);
indexIdToMeta.get(newIdxId).setSchemaId(newIdxId);
indexNameToId.put(entry.getValue(), newIdxId);
Expand All @@ -843,28 +845,35 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
// reset partition info and idToPartition map
if (partitionInfo.isRangePartition()) {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
RangePartitionInfo origRangePartitionInfo = (RangePartitionInfo) rangePartitionInfo.clone();
for (Long partitionId : origPartNameToId.values()) {
rangePartitionInfo.dropPartition(partitionId);
}
Map<Long, Partition> origIdToPartition = Maps.newHashMap(idToPartition);
idToPartition.clear();
physicalPartitionIdToPartitionId.clear();
physicalPartitionNameToPartitionId.clear();
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
long newPartId = globalStateMgr.getNextId();
// preserve existing info
DataProperty dataProperty = rangePartitionInfo.getDataProperty(entry.getValue());
boolean inMemory = rangePartitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = rangePartitionInfo.getDataCacheInfo(entry.getValue());
Range<PartitionKey> range = rangePartitionInfo.getIdToRange(false).get(entry.getValue());
DataProperty dataProperty = origRangePartitionInfo.getDataProperty(entry.getValue());
boolean inMemory = origRangePartitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = origRangePartitionInfo.getDataCacheInfo(entry.getValue());
Range<PartitionKey> range = origRangePartitionInfo.getIdToRange(false).get(entry.getValue());
// replace with new info
rangePartitionInfo.dropPartition(entry.getValue());
rangePartitionInfo.addPartition(newPartId, false, range, dataProperty, (short) restoreReplicationNum,
inMemory, dataCacheInfo);

idToPartition.get(entry.getValue()).getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
});
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
idToPartition.put(newPartId, origIdToPartition.get(entry.getValue()));
Partition partition = idToPartition.get(newPartId);
partition.setIdForRestore(newPartId);
partition.getSubPartitions().forEach(physicalPartition -> {
List<PhysicalPartition> origPhysicalPartitions = Lists.newArrayList(partition.getSubPartitions());
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
partition.removeSubPartition(physicalPartition.getId());
}
});
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
physicalPartition.setIdForRestore(globalStateMgr.getNextId());
physicalPartition.setParentId(newPartId);
partition.addSubPartition(physicalPartition);
Expand All @@ -873,27 +882,34 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
} else {
} else if (partitionInfo.isUnPartitioned()) {
// Single partitioned
PartitionInfo origPartitionInfo = (PartitionInfo) partitionInfo.clone();
for (Long partitionId : origPartNameToId.values()) {
partitionInfo.dropPartition(partitionId);
}
Map<Long, Partition> origIdToPartition = Maps.newHashMap(idToPartition);
idToPartition.clear();
physicalPartitionIdToPartitionId.clear();
physicalPartitionNameToPartitionId.clear();
long newPartId = globalStateMgr.getNextId();
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
DataProperty dataProperty = partitionInfo.getDataProperty(entry.getValue());
boolean inMemory = partitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = partitionInfo.getDataCacheInfo(entry.getValue());
partitionInfo.dropPartition(entry.getValue());
DataProperty dataProperty = origPartitionInfo.getDataProperty(entry.getValue());
boolean inMemory = origPartitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = origPartitionInfo.getDataCacheInfo(entry.getValue());
partitionInfo.addPartition(newPartId, dataProperty, (short) restoreReplicationNum, inMemory,
dataCacheInfo);

idToPartition.get(entry.getValue()).getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
});
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
idToPartition.put(newPartId, origIdToPartition.get(entry.getValue()));
Partition partition = idToPartition.get(newPartId);
partition.setIdForRestore(newPartId);
partition.getSubPartitions().forEach(physicalPartition -> {
List<PhysicalPartition> origPhysicalPartitions = Lists.newArrayList(partition.getSubPartitions());
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
partition.removeSubPartition(physicalPartition.getId());
}
});
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
physicalPartition.setIdForRestore(globalStateMgr.getNextId());
physicalPartition.setParentId(newPartId);
partition.addSubPartition(physicalPartition);
Expand All @@ -902,6 +918,8 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
} else {
return new Status(ErrCode.UNSUPPORTED, "List partitioned table does not support restore");
}

// reset replication number for olaptable
Expand All @@ -911,16 +929,23 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
Partition partition = entry.getValue();
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
Map<Long, MaterializedIndex> origIdToIndex = Maps.newHashMapWithExpectedSize(origIdxIdToName.size());
for (Map.Entry<Long, String> entry2 : origIdxIdToName.entrySet()) {
System.out.println("entry2.getValue():" + entry2.getValue()
+ " baseIndex: " + physicalPartition.getBaseIndex());
MaterializedIndex idx = physicalPartition.getIndex(entry2.getKey());
origIdToIndex.put(entry2.getKey(), idx);
long newIdxId = indexNameToId.get(entry2.getValue());
if (newIdxId != baseIndexId) {
// not base table, delete old index
physicalPartition.deleteRollupIndex(entry2.getKey());
}
}
for (Map.Entry<Long, String> entry2 : origIdxIdToName.entrySet()) {
MaterializedIndex idx = origIdToIndex.get(entry2.getKey());
long newIdxId = indexNameToId.get(entry2.getValue());
int schemaHash = indexIdToMeta.get(newIdxId).getSchemaHash();
idx.setIdForRestore(newIdxId);
if (newIdxId != baseIndexId) {
// not base table, reset
physicalPartition.deleteRollupIndex(entry2.getKey());
physicalPartition.createRollupIndex(idx);
}

Expand Down

0 comments on commit b7496d8

Please sign in to comment.