Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
implemented pooled connection factory to AMQ
  • Loading branch information
tjamakeev committed Apr 13, 2018
1 parent dd28618 commit c8f2cbf
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
<artifactId>nimbus-jose-jwt</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package io.subutai.core.environment.metadata.impl;


import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import io.subutai.common.command.CommandException;
import io.subutai.common.command.RequestBuilder;
Expand All @@ -24,7 +24,11 @@
import io.subutai.core.identity.api.exception.TokenCreateException;
import io.subutai.core.peer.api.PeerManager;
import io.subutai.hub.share.Utils;
import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.broker.BrokerConnectionFactory;
import io.subutai.hub.share.broker.BrokerSettings;
import io.subutai.hub.share.broker.BrokerTransport;
import io.subutai.hub.share.broker.InvalidTransportException;
import io.subutai.hub.share.broker.TransportNotFoundException;
import io.subutai.hub.share.dto.environment.EnvironmentInfoDto;
import io.subutai.hub.share.event.Event;
import io.subutai.hub.share.json.JsonUtil;
Expand All @@ -37,10 +41,12 @@ public class EnvironmentMetadataManagerImpl implements EnvironmentMetadataManage
{
private static final Logger LOG = LoggerFactory.getLogger( EnvironmentMetadataManagerImpl.class );
private final IdentityManager identityManager;
private Cache<BrokerTransport.Type, BrokerTransport> brokerTransportRequests;
private PeerManager peerManager;
private EnvironmentManager environmentManager;
private BrokerSettings brokerSettings;
private HubManager hubManager;
private BrokerConnectionFactory brokerConnectionFactory = new BrokerConnectionFactory( 1 );
private BrokerSettings brokerSettings = new BrokerSettings();


public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManager environmentManager,
Expand All @@ -50,6 +56,7 @@ public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManag
this.environmentManager = environmentManager;
this.identityManager = identityManager;
this.hubManager = hubManager;
this.brokerTransportRequests = CacheBuilder.newBuilder().expireAfterWrite( 1, TimeUnit.MINUTES ).build();
}


Expand Down Expand Up @@ -106,53 +113,32 @@ public void pushEvent( final Event event )
LOG.debug( "Event received: {} {}", event, jsonEvent );
LOG.debug( "OS: {}", event.getCustomMetaByKey( "OS" ) );
String destination = "events." + event.getOrigin().getId();

checkBrokerSettings( false );
// TODO: 4/12/18 need to implement connections pool something like below; while creating connection
// every time with random URI
// ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
// "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
// JmsPoolConnectionFactory cf = new JmsPoolConnectionFactory();
// cf.setConnectionFactory(amq);
// cf.setMaxConnections(3);
thread( new EventProducer( this.brokerSettings.getBroker(), destination, jsonEvent ), true );
ConnectionFactory cf = getConnectionFactory( BrokerTransport.Type.TCP );
thread( new EventProducer( cf, destination, jsonEvent ), true );
}
catch ( JsonProcessingException | URISyntaxException | BrokerSettingException e )
catch ( JsonProcessingException | TransportNotFoundException | InvalidTransportException e )
{
LOG.error( e.getMessage(), e );
}
}


private void checkBrokerSettings( boolean retrieve ) throws BrokerSettingException
private ConnectionFactory getConnectionFactory( BrokerTransport.Type type )
throws TransportNotFoundException, InvalidTransportException
{
if ( !( brokerSettings == null || retrieve ) )
// trying to get the previous connection factory
ConnectionFactory cf = brokerConnectionFactory.getConnectionFactory( type );
if ( cf == null )
{
return;
}

try
{
final BrokerSettingsDto response = hubManager.getBrokers();
if ( response == null )
{
throw new BrokerSettingException( "Could not retrieve broker settings." );
}
List<URI> list = new ArrayList<>();
for ( String s : response.getBrokers() )
// seems it is first time to obtain connection factory
BrokerTransport transport = requestBrokerTransport( type );
if ( transport == null )
{
list.add( new URI( s ) );
throw new TransportNotFoundException( "Transport not found for type: " + type );
}
if ( list.size() == 0 )
{
throw new BrokerSettingException( "Broker URI list is empty." );
}
this.brokerSettings = new BrokerSettings( list );
}
catch ( URISyntaxException e )
{
throw new BrokerSettingException( e );
cf = brokerConnectionFactory.getConnectionFactory( transport );
}
return cf;
}


