Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Null Pointer Exception in NSQConsumer #10

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ for (int i=0; i < iterations; i++) {
* [slf4j][slf4j]
* [trendrr-oss][trendrr-oss]

Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.NSQLookup interface using a different json parser
Note: the trendrr-oss dependancy can easily be swapped out by implementing the com.trendrr.nsq.lookup.NSQLookup interface using a different json parser


[nsq]: https://github.com/bitly/nsq
Expand Down
95 changes: 78 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@

<groupId>com.github.dustismo</groupId>
<artifactId>trendrr-nsq-client</artifactId>
<version>1.2-SNAPSHOT</version>
<version>1.3-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.trendrr.oss</groupId>
<artifactId>trendrr-oss</artifactId>
<version>1.0</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand All @@ -32,23 +38,78 @@
<artifactId>slf4j-api</artifactId>
<version>1.6.4</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.2.2</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r09</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>



</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
<forkMode>always</forkMode>
<systemPropertyVariables>
<log4j.configuration>file:${basedir}/src/test/resources/log4j-surefire.xml</log4j.configuration>
</systemPropertyVariables>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
89 changes: 46 additions & 43 deletions src/main/java/com/trendrr/nsq/AbstractNSQClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,36 @@
*/
public abstract class AbstractNSQClient {

protected static Logger log = LoggerFactory.getLogger(AbstractNSQClient.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNSQClient.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? I'm not against the change (log vs LOGGER) but you'll need to update it in the other classes for consistency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I happened to have modified these files when I was investigating the issue. I'd like to do the refactoring across all files but wanted to keep this fix rather small.


private static final int CLEAN_UP_FREQUENCY = 1000 * 60 * 2;

/**
* Protocol version sent to nsqd on initial connect
*/
public static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
private static byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
private volatile long lookupPeriod = 60 * 1000; //how often to recheck for new nodes (and clean up non responsive nodes)

private int messagesPerBatch = 200;
private long lookupPeriod = 60 * 1000; // how often to recheck for new nodes (and clean up non responsive nodes)


Connections connections = new Connections();
private int messagesPerBatch = 200;
private final Connections connections = new Connections();
// Configure the client.
protected ClientBootstrap bootstrap = null;
protected Timer timer = null;
private ClientBootstrap bootstrap = null;
private Timer timer = null;


//this executor is where the callback code is handled
protected Executor executor = Executors.newSingleThreadExecutor();

//this executor is where the callback code is handled
protected Executor executor = Executors.newSingleThreadExecutor();

/**
* connects, ready to produce.
*/
public synchronized void start() {
this.connect();

if (timer != null) {
timer.cancel();
}
timer = new Timer();

//TODO Use a scheduled executor service instead of a timer here
timer.schedule(new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -113,28 +113,27 @@ public synchronized void setNettyExecutors(Executor boss, Executor worker) {
* Handles connection and sending magic protocol
* @param address
* @param port
* @return
* @return the created connection; <strong>null</strong> if no connection could be created
*/
protected Connection createConnection(String address, int port) {

// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));

// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
log.error("Caught", future.getCause());
return null;
}
log.info("Creating connection: " + address + " : " + port);
Connection conn = new Connection(address, port, channel, this);
conn.setMessagesPerBatch(this.messagesPerBatch);

ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeBytes(MAGIC_PROTOCOL_VERSION);
channel.write(buf);

//indentify
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));

// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
LOGGER.error("Unable to create connection, caught: ", future.getCause());
return null;
}

LOGGER.info("Creating connection: " + address + " : " + port);
Connection conn = new Connection(address, port, channel, this);
conn.setMessagesPerBatch(this.messagesPerBatch);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeBytes(MAGIC_PROTOCOL_VERSION);
channel.write(buf);
//indentify
try {
String identJson = "{" +
"\"short_id\":\"" + InetAddress.getLocalHost().getHostName() + "\"" +
Expand All @@ -145,7 +144,7 @@ protected Connection createConnection(String address, int port) {
conn.command(ident);

} catch (UnknownHostException e) {
log.error("Caught", e);
LOGGER.error("Caught", e);
}

return conn;
Expand All @@ -168,34 +167,39 @@ protected synchronized void connect() {


for (ConnectionAddress addr : addresses ) {
int num = addr.getPoolsize() - this.connections.connectionSize(addr.getHost(), addr.getPort());
int num = addr.getPoolsize() - connections.connectionSize(addr.getHost(), addr.getPort());
for (int i=0; i < num; i++) {
Connection conn = this.createConnection(addr.getHost(), addr.getPort());
this.connections.addConnection(conn);
Connection conn = createConnection(addr.getHost(), addr.getPort());
connections.addConnection(conn);

}
//TODO: handle negative num? (i.e. if user lowered the poolsize we should kill some connections)
}
this.cleanupOldConnections();
}

/**
* will run through and remove any connections that have not recieved a ping in the last 2 minutes.
* will run through and remove any connections that have not recieved a ping in the last N milliseconds
*/
public synchronized void cleanupOldConnections() {
Date cutoff = new Date(new Date().getTime() - (1000*60*2));
Date cutoff = new Date(new Date().getTime() - CLEAN_UP_FREQUENCY);
try {
for (Connection c : this.connections.getConnections()) {
if (cutoff.after(c.getLastHeartbeat())) {
log.warn("Removing dead connection: " + c.getHost() + ":" + c.getPort());
LOGGER.warn("Removing dead connection [host={}, port={}]", c.getHost(), c.getPort());
c.close();
connections.remove(c);
}
}
} catch (NoConnectionsException e) {
//ignore
//ignore - TODO: is it a good idea to swallow this exception?
}
}

public Connections getConnections(){
return connections;
}

public void setMessagesPerBatch(int messagesPerBatch) {
this.messagesPerBatch = messagesPerBatch;
}
Expand All @@ -209,14 +213,13 @@ public void setLookupPeriod(long periodMillis) {
* @param connection
*/
public synchronized void _disconnected(Connection connection) {
log.warn("Disconnected!" + connection);
LOGGER.warn("Client disconnected [connection={}]", connection);
this.connections.remove(connection);
}

public void close() {
this.timer.cancel();
this.connections.close();
this.bootstrap.releaseExternalResources();

}
}
38 changes: 19 additions & 19 deletions src/main/java/com/trendrr/nsq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,23 @@
*
*/
public class Connection {
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);

protected static Logger log = LoggerFactory.getLogger(Connection.class);
private Channel channel;
private int heartbeats = 0;
private Date lastHeartbeat = new Date();

Channel channel;
int heartbeats = 0;
Date lastHeartbeat = new Date();
private NSQMessageCallback callback = null;
private AtomicLong totalMessages = new AtomicLong(0l);
private int messagesPerBatch = 200;

NSQMessageCallback callback = null;
AtomicLong totalMessages = new AtomicLong(0l);
int messagesPerBatch = 200;
private AbstractNSQClient client = null;

AbstractNSQClient client = null;
private String host = null;
private int port;

String host = null;
int port;

LinkedBlockingQueue<NSQCommand> requests = new LinkedBlockingQueue<NSQCommand>(1);
LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue<NSQFrame>(1);
private LinkedBlockingQueue<NSQCommand> requests = new LinkedBlockingQueue<NSQCommand>(1);
private LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue<NSQFrame>(1);


public Connection(String host, int port, Channel channel, AbstractNSQClient client) {
Expand Down Expand Up @@ -94,7 +93,7 @@ public void incoming(NSQFrame frame) {
try {
this.responses.offer(frame, 20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Incoming frame error", e);
LOGGER.error("Incoming frame error", e);
//TODO: what to do here? we should probably disconnect!
this.close();
}
Expand Down Expand Up @@ -123,19 +122,20 @@ public void incoming(NSQFrame frame) {
message.setMessage(msg.getMessageBody());
message.setTimestamp(new Date(TimeUnit.NANOSECONDS.toMillis(msg.getTimestamp())));
if (this.callback == null) {
log.warn("NO CAllback, dropping message: " + message);
LOGGER.warn("NO Callback, dropping message: " + message);
} else {
this.callback.message(message);
}
return;
}

log.warn("Unknown frame type: " + frame);
LOGGER.warn("Unknown frame type: " + frame);
}


void heartbeat() {
log.info("HEARTBEAT!");
// This should be logged at debug level - it's more useful for troubleshooting and should not interfere with actual INFO logs
LOGGER.debug("HEARTBEAT!");
this.heartbeats++;
this.lastHeartbeat = new Date();
//send NOP here.
Expand Down Expand Up @@ -184,9 +184,9 @@ public void close() {
try {
channel.close().await(10000);
} catch (Exception x) {
log.error("Caught", x);
LOGGER.error("Caught", x);
}
log.warn("Close called on connection: " + this);
LOGGER.warn("Close called on connection: " + this);
this._disconnected();
}

Expand Down
Loading