diff --git a/README.md b/README.md
index f5f8467..7bc1a19 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ MQTT Plugin for Graylog
This is an input plugin that allows you to subscribe to an [MQTT](http://mqtt.org) broker and index all published messages.
-**Required Graylog version:** 2.0.0 and later
+**Required Graylog version:** 2.4.0 and later
## Installation
diff --git a/pom.xml b/pom.xml
index 2c30052..e925eeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,11 +1,13 @@
4.0.0
-
- 3.0
-
- org.graylog.plugins
+
+ org.graylog.plugins
+ graylog-plugin-parent
+ 2.4.0
+
+
graylog-plugin-mqtt
1.2.0-SNAPSHOT
@@ -19,73 +21,88 @@
- scm:git:git@github.com:Graylog2/graylog-plugin-mqtt.git
- scm:git:git@github.com:Graylog2/graylog-plugin-mqtt.git
- https://github.com/Graylog2/graylog-plugin-mqtt
+ scm:git:git@github.com:graylog-labs/graylog-plugin-mqtt.git
+ scm:git:git@github.com:graylog-labs/graylog-plugin-mqtt.git
+ https://github.com/graylog-labs/graylog-plugin-mqtt
HEAD
- UTF-8
- 1.8
- 1.8
+ true
+ true
true
true
true
- /usr/share/graylog-server/plugin
- 2.0.0
+
+ 2.4.0
- net.sf.xenqtt
+ net.xenqtt
xenqtt
- 0.9.7
+ 1.0.0
- org.graylog2
- graylog2-server
- ${graylog2.version}
+ com.google.auto.service
+ auto-service
+ ${auto-service.version}
provided
- com.google.auto.service
- auto-service
- 1.0-rc2
+ com.google.auto.value
+ auto-value
+ ${auto-value.version}
provided
junit
junit
- 4.12
+ ${junit.version}
test
org.mockito
mockito-core
- 2.0.52-beta
+ ${mockito.version}
test
+
+
+ src/main/resources
+ true
+
+
org.apache.maven.plugins
maven-compiler-plugin
- 3.5.1
- com.google.auto.service.processor.AutoServiceProcessor
-
+ com.google.auto.service.processor.AutoServiceProcessor
+ com.google.auto.value.processor.AutoValueProcessor
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ ${project.groupId}.${project.artifactId}
+
+
+
+
org.apache.maven.plugins
maven-shade-plugin
- 2.4.3
- true
+ false
+ false
@@ -93,76 +110,14 @@
shade
+
+
+
+
+
-
-
- org.apache.maven.plugins
- maven-release-plugin
- 2.5.2
-
- true
- forked-path
- @{project.version}
- clean test
- package
-
-
-
-
- org.vafer
- jdeb
- 1.5
-
- ${project.build.directory}/${project.artifactId}-${project.version}.deb
-
-
- ${project.build.directory}/
- ${project.build.finalName}.jar
- directory
-
- perm
- ${graylog2.plugin-dir}
- 644
- root
- root
-
-
-
-
-
-
-
- org.codehaus.mojo
- rpm-maven-plugin
- 2.1.5
-
- Application/Internet
- /usr
-
- _unpackaged_files_terminate_build 0
- _binaries_in_noarch_packages_terminate_build 0
-
- 644
- 755
- root
- root
-
-
- ${graylog2.plugin-dir}
-
-
-
-
-
-
-
diff --git a/src/main/java/org/graylog2/inputs/mqtt/ClientListener.java b/src/main/java/org/graylog2/inputs/mqtt/ClientListener.java
index ddb3f53..c59b7b3 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/ClientListener.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/ClientListener.java
@@ -2,17 +2,16 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import net.sf.xenqtt.client.MqttClient;
-import net.sf.xenqtt.client.MqttClientListener;
-import net.sf.xenqtt.client.PublishMessage;
-import net.sf.xenqtt.client.Subscription;
-import net.sf.xenqtt.message.ConnectReturnCode;
+import net.xenqtt.client.MqttClient;
+import net.xenqtt.client.MqttClientListener;
+import net.xenqtt.client.PublishMessage;
+import net.xenqtt.client.Subscription;
+import net.xenqtt.message.ConnectReturnCode;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTGELFInput.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTGELFInput.java
index 2f8e955..37d1803 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/MQTTGELFInput.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTGELFInput.java
@@ -1,39 +1,15 @@
package org.graylog2.inputs.mqtt;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
-import net.sf.xenqtt.client.AsyncMqttClient;
-import net.sf.xenqtt.client.MqttClient;
-import net.sf.xenqtt.client.MqttClientConfig;
-import net.sf.xenqtt.client.Subscription;
-import net.sf.xenqtt.message.ConnectReturnCode;
-import net.sf.xenqtt.message.QoS;
import org.graylog2.inputs.codecs.GelfCodec;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
-import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.configuration.Configuration;
-import org.graylog2.plugin.configuration.ConfigurationException;
-import org.graylog2.plugin.configuration.ConfigurationRequest;
-import org.graylog2.plugin.configuration.fields.BooleanField;
-import org.graylog2.plugin.configuration.fields.ConfigurationField;
-import org.graylog2.plugin.configuration.fields.NumberField;
-import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
-import org.graylog2.plugin.inputs.MisfireException;
-import org.graylog2.plugin.system.NodeId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.inject.Inject;
-import java.util.List;
-import java.util.Map;
public class MQTTGELFInput extends MessageInput {
private static final String NAME = "MQTT TCP (GELF)";
@@ -63,7 +39,7 @@ public interface Factory extends MessageInput.Factory {
public static class Descriptor extends MessageInput.Descriptor {
@Inject
public Descriptor() {
- super(NAME, false, "");
+ super(NAME, false, "https://github.com/graylog-labs/graylog-plugin-mqtt");
}
}
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputMetadata.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputMetadata.java
index 6ecdc9d..b6eb0a3 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputMetadata.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputMetadata.java
@@ -9,6 +9,8 @@
import java.util.Set;
public class MQTTInputMetadata implements PluginMetaData {
+ private static final String PLUGIN_PROPERTIES = "org.graylog.plugins.graylog-plugin-mqtt/graylog-plugin.properties";
+
@Override
public String getUniqueId() {
return MQTTGELFInput.class.getCanonicalName();
@@ -31,7 +33,7 @@ public URI getURL() {
@Override
public Version getVersion() {
- return new Version(2, 0, 0);
+ return Version.fromPluginProperties(getClass(), PLUGIN_PROPERTIES, "version", Version.from(1, 0, 0));
}
@Override
@@ -41,7 +43,7 @@ public String getDescription() {
@Override
public Version getRequiredVersion() {
- return new Version(2, 0, 0);
+ return Version.fromPluginProperties(getClass(), PLUGIN_PROPERTIES, "graylog.version", Version.from(2, 4, 0));
}
@Override
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputModule.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputModule.java
index 8c92c4e..a502d85 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputModule.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputModule.java
@@ -5,7 +5,10 @@
public class MQTTInputModule extends PluginModule {
@Override
protected void configure() {
- installTransport(transportMapBinder(), "mqtt-transport", MQTTTransport.class);
- installInput(inputsMapBinder(), MQTTGELFInput.class, MQTTGELFInput.Factory.class);
+ addTransport("mqtt-transport", MQTTTransport.class, MQTTTransport.Config.class, MQTTTransport.Factory.class);
+
+ addMessageInput(MQTTGELFInput.class, MQTTGELFInput.Factory.class);
+ addMessageInput(MQTTRawInput.class, MQTTRawInput.Factory.class);
+ addMessageInput(MQTTSyslogInput.class, MQTTSyslogInput.Factory.class);
}
}
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputPlugin.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputPlugin.java
index f1325d9..303a31f 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/MQTTInputPlugin.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTInputPlugin.java
@@ -15,6 +15,7 @@ public PluginMetaData metadata() {
return new MQTTInputMetadata();
}
+ @Override
public Collection modules() {
return ImmutableSet.of(new MQTTInputModule());
}
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTRawInput.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTRawInput.java
new file mode 100644
index 0000000..13aeb2c
--- /dev/null
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTRawInput.java
@@ -0,0 +1,52 @@
+package org.graylog2.inputs.mqtt;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import org.graylog2.inputs.codecs.RawCodec;
+import org.graylog2.plugin.LocalMetricRegistry;
+import org.graylog2.plugin.ServerStatus;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.inputs.MessageInput;
+
+import javax.inject.Inject;
+
+public class MQTTRawInput extends MessageInput {
+ private static final String NAME = "MQTT TCP (Raw/Plaintext)";
+
+ @AssistedInject
+ public MQTTRawInput(final MetricRegistry metricRegistry,
+ @Assisted Configuration configuration,
+ MQTTTransport.Factory mqttTransportFactory,
+ RawCodec.Factory rawCodecFactory,
+ LocalMetricRegistry localRegistry,
+ Config config,
+ Descriptor descriptor, ServerStatus serverStatus) {
+ super(metricRegistry, configuration, mqttTransportFactory.create(configuration), localRegistry, rawCodecFactory.create(configuration), config, descriptor, serverStatus);
+ }
+
+ public interface Factory extends MessageInput.Factory {
+ @Override
+ MQTTRawInput create(Configuration configuration);
+
+ @Override
+ Config getConfig();
+
+ @Override
+ Descriptor getDescriptor();
+ }
+
+ public static class Descriptor extends MessageInput.Descriptor {
+ @Inject
+ public Descriptor() {
+ super(NAME, false, "https://github.com/graylog-labs/graylog-plugin-mqtt");
+ }
+ }
+
+ public static class Config extends MessageInput.Config {
+ @Inject
+ public Config(MQTTTransport.Factory transport, RawCodec.Factory codec) {
+ super(transport.getConfig(), codec.getConfig());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTSyslogInput.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTSyslogInput.java
new file mode 100644
index 0000000..4159ae7
--- /dev/null
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTSyslogInput.java
@@ -0,0 +1,52 @@
+package org.graylog2.inputs.mqtt;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import org.graylog2.inputs.codecs.SyslogCodec;
+import org.graylog2.plugin.LocalMetricRegistry;
+import org.graylog2.plugin.ServerStatus;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.inputs.MessageInput;
+
+import javax.inject.Inject;
+
+public class MQTTSyslogInput extends MessageInput {
+ private static final String NAME = "MQTT TCP (Syslog)";
+
+ @AssistedInject
+ public MQTTSyslogInput(final MetricRegistry metricRegistry,
+ @Assisted Configuration configuration,
+ MQTTTransport.Factory mqttTransportFactory,
+ SyslogCodec.Factory syslogCodecFactory,
+ LocalMetricRegistry localRegistry,
+ Config config,
+ Descriptor descriptor, ServerStatus serverStatus) {
+ super(metricRegistry, configuration, mqttTransportFactory.create(configuration), localRegistry, syslogCodecFactory.create(configuration), config, descriptor, serverStatus);
+ }
+
+ public interface Factory extends MessageInput.Factory {
+ @Override
+ MQTTSyslogInput create(Configuration configuration);
+
+ @Override
+ Config getConfig();
+
+ @Override
+ Descriptor getDescriptor();
+ }
+
+ public static class Descriptor extends MessageInput.Descriptor {
+ @Inject
+ public Descriptor() {
+ super(NAME, false, "https://github.com/graylog-labs/graylog-plugin-mqtt");
+ }
+ }
+
+ public static class Config extends MessageInput.Config {
+ @Inject
+ public Config(MQTTTransport.Factory transport, SyslogCodec.Factory codec) {
+ super(transport.getConfig(), codec.getConfig());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/graylog2/inputs/mqtt/MQTTTransport.java b/src/main/java/org/graylog2/inputs/mqtt/MQTTTransport.java
index ccdd54e..4876ea6 100644
--- a/src/main/java/org/graylog2/inputs/mqtt/MQTTTransport.java
+++ b/src/main/java/org/graylog2/inputs/mqtt/MQTTTransport.java
@@ -7,18 +7,18 @@
import com.google.common.hash.Hashing;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
-import net.sf.xenqtt.MqttCommandCancelledException;
-import net.sf.xenqtt.MqttInterruptedException;
-import net.sf.xenqtt.MqttInvocationError;
-import net.sf.xenqtt.MqttInvocationException;
-import net.sf.xenqtt.MqttTimeoutException;
-import net.sf.xenqtt.client.MqttClient;
-import net.sf.xenqtt.client.MqttClientConfig;
-import net.sf.xenqtt.client.MqttClientListener;
-import net.sf.xenqtt.client.Subscription;
-import net.sf.xenqtt.client.SyncMqttClient;
-import net.sf.xenqtt.message.ConnectReturnCode;
-import net.sf.xenqtt.message.QoS;
+import net.xenqtt.MqttCommandCancelledException;
+import net.xenqtt.MqttInterruptedException;
+import net.xenqtt.MqttInvocationError;
+import net.xenqtt.MqttInvocationException;
+import net.xenqtt.MqttTimeoutException;
+import net.xenqtt.client.MqttClient;
+import net.xenqtt.client.MqttClientConfig;
+import net.xenqtt.client.MqttClientListener;
+import net.xenqtt.client.Subscription;
+import net.xenqtt.client.SyncMqttClient;
+import net.xenqtt.message.ConnectReturnCode;
+import net.xenqtt.message.QoS;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
diff --git a/src/main/resources/org.graylog.plugins.graylog-plugin-mqtt/graylog-plugin.properties b/src/main/resources/org.graylog.plugins.graylog-plugin-mqtt/graylog-plugin.properties
new file mode 100644
index 0000000..80911d4
--- /dev/null
+++ b/src/main/resources/org.graylog.plugins.graylog-plugin-mqtt/graylog-plugin.properties
@@ -0,0 +1,12 @@
+# The plugin version
+version=${project.version}
+
+# The required Graylog server version
+graylog.version=${graylog.version}
+
+# When set to true (the default) the plugin gets a separate class loader
+# when loading the plugin. When set to false, the plugin shares a class loader
+# with other plugins that have isolated=false.
+#
+# Do not disable this unless this plugin depends on another plugin!
+#isolated=false
diff --git a/src/test/java/org/graylog2/inputs/mqtt/ClientListenerTest.java b/src/test/java/org/graylog2/inputs/mqtt/ClientListenerTest.java
index 0899cb1..a984a73 100644
--- a/src/test/java/org/graylog2/inputs/mqtt/ClientListenerTest.java
+++ b/src/test/java/org/graylog2/inputs/mqtt/ClientListenerTest.java
@@ -1,22 +1,22 @@
package org.graylog2.inputs.mqtt;
import com.codahale.metrics.MetricRegistry;
-import net.sf.xenqtt.client.MqttClient;
-import net.sf.xenqtt.client.PublishMessage;
-import net.sf.xenqtt.client.Subscription;
-import net.sf.xenqtt.message.ConnectReturnCode;
-import net.sf.xenqtt.message.QoS;
-import org.graylog2.plugin.buffers.Buffer;
+import net.xenqtt.client.MqttClient;
+import net.xenqtt.client.PublishMessage;
+import net.xenqtt.client.Subscription;
+import net.xenqtt.message.ConnectReturnCode;
+import net.xenqtt.message.QoS;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
-import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
import java.util.Collections;
import java.util.List;
@@ -25,16 +25,14 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@RunWith(MockitoJUnitRunner.class)
public class ClientListenerTest {
+ @Rule
+ public final MockitoRule mockitoRule = MockitoJUnit.rule();
- @Mock
- private Buffer processBuffer;
@Mock
private MessageInput messageInput;
@Mock