Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Commit to fix actions on iceberg tables #181

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2023 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,7 +126,8 @@ public void expiredMetadataCreateTableEvent() throws SQLException, IOException,
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null, true);
// assertMetrics() accepts a boolean value now so we can verify if metadata-scheduled is not present
}

@Test
Expand All @@ -139,7 +140,7 @@ public void expiredMetadataAlterTableEvent() throws SQLException, IOException, U
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null, true);
}

@Test
Expand All @@ -156,7 +157,7 @@ public void expiredMetadataAddPartitionEvent() throws SQLException, IOException,
List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
// check first entry is for the table
assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME, true);
}

@Test
Expand All @@ -176,8 +177,8 @@ public void expiredMetadataMultipleAddPartitionEvents() throws SQLException, IOE
List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
// check first entry is for the table
assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(2), LOCATION_B, PARTITION_B_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME, true);
assertExpiredMetadata(expiredMetadata.get(2), LOCATION_B, PARTITION_B_NAME, true);
}

@Test
Expand All @@ -191,7 +192,7 @@ public void expiredMetadataAlterPartitionTableEvent() throws SQLException, IOExc
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME, true);
}

@Test
Expand All @@ -209,8 +210,28 @@ public void expiredMetadataMultipleAlterPartitionTableEvents() throws SQLExcepti
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 2);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_B, PARTITION_B_NAME);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME, true);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_B, PARTITION_B_NAME, true);
}

// Verifies that create-table events for Iceberg tables are filtered out and never scheduled.
@Test
public void expiredMetadataIcebergTableEventIsFiltered() throws SQLException, IOException, URISyntaxException {
  // Mark the table as Iceberg via table_type and output_format so the event filter can detect it.
  CreateTableSqsMessage createIcebergTableSqsMessage = new CreateTableSqsMessage(LOCATION_A, true);
  createIcebergTableSqsMessage.setTableType("ICEBERG");
  createIcebergTableSqsMessage.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
  amazonSQS.sendMessage(sendMessageRequest(createIcebergTableSqsMessage.getFormattedString()));

  // Await the queue draining, not the row count: the row count is already 0 before the message
  // is processed, so `until(() -> getExpiredMetadataRowCount() == 0)` would pass immediately
  // without exercising the filter at all.
  await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getSqsQueueSize() == 0);

  // No expired metadata should have been scheduled for the Iceberg table.
  List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
  assertThat(expiredMetadata).isEmpty();
  // The metadata-scheduled metric must be absent since nothing was scheduled.
  assertMetrics(false);
}

@Test
Expand All @@ -233,9 +254,9 @@ private SendMessageRequest sendMessageRequest(String payload) {
return new SendMessageRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE), payload);
}

private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedPath, String partitionName) {
private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedPath, String partitionName, boolean expectScheduledExpiredMetric) {
assertHousekeepingMetadata(actual, expectedPath, partitionName);
assertMetrics();
assertMetrics(expectScheduledExpiredMetric);
}

public void assertHousekeepingMetadata(
Expand All @@ -256,13 +277,40 @@ public void assertHousekeepingMetadata(
assertThat(actual.getLifecycleType()).isEqualTo(EXPIRED.toString());
}

public void assertMetrics() {
/**
 * Asserts on the scheduler's meter registries.
 *
 * @param expectScheduledExpiredMetric whether the {@code SCHEDULED_EXPIRED_METRIC} meter should be
 *     present — {@code true} for events that schedule metadata, {@code false} for filtered events
 *     (e.g. Iceberg tables).
 */
public void assertMetrics(boolean expectScheduledExpiredMetric) {
  Set<MeterRegistry> registries = ((CompositeMeterRegistry) BeekeeperSchedulerApiary.meterRegistry())
      .getRegistries();
  assertThat(registries).hasSize(2);
  for (MeterRegistry registry : registries) {
    // Scan the registry once for the scheduled-expired meter instead of asserting
    // contains/doesNotContain separately in each branch.
    boolean scheduledMetricPresent = false;
    for (Meter meter : registry.getMeters()) {
      if (SCHEDULED_EXPIRED_METRIC.equals(meter.getId().getName())) {
        scheduledMetricPresent = true;
        break;
      }
    }
    assertThat(scheduledMetricPresent).isEqualTo(expectScheduledExpiredMetric);
  }
}

/**
 * Returns the approximate number of messages currently on the Apiary SQS queue, used to check
 * whether an event has been consumed (or successfully ignored). Returns 0 when the attribute is
 * missing or empty.
 */
private int getSqsQueueSize() {
  String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
  String messageCount = amazonSQS
      .getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages"))
      .getAttributes()
      .get("ApproximateNumberOfMessages");
  if (messageCount == null || messageCount.isEmpty()) {
    return 0;
  }
  return Integer.parseInt(messageCount);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2023 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,8 +68,9 @@

@Testcontainers
public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase {

private static final int TIMEOUT = 5;
// changes similar to BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest
private static final int TIMEOUT = 30;
// Raised from 5s to match BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest; queue processing is asynchronous, so 5 seconds may not be enough.
private static final String APIARY_QUEUE_URL_PROPERTY = "properties.apiary.queue-url";

private static final String QUEUE = "apiary-receiver-queue";
Expand Down Expand Up @@ -118,7 +119,7 @@ public void unreferencedAlterTableEvent() throws SQLException, IOException, URIS
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 1);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation", true);
}

@Test
Expand All @@ -132,8 +133,8 @@ public void unreferencedMultipleAlterTableEvents() throws SQLException, IOExcept
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation", true);
}

@Test
Expand All @@ -149,8 +150,8 @@ public void unreferencedAlterPartitionEvent() throws SQLException, IOException,
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation", true);
}

