Skip to content

Commit

Permalink
[Enhancement] Support list partition for backup restore (#51993)
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg authored Oct 17, 2024
1 parent 58fc586 commit d62055e
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 11 deletions.
48 changes: 46 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,50 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
} else if (partitionInfo.isListPartition()) {
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
ListPartitionInfo origListPartitionInfo = (ListPartitionInfo) listPartitionInfo.clone();
for (Long partitionId : origPartNameToId.values()) {
listPartitionInfo.dropPartition(partitionId);
}
Map<Long, Partition> origIdToPartition = Maps.newHashMap(idToPartition);
idToPartition.clear();
physicalPartitionIdToPartitionId.clear();
physicalPartitionNameToPartitionId.clear();
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
long newPartId = globalStateMgr.getNextId();
// preserve existing info
DataProperty dataProperty = origListPartitionInfo.getDataProperty(entry.getValue());
boolean inMemory = origListPartitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = origListPartitionInfo.getDataCacheInfo(entry.getValue());
List<String> values = origListPartitionInfo.getIdToValues().get(entry.getValue());
List<List<String>> multiValues = origListPartitionInfo.getIdToMultiValues().get(entry.getValue());
// replace with new info
try {
listPartitionInfo.addPartition(idToColumn, newPartId, dataProperty, (short) restoreReplicationNum,
inMemory, dataCacheInfo, values, multiValues);
} catch (AnalysisException e) {
return new Status(ErrCode.COMMON_ERROR, "Failed to add partition " + e.getMessage());
}
idToPartition.put(newPartId, origIdToPartition.get(entry.getValue()));
Partition partition = idToPartition.get(newPartId);
partition.setIdForRestore(newPartId);
List<PhysicalPartition> origPhysicalPartitions = Lists.newArrayList(partition.getSubPartitions());
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
partition.removeSubPartition(physicalPartition.getId());
}
});
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartId) {
physicalPartition.setIdForRestore(globalStateMgr.getNextId());
physicalPartition.setParentId(newPartId);
partition.addSubPartition(physicalPartition);
}
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), newPartId);
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
} else if (partitionInfo.isUnPartitioned()) {
// Single partitioned
PartitionInfo origPartitionInfo = (PartitionInfo) partitionInfo.clone();
Expand All @@ -891,8 +935,8 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
idToPartition.clear();
physicalPartitionIdToPartitionId.clear();
physicalPartitionNameToPartitionId.clear();
long newPartId = globalStateMgr.getNextId();
for (Map.Entry<String, Long> entry : origPartNameToId.entrySet()) {
long newPartId = globalStateMgr.getNextId();
DataProperty dataProperty = origPartitionInfo.getDataProperty(entry.getValue());
boolean inMemory = origPartitionInfo.getIsInMemory(entry.getValue());
DataCacheInfo dataCacheInfo = origPartitionInfo.getDataCacheInfo(entry.getValue());
Expand All @@ -918,7 +962,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
});
}
} else {
return new Status(ErrCode.UNSUPPORTED, "List partitioned table does not support restore");
return new Status(ErrCode.UNSUPPORTED, "Unsupported partition type: " + partitionInfo.getType());
}

// reset replication number for olaptable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import com.starrocks.backup.Repository;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.ErrorCode;
Expand Down Expand Up @@ -411,13 +409,6 @@ public static void analyzeTableRef(TableRef tableRef, String dbName, Database db
}
}

if (tbl instanceof OlapTable) {
PartitionInfo partitionInfo = ((OlapTable) tbl).getPartitionInfo();
if (partitionInfo instanceof ListPartitionInfo) {
throw new SemanticException("List partition table does not support backup/restore job");
}
}

