diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java
index 492017c6..83a1e9b6 100644
--- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java
+++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2019-2020 Expedia, Inc.
+ * Copyright (C) 2019-2024 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
 
 import com.expediagroup.beekeeper.core.model.LifecycleEventType;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter;
+import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.LocationOnlyUpdateListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.TableParameterListenerEventFilter;
@@ -96,7 +97,8 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler(
         new EventTypeListenerEventFilter(eventClasses),
         new LocationOnlyUpdateListenerEventFilter(),
         new TableParameterListenerEventFilter(),
-        new WhitelistedListenerEventFilter()
+        new WhitelistedListenerEventFilter(),
+        new IcebergTableListenerEventFilter()
     );
 
     return new MessageEventHandler(generator, filters);
@@ -120,7 +122,8 @@ public MessageEventHandler expiredHousekeepingMetadataMessageEventHandler(
 
     List filters = List.of(
         new EventTypeListenerEventFilter(eventClasses),
-        new TableParameterListenerEventFilter()
+        new TableParameterListenerEventFilter(),
+        new IcebergTableListenerEventFilter()
     );
 
     return new MessageEventHandler(generator, filters);
diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java
new file mode 100644
index 00000000..5712fbe6
--- /dev/null
+++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2019-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.beekeeper.scheduler.apiary.filter;
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
+
+import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+
+/**
+ * Filters out events that belong to Iceberg tables.
+ * <p>
+ * An event is treated as an Iceberg table event when its table parameters contain a {@code table_type} equal to
+ * {@code ICEBERG} or an {@code output_format} containing {@code iceberg}. Both comparisons ignore case. Events
+ * without table parameters are never filtered.
+ */
+public class IcebergTableListenerEventFilter implements ListenerEventFilter {
+
+  private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class);
+
+  private static final String TABLE_TYPE_KEY = "table_type";
+  private static final String TABLE_TYPE_ICEBERG_VALUE = "ICEBERG";
+  private static final String OUTPUT_FORMAT_KEY = "output_format";
+  private static final String OUTPUT_FORMAT_ICEBERG_MARKER = "iceberg";
+
+  /**
+   * Reports whether the event belongs to an Iceberg table and should therefore be ignored.
+   *
+   * @param event the event whose table parameters are inspected
+   * @param type the lifecycle event type; not used by this filter
+   * @return {@code true} if the table parameters identify an Iceberg table, {@code false} otherwise
+   */
+  @Override
+  public boolean isFiltered(ListenerEvent event, LifecycleEventType type) {
+    Map<String, String> tableParameters = event.getTableParameters();
+    if (tableParameters == null) {
+      return false;
+    }
+
+    if (hasIcebergTableType(tableParameters) || hasIcebergOutputFormat(tableParameters)) {
+      log.info("Ignoring Iceberg table '{}.{}'.", event.getDbName(), event.getTableName());
+      return true;
+    }
+
+    return false;
+  }
+
+  // True when the table_type parameter equals ICEBERG, ignoring case.
+  private static boolean hasIcebergTableType(Map<String, String> tableParameters) {
+    return TABLE_TYPE_ICEBERG_VALUE.equalsIgnoreCase(tableParameters.get(TABLE_TYPE_KEY));
+  }
+
+  // True when the output_format parameter contains "iceberg", ignoring case.
+  private static boolean hasIcebergOutputFormat(Map<String, String> tableParameters) {
+    String outputFormat = tableParameters.get(OUTPUT_FORMAT_KEY);
+    // Locale.ROOT keeps the match independent of the JVM default locale; under a Turkish default locale
+    // "ICEBERG".toLowerCase() yields a dotless i and would no longer contain "iceberg".
+    return outputFormat != null && outputFormat.toLowerCase(Locale.ROOT).contains(OUTPUT_FORMAT_ICEBERG_MARKER);
+  }
+}
diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
index 44f0e2f2..141cc048 100644
--- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
+++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2019-2023 Expedia, Inc.
+ * Copyright (C) 2019-2024 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -58,4 +58,11 @@ private boolean shouldFilterMessage(ListenerEvent listenerEvent) {
   private List generateHousekeepingEntities(ListenerEvent listenerEvent) {
     return generator.generate(listenerEvent, CLIENT_ID);
   }
+
+  /**
+   * @return the event filters held by this handler
+   */
+  public List<ListenerEventFilter> getFilters() {
+    return filters;
+  }
 }
diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
index 2aa5f17b..85d71990 100644
--- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
+++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2019-2020 Expedia, Inc.
+ * Copyright (C) 2019-2024 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.List;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -32,6 +33,8 @@
 import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader;
 
 import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
