From 411f19a75f8eb5f1182d36610a1f61bc4cd2645d Mon Sep 17 00:00:00 2001 From: Abhimanyu Gupta Date: Mon, 15 Jul 2019 16:57:35 +0100 Subject: [PATCH 1/2] SAving progress --- pom.xml | 4 +- .../AbstractMetaStoreEventListener.java | 47 ++- .../shunting/yard/common/event/EventType.java | 33 ++- .../common/event/ListenerEventFactory.java | 75 +++++ .../event/SerializableListenerEvent.java | 8 +- .../yard/common/io/MetaStoreEventSerDe.java | 8 +- .../io/jackson/JsonMetaStoreEventSerDe.java | 50 ++-- .../io/java/JavaMetaStoreEventSerDe.java | 9 +- .../yard/common/messaging/Message.java | 22 +- .../yard/common/messaging/MessageReader.java | 6 +- .../AbstractMetaStoreEventListenerTest.java | 56 ++-- .../yard/common/event/EventTypeTest.java | 5 +- .../SerializableAddPartitionEventTest.java | 177 +++++------ .../SerializableAlterPartitionEventTest.java | 145 ++++----- .../SerializableAlterTableEventTest.java | 129 ++++---- .../SerializableCreateTableEventTest.java | 123 ++++---- .../SerializableDropPartitionEventTest.java | 175 ++++++----- .../event/SerializableDropTableEventTest.java | 123 ++++---- .../event/SerializableInsertEventTest.java | 157 +++++----- .../SerializableListenerEventFactoryTest.java | 275 +++++++++--------- .../io/AbstractMetaStoreEventSerDeTest.java | 7 +- shunting-yard-emitter/pom.xml | 4 +- .../emitter/kafka/KafkaProducerProperty.java | 2 +- .../kafka/messaging/KafkaMessageTask.java | 10 +- .../KafkaMetaStoreEventListenerTest.java | 42 +-- .../emitter/sqs/messaging/SqsMessageTask.java | 14 +- .../kafka/messaging/KafkaMessageReader.java | 16 +- .../messaging/MessageReaderAdapterTest.java | 15 + 28 files changed, 1008 insertions(+), 729 deletions(-) create mode 100644 shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java diff --git a/pom.xml b/pom.xml index da6e146..913dbfb 100644 --- a/pom.xml +++ b/pom.xml @@ -24,9 +24,9 @@ shunting-yard-common shunting-yard-emitter - shunting-yard-receiver + diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java index 377417e..fc9e2e4 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,10 @@ */ package com.hotels.shunting.yard.common.emitter; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + import static com.hotels.shunting.yard.common.emitter.EmitterUtils.error; +import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION; import java.util.concurrent.ExecutorService; @@ -37,11 +40,13 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hive.common.util.HiveVersionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; +import com.hotels.shunting.yard.common.event.EventType; import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; import com.hotels.shunting.yard.common.messaging.Message; @@ -72,11 +77,13 @@ private MessageTask message(Message message) { return new WrappingMessageTask(getMessageTaskFactory().newTask(message)); } - private Message withPayload(SerializableListenerEvent event) throws MetaException { + private Message withPayload(ListenerEvent event, String dbName, String tableName, EventType eventType) + throws MetaException { return Message .builder() - .database(event.getDatabaseName()) - .table(event.getTableName()) + .database(dbName) + .table(tableName) + .eventType(eventType) .payload(getMetaStoreEventSerDe().marshal(event)) .build(); } @@ -85,7 +92,12 @@ private Message withPayload(SerializableListenerEvent event) throws MetaExceptio public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { log.info("Create table event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent)))); + tableEvent.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion()); + tableEvent.putParameter(METASTOREURIS.varname, super.getConf().get(METASTOREURIS.varname)); + + executorService + .submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(), + tableEvent.getTable().getTableName(), EventType.ON_CREATE_TABLE))); } catch (Exception e) { error(e); } @@ -95,7 +107,9 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { public void onDropTable(DropTableEvent tableEvent) throws MetaException { log.info("Drop table event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent)))); + executorService + .submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(), + tableEvent.getTable().getTableName(), EventType.ON_DROP_TABLE))); } catch (Exception e) { error(e); } @@ -105,7 +119,9 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException { public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { log.info("Alter table event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent)))); + executorService + .submit(message(withPayload(tableEvent, tableEvent.getNewTable().getDbName(), + tableEvent.getNewTable().getTableName(), EventType.ON_ALTER_TABLE))); } catch (Exception e) { error(e); } @@ -115,7 +131,9 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException { log.info("Add partition event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent)))); + executorService + .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(), + partitionEvent.getTable().getTableName(), EventType.ON_ADD_PARTITION))); } catch (Exception e) { error(e); } @@ -125,7 +143,9 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { log.info("Drop partition event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent)))); + executorService + .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(), + partitionEvent.getTable().getTableName(), EventType.ON_DROP_PARTITION))); } catch (Exception e) { error(e); } @@ -135,7 +155,9 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { log.info("Alter partition event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent)))); + executorService + .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(), + partitionEvent.getTable().getTableName(), EventType.ON_ALTER_PARTITION))); } catch (Exception e) { error(e); } @@ -145,7 +167,8 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce public void onInsert(InsertEvent insertEvent) throws MetaException { log.info("Insert event received"); try { - executorService.submit(message(withPayload(serializableListenerEventFactory.create(insertEvent)))); + executorService + .submit(message(withPayload(insertEvent, insertEvent.getTable(), insertEvent.getDb(), EventType.ON_INSERT))); } catch (Exception e) { error(e); } diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java index 5df4bab..054721e 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,32 +15,41 @@ */ package com.hotels.shunting.yard.common.event; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; + /** * To make processing event in the receiver easier. */ public enum EventType { - ON_CREATE_TABLE(SerializableCreateTableEvent.class), - ON_ALTER_TABLE(SerializableAlterTableEvent.class), - ON_DROP_TABLE(SerializableDropTableEvent.class), - ON_ADD_PARTITION(SerializableAddPartitionEvent.class), - ON_ALTER_PARTITION(SerializableAlterPartitionEvent.class), - ON_DROP_PARTITION(SerializableDropPartitionEvent.class), - ON_INSERT(SerializableInsertEvent.class); + ON_CREATE_TABLE(CreateTableEvent.class), + ON_ALTER_TABLE(AlterTableEvent.class), + ON_DROP_TABLE(DropTableEvent.class), + ON_ADD_PARTITION(AddPartitionEvent.class), + ON_ALTER_PARTITION(AlterPartitionEvent.class), + ON_DROP_PARTITION(DropPartitionEvent.class), + ON_INSERT(InsertEvent.class); - private final Class eventClass; + private final Class eventClass; - private EventType(Class eventClass) { + private EventType(Class eventClass) { if (eventClass == null) { throw new NullPointerException("Parameter eventClass is required"); } this.eventClass = eventClass; } - public Class eventClass() { + public Class eventClass() { return eventClass; } - public static EventType forClass(Class clazz) { + public static EventType forClass(Class clazz) { for (EventType e : values()) { if (e.eventClass().equals(clazz)) { return e; diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java new file mode 100644 index 0000000..f31082f --- /dev/null +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.event; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + +import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hive.common.util.HiveVersionInfo; + +public class ListenerEventFactory { + + private final Configuration config; + + public ListenerEventFactory(Configuration config) { + this.config = config; + } + + private T addParams(T event) { + event.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion()); + event.putParameter(METASTOREURIS.varname, config.get(METASTOREURIS.varname)); + return event; + } + + public SerializableCreateTableEvent create(CreateTableEvent event) { + return new SerializableCreateTableEvent(addParams(event)); + } + + public SerializableAlterTableEvent create(AlterTableEvent event) { + return new SerializableAlterTableEvent(addParams(event)); + } + + public SerializableDropTableEvent create(DropTableEvent event) { + return new SerializableDropTableEvent(addParams(event)); + } + + public SerializableAddPartitionEvent create(AddPartitionEvent event) { + return new SerializableAddPartitionEvent(addParams(event)); + } + + public SerializableAlterPartitionEvent create(AlterPartitionEvent event) { + return new SerializableAlterPartitionEvent(addParams(event)); + } + + public SerializableDropPartitionEvent create(DropPartitionEvent event) { + return new SerializableDropPartitionEvent(addParams(event)); + } + + public SerializableInsertEvent create(InsertEvent event) { + return new SerializableInsertEvent(addParams(event)); + } + +} diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java index 3f059a2..c07a806 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,9 +55,9 @@ protected SerializableListenerEvent(ListenerEvent event) { environmentContext = event.getEnvironmentContext(); } - public EventType getEventType() { - return EventType.forClass(this.getClass()); - } + // public EventType getEventType() { + // return EventType.forClass(this.getClass()); + // } public abstract String getDatabaseName(); diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java index e86da3c..a7d8df4 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,9 @@ package com.hotels.shunting.yard.common.io; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import com.hotels.shunting.yard.common.ShuntingYardException; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; public interface MetaStoreEventSerDe { @@ -31,8 +31,8 @@ static T serDeForClassName(String className) { } } - byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaException; + byte[] marshal(ListenerEvent listenerEvent) throws MetaException; - T unmarshal(byte[] payload) throws MetaException; + T unmarshal(byte[] payload) throws MetaException; } diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/JsonMetaStoreEventSerDe.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/JsonMetaStoreEventSerDe.java index 2b8658f..07a6604 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/JsonMetaStoreEventSerDe.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/JsonMetaStoreEventSerDe.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.events.CreateTableEvent; @@ -33,7 +34,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.hotels.shunting.yard.common.event.EventType; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; public class JsonMetaStoreEventSerDe implements MetaStoreEventSerDe { @@ -60,7 +60,7 @@ private void registerDeserializers(SimpleModule module) { } @Override - public byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaException { + public byte[] marshal(ListenerEvent listenerEvent) throws MetaException { try { log.debug("Marshalling event: {}", listenerEvent); ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -78,7 +78,7 @@ public byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaExcept } @Override - public T unmarshal(byte[] payload) throws MetaException { + public T unmarshal(byte[] payload) throws MetaException { try { if (log.isDebugEnabled()) { log.debug("Marshalled event is: {}", new String(payload)); @@ -86,11 +86,12 @@ public T unmarshal(byte[] payload) throws ByteArrayInputStream buffer = new ByteArrayInputStream(payload); // As we don't know the type in advance we can only deserialize the event twice: // 1. Create a dummy object just to find out the type - T genericEvent = mapper.readerFor(HeplerSerializableListenerEvent.class).readValue(buffer); - log.debug("Umarshal event of type: {}", genericEvent.getEventType()); + T genericEvent = mapper.readerFor(HelperListenerEvent.class).readValue(buffer); + // log.debug("Umarshal event of type: {}", genericEvent.getEventType()); // 2. Deserialize the actual object buffer.reset(); - T event = mapper.readerFor(genericEvent.getEventType().eventClass()).readValue(buffer); + // T event = mapper.readerFor(genericEvent.getEventType().eventClass()).readValue(buffer); + T event = mapper.readerFor(EventType.ON_DROP_TABLE.eventClass()).readValue(buffer); log.debug("Unmarshalled event is: {}", event); return event; } catch (Exception e) { @@ -100,34 +101,37 @@ public T unmarshal(byte[] payload) throws } } - static class HeplerSerializableListenerEvent extends SerializableListenerEvent { + static class HelperListenerEvent extends ListenerEvent { private static final long serialVersionUID = 1L; private static final ListenerEvent DUMMY_EVENT = new CreateTableEvent(null, false, null); private EventType eventType; - HeplerSerializableListenerEvent() { - super(DUMMY_EVENT); + // HelperListenerEvent() { + // super(DUMMY_EVENT); + // } + public HelperListenerEvent(boolean status, HMSHandler handler) { + super(status, handler); } - @Override - public EventType getEventType() { - return eventType; - } + // @Override + // public EventType getEventType() { + // return eventType; + // } public void setEventType(EventType eventType) { this.eventType = eventType; } - @Override - public String getDatabaseName() { - return null; - } - - @Override - public String getTableName() { - return null; - } + // @Override + // public String getDatabaseName() { + // return null; + // } + // + // @Override + // public String getTableName() { + // return null; + // } } } diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/java/JavaMetaStoreEventSerDe.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/java/JavaMetaStoreEventSerDe.java index e75aad9..666ef3d 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/java/JavaMetaStoreEventSerDe.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/java/JavaMetaStoreEventSerDe.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,17 +22,17 @@ import java.io.ObjectOutputStream; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; public class JavaMetaStoreEventSerDe implements MetaStoreEventSerDe { private static final Logger log = LoggerFactory.getLogger(JavaMetaStoreEventSerDe.class); @Override - public byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaException { + public byte[] marshal(ListenerEvent listenerEvent) throws MetaException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); try (ObjectOutputStream out = new ObjectOutputStream(buffer)) { out.writeObject(listenerEvent); @@ -45,9 +45,10 @@ public byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaExcept } @Override - public T unmarshal(byte[] payload) throws MetaException { + public T unmarshal(byte[] payload) throws MetaException { ByteArrayInputStream buffer = new ByteArrayInputStream(payload); try (ObjectInputStream in = new ObjectInputStream(buffer)) { + System.out.println(in); return (T) in.readObject(); } catch (Exception e) { String message = "Unable to deserialize event from payload"; diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/Message.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/Message.java index 383f6f8..c0df154 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/Message.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/Message.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,11 +15,14 @@ */ package com.hotels.shunting.yard.common.messaging; +import com.hotels.shunting.yard.common.event.EventType; + public class Message { public static class Builder { private String database; private String table; + private EventType eventType; private long timestamp = System.currentTimeMillis(); private byte[] payload; @@ -49,6 +52,11 @@ public Builder table(String table) { return this; } + public Builder eventType(EventType eventType) { + this.eventType = eventType; + return this; + } + public Builder timestamp(long timestamp) { this.timestamp = timestamp; return this; @@ -61,8 +69,8 @@ public Builder payload(byte[] payload) { public Message build() { return new Message(checkEmpty(database, "Parameter 'database' is required"), - checkEmpty(table, "Parameter 'table' is required"), timestamp, - checkNull(payload, "Parameter 'payload' is required")); + checkEmpty(table, "Parameter 'table' is required"), checkNull(eventType, "Parameter 'eventType' is required"), + timestamp, checkNull(payload, "Parameter 'payload' is required")); } } @@ -72,12 +80,14 @@ public static Builder builder() { private final String database; private final String table; + private final EventType eventType; private final long timestamp; private final byte[] payload; - private Message(String database, String table, long timestamp, byte[] payload) { + private Message(String database, String table, EventType eventType, long timestamp, byte[] payload) { this.database = database; this.table = table; + this.eventType = eventType; this.timestamp = timestamp; this.payload = payload; } @@ -94,6 +104,10 @@ public String getTable() { return table; } + public EventType getEventType() { + return eventType; + } + public long getTimestamp() { return timestamp; } diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/MessageReader.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/MessageReader.java index ea308bf..f359c48 100644 --- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/MessageReader.java +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/messaging/MessageReader.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,11 @@ import java.io.Closeable; import java.util.Iterator; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; /** * A {@code MessageReader} is in charge of retrieving events from the messaging infrastructure. */ -public interface MessageReader extends Iterator, Closeable { +public interface MessageReader extends Iterator, Closeable { } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListenerTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListenerTest.java index 9164325..771c64e 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListenerTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListenerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.junit.Before; import org.junit.Test; @@ -60,7 +61,6 @@ import com.hotels.shunting.yard.common.event.SerializableDropPartitionEvent; import com.hotels.shunting.yard.common.event.SerializableDropTableEvent; import com.hotels.shunting.yard.common.event.SerializableInsertEvent; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; import com.hotels.shunting.yard.common.messaging.Message; @@ -78,7 +78,7 @@ public class AbstractMetaStoreEventListenerTest { private @Mock MetaStoreEventSerDe eventSerDe; private @Mock MessageTask messageTask; private @Mock MessageTaskFactory messageTaskFactory; - private @Mock SerializableListenerEventFactory serializableListenerEventFactory; + private @Mock SerializableListenerEventFactory ListenerEventFactory; private @Mock ExecutorService executorService; private @Captor ArgumentCaptor captor; @@ -89,9 +89,9 @@ public class AbstractMetaStoreEventListenerTest { @Before public void init() throws Exception { mockStatic(EmitterUtils.class); - when(eventSerDe.marshal(any(SerializableListenerEvent.class))).thenReturn(PAYLOAD); + when(eventSerDe.marshal(any(ListenerEvent.class))).thenReturn(PAYLOAD); when(messageTaskFactory.newTask(any(Message.class))).thenReturn(messageTask); - listener = new AbstractMetaStoreEventListener(config, serializableListenerEventFactory, executorService) { + listener = new AbstractMetaStoreEventListener(config, ListenerEventFactory, executorService) { @Override protected MetaStoreEventSerDe getMetaStoreEventSerDe() { return eventSerDe; @@ -110,7 +110,7 @@ public void onCreateTable() throws Exception { SerializableCreateTableEvent serializableEvent = mock(SerializableCreateTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onCreateTable(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -122,7 +122,7 @@ public void onCreateTableErrors() throws Exception { SerializableCreateTableEvent serializableEvent = mock(SerializableCreateTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onCreateTable(event); @@ -136,7 +136,7 @@ public void onAlterTable() throws Exception { SerializableAlterTableEvent serializableEvent = mock(SerializableAlterTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAlterTable(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -148,7 +148,7 @@ public void onAlterTableErrors() throws Exception { SerializableAlterTableEvent serializableEvent = mock(SerializableAlterTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onAlterTable(event); @@ -162,7 +162,7 @@ public void onDropTable() throws Exception { SerializableDropTableEvent serializableEvent = mock(SerializableDropTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onDropTable(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -174,7 +174,7 @@ public void onDropTableErrors() throws Exception { SerializableDropTableEvent serializableEvent = mock(SerializableDropTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onDropTable(event); @@ -188,7 +188,7 @@ public void onAddPartition() throws Exception { SerializableAddPartitionEvent serializableEvent = mock(SerializableAddPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAddPartition(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -200,7 +200,7 @@ public void onAddPartitionErrors() throws Exception { SerializableAddPartitionEvent serializableEvent = mock(SerializableAddPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onAddPartition(event); @@ -214,7 +214,7 @@ public void onAlterPartition() throws Exception { SerializableAlterPartitionEvent serializableEvent = mock(SerializableAlterPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAlterPartition(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -226,7 +226,7 @@ public void onAlterPartitionErrors() throws Exception { SerializableAlterPartitionEvent serializableEvent = mock(SerializableAlterPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onAlterPartition(event); @@ -240,7 +240,7 @@ public void onDropPartition() throws Exception { SerializableDropPartitionEvent serializableEvent = mock(SerializableDropPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onDropPartition(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -252,7 +252,7 @@ public void onDropPartitionErrors() throws Exception { SerializableDropPartitionEvent serializableEvent = mock(SerializableDropPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onDropPartition(event); @@ -266,7 +266,7 @@ public void onInsert() throws Exception { SerializableInsertEvent serializableEvent = mock(SerializableInsertEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onInsert(event); verify(executorService).submit(captor.capture()); assertThat(captor.getValue()).isEqualTo(new WrappingMessageTask(messageTask)); @@ -278,7 +278,7 @@ public void onInsertErrors() throws Exception { SerializableInsertEvent serializableEvent = mock(SerializableInsertEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); RuntimeException e = new RuntimeException("Something has gone wrong"); doThrow(e).when(executorService).submit(any(MessageTask.class)); listener.onInsert(event); @@ -290,63 +290,63 @@ public void onInsertErrors() throws Exception { public void onConfigChange() throws Exception { listener.onConfigChange(mock(ConfigChangeEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onCreateDatabase() throws Exception { listener.onCreateDatabase(mock(CreateDatabaseEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropDatabase() throws Exception { listener.onDropDatabase(mock(DropDatabaseEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onLoadPartitionDone() throws Exception { listener.onLoadPartitionDone(mock(LoadPartitionDoneEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onAddIndex() throws Exception { listener.onAddIndex(mock(AddIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropIndex() throws Exception { listener.onDropIndex(mock(DropIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onAlterIndex() throws Exception { listener.onAlterIndex(mock(AlterIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onCreateFunction() throws Exception { listener.onCreateFunction(mock(CreateFunctionEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropFunction() throws Exception { listener.onDropFunction(mock(DropFunctionEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/EventTypeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/EventTypeTest.java index 03ddfc7..5c2f9ce 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/EventTypeTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/EventTypeTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,14 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.junit.Test; public class EventTypeTest { @Test public void eventClassesAreUnique() { - Map, EventType> cache = new HashMap<>(); + Map, EventType> cache = new HashMap<>(); for (EventType et : EventType.values()) { assertThat(cache).doesNotContainKey(et.eventClass()); cache.put(et.eventClass(), et); diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAddPartitionEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAddPartitionEventTest.java index b665283..f61636f 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAddPartitionEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAddPartitionEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,83 +13,98 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static java.util.Collections.EMPTY_LIST; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import java.util.Arrays; - -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableAddPartitionEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - - private @Mock AddPartitionEvent addPartitionEvent; - private @Mock Table table; - private @Mock Partition partition; - - private SerializableAddPartitionEvent event; - - @Before - public void init() { - when(table.getDbName()).thenReturn(DATABASE); - when(table.getTableName()).thenReturn(TABLE); - when(addPartitionEvent.getTable()).thenReturn(table); - when(addPartitionEvent.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); - when(addPartitionEvent.getStatus()).thenReturn(true); - event = new SerializableAddPartitionEvent(addPartitionEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_ADD_PARTITION); - } - - @Test - public void table() { - assertThat(event.getTable()).isSameAs(table); - } - - @Test - public void partitions() { - assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); - assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); - } - - @Test(expected = NullPointerException.class) - public void nullPartitionIterator() { - when(addPartitionEvent.getPartitionIterator()).thenReturn(null); - new SerializableAddPartitionEvent(addPartitionEvent); - } - - @Test - public void emptyPartitionIterator() { - when(addPartitionEvent.getPartitionIterator()).thenReturn(EMPTY_LIST.iterator()); - SerializableAddPartitionEvent event = new SerializableAddPartitionEvent(addPartitionEvent); - assertThat(event.getPartitions()).isEqualTo(EMPTY_LIST); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static java.util.Collections.EMPTY_LIST; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import java.util.Arrays; +// +// import org.apache.hadoop.hive.metastore.api.Partition; +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableAddPartitionEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// +// private @Mock AddPartitionEvent addPartitionEvent; +// private @Mock Table table; +// private @Mock Partition partition; +// +// private SerializableAddPartitionEvent event; +// +// @Before +// public void init() { +// when(table.getDbName()).thenReturn(DATABASE); +// when(table.getTableName()).thenReturn(TABLE); +// when(addPartitionEvent.getTable()).thenReturn(table); +// when(addPartitionEvent.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); +// when(addPartitionEvent.getStatus()).thenReturn(true); +// event = new SerializableAddPartitionEvent(addPartitionEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_ADD_PARTITION); +// } +// +// @Test +// public void table() { +// assertThat(event.getTable()).isSameAs(table); +// } +// +// @Test +// public void partitions() { +// assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); +// assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); +// } +// +// @Test(expected = NullPointerException.class) +// public void nullPartitionIterator() { +// when(addPartitionEvent.getPartitionIterator()).thenReturn(null); +// new SerializableAddPartitionEvent(addPartitionEvent); +// } +// +// @Test +// public void emptyPartitionIterator() { +// when(addPartitionEvent.getPartitionIterator()).thenReturn(EMPTY_LIST.iterator()); +// SerializableAddPartitionEvent event = new SerializableAddPartitionEvent(addPartitionEvent); +// assertThat(event.getPartitions()).isEqualTo(EMPTY_LIST); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterPartitionEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterPartitionEventTest.java index 10fbd33..5b08a98 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterPartitionEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterPartitionEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,67 +13,82 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableAlterPartitionEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - - private @Mock AlterPartitionEvent alterPartitionEvent; - private @Mock Table table; - private @Mock Partition newPartition; - private @Mock Partition oldPartition; - - private SerializableAlterPartitionEvent event; - - @Before - public void init() { - when(table.getDbName()).thenReturn(DATABASE); - when(table.getTableName()).thenReturn(TABLE); - when(alterPartitionEvent.getTable()).thenReturn(table); - when(alterPartitionEvent.getNewPartition()).thenReturn(newPartition); - when(alterPartitionEvent.getOldPartition()).thenReturn(oldPartition); - event = new SerializableAlterPartitionEvent(alterPartitionEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_ALTER_PARTITION); - } - - @Test - public void table() { - assertThat(event.getTable()).isSameAs(table); - } - - @Test - public void partitions() { - assertThat(event.getNewPartition()).isSameAs(newPartition); - assertThat(event.getOldPartition()).isSameAs(oldPartition); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import org.apache.hadoop.hive.metastore.api.Partition; +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableAlterPartitionEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// +// private @Mock AlterPartitionEvent alterPartitionEvent; +// private @Mock Table table; +// private @Mock Partition newPartition; +// private @Mock Partition oldPartition; +// +// private SerializableAlterPartitionEvent event; +// +// @Before +// public void init() { +// when(table.getDbName()).thenReturn(DATABASE); +// when(table.getTableName()).thenReturn(TABLE); +// when(alterPartitionEvent.getTable()).thenReturn(table); +// when(alterPartitionEvent.getNewPartition()).thenReturn(newPartition); +// when(alterPartitionEvent.getOldPartition()).thenReturn(oldPartition); +// event = new SerializableAlterPartitionEvent(alterPartitionEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_ALTER_PARTITION); +// } +// +// @Test +// public void table() { +// assertThat(event.getTable()).isSameAs(table); +// } +// +// @Test +// public void partitions() { +// assertThat(event.getNewPartition()).isSameAs(newPartition); +// assertThat(event.getOldPartition()).isSameAs(oldPartition); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterTableEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterTableEventTest.java index dd03510..866886e 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterTableEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableAlterTableEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,59 +13,74 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableAlterTableEventTest { - - private static final String NEW_DATABASE = "new_db"; - private static final String NEW_TABLE = "new_tbl"; - - private @Mock AlterTableEvent alterTableEvent; - private @Mock Table newTable; - private @Mock Table oldTable; - - private SerializableAlterTableEvent event; - - @Before - public void init() { - when(newTable.getDbName()).thenReturn(NEW_DATABASE); - when(newTable.getTableName()).thenReturn(NEW_TABLE); - when(alterTableEvent.getNewTable()).thenReturn(newTable); - when(alterTableEvent.getOldTable()).thenReturn(oldTable); - event = new SerializableAlterTableEvent(alterTableEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(NEW_DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(NEW_TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_ALTER_TABLE); - } - - @Test - public void tables() { - assertThat(event.getNewTable()).isSameAs(newTable); - assertThat(event.getOldTable()).isSameAs(oldTable); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableAlterTableEventTest { +// +// private static final String NEW_DATABASE = "new_db"; +// private static final String NEW_TABLE = "new_tbl"; +// +// private @Mock AlterTableEvent alterTableEvent; +// private @Mock Table newTable; +// private @Mock Table oldTable; +// +// private SerializableAlterTableEvent event; +// +// @Before +// public void init() { +// when(newTable.getDbName()).thenReturn(NEW_DATABASE); +// when(newTable.getTableName()).thenReturn(NEW_TABLE); +// when(alterTableEvent.getNewTable()).thenReturn(newTable); +// when(alterTableEvent.getOldTable()).thenReturn(oldTable); +// event = new SerializableAlterTableEvent(alterTableEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(NEW_DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(NEW_TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_ALTER_TABLE); +// } +// +// @Test +// public void tables() { +// assertThat(event.getNewTable()).isSameAs(newTable); +// assertThat(event.getOldTable()).isSameAs(oldTable); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableCreateTableEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableCreateTableEventTest.java index 3dafd32..ed25cd3 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableCreateTableEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableCreateTableEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,56 +13,71 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.CreateTableEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableCreateTableEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - - private @Mock CreateTableEvent createTableEvent; - private @Mock Table table; - - private SerializableCreateTableEvent event; - - @Before - public void init() { - when(table.getDbName()).thenReturn(DATABASE); - when(table.getTableName()).thenReturn(TABLE); - when(createTableEvent.getTable()).thenReturn(table); - event = new SerializableCreateTableEvent(createTableEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_CREATE_TABLE); - } - - @Test - public void table() { - assertThat(event.getTable()).isSameAs(table); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableCreateTableEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// +// private @Mock CreateTableEvent createTableEvent; +// private @Mock Table table; +// +// private SerializableCreateTableEvent event; +// +// @Before +// public void init() { +// when(table.getDbName()).thenReturn(DATABASE); +// when(table.getTableName()).thenReturn(TABLE); +// when(createTableEvent.getTable()).thenReturn(table); +// event = new SerializableCreateTableEvent(createTableEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_CREATE_TABLE); +// } +// +// @Test +// public void table() { +// assertThat(event.getTable()).isSameAs(table); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropPartitionEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropPartitionEventTest.java index 00cea44..82131d5 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropPartitionEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropPartitionEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,82 +13,97 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static java.util.Collections.EMPTY_LIST; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import java.util.Arrays; - -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableDropPartitionEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - - private @Mock DropPartitionEvent dropPartitionEvent; - private @Mock Table table; - private @Mock Partition partition; - - private SerializableDropPartitionEvent event; - - @Before - public void init() { - when(table.getDbName()).thenReturn(DATABASE); - when(table.getTableName()).thenReturn(TABLE); - when(dropPartitionEvent.getTable()).thenReturn(table); - when(dropPartitionEvent.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); - event = new SerializableDropPartitionEvent(dropPartitionEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_DROP_PARTITION); - } - - @Test - public void table() { - assertThat(event.getTable()).isSameAs(table); - } - - @Test - public void partitions() { - assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); - assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); - } - - @Test(expected = NullPointerException.class) - public void nullPartitionIterator() { - when(dropPartitionEvent.getPartitionIterator()).thenReturn(null); - new SerializableDropPartitionEvent(dropPartitionEvent); - } - - @Test - public void emptyPartitionIterator() { - when(dropPartitionEvent.getPartitionIterator()).thenReturn(EMPTY_LIST.iterator()); - SerializableDropPartitionEvent event = new SerializableDropPartitionEvent(dropPartitionEvent); - assertThat(event.getPartitions()).isEqualTo(EMPTY_LIST); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static java.util.Collections.EMPTY_LIST; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import java.util.Arrays; +// +// import org.apache.hadoop.hive.metastore.api.Partition; +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableDropPartitionEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// +// private @Mock DropPartitionEvent dropPartitionEvent; +// private @Mock Table table; +// private @Mock Partition partition; +// +// private SerializableDropPartitionEvent event; +// +// @Before +// public void init() { +// when(table.getDbName()).thenReturn(DATABASE); +// when(table.getTableName()).thenReturn(TABLE); +// when(dropPartitionEvent.getTable()).thenReturn(table); +// when(dropPartitionEvent.getPartitionIterator()).thenReturn(Arrays.asList(partition).iterator()); +// event = new SerializableDropPartitionEvent(dropPartitionEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_DROP_PARTITION); +// } +// +// @Test +// public void table() { +// assertThat(event.getTable()).isSameAs(table); +// } +// +// @Test +// public void partitions() { +// assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); +// assertThat(event.getPartitions()).isEqualTo(Arrays.asList(partition)); +// } +// +// @Test(expected = NullPointerException.class) +// public void nullPartitionIterator() { +// when(dropPartitionEvent.getPartitionIterator()).thenReturn(null); +// new SerializableDropPartitionEvent(dropPartitionEvent); +// } +// +// @Test +// public void emptyPartitionIterator() { +// when(dropPartitionEvent.getPartitionIterator()).thenReturn(EMPTY_LIST.iterator()); +// SerializableDropPartitionEvent event = new SerializableDropPartitionEvent(dropPartitionEvent); +// assertThat(event.getPartitions()).isEqualTo(EMPTY_LIST); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropTableEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropTableEventTest.java index e45e6f8..632b8ae 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropTableEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableDropTableEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,56 +13,71 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableDropTableEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - - private @Mock DropTableEvent dropTableEvent; - private @Mock Table table; - - private SerializableDropTableEvent event; - - @Before - public void init() { - when(table.getDbName()).thenReturn(DATABASE); - when(table.getTableName()).thenReturn(TABLE); - when(dropTableEvent.getTable()).thenReturn(table); - event = new SerializableDropTableEvent(dropTableEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_DROP_TABLE); - } - - @Test - public void table() { - assertThat(event.getTable()).isSameAs(table); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import org.apache.hadoop.hive.metastore.api.Table; +// import org.apache.hadoop.hive.metastore.events.DropTableEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableDropTableEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// +// private @Mock DropTableEvent dropTableEvent; +// private @Mock Table table; +// +// private SerializableDropTableEvent event; +// +// @Before +// public void init() { +// when(table.getDbName()).thenReturn(DATABASE); +// when(table.getTableName()).thenReturn(TABLE); +// when(dropTableEvent.getTable()).thenReturn(table); +// event = new SerializableDropTableEvent(dropTableEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_DROP_TABLE); +// } +// +// @Test +// public void table() { +// assertThat(event.getTable()).isSameAs(table); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableInsertEventTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableInsertEventTest.java index a01653f..30ab3bd 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableInsertEventTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableInsertEventTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,73 +13,88 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.hive.metastore.events.InsertEvent; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableInsertEventTest { - - private static final String DATABASE = "db"; - private static final String TABLE = "tbl"; - private static final String KEY = "key"; - private static final String VALUE = "val"; - private static final String FILE = "file"; - private static final String CHECKSUM = "checksum"; - - private @Mock InsertEvent insertEvent; - - private SerializableInsertEvent event; - - @Before - public void init() { - when(insertEvent.getDb()).thenReturn(DATABASE); - when(insertEvent.getTable()).thenReturn(TABLE); - when(insertEvent.getPartitionKeyValues()).thenReturn(ImmutableMap.of(KEY, VALUE)); - when(insertEvent.getFiles()).thenReturn(ImmutableList.of(FILE)); - when(insertEvent.getFileChecksums()).thenReturn(ImmutableList.of(CHECKSUM)); - event = new SerializableInsertEvent(insertEvent); - } - - @Test - public void databaseName() { - assertThat(event.getDatabaseName()).isEqualTo(DATABASE); - } - - @Test - public void tableName() { - assertThat(event.getTableName()).isEqualTo(TABLE); - } - - @Test - public void eventType() { - assertThat(event.getEventType()).isSameAs(EventType.ON_INSERT); - } - - @Test - public void keyValues() { - assertThat(event.getKeyValues()).isEqualTo(ImmutableMap.of(KEY, VALUE)); - } - - @Test - public void files() { - assertThat(event.getFiles()).isEqualTo(ImmutableList.of(FILE)); - } - - @Test - public void checksums() { - assertThat(event.getFileChecksums()).isEqualTo(ImmutableList.of(CHECKSUM)); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.Mockito.when; +// +// import org.apache.hadoop.hive.metastore.events.InsertEvent; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.junit.MockitoJUnitRunner; +// +// import com.google.common.collect.ImmutableList; +// import com.google.common.collect.ImmutableMap; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableInsertEventTest { +// +// private static final String DATABASE = "db"; +// private static final String TABLE = "tbl"; +// private static final String KEY = "key"; +// private static final String VALUE = "val"; +// private static final String FILE = "file"; +// private static final String CHECKSUM = "checksum"; +// +// private @Mock InsertEvent insertEvent; +// +// private SerializableInsertEvent event; +// +// @Before +// public void init() { +// when(insertEvent.getDb()).thenReturn(DATABASE); +// when(insertEvent.getTable()).thenReturn(TABLE); +// when(insertEvent.getPartitionKeyValues()).thenReturn(ImmutableMap.of(KEY, VALUE)); +// when(insertEvent.getFiles()).thenReturn(ImmutableList.of(FILE)); +// when(insertEvent.getFileChecksums()).thenReturn(ImmutableList.of(CHECKSUM)); +// event = new SerializableInsertEvent(insertEvent); +// } +// +// @Test +// public void databaseName() { +// assertThat(event.getDatabaseName()).isEqualTo(DATABASE); +// } +// +// @Test +// public void tableName() { +// assertThat(event.getTableName()).isEqualTo(TABLE); +// } +// +// @Test +// public void eventType() { +// assertThat(event.getEventType()).isSameAs(EventType.ON_INSERT); +// } +// +// @Test +// public void keyValues() { +// assertThat(event.getKeyValues()).isEqualTo(ImmutableMap.of(KEY, VALUE)); +// } +// +// @Test +// public void files() { +// assertThat(event.getFiles()).isEqualTo(ImmutableList.of(FILE)); +// } +// +// @Test +// public void checksums() { +// assertThat(event.getFileChecksums()).isEqualTo(ImmutableList.of(CHECKSUM)); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableListenerEventFactoryTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableListenerEventFactoryTest.java index eb5d8e1..4f58960 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableListenerEventFactoryTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/event/SerializableListenerEventFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,132 +13,147 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.hotels.shunting.yard.common.event; - -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; -import org.apache.hadoop.hive.metastore.events.CreateTableEvent; -import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; -import org.apache.hadoop.hive.metastore.events.DropTableEvent; -import org.apache.hadoop.hive.metastore.events.InsertEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; -import org.apache.hive.common.util.HiveVersionInfo; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; - -@RunWith(MockitoJUnitRunner.class) -public class SerializableListenerEventFactoryTest { - - private static final String METASTORE_URIS = "thrift://localhost:1234"; - - private @Mock Iterator partitionIterator; - - private Map parameters; - private SerializableListenerEventFactory factory; - - @Before - public void init() { - parameters = new HashMap<>(); - HiveConf config = new HiveConf(); - config.setVar(METASTOREURIS, METASTORE_URIS); - factory = new SerializableListenerEventFactory(config); - } - - private T mockEvent(Class clazz) { - T event = mock(clazz); - when(event.getStatus()).thenReturn(true); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - parameters.put(invocation.getArgument(0).toString(), invocation.getArgument(1).toString()); - return null; - } - }).when(event).putParameter(anyString(), anyString()); - return event; - } - - private void assertCommon(SerializableListenerEvent event) { - assertThat(event.getStatus()).isTrue(); - // We don't use event.getParameters() here because it is deferred to parameters in the stub - assertThat(parameters).containsEntry(METASTOREURIS.varname, METASTORE_URIS).containsEntry( - CustomEventParameters.HIVE_VERSION.varname(), HiveVersionInfo.getVersion()); - } - - @Test - public void createSerializableCreateTableEvent() { - CreateTableEvent event = mockEvent(CreateTableEvent.class); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_CREATE_TABLE); - } - - @Test - public void createSerializableAlterTableEvent() { - AlterTableEvent event = mockEvent(AlterTableEvent.class); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ALTER_TABLE); - } - - @Test - public void createSerializableDropTableEvent() { - DropTableEvent event = mockEvent(DropTableEvent.class); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_DROP_TABLE); - } - - @Test - public void createSerializableAddPartitionEvent() { - AddPartitionEvent event = mockEvent(AddPartitionEvent.class); - when(event.getPartitionIterator()).thenReturn(partitionIterator); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ADD_PARTITION); - } - - @Test - public void createSerializableAlterPartitionEvent() { - AlterPartitionEvent event = mockEvent(AlterPartitionEvent.class); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ALTER_PARTITION); - } - - @Test - public void createSerializableDropPartitionEvent() { - DropPartitionEvent event = mockEvent(DropPartitionEvent.class); - when(event.getPartitionIterator()).thenReturn(partitionIterator); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_DROP_PARTITION); - } - - @Test - public void createSerializableInsertEvent() { - InsertEvent event = mockEvent(InsertEvent.class); - SerializableListenerEvent serializableEvent = factory.create(event); - assertCommon(serializableEvent); - assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_INSERT); - } - -} +/// ** +// * Copyright (C) 2016-2018 Expedia Inc. +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package com.hotels.shunting.yard.common.event; +// +// import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +// import static org.assertj.core.api.Assertions.assertThat; +// import static org.mockito.ArgumentMatchers.anyString; +// import static org.mockito.Mockito.doAnswer; +// import static org.mockito.Mockito.mock; +// import static org.mockito.Mockito.when; +// +// import java.util.HashMap; +// import java.util.Iterator; +// import java.util.Map; +// +// import org.apache.hadoop.hive.conf.HiveConf; +// import org.apache.hadoop.hive.metastore.api.Partition; +// import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +// import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +// import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +// import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +// import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +// import org.apache.hadoop.hive.metastore.events.DropTableEvent; +// import org.apache.hadoop.hive.metastore.events.InsertEvent; +// import org.apache.hadoop.hive.metastore.events.ListenerEvent; +// import org.apache.hive.common.util.HiveVersionInfo; +// import org.junit.Before; +// import org.junit.Test; +// import org.junit.runner.RunWith; +// import org.mockito.Mock; +// import org.mockito.invocation.InvocationOnMock; +// import org.mockito.junit.MockitoJUnitRunner; +// import org.mockito.stubbing.Answer; +// +// @RunWith(MockitoJUnitRunner.class) +// public class SerializableListenerEventFactoryTest { +// +// private static final String METASTORE_URIS = "thrift://localhost:1234"; +// +// private @Mock Iterator partitionIterator; +// +// private Map parameters; +// private SerializableListenerEventFactory factory; +// +// @Before +// public void init() { +// parameters = new HashMap<>(); +// HiveConf config = new HiveConf(); +// config.setVar(METASTOREURIS, METASTORE_URIS); +// factory = new SerializableListenerEventFactory(config); +// } +// +// private T mockEvent(Class clazz) { +// T event = mock(clazz); +// when(event.getStatus()).thenReturn(true); +// doAnswer(new Answer() { +// @Override +// public Void answer(InvocationOnMock invocation) throws Throwable { +// parameters.put(invocation.getArgument(0).toString(), invocation.getArgument(1).toString()); +// return null; +// } +// }).when(event).putParameter(anyString(), anyString()); +// return event; +// } +// +// private void assertCommon(SerializableListenerEvent event) { +// assertThat(event.getStatus()).isTrue(); +// // We don't use event.getParameters() here because it is deferred to parameters in the stub +// assertThat(parameters).containsEntry(METASTOREURIS.varname, METASTORE_URIS).containsEntry( +// CustomEventParameters.HIVE_VERSION.varname(), HiveVersionInfo.getVersion()); +// } +// +// @Test +// public void createSerializableCreateTableEvent() { +// CreateTableEvent event = mockEvent(CreateTableEvent.class); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_CREATE_TABLE); +// } +// +// @Test +// public void createSerializableAlterTableEvent() { +// AlterTableEvent event = mockEvent(AlterTableEvent.class); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ALTER_TABLE); +// } +// +// @Test +// public void createSerializableDropTableEvent() { +// DropTableEvent event = mockEvent(DropTableEvent.class); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_DROP_TABLE); +// } +// +// @Test +// public void createSerializableAddPartitionEvent() { +// AddPartitionEvent event = mockEvent(AddPartitionEvent.class); +// when(event.getPartitionIterator()).thenReturn(partitionIterator); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ADD_PARTITION); +// } +// +// @Test +// public void createSerializableAlterPartitionEvent() { +// AlterPartitionEvent event = mockEvent(AlterPartitionEvent.class); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_ALTER_PARTITION); +// } +// +// @Test +// public void createSerializableDropPartitionEvent() { +// DropPartitionEvent event = mockEvent(DropPartitionEvent.class); +// when(event.getPartitionIterator()).thenReturn(partitionIterator); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_DROP_PARTITION); +// } +// +// @Test +// public void createSerializableInsertEvent() { +// InsertEvent event = mockEvent(InsertEvent.class); +// SerializableListenerEvent serializableEvent = factory.create(event); +// assertCommon(serializableEvent); +// assertThat(serializableEvent.getEventType()).isSameAs(EventType.ON_INSERT); +// } +// +// } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractMetaStoreEventSerDeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractMetaStoreEventSerDeTest.java index cf44f2f..b311712 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractMetaStoreEventSerDeTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractMetaStoreEventSerDeTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -129,13 +130,13 @@ public static SerializableListenerEvent[] data() throws Exception { serializableInsertEvent() }; } - public @Parameter SerializableListenerEvent event; + public @Parameter ListenerEvent event; protected abstract MetaStoreEventSerDe serDe(); @Test public void typical() throws Exception { - SerializableListenerEvent processedEvent = serDe().unmarshal(serDe().marshal(event)); + ListenerEvent processedEvent = serDe().unmarshal(serDe().marshal(event)); assertThat(processedEvent).isNotSameAs(event).isEqualTo(event); } diff --git a/shunting-yard-emitter/pom.xml b/shunting-yard-emitter/pom.xml index debb998..88c64ad 100644 --- a/shunting-yard-emitter/pom.xml +++ b/shunting-yard-emitter/pom.xml @@ -12,8 +12,8 @@ shunting-yard-emitter-kafka - shunting-yard-emitter-kinesis - shunting-yard-emitter-sqs + diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java index 08cf52e..734ea5a 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java index 9a73161..60f2a0d 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; import com.hotels.shunting.yard.common.messaging.Message; import com.hotels.shunting.yard.common.messaging.MessageTask; @@ -36,7 +37,12 @@ class KafkaMessageTask implements MessageTask { @Override public void run() { - producer.send(new ProducerRecord<>(topic, partition(), message.getTimestamp(), message.getPayload())); + ProducerRecord pr = new ProducerRecord(topic, partition(), message.getTimestamp(), + message.getPayload()); + + Headers headers = pr.headers(); + headers.add("eventType", message.getEventType().name().getBytes()); + producer.send(pr); } private int partition() { diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java index d62e0a6..aa03fd9 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.junit.Before; import org.junit.Test; @@ -56,7 +57,6 @@ import com.hotels.shunting.yard.common.event.SerializableDropPartitionEvent; import com.hotels.shunting.yard.common.event.SerializableDropTableEvent; import com.hotels.shunting.yard.common.event.SerializableInsertEvent; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; import com.hotels.shunting.yard.common.messaging.Message; @@ -73,7 +73,7 @@ public class KafkaMetaStoreEventListenerTest { private @Mock MetaStoreEventSerDe eventSerDe; private @Mock MessageTask messageTask; private @Mock MessageTaskFactory messageTaskFactory; - private @Mock SerializableListenerEventFactory serializableListenerEventFactory; + private @Mock SerializableListenerEventFactory ListenerEventFactory; private @Mock ExecutorService executorService; private final Configuration config = new Configuration(); @@ -81,7 +81,7 @@ public class KafkaMetaStoreEventListenerTest { @Before public void init() throws Exception { - when(eventSerDe.marshal(any(SerializableListenerEvent.class))).thenReturn(PAYLOAD); + when(eventSerDe.marshal(any(ListenerEvent.class))).thenReturn(PAYLOAD); when(messageTaskFactory.newTask(any(Message.class))).thenReturn(messageTask); doAnswer(new Answer() { @Override @@ -90,7 +90,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(executorService).submit(any(Runnable.class)); - listener = new KafkaMetaStoreEventListener(config, serializableListenerEventFactory, eventSerDe, messageTaskFactory, + listener = new KafkaMetaStoreEventListener(config, ListenerEventFactory, eventSerDe, messageTaskFactory, executorService); } @@ -100,7 +100,7 @@ public void onCreateTable() throws Exception { SerializableCreateTableEvent serializableEvent = mock(SerializableCreateTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onCreateTable(event); verify(messageTask).run(); } @@ -111,7 +111,7 @@ public void onAlterTable() throws Exception { SerializableAlterTableEvent serializableEvent = mock(SerializableAlterTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAlterTable(event); verify(messageTask).run(); } @@ -122,7 +122,7 @@ public void onDropTable() throws Exception { SerializableDropTableEvent serializableEvent = mock(SerializableDropTableEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onDropTable(event); verify(messageTask).run(); } @@ -133,7 +133,7 @@ public void onAddPartition() throws Exception { SerializableAddPartitionEvent serializableEvent = mock(SerializableAddPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAddPartition(event); verify(messageTask).run(); } @@ -144,7 +144,7 @@ public void onAlterPartition() throws Exception { SerializableAlterPartitionEvent serializableEvent = mock(SerializableAlterPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onAlterPartition(event); verify(messageTask).run(); } @@ -155,7 +155,7 @@ public void onDropPartition() throws Exception { SerializableDropPartitionEvent serializableEvent = mock(SerializableDropPartitionEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onDropPartition(event); verify(messageTask).run(); } @@ -166,7 +166,7 @@ public void onInsert() throws Exception { SerializableInsertEvent serializableEvent = mock(SerializableInsertEvent.class); when(serializableEvent.getDatabaseName()).thenReturn(DATABASE); when(serializableEvent.getTableName()).thenReturn(TABLE); - when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent); + when(ListenerEventFactory.create(event)).thenReturn(serializableEvent); listener.onInsert(event); verify(messageTask).run(); } @@ -175,63 +175,63 @@ public void onInsert() throws Exception { public void onConfigChange() throws Exception { listener.onConfigChange(mock(ConfigChangeEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onCreateDatabase() throws Exception { listener.onCreateDatabase(mock(CreateDatabaseEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropDatabase() throws Exception { listener.onDropDatabase(mock(DropDatabaseEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onLoadPartitionDone() throws Exception { listener.onLoadPartitionDone(mock(LoadPartitionDoneEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onAddIndex() throws Exception { listener.onAddIndex(mock(AddIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropIndex() throws Exception { listener.onDropIndex(mock(DropIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onAlterIndex() throws Exception { listener.onAlterIndex(mock(AlterIndexEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onCreateFunction() throws Exception { listener.onCreateFunction(mock(CreateFunctionEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } @Test public void onDropFunction() throws Exception { listener.onDropFunction(mock(DropFunctionEvent.class)); verify(executorService, never()).submit(any(Runnable.class)); - verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class)); + verify(eventSerDe, never()).marshal(any(ListenerEvent.class)); } } diff --git a/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java b/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java index efe3584..d764627 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java +++ b/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ package com.hotels.shunting.yard.emitter.sqs.messaging; -import org.datanucleus.util.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +42,12 @@ class SqsMessageTask implements MessageTask { @Override public void run() { LOG.info("Sending message to topic {} and group ID {}", topic, messageGroupId); - producer.sendMessage(new SendMessageRequest() - .withQueueUrl(topic) - .withMessageGroupId(messageGroupId) - .withMessageBody(new String(Base64.encode(payload))) - .withDelaySeconds(0)); + producer + .sendMessage(new SendMessageRequest() + .withQueueUrl(topic) + .withMessageGroupId(messageGroupId) + .withMessageBody(new String(payload)) + .withDelaySeconds(0)); } } diff --git a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java index ffad984..7593357 100644 --- a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java +++ b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java @@ -24,12 +24,14 @@ import java.util.Iterator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import com.google.common.annotations.VisibleForTesting; -import com.hotels.shunting.yard.common.event.SerializableListenerEvent; import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; import com.hotels.shunting.yard.common.io.SerDeException; import com.hotels.shunting.yard.common.messaging.MessageReader; @@ -75,7 +77,7 @@ public boolean hasNext() { } @Override - public SerializableListenerEvent next() { + public ListenerEvent next() { readRecordsIfNeeded(); return eventPayLoad(records.next()); } @@ -86,8 +88,16 @@ private void readRecordsIfNeeded() { } } - private SerializableListenerEvent eventPayLoad(ConsumerRecord message) { + private ListenerEvent eventPayLoad(ConsumerRecord message) { try { + Headers headers = message.headers(); + Iterator
itr = headers.headers("eventType").iterator(); + + while (itr.hasNext()) { + Header header = itr.next(); + System.out.println(new String(header.value())); + } + return eventSerDe.unmarshal(message.value()); } catch (Exception e) { throw new SerDeException("Unable to unmarshall event", e); diff --git a/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java b/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java index e8db65f..0fee8a8 100644 --- a/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java +++ b/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.hotels.shunting.yard.replicator.exec.messaging; import static org.assertj.core.api.Assertions.assertThat; From a2ee0c40c86a38b4f565099bce2f46e6b03da514 Mon Sep 17 00:00:00 2001 From: Abhimanyu Gupta Date: Mon, 29 Jul 2019 10:27:48 +0100 Subject: [PATCH 2/2] Saving progress on kryo --- shunting-yard-common/derby.log | 13 ++ shunting-yard-common/pom.xml | 22 ++- .../io/jackson/ImmutableMapSerializer.java | 107 ++++++++++++ .../io/jackson/KryoMetaStoreEventSerDe.java | 162 ++++++++++++++++++ .../AbstractKryoMetaStoreEventSerDeTest.java | 134 +++++++++++++++ .../common/io/MetaStoreEventSerDeTest.java | 9 +- .../yard/common/io/SerDeTestUtils.java | 4 +- .../jackson/KryoMetaStoreEventSerDeTest.java | 30 ++++ .../yard/common/io/jackson/KryoSerdeTest.java | 61 +++++++ .../shunting-yard-emitter-kafka/pom.xml | 1 + .../emitter/kafka/KafkaProducerProperty.java | 4 +- .../kafka/messaging/KafkaMessageTask.java | 1 + .../kafka/messaging/KafkaMessageReader.java | 2 +- 13 files changed, 543 insertions(+), 7 deletions(-) create mode 100644 shunting-yard-common/derby.log create mode 100644 shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java create mode 100644 shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDe.java create mode 100644 shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractKryoMetaStoreEventSerDeTest.java create mode 100644 shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDeTest.java create mode 100644 shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoSerdeTest.java diff --git a/shunting-yard-common/derby.log b/shunting-yard-common/derby.log new file mode 100644 index 0000000..72eb23b --- /dev/null +++ b/shunting-yard-common/derby.log @@ -0,0 +1,13 @@ +---------------------------------------------------------------- +Fri Jul 26 13:27:25 BST 2019: +Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-016c-2e40-0e8a-00000a772ec8 +on database directory /Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common/metastore_db with class loader sun.misc.Launcher$AppClassLoader@3b192d32 +Loaded from file:/Users/abhgupta/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar +java.vendor=AdoptOpenJDK +java.runtime.version=1.8.0_212-b03 +user.dir=/Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common +os.name=Mac OS X +os.arch=x86_64 +os.version=10.13.6 +derby.system.home=null +Database Class Loader started - derby.database.classpath='' diff --git a/shunting-yard-common/pom.xml b/shunting-yard-common/pom.xml index cbc8a51..07dc7ef 100644 --- a/shunting-yard-common/pom.xml +++ b/shunting-yard-common/pom.xml @@ -1,4 +1,5 @@ - + 4.0.0 @@ -24,12 +25,24 @@ org.apache.hadoop hadoop-common + + + asm + asm + + org.apache.hive hive-metastore + + + asm + asm + + org.apache.hive.hcatalog @@ -40,6 +53,13 @@ hcommon-hive-metastore + + + com.esotericsoftware + kryo + 4.0.2 + + junit diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java new file mode 100644 index 0000000..4d3226e --- /dev/null +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.io.jackson; + +import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableTable; +import com.google.common.collect.Maps; + +public class ImmutableMapSerializer extends Serializer> { + private static final boolean DOES_NOT_ACCEPT_NULL = true; + private static final boolean IMMUTABLE = true; + + public ImmutableMapSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } + + @Override + public void write(Kryo kryo, Output output, ImmutableMap immutableMap) { + kryo.writeObject(output, Maps.newHashMap(immutableMap)); + } + + @Override + public ImmutableMap read( + Kryo kryo, + Input input, + Class> type) { + Map map = kryo.readObject(input, HashMap.class); + return ImmutableMap.copyOf(map); + } + + /** + * Creates a new {@link ImmutableMapSerializer} and registers its serializer for the several ImmutableMap related + * classes. + * + * @param kryo the {@link Kryo} instance to set the serializer on + */ + public static void registerSerializers(final Kryo kryo) { + + // we're writing a HashMap, therefore we should register it + kryo.register(java.util.HashMap.class); + + final ImmutableMapSerializer serializer = new ImmutableMapSerializer(); + + // ImmutableMap (abstract class) + // +- EmptyImmutableBiMap + // +- SingletonImmutableBiMap + // +- RegularImmutableMap + // +- ImmutableEnumMap + // +- RowMap from DenseImmutableTable + // +- Row from DenseImmutableTable + // +- ColumnMap from DenseImmutableTable + // +- Column from DenseImmutableTable + + kryo.register(ImmutableMap.class, serializer); + kryo.register(ImmutableMap.of().getClass(), serializer); + + Object o1 = new Object(); + Object o2 = new Object(); + + kryo.register(ImmutableMap.of(o1, o1).getClass(), serializer); + kryo.register(ImmutableMap.of(o1, o1, o2, o2).getClass(), serializer); + + Map enumMap = new EnumMap(DummyEnum.class); + for (DummyEnum e : DummyEnum.values()) { + enumMap.put(e, o1); + } + + kryo.register(ImmutableMap.copyOf(enumMap).getClass(), serializer); + + ImmutableTable denseImmutableTable = ImmutableTable + .builder() + .put("a", 1, 1) + .put("b", 1, 1) + .build(); + + kryo.register(denseImmutableTable.rowMap().getClass(), serializer); // RowMap + kryo.register(denseImmutableTable.rowMap().get("a").getClass(), serializer); // Row + kryo.register(denseImmutableTable.columnMap().getClass(), serializer); // ColumnMap + kryo.register(denseImmutableTable.columnMap().get(1).getClass(), serializer); // Column + } + + private enum DummyEnum { + VALUE1, + VALUE2 + } +} diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDe.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDe.java new file mode 100644 index 0000000..35bde25 --- /dev/null +++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDe.java @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.io.jackson; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.objenesis.instantiator.ObjectInstantiator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Registration; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.ImmutableMap; + +import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; + +public class KryoMetaStoreEventSerDe implements MetaStoreEventSerDe { + private static final Logger log = LoggerFactory.getLogger(KryoMetaStoreEventSerDe.class); + + private final Kryo kryo; + + public KryoMetaStoreEventSerDe() { + kryo = new Kryo(); + } + + // private void registerSerializers(SimpleModule module) { + // module.addSerializer(new SkewedInfoSerializer()); + // module.addSerializer(new JacksonThriftSerializer<>(TBase.class)); + // } + // + // private void registerDeserializers(SimpleModule module) { + // module.addDeserializer(SkewedInfo.class, new SkewedInfoDeserializer()); + // } + + @Override + public byte[] marshal(ListenerEvent listenerEvent) throws MetaException { + try { + log.debug("Marshalling event: {}", listenerEvent); + Output output = new Output(new ByteArrayOutputStream()); + Registration registration = kryo.register(CreateTableEvent.class); + kryo.register(HMSHandler.class); + kryo.register(EnvironmentContext.class); + + ImmutableMapSerializer.registerSerializers(kryo); + kryo.register(ImmutableMap.class, new ImmutableMapSerializer()); + + // + // Objenesis objenesis = new ObjenesisStd(); + // + // ObjectInstantiator createTableInstantiator = objenesis + // .getInstantiatorOf(CreateTableEvent.class); + // + // registration.setInstantiator(createTableInstantiator); + + registration.setInstantiator(new ObjectInstantiator() { + + @Override + public EnvironmentContext newInstance() { + EnvironmentContext ctxt = null; + ctxt = new EnvironmentContext(); + return ctxt; + } + + }); + + registration.setInstantiator(new ObjectInstantiator() { + + @Override + public CreateTableEvent newInstance() { + CreateTableEvent event = null; + try { + event = new CreateTableEvent(new Table(), true, new HMSHandler("hello", new HiveConf(), false)); + } catch (MetaException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return event; + } + + }); + + registration.setInstantiator(new ObjectInstantiator() { + + @Override + public HMSHandler newInstance() { + HMSHandler hmsHandler = null; + try { + hmsHandler = new HMSHandler("hello", new HiveConf(), false); + } catch (MetaException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return hmsHandler; + } + + }); + + kryo.writeClassAndObject(output, listenerEvent); + byte[] bytes = output.toBytes(); + if (log.isDebugEnabled()) { + log.debug("Marshalled event is: {}", new String(bytes)); + } + return bytes; + } catch (Exception e) { + String message = "Unable to marshal event " + listenerEvent; + log.error(message, e); + throw new MetaException(message); + } + } + + @Override + public ListenerEvent unmarshal(byte[] payload) throws MetaException { + try { + // System.out.println("Marshalled event is: " + new String(payload)); + ByteArrayInputStream buffer = new ByteArrayInputStream(payload); + Input input = new Input(buffer); + // As we don't know the type in advance we can only deserialize the event twice: + // 1. Create a dummy object just to find out the type + // T genericEvent = (T) kryo.readObject(input, ListenerEvent.class); + // log.debug("Umarshal event of type: {}", genericEvent.getEventType()); + // 2. Deserialize the actual object + + // T event = mapper.readerFor(genericEvent.getEventType().eventClass()).readValue(buffer); + ListenerEvent event = (ListenerEvent) kryo.readClassAndObject(input); + buffer.reset(); + input.close(); + log.debug("Unmarshalled event is: {}", event); + return event; + } catch (Exception e) { + String message = "Unable to unmarshal event from payload"; + System.out.println(message); + System.out.println(e); + throw new MetaException(message); + } + } + +} diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractKryoMetaStoreEventSerDeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractKryoMetaStoreEventSerDeTest.java new file mode 100644 index 0000000..564f508 --- /dev/null +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/AbstractKryoMetaStoreEventSerDeTest.java @@ -0,0 +1,134 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.io; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.DATABASE; +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.TABLE; +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.createEnvironmentContext; +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.createPartition; +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.createTable; + +import java.util.Arrays; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.api.GetTableResult; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public abstract class AbstractKryoMetaStoreEventSerDeTest { + + private static InsertEventRequestData mockInsertEventRequestData() { + InsertEventRequestData eventRequestData = mock(InsertEventRequestData.class); + when(eventRequestData.getFilesAdded()).thenReturn(Arrays.asList("file_0000")); + return eventRequestData; + } + + private static HMSHandler mockHandler() throws Exception { + GetTableResult getTableResult = mock(GetTableResult.class); + when(getTableResult.getTable()).thenReturn(createTable()); + HMSHandler handler = mock(HMSHandler.class); + when(handler.get_table_req(any(GetTableRequest.class))).thenReturn(getTableResult); + return handler; + } + + private static CreateTableEvent CreateTableEvent() throws Exception { + CreateTableEvent event = new CreateTableEvent(createTable(), true, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static AlterTableEvent AlterTableEvent() throws Exception { + AlterTableEvent event = new AlterTableEvent(createTable(), createTable(new FieldSchema("new_col", "string", null)), + true, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static DropTableEvent DropTableEvent() throws Exception { + DropTableEvent event = new DropTableEvent(createTable(), true, false, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static AddPartitionEvent AddPartitionEvent() throws Exception { + AddPartitionEvent event = new AddPartitionEvent(createTable(), createPartition("a"), true, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static AlterPartitionEvent AlterPartitionEvent() throws Exception { + AlterPartitionEvent event = new AlterPartitionEvent(createPartition("a"), createPartition("b"), createTable(), true, + mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static DropPartitionEvent DropPartitionEvent() throws Exception { + DropPartitionEvent event = new DropPartitionEvent(createTable(), createPartition("a"), true, false, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + private static InsertEvent InsertEvent() throws Exception { + InsertEvent event = new InsertEvent(DATABASE, TABLE, Arrays.asList("a"), mockInsertEventRequestData(), true, + mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + @Parameters(name = "{index}: {0}") + public static ListenerEvent[] data() throws Exception { + return new ListenerEvent[] { + CreateTableEvent(), + AlterTableEvent(), + DropTableEvent(), + AddPartitionEvent(), + AlterPartitionEvent(), + DropPartitionEvent(), + InsertEvent() }; + } + + public @Parameter ListenerEvent event; + + protected abstract MetaStoreEventSerDe serDe(); + + @Test + public void typical() throws Exception { + ListenerEvent processedEvent = serDe().unmarshal(serDe().marshal(event)); + assertThat(processedEvent).isNotSameAs(event).isEqualTo(event); + } + +} diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDeTest.java index 6a283b5..5614471 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDeTest.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDeTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import com.hotels.shunting.yard.common.ShuntingYardException; import com.hotels.shunting.yard.common.io.jackson.JsonMetaStoreEventSerDe; +import com.hotels.shunting.yard.common.io.jackson.KryoMetaStoreEventSerDe; import com.hotels.shunting.yard.common.io.java.JavaMetaStoreEventSerDe; public class MetaStoreEventSerDeTest { @@ -39,6 +40,12 @@ public void instantiateJsonSerDe() { assertThat(serDe).isExactlyInstanceOf(JsonMetaStoreEventSerDe.class); } + @Test + public void instantiateKryoSerDe() { + MetaStoreEventSerDe serDe = serDeForClassName(KryoMetaStoreEventSerDe.class.getName()); + assertThat(serDe).isExactlyInstanceOf(KryoMetaStoreEventSerDe.class); + } + @Test(expected = ShuntingYardException.class) public void invalidSerDeClassName() { serDeForClassName("com.hotels.shunting.yard.common.io.unknown.MetaStoreEventSerDe"); diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/SerDeTestUtils.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/SerDeTestUtils.java index 6d25d79..59d2bb4 100644 --- a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/SerDeTestUtils.java +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/SerDeTestUtils.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -150,7 +150,7 @@ public static Partition createPartition(String value) { public static EnvironmentContext createEnvironmentContext() { // Fully populated EnvironmentContext to make sure all fields are serde properly EnvironmentContext context = new EnvironmentContext(); - context.setProperties(ImmutableMap.of("context_key", "context_value")); + context.setProperties(ImmutableMap.of("context_key", "context_value", "c1", "v1", "c2", "v2")); return context; } diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDeTest.java new file mode 100644 index 0000000..71bce48 --- /dev/null +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoMetaStoreEventSerDeTest.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.io.jackson; + +import com.hotels.shunting.yard.common.io.AbstractKryoMetaStoreEventSerDeTest; +import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe; + +public class KryoMetaStoreEventSerDeTest extends AbstractKryoMetaStoreEventSerDeTest { + + private final KryoMetaStoreEventSerDe serDe = new KryoMetaStoreEventSerDe(); + + @Override + protected MetaStoreEventSerDe serDe() { + return serDe; + } + +} diff --git a/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoSerdeTest.java b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoSerdeTest.java new file mode 100644 index 0000000..e1a7af4 --- /dev/null +++ b/shunting-yard-common/src/test/java/com/hotels/shunting/yard/common/io/jackson/KryoSerdeTest.java @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2016-2019 Expedia Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hotels.shunting.yard.common.io.jackson; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.createEnvironmentContext; +import static com.hotels.shunting.yard.common.io.SerDeTestUtils.createTable; + +import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.api.GetTableResult; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KryoSerdeTest { + + private static HMSHandler mockHandler() throws Exception { + GetTableResult getTableResult = mock(GetTableResult.class); + when(getTableResult.getTable()).thenReturn(createTable()); + HMSHandler handler = mock(HMSHandler.class); + when(handler.get_table_req(any(GetTableRequest.class))).thenReturn(getTableResult); + return handler; + } + + private static CreateTableEvent createTableEvent() throws Exception { + CreateTableEvent event = new CreateTableEvent(createTable(), true, mockHandler()); + event.setEnvironmentContext(createEnvironmentContext()); + return event; + } + + @Test + public void typical() throws Exception { + KryoMetaStoreEventSerDe serde = new KryoMetaStoreEventSerDe(); + + ListenerEvent processedEvent = serde.unmarshal(serde.marshal(createTableEvent())); + + System.out.println(processedEvent.getStatus()); + + // assertThat(processedEvent).isNotSameAs(event).isEqualTo(event); + } +} diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml b/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml index c3dbf24..3edd19e 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml @@ -74,6 +74,7 @@ com.hotels:shunting-yard-emitter* com.hotels:hcommon-hive-metastore org.apache.kafka:* + com.esotericsoftware:* diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java index 734ea5a..d2bd3ca 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java @@ -16,7 +16,7 @@ package com.hotels.shunting.yard.emitter.kafka; import com.hotels.shunting.yard.common.Property; -import com.hotels.shunting.yard.common.io.jackson.JsonMetaStoreEventSerDe; +import com.hotels.shunting.yard.common.io.jackson.KryoMetaStoreEventSerDe; public enum KafkaProducerProperty implements Property { TOPIC("topic", null), @@ -28,7 +28,7 @@ public enum KafkaProducerProperty implements Property { BATCH_SIZE("batch.size", 16384), LINGER_MS("linger.ms", 1L), BUFFER_MEMORY("buffer.memory", 33554432L), - SERDE_CLASS("serde.class", JsonMetaStoreEventSerDe.class.getName()); + SERDE_CLASS("serde.class", KryoMetaStoreEventSerDe.class.getName()); private static final String PROPERTY_PREFIX = "com.hotels.shunting.yard.event.emitter.kafka."; diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java index 60f2a0d..e10f326 100644 --- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java +++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java @@ -42,6 +42,7 @@ public void run() { Headers headers = pr.headers(); headers.add("eventType", message.getEventType().name().getBytes()); + System.out.println("*********EVENT TYPE=" + message.getEventType().name()); producer.send(pr); } diff --git a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java index 7593357..e7cc36e 100644 --- a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java +++ b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2018 Expedia Inc. + * Copyright (C) 2016-2019 Expedia Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.