Skip to content

Commit

Permalink
HIVE-28131: Iceberg: Add support for Replace Branch. (apache#5190). (…
Browse files Browse the repository at this point in the history
…Ayush Saxena, reviewed by Butao Zhang)
  • Loading branch information
ayushtkn authored Apr 18, 2024
1 parent 881d38f commit 4f05ca6
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATE_BRANCH,
AlterTableType.CREATE_TAG, AlterTableType.DROP_BRANCH, AlterTableType.RENAME_BRANCH, AlterTableType.DROPPARTITION,
AlterTableType.DROP_TAG, AlterTableType.COMPACT);
AlterTableType.DROP_TAG, AlterTableType.COMPACT, AlterTableType.REPLACE_BRANCH);
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
FileFormat.PARQUET.name().toLowerCase(),
FileFormat.ORC.name().toLowerCase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,11 @@ public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Ta
(AlterTableSnapshotRefSpec.RenameSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.renameBranch(icebergTable, renameSnapshotrefSpec);
break;
case REPLACE_BRANCH:
AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec =
(AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.replaceBranch(icebergTable, replaceSnapshotrefSpec);
break;
case DROP_TAG:
AlterTableSnapshotRefSpec.DropSnapshotRefSpec dropTagSpec =
(AlterTableSnapshotRefSpec.DropSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,29 @@ public static void renameBranch(Table table, AlterTableSnapshotRefSpec.RenameSna
LOG.info("Renaming branch {} to {} on iceberg table {}", sourceBranch, targetBranch, table.name());
table.manageSnapshots().renameBranch(sourceBranch, targetBranch).commit();
}

/**
 * Re-points an existing branch of the given Iceberg table at a new target and commits the change.
 *
 * <p>The target is the snapshot id carried by the spec when {@code isReplaceBySnapshot()} is true,
 * otherwise it is the branch named by the spec. Each retention setting (max ref age, max snapshot age,
 * min snapshots to keep) is applied to the replaced branch only when the spec holds a value greater
 * than zero. Everything is committed in a single {@code commit()} call at the end.
 *
 * @param table the Iceberg table whose branch is replaced
 * @param replaceSnapshotrefSpec the branch to replace, its new target and the optional retention settings
 */
public static void replaceBranch(Table table,
    AlterTableSnapshotRefSpec.ReplaceSnapshotrefSpec replaceSnapshotrefSpec) {
  final String branchName = replaceSnapshotrefSpec.getSourceBranchName();

  // Stage the replace itself; the target is either another branch or an explicit snapshot id.
  final ManageSnapshots pendingUpdate;
  if (!replaceSnapshotrefSpec.isReplaceBySnapshot()) {
    final String otherBranch = replaceSnapshotrefSpec.getTargetBranchName();
    LOG.info("Replacing branch {} with branch {} on iceberg table {}", branchName, otherBranch, table.name());
    pendingUpdate = table.manageSnapshots().replaceBranch(branchName, otherBranch);
  } else {
    final long snapshotId = replaceSnapshotrefSpec.getTargetSnapshot();
    LOG.info("Replacing branch {} with snapshot {} on iceberg table {}", branchName, snapshotId, table.name());
    pendingUpdate = table.manageSnapshots().replaceBranch(branchName, snapshotId);
  }

  // Retention settings are optional: a non-positive value means "not specified" and is skipped.
  final long maxRefAgeMs = replaceSnapshotrefSpec.getMaxRefAgeMs();
  if (maxRefAgeMs > 0) {
    pendingUpdate.setMaxRefAgeMs(branchName, maxRefAgeMs);
  }

  final long maxSnapshotAgeMs = replaceSnapshotrefSpec.getMaxSnapshotAgeMs();
  if (maxSnapshotAgeMs > 0) {
    pendingUpdate.setMaxSnapshotAgeMs(branchName, maxSnapshotAgeMs);
  }

  final int minSnapshotsToKeep = replaceSnapshotrefSpec.getMinSnapshotsToKeep();
  if (minSnapshotsToKeep > 0) {
    pendingUpdate.setMinSnapshotsToKeep(branchName, minSnapshotsToKeep);
  }

  pendingUpdate.commit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import static org.apache.iceberg.mr.hive.TestTables.TestTableType.HIVE_CATALOG;
import static org.junit.Assert.assertEquals;

public class TestHiveIcebergCherryPick {
public class TestHiveIcebergSnapshotOperations {

private TestTables testTables;
private TestHiveShell shell;
Expand Down Expand Up @@ -82,4 +82,38 @@ public void testCherryPick() {
List<Object[]> result = shell.executeStatement("SELECT COUNT(*) FROM " + identifier.name());
assertEquals(6L, result.get(0)[0]);
}

/**
 * Checks that replacing a branch "as of system_version" moves the branch to the requested snapshot:
 * the branch is created at 4 rows, main then advances by two commits, and after the replace the
 * branch must expose the 6 rows visible at the first of those commits.
 */
@Test
public void testReplaceBranchWithSnapshot() {
  final TableIdentifier identifier = TableIdentifier.of("default", "testReplaceBranchWithSnapshot");
  final String tableName = identifier.name();
  final String branchCountQuery = "SELECT COUNT(*) FROM default.testReplaceBranchWithSnapshot.branch_branch1";

  final String createTableSql = String.format("CREATE EXTERNAL TABLE %s (id INT) STORED BY iceberg %s %s",
      tableName,
      testTables.locationForCreateTableSQL(identifier),
      testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
  shell.executeStatement(createTableSql);

  shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(1),(2),(3),(4)", tableName));

  org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
  icebergTable.refresh();

  // Branch off while the table holds the first four rows.
  shell.executeStatement(String.format("ALTER TABLE %s create branch branch1", tableName));

  // Advance main by one commit and remember that snapshot as the replace target.
  shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(5),(6)", tableName));
  icebergTable.refresh();
  final long replaceTargetSnapshotId = icebergTable.currentSnapshot().snapshotId();

  // Advance main once more so the remembered snapshot is no longer the latest commit.
  shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES(7),(8)", tableName));

  // Before the replace the branch still sees only the original rows.
  List<Object[]> rowsBefore = shell.executeStatement(branchCountQuery);
  assertEquals(4L, rowsBefore.get(0)[0]);

  // Replace the branch using the remembered snapshot id.
  shell.executeStatement(String.format("ALTER TABLE %s replace branch branch1 as of system_version %s",
      tableName, replaceTargetSnapshotId));

  List<Object[]> rowsAfter = shell.executeStatement(branchCountQuery);
  assertEquals(6L, rowsAfter.get(0)[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- SORT_QUERY_RESULTS
set hive.explain.user=false;
set hive.fetch.task.conversion=more;

-- Exercises ALTER TABLE ... REPLACE BRANCH on an Iceberg table in three forms:
-- a plain replace, a replace with ref retention, and a replace with snapshot retention.
create external table ice01(id int) stored by iceberg stored as orc tblproperties ('format-version'='2');

insert into ice01 values (1), (2), (3), (4);

select * from ice01;

-- create one branch
alter table ice01 create branch branch1;

-- insert some values in branch1
insert into default.ice01.branch_branch1 values (5), (6);
select * from default.ice01.branch_branch1;

-- create another branch
alter table ice01 create branch branch2;
-- do some inserts & deletes on this branch
insert into default.ice01.branch_branch2 values (22), (44);
delete from default.ice01.branch_branch2 where id=2;
select * from default.ice01.branch_branch2;

-- Do a replace: branch1 should now show the contents of branch2
explain alter table ice01 replace branch branch1 as of branch branch2;
alter table ice01 replace branch branch1 as of branch branch2;
select * from default.ice01.branch_branch1;

-- create another branch
alter table ice01 create branch branch3;
-- do some inserts on this branch
insert into default.ice01.branch_branch3 values (45), (32);

-- Do a replace with retain last
explain alter table ice01 replace branch branch1 as of branch branch3 retain 5 days;
alter table ice01 replace branch branch1 as of branch branch3 retain 5 days;
select * from default.ice01.branch_branch1;

-- create another branch
alter table ice01 create branch branch4;
-- do some inserts on this branch
insert into default.ice01.branch_branch4 values (11), (78);
-- Do a replace with snapshot retention
explain alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days;
alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days;
select * from default.ice01.branch_branch1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
PREHOOK: query: create external table ice01(id int) stored by iceberg stored as orc tblproperties ('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice01
POSTHOOK: query: create external table ice01(id int) stored by iceberg stored as orc tblproperties ('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice01
PREHOOK: query: insert into ice01 values (1), (2), (3), (4)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice01
POSTHOOK: query: insert into ice01 values (1), (2), (3), (4)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: select * from ice01
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from ice01
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
2
3
4
PREHOOK: query: alter table ice01 create branch branch1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 create branch branch1
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: insert into default.ice01.branch_branch1 values (5), (6)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice01
POSTHOOK: query: insert into default.ice01.branch_branch1 values (5), (6)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.ice01.branch_branch1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
2
3
4
5
6
PREHOOK: query: alter table ice01 create branch branch2
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 create branch branch2
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: insert into default.ice01.branch_branch2 values (22), (44)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice01
POSTHOOK: query: insert into default.ice01.branch_branch2 values (22), (44)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: delete from default.ice01.branch_branch2 where id=2
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: default@ice01
POSTHOOK: query: delete from default.ice01.branch_branch2 where id=2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch2
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.ice01.branch_branch2
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
22
3
4
44
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch2
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch2
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch2}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch2
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch2
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.ice01.branch_branch1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
22
3
4
44
PREHOOK: query: alter table ice01 create branch branch3
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 create branch branch3
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: insert into default.ice01.branch_branch3 values (45), (32)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice01
POSTHOOK: query: insert into default.ice01.branch_branch3 values (45), (32)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch3, maxRefAgeMs=432000000}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch3 retain 5 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.ice01.branch_branch1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
2
3
32
4
45
PREHOOK: query: alter table ice01 create branch branch4
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 create branch branch4
POSTHOOK: type: ALTERTABLE_CREATEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: insert into default.ice01.branch_branch4 values (11), (78)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice01
POSTHOOK: query: insert into default.ice01.branch_branch4 values (11), (78)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice01
PREHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: explain alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
STAGE DEPENDENCIES:
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-0
SnapshotRef Operation
table name: default.ice01
spec: AlterTableSnapshotRefSpec{operationType=REPLACE_BRANCH, operationParams=ReplaceSnapshotrefSpec{sourceBranch=branch1, targetBranch=branch4, minSnapshotsToKeep=5, maxSnapshotAgeMs=518400000}}

PREHOOK: query: alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
PREHOOK: type: ALTERTABLE_REPLACEBRANCH
PREHOOK: Input: default@ice01
POSTHOOK: query: alter table ice01 replace branch branch1 as of branch branch4 with snapshot retention 5 snapshots 6 days
POSTHOOK: type: ALTERTABLE_REPLACEBRANCH
POSTHOOK: Input: default@ice01
PREHOOK: query: select * from default.ice01.branch_branch1
PREHOOK: type: QUERY
PREHOOK: Input: default@ice01
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from default.ice01.branch_branch1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice01
POSTHOOK: Output: hdfs://### HDFS PATH ###
1
11
2
3
4
78
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ alterTableStatementSuffix
| alterStatementSuffixDropTag
| alterStatementSuffixConvert
| alterStatementSuffixRenameBranch
| alterStatementSuffixReplaceBranch
;

alterTblPartitionStatementSuffix[boolean partition]
Expand Down Expand Up @@ -513,6 +514,13 @@ alterStatementSuffixRenameBranch
-> ^(TOK_ALTERTABLE_RENAME_BRANCH $sourceBranch $targetBranch)
;

// Parses the REPLACE BRANCH suffix of ALTER TABLE:
//   REPLACE BRANCH <sourceBranch> AS OF (SYSTEM_VERSION <snapshotId> | BRANCH <branch>)
//   optionally followed by a refRetain clause and a retentionOfSnapshots clause.
// Rewrites to a TOK_ALTERTABLE_REPLACE_BRANCH tree whose first child is the source branch; the
// KW_SYSTEM_VERSION child is present only for the snapshot-id form, so it distinguishes that form
// from the branch form.
// NOTE(review): sourceBranch is matched with the Identifier token while the target branch uses the
// identifier rule - confirm this asymmetry is intended.
alterStatementSuffixReplaceBranch
@init { gParent.pushMsg("alter table replace branch", state); }
@after { gParent.popMsg(state); }
: KW_REPLACE KW_BRANCH sourceBranch=Identifier KW_AS KW_OF ((KW_SYSTEM_VERSION snapshotId=Number) | (KW_BRANCH branch=identifier)) refRetain? retentionOfSnapshots?
-> ^(TOK_ALTERTABLE_REPLACE_BRANCH $sourceBranch KW_SYSTEM_VERSION? $snapshotId? $branch? refRetain? retentionOfSnapshots?)
;

alterStatementSuffixDropBranch
@init { gParent.pushMsg("alter table drop branch (if exists) branchName", state); }
@after { gParent.popMsg(state); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ TOK_ALTERTABLE_EXECUTE;
TOK_ALTERTABLE_CREATE_BRANCH;
TOK_ALTERTABLE_DROP_BRANCH;
TOK_ALTERTABLE_RENAME_BRANCH;
TOK_ALTERTABLE_REPLACE_BRANCH;
TOK_ALTERTABLE_CREATE_TAG;
TOK_ALTERTABLE_DROP_TAG;
TOK_RETAIN;
Expand Down
Loading

0 comments on commit 4f05ca6

Please sign in to comment.