diff --git a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/pom.xml b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/pom.xml
index 18fc2c15717..bc18d81741c 100644
--- a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/pom.xml
+++ b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/pom.xml
@@ -45,7 +45,6 @@
nimbus-jose-jwt
-
org.apache.activemq
activemq-client
diff --git a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/BrokerSettingException.java b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/BrokerSettingException.java
deleted file mode 100644
index 441a0591cd7..00000000000
--- a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/BrokerSettingException.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package io.subutai.core.environment.metadata.impl;
-
-
-public class BrokerSettingException extends Exception
-{
-
- public BrokerSettingException( final String message )
- {
- super( message );
- }
-
-
- public BrokerSettingException( final Exception e )
- {
- super( e );
- }
-}
diff --git a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EnvironmentMetadataManagerImpl.java b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EnvironmentMetadataManagerImpl.java
index dd6794f15c2..4eb3b4a2bbb 100644
--- a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EnvironmentMetadataManagerImpl.java
+++ b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EnvironmentMetadataManagerImpl.java
@@ -1,16 +1,16 @@
package io.subutai.core.environment.metadata.impl;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import io.subutai.common.command.CommandException;
import io.subutai.common.command.RequestBuilder;
@@ -24,7 +24,11 @@
import io.subutai.core.identity.api.exception.TokenCreateException;
import io.subutai.core.peer.api.PeerManager;
import io.subutai.hub.share.Utils;
-import io.subutai.hub.share.dto.BrokerSettingsDto;
+import io.subutai.hub.share.broker.BrokerConnectionFactory;
+import io.subutai.hub.share.broker.BrokerSettings;
+import io.subutai.hub.share.broker.BrokerTransport;
+import io.subutai.hub.share.broker.InvalidTransportException;
+import io.subutai.hub.share.broker.TransportNotFoundException;
import io.subutai.hub.share.dto.environment.EnvironmentInfoDto;
import io.subutai.hub.share.event.Event;
import io.subutai.hub.share.json.JsonUtil;
@@ -37,10 +41,12 @@ public class EnvironmentMetadataManagerImpl implements EnvironmentMetadataManage
{
private static final Logger LOG = LoggerFactory.getLogger( EnvironmentMetadataManagerImpl.class );
private final IdentityManager identityManager;
+ private Cache brokerTransportRequests;
private PeerManager peerManager;
private EnvironmentManager environmentManager;
- private BrokerSettings brokerSettings;
private HubManager hubManager;
+ private BrokerConnectionFactory brokerConnectionFactory = new BrokerConnectionFactory( 1 );
+ private BrokerSettings brokerSettings = new BrokerSettings();
public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManager environmentManager,
@@ -50,6 +56,7 @@ public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManag
this.environmentManager = environmentManager;
this.identityManager = identityManager;
this.hubManager = hubManager;
+ this.brokerTransportRequests = CacheBuilder.newBuilder().expireAfterWrite( 1, TimeUnit.MINUTES ).build();
}
@@ -106,53 +113,32 @@ public void pushEvent( final Event event )
LOG.debug( "Event received: {} {}", event, jsonEvent );
LOG.debug( "OS: {}", event.getCustomMetaByKey( "OS" ) );
String destination = "events." + event.getOrigin().getId();
-
- checkBrokerSettings( false );
- // TODO: 4/12/18 need to implement connections pool something like below; while creating connection
- // every time with random URI
- // ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
- // "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
- // JmsPoolConnectionFactory cf = new JmsPoolConnectionFactory();
- // cf.setConnectionFactory(amq);
- // cf.setMaxConnections(3);
- thread( new EventProducer( this.brokerSettings.getBroker(), destination, jsonEvent ), true );
+ ConnectionFactory cf = getConnectionFactory( BrokerTransport.Type.TCP );
+ thread( new EventProducer( cf, destination, jsonEvent ), true );
}
- catch ( JsonProcessingException | URISyntaxException | BrokerSettingException e )
+ catch ( JsonProcessingException | TransportNotFoundException | InvalidTransportException e )
{
LOG.error( e.getMessage(), e );
}
}
- private void checkBrokerSettings( boolean retrieve ) throws BrokerSettingException
+ private ConnectionFactory getConnectionFactory( BrokerTransport.Type type )
+ throws TransportNotFoundException, InvalidTransportException
{
- if ( !( brokerSettings == null || retrieve ) )
+ // trying to get the previous connection factory
+ ConnectionFactory cf = brokerConnectionFactory.getConnectionFactory( type );
+ if ( cf == null )
{
- return;
- }
-
- try
- {
- final BrokerSettingsDto response = hubManager.getBrokers();
- if ( response == null )
- {
- throw new BrokerSettingException( "Could not retrieve broker settings." );
- }
- List list = new ArrayList<>();
- for ( String s : response.getBrokers() )
+ // seems it is first time to obtain connection factory
+ BrokerTransport transport = requestBrokerTransport( type );
+ if ( transport == null )
{
- list.add( new URI( s ) );
+ throw new TransportNotFoundException( "Transport not found for type: " + type );
}
- if ( list.size() == 0 )
- {
- throw new BrokerSettingException( "Broker URI list is empty." );
- }
- this.brokerSettings = new BrokerSettings( list );
- }
- catch ( URISyntaxException e )
- {
- throw new BrokerSettingException( e );
+ cf = brokerConnectionFactory.getConnectionFactory( transport );
}
+ return cf;
}
@@ -171,21 +157,15 @@ private void placeTokenIntoContainer( ContainerHost containerHost, String token
}
- private class BrokerSettings
+ private BrokerTransport requestBrokerTransport( final BrokerTransport.Type type )
{
- List uriList;
-
-
- public BrokerSettings( final List uriList )
- {
- this.uriList = uriList;
- }
-
+ BrokerTransport transport = this.brokerTransportRequests.getIfPresent( type );
- public URI getBroker()
+ if ( transport == null )
{
- return uriList.get( new Random().nextInt( uriList.size() ) );
+ transport = hubManager.getBrokerTransport( type );
}
+ return transport;
}
}
diff --git a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EventProducer.java b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EventProducer.java
index 106123b8a78..e84524e887f 100644
--- a/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EventProducer.java
+++ b/management/server/core/environment-metadata-manager/environment-metadata-manager-impl/src/main/java/io/subutai/core/environment/metadata/impl/EventProducer.java
@@ -1,10 +1,8 @@
package io.subutai.core.environment.metadata.impl;
-import java.net.URI;
-import java.net.URISyntaxException;
-
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -15,8 +13,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
-
public class EventProducer implements Runnable
{
@@ -24,17 +20,14 @@ public class EventProducer implements Runnable
private final String destination;
private String message;
- private URI uri;
- private ActiveMQConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactory;
- public EventProducer( final URI uri, final String destination, final String message ) throws URISyntaxException
+ public EventProducer( final ConnectionFactory connectionFactory, final String destination, final String message )
{
- this.uri = uri;
this.destination = destination;
this.message = message;
- // Create a ConnectionFactory
- connectionFactory = new ActiveMQConnectionFactory( uri );
+ this.connectionFactory = connectionFactory;
}
diff --git a/management/server/core/hub-manager/hub-manager-api/src/main/java/io/subutai/core/hubmanager/api/HubManager.java b/management/server/core/hub-manager/hub-manager-api/src/main/java/io/subutai/core/hubmanager/api/HubManager.java
index bdbb70ce19e..788025791f9 100644
--- a/management/server/core/hub-manager/hub-manager-api/src/main/java/io/subutai/core/hubmanager/api/HubManager.java
+++ b/management/server/core/hub-manager/hub-manager-api/src/main/java/io/subutai/core/hubmanager/api/HubManager.java
@@ -5,7 +5,8 @@
import io.subutai.core.hubmanager.api.exception.HubManagerException;
import io.subutai.core.hubmanager.api.model.Config;
-import io.subutai.hub.share.dto.BrokerSettingsDto;
+import io.subutai.hub.share.broker.BrokerSettings;
+import io.subutai.hub.share.broker.BrokerTransport;
public interface HubManager
@@ -52,5 +53,5 @@ public interface HubManager
void notifyHubThatPeerIsOffline();
- BrokerSettingsDto getBrokers();
+ BrokerTransport getBrokerTransport(BrokerTransport.Type type);
}
diff --git a/management/server/core/hub-manager/hub-manager-impl/src/main/java/io/subutai/core/hubmanager/impl/HubManagerImpl.java b/management/server/core/hub-manager/hub-manager-impl/src/main/java/io/subutai/core/hubmanager/impl/HubManagerImpl.java
index c75fd77e4bb..6a487d0b392 100644
--- a/management/server/core/hub-manager/hub-manager-impl/src/main/java/io/subutai/core/hubmanager/impl/HubManagerImpl.java
+++ b/management/server/core/hub-manager/hub-manager-impl/src/main/java/io/subutai/core/hubmanager/impl/HubManagerImpl.java
@@ -75,8 +75,8 @@
import io.subutai.core.peer.api.PeerManager;
import io.subutai.core.security.api.SecurityManager;
import io.subutai.core.systemmanager.api.SystemManager;
+import io.subutai.hub.share.broker.BrokerTransport;
import io.subutai.hub.share.common.HubEventListener;
-import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.dto.PeerDto;
import io.subutai.hub.share.dto.PeerProductDataDto;
import io.subutai.hub.share.dto.UserDto;
@@ -811,12 +811,13 @@ public void onRhDisconnected( final ResourceHostInfo resourceHostInfo )
@Override
- public BrokerSettingsDto getBrokers()
+ public BrokerTransport getBrokerTransport( BrokerTransport.Type type )
{
final HubRestClient client = new HubRestClient( configManager );
- final RestResult response =
- client.get( "/rest/v1/brokers/" + localPeer.getId(), BrokerSettingsDto.class );
+ final RestResult response =
+ client.get( String.format( "/rest/v1/broker/transport/%s/%s", type, localPeer.getId() ),
+ BrokerTransport.class );
return response.getEntity();
}
diff --git a/management/server/subutai-hub-share/pom.xml b/management/server/subutai-hub-share/pom.xml
index 3038c88925e..5afd03e31b3 100644
--- a/management/server/subutai-hub-share/pom.xml
+++ b/management/server/subutai-hub-share/pom.xml
@@ -39,9 +39,9 @@
- commons-io
- commons-io
- 2.4
+ commons-io
+ commons-io
+ 2.4
@@ -62,6 +62,18 @@
2.6.1
+
+ org.apache.activemq
+ activemq-client
+ 5.15.3
+
+
+
+ org.apache.activemq
+ activemq-jms-pool
+ 5.15.3
+
+
org.json
json
@@ -88,6 +100,12 @@
test
+
+ org.hamcrest
+ hamcrest-all
+ test
+
+
\ No newline at end of file
diff --git a/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerConnectionFactory.java b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerConnectionFactory.java
new file mode 100644
index 00000000000..2ae1ee5af81
--- /dev/null
+++ b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerConnectionFactory.java
@@ -0,0 +1,57 @@
+package io.subutai.hub.share.broker;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.jms.pool.PooledConnectionFactory;
+
+import com.google.common.base.Preconditions;
+
+
+public class BrokerConnectionFactory
+{
+
+ private final int poolSize;
+ private Map pool = new HashMap<>();
+
+
+ public BrokerConnectionFactory( int poolSize )
+ {
+ Preconditions.checkArgument( poolSize > 0 );
+ Preconditions.checkArgument( poolSize < 5 );
+
+ this.poolSize = poolSize;
+ }
+
+
+ public ConnectionFactory getConnectionFactory( BrokerTransport.Type type )
+ {
+ return pool.get( type );
+ }
+
+
+ public ConnectionFactory getConnectionFactory( BrokerTransport transport ) throws InvalidTransportException
+ {
+ Preconditions.checkNotNull( transport );
+
+ BrokerTransport.Type type = BrokerTransport.getType( transport.getUri() );
+
+ PooledConnectionFactory cf = this.pool.get( type );
+
+ if ( cf != null )
+ {
+ return cf;
+ }
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory( transport.getUri() );
+ cf = new PooledConnectionFactory();
+ cf.setConnectionFactory( amq );
+ cf.setMaxConnections( this.poolSize );
+ this.pool.put( type, cf );
+
+ return cf;
+ }
+}
diff --git a/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerSettings.java b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerSettings.java
new file mode 100644
index 00000000000..9b746155d55
--- /dev/null
+++ b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerSettings.java
@@ -0,0 +1,107 @@
+package io.subutai.hub.share.broker;
+
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+
+@JsonAutoDetect( fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE,
+ setterVisibility = JsonAutoDetect.Visibility.NONE )
+public class BrokerSettings
+{
+ @JsonProperty( value = "name" )
+ private String name;
+
+ @JsonProperty( value = "transports" )
+ private Map transports;
+
+
+ protected BrokerSettings( final String name, final Map transports )
+ {
+ this.name = name;
+ this.transports = transports;
+ }
+
+
+ public BrokerSettings()
+ {
+ }
+
+
+ public String getName()
+ {
+ return name;
+ }
+
+
+ public void addTransport( final BrokerTransport transport ) throws InvalidTransportException
+ {
+ Preconditions.checkNotNull( transport );
+ this.transports.put( getType( transport ), transport );
+ }
+
+
+ private BrokerTransport.Type getType( final BrokerTransport transport ) throws InvalidTransportException
+ {
+ return BrokerTransport.Type.valueOf( transport.getUri() );
+ }
+
+
+ protected Collection getTransports()
+ {
+ return this.transports.values();
+ }
+
+ //
+ // public BrokerTransport getRandomTransportsByType( BrokerTransport.Type type ) throws
+ // TransportNotFoundException
+ // {
+ //
+ // final List transports = getTransportsByType( type );
+ //
+ // if ( transports.size() == 0 )
+ // {
+ // throw new TransportNotFoundException();
+ // }
+ // return transports.get( new Random().nextInt( transports.size() ) );
+ // }
+
+
+ public BrokerTransport getTransportByType( final BrokerTransport.Type type )
+ {
+ return this.transports.get( type );
+ }
+
+
+ @Override
+ public boolean equals( final Object o )
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+
+ final BrokerSettings that = ( BrokerSettings ) o;
+
+ return new EqualsBuilder().append( name, that.name ).append( transports, that.transports ).isEquals();
+ }
+
+
+ @Override
+ public int hashCode()
+ {
+ return new HashCodeBuilder( 17, 37 ).append( name ).append( transports ).toHashCode();
+ }
+}
diff --git a/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerTransport.java b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerTransport.java
new file mode 100644
index 00000000000..5af89465988
--- /dev/null
+++ b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/BrokerTransport.java
@@ -0,0 +1,98 @@
+package io.subutai.hub.share.broker;
+
+
+import java.net.URI;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+
+public class BrokerTransport
+{
+ public enum Type
+ {
+ TCP( "tcp" ), WS( "ws" ), MQTT( "mqtt" );
+ private final String scheme;
+
+
+ Type( final String scheme )
+ {
+ this.scheme = scheme;
+ }
+
+
+ public String getScheme()
+ {
+ return scheme;
+ }
+
+
+ public static Type valueOf( URI uri ) throws InvalidTransportException
+ {
+ try
+ {
+ return Type.valueOf( uri.getScheme().toUpperCase() );
+ }
+ catch ( IllegalArgumentException e )
+ {
+ throw new InvalidTransportException( uri.toASCIIString() );
+ }
+ }
+ }
+
+
+ @JsonProperty( value = "uri" )
+ private URI uri;
+
+
+ public BrokerTransport( final URI uri ) throws InvalidTransportException
+ {
+ Preconditions.checkNotNull( uri );
+
+ getType( uri );
+
+ this.uri = uri;
+ }
+
+
+ private BrokerTransport()
+ {
+ }
+
+
+ public URI getUri()
+ {
+ return uri;
+ }
+
+
+ public static Type getType( URI uri ) throws InvalidTransportException
+ {
+ return Type.valueOf( uri );
+ }
+
+
+ @Override
+ public boolean equals( final Object o )
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+ final BrokerTransport that = ( BrokerTransport ) o;
+ return Objects.equals( uri, that.uri );
+ }
+
+
+ @Override
+ public int hashCode()
+ {
+
+ return Objects.hash( uri );
+ }
+}
diff --git a/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/InvalidTransportException.java b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/InvalidTransportException.java
new file mode 100644
index 00000000000..af8f4950246
--- /dev/null
+++ b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/InvalidTransportException.java
@@ -0,0 +1,10 @@
+package io.subutai.hub.share.broker;
+
+
+public class InvalidTransportException extends Exception
+{
+ public InvalidTransportException( final String message )
+ {
+ super( message );
+ }
+}
diff --git a/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/TransportNotFoundException.java b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/TransportNotFoundException.java
new file mode 100644
index 00000000000..551636802b3
--- /dev/null
+++ b/management/server/subutai-hub-share/src/main/java/io/subutai/hub/share/broker/TransportNotFoundException.java
@@ -0,0 +1,10 @@
+package io.subutai.hub.share.broker;
+
+
+public class TransportNotFoundException extends Exception
+{
+ public TransportNotFoundException( final String message )
+ {
+ super( message );
+ }
+}
diff --git a/management/server/subutai-hub-share/src/test/java/io/subutai/hub/share/broker/BrokerSettingsTest.java b/management/server/subutai-hub-share/src/test/java/io/subutai/hub/share/broker/BrokerSettingsTest.java
new file mode 100644
index 00000000000..0add522af6b
--- /dev/null
+++ b/management/server/subutai-hub-share/src/test/java/io/subutai/hub/share/broker/BrokerSettingsTest.java
@@ -0,0 +1,54 @@
+package io.subutai.hub.share.broker;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class BrokerSettingsTest
+{
+ public BrokerTransport tcpTransport;
+ public BrokerTransport wsTransport;
+
+ BrokerSettings brokerSettings;
+
+ private ObjectMapper objectMapper;
+
+
+ @Before
+ public void setup() throws InvalidTransportException
+ {
+ this.objectMapper = new ObjectMapper();
+
+ this.tcpTransport = new BrokerTransport( URI.create( "tcp://192.168.1.1" ) );
+ this.wsTransport = new BrokerTransport( URI.create( "ws://192.168.1.2" ) );
+
+ final Map transports = new HashMap<>();
+
+ transports.put( tcpTransport.getType( tcpTransport.getUri() ), tcpTransport );
+ transports.put( wsTransport.getType( wsTransport.getUri() ), wsTransport );
+
+ this.brokerSettings = new BrokerSettings( "broker1", transports );
+ }
+
+
+ @Test
+ public void testJsonSerializationAndDeserialization() throws IOException
+ {
+ final String json = objectMapper.writeValueAsString( brokerSettings );
+
+ System.out.println( json );
+ final BrokerSettings restoredObject = objectMapper.readValue( json, BrokerSettings.class );
+
+ assertEquals( brokerSettings, restoredObject );
+ }
+}
\ No newline at end of file