tableIdToTableRefMap.put(tbl.getId(), tableRef);
if (tbl.isMaterializedView()) {
MaterializedView mv = (MaterializedView) tbl;
Expand Down
54 changes: 54 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/backup/CatalogMocker.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.starrocks.catalog.FakeEditLog;
import com.starrocks.catalog.HashDistributionInfo;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.MaterializedIndex.IndexState;
Expand All @@ -53,6 +54,7 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PhysicalPartitionImpl;
import com.starrocks.catalog.PrimitiveType;
Expand Down Expand Up @@ -129,6 +131,10 @@ public class CatalogMocker {
public static final String TEST_TBL4_NAME = "test_tbl4";
public static final long TEST_TBL4_ID = 30004;

// list partition olap table
public static final String TEST_TBL5_NAME = "test_tbl5";
public static final long TEST_TBL5_ID = 30005;

public static final String TEST_PARTITION1_NAME = "p1";
public static final long TEST_PARTITION1_ID = 40001;
public static final String TEST_PARTITION2_NAME = "p2";
Expand Down Expand Up @@ -511,6 +517,54 @@ public static Database mockDb() throws AnalysisException {
db.registerTableUnlocked(olapTable4);
}

// 6. list partition olap table
{
baseIndexP1 = new MaterializedIndex(TEST_TBL5_ID, IndexState.NORMAL);
baseIndexP2 = new MaterializedIndex(TEST_TBL5_ID, IndexState.NORMAL);
DistributionInfo distributionInfo5 = new RandomDistributionInfo(1);
partition1 = new Partition(TEST_PARTITION1_ID, TEST_PARTITION1_NAME, baseIndexP1, distributionInfo5);

ListPartitionInfo listPartitionInfo = new ListPartitionInfo(PartitionType.LIST,
Lists.newArrayList(TEST_TBL_BASE_SCHEMA.get(0)));
listPartitionInfo.setValues(TEST_PARTITION1_ID, Lists.newArrayList("10"));
listPartitionInfo.setReplicationNum(TEST_PARTITION1_ID, (short) 3);
listPartitionInfo.setIsInMemory(TEST_PARTITION1_ID, false);
listPartitionInfo.setDataProperty(TEST_PARTITION1_ID, dataPropertyP1);

baseTabletP1 = new LocalTablet(TEST_BASE_TABLET_P1_ID);
tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL5_ID, TEST_PARTITION1_ID,
TEST_TBL5_ID, SCHEMA_HASH, TStorageMedium.HDD);
baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1);
replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0, ReplicaState.NORMAL);
replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0, ReplicaState.NORMAL);
replica5 = new Replica(TEST_REPLICA5_ID, BACKEND3_ID, 0, ReplicaState.NORMAL);

baseTabletP1.addReplica(replica3);
baseTabletP1.addReplica(replica4);
baseTabletP1.addReplica(replica5);

baseTabletP2 = new LocalTablet(TEST_BASE_TABLET_P2_ID);
tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID, TEST_TBL5_ID, TEST_PARTITION2_ID,
TEST_TBL5_ID, SCHEMA_HASH, TStorageMedium.HDD);
baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2);
replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0, ReplicaState.NORMAL);
replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0, ReplicaState.NORMAL);
replica8 = new Replica(TEST_REPLICA8_ID, BACKEND3_ID, 0, ReplicaState.NORMAL);

baseTabletP2.addReplica(replica6);
baseTabletP2.addReplica(replica7);
baseTabletP2.addReplica(replica8);

OlapTable olapTable5 = new OlapTable(TEST_TBL5_ID, TEST_TBL5_NAME, TEST_TBL_BASE_SCHEMA,
KeysType.DUP_KEYS, listPartitionInfo, distributionInfo5);
Deencapsulation.setField(olapTable5, "baseIndexId", TEST_TBL5_ID);
olapTable5.setIndexMeta(TEST_TBL5_ID, TEST_TBL5_NAME, TEST_TBL_BASE_SCHEMA, 0, SCHEMA_HASH, (short) 1,
TStorageType.COLUMN, KeysType.DUP_KEYS);

olapTable5.addPartition(partition1);

db.registerTableUnlocked(olapTable5);
}
return db;
}

Expand Down
178 changes: 178 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/backup/RestoreJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,184 @@ public Table getTable(Long dbId, Long tableId) {
Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
}

