Skip to content

Commit

Permalink
Revert "Undo some changes and update events filter class"
Browse files Browse the repository at this point in the history
This reverts commit aaca374.
  • Loading branch information
Hamza Jugon committed Nov 12, 2024
1 parent e9e2da8 commit 7ff8ce9
Show file tree
Hide file tree
Showing 11 changed files with 502 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2023 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,7 +126,8 @@ public void expiredMetadataCreateTableEvent() throws SQLException, IOException,
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null, true);
// assertMetrics() accepts a boolean value now so we can verify if metadata-scheduled is not present
}

@Test
Expand All @@ -139,7 +140,7 @@ public void expiredMetadataAlterTableEvent() throws SQLException, IOException, U
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, null, true);
}

@Test
Expand All @@ -156,7 +157,7 @@ public void expiredMetadataAddPartitionEvent() throws SQLException, IOException,
List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
// check first entry is for the table
assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME, true);
}

@Test
Expand All @@ -176,8 +177,8 @@ public void expiredMetadataMultipleAddPartitionEvents() throws SQLException, IOE
List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
// check first entry is for the table
assertThat(expiredMetadata.get(0).getPartitionName()).isEqualTo(null);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(2), LOCATION_B, PARTITION_B_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_A, PARTITION_A_NAME, true);
assertExpiredMetadata(expiredMetadata.get(2), LOCATION_B, PARTITION_B_NAME, true);
}

@Test
Expand All @@ -191,7 +192,7 @@ public void expiredMetadataAlterPartitionTableEvent() throws SQLException, IOExc
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 1);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME, true);
}

@Test
Expand All @@ -209,8 +210,28 @@ public void expiredMetadataMultipleAlterPartitionTableEvents() throws SQLExcepti
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUpdatedExpiredMetadataRowCount() == 2);

List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_B, PARTITION_B_NAME);
assertExpiredMetadata(expiredMetadata.get(0), LOCATION_A, PARTITION_A_NAME, true);
assertExpiredMetadata(expiredMetadata.get(1), LOCATION_B, PARTITION_B_NAME, true);
}

// New test to check if expired metadata for Iceberg tables is filtered
@Test
public void expiredMetadataIcebergTableEventIsFiltered() throws SQLException, IOException, URISyntaxException {
//create a message for an Iceberg table by including table_type=ICEBERG in the payload
CreateTableSqsMessage createIcebergTableSqsMessage = new CreateTableSqsMessage(LOCATION_A, true);
createIcebergTableSqsMessage.setTableType("ICEBERG");
createIcebergTableSqsMessage.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
amazonSQS.sendMessage(sendMessageRequest(createIcebergTableSqsMessage.getFormattedString()));
// wait for SchedulerApiary to process message
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 0);
// asserts that no expired metadata was scheduled
List<HousekeepingMetadata> expiredMetadata = getExpiredMetadata();
assertThat(expiredMetadata).isEmpty();
// verify metrics (updated assertMetrics) below
assertMetrics(false);
// assert the event was deleted from the queue
int queueSize = getSqsQueueSize();
assertThat(queueSize).isEqualTo(0);
}

@Test
Expand All @@ -233,9 +254,9 @@ private SendMessageRequest sendMessageRequest(String payload) {
return new SendMessageRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE), payload);
}

private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedPath, String partitionName) {
private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedPath, String partitionName, boolean expectScheduledExpiredMetric) {
assertHousekeepingMetadata(actual, expectedPath, partitionName);
assertMetrics();
assertMetrics(expectScheduledExpiredMetric);
}

public void assertHousekeepingMetadata(
Expand All @@ -256,13 +277,40 @@ public void assertHousekeepingMetadata(
assertThat(actual.getLifecycleType()).isEqualTo(EXPIRED.toString());
}

public void assertMetrics() {
/**
* Previously, assertExpiredMetadata didn't differentiate between whether specific metrics (e.g., metadata-scheduled) were expected to be present or not
* Adding boolean param allows us to check if SCHEDULED_EXPIRED_METRIC exists
* This allows me to check if the metadata-scheduled is present in expiredMetadataIcebergTableEventIsFiltered test.
*/
public void assertMetrics(boolean expectScheduledExpiredMetric) {
Set<MeterRegistry> meterRegistry = ((CompositeMeterRegistry) BeekeeperSchedulerApiary.meterRegistry())
.getRegistries();
assertThat(meterRegistry).hasSize(2);
meterRegistry.forEach(registry -> {
List<Meter> meters = registry.getMeters();
assertThat(meters).extracting("id", Meter.Id.class).extracting("name").contains(SCHEDULED_EXPIRED_METRIC);
if (expectScheduledExpiredMetric) {
assertThat(meters).extracting("id", Meter.Id.class)
.extracting("name")
.contains(SCHEDULED_EXPIRED_METRIC);
} else {
assertThat(meters).extracting("id", Meter.Id.class)
.extracting("name")
.doesNotContain(SCHEDULED_EXPIRED_METRIC);
}
});
}
}

// retrieves the current number of messages to check if the event has been added to the SQS queue or successfully ignored
private int getSqsQueueSize() {
String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
// fetch the number of messages
String approximateNumberOfMessages = amazonSQS.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages"))
.getAttributes()
.get("ApproximateNumberOfMessages");

//return the count as an integer
return approximateNumberOfMessages != null && !approximateNumberOfMessages.isEmpty()
? Integer.parseInt(approximateNumberOfMessages)
: 0;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2023 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,8 +68,9 @@

@Testcontainers
public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase {

private static final int TIMEOUT = 5;
// changes similar to BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest
private static final int TIMEOUT = 30;
// updated to match BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest, asynchronous operations so 5 seconds might not be enough
private static final String APIARY_QUEUE_URL_PROPERTY = "properties.apiary.queue-url";

private static final String QUEUE = "apiary-receiver-queue";
Expand Down Expand Up @@ -118,7 +119,7 @@ public void unreferencedAlterTableEvent() throws SQLException, IOException, URIS
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 1);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation", true);
}

@Test
Expand All @@ -132,8 +133,8 @@ public void unreferencedMultipleAlterTableEvents() throws SQLException, IOExcept
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation", true);
}

