diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperPathCleanupIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperPathCleanupIntegrationTest.java index ed9ed046..e370906c 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperPathCleanupIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperPathCleanupIntegrationTest.java @@ -24,11 +24,13 @@ import static com.expediagroup.beekeeper.cleanup.monitoring.BytesDeletedReporter.METRIC_NAME; import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.DELETED; +import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SKIPPED; import static com.expediagroup.beekeeper.integration.CommonTestVariables.AWS_REGION; import static com.expediagroup.beekeeper.integration.CommonTestVariables.DATABASE_NAME_VALUE; import static com.expediagroup.beekeeper.integration.CommonTestVariables.TABLE_NAME_VALUE; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +62,9 @@ import com.amazonaws.services.s3.model.CreateBucketRequest; import com.google.common.collect.ImmutableMap; +import com.expediagroup.beekeeper.core.model.HousekeepingEntity; +import com.expediagroup.beekeeper.core.model.HousekeepingPath; +import com.expediagroup.beekeeper.core.model.HousekeepingStatus; import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils; import com.expediagroup.beekeeper.integration.utils.HiveTestUtils; import com.expediagroup.beekeeper.path.cleanup.BeekeeperPathCleanup; @@ -295,6 +300,31 @@ public void cleanupSentinelForNonEmptyParent() throws SQLException, TException { assertThat(amazonS3.doesObjectExist(BUCKET, tableSentinel)).isTrue(); } + @Test + public void shouldSkipCleanupForIcebergTable() throws Exception { + // add iceberg table props + Map tableProperties = new HashMap<>(); + tableProperties.put("table_type", "ICEBERG"); + tableProperties.put("format", "ICEBERG/PARQUET"); + String outputFormat = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"; + // create iceberg table + hiveTestUtils.createTableWithProperties( + TABLE_PATH, TABLE_NAME_VALUE, false, tableProperties, outputFormat, true); + // add data + String objectKey = DATABASE_NAME_VALUE + "/" + TABLE_NAME_VALUE + "/file1"; + amazonS3.putObject(BUCKET, objectKey, CONTENT); + // insert housekeepingPath record + String path = "s3://" + BUCKET + "/" + DATABASE_NAME_VALUE + "/" + TABLE_NAME_VALUE + "/"; + insertUnreferencedPath(path); // Uses default database and table names + // wait for the cleanup process to run and update to skipped + await().atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> getUnreferencedPaths().get(0).getHousekeepingStatus() == SKIPPED); + // verify that the data in S3 is still present + assertThat(amazonS3.doesObjectExist(BUCKET, objectKey)) + .withFailMessage("S3 object %s should still exist as cleanup was skipped.", objectKey) + .isTrue(); + } + @Test public void metrics() throws SQLException, TException { hiveTestUtils.createTable(TABLE_PATH, TABLE_NAME_VALUE, false); diff --git a/beekeeper-path-cleanup/src/main/java/com/expediagroup/beekeeper/path/cleanup/handler/GenericPathHandler.java b/beekeeper-path-cleanup/src/main/java/com/expediagroup/beekeeper/path/cleanup/handler/GenericPathHandler.java index 30442c22..ad85a3b0 100644 --- a/beekeeper-path-cleanup/src/main/java/com/expediagroup/beekeeper/path/cleanup/handler/GenericPathHandler.java +++ b/beekeeper-path-cleanup/src/main/java/com/expediagroup/beekeeper/path/cleanup/handler/GenericPathHandler.java @@ -24,6 +24,7 @@ import org.springframework.data.domain.Slice; import com.expediagroup.beekeeper.cleanup.path.PathCleaner; +import com.expediagroup.beekeeper.core.error.BeekeeperException; import com.expediagroup.beekeeper.core.model.HousekeepingPath; import com.expediagroup.beekeeper.core.model.HousekeepingStatus; import com.expediagroup.beekeeper.core.repository.HousekeepingPathRepository; @@ -67,12 +68,20 @@ public Pageable processPage(Pageable pageable, Slice page, boo } private boolean cleanUpPath(HousekeepingPath housekeepingPath) { - if (S3PathValidator.validTablePath(housekeepingPath.getPath())) { - pathCleaner.cleanupPath(housekeepingPath); - return true; + try { + if (S3PathValidator.validTablePath(housekeepingPath.getPath())) { + pathCleaner.cleanupPath(housekeepingPath); + return true; + } + log.warn("Will not clean up path \"{}\" because it is not valid.", housekeepingPath.getPath()); + return false; + } catch (BeekeeperException e) { + // Handle Iceberg table by updating status to SKIPPED + updateStatus(housekeepingPath, HousekeepingStatus.SKIPPED); + log.warn("Skipping cleanup for Iceberg table \"{}.{}\": {}", housekeepingPath.getDatabaseName(), + housekeepingPath.getTableName(), e.getMessage()); + return false; } - log.warn("Will not clean up path \"{}\" because it is not valid.", housekeepingPath.getPath()); - return false; } private void cleanupContent(HousekeepingPath housekeepingPath) {