Expand All @@ -171,21 +157,15 @@ private void placeTokenIntoContainer( ContainerHost containerHost, String token
}


private class BrokerSettings
private BrokerTransport requestBrokerTransport( final BrokerTransport.Type type )
{
List<URI> uriList;


public BrokerSettings( final List<URI> uriList )
{
this.uriList = uriList;
}

BrokerTransport transport = this.brokerTransportRequests.getIfPresent( type );

public URI getBroker()
if ( transport == null )
{
return uriList.get( new Random().nextInt( uriList.size() ) );
transport = hubManager.getBrokerTransport( type );
}
return transport;
}
}

Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.subutai.core.environment.metadata.impl;


import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -15,26 +13,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.activemq.ActiveMQConnectionFactory;


public class EventProducer implements Runnable
{
private static final Logger LOG = LoggerFactory.getLogger( EventProducer.class );
private final String destination;
private String message;

private URI uri;
private ActiveMQConnectionFactory connectionFactory;
private ConnectionFactory connectionFactory;


public EventProducer( final URI uri, final String destination, final String message ) throws URISyntaxException
public EventProducer( final ConnectionFactory connectionFactory, final String destination, final String message )
{
this.uri = uri;
this.destination = destination;
this.message = message;
// Create a ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory( uri );
this.connectionFactory = connectionFactory;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import io.subutai.core.hubmanager.api.exception.HubManagerException;
import io.subutai.core.hubmanager.api.model.Config;
import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.broker.BrokerSettings;
import io.subutai.hub.share.broker.BrokerTransport;


public interface HubManager
Expand Down Expand Up @@ -52,5 +53,5 @@ public interface HubManager

void notifyHubThatPeerIsOffline();

BrokerSettingsDto getBrokers();
BrokerTransport getBrokerTransport(BrokerTransport.Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
import io.subutai.core.peer.api.PeerManager;
import io.subutai.core.security.api.SecurityManager;
import io.subutai.core.systemmanager.api.SystemManager;
import io.subutai.hub.share.broker.BrokerTransport;
import io.subutai.hub.share.common.HubEventListener;
import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.dto.PeerDto;
import io.subutai.hub.share.dto.PeerProductDataDto;
import io.subutai.hub.share.dto.UserDto;
Expand Down Expand Up @@ -811,12 +811,13 @@ public void onRhDisconnected( final ResourceHostInfo resourceHostInfo )


@Override
public BrokerSettingsDto getBrokers()
public BrokerTransport getBrokerTransport( BrokerTransport.Type type )
{
final HubRestClient client = new HubRestClient( configManager );

final RestResult<BrokerSettingsDto> response =
client.get( "/rest/v1/brokers/" + localPeer.getId(), BrokerSettingsDto.class );
final RestResult<BrokerTransport> response =
client.get( String.format( "/rest/v1/broker/transport/%s/%s", type, localPeer.getId() ),
BrokerTransport.class );

return response.getEntity();
}
Expand Down
24 changes: 21 additions & 3 deletions management/server/subutai-hub-share/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>

<dependency>
Expand All @@ -62,6 +62,18 @@
<version>2.6.1</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.3</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>5.15.3</version>
</dependency>

<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
Expand All @@ -88,6 +100,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.subutai.hub.share.broker;


import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

import com.google.common.base.Preconditions;


public class BrokerConnectionFactory
{

private final int poolSize;
private Map<BrokerTransport.Type, PooledConnectionFactory> pool = new HashMap<>();


public BrokerConnectionFactory( int poolSize )
{
Preconditions.checkArgument( poolSize > 0 );
Preconditions.checkArgument( poolSize < 5 );

this.poolSize = poolSize;
}


public ConnectionFactory getConnectionFactory( BrokerTransport.Type type )
{
return pool.get( type );
}


public ConnectionFactory getConnectionFactory( BrokerTransport transport ) throws InvalidTransportException
{
Preconditions.checkNotNull( transport );

BrokerTransport.Type type = BrokerTransport.getType( transport.getUri() );

PooledConnectionFactory cf = this.pool.get( type );

if ( cf != null )
{
return cf;
}
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory( transport.getUri() );
cf = new PooledConnectionFactory();
cf.setConnectionFactory( amq );
cf.setMaxConnections( this.poolSize );
this.pool.put( type, cf );

return cf;
}
}
Loading

0 comments on commit c8f2cbf

Please sign in to comment.