@Test
Expand All @@ -149,24 +150,24 @@ public void unreferencedAlterPartitionEvent() throws SQLException, IOException,
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation", true);
}

@Test
public void unreferencedMultipleAlterPartitionEvent() throws IOException, SQLException, URISyntaxException {
List
.of(new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation",
"s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true),
"s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true),
new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation2",
"s3://bucket/table/partitionLocation2", "s3://bucket/table/partitionLocation", true, true))
.forEach(msg -> amazonSQS.sendMessage(sendMessageRequest(msg.getFormattedString())));

await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation", true);
}

@Test
Expand All @@ -179,8 +180,8 @@ public void unreferencedDropPartitionEvent() throws SQLException, IOException, U
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/partitionLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/partitionLocation2", true);
}

@Test
Expand All @@ -192,8 +193,25 @@ public void unreferencedDropTableEvent() throws SQLException, IOException, URISy
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/tableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/tableLocation", true);
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation2", true);
}

@Test
public void unreferencedIcebergTableEventIsFiltered() throws SQLException, IOException, URISyntaxException {
DropTableSqsMessage dropIcebergTableSqsMessage = new DropTableSqsMessage("s3://bucket/icebergTableLocation", true, true);
dropIcebergTableSqsMessage.setTableType("ICEBERG");
dropIcebergTableSqsMessage.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
amazonSQS.sendMessage(sendMessageRequest(dropIcebergTableSqsMessage.getFormattedString()));

await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 0);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertThat(unreferencedPaths).isEmpty();
assertMetrics(false);

int queueSize = getSqsQueueSize();
assertThat(queueSize).isEqualTo(0);
}

@Test
Expand All @@ -216,9 +234,9 @@ private SendMessageRequest sendMessageRequest(String payload) {
return new SendMessageRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE), payload);
}

private void assertUnreferencedPath(HousekeepingPath actual, String expectedPath) {
private void assertUnreferencedPath(HousekeepingPath actual, String expectedPath, boolean expectScheduledUnreferencedMetric) {
assertHousekeepingEntity(actual, expectedPath);
assertMetrics();
assertMetrics(expectScheduledUnreferencedMetric);
}

public void assertHousekeepingEntity(HousekeepingPath actual, String expectedPath) {
Expand All @@ -235,13 +253,32 @@ public void assertHousekeepingEntity(HousekeepingPath actual, String expectedPat
assertThat(actual.getLifecycleType()).isEqualTo(UNREFERENCED.toString());
}

public void assertMetrics() {
public void assertMetrics(boolean expectScheduledUnreferencedMetric) {
Set<MeterRegistry> meterRegistry = ((CompositeMeterRegistry) BeekeeperSchedulerApiary.meterRegistry())
.getRegistries();
assertThat(meterRegistry).hasSize(2);
meterRegistry.forEach(registry -> {
List<Meter> meters = registry.getMeters();
assertThat(meters).extracting("id", Meter.Id.class).extracting("name").contains(SCHEDULED_ORPHANED_METRIC);
if (expectScheduledUnreferencedMetric) {
assertThat(meters).extracting("id", Meter.Id.class)
.extracting("name")
.contains(SCHEDULED_ORPHANED_METRIC);
} else {
assertThat(meters).extracting("id", Meter.Id.class)
.extracting("name")
.doesNotContain(SCHEDULED_ORPHANED_METRIC);
}
});
}
}

private int getSqsQueueSize() {
String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
String approximateNumberOfMessages = amazonSQS.getQueueAttributes(queueUrl, List.of("ApproximateNumberOfMessages"))
.getAttributes()
.get("ApproximateNumberOfMessages");

return approximateNumberOfMessages != null && !approximateNumberOfMessages.isEmpty()
? Integer.parseInt(approximateNumberOfMessages)
: 0;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2020 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,6 +99,18 @@ public void setWhitelisted(boolean isWhitelisted) {
tableParameters.add(BEEKEEPER_HIVE_EVENT_WHITELIST, new JsonPrimitive(whitelist));
}

//enable the setting of the table_type parameter in SQS messages, to allow tests to simulate events for Iceberg/non-Iceberg tables.
public void setTableType(String tableType) {
JsonObject tableParameters = apiaryEventMessageJsonObject.getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY);
tableParameters.add("table_type", new JsonPrimitive(tableType));
}

// New method to set output_format
public void setOutputFormat(String outputFormat) {
JsonObject tableParameters = apiaryEventMessageJsonObject.getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY);
tableParameters.add("output_format", new JsonPrimitive(outputFormat));
}

public final String getFormattedString() {
apiaryEventJsonObject.add(APIARY_EVENT_MESSAGE_KEY, new JsonPrimitive(apiaryEventMessageJsonObject.toString()));
return apiaryEventJsonObject.toString();
Expand All @@ -107,4 +119,4 @@ public final String getFormattedString() {
public JsonObject getApiaryEventMessageJsonObject() {
return apiaryEventMessageJsonObject;
}
}
}
Loading

0 comments on commit 7ff8ce9

Please sign in to comment.