diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlMapper.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlMapper.java index ffb62a1..056f6ad 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlMapper.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlMapper.java @@ -44,7 +44,7 @@ public static Object avroToMysqlType(Object value, MysqlDataTypes mysqlType) thr { if (mysqlType == MysqlDataTypes.BIT || mysqlType == MysqlDataTypes.TINYTEXT || mysqlType == MysqlDataTypes.MEDIUMTEXT || mysqlType == MysqlDataTypes.LONGTEXT - || mysqlType == MysqlDataTypes.TEXT) + || mysqlType == MysqlDataTypes.TEXT || mysqlType == MysqlDataTypes.VARCHARORTEXT) { return new String(((java.nio.ByteBuffer) value).array()); } else if(mysqlType == MysqlDataTypes.TINYBLOB || mysqlType == MysqlDataTypes.MEDIUMBLOB @@ -87,7 +87,7 @@ else if (mysqlType == MysqlDataTypes.TIME) } else if (value instanceof String) { - if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR) + if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR || mysqlType == MysqlDataTypes.VARCHARORTEXT) { return value; } @@ -103,7 +103,7 @@ else if (mysqlType == MysqlDataTypes.DECIMAL) } else if (value instanceof Utf8) { - if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR) + if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR || mysqlType == MysqlDataTypes.VARCHARORTEXT) { return value.toString(); } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/MysqlDataTypes.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/MysqlDataTypes.java index 7248a2d..da7ed1e 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/MysqlDataTypes.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/MysqlDataTypes.java @@ -55,6 +55,7 @@ public enum MysqlDataTypes MEDIUMBLOB, LONGBLOB, ENUM, + VARCHARORTEXT, SET; } diff --git a/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/AbstractHBaseDataLayerFactory.java b/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/AbstractHBaseDataLayerFactory.java index 6c266c0..9f2d23a 100644 --- a/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/AbstractHBaseDataLayerFactory.java +++ b/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/AbstractHBaseDataLayerFactory.java @@ -15,14 +15,19 @@ package com.flipkart.aesop.hbasedatalayer; import java.beans.PropertyVetoException; +import java.sql.Connection; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import javax.sql.DataSource; + +import java.sql.Statement; import org.springframework.beans.factory.FactoryBean; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -91,19 +96,19 @@ private void createJdbcTemplateMap() { } private DataSource getDataSource() { - ComboPooledDataSource comboPooledDataSource = null; - try { - comboPooledDataSource = new ComboPooledDataSource(); - comboPooledDataSource.setDriverClass(getDriverClass()); - comboPooledDataSource.setJdbcUrl(getJdbcUrl()); - comboPooledDataSource.setProperties(getDataSourceProperties()); - } catch (PropertyVetoException e) { - LOGGER.error(e.getMessage()); - } - - return comboPooledDataSource; - - } + DriverManagerDataSource driverManagerDataSource = null; + try { + driverManagerDataSource = new DriverManagerDataSource(); + driverManagerDataSource.setDriverClassName(getDriverClass()); + driverManagerDataSource.setUrl(getJdbcUrl()); + Properties props = new Properties(); + props.put("phoenix.connection.autoCommit","true"); + driverManagerDataSource.setConnectionProperties(props); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + return driverManagerDataSource; + } public Properties getDataSourceProperties() { return dataSourceProperties; diff --git a/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/upsert/HBaseUpsertDataLayer.java b/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/upsert/HBaseUpsertDataLayer.java index fea7687..79c1fe1 100644 --- a/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/upsert/HBaseUpsertDataLayer.java +++ b/data-layers/data-layer-hbase/src/main/java/com/flipkart/aesop/hbasedatalayer/upsert/HBaseUpsertDataLayer.java @@ -25,6 +25,8 @@ import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; +import javax.naming.OperationNotSupportedException; +import java.util.Date; import java.util.Map; /** @@ -54,13 +56,49 @@ public Map getJdbcTemplateMap() return jdbcTemplateMap; } + private boolean validEvent(AbstractEvent event, long threadId) { + LOGGER.debug("validating primaryKeys for event " + event.getEntityName() + " in thread " + threadId); + boolean validEvent = true; + for (String primaryKey : event.getPrimaryKeySet()) { + Object primaryKeyValue = event.getFieldMapPair().get(primaryKey); + LOGGER.debug("primary key " + primaryKey + " value " + primaryKeyValue); + if (null == primaryKeyValue || "".equalsIgnoreCase(primaryKeyValue.toString().trim())) { + validEvent = false; + } else if (primaryKeyValue instanceof String) { + LOGGER.debug("updating primary key value in thread " + threadId + " original value " + primaryKeyValue + + " updated value " + primaryKeyValue.toString().trim()); + event.getFieldMapPair().put(primaryKey, primaryKeyValue.toString().trim()); + } + } + return validEvent; + } + @Override protected ConsumerCallbackResult upsert(AbstractEvent event) { - String upsertQuery = generateUpsertQuery(event); - NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplateMap.get(event.getNamespaceName()); - jdbcTemplate.update(upsertQuery, event.getFieldMapPair()); - return ConsumerCallbackResult.SUCCESS; + Long startTime = System.currentTimeMillis(); + long threadId = Thread.currentThread().getId(); + LOGGER.debug("Starting UPSERT " + threadId ); + try { + String upsertQuery = generateUpsertQuery(event); + LOGGER.debug("Query executed thread " + threadId + " query " + upsertQuery); + LOGGER.debug("DATA " + threadId + " values " + event.getFieldMapPair()); + if (validEvent(event, threadId)) { + NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplateMap.get(event.getNamespaceName()); + jdbcTemplate.update(upsertQuery, event.getFieldMapPair()); + Long stopTime = System.currentTimeMillis(); + LOGGER.debug("Upsert done for thread " + threadId + " Time taken to upsert " + (stopTime - startTime)); + } else { + LOGGER.error("Invalid event obtained for thread " + threadId + " Event " + event.toString()); + } + LOGGER.debug("End SUCCESS " + threadId); + return ConsumerCallbackResult.SUCCESS; + } + catch(Exception ex) { + LOGGER.error("Exception for thread " + threadId + " event " + event.toString() + " exception " + ex.getMessage(), ex); + LOGGER.debug("End FAILED "+ threadId ); + return ConsumerCallbackResult.ERROR; + } } /** diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java index 7c65a38..e66cbeb 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java @@ -194,7 +194,7 @@ protected byte[] serializeEvent(GenericRecord record) throws EventCreationExcept catch (RuntimeException ex) { LOGGER.error("Failed to serialize avro record : " + record + " Exception : " + ex.getMessage() - + " Cause: " + ex.getCause()); + + " Cause: " + ex.getCause(), ex); throw new EventCreationException("Failed to serialize the Avro GenericRecord", ex); } finally diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java index 52b05da..72151be 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java @@ -95,10 +95,16 @@ public int compare(String o1, String o2) } }); int cnt = 0; + int columnsSize = columns.size(); for (Schema.Field field : orderedFields) { + if (cnt < columnsSize) { Column column = columns.get(cnt); record.put(field.name(), column == null ? null : orToAvroMapper.orToAvroType(column)); + + } else { + record.put(field.name(), null); + } cnt++; } LOGGER.info("Mapped GenricRecord for schema " + schema.getName() + " : " + record.toString()); @@ -106,8 +112,9 @@ public int compare(String o1, String o2) } catch (Exception e) { - LOGGER.error("Error while mapping to DefaultBinlogEvent . Exception : " + e.getMessage() + " Cause: " - + e.getCause()); + LOGGER.error("Error while mapping to DefaultBinlogEvent . Exception : " + e.getMessage() + + " Cause: " + e.getCause(), e); + LOGGER.error("Schema " + schema.getName() + " " + row.toString()); } return null; } diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/config/RelayConfig.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/config/RelayConfig.java index b087938..fd387f2 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/config/RelayConfig.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/config/RelayConfig.java @@ -57,6 +57,15 @@ public class RelayConfig implements InitializingBean { /** The MAX SCN file directory location */ private String maxScnDirectoryLocation; + /** The MAX_INITIAL_LINE_LENGTH configures netty's maxInitialLineLength */ + private int maxInitialLineLength; + + /** The MAX_HEADER_SIZE configures netty's maxHeaderSize */ + private int maxHeaderSize; + + /** The MAX_CHUNK_SIZE configures netty's maxChunkSize */ + private int maxChunkSize; + /** * Interface method implementation. Ensures that all property names start * with {@link RelayConfig#RELAY_PROPERTIES_PREFIX} @@ -125,4 +134,27 @@ public void setMaxScnDirectoryLocation(String maxScnDirectoryLocation) { + this.maxScnDirectoryLocation).getAbsolutePath()); } + public int getMaxInitialLineLength() { + return maxInitialLineLength; + } + + public void setMaxInitialLineLength(int maxInitialLineLength) { + this.maxInitialLineLength = maxInitialLineLength; + } + + public int getMaxHeaderSize() { + return maxHeaderSize; + } + + public void setMaxHeaderSize(int maxHeaderSize) { + this.maxHeaderSize = maxHeaderSize; + } + + public int getMaxChunkSize() { + return maxChunkSize; + } + + public void setMaxChunkSize(int maxChunkSize) { + this.maxChunkSize = maxChunkSize; + } } diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/metrics/MetricsCollector.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/metrics/MetricsCollector.java index 2726915..4025305 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/metrics/MetricsCollector.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/metrics/MetricsCollector.java @@ -139,6 +139,9 @@ public void run() { Map map = new HashMap(); map.put("producer",this.collector.producerSCN); + Map numberOfConsumers = new HashMap(); + numberOfConsumers.put("numberOfConsumers", Long.parseLong(""+relay.getPeers().size())); + map.put("consumers", numberOfConsumers); // we want stats of only connected clients as known to the Relay Map connectedClientSCN = new HashMap(); Map groupHostClient = new HashMap(); diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelay.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelay.java index 431a72e..867d954 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelay.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelay.java @@ -66,14 +66,26 @@ public class DefaultRelay extends HttpRelay { /** List of disconnected peers. We'll use a copy on write list to deal with concurrency. writes are low on this list*/ private List disconnectedPeers = new CopyOnWriteArrayList(); + + /** The MAX_INITIAL_LINE_LENGTH configures netty's maxInitialLineLength */ + private int maxInitialLineLength; + + /** The MAX_HEADER_SIZE configures netty's maxHeaderSize */ + private int maxHeaderSize; + + /** The MAX_CHUNK_SIZE configures netty's maxChunkSize */ + private int maxChunkSize; /** * Constructor for this class. Invokes constructor of the super-type with the passed-in arguments */ public DefaultRelay(StaticConfig config, PhysicalSourceStaticConfig[] pConfigs, SourceIdNameRegistry sourcesIdNameRegistry, - SchemaRegistryService schemaRegistry) throws IOException, InvalidConfigException, DatabusException { + SchemaRegistryService schemaRegistry, int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) throws IOException, InvalidConfigException, DatabusException { super(config, pConfigs, sourcesIdNameRegistry, schemaRegistry); metricsCollector = new MetricsCollector(this); + this.maxInitialLineLength = maxInitialLineLength; + this.maxHeaderSize = maxHeaderSize; + this.maxChunkSize = maxChunkSize; } /** @@ -197,5 +209,16 @@ public List getProducerRegistrationList() { return this.producerRegistrationList; } public MetricsCollector getMetricsCollector() { return metricsCollector; } - + + public int getMaxInitialLineLength() { + return maxInitialLineLength; + } + + public int getMaxHeaderSize() { + return maxHeaderSize; + } + + public int getMaxChunkSize() { + return maxChunkSize; + } } diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelayFactory.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelayFactory.java index df686ea..bff4910 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelayFactory.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/DefaultRelayFactory.java @@ -111,7 +111,8 @@ public DefaultRelay getObject() throws Exception { } //Initialising relay. Only passing the first static config as everything else apart from // initial SCN per producer is the same. Initial SCN per producer has already been set - relay = new DefaultRelay(staticConfigList[0],pStaticConfigs,SourceIdNameRegistry.createFromIdNamePairs(staticConfigList[0].getSourceIds()),schemaRegistryService); + relay = new DefaultRelay(staticConfigList[0],pStaticConfigs,SourceIdNameRegistry.createFromIdNamePairs(staticConfigList[0].getSourceIds()), + schemaRegistryService,relayConfig.getMaxInitialLineLength(),relayConfig.getMaxHeaderSize(),relayConfig.getMaxChunkSize()); //Commenting out this line. The {@link #getMaxScnReaderWriters() getMaxScnReaderWriters} is not used anywhere. //relay.setMaxScnReaderWriters(this.maxScnReaderWriters.get(this.producerRegistrationList.get(0))); diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/netty/HttpRelayPipelineFactory.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/netty/HttpRelayPipelineFactory.java index c729394..55ced9e 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/netty/HttpRelayPipelineFactory.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/relay/netty/HttpRelayPipelineFactory.java @@ -18,6 +18,7 @@ import com.flipkart.aesop.runtime.relay.DefaultRelay; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; /** * The HttpRelayPipelineFactory class is a code port of the Databus {@link com.linkedin.databus.container.netty.HttpRelayPipelineFactory} that @@ -43,7 +44,14 @@ public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = oldPipelineFactory.getPipeline(); RelayStatisticsCollectingHandler relayStatsHandler = new RelayStatisticsCollectingHandler(relay); - pipeline.addBefore("databusRequestRunner", "relayStatsCollector", relayStatsHandler); + pipeline.addBefore("databusRequestRunner", "relayStatsCollector", relayStatsHandler); + int maxInitialLineLength = relay.getMaxInitialLineLength(); + int maxHeaderSize = relay.getMaxHeaderSize(); + int maxChunkSize = relay.getMaxChunkSize(); + if (0 != maxInitialLineLength && 0 != maxHeaderSize && 0 != maxChunkSize) { + HttpRequestDecoder httpRequestDecoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize); + pipeline.replace(pipeline.get("decoder"),"decoder", httpRequestDecoder); + } return pipeline; } } \ No newline at end of file diff --git a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/spring/web/RelayInfo.java b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/spring/web/RelayInfo.java index 04783a5..5387152 100644 --- a/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/spring/web/RelayInfo.java +++ b/runtimes/runtime-relay/src/main/java/com/flipkart/aesop/runtime/spring/web/RelayInfo.java @@ -57,6 +57,12 @@ public void setProducerName(String producerName) { public String getProducerSinceSCN() { return producerSinceSCN; } + public Long fetchProducerSinceSCNInLong() { + return Long.parseLong(producerSinceSCN); + } + public Long fetchProducerSinceSCNBinLong() { + return fetchProducerSinceSCNInLong() >> 32; + } public void setProducerSinceSCN(String producerSinceSCN) { this.producerSinceSCN = producerSinceSCN; } @@ -98,15 +104,19 @@ public void setHostGroupedClient() { if(clientHostSCN == null) { clientHostSCN = new HashMap(); + clientHostSCN.put("PARTITIONS", 0L); } if(clientHostSCN.get("MIN") == null || clientHostSCN.get("MIN") > clientSCN) { clientHostSCN.put("MIN", clientSCN); + clientHostSCN.put("MIN-BINLOG", clientSCN >> 32); } if(clientHostSCN.get("MAX") == null || clientHostSCN.get("MAX") < clientSCN) { clientHostSCN.put("MAX", clientSCN); + clientHostSCN.put("MAX-BINLOG", clientSCN >> 32); } + clientHostSCN.put("PARTITIONS", clientHostSCN.get("PARTITIONS") + 1); this.hostGroupedClient.put( clientHost, clientHostSCN diff --git a/runtimes/runtime-relay/src/main/resources/WEB-INF/pages/relays.ftl b/runtimes/runtime-relay/src/main/resources/WEB-INF/pages/relays.ftl index bfb7806..c8b3095 100644 --- a/runtimes/runtime-relay/src/main/resources/WEB-INF/pages/relays.ftl +++ b/runtimes/runtime-relay/src/main/resources/WEB-INF/pages/relays.ftl @@ -1,79 +1,94 @@ <#include "header.ftl"> -
-

Registered Relays

-
- - - - - - - - - - - - - <#list relayInfos as relay> - - - - <#if (relay.lSourceInfos?size > 0)> - - <#else> - - - - - - -
Physical SourceLogical SourceProducer
IDNameConsumersLast SCN
${relay.getpSourceId()}${relay.getpSourceName()} - ${relay.lSourceInfos?size} - - None - ${relay.clientInfos?size}${relay.getProducerSinceSCN()}
-
- <#if relayInfos?size gt 0 > -
- - - - - - - - - +
+

Registered Relays

- <#list relayInfos[0].hostGroupedClient?keys as clientHost> -
+
+
Consumers
- Host - - Trailing SCN - - Leading SCN -
+ + + + + + + + + + + + + + <#list relayInfos as relay> + + + + <#if (relay.lSourceInfos?size > 0)> - - - - -
Physical SourceLogical SourceProducerConsumers
IDNameConsumersLast SCNBinlog
${relay.getpSourceId()}${relay.getpSourceName()} - ${ clientHost } + ${relay.lSourceInfos?size} ${ relayInfos[0].hostGroupedClient[clientHost]["MIN"]?c }${ relayInfos[0].hostGroupedClient[clientHost]["MAX"]?c }
-
- -
- -
+ <#else> + + None + + + ${relay.clientInfos?size} + ${relay.getProducerSinceSCN()} + ${relay.fetchProducerSinceSCNBinLong()?c} + + + + + + + + + + + + + + + + -
- + + + + + + + + + + +
+ Host + Partitions + Trailing SCN + + Leading SCN +
SCNBinlogLagSCNBinlogLag
+ ${ clientHost } + ${relay.hostGroupedClient[clientHost]["PARTITIONS"]?c}${relay.hostGroupedClient[clientHost]["MIN"]?c}${relay.hostGroupedClient[clientHost]["MIN-BINLOG"]?c}${(relay.fetchProducerSinceSCNInLong()-relay.hostGroupedClient[clientHost]["MIN"])?c}${relay.hostGroupedClient[clientHost]["MAX"]?c}${relay.hostGroupedClient[clientHost]["MAX-BINLOG"]?c}${(relay.fetchProducerSinceSCNInLong()-relay.hostGroupedClient[clientHost]["MAX"])?c}
+ + + + - +
+ + + +
+ + + <#include "footer.ftl">