Skip to content

Commit

Permalink
MQTT version 5, create topic collection in constructor, added automat…
Browse files Browse the repository at this point in the history
…ic reconnect
  • Loading branch information
NehaSelvan1512 committed Sep 13, 2023
1 parent 6075479 commit db22a1d
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import com.google.common.collect.ImmutableList;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.io.File;
Expand Down Expand Up @@ -212,6 +210,15 @@ public MqttStreamServer( TransactionManager transactionManager, Authenticator au
this.createCommonCollection = true;
createStreamCollection( this.commonCollectionName );
}
} else {
for( String topic : toList( settings.get( "topics" ) ) ) {
topic = topic.replace( '#', '_' )
.replace( '+', '_' )
.replace( '/', '_' );
if ( !this.commonCollection.get() && !collectionExists( topic ) ) {
createStreamCollection( topic );
}
}
}
String queryString = settings.get( "filterQuery" );
if ( queryString != null && !queryString.isBlank() ) {
Expand All @@ -231,6 +238,7 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.sslConfig()
//TODO: delete or enter password from GUI password thinghere and in method
.keyManagerFactory( SslHelper.createKeyManagerFactory( "polyphenyClient.crt", "polyphenyClient.key", "" ) )
Expand All @@ -242,6 +250,7 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.buildAsync();
}

Expand Down Expand Up @@ -494,9 +503,6 @@ public void subscribe( String topic ) {
throw new RuntimeException( "Subscription was not successful. Please try again.", throwable );
} else {
this.topicsMap.put( topic, new AtomicLong( 0 ) );
if ( !this.commonCollection.get() && !collectionExists( topic ) ) {
createStreamCollection( topic );
}
}
} );
//info: no notify() here, because otherwise only the first topic will be subscribed from the method subscribeToAll().
Expand Down

0 comments on commit db22a1d

Please sign in to comment.