Skip to content

Commit

Permalink
Ignore iceberg tables using filter and updating commonBeans
Browse files — browse the repository at this point in the history
  • Loading branch information
Hamza Jugon committed Nov 11, 2024
1 parent fdd37b5 commit 44d7268
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2020 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@

import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.LocationOnlyUpdateListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.TableParameterListenerEventFilter;
Expand Down Expand Up @@ -96,7 +97,8 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler(
new EventTypeListenerEventFilter(eventClasses),
new LocationOnlyUpdateListenerEventFilter(),
new TableParameterListenerEventFilter(),
new WhitelistedListenerEventFilter()
new WhitelistedListenerEventFilter(),
new IcebergTableListenerEventFilter()
);

return new MessageEventHandler(generator, filters);
Expand All @@ -120,7 +122,8 @@ public MessageEventHandler expiredHousekeepingMetadataMessageEventHandler(

List<ListenerEventFilter> filters = List.of(
new EventTypeListenerEventFilter(eventClasses),
new TableParameterListenerEventFilter()
new TableParameterListenerEventFilter(),
new IcebergTableListenerEventFilter()
);

return new MessageEventHandler(generator, filters);
Expand All @@ -139,4 +142,4 @@ public BeekeeperEventReader eventReader(

return new MessageReaderAdapter(messageReader, handlers);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.scheduler.apiary.filter;

import java.util.Locale;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
import com.expediagroup.beekeeper.core.model.LifecycleEventType;

/**
 * {@link ListenerEventFilter} that filters out events raised for Iceberg tables.
 *
 * <p>An event is considered to belong to an Iceberg table when either of the following holds for
 * the event's table parameters:
 * <ul>
 *   <li>the {@code table_type} parameter equals {@code ICEBERG}, ignoring case; or</li>
 *   <li>the {@code output_format} parameter contains {@code iceberg}, ignoring case.</li>
 * </ul>
 */
public class IcebergTableListenerEventFilter implements ListenerEventFilter {

  private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class);

  private static final String TABLE_TYPE_KEY = "table_type";
  private static final String TABLE_TYPE_ICEBERG_VALUE = "ICEBERG";
  private static final String OUTPUT_FORMAT_KEY = "output_format";
  private static final String OUTPUT_FORMAT_ICEBERG_MARKER = "iceberg";

  /**
   * Decides whether the given event should be filtered because its table is an Iceberg table.
   *
   * @param event the listener event whose table parameters are inspected
   * @param type the lifecycle event type; not consulted by this filter
   * @return {@code true} if the table parameters identify an Iceberg table (an info message
   *     naming the table is logged in that case); {@code false} otherwise, including when the
   *     event carries no table parameters
   */
  @Override
  public boolean isFiltered(ListenerEvent event, LifecycleEventType type) {
    Map<String, String> tableParameters = event.getTableParameters();
    if (tableParameters == null) {
      return false;
    }

    if (hasIcebergTableType(tableParameters) || hasIcebergOutputFormat(tableParameters)) {
      log.info("Ignoring Iceberg table '{}.{}'.", event.getDbName(), event.getTableName());
      return true;
    }

    return false;
  }

  /** Returns whether the {@code table_type} parameter equals {@code ICEBERG}, ignoring case. */
  private static boolean hasIcebergTableType(Map<String, String> tableParameters) {
    // Constant-first comparison is null-safe when the parameter is absent.
    return TABLE_TYPE_ICEBERG_VALUE.equalsIgnoreCase(tableParameters.get(TABLE_TYPE_KEY));
  }

  /** Returns whether the {@code output_format} parameter contains {@code iceberg}, ignoring case. */
  private static boolean hasIcebergOutputFormat(Map<String, String> tableParameters) {
    String outputFormat = tableParameters.get(OUTPUT_FORMAT_KEY);
    if (outputFormat == null) {
      return false;
    }
    // Locale.ROOT keeps the match independent of the JVM default locale; under e.g. a Turkish
    // locale "ICEBERG" would otherwise lower-case to "ıceberg" (dotless i) and not match.
    return outputFormat.toLowerCase(Locale.ROOT).contains(OUTPUT_FORMAT_ICEBERG_MARKER);
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2023 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,4 +58,8 @@ private boolean shouldFilterMessage(ListenerEvent listenerEvent) {
/**
 * Asks the configured generator to build the housekeeping entities for the given event,
 * passing this handler's client id.
 *
 * @param listenerEvent the event to generate entities for
 * @return the entities produced by the generator
 */
private List<HousekeepingEntity> generateHousekeepingEntities(ListenerEvent listenerEvent) {
  final List<HousekeepingEntity> entities = generator.generate(listenerEvent, CLIENT_ID);
  return entities;
}

/**
 * Exposes the filters held by this handler.
 *
 * @return the handler's filter list (the same list instance held internally, not a copy)
 */
public List<ListenerEventFilter> getFilters() {
  return this.filters;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2020 Expedia, Inc.
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.EnumMap;
import java.util.List;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -32,6 +33,8 @@
import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader;

import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.generator.ExpiredHousekeepingMetadataGenerator;
import com.expediagroup.beekeeper.scheduler.apiary.generator.HousekeepingEntityGenerator;
import com.expediagroup.beekeeper.scheduler.apiary.generator.UnreferencedHousekeepingPathGenerator;
Expand Down Expand Up @@ -117,4 +120,20 @@ public void validatePathEventReader() {
mock(MessageEventHandler.class));
assertThat(reader).isInstanceOf(BeekeeperEventReader.class);
}

// The unreferenced-path handler bean must carry an Iceberg table filter.
@Test
public void validateUnreferencedHousekeepingPathMessageEventHandlerIncludesIcebergFilter() {
  MessageEventHandler unreferencedPathHandler =
      commonBeans.unreferencedHousekeepingPathMessageEventHandler(unreferencedHousekeepingPathGenerator);

  assertThat(unreferencedPathHandler.getFilters())
      .hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
}

// The expired-metadata handler bean must carry an Iceberg table filter.
@Test
public void validateExpiredHousekeepingMetadataMessageEventHandlerIncludesIcebergFilter() {
  MessageEventHandler expiredMetadataHandler =
      commonBeans.expiredHousekeepingMetadataMessageEventHandler(expiredHousekeepingMetadataGenerator);

  assertThat(expiredMetadataHandler.getFilters())
      .hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.scheduler.apiary.filter;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
import com.expediagroup.beekeeper.core.model.LifecycleEventType;

/**
 * Unit tests for {@link IcebergTableListenerEventFilter}, covering detection through the
 * {@code table_type} and {@code output_format} table parameters and the null-parameters case.
 */
public class IcebergTableListenerEventFilterTest {

  private static final String TABLE_TYPE = "table_type";
  private static final String OUTPUT_FORMAT = "output_format";

  private final IcebergTableListenerEventFilter filter = new IcebergTableListenerEventFilter();

  @Test
  public void shouldFilterWhenTableTypeIsIceberg() {
    assertThat(isFiltered(eventWithParameter(TABLE_TYPE, "ICEBERG"))).isTrue();
  }

  @Test
  public void shouldFilterWhenOutputFormatContainsIceberg() {
    assertThat(isFiltered(
        eventWithParameter(OUTPUT_FORMAT, "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"))).isTrue();
  }

  @Test
  public void shouldNotFilterWhenTableIsNotIceberg() {
    assertThat(isFiltered(eventWithParameter(TABLE_TYPE, "MANAGED_TABLE"))).isFalse();
  }

  @Test
  public void shouldNotFilterWhenOutputFormatDoesNotContainIceberg() {
    assertThat(isFiltered(
        eventWithParameter(OUTPUT_FORMAT, "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))).isFalse();
  }

  @Test
  public void shouldHandleNullTableParameters() {
    assertThat(isFiltered(eventWithParameters(null))).isFalse();
  }

  @Test
  public void shouldFilterWhenTableTypeIsIcebergIgnoreCase() {
    assertThat(isFiltered(eventWithParameter(TABLE_TYPE, "iceberg"))).isTrue();
  }

  @Test
  public void shouldFilterWhenOutputFormatContainsIcebergIgnoreCase() {
    assertThat(isFiltered(
        eventWithParameter(OUTPUT_FORMAT, "org.apache.ICEBERG.mr.hive.HiveIcebergOutputFormat"))).isTrue();
  }

  /** Runs the filter under test against the event with the EXPIRED lifecycle type. */
  private boolean isFiltered(ListenerEvent event) {
    return filter.isFiltered(event, LifecycleEventType.EXPIRED);
  }

  /** Builds an event whose table parameters hold exactly one entry. */
  private ListenerEvent eventWithParameter(String key, String value) {
    Map<String, String> singleParameter = new HashMap<>();
    singleParameter.put(key, value);
    return eventWithParameters(singleParameter);
  }

  /** Builds a stub event with fixed database/table names and the supplied table parameters. */
  private ListenerEvent eventWithParameters(Map<String, String> tableParameters) {
    return new ListenerEvent() {
      @Override
      public String getDbName() {
        return "test_db";
      }

      @Override
      public String getTableName() {
        return "test_table";
      }

      @Override
      public Map<String, String> getTableParameters() {
        return tableParameters;
      }
    };
  }
}

0 comments on commit 44d7268

Please sign in to comment.