From 2252e0aad1a4ddfa842fcf92e8f589bc99ae7d54 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 28 May 2015 16:58:31 +0100 Subject: [PATCH 01/10] take out abstract keyword - redundant --- .../java/com/exxeleron/qjava/QConnection.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/exxeleron/qjava/QConnection.java b/src/main/java/com/exxeleron/qjava/QConnection.java index e0bb76f..cef9609 100644 --- a/src/main/java/com/exxeleron/qjava/QConnection.java +++ b/src/main/java/com/exxeleron/qjava/QConnection.java @@ -65,14 +65,14 @@ public static MessageType getMessageType( byte i ) { * @throws UnknownHostException * @throws QException */ - public abstract void open() throws IOException, QException; + public void open() throws IOException, QException; /** * Closes connection with the remote q service. * * @throws IOException */ - public abstract void close() throws IOException; + public void close() throws IOException; /** * Reinitializes connection with the remote q service. @@ -81,7 +81,7 @@ public static MessageType getMessageType( byte i ) { * @throws UnknownHostException * @throws QException */ - public abstract void reset() throws IOException, QException; + public void reset() throws IOException, QException; /** * Check whether connection with the remote q host has been established. Note that this function doesn't check @@ -90,7 +90,7 @@ public static MessageType getMessageType( byte i ) { * * @return true if connection with remote host is established, false otherwise */ - public abstract boolean isConnected(); + public boolean isConnected(); /** * Executes a synchronous query against the remote q service. @@ -103,7 +103,7 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract Object sync( String query, Object... parameters ) throws QException, IOException; + public Object sync( String query, Object... parameters ) throws QException, IOException; /** * Executes an asynchronous query against the remote q service. @@ -115,7 +115,7 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract void async( String query, Object... parameters ) throws QException, IOException; + public void async( String query, Object... parameters ) throws QException, IOException; /** * Reads next message from the remote q service. @@ -129,7 +129,7 @@ public static MessageType getMessageType( byte i ) { * @throws IOException * @throws QException */ - public abstract Object receive( boolean dataOnly, boolean raw ) throws IOException, QException; + public Object receive( boolean dataOnly, boolean raw ) throws IOException, QException; /** * Reads next message from the remote q service. @@ -138,7 +138,7 @@ public static MessageType getMessageType( byte i ) { * @throws IOException * @throws QException */ - public abstract Object receive() throws IOException, QException; + public Object receive() throws IOException, QException; /** * Executes a query against the remote q service. Result of the query has to be retrieved by calling a receive @@ -154,48 +154,48 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract int query( final MessageType msgType, final String query, final Object... parameters ) throws QException, IOException; + public int query( final MessageType msgType, final String query, final Object... parameters ) throws QException, IOException; /** * Returns the host of a remote q service. * * @return host of remote a q service */ - public abstract String getHost(); + public String getHost(); /** * Returns the port of a remote q service. * * @return post of remote a q service */ - public abstract int getPort(); + public int getPort(); /** * Returns username for remote authorization. * * @return username for remote authorization */ - public abstract String getUsername(); + public String getUsername(); /** * Returns password for remote authorization. * * @return password for remote authorization */ - public abstract String getPassword(); + public String getPassword(); /** * Returns encoding used for serialization/deserialization of string objects. * * @return encoding used for serialization/deserialization of string objects */ - public abstract String getEncoding(); + public String getEncoding(); /** * Retrives version of the IPC protocol for an established connection. * * @return protocol version */ - public abstract int getProtocolVersion(); + public int getProtocolVersion(); } \ No newline at end of file From 139edd2a9268bfaf9e329e737464d1ac70be0e4d Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 28 May 2015 17:14:27 +0100 Subject: [PATCH 02/10] ignore file --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3cd961 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/* +*.iml +target/* + From eb55b7418d3128d0909013846370747f2351ca74 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 28 May 2015 17:17:19 +0100 Subject: [PATCH 03/10] superfluous error handle --- src/main/java/com/exxeleron/qjava/QBasicConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/exxeleron/qjava/QBasicConnection.java b/src/main/java/com/exxeleron/qjava/QBasicConnection.java index 9e56b33..3d309dc 100644 --- a/src/main/java/com/exxeleron/qjava/QBasicConnection.java +++ b/src/main/java/com/exxeleron/qjava/QBasicConnection.java @@ -96,7 +96,7 @@ public void open() throws IOException, QException { } } - private void initSocket() throws UnknownHostException, IOException { + private void initSocket() throws IOException { connection = new Socket(host, port); connection.setTcpNoDelay(true); inputStream = new DataInputStream(connection.getInputStream()); From 2c3a011233b71917ffcd9f20010eb381df99ca92 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 28 May 2015 18:51:20 +0100 Subject: [PATCH 04/10] initial forked commit change md files for manishpatelUK change pom.xml for manishpatelUK Add thread safe QConnection Add tests for QSynchronizedConnection --- CHANGELOG.txt | 8 + README.md | 16 +- pom.xml | 358 +----------------- .../qjava/QSynchronizedConnection.java | 226 +++++++++++ .../qjava/QSynchronizedConnectionTest.java | 203 ++++++++++ 5 files changed, 460 insertions(+), 351 deletions(-) create mode 100644 src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java create mode 100644 src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 5d486d1..cb7c0c2 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,11 @@ +------------------------------------------------------------------------------ + qJava 2.1.3 [2015.05.28] @manishpatelUK +------------------------------------------------------------------------------ + + - Forked from https://github.com/exxeleron/qJava + - Add multithread support on read and write classes + - Add connection watcher + ------------------------------------------------------------------------------ qJava 2.1.2 [2015.03.23] ------------------------------------------------------------------------------ diff --git a/README.md b/README.md index 90bde25..51d7d40 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,5 @@ - +Forked from https://github.com/exxeleron/qJava -qJava 2.1 -========= +Added capabilities: -The q/kdb+ interface is implemented as a set of Java classes and provides: -- Simple to use API -- Support for synchronous and asynchronous queries -- Convenient asynchronous callbacks mechanism -- Support for kdb+ protocol and types: v3.0, v2.6, v<=2.5 -- Uncompression of the IPC data stream -- Compatible with Java 5.0+ - - -For more details please refer to the [documentation](doc/Readme.md) +* Thread safe QConnection (QSynchronizedConnection) \ No newline at end of file diff --git a/pom.xml b/pom.xml index 75c5d12..00b0f3c 100644 --- a/pom.xml +++ b/pom.xml @@ -3,367 +3,49 @@ 4.0.0 qJava - Library providing connectivity between Java and kdb+. + Library providing connectivity between Java and kdb+. Forked from Exxeleron - exxeleron + com.github.manishpatelUK qJava - 2014 + 1.0 - bundle - 2.1.3-SNAPSHOT - - - exxeleron - http://www.exxeleron.com - - - ${project.organization.url} + + scm:git:ssh://git@github.com:manishpatelUK/qJava.git + scm:git:git@github.com:manishpatelUK/qJava.git + https://github.com/manishpatelUK/qJava.git + qJava-1.0 + - - - Apache License Version 2.0 - http://www.apache.org/licenses/ - - Copyright (c) 2011-2014 Exxeleron GmbH - - - + + Github + https://github.com/manishpatelUK/qJava/issues + + - + - + - 1.2 - 2.5 - 2.5.1 - 2.2 - 2.6 - 2.9 - 1.7 - 1.8 - 2.4 - 2.3.2 - 2.4 - 2.7 - 2.4 - 2.5.1 - 2.12.4 - 2.2 - 3.3 - 1.0 - 2.9 - 0.6.1.201212231917 - 2.4.0 + --> - - 4.11 - junit junit - ${junit.version} + 4.11 diff --git a/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java b/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java new file mode 100644 index 0000000..a17445d --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java @@ -0,0 +1,226 @@ +package com.exxeleron.qjava; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Thread safe connector class for interfacing with the kdb+ service. Provides methods for synchronous and asynchronous + * interaction. + */ +public class QSynchronizedConnection extends QBasicConnection { + + private final Lock lock = new ReentrantLock(); + private AtomicBoolean connectedFlag = new AtomicBoolean(false); + + /** + * Initializes a new {@link QSynchronizedConnection} instance. + * + * @param host + * Host of remote q service + * @param port + * Port of remote q service + * @param username + * Username for remote authorization + * @param password + * Password for remote authorization + * @param encoding + * Encoding used for serialization/deserialization of string objects + */ + public QSynchronizedConnection(final String host, final int port, final String username, final String password, final String encoding) { + super(host, port, username, password, encoding); + } + + /** + * Initializes a new {@link QSynchronizedConnection} instance with encoding set to "ISO-8859-1". + * + * @param host + * Host of remote q service + * @param port + * Port of remote q service + * @param username + * Username for remote authorization + * @param password + * Password for remote authorization + */ + public QSynchronizedConnection(final String host, final int port, final String username, final String password) { + this(host, port, username, password, "ISO-8859-1"); + } + + /** + * {@inheritDoc} + */ + public synchronized void open() throws IOException, QException { + if ( !connectedFlag.get() ) { + if ( getHost() != null ) { + initSocket(); + initialize(); + + reader = new QReader(inputStream, getEncoding()); + writer = new QWriter(outputStream, getEncoding(), protocolVersion); + + connectedFlag.getAndSet(true); + } else { + throw new QConnectionException("Host cannot be null"); + } + } + } + + private synchronized void initSocket() throws IOException { + connection = new Socket(getHost(), getPort()); + connection.setTcpNoDelay(true); + inputStream = new DataInputStream(connection.getInputStream()); + outputStream = connection.getOutputStream(); + } + + private synchronized void initialize() throws IOException, QException { + final String credentials = getPassword() != null ? String.format("%s:%s", getUsername(), getPassword()) : getUsername(); + byte[] request = (credentials + "\3\0").getBytes(getEncoding()); + final byte[] response = new byte[2]; + + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + close(); + initSocket(); + + request = (credentials + "\0").getBytes(getEncoding()); + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + throw new QConnectionException("Connection denied."); + } + } + + protocolVersion = Math.min(response[0], 3); + } + + /** + * {@inheritDoc} + */ + public synchronized void close() throws IOException { + if ( connectedFlag.get() ) { + connection.close(); + connection = null; + connectedFlag.getAndSet(false); + } + } + + /** + * {@inheritDoc} + */ + public synchronized void reset() throws IOException, QException { + if ( !connectedFlag.get() ) { + connection.close(); + } + connection = null; + connectedFlag.getAndSet(false); + open(); + } + + /** + * {@inheritDoc} + */ + public boolean isConnected() { + return connectedFlag.get(); + } + + /** + * {@inheritDoc} + */ + public Object sync( final String query, final Object... parameters ) throws QException, IOException { + QMessage response; + lock.lock(); + try { + query(QConnection.MessageType.SYNC, query, parameters); + response = reader.read(false); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + + if ( response.getMessageType() == QConnection.MessageType.RESPONSE ) { + return response.getData(); + } else { + lock.lock(); + try { + writer.write(new QException("nyi: qJava expected response message"), + response.getMessageType() == QConnection.MessageType.ASYNC ? QConnection.MessageType.ASYNC : QConnection.MessageType.RESPONSE); + throw new QReaderException("Received message of type: " + response.getMessageType() + " where response was expected"); + } catch (QReaderException e) { + throw e; + } finally { + lock.unlock(); + } + } + } + + /** + * {@inheritDoc} + */ + public void async( final String query, final Object... parameters ) throws QException, IOException { + query(QConnection.MessageType.ASYNC, query, parameters); + } + + /** + * {@inheritDoc} + */ + public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { + if ( !connectedFlag.get() ) { + throw new IOException("Connection is not established."); + } + + if ( parameters.length > 8 ) { + throw new QWriterException("Too many parameters."); + } + + if ( parameters.length == 0 ) // simple string query + { + lock.lock(); + try { + return writer.write(query.toCharArray(), msgType); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + + } else { + final Object[] request = new Object[parameters.length + 1]; + request[0] = query.toCharArray(); + + int i = 1; + for ( final Object param : parameters ) { + request[i++] = param; + } + + lock.lock(); + try { + return writer.write(request, msgType); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + } + } + + /** + * {@inheritDoc} + */ + public synchronized Object receive( final boolean dataOnly, final boolean raw ) throws IOException, QException { + return dataOnly ? reader.read(raw).getData() : reader.read(raw); + } + + /** + * {@inheritDoc} + */ + public synchronized Object receive() throws IOException, QException { + return receive(true, false); + } + + +} diff --git a/src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java b/src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java new file mode 100644 index 0000000..6f6ec4d --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java @@ -0,0 +1,203 @@ +package com.exxeleron.qjava; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class QSynchronizedConnectionTest { + + private QSynchronizedConnection connection; + + @Before + public void setUp() throws Exception { + // assume + // [localhost] q -p -5001 -s 10 + connection = new QSynchronizedConnection("localhost",5001,null,null); + connection.open(); + } + + @After + public void tearDown() throws Exception { + connection.close(); + } + + @Test + public void testOpenAndClose() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingOpenCloseThread(connection)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testReset() throws Exception { + int threadCount = 10; // can't have too many else will could hit limit of connections + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingResetThread(connection)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testSync() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingQueryThread(connection, true)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testAsync() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingQueryThread(connection, false)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + private class AnnoyingOpenCloseThread extends Thread { + private final QConnection qConnection; + + public AnnoyingOpenCloseThread(QConnection qConnection) { + this.qConnection = qConnection; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + qConnection.open(); + Thread.sleep(10); + qConnection.close(); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private class AnnoyingResetThread extends Thread { + private final QConnection qConnection; + + public AnnoyingResetThread(QConnection qConnection) { + this.qConnection = qConnection; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + qConnection.reset(); + Thread.sleep(10); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private class AnnoyingQueryThread extends Thread { + private final QConnection qConnection; + private final boolean sync; + + public AnnoyingQueryThread(QConnection qConnection, boolean sync) { + this.qConnection = qConnection; + this.sync = sync; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + if(sync) { + qConnection.sync("{2 xexp 10?x} peach 100"); + } + else { + qConnection.async("{2 xexp 10?x} peach 100"); + } + Thread.sleep(10); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} \ No newline at end of file From 68154520decf45ad23490d0cd0aee11b7a3a7446 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 28 May 2015 19:00:07 +0100 Subject: [PATCH 05/10] target 1.7 --- pom.xml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pom.xml b/pom.xml index 00b0f3c..ee45ecd 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,8 @@ https://github.com/manishpatelUK/qJava/issues + + + 1.7 + 1.7 + + + + + + From 48c9b0682635c4553a6aa79cf9171f2c0f8b9dea Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Thu, 4 Jun 2015 21:45:57 +0100 Subject: [PATCH 06/10] reconnection logic --- pom.xml | 2 +- .../com/exxeleron/qjava/QBasicConnection.java | 44 ++++++++++++++++++- .../java/com/exxeleron/qjava/QConnection.java | 11 +++++ .../qjava/QSynchronizedConnection.java | 11 ++--- .../com/exxeleron/qjava/TestReconnect.java | 40 +++++++++++++++++ 5 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 src/test/java/com/exxeleron/qjava/TestReconnect.java diff --git a/pom.xml b/pom.xml index ee45ecd..c943974 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ com.github.manishpatelUK qJava - 1.0 + 1.1 scm:git:ssh://git@github.com:manishpatelUK/qJava.git diff --git a/src/main/java/com/exxeleron/qjava/QBasicConnection.java b/src/main/java/com/exxeleron/qjava/QBasicConnection.java index 3d309dc..0b5b292 100644 --- a/src/main/java/com/exxeleron/qjava/QBasicConnection.java +++ b/src/main/java/com/exxeleron/qjava/QBasicConnection.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; -import java.net.UnknownHostException; +import java.net.SocketException; /** * Base connector class for interfacing with the kdb+ service. Provides methods for synchronous and asynchronous @@ -41,6 +41,8 @@ public class QBasicConnection implements QConnection { protected QReader reader; protected QWriter writer; + protected boolean attemptReconnect; + /** * Initializes a new {@link QBasicConnection} instance. * @@ -174,11 +176,17 @@ public void async( final String query, final Object... parameters ) throws QExce query(QConnection.MessageType.ASYNC, query, parameters); } + + /** * {@inheritDoc} */ public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { - if ( connection == null ) { + if(attemptReconnect) { + testAndReopenSocket(); + } + + if (connection == null) { throw new IOException("Connection is not established."); } @@ -202,6 +210,23 @@ public int query( final QConnection.MessageType msgType, final String query, fin } } + protected void testAndReopenSocket() throws QException,IOException { + try { + writer.write(" ".toCharArray(), MessageType.SYNC); + reader.read(false); + } catch (SocketException ex) { + System.out.println("Attempt reconnect"); + try{ + close(); + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + open(); + } + } + } + /** * {@inheritDoc} */ @@ -268,4 +293,19 @@ public int getProtocolVersion() { return protocolVersion; } + /** + * {@inheritDoc} + */ + public boolean isAttemptReconnect() { + return attemptReconnect; + } + + /** + * {@inheritDoc} + */ + public void setAttemptReconnect(boolean reconnect) { + attemptReconnect = reconnect; + } + + } \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QConnection.java b/src/main/java/com/exxeleron/qjava/QConnection.java index cef9609..b2f611f 100644 --- a/src/main/java/com/exxeleron/qjava/QConnection.java +++ b/src/main/java/com/exxeleron/qjava/QConnection.java @@ -198,4 +198,15 @@ public static MessageType getMessageType( byte i ) { */ public int getProtocolVersion(); + /** + * Returns whether this connection will attempt reconnection if database socket is broken + * @return + */ + public boolean isAttemptReconnect(); + + /** + * Instruct this connection to reconnect to a kdb database at least once if the socket is broken + * @param reconnect + */ + public void setAttemptReconnect(boolean reconnect); } \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java b/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java index a17445d..36d0ae0 100644 --- a/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java +++ b/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java @@ -158,17 +158,14 @@ public Object sync( final String query, final Object... parameters ) throws QExc } } - /** - * {@inheritDoc} - */ - public void async( final String query, final Object... parameters ) throws QException, IOException { - query(QConnection.MessageType.ASYNC, query, parameters); - } - /** * {@inheritDoc} */ public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { + if(attemptReconnect) { + testAndReopenSocket(); + } + if ( !connectedFlag.get() ) { throw new IOException("Connection is not established."); } diff --git a/src/test/java/com/exxeleron/qjava/TestReconnect.java b/src/test/java/com/exxeleron/qjava/TestReconnect.java new file mode 100644 index 0000000..cc18e45 --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/TestReconnect.java @@ -0,0 +1,40 @@ +package com.exxeleron.qjava; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.net.SocketException; + +import static org.junit.Assert.*; + +/** + * 04/06/2015 + */ +public class TestReconnect { + private QBasicConnection connection; + + @Before + public void setUp() throws Exception { + // assume + // [localhost] q -p -5001 -s 10 + connection = new QBasicConnection("localhost",5001,null,null); + connection.setAttemptReconnect(true); + connection.open(); + } + + @Test + public void testReconnect() throws Exception{ + Assert.assertTrue(connection.isConnected()); + + Assert.assertTrue(2L == (Long)connection.sync("1+1")); + + try { + connection.sync("@[{hclose each key[.z.W]};::;{}]"); + } + catch (Exception ex){} + + Assert.assertTrue(2L == (Long) connection.sync("1+1")); + } +} From 47f44be914be7c5a2e170b9be65bb9faf0562c18 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Wed, 22 Jul 2015 15:50:53 +0100 Subject: [PATCH 07/10] refactor last changes QBasicConnection reverted QRestorableConnection now has reconnection logic QSynchronizedConnection inherits from there Take out out.prints --- .../com/exxeleron/qjava/QBasicConnection.java | 58 +------ .../java/com/exxeleron/qjava/QConnection.java | 73 ++++---- .../qjava/QRestorableConnection.java | 163 ++++++++++++++++++ ...=> QRestorableSynchronizedConnection.java} | 31 +--- ...RestorableSynchronizedConnectionTest.java} | 9 +- .../com/exxeleron/qjava/TestReconnect.java | 9 +- 6 files changed, 213 insertions(+), 130 deletions(-) create mode 100644 src/main/java/com/exxeleron/qjava/QRestorableConnection.java rename src/main/java/com/exxeleron/qjava/{QSynchronizedConnection.java => QRestorableSynchronizedConnection.java} (83%) rename src/test/java/com/exxeleron/qjava/{QSynchronizedConnectionTest.java => QRestorableSynchronizedConnectionTest.java} (95%) diff --git a/src/main/java/com/exxeleron/qjava/QBasicConnection.java b/src/main/java/com/exxeleron/qjava/QBasicConnection.java index 0b5b292..1ed35e4 100644 --- a/src/main/java/com/exxeleron/qjava/QBasicConnection.java +++ b/src/main/java/com/exxeleron/qjava/QBasicConnection.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2011-2014 Exxeleron GmbH - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,7 +19,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; -import java.net.SocketException; +import java.net.UnknownHostException; /** * Base connector class for interfacing with the kdb+ service. Provides methods for synchronous and asynchronous @@ -41,11 +41,9 @@ public class QBasicConnection implements QConnection { protected QReader reader; protected QWriter writer; - protected boolean attemptReconnect; - /** * Initializes a new {@link QBasicConnection} instance. - * + * * @param host * Host of remote q service * @param port @@ -67,7 +65,7 @@ public QBasicConnection(final String host, final int port, final String username /** * Initializes a new {@link QBasicConnection} instance with encoding set to "ISO-8859-1". - * + * * @param host * Host of remote q service * @param port @@ -98,7 +96,7 @@ public void open() throws IOException, QException { } } - private void initSocket() throws IOException { + private void initSocket() throws UnknownHostException, IOException { connection = new Socket(host, port); connection.setTcpNoDelay(true); inputStream = new DataInputStream(connection.getInputStream()); @@ -176,17 +174,11 @@ public void async( final String query, final Object... parameters ) throws QExce query(QConnection.MessageType.ASYNC, query, parameters); } - - /** * {@inheritDoc} */ public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { - if(attemptReconnect) { - testAndReopenSocket(); - } - - if (connection == null) { + if ( connection == null ) { throw new IOException("Connection is not established."); } @@ -210,23 +202,6 @@ public int query( final QConnection.MessageType msgType, final String query, fin } } - protected void testAndReopenSocket() throws QException,IOException { - try { - writer.write(" ".toCharArray(), MessageType.SYNC); - reader.read(false); - } catch (SocketException ex) { - System.out.println("Attempt reconnect"); - try{ - close(); - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - open(); - } - } - } - /** * {@inheritDoc} */ @@ -243,7 +218,7 @@ public Object receive() throws IOException, QException { /** * Returns a String that represents the current {@link QBasicConnection}. - * + * * @return a String that represents the current {@link QBasicConnection} */ @Override @@ -293,19 +268,4 @@ public int getProtocolVersion() { return protocolVersion; } - /** - * {@inheritDoc} - */ - public boolean isAttemptReconnect() { - return attemptReconnect; - } - - /** - * {@inheritDoc} - */ - public void setAttemptReconnect(boolean reconnect) { - attemptReconnect = reconnect; - } - - } \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QConnection.java b/src/main/java/com/exxeleron/qjava/QConnection.java index b2f611f..9e243c9 100644 --- a/src/main/java/com/exxeleron/qjava/QConnection.java +++ b/src/main/java/com/exxeleron/qjava/QConnection.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2011-2014 Exxeleron GmbH - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,14 +20,14 @@ /** * Interface for the q connector. - * + * * @author dev123 */ public interface QConnection { /** * Defines IPC message types. - * + * * @author dev123 */ public static enum MessageType { @@ -37,30 +37,30 @@ public static enum MessageType { /** * Factory method for creating enum based on IPC message byte. - * + * * @param i * byte indicating message type * @return {@link MessageType} matching the byte - * + * * @throws IllegalArgumentException */ - public static MessageType getMessageType( byte i ) { + public static MessageType getMessageType( final byte i ) { switch ( i ) { - case 0: - return ASYNC; - case 1: - return SYNC; - case 2: - return RESPONSE; - default: - throw new IllegalArgumentException("Unsupported message type."); + case 0: + return ASYNC; + case 1: + return SYNC; + case 2: + return RESPONSE; + default: + throw new IllegalArgumentException("Unsupported message type."); } } } /** * Initializes connection with the remote q service. - * + * * @throws IOException * @throws UnknownHostException * @throws QException @@ -69,14 +69,14 @@ public static MessageType getMessageType( byte i ) { /** * Closes connection with the remote q service. - * + * * @throws IOException */ public void close() throws IOException; /** * Reinitializes connection with the remote q service. - * + * * @throws IOException * @throws UnknownHostException * @throws QException @@ -87,14 +87,14 @@ public static MessageType getMessageType( byte i ) { * Check whether connection with the remote q host has been established. Note that this function doesn't check * whether the connection is still active. One has to use a heartbeat mechanism in order to check whether the * connection is still active. - * + * * @return true if connection with remote host is established, false otherwise */ public boolean isConnected(); /** * Executes a synchronous query against the remote q service. - * + * * @param query * Query to be executed * @param parameters @@ -107,7 +107,7 @@ public static MessageType getMessageType( byte i ) { /** * Executes an asynchronous query against the remote q service. - * + * * @param query * Query to be executed * @param parameters @@ -119,7 +119,7 @@ public static MessageType getMessageType( byte i ) { /** * Reads next message from the remote q service. - * + * * @param dataOnly * if true returns only data part of the message, if false retuns data and * message meta-information encapsulated in QMessage @@ -133,7 +133,7 @@ public static MessageType getMessageType( byte i ) { /** * Reads next message from the remote q service. - * + * * @return deserialized response from the remote q service * @throws IOException * @throws QException @@ -143,7 +143,7 @@ public static MessageType getMessageType( byte i ) { /** * Executes a query against the remote q service. Result of the query has to be retrieved by calling a receive * method. - * + * * @param msgType * Indicates whether message should be synchronous or asynchronous * @param query @@ -158,55 +158,44 @@ public static MessageType getMessageType( byte i ) { /** * Returns the host of a remote q service. - * + * * @return host of remote a q service */ public String getHost(); /** * Returns the port of a remote q service. - * + * * @return post of remote a q service */ public int getPort(); /** * Returns username for remote authorization. - * + * * @return username for remote authorization */ public String getUsername(); /** * Returns password for remote authorization. - * + * * @return password for remote authorization */ public String getPassword(); /** * Returns encoding used for serialization/deserialization of string objects. - * + * * @return encoding used for serialization/deserialization of string objects */ public String getEncoding(); /** * Retrives version of the IPC protocol for an established connection. - * + * * @return protocol version */ public int getProtocolVersion(); - /** - * Returns whether this connection will attempt reconnection if database socket is broken - * @return - */ - public boolean isAttemptReconnect(); - - /** - * Instruct this connection to reconnect to a kdb database at least once if the socket is broken - * @param reconnect - */ - public void setAttemptReconnect(boolean reconnect); } \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QRestorableConnection.java b/src/main/java/com/exxeleron/qjava/QRestorableConnection.java new file mode 100644 index 0000000..c7bbf39 --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/QRestorableConnection.java @@ -0,0 +1,163 @@ +/** + * Copyright (c) 2011-2014 Exxeleron GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exxeleron.qjava; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketException; + +/** + * {@inheritDoc} + */ +public class QRestorableConnection extends QBasicConnection { + + protected boolean attemptReconnect; + + private static final char[] TEST = new char[]{' '}; + + /** + * {@inheritDoc} + */ + public QRestorableConnection(final String host, final int port, final String username, final String password, final String encoding) { + super(host, port, username, password, encoding); + } + + /** + * Initializes a new {@link QBasicConnection} instance with encoding set to "ISO-8859-1". + * + * @param host + * Host of remote q service + * @param port + * Port of remote q service + * @param username + * Username for remote authorization + * @param password + * Password for remote authorization + */ + public QRestorableConnection(final String host, final int port, final String username, final String password) { + this(host, port, username, password, "ISO-8859-1"); + } + + /** + * {@inheritDoc} + */ + public void open() throws IOException, QException { + if ( !isConnected() ) { + if ( getHost() != null ) { + initSocket(); + initialize(); + + reader = new QReader(inputStream, getEncoding()); + writer = new QWriter(outputStream, getEncoding(), protocolVersion); + } else { + throw new QConnectionException("Host cannot be null"); + } + } + } + + private void initSocket() throws IOException { + connection = new Socket(getHost(), getPort()); + connection.setTcpNoDelay(true); + inputStream = new DataInputStream(connection.getInputStream()); + outputStream = connection.getOutputStream(); + } + + private void initialize() throws IOException, QException { + final String credentials = getPassword() != null ? String.format("%s:%s", getUsername(), getPassword()) : getUsername(); + byte[] request = (credentials + "\3\0").getBytes(getEncoding()); + final byte[] response = new byte[2]; + + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + close(); + initSocket(); + + request = (credentials + "\0").getBytes(getEncoding()); + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + throw new QConnectionException("Connection denied."); + } + } + + protocolVersion = Math.min(response[0], 3); + } + + /** + * {@inheritDoc} + */ + public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { + if(attemptReconnect) { + testAndReopenSocket(); + } + + if (connection == null) { + throw new IOException("Connection is not established."); + } + + if ( parameters.length > 8 ) { + throw new QWriterException("Too many parameters."); + } + + if ( parameters.length == 0 ) // simple string query + { + return writer.write(query.toCharArray(), msgType); + } else { + final Object[] request = new Object[parameters.length + 1]; + request[0] = query.toCharArray(); + + int i = 1; + for ( final Object param : parameters ) { + request[i++] = param; + } + + return writer.write(request, msgType); + } + } + + protected void testAndReopenSocket() throws QException,IOException { + try { + writer.write(TEST, MessageType.SYNC); + reader.read(false); + } catch (SocketException ex) { + try{ + close(); + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + open(); + } + } + } + + /** + * {@inheritDoc} + */ + public boolean isAttemptReconnect() { + return attemptReconnect; + } + + /** + * {@inheritDoc} + */ + public void setAttemptReconnect(boolean reconnect) { + attemptReconnect = reconnect; + } + + +} \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java b/src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java similarity index 83% rename from src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java rename to src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java index 36d0ae0..a171d68 100644 --- a/src/main/java/com/exxeleron/qjava/QSynchronizedConnection.java +++ b/src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java @@ -2,7 +2,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.io.OutputStream; import java.net.Socket; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -12,42 +11,22 @@ * Thread safe connector class for interfacing with the kdb+ service. Provides methods for synchronous and asynchronous * interaction. */ -public class QSynchronizedConnection extends QBasicConnection { +public class QRestorableSynchronizedConnection extends QRestorableConnection { private final Lock lock = new ReentrantLock(); private AtomicBoolean connectedFlag = new AtomicBoolean(false); /** - * Initializes a new {@link QSynchronizedConnection} instance. - * - * @param host - * Host of remote q service - * @param port - * Port of remote q service - * @param username - * Username for remote authorization - * @param password - * Password for remote authorization - * @param encoding - * Encoding used for serialization/deserialization of string objects + * {@inheritDoc} */ - public QSynchronizedConnection(final String host, final int port, final String username, final String password, final String encoding) { + public QRestorableSynchronizedConnection(final String host, final int port, final String username, final String password, final String encoding) { super(host, port, username, password, encoding); } /** - * Initializes a new {@link QSynchronizedConnection} instance with encoding set to "ISO-8859-1". - * - * @param host - * Host of remote q service - * @param port - * Port of remote q service - * @param username - * Username for remote authorization - * @param password - * Password for remote authorization + * {@inheritDoc} */ - public QSynchronizedConnection(final String host, final int port, final String username, final String password) { + public QRestorableSynchronizedConnection(final String host, final int port, final String username, final String password) { this(host, port, username, password, "ISO-8859-1"); } diff --git a/src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java b/src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java similarity index 95% rename from src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java rename to src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java index 6f6ec4d..c812edb 100644 --- a/src/test/java/com/exxeleron/qjava/QSynchronizedConnectionTest.java +++ b/src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java @@ -5,21 +5,18 @@ import org.junit.Test; import java.io.IOException; -import java.lang.annotation.Annotation; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.*; +public class QRestorableSynchronizedConnectionTest { -public class QSynchronizedConnectionTest { - - private QSynchronizedConnection connection; + private QRestorableSynchronizedConnection connection; @Before public void setUp() throws Exception { // assume // [localhost] q -p -5001 -s 10 - connection = new QSynchronizedConnection("localhost",5001,null,null); + connection = new QRestorableSynchronizedConnection("localhost",5001,null,null); connection.open(); } diff --git a/src/test/java/com/exxeleron/qjava/TestReconnect.java b/src/test/java/com/exxeleron/qjava/TestReconnect.java index cc18e45..5064a62 100644 --- a/src/test/java/com/exxeleron/qjava/TestReconnect.java +++ b/src/test/java/com/exxeleron/qjava/TestReconnect.java @@ -1,25 +1,20 @@ package com.exxeleron.qjava; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.net.SocketException; - -import static org.junit.Assert.*; - /** * 04/06/2015 */ public class TestReconnect { - private QBasicConnection connection; + private QRestorableConnection connection; @Before public void setUp() throws Exception { // assume // [localhost] q -p -5001 -s 10 - connection = new QBasicConnection("localhost",5001,null,null); + connection = new QRestorableConnection("localhost",5001,null,null); connection.setAttemptReconnect(true); connection.open(); } From d7b52c1a583550720c10f7f9f48ee7788f28b44d Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Wed, 22 Jul 2015 15:53:38 +0100 Subject: [PATCH 08/10] bump version number --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c943974..5780234 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ com.github.manishpatelUK qJava - 1.1 + 1.1.1 scm:git:ssh://git@github.com:manishpatelUK/qJava.git From ba6ed8b1ab84238f81507bf77246b64895a771e4 Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Wed, 22 Jul 2015 18:43:45 +0100 Subject: [PATCH 09/10] connection pool basic functionality --- .../qjava/extras/QConnectionPool.java | 18 +++ .../qjava/extras/QConnectionPoolFactory.java | 19 ++++ .../qjava/extras/QConnectionPoolImpl.java | 106 ++++++++++++++++++ .../qjava/extras/QConnectionPoolType.java | 21 ++++ .../qjava/extras/QConnectionPoolTest.java | 20 ++++ 5 files changed, 184 insertions(+) create mode 100644 src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java create mode 100644 src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java create mode 100644 src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java create mode 100644 src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java create mode 100644 src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java new file mode 100644 index 0000000..b79c35a --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java @@ -0,0 +1,18 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QConnection; + +public interface QConnectionPool { + /** + * Gets the next available connection. + * + * Semantics of "next available" depends on subclass + * @return the next available connection + */ + public QConnection next(); + /** + * Will attempt to close off current connections and create a new pool of the required size + * @param size of pool + */ + public void reinitialisePool(int size); +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java new file mode 100644 index 0000000..a182509 --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java @@ -0,0 +1,19 @@ +package com.exxeleron.qjava.extras; + +public class QConnectionPoolFactory { + public static QConnectionPool createPool(QConnectionPoolType poolType, + String host, + int port, + String userName, + String password, + String encoding, + int poolSize) { + try { + QConnectionPool pool = new QConnectionPoolImpl(poolType.getClazz(),host,port,userName,password,encoding); + pool.reinitialisePool(poolSize); + return pool; + } catch (NoSuchMethodException e) { + return null; + } + } +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java new file mode 100644 index 0000000..90b9d8b --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java @@ -0,0 +1,106 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QConnection; +import com.exxeleron.qjava.QException; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Simple round-robin connection pool + */ +public class QConnectionPoolImpl implements QConnectionPool { + + private Constructor constructor = null; + + private final Class clazz; + private final String host; + private final int port; + private final String userName; + private final String password; + private final String encoding; + + private QConnection[] currentPool; + private int currentIndex = -1; + + public QConnectionPoolImpl(Class clazz, + String host, + int port, + String userName, + String password, + String encoding) throws NoSuchMethodException { + this.clazz = clazz; + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + this.encoding = encoding; + + cacheConstructor(clazz); + } + + private void cacheConstructor(Class clazz) throws NoSuchMethodException { + if(encoding != null && encoding.length() != 0) { + constructor = clazz.getConstructor(String.class, Integer.TYPE, String.class, String.class, String.class); + } + else { + constructor = clazz.getConstructor(String.class, Integer.TYPE, String.class, String.class); + } + } + + private QConnection createInstance() throws IllegalAccessException, InvocationTargetException, InstantiationException { + if(encoding != null) { + return constructor.newInstance(host,port,userName,password,encoding); + } + else { + return constructor.newInstance(host,port,userName,password); + } + } + + @Override + public synchronized QConnection next() throws IllegalStateException{ + if(currentPool == null || currentPool.length == 0) { + throw new IllegalStateException("Pool not initialised"); + } + + // could use % + if(currentIndex > currentPool.length-1) { + currentIndex = 0; + } + + return currentPool[currentIndex++]; + } + + @Override + public synchronized void reinitialisePool(int size) { + if (size > 1024) {//default handle limit + throw new IllegalArgumentException("handle limit >1024"); + } + + if (currentPool != null) { + for (int i = 0; i < currentPool.length; i++) { + try { + currentPool[i].close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + currentPool = new QConnection[size]; + for (int i = 0; i < currentPool.length; i++) { + + try { + currentPool[i] = createInstance(); + currentPool[i].open(); + } catch (IOException | QException | InvocationTargetException | IllegalAccessException | InstantiationException e) { + e.printStackTrace(); + currentPool = new QConnection[0]; + return; + } + + currentIndex = 0; + } + } +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java new file mode 100644 index 0000000..627dd1c --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java @@ -0,0 +1,21 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QBasicConnection; +import com.exxeleron.qjava.QConnection; +import com.exxeleron.qjava.QRestorableConnection; +import com.exxeleron.qjava.QRestorableSynchronizedConnection; + +public enum QConnectionPoolType { + BASIC(QBasicConnection.class), + RESTORABLE(QRestorableConnection.class), + SYNCHRONIZED(QRestorableSynchronizedConnection.class); + + private final Class clazz; + private QConnectionPoolType(Class clazz) { + this.clazz = clazz; + } + + public Class getClazz() { + return clazz; + } +} diff --git a/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java b/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java new file mode 100644 index 0000000..14b4e72 --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java @@ -0,0 +1,20 @@ +package com.exxeleron.qjava.extras; + +import org.junit.Assert; +import org.junit.Test; + +public class QConnectionPoolTest { + + + @Test + public void testNext() throws Exception { + int size = 101; + QConnectionPool pool = QConnectionPoolFactory.createPool(QConnectionPoolType.SYNCHRONIZED, "localhost", 5001, null, null, null,size); + + for(int i = 0; i < size*2; i++) { + pool.next().sync("t:"+i); + } + + Assert.assertTrue(((Long)pool.next().sync("t")) == (size*2)-1); + } +} \ No newline at end of file From 9c7f6f21356aaa188ea606464107e5ae56387a5c Mon Sep 17 00:00:00 2001 From: Manish Patel Date: Wed, 22 Jul 2015 18:46:28 +0100 Subject: [PATCH 10/10] plugin version --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 5780234..c1c6a88 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,7 @@ org.apache.maven.plugins maven-compiler-plugin + 3.3 1.7