diff --git a/pom.xml b/pom.xml index a7bb7f6..046c561 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ com.plugin graylog-plugin-http-output - 1.0.3-SNAPSHOT + 1.0.4-SNAPSHOT jar ${project.artifactId} diff --git a/src/main/java/com/plugin/HttpOutput.java b/src/main/java/com/plugin/HttpOutput.java index 8b8149d..01ca25a 100644 --- a/src/main/java/com/plugin/HttpOutput.java +++ b/src/main/java/com/plugin/HttpOutput.java @@ -5,6 +5,7 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.TimeUnit; import okhttp3.*; @@ -33,71 +34,29 @@ * interfaces. (i.e. AlarmCallback, MessageInput, MessageOutput) */ public class HttpOutput implements MessageOutput { - - public static final int HTTP_BATCH_SIZE = 500; private final OkHttpClient httpClient; private final Gson gson = new Gson(); private boolean shutdown; private boolean raise_exception_on_http_error = false; private String url; + private String intake_key; private static final String CK_OUTPUT_API = "output_api"; - private static final String CK_GZIP_REQUEST = "gzip_request"; + private static final String CK_INTAKE_KEY = "intake_key"; private static final String CK_RAISE_EXCEPTION = "raise_exception"; private static final Logger LOG = LoggerFactory.getLogger(HttpOutput.class); private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); - /** - * This interceptor compresses the HTTP request body. Many webservers can't handle this! - *

- * taken from https://square.github.io/okhttp/interceptors/ - */ - final class GzipRequestInterceptor implements Interceptor { - @Override - public Response intercept(Interceptor.Chain chain) throws IOException { - Request originalRequest = chain.request(); - if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) { - return chain.proceed(originalRequest); - } - - Request compressedRequest = originalRequest.newBuilder() - .header("Content-Encoding", "gzip") - .method(originalRequest.method(), gzip(originalRequest.body())) - .build(); - return chain.proceed(compressedRequest); - } - - private RequestBody gzip(final RequestBody body) { - return new RequestBody() { - @Override - public MediaType contentType() { - return body.contentType(); - } - - @Override - public long contentLength() { - return -1; // We don't know the compressed length in advance! - } - - @Override - public void writeTo(BufferedSink sink) throws IOException { - BufferedSink gzipSink = Okio.buffer(new GzipSink(sink)); - body.writeTo(gzipSink); - gzipSink.close(); - } - }; - } - } - @Inject public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws HttpOutputException { this.url = conf.getString(CK_OUTPUT_API); + this.intake_key = conf.getString(CK_INTAKE_KEY); this.raise_exception_on_http_error = conf.getBoolean(CK_RAISE_EXCEPTION); this.shutdown = false; LOG.info(" Http Output Plugin has been configured with the following parameters:"); LOG.info(CK_OUTPUT_API + " : " + this.url); - LOG.info(CK_GZIP_REQUEST + " : " + conf.getBoolean(CK_GZIP_REQUEST)); + LOG.info(CK_INTAKE_KEY + " : " + this.intake_key); LOG.info(CK_RAISE_EXCEPTION + " : " + this.raise_exception_on_http_error); try { @@ -112,9 +71,6 @@ public HttpOutput(@Assisted Stream stream, @Assisted Configuration conf) throws .writeTimeout(10, TimeUnit.SECONDS) .readTimeout(5, TimeUnit.SECONDS); - if (conf.getBoolean(CK_GZIP_REQUEST)) { - clientBuilder.addInterceptor(new GzipRequestInterceptor()); - } this.httpClient = clientBuilder.build(); } @@ -132,26 +88,22 @@ public void stop() { @Override public void write(List msgs) throws Exception { - for (List partition : ListUtils.partition(msgs, HTTP_BATCH_SIZE)) { - List> payload = new FastArrayList(); - for (Message msg : partition) { - payload.add(msg.getFields()); - } - this.executeRequest(RequestBody.create( - JSON, - this.gson.toJson(payload) - )); + for(Message message: msgs) { + this.write(message); } - } @Override public void write(Message msg) throws Exception { + Map payload = new HashMap<>(); + payload.put("intake_key", this.intake_key); + payload.put("json", this.gson.toJson(msg.getFields())); + this.executeRequest( RequestBody.create( JSON, - this.gson.toJson(msg.getFields()) + this.gson.toJson(payload) ) ); } @@ -204,8 +156,9 @@ public ConfigurationRequest getRequestedConfiguration() { new TextField(CK_OUTPUT_API, "API to forward the stream data.", "/", "HTTP address where the stream data to be sent.", ConfigurationField.Optional.NOT_OPTIONAL)); - configurationRequest.addField(new BooleanField(CK_GZIP_REQUEST, "GZIP request", false, - "Enable GZIP compression for HTTP requests.")); + configurationRequest.addField( + new TextField(CK_INTAKE_KEY, "Intake key", "", + "The intake key to identify the events", ConfigurationField.Optional.NOT_OPTIONAL)); configurationRequest.addField(new BooleanField(CK_RAISE_EXCEPTION, "Raise exception", false, "Raise an exception on HTTP error"));