Skip to content
This repository has been archived by the owner on Oct 12, 2021. It is now read-only.

Old emitter with Kryo #43

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
<modules>
<module>shunting-yard-common</module>
<module>shunting-yard-emitter</module>
<module>shunting-yard-receiver</module>
<!--<module>shunting-yard-receiver</module>
<module>shunting-yard-replicator</module>
<module>shunting-yard-binary</module>
<module>shunting-yard-binary</module>-->
</modules>

<properties>
Expand Down
13 changes: 13 additions & 0 deletions shunting-yard-common/derby.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
----------------------------------------------------------------
Fri Jul 26 13:27:25 BST 2019:
Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-016c-2e40-0e8a-00000a772ec8
on database directory /Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common/metastore_db with class loader sun.misc.Launcher$AppClassLoader@3b192d32
Loaded from file:/Users/abhgupta/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
java.vendor=AdoptOpenJDK
java.runtime.version=1.8.0_212-b03
user.dir=/Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common
os.name=Mac OS X
os.arch=x86_64
os.version=10.13.6
derby.system.home=null
Database Class Loader started - derby.database.classpath=''
22 changes: 21 additions & 1 deletion shunting-yard-common/pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -24,12 +25,24 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
Expand All @@ -40,6 +53,13 @@
<artifactId>hcommon-hive-metastore</artifactId>
</dependency>

<!-- Kryo -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2018 Expedia Inc.
* Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,10 @@
*/
package com.hotels.shunting.yard.common.emitter;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

import static com.hotels.shunting.yard.common.emitter.EmitterUtils.error;
import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION;

import java.util.concurrent.ExecutorService;

Expand All @@ -37,11 +40,13 @@
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hive.common.util.HiveVersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.shunting.yard.common.event.SerializableListenerEvent;
import com.hotels.shunting.yard.common.event.EventType;
import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory;
import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe;
import com.hotels.shunting.yard.common.messaging.Message;
Expand Down Expand Up @@ -72,11 +77,13 @@ private MessageTask message(Message message) {
return new WrappingMessageTask(getMessageTaskFactory().newTask(message));
}

