Skip to content

Commit

Permalink
Add IcebergTableListenerEventFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamza Jugon committed Nov 27, 2024
1 parent 58c6e65 commit a32e9d0
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.LocationOnlyUpdateListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.TableParameterListenerEventFilter;
Expand Down Expand Up @@ -96,7 +97,8 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler(
new EventTypeListenerEventFilter(eventClasses),
new LocationOnlyUpdateListenerEventFilter(),
new TableParameterListenerEventFilter(),
new WhitelistedListenerEventFilter()
new WhitelistedListenerEventFilter(),
new IcebergTableListenerEventFilter()
);

return new MessageEventHandler(generator, filters);
Expand All @@ -120,7 +122,8 @@ public MessageEventHandler expiredHousekeepingMetadataMessageEventHandler(

List<ListenerEventFilter> filters = List.of(
new EventTypeListenerEventFilter(eventClasses),
new TableParameterListenerEventFilter()
new TableParameterListenerEventFilter(),
new IcebergTableListenerEventFilter()
);

return new MessageEventHandler(generator, filters);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.scheduler.apiary.filter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;

import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
import com.expediagroup.beekeeper.core.model.LifecycleEventType;

import java.util.Locale;
import java.util.Map;

@Component
public class IcebergTableListenerEventFilter implements ListenerEventFilter {

private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class);

private static final String METADATA_LOCATION_KEY = "metadata_location";
private static final String TABLE_TYPE_KEY = "table_type";
private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg";

@Override
public boolean isFiltered(ListenerEvent event, LifecycleEventType type) {
Map<String, String> tableParameters = event.getTableParameters();

if (tableParameters != null) {
String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY,null);
String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY,null);

boolean hasMetadataLocation = metadataLocation != null && !metadataLocation.trim().isEmpty();
boolean isIcebergType = tableType != null && tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE);

if (hasMetadataLocation || isIcebergType) {
log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.",
event.getDbName(), event.getTableName());
return true;
}
}
return false;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ private boolean shouldFilterMessage(ListenerEvent listenerEvent) {
private List<HousekeepingEntity> generateHousekeepingEntities(ListenerEvent listenerEvent) {
return generator.generate(listenerEvent, CLIENT_ID);
}

public List<ListenerEventFilter> getFilters() {
return filters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.EnumMap;
import java.util.List;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -32,6 +33,8 @@
import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader;

import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
import com.expediagroup.beekeeper.scheduler.apiary.generator.ExpiredHousekeepingMetadataGenerator;
import com.expediagroup.beekeeper.scheduler.apiary.generator.HousekeepingEntityGenerator;
import com.expediagroup.beekeeper.scheduler.apiary.generator.UnreferencedHousekeepingPathGenerator;
Expand Down Expand Up @@ -117,4 +120,18 @@ public void validatePathEventReader() {
mock(MessageEventHandler.class));
assertThat(reader).isInstanceOf(BeekeeperEventReader.class);
}

@Test
public void validateUnreferencedHousekeepingPathMessageEventHandlerIncludesIcebergFilter() {
MessageEventHandler handler = commonBeans.unreferencedHousekeepingPathMessageEventHandler(unreferencedHousekeepingPathGenerator);
List<ListenerEventFilter> filters = handler.getFilters();
assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
}

@Test
public void validateExpiredHousekeepingMetadataMessageEventHandlerIncludesIcebergFilter() {
MessageEventHandler handler = commonBeans.expiredHousekeepingMetadataMessageEventHandler(expiredHousekeepingMetadataGenerator);
List<ListenerEventFilter> filters = handler.getFilters();
assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.scheduler.apiary.filter;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
import com.expediagroup.beekeeper.core.model.LifecycleEventType;

public class IcebergTableListenerEventFilterTest {

private final IcebergTableListenerEventFilter filter = new IcebergTableListenerEventFilter();

@Test
public void shouldFilterWhenTableTypeIsIceberg() {
ListenerEvent event = createListenerEventWithTableType("ICEBERG");
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isTrue();
}

@Test
public void shouldNotFilterWhenTableTypeIsNotIceberg() {
ListenerEvent event = createListenerEventWithTableType("HIVE");
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isFalse();
}

@Test
public void shouldFilterWhenTableTypeIsIcebergIgnoreCase() {
ListenerEvent event = createListenerEventWithTableType("iceberg");
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isTrue();
}

@Test
public void shouldFilterWhenMetadataLocationIsPresent() {
ListenerEvent event = createListenerEventWithMetadataLocation("s3://example/path/to/metadata");
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isTrue();
}

@Test
public void shouldNotFilterWhenMetadataLocationIsEmpty() {
ListenerEvent event = createListenerEventWithMetadataLocation("");
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isFalse();
}

@Test
public void shouldNotFilterWhenMetadataLocationIsNull() {
ListenerEvent event = createListenerEventWithMetadataLocation(null);
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isFalse();
}

@Test
public void shouldHandleNullTableParameters() {
ListenerEvent event = createListenerEventWithTableParameters(null);
boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED);
assertThat(isFiltered).isFalse();
}

private ListenerEvent createListenerEventWithTableType(String tableType) {
Map<String, String> tableParameters = new HashMap<>();
tableParameters.put("table_type", tableType);
return createListenerEventWithTableParameters(tableParameters);
}

private ListenerEvent createListenerEventWithMetadataLocation(String metadataLocation) {
Map<String, String> tableParameters = new HashMap<>();
tableParameters.put("metadata_location", metadataLocation);
return createListenerEventWithTableParameters(tableParameters);
}

private ListenerEvent createListenerEventWithTableParameters(Map<String, String> tableParameters) {
return new ListenerEvent() {
@Override
public String getDbName() {
return "test_db";
}

@Override
public String getTableName() {
return "test_table";
}

@Override
public Map<String, String> getTableParameters() {
return tableParameters;
}
};
}
}

0 comments on commit a32e9d0

Please sign in to comment.