Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding exception in Json RPC #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.boundary</groupId>
<artifactId>java-plugin-sdk</artifactId>
<version>0.6.8</version>
<version>0.6.9</version>
<name>Boundary Java Plugin SDK</name>
<properties>
<!-- Plugin Versions -->
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/boundary/plugin/sdk/EventSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
// limitations under the License.
package com.boundary.plugin.sdk;

import java.io.IOException;
import java.net.UnknownHostException;

public interface EventSink {

public void emit(Event event);

public String emit(final String eventRpcJson);
public String emit(final String eventRpcJson) throws IOException;

public boolean openConnection();
public int openConnection() throws UnknownHostException, IOException;

public boolean closeConnection();
public int closeConnection() throws IOException;

}
21 changes: 17 additions & 4 deletions src/main/java/com/boundary/plugin/sdk/EventSinkAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@
// limitations under the License.
package com.boundary.plugin.sdk;

import java.io.IOException;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.boundary.plugin.sdk.rpc.RPC;

public class EventSinkAPI implements EventSink {

private static Logger LOG = LoggerFactory.getLogger(EventSinkAPI.class);

private final EventFormatter formatter;
private RPC rpc;

Expand All @@ -27,21 +35,26 @@ public EventSinkAPI() {

@Override
public void emit(Event event) {
rpc.send(formatter.format(event));
try {
rpc.send(formatter.format(event));
} catch (IOException e) {
LOG.error("IOException : Event could not be sent, " + e.getMessage());
}
}

@Override
public String emit(String eventRpcJson) {
public String emit(String eventRpcJson) throws IOException {
return rpc.send(eventRpcJson);

}

@Override
public boolean openConnection() {
public int openConnection() throws UnknownHostException, IOException {
return rpc.openConnection();
}

@Override
public boolean closeConnection() {
public int closeConnection() throws IOException {
return rpc.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public String emit(final String eventRpcJson) {
}

@Override
public boolean openConnection() {
public int openConnection() {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public boolean closeConnection() {
public int closeConnection() {
throw new UnsupportedOperationException("Not supported yet.");
}

Expand Down
119 changes: 52 additions & 67 deletions src/main/java/com/boundary/plugin/sdk/rpc/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,84 +36,68 @@ public static RPC getInstance() {
return rpc;
}

public synchronized boolean openConnection() {
LOG.debug("Open connection called ...");
public synchronized int openConnection() throws UnknownHostException, IOException {
if (this.socket == null) {
LOG.debug("socket instance is null creating a socket connection");
try {
this.socket = new Socket(HOSTNAME, PORTNUMBER);
LOG.debug("Socket Connection successfully created and assigned ");
this.bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream());
LOG.debug("Socket output stream created and assigned");
this.inStream = this.socket.getInputStream();
LOG.debug("Socket input Stream created and assigned");
LOG.debug("socket instance is null creating a socket connection");
this.socket = new Socket(HOSTNAME, PORTNUMBER);
LOG.debug("Socket Connection successfully created and assigned ");
this.bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream());
LOG.debug("Socket output stream created and assigned");
this.inStream = this.socket.getInputStream();
LOG.debug("Socket input Stream created and assigned");
connectionCount++;
LOG.debug("Connection count is {}", connectionCount);
// socket.setKeepAlive(true);
return connectionCount;

} else {
LOG.debug("Socket instance is not null, checking if the socket is still connected ...");
if (this.socket.isConnected()) {
connectionCount++;
LOG.debug("Connection count is {}",connectionCount);
//socket.setKeepAlive(true);
return true;
} catch (UnknownHostException e) {
LOG.error("Unable to open socket connection to host", e);
} catch (IOException e) {
LOG.error("Unable to open Socket Connection", e);
LOG.debug("Socket is still connected, increasing connection count to {} ", connectionCount);
}
} else{
LOG.debug("Socket instance is not null, checking if the socket is still connected ...");
if (this.socket.isConnected()) {
connectionCount++;
LOG.debug("Socket is still connected, increasing connection count to {} ",connectionCount);
return true;
}
return connectionCount;
}
return false;
}

public synchronized String send(final String contentRpcJson) {
LOG.debug("Send called for events list , {} ...",contentRpcJson);
public synchronized String send(final String contentRpcJson) throws IOException {

LOG.debug("Send called for events list , {} ...", contentRpcJson);
String result = null;
try {
if (socket != null) {
LOG.debug("writing the events to output stream ..");
byte[] utf8JsonString = contentRpcJson.getBytes("UTF-8");
bufferedOutputStream.write(utf8JsonString, 0, utf8JsonString.length);
LOG.debug("Events written to output stream.");
bufferedOutputStream.flush();
LOG.debug("Output stream flushed, waiting for the input stream response .....");
result = convertStreamToString(this.inStream);
} else {
LOG.error("Unable to write the events, Socket connection is not open");
}
} catch (IOException ex) {
LOG.error("Exception occured while sending content to meter", ex);
if (socket != null) {
LOG.debug("writing the events to output stream ..");
byte[] utf8JsonString = contentRpcJson.getBytes("UTF-8");
bufferedOutputStream.write(utf8JsonString, 0, utf8JsonString.length);
LOG.debug("Events written to output stream.");
bufferedOutputStream.flush();
LOG.debug("Output stream flushed, waiting for the input stream response .....");
result = convertStreamToString(this.inStream);
} else {
LOG.error("Unable to write the events, Socket connection is not open");
}

return result;
}

public synchronized boolean closeConnection() {

try {
LOG.debug("Closing the connection , total {} connections are open",connectionCount);
connectionCount--;
if (connectionCount <= 0) {
LOG.debug(" total {} connections are open, its time to close the stream and sockets",connectionCount);
if (bufferedOutputStream != null) {
bufferedOutputStream.close();
bufferedOutputStream = null;
inStream.close();
LOG.debug("Streams closed");
}
if (socket != null) {
socket.close();
socket = null;
LOG.debug("Socket closed");
}
public synchronized int closeConnection() throws IOException {

LOG.debug("Closing the connection , total {} connections are open", connectionCount);
connectionCount--;
if (connectionCount <= 0) {
LOG.debug(" total {} connections are open, its time to close the stream and sockets", connectionCount);
if (bufferedOutputStream != null) {
bufferedOutputStream.close();
bufferedOutputStream = null;
inStream.close();
LOG.debug("Streams closed");
}
if (socket != null) {
socket.close();
socket = null;
LOG.debug("Socket closed");
}
return true;
} catch (IOException e) {
LOG.error("Unable to close Socket Connection", e);
}

return false;
return connectionCount;
}

private String convertStreamToString(InputStream instream) {
Expand All @@ -127,7 +111,8 @@ private String convertStreamToString(InputStream instream) {
} catch (IOException e) {
e.printStackTrace();
}
// NOTE: conversion from byte to char here works for ISO8859-1/US-ASCII
// NOTE: conversion from byte to char here works for
// ISO8859-1/US-ASCII
// but fails for UTF etc.
type.append((char) ch);
switch ((char) ch) {
Expand All @@ -150,7 +135,7 @@ private String convertStreamToString(InputStream instream) {
}
}
String data = type.toString();
LOG.debug("Recieved Response --> {}",type);
LOG.debug("Recieved Response --> {}", type);
return data;
}

Expand Down