Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/improve logging and db connection #66

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static Object avroToMysqlType(Object value, MysqlDataTypes mysqlType) thr
{
if (mysqlType == MysqlDataTypes.BIT || mysqlType == MysqlDataTypes.TINYTEXT
|| mysqlType == MysqlDataTypes.MEDIUMTEXT || mysqlType == MysqlDataTypes.LONGTEXT
|| mysqlType == MysqlDataTypes.TEXT)
|| mysqlType == MysqlDataTypes.TEXT || mysqlType == MysqlDataTypes.VARCHARORTEXT)
{
return new String(((java.nio.ByteBuffer) value).array());
} else if(mysqlType == MysqlDataTypes.TINYBLOB || mysqlType == MysqlDataTypes.MEDIUMBLOB
Expand Down Expand Up @@ -87,7 +87,7 @@ else if (mysqlType == MysqlDataTypes.TIME)
}
else if (value instanceof String)
{
if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR)
if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR || mysqlType == MysqlDataTypes.VARCHARORTEXT)
{
return value;
}
Expand All @@ -103,7 +103,7 @@ else if (mysqlType == MysqlDataTypes.DECIMAL)
}
else if (value instanceof Utf8)
{
if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR)
if (mysqlType == MysqlDataTypes.CHAR || mysqlType == MysqlDataTypes.VARCHAR || mysqlType == MysqlDataTypes.VARCHARORTEXT)
{
return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public enum MysqlDataTypes
MEDIUMBLOB,
LONGBLOB,
ENUM,
VARCHARORTEXT,
SET;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
package com.flipkart.aesop.hbasedatalayer;

import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;


import java.sql.Statement;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

Expand Down Expand Up @@ -91,19 +96,19 @@ private void createJdbcTemplateMap() {
}

private DataSource getDataSource() {
ComboPooledDataSource comboPooledDataSource = null;
try {
comboPooledDataSource = new ComboPooledDataSource();
comboPooledDataSource.setDriverClass(getDriverClass());
comboPooledDataSource.setJdbcUrl(getJdbcUrl());
comboPooledDataSource.setProperties(getDataSourceProperties());
} catch (PropertyVetoException e) {
LOGGER.error(e.getMessage());
}

return comboPooledDataSource;

}
DriverManagerDataSource driverManagerDataSource = null;
try {
driverManagerDataSource = new DriverManagerDataSource();
driverManagerDataSource.setDriverClassName(getDriverClass());
driverManagerDataSource.setUrl(getJdbcUrl());
Properties props = new Properties();
props.put("phoenix.connection.autoCommit","true");
driverManagerDataSource.setConnectionProperties(props);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return driverManagerDataSource;
}

public Properties getDataSourceProperties() {
return dataSourceProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

import javax.naming.OperationNotSupportedException;
import java.util.Date;
import java.util.Map;

/**
Expand Down Expand Up @@ -54,13 +56,49 @@ public Map<String, NamedParameterJdbcTemplate> getJdbcTemplateMap()
return jdbcTemplateMap;
}

/**
 * Validates that every primary key declared by the event carries a non-empty value.
 * As a side effect, String-typed primary key values are trimmed in place inside the
 * event's field map so downstream SQL binding sees normalized keys.
 *
 * @param event    the change event whose primary keys are checked
 * @param threadId caller's thread id, used only for log correlation
 * @return true when all primary keys are present and non-blank, false otherwise
 */
private boolean validEvent(AbstractEvent event, long threadId) {
    LOGGER.debug("validating primaryKeys for event " + event.getEntityName() + " in thread " + threadId);
    boolean allKeysValid = true;
    for (String primaryKey : event.getPrimaryKeySet()) {
        Object keyValue = event.getFieldMapPair().get(primaryKey);
        LOGGER.debug("primary key " + primaryKey + " value " + keyValue);
        // Missing or blank (after trim) key value invalidates the event, but we keep
        // scanning so remaining String keys are still normalized, same as the original.
        if (null == keyValue || "".equalsIgnoreCase(keyValue.toString().trim())) {
            allKeysValid = false;
            continue;
        }
        if (keyValue instanceof String) {
            String trimmed = keyValue.toString().trim();
            LOGGER.debug("updating primary key value in thread " + threadId + " original value " + keyValue +
                    " updated value " + trimmed);
            event.getFieldMapPair().put(primaryKey, trimmed);
        }
    }
    return allKeysValid;
}

@Override
protected ConsumerCallbackResult upsert(AbstractEvent event)
{
String upsertQuery = generateUpsertQuery(event);
NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplateMap.get(event.getNamespaceName());
jdbcTemplate.update(upsertQuery, event.getFieldMapPair());
return ConsumerCallbackResult.SUCCESS;
Long startTime = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
LOGGER.debug("Starting UPSERT " + threadId );
try {
String upsertQuery = generateUpsertQuery(event);
LOGGER.debug("Query executed thread " + threadId + " query " + upsertQuery);
LOGGER.debug("DATA " + threadId + " values " + event.getFieldMapPair());
if (validEvent(event, threadId)) {
NamedParameterJdbcTemplate jdbcTemplate = jdbcTemplateMap.get(event.getNamespaceName());
jdbcTemplate.update(upsertQuery, event.getFieldMapPair());
Long stopTime = System.currentTimeMillis();
LOGGER.debug("Upsert done for thread " + threadId + " Time taken to upsert " + (stopTime - startTime));
} else {
LOGGER.error("Invalid event obtained for thread " + threadId + " Event " + event.toString());
}
LOGGER.debug("End SUCCESS " + threadId);
return ConsumerCallbackResult.SUCCESS;
}
catch(Exception ex) {
LOGGER.error("Exception for thread " + threadId + " event " + event.toString() + " exception " + ex.getMessage(), ex);
LOGGER.debug("End FAILED "+ threadId );
return ConsumerCallbackResult.ERROR;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected byte[] serializeEvent(GenericRecord record) throws EventCreationExcept
catch (RuntimeException ex)
{
LOGGER.error("Failed to serialize avro record : " + record + " Exception : " + ex.getMessage()
+ " Cause: " + ex.getCause());
+ " Cause: " + ex.getCause(), ex);
throw new EventCreationException("Failed to serialize the Avro GenericRecord", ex);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,26 @@ public int compare(String o1, String o2)
}
});
int cnt = 0;
int columnsSize = columns.size();
for (Schema.Field field : orderedFields)
{
if (cnt < columnsSize) {
Column column = columns.get(cnt);
record.put(field.name(), column == null ? null : orToAvroMapper.orToAvroType(column));

} else {
record.put(field.name(), null);
}
cnt++;
}
LOGGER.info("Mapped GenricRecord for schema " + schema.getName() + " : " + record.toString());
return (T)record;
}
catch (Exception e)
{
LOGGER.error("Error while mapping to DefaultBinlogEvent . Exception : " + e.getMessage() + " Cause: "
+ e.getCause());
LOGGER.error("Error while mapping to DefaultBinlogEvent . Exception : " + e.getMessage() +
" Cause: " + e.getCause(), e);
LOGGER.error("Schema " + schema.getName() + " " + row.toString());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ public class RelayConfig implements InitializingBean {
/** The MAX SCN file directory location */
private String maxScnDirectoryLocation;

/** The MAX_INITIAL_LINE_LENGTH configures netty's maxInitialLineLength */
private int maxInitialLineLength;

/** The MAX_HEADER_SIZE configures netty's maxHeaderSize */
private int maxHeaderSize;

/** The MAX_CHUNK_SIZE configures netty's maxChunkSize */
private int maxChunkSize;

/**
* Interface method implementation. Ensures that all property names start
* with {@link RelayConfig#RELAY_PROPERTIES_PREFIX}
Expand Down Expand Up @@ -125,4 +134,27 @@ public void setMaxScnDirectoryLocation(String maxScnDirectoryLocation) {
+ this.maxScnDirectoryLocation).getAbsolutePath());
}

/** @return netty's maxInitialLineLength override; 0 when not configured (treated as "use netty default" by the pipeline factory — TODO confirm against HttpRelayPipelineFactory) */
public int getMaxInitialLineLength() {
return maxInitialLineLength;
}

/** @param maxInitialLineLength maximum HTTP request-line length, in bytes, passed through to netty's HttpRequestDecoder */
public void setMaxInitialLineLength(int maxInitialLineLength) {
this.maxInitialLineLength = maxInitialLineLength;
}

/** @return netty's maxHeaderSize override; 0 when not configured */
public int getMaxHeaderSize() {
return maxHeaderSize;
}

/** @param maxHeaderSize maximum combined HTTP header size, in bytes, passed through to netty's HttpRequestDecoder */
public void setMaxHeaderSize(int maxHeaderSize) {
this.maxHeaderSize = maxHeaderSize;
}

/** @return netty's maxChunkSize override; 0 when not configured */
public int getMaxChunkSize() {
return maxChunkSize;
}

/** @param maxChunkSize maximum HTTP chunk size, in bytes, passed through to netty's HttpRequestDecoder */
public void setMaxChunkSize(int maxChunkSize) {
this.maxChunkSize = maxChunkSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ public void run() {

Map<String,Object> map = new HashMap<String,Object>();
map.put("producer",this.collector.producerSCN);
Map<String, Long> numberOfConsumers = new HashMap<String, Long>();
numberOfConsumers.put("numberOfConsumers", Long.parseLong(""+relay.getPeers().size()));
map.put("consumers", numberOfConsumers);
// we want stats of only connected clients as known to the Relay
Map<String,Long> connectedClientSCN = new HashMap<String,Long>();
Map<String,Long> groupHostClient = new HashMap<String,Long>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,26 @@ public class DefaultRelay extends HttpRelay {

/** List of disconnected peers. We'll use a copy on write list to deal with concurrency. writes are low on this list*/
private List<String> disconnectedPeers = new CopyOnWriteArrayList<String>();

/** The MAX_INITIAL_LINE_LENGTH configures netty's maxInitialLineLength */
private int maxInitialLineLength;

/** The MAX_HEADER_SIZE configures netty's maxHeaderSize */
private int maxHeaderSize;

/** The MAX_CHUNK_SIZE configures netty's maxChunkSize */
private int maxChunkSize;

/**
* Constructor for this class. Invokes constructor of the super-type with the passed-in arguments
*/
public DefaultRelay(StaticConfig config, PhysicalSourceStaticConfig[] pConfigs, SourceIdNameRegistry sourcesIdNameRegistry,
SchemaRegistryService schemaRegistry) throws IOException, InvalidConfigException, DatabusException {
SchemaRegistryService schemaRegistry, int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) throws IOException, InvalidConfigException, DatabusException {
super(config, pConfigs, sourcesIdNameRegistry, schemaRegistry);
metricsCollector = new MetricsCollector(this);
this.maxInitialLineLength = maxInitialLineLength;
this.maxHeaderSize = maxHeaderSize;
this.maxChunkSize = maxChunkSize;
}

/**
Expand Down Expand Up @@ -197,5 +209,16 @@ public List<ProducerRegistration> getProducerRegistrationList() {
return this.producerRegistrationList;
}
public MetricsCollector getMetricsCollector() { return metricsCollector; }


/** @return the netty maxInitialLineLength this relay was constructed with */
public int getMaxInitialLineLength() {
return maxInitialLineLength;
}

/** @return the netty maxHeaderSize this relay was constructed with */
public int getMaxHeaderSize() {
return maxHeaderSize;
}

/** @return the netty maxChunkSize this relay was constructed with */
public int getMaxChunkSize() {
return maxChunkSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public DefaultRelay getObject() throws Exception {
}
//Initialising relay. Only passing the first static config as everything else apart from
// initial SCN per producer is the same. Initial SCN per producer has already been set
relay = new DefaultRelay(staticConfigList[0],pStaticConfigs,SourceIdNameRegistry.createFromIdNamePairs(staticConfigList[0].getSourceIds()),schemaRegistryService);
relay = new DefaultRelay(staticConfigList[0],pStaticConfigs,SourceIdNameRegistry.createFromIdNamePairs(staticConfigList[0].getSourceIds()),
schemaRegistryService,relayConfig.getMaxInitialLineLength(),relayConfig.getMaxHeaderSize(),relayConfig.getMaxChunkSize());

//Commenting out this line. The {@link #getMaxScnReaderWriters() getMaxScnReaderWriters} is not used anywhere.
//relay.setMaxScnReaderWriters(this.maxScnReaderWriters.get(this.producerRegistrationList.get(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.flipkart.aesop.runtime.relay.DefaultRelay;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;

/**
* The <code>HttpRelayPipelineFactory</code> class is a code port of the Databus {@link com.linkedin.databus.container.netty.HttpRelayPipelineFactory} that
Expand All @@ -43,7 +44,14 @@ public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = oldPipelineFactory.getPipeline();
RelayStatisticsCollectingHandler relayStatsHandler =
new RelayStatisticsCollectingHandler(relay);
pipeline.addBefore("databusRequestRunner", "relayStatsCollector", relayStatsHandler);
pipeline.addBefore("databusRequestRunner", "relayStatsCollector", relayStatsHandler);
int maxInitialLineLength = relay.getMaxInitialLineLength();
int maxHeaderSize = relay.getMaxHeaderSize();
int maxChunkSize = relay.getMaxChunkSize();
if (0 != maxInitialLineLength && 0 != maxHeaderSize && 0 != maxChunkSize) {
HttpRequestDecoder httpRequestDecoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
pipeline.replace(pipeline.get("decoder"),"decoder", httpRequestDecoder);
}
return pipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public void setProducerName(String producerName) {
public String getProducerSinceSCN() {
return producerSinceSCN;
}
/**
 * Parses the producer's since-SCN string into a Long.
 * NOTE(review): throws NumberFormatException if producerSinceSCN is not a valid
 * decimal long, and NullPointerException if it was never set — callers should
 * guard; consider a safe default. TODO confirm producerSinceSCN is always numeric.
 */
public Long fetchProducerSinceSCNInLong() {
return Long.parseLong(producerSinceSCN);
}
/**
 * Returns the high 32 bits of the since-SCN — presumably the binlog file id in
 * the composite (binlog-id << 32 | offset) SCN encoding; verify against the
 * producer's SCN generation. Uses a signed shift on the unboxed long.
 */
public Long fetchProducerSinceSCNBinLong() {
return fetchProducerSinceSCNInLong() >> 32;
}
public void setProducerSinceSCN(String producerSinceSCN) {
this.producerSinceSCN = producerSinceSCN;
}
Expand Down Expand Up @@ -98,15 +104,19 @@ public void setHostGroupedClient() {

if(clientHostSCN == null) {
clientHostSCN = new HashMap<String, Long>();
clientHostSCN.put("PARTITIONS", 0L);
}

if(clientHostSCN.get("MIN") == null || clientHostSCN.get("MIN") > clientSCN) {
clientHostSCN.put("MIN", clientSCN);
clientHostSCN.put("MIN-BINLOG", clientSCN >> 32);
}

if(clientHostSCN.get("MAX") == null || clientHostSCN.get("MAX") < clientSCN) {
clientHostSCN.put("MAX", clientSCN);
clientHostSCN.put("MAX-BINLOG", clientSCN >> 32);
}
clientHostSCN.put("PARTITIONS", clientHostSCN.get("PARTITIONS") + 1);

this.hostGroupedClient.put(
clientHost, clientHostSCN
Expand Down
Loading