+import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.ExpiredHousekeepingMetadataGenerator;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.HousekeepingEntityGenerator;
 import com.expediagroup.beekeeper.scheduler.apiary.generator.UnreferencedHousekeepingPathGenerator;
@@ -117,4 +120,20 @@ public void validatePathEventReader() {
         mock(MessageEventHandler.class));
     assertThat(reader).isInstanceOf(BeekeeperEventReader.class);
   }
+
+  @Test
+  public void validateUnreferencedHousekeepingPathMessageEventHandlerIncludesIcebergFilter() {
+    MessageEventHandler handler = commonBeans
+        .unreferencedHousekeepingPathMessageEventHandler(unreferencedHousekeepingPathGenerator);
+    List<ListenerEventFilter> filters = handler.getFilters();
+    assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
+  }
+
+  @Test
+  public void validateExpiredHousekeepingMetadataMessageEventHandlerIncludesIcebergFilter() {
+    MessageEventHandler handler = commonBeans
+        .expiredHousekeepingMetadataMessageEventHandler(expiredHousekeepingMetadataGenerator);
+    List<ListenerEventFilter> filters = handler.getFilters();
+    assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
+  }
 }
diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java
new file mode 100644
index 00000000..73430738
--- /dev/null
+++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright (C) 2019-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.expediagroup.beekeeper.scheduler.apiary.filter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
+
+import com.expediagroup.beekeeper.core.model.LifecycleEventType;
+
+public class IcebergTableListenerEventFilterTest {
+
+  private final IcebergTableListenerEventFilter filter = new IcebergTableListenerEventFilter();
+
+  @Test
+  public void shouldFilterWhenTableTypeIsIceberg() {
+    ListenerEvent event = createListenerEventWithTableType("ICEBERG");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldFilterWhenOutputFormatContainsIceberg() {
+    ListenerEvent event = createListenerEventWithOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldNotFilterWhenTableIsNotIceberg() {
+    ListenerEvent event = createListenerEventWithTableType("MANAGED_TABLE");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldNotFilterWhenOutputFormatDoesNotContainIceberg() {
+    ListenerEvent event = createListenerEventWithOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldHandleNullTableParameters() {
+    ListenerEvent event = createListenerEventWithTableParameters(null);
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldNotFilterWhenTableParametersAreEmpty() {
+    ListenerEvent event = createListenerEventWithTableParameters(new HashMap<>());
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isFalse();
+  }
+
+  @Test
+  public void shouldFilterWhenTableTypeIsIcebergIgnoreCase() {
+    ListenerEvent event = createListenerEventWithTableType("iceberg");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldFilterWhenOutputFormatContainsIcebergIgnoreCase() {
+    ListenerEvent event = createListenerEventWithOutputFormat("org.apache.ICEBERG.mr.hive.HiveIcebergOutputFormat");
+    boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+    assertThat(isFiltered).isTrue();
+  }
+
+  @Test
+  public void shouldFilterUpperCaseOutputFormatRegardlessOfDefaultLocale() {
+    Locale originalLocale = Locale.getDefault();
+    Locale.setDefault(Locale.forLanguageTag("tr-TR"));
+    try {
+      ListenerEvent event = createListenerEventWithOutputFormat("ICEBERG");
+      boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
+      assertThat(isFiltered).isTrue();
+    } finally {
+      Locale.setDefault(originalLocale);
+    }
+  }
+
+  // Helper methods to create ListenerEvent instances with the necessary table parameters
+  private ListenerEvent createListenerEventWithTableType(String tableType) {
+    Map<String, String> tableParameters = new HashMap<>();
+    tableParameters.put("table_type", tableType);
+    return createListenerEventWithTableParameters(tableParameters);
+  }
+
+  private ListenerEvent createListenerEventWithOutputFormat(String outputFormat) {
+    Map<String, String> tableParameters = new HashMap<>();
+    tableParameters.put("output_format", outputFormat);
+    return createListenerEventWithTableParameters(tableParameters);
+  }
+
+  private ListenerEvent createListenerEventWithTableParameters(Map<String, String> tableParameters) {
+    return new ListenerEvent() {
+      @Override
+      public String getDbName() {
+        return "test_db";
+      }
+
+      @Override
+      public String getTableName() {
+        return "test_table";
+      }
+
+      @Override
+      public Map<String, String> getTableParameters() {
+        return tableParameters;
+      }
+    };
+  }
+}