Skip to content

Commit

Permalink
Debugging with Unit tests and added Qos
Browse files Browse the repository at this point in the history
made namespace unmodifiable,
Tested: saveQuery, reload, toList,
  • Loading branch information
NehaSelvan1512 committed Sep 15, 2023
1 parent db22a1d commit 11302ba
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.collect.ImmutableList;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.io.File;
Expand All @@ -42,6 +43,7 @@
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.KeyManagerFactory;
Expand Down Expand Up @@ -136,8 +138,8 @@ public static class MqttStreamServer extends QueryInterface {
new QueryInterfaceSettingInteger( "brokerPort", false, true, false, null ),
new QueryInterfaceSettingString( "namespace", false, true, true, null ),
// "RELATIONAL", "GRAPH" types are not supported yet.
new QueryInterfaceSettingList( "namespaceType", false, true, true,
new ArrayList<>( List.of( "DOCUMENT", "RELATIONAL", "GRAPH" ) ) ),
new QueryInterfaceSettingList( "namespaceType", false, true, false,
new ArrayList<>( List.of( "DOCUMENT") ) ),
new QueryInterfaceSettingList( "commonCollection", false, true, true, new ArrayList<>( List.of( "TRUE", "FALSE" ) ) ),
new QueryInterfaceSettingString( "commonCollectionName", true, false, true, null ),
new QueryInterfaceSettingString( "topics", false, true, true, null ),
Expand Down Expand Up @@ -170,7 +172,7 @@ public static class MqttStreamServer extends QueryInterface {
private String commonCollectionName;
private final long databaseId;
private final int userId;
final boolean ssl; // todo this.
private final boolean ssl; // todo this.
boolean createCommonCollection = false;
private final Object settingsLock = new Object();
private final MonitoringPage monitoringPage;
Expand Down Expand Up @@ -210,7 +212,7 @@ public MqttStreamServer( TransactionManager transactionManager, Authenticator au
this.createCommonCollection = true;
createStreamCollection( this.commonCollectionName );
}
} else {
} else if ( settings.get( "topics" ) != null ) {
for( String topic : toList( settings.get( "topics" ) ) ) {
topic = topic.replace( '#', '_' )
.replace( '+', '_' )
Expand Down Expand Up @@ -238,7 +240,6 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.sslConfig()
//TODO: delete or enter password from GUI password thinghere and in method
.keyManagerFactory( SslHelper.createKeyManagerFactory( "polyphenyClient.crt", "polyphenyClient.key", "" ) )
Expand All @@ -250,13 +251,12 @@ public void run() {
.identifier( getUniqueName() )
.serverHost( brokerAddress )
.serverPort( brokerPort )
.automaticReconnectWithDefaultConfig()
.buildAsync();
}

client.connectWith().send().whenComplete( ( connAck, throwable ) -> {
if ( throwable != null ) {
throw new RuntimeException( "Connection to broker could not be established. Please delete and recreate the Plug-In." + throwable );
throw new RuntimeException( "Connection to broker could not be established. Try to reconnect with the 'Reconnect' button" + throwable );
} else {
log.info( "{} started and is listening to broker on {}:{}", INTERFACE_NAME, brokerAddress, brokerPort );
subscribe( toList( this.settings.get( "topics" ) ) );
Expand Down Expand Up @@ -500,13 +500,11 @@ public void subscribe( String topic ) {
this.settings.put( "topics", topicsString );
}
log.info( "not successful: {}", topic );
throw new RuntimeException( "Subscription was not successful. Please try again.", throwable );
throw new RuntimeException( String.format( "Subscription was not successful for topic \"%s\" . Please try again.", topic), throwable );
} else {
this.topicsMap.put( topic, new AtomicLong( 0 ) );
}
} );
//info: no notify() here, because otherwise only the first topic will be subscribed from the method subscribeToAll().

}


Expand Down Expand Up @@ -598,7 +596,11 @@ private void saveQueriesInMap( String queries ) {
while ( !queries.isBlank() ) {
int index = 0;
String topic = queries.substring( 0, queries.indexOf( ":" ) );
queries = queries.replace( topic + ":", "" );
queries = queries.substring( queries.indexOf( ":" ) + 1 );
if ( topic.startsWith( "," ) || topic.startsWith( " ," ) ) {
topic = topic.replaceFirst( ",", "" ).trim();
}

while ( !queries.isBlank() ) {
char c = queries.charAt( index );
if ( c == '{' ) {
Expand All @@ -607,9 +609,11 @@ private void saveQueriesInMap( String queries ) {
} else if ( c == '}' ) {
if ( brackets.pop().equals( '{' ) ) {
if ( brackets.isEmpty() ) {
query = queries.substring( 0, index + 1 );
if ( this.filterMap.containsKey( topic ) && !this.filterMap.get( topic ).equals( queries ) ) {
this.filterMap.replace( topic, query );
query = queries.substring( 0, index + 1 ).trim();
if ( this.filterMap.containsKey( topic ) ) {
if ( !this.filterMap.get( topic ).equals( query ) ) {
this.filterMap.replace( topic, query );
}
} else {
this.filterMap.put( topic, query );
}
Expand Down Expand Up @@ -647,15 +651,17 @@ private String getWildcardTopic( String topic ) {
/**
* separates a string by commas and inserts the separated parts to a list.
*
* @param string
* @param string List of Strings seperated by comma without brackets as a String (entry form UI)
* @return List of seperated string values
*/
public List<String> toList( String string ) {
List<String> list = new ArrayList<>( List.of( string.split( "," ) ) );
for ( int i = 0; i < list.size(); i++ ) {
String topic = list.get( i ).trim();
if ( !topic.isBlank() ) {
if ( !topic.isBlank() || !topic.isEmpty() ) {
list.set( i, topic );
} else {
list.remove( i );
}
}
return list;
Expand Down Expand Up @@ -952,7 +958,11 @@ public MonitoringPage() {
msgButton = new InformationAction( informationGroupPub, "Send a msg", ( parameters ) -> {
String end = "Msg was published!";
try {
client.publishWith().topic( parameters.get( "topic" ) ).payload( parameters.get( "msg" ).getBytes() ).send();
client.publishWith()
.topic( parameters.get( "topic" ) )
.payload( parameters.get( "msg" ).getBytes() )
.qos( MqttQos.AT_LEAST_ONCE )
.send();
} catch ( IllegalArgumentException e ) {
throw new RuntimeException( e );
}
Expand Down
Loading

0 comments on commit 11302ba

Please sign in to comment.