diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3cd961 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/* +*.iml +target/* + diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 5d486d1..cb7c0c2 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,11 @@ +------------------------------------------------------------------------------ + qJava 2.1.3 [2015.05.28] @manishpatelUK +------------------------------------------------------------------------------ + + - Forked from https://github.com/exxeleron/qJava + - Add multithread support on read and write classes + - Add connection watcher + ------------------------------------------------------------------------------ qJava 2.1.2 [2015.03.23] ------------------------------------------------------------------------------ diff --git a/README.md b/README.md index 90bde25..51d7d40 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,5 @@ - +Forked from https://github.com/exxeleron/qJava -qJava 2.1 -========= +Added capabilities: -The q/kdb+ interface is implemented as a set of Java classes and provides: -- Simple to use API -- Support for synchronous and asynchronous queries -- Convenient asynchronous callbacks mechanism -- Support for kdb+ protocol and types: v3.0, v2.6, v<=2.5 -- Uncompression of the IPC data stream -- Compatible with Java 5.0+ - - -For more details please refer to the [documentation](doc/Readme.md) +* Thread safe QConnection (QSynchronizedConnection) \ No newline at end of file diff --git a/pom.xml b/pom.xml index 75c5d12..c1c6a88 100644 --- a/pom.xml +++ b/pom.xml @@ -3,368 +3,68 @@ 4.0.0 qJava - Library providing connectivity between Java and kdb+. + Library providing connectivity between Java and kdb+. Forked from Exxeleron - exxeleron + com.github.manishpatelUK qJava - 2014 + 1.1.1 - bundle - 2.1.3-SNAPSHOT + + scm:git:ssh://git@github.com:manishpatelUK/qJava.git + scm:git:git@github.com:manishpatelUK/qJava.git + https://github.com/manishpatelUK/qJava.git + qJava-1.0 + - - exxeleron - http://www.exxeleron.com - + + Github + https://github.com/manishpatelUK/qJava/issues + - ${project.organization.url} - - - Apache License Version 2.0 - http://www.apache.org/licenses/ - - Copyright (c) 2011-2014 Exxeleron GmbH - - - + - + - + - - - releases - releases - false - http://lib.devnet.de/libs-release-local - - - snapshots - snapshots - false - http://lib.devnet.de/libs-snapshot-local - - + + + + junit + junit + 4.11 + + org.apache.maven.plugins maven-compiler-plugin - true - - - org.apache.maven.plugins - maven-source-plugin - true - - - org.apache.maven.plugins - maven-enforcer-plugin - ${maven-enforcer-plugin-version} - - - enforce-plugin-versions - - enforce - - - - - Best Practice is to always define plugin versions! - true - true - true - clean,deploy,site - - org.apache.maven.plugins:maven-eclipse-plugin - org.apache.maven.plugins:maven-reactor-plugin - - org.apache.maven.plugins:maven-enforcer-plugin,org.apache.maven.plugins:maven-idea-plugin - - - - - - - - org.jacoco - jacoco-maven-plugin - - - - prepare-agent - - - - report - prepare-package - - report - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - - javadoc - - process-resources - - qJava ${project.version} API - - - - - - maven-assembly-plugin + 3.3 - - assembly.xml - - - - - dist-assembly - package - - single - - - - - - org.apache.felix - maven-bundle-plugin - ${maven-bundle-plugin-version} - true - - - - com.exxeleron.qjava - - + + 1.7 + 1.7 - - - - - org.apache.maven.plugins - maven-clean-plugin - ${maven-clean-plugin-version} - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin-version} - - ${java.version} - ${java.version} - ${javac.optimize} - ${javac.verbose} - ${javac.debug} - ${file.encoding} - - - - org.apache.maven.plugins - maven-source-plugin - ${maven-source-plugin-version} - - - attach-sources - - jar - - - - - - org.apache.maven.plugins - maven-resources-plugin - ${maven-resources-plugin-version} - - - org.codehaus.mojo - build-helper-maven-plugin - ${maven-build-helper-maven-plugin} - - - parse-version - - parse-version - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - ${maven-javadoc-plugin-version} - - 512m - - - - package - - jar - - package - - - - - org.apache.maven.plugins - maven-scm-plugin - ${maven-scm-plugin-version} - - - org.apache.maven.plugins - maven-assembly-plugin - ${maven-assembly-plugin-version} - - - org.apache.maven.plugins - maven-release-plugin - ${maven-release-plugin-version} - - - org.apache.maven.plugins - maven-install-plugin - ${maven-install-plugin-version} - - - org.apache.maven.plugins - maven-deploy-plugin - ${maven-deploy-plugin-version} - - - org.apache.maven.plugins - maven-jar-plugin - ${maven-jar-plugin-version} - - - org.apache.maven.plugins - maven-dependency-plugin - ${maven-dependency-plugin-version} - - - org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin-version} - - - **/*IntegrationTest.java - **/*IT.java - - - **/*Test.java - **/Test*.java - - true - once - -Xmx1024m -XX:MaxPermSize=128M ${argLine} - true - - - - org.apache.maven.plugins - maven-archetype-plugin - ${maven-archetype-plugin-version} - - - org.apache.maven.plugins - maven-site-plugin - ${maven-site-plugin-version} - - - org.apache.maven.plugins - maven-reactor-plugin - ${maven-reactor-plugin-version} - - - org.apache.maven.plugins - maven-eclipse-plugin - ${maven-eclipse-plugin-version} - - true - true - - - - org.jacoco - jacoco-maven-plugin - ${maven-jacoco-maven-plugin-version} - - - - - UTF-8 - ISO-8859-1 - - 1.5 - false - true - false - - - 1.2 - 2.5 - 2.5.1 - 2.2 - 2.6 - 2.9 - 1.7 - 1.8 - 2.4 - 2.3.2 - 2.4 - 2.7 - 2.4 - 2.5.1 - 2.12.4 - 2.2 - 3.3 - 1.0 - 2.9 - 0.6.1.201212231917 - 2.4.0 - - - 4.11 - - - - - junit - junit - ${junit.version} - - diff --git a/src/main/java/com/exxeleron/qjava/QBasicConnection.java b/src/main/java/com/exxeleron/qjava/QBasicConnection.java index 9e56b33..1ed35e4 100644 --- a/src/main/java/com/exxeleron/qjava/QBasicConnection.java +++ b/src/main/java/com/exxeleron/qjava/QBasicConnection.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2011-2014 Exxeleron GmbH - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,7 +43,7 @@ public class QBasicConnection implements QConnection { /** * Initializes a new {@link QBasicConnection} instance. - * + * * @param host * Host of remote q service * @param port @@ -65,7 +65,7 @@ public QBasicConnection(final String host, final int port, final String username /** * Initializes a new {@link QBasicConnection} instance with encoding set to "ISO-8859-1". - * + * * @param host * Host of remote q service * @param port @@ -218,7 +218,7 @@ public Object receive() throws IOException, QException { /** * Returns a String that represents the current {@link QBasicConnection}. - * + * * @return a String that represents the current {@link QBasicConnection} */ @Override diff --git a/src/main/java/com/exxeleron/qjava/QConnection.java b/src/main/java/com/exxeleron/qjava/QConnection.java index e0bb76f..9e243c9 100644 --- a/src/main/java/com/exxeleron/qjava/QConnection.java +++ b/src/main/java/com/exxeleron/qjava/QConnection.java @@ -1,12 +1,12 @@ /** * Copyright (c) 2011-2014 Exxeleron GmbH - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,14 +20,14 @@ /** * Interface for the q connector. - * + * * @author dev123 */ public interface QConnection { /** * Defines IPC message types. - * + * * @author dev123 */ public static enum MessageType { @@ -37,64 +37,64 @@ public static enum MessageType { /** * Factory method for creating enum based on IPC message byte. - * + * * @param i * byte indicating message type * @return {@link MessageType} matching the byte - * + * * @throws IllegalArgumentException */ - public static MessageType getMessageType( byte i ) { + public static MessageType getMessageType( final byte i ) { switch ( i ) { - case 0: - return ASYNC; - case 1: - return SYNC; - case 2: - return RESPONSE; - default: - throw new IllegalArgumentException("Unsupported message type."); + case 0: + return ASYNC; + case 1: + return SYNC; + case 2: + return RESPONSE; + default: + throw new IllegalArgumentException("Unsupported message type."); } } } /** * Initializes connection with the remote q service. - * + * * @throws IOException * @throws UnknownHostException * @throws QException */ - public abstract void open() throws IOException, QException; + public void open() throws IOException, QException; /** * Closes connection with the remote q service. - * + * * @throws IOException */ - public abstract void close() throws IOException; + public void close() throws IOException; /** * Reinitializes connection with the remote q service. - * + * * @throws IOException * @throws UnknownHostException * @throws QException */ - public abstract void reset() throws IOException, QException; + public void reset() throws IOException, QException; /** * Check whether connection with the remote q host has been established. Note that this function doesn't check * whether the connection is still active. One has to use a heartbeat mechanism in order to check whether the * connection is still active. - * + * * @return true if connection with remote host is established, false otherwise */ - public abstract boolean isConnected(); + public boolean isConnected(); /** * Executes a synchronous query against the remote q service. - * + * * @param query * Query to be executed * @param parameters @@ -103,11 +103,11 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract Object sync( String query, Object... parameters ) throws QException, IOException; + public Object sync( String query, Object... parameters ) throws QException, IOException; /** * Executes an asynchronous query against the remote q service. - * + * * @param query * Query to be executed * @param parameters @@ -115,11 +115,11 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract void async( String query, Object... parameters ) throws QException, IOException; + public void async( String query, Object... parameters ) throws QException, IOException; /** * Reads next message from the remote q service. - * + * * @param dataOnly * if true returns only data part of the message, if false retuns data and * message meta-information encapsulated in QMessage @@ -129,21 +129,21 @@ public static MessageType getMessageType( byte i ) { * @throws IOException * @throws QException */ - public abstract Object receive( boolean dataOnly, boolean raw ) throws IOException, QException; + public Object receive( boolean dataOnly, boolean raw ) throws IOException, QException; /** * Reads next message from the remote q service. - * + * * @return deserialized response from the remote q service * @throws IOException * @throws QException */ - public abstract Object receive() throws IOException, QException; + public Object receive() throws IOException, QException; /** * Executes a query against the remote q service. Result of the query has to be retrieved by calling a receive * method. - * + * * @param msgType * Indicates whether message should be synchronous or asynchronous * @param query @@ -154,48 +154,48 @@ public static MessageType getMessageType( byte i ) { * @throws QException * @throws IOException */ - public abstract int query( final MessageType msgType, final String query, final Object... parameters ) throws QException, IOException; + public int query( final MessageType msgType, final String query, final Object... parameters ) throws QException, IOException; /** * Returns the host of a remote q service. - * + * * @return host of remote a q service */ - public abstract String getHost(); + public String getHost(); /** * Returns the port of a remote q service. - * + * * @return post of remote a q service */ - public abstract int getPort(); + public int getPort(); /** * Returns username for remote authorization. - * + * * @return username for remote authorization */ - public abstract String getUsername(); + public String getUsername(); /** * Returns password for remote authorization. - * + * * @return password for remote authorization */ - public abstract String getPassword(); + public String getPassword(); /** * Returns encoding used for serialization/deserialization of string objects. - * + * * @return encoding used for serialization/deserialization of string objects */ - public abstract String getEncoding(); + public String getEncoding(); /** * Retrives version of the IPC protocol for an established connection. - * + * * @return protocol version */ - public abstract int getProtocolVersion(); + public int getProtocolVersion(); } \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QRestorableConnection.java b/src/main/java/com/exxeleron/qjava/QRestorableConnection.java new file mode 100644 index 0000000..c7bbf39 --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/QRestorableConnection.java @@ -0,0 +1,163 @@ +/** + * Copyright (c) 2011-2014 Exxeleron GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exxeleron.qjava; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketException; + +/** + * {@inheritDoc} + */ +public class QRestorableConnection extends QBasicConnection { + + protected boolean attemptReconnect; + + private static final char[] TEST = new char[]{' '}; + + /** + * {@inheritDoc} + */ + public QRestorableConnection(final String host, final int port, final String username, final String password, final String encoding) { + super(host, port, username, password, encoding); + } + + /** + * Initializes a new {@link QBasicConnection} instance with encoding set to "ISO-8859-1". + * + * @param host + * Host of remote q service + * @param port + * Port of remote q service + * @param username + * Username for remote authorization + * @param password + * Password for remote authorization + */ + public QRestorableConnection(final String host, final int port, final String username, final String password) { + this(host, port, username, password, "ISO-8859-1"); + } + + /** + * {@inheritDoc} + */ + public void open() throws IOException, QException { + if ( !isConnected() ) { + if ( getHost() != null ) { + initSocket(); + initialize(); + + reader = new QReader(inputStream, getEncoding()); + writer = new QWriter(outputStream, getEncoding(), protocolVersion); + } else { + throw new QConnectionException("Host cannot be null"); + } + } + } + + private void initSocket() throws IOException { + connection = new Socket(getHost(), getPort()); + connection.setTcpNoDelay(true); + inputStream = new DataInputStream(connection.getInputStream()); + outputStream = connection.getOutputStream(); + } + + private void initialize() throws IOException, QException { + final String credentials = getPassword() != null ? String.format("%s:%s", getUsername(), getPassword()) : getUsername(); + byte[] request = (credentials + "\3\0").getBytes(getEncoding()); + final byte[] response = new byte[2]; + + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + close(); + initSocket(); + + request = (credentials + "\0").getBytes(getEncoding()); + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + throw new QConnectionException("Connection denied."); + } + } + + protocolVersion = Math.min(response[0], 3); + } + + /** + * {@inheritDoc} + */ + public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { + if(attemptReconnect) { + testAndReopenSocket(); + } + + if (connection == null) { + throw new IOException("Connection is not established."); + } + + if ( parameters.length > 8 ) { + throw new QWriterException("Too many parameters."); + } + + if ( parameters.length == 0 ) // simple string query + { + return writer.write(query.toCharArray(), msgType); + } else { + final Object[] request = new Object[parameters.length + 1]; + request[0] = query.toCharArray(); + + int i = 1; + for ( final Object param : parameters ) { + request[i++] = param; + } + + return writer.write(request, msgType); + } + } + + protected void testAndReopenSocket() throws QException,IOException { + try { + writer.write(TEST, MessageType.SYNC); + reader.read(false); + } catch (SocketException ex) { + try{ + close(); + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + open(); + } + } + } + + /** + * {@inheritDoc} + */ + public boolean isAttemptReconnect() { + return attemptReconnect; + } + + /** + * {@inheritDoc} + */ + public void setAttemptReconnect(boolean reconnect) { + attemptReconnect = reconnect; + } + + +} \ No newline at end of file diff --git a/src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java b/src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java new file mode 100644 index 0000000..a171d68 --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/QRestorableSynchronizedConnection.java @@ -0,0 +1,202 @@ +package com.exxeleron.qjava; + +import java.io.DataInputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Thread safe connector class for interfacing with the kdb+ service. Provides methods for synchronous and asynchronous + * interaction. + */ +public class QRestorableSynchronizedConnection extends QRestorableConnection { + + private final Lock lock = new ReentrantLock(); + private AtomicBoolean connectedFlag = new AtomicBoolean(false); + + /** + * {@inheritDoc} + */ + public QRestorableSynchronizedConnection(final String host, final int port, final String username, final String password, final String encoding) { + super(host, port, username, password, encoding); + } + + /** + * {@inheritDoc} + */ + public QRestorableSynchronizedConnection(final String host, final int port, final String username, final String password) { + this(host, port, username, password, "ISO-8859-1"); + } + + /** + * {@inheritDoc} + */ + public synchronized void open() throws IOException, QException { + if ( !connectedFlag.get() ) { + if ( getHost() != null ) { + initSocket(); + initialize(); + + reader = new QReader(inputStream, getEncoding()); + writer = new QWriter(outputStream, getEncoding(), protocolVersion); + + connectedFlag.getAndSet(true); + } else { + throw new QConnectionException("Host cannot be null"); + } + } + } + + private synchronized void initSocket() throws IOException { + connection = new Socket(getHost(), getPort()); + connection.setTcpNoDelay(true); + inputStream = new DataInputStream(connection.getInputStream()); + outputStream = connection.getOutputStream(); + } + + private synchronized void initialize() throws IOException, QException { + final String credentials = getPassword() != null ? String.format("%s:%s", getUsername(), getPassword()) : getUsername(); + byte[] request = (credentials + "\3\0").getBytes(getEncoding()); + final byte[] response = new byte[2]; + + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + close(); + initSocket(); + + request = (credentials + "\0").getBytes(getEncoding()); + outputStream.write(request); + if ( inputStream.read(response, 0, 1) != 1 ) { + throw new QConnectionException("Connection denied."); + } + } + + protocolVersion = Math.min(response[0], 3); + } + + /** + * {@inheritDoc} + */ + public synchronized void close() throws IOException { + if ( connectedFlag.get() ) { + connection.close(); + connection = null; + connectedFlag.getAndSet(false); + } + } + + /** + * {@inheritDoc} + */ + public synchronized void reset() throws IOException, QException { + if ( !connectedFlag.get() ) { + connection.close(); + } + connection = null; + connectedFlag.getAndSet(false); + open(); + } + + /** + * {@inheritDoc} + */ + public boolean isConnected() { + return connectedFlag.get(); + } + + /** + * {@inheritDoc} + */ + public Object sync( final String query, final Object... parameters ) throws QException, IOException { + QMessage response; + lock.lock(); + try { + query(QConnection.MessageType.SYNC, query, parameters); + response = reader.read(false); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + + if ( response.getMessageType() == QConnection.MessageType.RESPONSE ) { + return response.getData(); + } else { + lock.lock(); + try { + writer.write(new QException("nyi: qJava expected response message"), + response.getMessageType() == QConnection.MessageType.ASYNC ? QConnection.MessageType.ASYNC : QConnection.MessageType.RESPONSE); + throw new QReaderException("Received message of type: " + response.getMessageType() + " where response was expected"); + } catch (QReaderException e) { + throw e; + } finally { + lock.unlock(); + } + } + } + + /** + * {@inheritDoc} + */ + public int query( final QConnection.MessageType msgType, final String query, final Object... parameters ) throws QException, IOException { + if(attemptReconnect) { + testAndReopenSocket(); + } + + if ( !connectedFlag.get() ) { + throw new IOException("Connection is not established."); + } + + if ( parameters.length > 8 ) { + throw new QWriterException("Too many parameters."); + } + + if ( parameters.length == 0 ) // simple string query + { + lock.lock(); + try { + return writer.write(query.toCharArray(), msgType); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + + } else { + final Object[] request = new Object[parameters.length + 1]; + request[0] = query.toCharArray(); + + int i = 1; + for ( final Object param : parameters ) { + request[i++] = param; + } + + lock.lock(); + try { + return writer.write(request, msgType); + } catch (IOException | QException e) { + throw e; + } finally { + lock.unlock(); + } + } + } + + /** + * {@inheritDoc} + */ + public synchronized Object receive( final boolean dataOnly, final boolean raw ) throws IOException, QException { + return dataOnly ? reader.read(raw).getData() : reader.read(raw); + } + + /** + * {@inheritDoc} + */ + public synchronized Object receive() throws IOException, QException { + return receive(true, false); + } + + +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java new file mode 100644 index 0000000..b79c35a --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPool.java @@ -0,0 +1,18 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QConnection; + +public interface QConnectionPool { + /** + * Gets the next available connection. + * + * Semantics of "next available" depends on subclass + * @return the next available connection + */ + public QConnection next(); + /** + * Will attempt to close off current connections and create a new pool of the required size + * @param size of pool + */ + public void reinitialisePool(int size); +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java new file mode 100644 index 0000000..a182509 --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolFactory.java @@ -0,0 +1,19 @@ +package com.exxeleron.qjava.extras; + +public class QConnectionPoolFactory { + public static QConnectionPool createPool(QConnectionPoolType poolType, + String host, + int port, + String userName, + String password, + String encoding, + int poolSize) { + try { + QConnectionPool pool = new QConnectionPoolImpl(poolType.getClazz(),host,port,userName,password,encoding); + pool.reinitialisePool(poolSize); + return pool; + } catch (NoSuchMethodException e) { + return null; + } + } +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java new file mode 100644 index 0000000..90b9d8b --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolImpl.java @@ -0,0 +1,106 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QConnection; +import com.exxeleron.qjava.QException; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Simple round-robin connection pool + */ +public class QConnectionPoolImpl implements QConnectionPool { + + private Constructor constructor = null; + + private final Class clazz; + private final String host; + private final int port; + private final String userName; + private final String password; + private final String encoding; + + private QConnection[] currentPool; + private int currentIndex = -1; + + public QConnectionPoolImpl(Class clazz, + String host, + int port, + String userName, + String password, + String encoding) throws NoSuchMethodException { + this.clazz = clazz; + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + this.encoding = encoding; + + cacheConstructor(clazz); + } + + private void cacheConstructor(Class clazz) throws NoSuchMethodException { + if(encoding != null && encoding.length() != 0) { + constructor = clazz.getConstructor(String.class, Integer.TYPE, String.class, String.class, String.class); + } + else { + constructor = clazz.getConstructor(String.class, Integer.TYPE, String.class, String.class); + } + } + + private QConnection createInstance() throws IllegalAccessException, InvocationTargetException, InstantiationException { + if(encoding != null) { + return constructor.newInstance(host,port,userName,password,encoding); + } + else { + return constructor.newInstance(host,port,userName,password); + } + } + + @Override + public synchronized QConnection next() throws IllegalStateException{ + if(currentPool == null || currentPool.length == 0) { + throw new IllegalStateException("Pool not initialised"); + } + + // could use % + if(currentIndex > currentPool.length-1) { + currentIndex = 0; + } + + return currentPool[currentIndex++]; + } + + @Override + public synchronized void reinitialisePool(int size) { + if (size > 1024) {//default handle limit + throw new IllegalArgumentException("handle limit >1024"); + } + + if (currentPool != null) { + for (int i = 0; i < currentPool.length; i++) { + try { + currentPool[i].close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + currentPool = new QConnection[size]; + for (int i = 0; i < currentPool.length; i++) { + + try { + currentPool[i] = createInstance(); + currentPool[i].open(); + } catch (IOException | QException | InvocationTargetException | IllegalAccessException | InstantiationException e) { + e.printStackTrace(); + currentPool = new QConnection[0]; + return; + } + + currentIndex = 0; + } + } +} diff --git a/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java new file mode 100644 index 0000000..627dd1c --- /dev/null +++ b/src/main/java/com/exxeleron/qjava/extras/QConnectionPoolType.java @@ -0,0 +1,21 @@ +package com.exxeleron.qjava.extras; + +import com.exxeleron.qjava.QBasicConnection; +import com.exxeleron.qjava.QConnection; +import com.exxeleron.qjava.QRestorableConnection; +import com.exxeleron.qjava.QRestorableSynchronizedConnection; + +public enum QConnectionPoolType { + BASIC(QBasicConnection.class), + RESTORABLE(QRestorableConnection.class), + SYNCHRONIZED(QRestorableSynchronizedConnection.class); + + private final Class clazz; + private QConnectionPoolType(Class clazz) { + this.clazz = clazz; + } + + public Class getClazz() { + return clazz; + } +} diff --git a/src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java b/src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java new file mode 100644 index 0000000..c812edb --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/QRestorableSynchronizedConnectionTest.java @@ -0,0 +1,200 @@ +package com.exxeleron.qjava; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class QRestorableSynchronizedConnectionTest { + + private QRestorableSynchronizedConnection connection; + + @Before + public void setUp() throws Exception { + // assume + // [localhost] q -p -5001 -s 10 + connection = new QRestorableSynchronizedConnection("localhost",5001,null,null); + connection.open(); + } + + @After + public void tearDown() throws Exception { + connection.close(); + } + + @Test + public void testOpenAndClose() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingOpenCloseThread(connection)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testReset() throws Exception { + int threadCount = 10; // can't have too many else will could hit limit of connections + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingResetThread(connection)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testSync() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingQueryThread(connection, true)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + @Test + public void testAsync() throws Exception { + int threadCount = 100; + List threads = new ArrayList<>(); + for(int i = 0; i < threadCount; i++) { + threads.add(new AnnoyingQueryThread(connection, false)); + } + for(Thread thread : threads) { + thread.start(); + } + + //only exit when all threads are done + while(true) { + int c = 0; + for(Thread thread : threads) { + if(thread.isAlive()) { + break; + } + else c++; + } + System.out.println("Threads finished: " + c); + if(c == threadCount) { + break; + } + } + } + + private class AnnoyingOpenCloseThread extends Thread { + private final QConnection qConnection; + + public AnnoyingOpenCloseThread(QConnection qConnection) { + this.qConnection = qConnection; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + qConnection.open(); + Thread.sleep(10); + qConnection.close(); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private class AnnoyingResetThread extends Thread { + private final QConnection qConnection; + + public AnnoyingResetThread(QConnection qConnection) { + this.qConnection = qConnection; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + qConnection.reset(); + Thread.sleep(10); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + private class AnnoyingQueryThread extends Thread { + private final QConnection qConnection; + private final boolean sync; + + public AnnoyingQueryThread(QConnection qConnection, boolean sync) { + this.qConnection = qConnection; + this.sync = sync; + } + + public void run() { + for(int i = 0; i < 100; i++) { + try { + if(sync) { + qConnection.sync("{2 xexp 10?x} peach 100"); + } + else { + qConnection.async("{2 xexp 10?x} peach 100"); + } + Thread.sleep(10); + } catch (IOException | QException | InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} \ No newline at end of file diff --git a/src/test/java/com/exxeleron/qjava/TestReconnect.java b/src/test/java/com/exxeleron/qjava/TestReconnect.java new file mode 100644 index 0000000..5064a62 --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/TestReconnect.java @@ -0,0 +1,35 @@ +package com.exxeleron.qjava; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * 04/06/2015 + */ +public class TestReconnect { + private QRestorableConnection connection; + + @Before + public void setUp() throws Exception { + // assume + // [localhost] q -p -5001 -s 10 + connection = new QRestorableConnection("localhost",5001,null,null); + connection.setAttemptReconnect(true); + connection.open(); + } + + @Test + public void testReconnect() throws Exception{ + Assert.assertTrue(connection.isConnected()); + + Assert.assertTrue(2L == (Long)connection.sync("1+1")); + + try { + connection.sync("@[{hclose each key[.z.W]};::;{}]"); + } + catch (Exception ex){} + + Assert.assertTrue(2L == (Long) connection.sync("1+1")); + } +} diff --git a/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java b/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java new file mode 100644 index 0000000..14b4e72 --- /dev/null +++ b/src/test/java/com/exxeleron/qjava/extras/QConnectionPoolTest.java @@ -0,0 +1,20 @@ +package com.exxeleron.qjava.extras; + +import org.junit.Assert; +import org.junit.Test; + +public class QConnectionPoolTest { + + + @Test + public void testNext() throws Exception { + int size = 101; + QConnectionPool pool = QConnectionPoolFactory.createPool(QConnectionPoolType.SYNCHRONIZED, "localhost", 5001, null, null, null,size); + + for(int i = 0; i < size*2; i++) { + pool.next().sync("t:"+i); + } + + Assert.assertTrue(((Long)pool.next().sync("t")) == (size*2)-1); + } +} \ No newline at end of file