@Test
Expand All @@ -165,8 +166,8 @@ public void unreferencedMultipleAlterPartitionEvent() throws IOException, SQLExc
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation", true);
}

@Test
Expand All @@ -179,8 +180,8 @@ public void unreferencedDropPartitionEvent() throws SQLException, IOException, U
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/partitionLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/partitionLocation2", true);
}

@Test
Expand All @@ -192,8 +193,25 @@ public void unreferencedDropTableEvent() throws SQLException, IOException, URISy
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/tableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/tableLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation2", true);
}

// Verifies that drop-table events for Iceberg tables are filtered out and never scheduled.
@Test
public void unreferencedIcebergTableEventIsFiltered() throws SQLException, IOException, URISyntaxException {
  // Mark the table as Iceberg via table_type and output_format so the event filter can detect it.
  DropTableSqsMessage dropIcebergTableSqsMessage = new DropTableSqsMessage("s3://bucket/icebergTableLocation", true, true);
  dropIcebergTableSqsMessage.setTableType("ICEBERG");
  dropIcebergTableSqsMessage.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
  amazonSQS.sendMessage(sendMessageRequest(dropIcebergTableSqsMessage.getFormattedString()));

  // Await the queue draining, not the row count: the row count is already 0 before the message
  // is processed, so `until(() -> getUnreferencedPathsRowCount() == 0)` would pass immediately
  // without exercising the filter at all.
  await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getSqsQueueSize() == 0);

  // No unreferenced paths should have been scheduled for the Iceberg table.
  List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
  assertThat(unreferencedPaths).isEmpty();
  // The path-scheduled metric must be absent since nothing was scheduled.
  assertMetrics(false);
}

@Test
Expand All @@ -216,9 +234,9 @@ private SendMessageRequest sendMessageRequest(String payload) {
return new SendMessageRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE), payload);
}

private void assertUnreferencedPath(HousekeepingPath actual, String expectedPath) {
private void assertUnreferencedPath(HousekeepingPath actual, String expectedPath, boolean expectScheduledUnreferencedMetric) {
assertHousekeepingEntity(actual, expectedPath);
assertMetrics();
assertMetrics(expectScheduledUnreferencedMetric);
}

public void assertHousekeepingEntity(HousekeepingPath actual, String expectedPath) {
Expand All @@ -235,13 +253,32 @@ public void assertHousekeepingEntity(HousekeepingPath actual, String expectedPat
assertThat(actual.getLifecycleType()).isEqualTo(UNREFERENCED.toString());
}

public void assertMetrics() {
/**
 * Asserts on the scheduler's meter registries.
 *
 * @param expectScheduledUnreferencedMetric whether the {@code SCHEDULED_ORPHANED_METRIC} meter
 *     should be present — {@code true} for events that schedule paths, {@code false} for filtered
 *     events (e.g. Iceberg tables).
 */
public void assertMetrics(boolean expectScheduledUnreferencedMetric) {
  Set<MeterRegistry> registries = ((CompositeMeterRegistry) BeekeeperSchedulerApiary.meterRegistry())
      .getRegistries();
  assertThat(registries).hasSize(2);
  for (MeterRegistry registry : registries) {
    // Scan the registry once for the scheduled-orphaned meter instead of asserting
    // contains/doesNotContain separately in each branch.
    boolean orphanedMetricPresent = false;
    for (Meter meter : registry.getMeters()) {
      if (SCHEDULED_ORPHANED_METRIC.equals(meter.getId().getName())) {
        orphanedMetricPresent = true;
        break;
      }
    }
    assertThat(orphanedMetricPresent).isEqualTo(expectScheduledUnreferencedMetric);
  }
}

/**
 * Returns the approximate number of messages currently on the Apiary SQS queue, used to check
 * whether an event has been consumed (or successfully ignored). Returns 0 when the attribute is
 * missing or empty.
 */
private int getSqsQueueSize() {
  String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
  String messageCount = amazonSQS
      .getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages"))
      .getAttributes()
      .get("ApproximateNumberOfMessages");
  if (messageCount == null || messageCount.isEmpty()) {
    return 0;
  }
  return Integer.parseInt(messageCount);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2020 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,6 +99,18 @@ public void setWhitelisted(boolean isWhitelisted) {
tableParameters.add(BEEKEEPER_HIVE_EVENT_WHITELIST, new JsonPrimitive(whitelist));
}

/**
 * Sets the {@code table_type} table parameter on the event payload, letting tests simulate
 * events for Iceberg (e.g. {@code "ICEBERG"}) and non-Iceberg tables.
 */
public void setTableType(String tableType) {
  apiaryEventMessageJsonObject
      .getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY)
      .add("table_type", new JsonPrimitive(tableType));
}

/** Sets the {@code output_format} table parameter on the event payload. */
public void setOutputFormat(String outputFormat) {
  apiaryEventMessageJsonObject
      .getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY)
      .add("output_format", new JsonPrimitive(outputFormat));
}

public final String getFormattedString() {
apiaryEventJsonObject.add(APIARY_EVENT_MESSAGE_KEY, new JsonPrimitive(apiaryEventMessageJsonObject.toString()));
return apiaryEventJsonObject.toString();
Expand Down
Loading
Loading