private Message withPayload(SerializableListenerEvent event) throws MetaException {
private Message withPayload(ListenerEvent event, String dbName, String tableName, EventType eventType)
throws MetaException {
return Message
.builder()
.database(event.getDatabaseName())
.table(event.getTableName())
.database(dbName)
.table(tableName)
.eventType(eventType)
.payload(getMetaStoreEventSerDe().marshal(event))
.build();
}
Expand All @@ -85,7 +92,12 @@ private Message withPayload(SerializableListenerEvent event) throws MetaExceptio
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
log.info("Create table event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
tableEvent.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion());
tableEvent.putParameter(METASTOREURIS.varname, super.getConf().get(METASTOREURIS.varname));

executorService
.submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(),
tableEvent.getTable().getTableName(), EventType.ON_CREATE_TABLE)));
} catch (Exception e) {
error(e);
}
Expand All @@ -95,7 +107,9 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
log.info("Drop table event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
executorService
.submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(),
tableEvent.getTable().getTableName(), EventType.ON_DROP_TABLE)));
} catch (Exception e) {
error(e);
}
Expand All @@ -105,7 +119,9 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
log.info("Alter table event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
executorService
.submit(message(withPayload(tableEvent, tableEvent.getNewTable().getDbName(),
tableEvent.getNewTable().getTableName(), EventType.ON_ALTER_TABLE)));
} catch (Exception e) {
error(e);
}
Expand All @@ -115,7 +131,9 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
log.info("Add partition event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
executorService
.submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
partitionEvent.getTable().getTableName(), EventType.ON_ADD_PARTITION)));
} catch (Exception e) {
error(e);
}
Expand All @@ -125,7 +143,9 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
log.info("Drop partition event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
executorService
.submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
partitionEvent.getTable().getTableName(), EventType.ON_DROP_PARTITION)));
} catch (Exception e) {
error(e);
}
Expand All @@ -135,7 +155,9 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept
public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
log.info("Alter partition event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
executorService
.submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
partitionEvent.getTable().getTableName(), EventType.ON_ALTER_PARTITION)));
} catch (Exception e) {
error(e);
}
Expand All @@ -145,7 +167,8 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce
public void onInsert(InsertEvent insertEvent) throws MetaException {
log.info("Insert event received");
try {
executorService.submit(message(withPayload(serializableListenerEventFactory.create(insertEvent))));
executorService
.submit(message(withPayload(insertEvent, insertEvent.getTable(), insertEvent.getDb(), EventType.ON_INSERT)));
} catch (Exception e) {
error(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2018 Expedia Inc.
* Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,32 +15,41 @@
*/
package com.hotels.shunting.yard.common.event;

import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;

/**
* To make processing event in the receiver easier.
*/
public enum EventType {
ON_CREATE_TABLE(SerializableCreateTableEvent.class),
ON_ALTER_TABLE(SerializableAlterTableEvent.class),
ON_DROP_TABLE(SerializableDropTableEvent.class),
ON_ADD_PARTITION(SerializableAddPartitionEvent.class),
ON_ALTER_PARTITION(SerializableAlterPartitionEvent.class),
ON_DROP_PARTITION(SerializableDropPartitionEvent.class),
ON_INSERT(SerializableInsertEvent.class);
ON_CREATE_TABLE(CreateTableEvent.class),
ON_ALTER_TABLE(AlterTableEvent.class),
ON_DROP_TABLE(DropTableEvent.class),
ON_ADD_PARTITION(AddPartitionEvent.class),
ON_ALTER_PARTITION(AlterPartitionEvent.class),
ON_DROP_PARTITION(DropPartitionEvent.class),
ON_INSERT(InsertEvent.class);

private final Class<? extends SerializableListenerEvent> eventClass;
private final Class<? extends ListenerEvent> eventClass;

private EventType(Class<? extends SerializableListenerEvent> eventClass) {
private EventType(Class<? extends ListenerEvent> eventClass) {
if (eventClass == null) {
throw new NullPointerException("Parameter eventClass is required");
}
this.eventClass = eventClass;
}

public Class<? extends SerializableListenerEvent> eventClass() {
public Class<? extends ListenerEvent> eventClass() {
return eventClass;
}

public static EventType forClass(Class<? extends SerializableListenerEvent> clazz) {
public static EventType forClass(Class<? extends ListenerEvent> clazz) {
for (EventType e : values()) {
if (e.eventClass().equals(clazz)) {
return e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.shunting.yard.common.event;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hive.common.util.HiveVersionInfo;

public class ListenerEventFactory {

private final Configuration config;

public ListenerEventFactory(Configuration config) {
this.config = config;
}

private <T extends ListenerEvent> T addParams(T event) {
event.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion());
event.putParameter(METASTOREURIS.varname, config.get(METASTOREURIS.varname));
return event;
}

public SerializableCreateTableEvent create(CreateTableEvent event) {
return new SerializableCreateTableEvent(addParams(event));
}

public SerializableAlterTableEvent create(AlterTableEvent event) {
return new SerializableAlterTableEvent(addParams(event));
}

public SerializableDropTableEvent create(DropTableEvent event) {
return new SerializableDropTableEvent(addParams(event));
}

public SerializableAddPartitionEvent create(AddPartitionEvent event) {
return new SerializableAddPartitionEvent(addParams(event));
}

public SerializableAlterPartitionEvent create(AlterPartitionEvent event) {
return new SerializableAlterPartitionEvent(addParams(event));
}

public SerializableDropPartitionEvent create(DropPartitionEvent event) {
return new SerializableDropPartitionEvent(addParams(event));
}

public SerializableInsertEvent create(InsertEvent event) {
return new SerializableInsertEvent(addParams(event));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2018 Expedia Inc.
* Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,9 +55,9 @@ protected SerializableListenerEvent(ListenerEvent event) {
environmentContext = event.getEnvironmentContext();
}

public EventType getEventType() {
return EventType.forClass(this.getClass());
}
// public EventType getEventType() {
// return EventType.forClass(this.getClass());
// }

public abstract String getDatabaseName();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2018 Expedia Inc.
* Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,9 +16,9 @@
package com.hotels.shunting.yard.common.io;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;

import com.hotels.shunting.yard.common.ShuntingYardException;
import com.hotels.shunting.yard.common.event.SerializableListenerEvent;

public interface MetaStoreEventSerDe {

Expand All @@ -31,8 +31,8 @@ static <T extends MetaStoreEventSerDe> T serDeForClassName(String className) {
}
}

byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaException;
byte[] marshal(ListenerEvent listenerEvent) throws MetaException;

<T extends SerializableListenerEvent> T unmarshal(byte[] payload) throws MetaException;
<T extends ListenerEvent> T unmarshal(byte[] payload) throws MetaException;

}
Loading