@Test
public void testRunBackupListTable() {
new Expectations() {
{
globalStateMgr.getLocalMetastore().getDb(anyLong);
minTimes = 0;
result = db;

globalStateMgr.getNextId();
minTimes = 0;
result = id.incrementAndGet();

globalStateMgr.getEditLog();
minTimes = 0;
result = editLog;

GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
minTimes = 0;
result = systemInfoService;
}
};

List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
beIds.add(CatalogMocker.BACKEND3_ID);
new Expectations() {
{
systemInfoService.getNodeSelector().seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, null);
minTimes = 0;
result = beIds;

systemInfoService.checkExceedDiskCapacityLimit((Multimap<Long, Long>) any, anyBoolean);
minTimes = 0;
result = com.starrocks.common.Status.OK;
}
};

new Expectations() {
{
editLog.logBackupJob((BackupJob) any);
minTimes = 0;
result = new Delegate() {
public void logBackupJob(BackupJob job) {
System.out.println("log backup job: " + job);
}
};
}
};

new Expectations() {
{
repo.upload(anyString, anyString);
result = Status.OK;
minTimes = 0;

List<BackupMeta> backupMetas = Lists.newArrayList();
repo.getSnapshotMetaFile(label, backupMetas, -1, -1);
minTimes = 0;
result = new Delegate() {
public Status getSnapshotMetaFile(String label, List<BackupMeta> backupMetas) {
backupMetas.add(backupMeta);
return Status.OK;
}
};
}
};

new MockUp<MarkedCountDownLatch>() {
@Mock
boolean await(long timeout, TimeUnit unit) {
return true;
}
};
Locker locker = new Locker();

// gen BackupJobInfo
jobInfo = new BackupJobInfo();
jobInfo.backupTime = System.currentTimeMillis();
jobInfo.dbId = CatalogMocker.TEST_DB_ID;
jobInfo.dbName = CatalogMocker.TEST_DB_NAME;
jobInfo.name = label;
jobInfo.success = true;

expectedRestoreTbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL5_ID);
BackupTableInfo tblInfo = new BackupTableInfo();
tblInfo.id = CatalogMocker.TEST_TBL5_ID;
tblInfo.name = CatalogMocker.TEST_TBL5_NAME;
jobInfo.tables.put(tblInfo.name, tblInfo);

for (Partition partition : expectedRestoreTbl.getPartitions()) {
BackupPartitionInfo partInfo = new BackupPartitionInfo();
partInfo.id = partition.getId();
partInfo.name = partition.getName();
tblInfo.partitions.put(partInfo.name, partInfo);

for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
BackupIndexInfo idxInfo = new BackupIndexInfo();
idxInfo.id = index.getId();
idxInfo.name = expectedRestoreTbl.getIndexNameById(index.getId());
idxInfo.schemaHash = expectedRestoreTbl.getSchemaHashByIndexId(index.getId());
partInfo.indexes.put(idxInfo.name, idxInfo);

for (Tablet tablet : index.getTablets()) {
BackupTabletInfo tabletInfo = new BackupTabletInfo();
tabletInfo.id = tablet.getId();
idxInfo.tablets.add(tabletInfo);
}
}
}

// drop this table, cause we want to try restoring this table
db.dropTable(expectedRestoreTbl.getName());

new MockUp<LocalMetastore>() {
@Mock
public Database getDb(String dbName) {
return db;
}

@Mock
public Table getTable(String dbName, String tblName) {
return db.getTable(tblName);
}

@Mock
public Table getTable(Long dbId, Long tableId) {
return db.getTable(tableId);
}
};

List<Table> tbls = Lists.newArrayList();
tbls.add(expectedRestoreTbl);
backupMeta = new BackupMeta(tbls);
job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(),
jobInfo, false, 3, 100000,
globalStateMgr, repo.getId(), backupMeta, new MvRestoreContext());
job.setRepo(repo);
// pending
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(1, job.getFileMapping().getMapping().size());

// 2. snapshoting
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(4, AgentTaskQueue.getTaskNum());

// 3. snapshot finished
List<AgentTask> agentTasks = Lists.newArrayList();
Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(4, agentTasks.size());

for (AgentTask agentTask : agentTasks) {
if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
continue;
}

SnapshotTask task = (SnapshotTask) agentTask;
String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
task.getSignature(), taskStatus);
request.setSnapshot_path(snapshotPath);
Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
}

job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
}

@Test
public void testSignature() {
Adler32 sig1 = new Adler32();
Expand Down

0 comments on commit d62055e

Please sign in to comment.