Skip to content

Commit

Permalink
Update path-cleanup housekeeping status
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamza Jugon committed Nov 25, 2024
1 parent 8c1ce38 commit 33a22c1
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

import static com.expediagroup.beekeeper.cleanup.monitoring.BytesDeletedReporter.METRIC_NAME;
import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.DELETED;
import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SKIPPED;
import static com.expediagroup.beekeeper.integration.CommonTestVariables.AWS_REGION;
import static com.expediagroup.beekeeper.integration.CommonTestVariables.DATABASE_NAME_VALUE;
import static com.expediagroup.beekeeper.integration.CommonTestVariables.TABLE_NAME_VALUE;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -60,6 +62,9 @@
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.google.common.collect.ImmutableMap;

import com.expediagroup.beekeeper.core.model.HousekeepingEntity;
import com.expediagroup.beekeeper.core.model.HousekeepingPath;
import com.expediagroup.beekeeper.core.model.HousekeepingStatus;
import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils;
import com.expediagroup.beekeeper.integration.utils.HiveTestUtils;
import com.expediagroup.beekeeper.path.cleanup.BeekeeperPathCleanup;
Expand Down Expand Up @@ -295,6 +300,31 @@ public void cleanupSentinelForNonEmptyParent() throws SQLException, TException {
assertThat(amazonS3.doesObjectExist(BUCKET, tableSentinel)).isTrue();
}

/**
 * Creates a table carrying Iceberg table properties and output format, registers its S3 location
 * as an unreferenced housekeeping path, then waits for that record to reach {@code SKIPPED} and
 * checks that the object written under the table location is still in the bucket.
 */
@Test
public void shouldSkipCleanupForIcebergTable() throws Exception {
  // Table definition: Iceberg markers in both the table properties and the output format.
  String icebergOutputFormat = "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat";
  Map<String, String> icebergProperties = new HashMap<>();
  icebergProperties.put("table_type", "ICEBERG");
  icebergProperties.put("format", "ICEBERG/PARQUET");
  hiveTestUtils.createTableWithProperties(
      TABLE_PATH, TABLE_NAME_VALUE, false, icebergProperties, icebergOutputFormat, true);

  // One data file under the table's key prefix.
  String tableKeyPrefix = DATABASE_NAME_VALUE + "/" + TABLE_NAME_VALUE + "/";
  String dataFileKey = tableKeyPrefix + "file1";
  amazonS3.putObject(BUCKET, dataFileKey, CONTENT);

  // Housekeeping record pointing at the table location.
  insertUnreferencedPath("s3://" + BUCKET + "/" + tableKeyPrefix);

  // Poll until the first housekeeping record has been marked SKIPPED.
  await()
      .atMost(TIMEOUT, TimeUnit.SECONDS)
      .until(() -> SKIPPED == getUnreferencedPaths().get(0).getHousekeepingStatus());

  // The data file must not have been deleted.
  boolean dataFileStillPresent = amazonS3.doesObjectExist(BUCKET, dataFileKey);
  assertThat(dataFileStillPresent)
      .withFailMessage("S3 object %s should still exist as cleanup was skipped.", dataFileKey)
      .isTrue();
}

@Test
public void metrics() throws SQLException, TException {
hiveTestUtils.createTable(TABLE_PATH, TABLE_NAME_VALUE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.data.domain.Slice;

import com.expediagroup.beekeeper.cleanup.path.PathCleaner;
import com.expediagroup.beekeeper.core.error.BeekeeperException;
import com.expediagroup.beekeeper.core.model.HousekeepingPath;
import com.expediagroup.beekeeper.core.model.HousekeepingStatus;
import com.expediagroup.beekeeper.core.repository.HousekeepingPathRepository;
Expand Down Expand Up @@ -67,12 +68,20 @@ public Pageable processPage(Pageable pageable, Slice<HousekeepingPath> page, boo
}

/**
 * Runs cleanup for a single housekeeping path.
 *
 * <p>Returns {@code true} once {@code pathCleaner.cleanupPath} has been called for a path accepted
 * by {@code S3PathValidator.validTablePath}. Returns {@code false} when the path is rejected (a
 * warning is logged) or when a {@code BeekeeperException} is caught, in which case the record's
 * status is set to {@code SKIPPED} via {@code updateStatus} and a warning is logged.
 *
 * <p>NOTE(review): this span reads like a diff with its +/- markers stripped. The guarded cleanup
 * and the "not valid" warning each appear twice and the braces do not balance. Confirm against
 * the committed file before relying on the exact statement order shown here.
 */
private boolean cleanUpPath(HousekeepingPath housekeepingPath) {
// NOTE(review): the next three lines are repeated verbatim inside the try block below;
// presumably they are the pre-change version of the method body. Confirm.
if (S3PathValidator.validTablePath(housekeepingPath.getPath())) {
pathCleaner.cleanupPath(housekeepingPath);
return true;
try {
// Only paths accepted by the validator are handed to the cleaner.
if (S3PathValidator.validTablePath(housekeepingPath.getPath())) {
pathCleaner.cleanupPath(housekeepingPath);
return true;
}
log.warn("Will not clean up path \"{}\" because it is not valid.", housekeepingPath.getPath());
return false;
} catch (BeekeeperException e) {
// Handle Iceberg table by updating status to SKIPPED
// NOTE(review): this catches every BeekeeperException raised in the try block, while the log
// message below attributes it to an Iceberg table. Confirm that is the only source.
// Only e.getMessage() is logged; the exception itself (and its stack trace) is not passed on.
updateStatus(housekeepingPath, HousekeepingStatus.SKIPPED);
log.warn("Skipping cleanup for Iceberg table \"{}.{}\": {}", housekeepingPath.getDatabaseName(),
housekeepingPath.getTableName(), e.getMessage());
return false;
}
// NOTE(review): the two lines below repeat the warning and return from inside the try block;
// presumably the pre-change tail of the method. Confirm.
log.warn("Will not clean up path \"{}\" because it is not valid.", housekeepingPath.getPath());
return false;
}

private void cleanupContent(HousekeepingPath housekeepingPath) {
Expand Down

0 comments on commit 33a22c1

Please sign in to comment.