Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Null Pointer Exception in NSQConsumer #10

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ for (int i=0; i < iterations; i++) {
* [slf4j][slf4j]
* [trendrr-oss][trendrr-oss]

Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser
Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.lookup.NSQLookup interface using a different json parser


[nsq]: https://github.com/bitly/nsq
Expand Down
81 changes: 80 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@

<groupId>com.github.dustismo</groupId>
<artifactId>trendrr-nsq-client</artifactId>
<version>1.1-SNAPSHOT</version>
<version>1.3-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.trendrr.oss</groupId>
<artifactId>trendrr-oss</artifactId>
<version>1.0</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand All @@ -32,6 +38,79 @@
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r09</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>



</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
<forkMode>always</forkMode>
<systemPropertyVariables>
<log4j.configuration>file:${basedir}/src/test/resources/log4j-surefire.xml</log4j.configuration>
</systemPropertyVariables>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
53 changes: 31 additions & 22 deletions src/main/java/com/trendrr/nsq/AbstractNSQClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,20 +37,22 @@
*/
public abstract class AbstractNSQClient {

protected static Logger log = LoggerFactory.getLogger(AbstractNSQClient.class);


private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNSQClient.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? I'm not against the change (log vs LOGGER) but you'll need to update it in the other classes for consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I happened to have modified these files when I was investigating the issue. I'd like to do the refactoring across all files but wanted to keep this fix rather small.


private static final int CLEAN_UP_FREQUENCY = 1000 * 60 * 2;

/**
* Protocol version sent to nsqd on initial connect
*/
public static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
public static long LOOKUP_PERIOD = 60*1000; //how often to recheck for new nodes (and clean up non responsive nodes)
private static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
private static long LOOKUP_PERIOD = 60*1000; //how often to recheck for new nodes (and clean up non responsive nodes)


Connections connections = new Connections();
private final Connections connections = new Connections();
// Configure the client.
protected ClientBootstrap bootstrap = null;
protected Timer timer = null;
private ClientBootstrap bootstrap = null;
private Timer timer = null;


//this executor is where the callback code is handled
protected Executor executor = Executors.newSingleThreadExecutor();
Expand All @@ -67,6 +67,8 @@ public synchronized void start() {
timer.cancel();
}
timer = new Timer();

//TODO Use a scheduled executor service instead of a timer here
timer.schedule(new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -129,10 +131,11 @@ protected Connection createConnection(String address, int port) {
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
log.error("Caught", future.getCause());
LOGGER.error("Unable to create connection, caught: ", future.getCause());
return null;
}
log.warn("Creating connection: " + address + " : " + port);

LOGGER.warn("Creating connection: " + address + " : " + port);
Connection conn = new Connection(address, port, channel, this);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeBytes(MAGIC_PROTOCOL_VERSION);
Expand All @@ -149,7 +152,7 @@ protected Connection createConnection(String address, int port) {
conn.command(ident);

} catch (UnknownHostException e) {
log.error("Caught", e);
LOGGER.error("Caught", e);
}


Expand All @@ -176,10 +179,11 @@ protected synchronized void connect() {


for (ConnectionAddress addr : addresses ) {
int num = addr.getPoolsize() - this.connections.connectionSize(addr.getHost(), addr.getPort());
int num = addr.getPoolsize() - connections.connectionSize(addr.getHost(), addr.getPort());
for (int i=0; i < num; i++) {
Connection conn = this.createConnection(addr.getHost(), addr.getPort());
this.connections.addConnection(conn);
Connection conn = createConnection(addr.getHost(), addr.getPort());
connections.addConnection(conn);

}
//TODO: handle negative num? (i.e. if user lowered the poolsize we should kill some connections)
}
Expand All @@ -190,11 +194,11 @@ protected synchronized void connect() {
* will run through and remove any connections that have not recieved a ping in the last 2 minutes.
*/
public synchronized void cleanupOldConnections() {
Date cutoff = new Date(new Date().getTime() - (1000*60*2));
Date cutoff = new Date(new Date().getTime() - CLEAN_UP_FREQUENCY);
try {
for (Connection c : this.connections.getConnections()) {
if (cutoff.after(c.getLastHeartbeat())) {
log.warn("Removing dead connection: " + c.getHost() + ":" + c.getPort());
LOGGER.warn("Removing dead connection [host={}, port={}]", c.getHost(), c.getPort());
c.close();
connections.remove(c);
}
Expand All @@ -203,17 +207,22 @@ public synchronized void cleanupOldConnections() {
//ignore
}
}


public Connections getConnections(){
return connections;
}


/**
* for internal use. called when a connection is disconnected
* @param connection
*/
public synchronized void _disconnected(Connection connection) {
log.warn("Disconnected!" + connection);
LOGGER.warn("Client disconnected [connection={}]", connection);
this.connections.remove(connection);
}

public void close() {
public synchronized void close() {
this.timer.cancel();
this.connections.close();
this.bootstrap.releaseExternalResources();
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/com/trendrr/nsq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
*
*/
public class Connection {

protected static Logger log = LoggerFactory.getLogger(Connection.class);
private static Logger log = LoggerFactory.getLogger(Connection.class);

Channel channel;
int heartbeats = 0;
Expand Down Expand Up @@ -87,9 +86,6 @@ public void setMessagesPerBatch(int messagesPerBatch) {
}





public void incoming(NSQFrame frame) {
if (frame instanceof ResponseFrame) {
if ("_heartbeat_".equals(((ResponseFrame) frame).getMessage())) {
Expand Down Expand Up @@ -127,7 +123,7 @@ public void incoming(NSQFrame frame) {
message.setMessage(((MessageFrame) frame).getMessageBody());
message.setTimestamp(new Date(((MessageFrame) frame).getTimestamp()));
if (this.callback == null) {
log.warn("NO CAllback, dropping message: " + message);
log.warn("NO Callback, dropping message: " + message);
} else {
this.callback.message(message);
}
Expand All @@ -138,7 +134,6 @@ public void incoming(NSQFrame frame) {


void heartbeat() {
System.out.println("HEARTBEAT!");
this.heartbeats++;
this.lastHeartbeat = new Date();
//send NOP here.
Expand Down
60 changes: 42 additions & 18 deletions src/main/java/com/trendrr/nsq/ConnectionAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,64 @@


/**
* Holds the details of a connection
* @author Dustin Norlander
* @created Jan 22, 2013
*
*/
public class ConnectionAddress {

protected static Logger log = LoggerFactory.getLogger(ConnectionAddress.class);

private int poolsize = 1;

/**
private static Logger log = LoggerFactory.getLogger(ConnectionAddress.class);

private static final int DEFAULT_POOL_SIZE = 1;

private final String host;
private final int port;
private final int poolsize;

public ConnectionAddress(String host, int port) {
this(host, port, DEFAULT_POOL_SIZE);
}

public ConnectionAddress(String host, int port, int poolsize) {
this.host = host;
this.port = port;
this.poolsize = poolsize;
}

/**
* How many connections should we have in place?
* @return
*/
public int getPoolsize() {
return poolsize;
}
public void setPoolsize(int poolsize) {
this.poolsize = poolsize;
}



public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
private String host;
private int port;

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ConnectionAddress that = (ConnectionAddress) o;

if (port != that.port) return false;
if (!host.equals(that.host)) return false;

return true;
}

@Override
public int hashCode() {
int result = host.hashCode();
result = 31 * result + port;
return result;
}
}
Loading