From 168031d3b2a5cb87561e9c479936fc16263d8505 Mon Sep 17 00:00:00 2001 From: Emil Koutanov Date: Wed, 18 Oct 2017 20:06:46 +1100 Subject: [PATCH] Added JSON layout and Splunk appender --- build.gradle | 7 +- json/build.gradle | 1 + .../log4jextras/json/JsonLayout.java | 168 +++++++++++ .../log4jextras/json/JsonLayoutTest.java | 121 ++++++++ json/src/test/resources/log4j-test.properties | 2 +- splunk/build.gradle | 2 + .../splunk/HECTransportConfig.java | 174 ++++++++++++ .../log4jextras/splunk/SplunkHECAppender.java | 199 +++++++++++++ .../log4jextras/splunk/SplunkHECInput.java | 264 ++++++++++++++++++ .../log4jextras/splunk/SplunkInput.java | 140 ++++++++++ .../log4jextras/TestAppender.java | 20 ++ 11 files changed, 1096 insertions(+), 2 deletions(-) create mode 100644 json/src/main/java/com/obsidiandynamics/log4jextras/json/JsonLayout.java create mode 100644 json/src/test/java/com/obsidiandynamics/log4jextras/json/JsonLayoutTest.java create mode 100644 splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/HECTransportConfig.java create mode 100644 splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECAppender.java create mode 100644 splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECInput.java create mode 100644 splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkInput.java create mode 100644 src/test/java/com/obsidiandynamics/log4jextras/TestAppender.java diff --git a/build.gradle b/build.gradle index 09e8d60..63f02b8 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ apply plugin: 'maven-publish' apply plugin: 'com.jfrog.bintray' group = 'com.obsidiandynamics.log4jextras' -version = '0.1.0' +version = '0.1.0-SNAPSHOT' def envUser = 'BINTRAY_USER' def envKey = 'BINTRAY_KEY' @@ -71,6 +71,11 @@ allprojects { } subprojects { + dependencies { + compile project(':') + + testCompile project(':').sourceSets.test.output + } } task jacocoRootReport(type: JacocoReport) { diff --git a/json/build.gradle b/json/build.gradle index fdd8af8..33cc9f4 100644 --- a/json/build.gradle +++ b/json/build.gradle @@ -2,6 +2,7 @@ def packageName = 'log4j-extras-json' version = project(':').version dependencies { + compile 'com.google.code.gson:gson:2.8.1' } jar { diff --git a/json/src/main/java/com/obsidiandynamics/log4jextras/json/JsonLayout.java b/json/src/main/java/com/obsidiandynamics/log4jextras/json/JsonLayout.java new file mode 100644 index 0000000..ca2b3d5 --- /dev/null +++ b/json/src/main/java/com/obsidiandynamics/log4jextras/json/JsonLayout.java @@ -0,0 +1,168 @@ +package com.obsidiandynamics.log4jextras.json; + +import java.text.*; +import java.util.*; + +import org.apache.log4j.*; +import org.apache.log4j.spi.*; + +import com.google.gson.*; + +/** + * Layout for JSON logging.

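+ * Each logging event is rendered as a single JSON object on one line, terminated
+ * with a newline.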
+ *
+ * Adapted from https://github.com/michaeltandy/log4j-json.
+ */
+public final class JsonLayout extends Layout {
+  private final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+  private final String hostname = getHostname().toLowerCase();
+  private final String username = System.getProperty("user.name").toLowerCase();
+  private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+  private Level minimumLevelForSlowLogging = Level.ALL;
+  private String mdcRoot;
+  private List<String> mdcFieldsToLog = Collections.emptyList();
+
+  @Override
+  public String format(LoggingEvent le) {
+    final Map<String, Object> r = new LinkedHashMap<>();
+    r.put("timestamp", dateFormat.format(new Date(le.timeStamp)));
+    r.put("host", hostname);
+    r.put("user", username);
+    r.put("level", le.getLevel().toString());
+    r.put("thread", le.getThreadName());
+    r.put("ndc", le.getNDC());
+    if (le.getLevel().isGreaterOrEqual(minimumLevelForSlowLogging)) {
+      r.put("class", le.getLocationInformation().getClassName());
+      r.put("line", safeParseInt(le.getLocationInformation().getLineNumber()));
+      r.put("method", le.getLocationInformation().getMethodName());
+    }
+    r.put("message", safeToString(le.getMessage()));
+    r.put("throwable", formatThrowable(le));
+
+    for (String mdcKey : mdcFieldsToLog) {
+      if (! r.containsKey(mdcKey)) {
+        r.put(mdcKey, safeToString(le.getMDC(mdcKey)));
+      }
+    }
+
+    if (mdcRoot != null) {
+      final Object mdcValue = le.getMDC(mdcRoot);
+      if (mdcValue != null) {
+        final String[] fields = ((String) mdcValue).split(",");
+        for (String field : fields) {
+          final String trimmedField = field.trim();
+          r.put(trimmedField, safeToString(le.getMDC(trimmedField)));
+        }
+      }
+    }
+
+    after(le, r);
+    return gson.toJson(r) + "\n";
+  }
+
+  /**
+   * Method called near the end of formatting a LoggingEvent in case users
+   * want to override the default object fields.
+   *
+   * @param le The event being logged.
+   * @param r The map which will be output.
+   */
+  public void after(LoggingEvent le, Map<String, Object> r) {}
+
+  /**
+   * LoggingEvent messages can have any type, and we call toString on them. As
+   * the user can define the toString method, we should catch any exceptions.
+   *
+   * @param obj The object to convert.
+   * @return The string value, or null if the object was null.
+   */
+  private static String safeToString(Object obj) {
+    if (obj == null) return null;
+    try {
+      return obj.toString();
+    } catch (Throwable t) {
+      return "Error getting message: " + t.getMessage();
+    }
+  }
+
+  /**
+   * Safe integer parser, for when line numbers aren't available. See for
+   * example https://github.com/michaeltandy/log4j-json/issues/1
+   *
+   * @param str The string to parse.
+   * @return The int value, or null if the string isn't a valid integer.
+   */
+  private static Integer safeParseInt(String str) {
+    try {
+      return Integer.parseInt(str);
+    } catch (NumberFormatException t) {
+      return null;
+    }
+  }
+
+  /**
+   * If a throwable is present, format it with newlines between stack trace
+   * elements. Otherwise return null.
+   *
+   * @param le The logging event.
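+   * @return The stack trace as a newline-separated string, or null if the event carries no throwable.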
+   */
+  private String formatThrowable(LoggingEvent le) {
+    if (le.getThrowableInformation() == null ||
+        le.getThrowableInformation().getThrowable() == null)
+      return null;
+
+    return mkString(le.getThrowableStrRep(), "\n");
+  }
+
+  private String mkString(Object[] parts, String separator) {
+    final StringBuilder sb = new StringBuilder();
+    for (int i = 0; ; i++) {
+      sb.append(parts[i]);
+      if (i == parts.length - 1)
+        return sb.toString();
+      sb.append(separator);
+    }
+  }
+
+  @Override
+  public boolean ignoresThrowable() {
+    return false;
+  }
+
+  @Override
+  public void activateOptions() {}
+
+  private static String getHostname() {
+    String hostname;
+    try {
+      hostname = java.net.InetAddress.getLocalHost().getHostName();
+    } catch (Exception e) {
+      hostname = "Unknown, " + e.getMessage();
+    }
+    return hostname;
+  }
+
+  public void setMinimumLevelForSlowLogging(String level) {
+    minimumLevelForSlowLogging = Level.toLevel(level, Level.ALL);
+  }
+
+  public void setMdcRoot(String mdcRoot) {
+    this.mdcRoot = mdcRoot;
+  }
+
+  public void setMdcFieldsToLog(String toLog) {
+    if (toLog == null || toLog.isEmpty()) {
+      mdcFieldsToLog = Collections.emptyList();
+    } else {
+      final ArrayList<String> listToLog = new ArrayList<>();
+      for (String token : toLog.split(",")) {
+        token = token.trim();
+        if (! token.isEmpty()) {
+          listToLog.add(token);
+        }
+      }
+      mdcFieldsToLog = Collections.unmodifiableList(listToLog);
+    }
+  }
+}
diff --git a/json/src/test/java/com/obsidiandynamics/log4jextras/json/JsonLayoutTest.java b/json/src/test/java/com/obsidiandynamics/log4jextras/json/JsonLayoutTest.java
new file mode 100644
index 0000000..f1fa7f6
--- /dev/null
+++ b/json/src/test/java/com/obsidiandynamics/log4jextras/json/JsonLayoutTest.java
@@ -0,0 +1,121 @@
+package com.obsidiandynamics.log4jextras.json;
+
+import static org.junit.Assert.*;
+
+import org.apache.log4j.*;
+import org.junit.*;
+import com.obsidiandynamics.log4jextras.*;
+
+/**
+ * Adapted from https://github.com/michaeltandy/log4j-json.
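+ * Exercises basic formatting, throwable handling, the slow-logging threshold and
+ * selective MDC logging.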
+ */ +public final class JsonLayoutTest { + private static Logger LOG; + + @BeforeClass + public static void beforeClass() { + System.setProperty("log4j.configuration", "log4j-test.properties"); + LOG = Logger.getLogger(JsonLayoutTest.class); + } + + @AfterClass + public static void afterClass() { + System.clearProperty("log4j.configuration"); + LOG = null; + } + + @Before + public void before() { + TestAppender.baos.reset(); + MDC.clear(); + } + + @Test + public void testDemonstration() { + LOG.info("Example of some logging"); + LOG.warn("Some text\nwith a newline", new Exception("Outer Exception", new Exception("Nested Exception"))); + LOG.fatal("Text may be complicated & have many symbols\n¬!£$%^&*()_+{}:@~<>?,./;'#[]-=`\\| \t\n"); + + final String whatWasLogged = TestAppender.baos.toString(); + final String[] lines = whatWasLogged.split("\n"); + + assertEquals(3,lines.length); + assertTrue(lines[0].contains("INFO")); + assertTrue(lines[0].contains("Example of some logging")); + + assertTrue(lines[1].contains("newline")); + assertTrue(lines[1].contains("Outer Exception")); + assertTrue(lines[1].contains("Nested Exception")); + + assertTrue(lines[2].contains("have many symbols")); + } + + @Test + public void testObjectHandling() { + LOG.info(new Object() { + @Override public String toString() { + throw new RuntimeException("Hypothetical failure"); + } + }); + LOG.warn(null); + + final String whatWasLogged = TestAppender.baos.toString(); + final String[] lines = whatWasLogged.split("\n"); + assertEquals(2,lines.length); + + assertTrue(lines[0].contains("Hypothetical")); + assertTrue(lines[1].contains("WARN")); + } + + @Test + public void testLogMethod() { + // Test for https://github.com/michaeltandy/log4j-json/issues/1 + LOG.log("asdf", Level.INFO, "this is the log message", null); + + final String whatWasLogged = TestAppender.baos.toString(); + final String[] lines = whatWasLogged.split("\n"); + assertEquals(1,lines.length); + assertTrue(lines[0].contains("this is the log message")); + } + + @Test + public void testMinimumLevelForSlowLogging() { + LOG.info("Info level logging"); + LOG.debug("Debug level logging"); + + final String whatWasLogged = TestAppender.baos.toString(); + final String[] lines = whatWasLogged.split("\n"); + assertEquals(2,lines.length); + + assertTrue(lines[0].contains("INFO")); + assertTrue(lines[0].contains("class")); + assertTrue(lines[0].contains("line")); + assertTrue(lines[0].contains("method")); + + assertTrue(lines[1].contains("DEBUG")); + assertFalse(lines[1].contains("class")); + assertFalse(lines[1].contains("line")); + assertFalse(lines[1].contains("method")); + } + + @Test + public void testSelectiveMdcLogging() { + MDC.put("asdf", "value_for_key_asdf"); + MDC.put("qwer", "value_for_key_qwer"); + MDC.put("thread", "attempt to overwrite thread in output"); + + LOG.info("Example of some logging"); + + MDC.clear(); + + final String whatWasLogged = TestAppender.baos.toString(); + final String[] lines = whatWasLogged.split("\n"); + + assertEquals(1,lines.length); + assertTrue(lines[0].contains("value_for_key_asdf")); + assertFalse(lines[0].contains("value_for_key_qwer")); + + assertTrue(lines[0].contains("thread")); + assertFalse(lines[0].contains("attempt to overwrite thread in output")); + } +} diff --git a/json/src/test/resources/log4j-test.properties b/json/src/test/resources/log4j-test.properties index 94f6fab..2ac3137 100644 --- a/json/src/test/resources/log4j-test.properties +++ b/json/src/test/resources/log4j-test.properties @@ -6,7 +6,7 @@ 
log4j.appender.a.layout=com.obsidiandynamics.log4jextras.json.JsonLayout log4j.appender.a.layout.MinimumLevelForSlowLogging=INFO log4j.appender.a.layout.MdcFieldsToLog=asdf , , thread -log4j.appender.b=com.obsidiandynamics.log4jextras.json.TestAppender +log4j.appender.b=com.obsidiandynamics.log4jextras.TestAppender log4j.appender.b.layout=com.obsidiandynamics.log4jextras.json.JsonLayout log4j.appender.b.layout.MinimumLevelForSlowLogging=INFO log4j.appender.b.layout.MdcFieldsToLog= asdf , , thread \ No newline at end of file diff --git a/splunk/build.gradle b/splunk/build.gradle index 40bfd07..23da195 100644 --- a/splunk/build.gradle +++ b/splunk/build.gradle @@ -2,6 +2,8 @@ def packageName = 'log4j-extras-splunk' version = project(':').version dependencies { + compile 'org.apache.httpcomponents:httpclient:4.5.3' + compile 'org.apache.httpcomponents:httpasyncclient:4.1.3' } jar { diff --git a/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/HECTransportConfig.java b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/HECTransportConfig.java new file mode 100644 index 0000000..3051ab5 --- /dev/null +++ b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/HECTransportConfig.java @@ -0,0 +1,174 @@ +package com.obsidiandynamics.log4jextras.splunk; + + +import java.net.*; + +/** + * Adapted from https://github.com/damiendallimore/SplunkJavaLogging. + */ +final class HECTransportConfig { + private String token; + private String host = "localhost"; + private int port = 8088; + private boolean https = false; + private String path = "/services/collector"; + private int poolSize = 1; + + private String index = "main"; + private String source = "splunk_javalogging_hec"; + private String sourcetype = "splunk_javalogging_hec"; + + // data size multipliers + private static final int KB = 1024; + private static final int MB = KB * 1024; + private static final int GB = MB * 1024; + + private boolean batchMode = false; + private long maxBatchSizeBytes = 1 * MB; + private long maxBatchSizeEvents = 100; + private long maxInactiveTimeBeforeBatchFlush = 1000; + + void setUrl(String url) { + final URL u; + try { + u = new URL(url); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed URL " + url, e); + } + setHttps(u.getProtocol().equals("https")); + setHost(u.getHost()); + setPort(u.getPort()); + setPath(u.getPath()); + } + + String getToken() { + return token; + } + + void setToken(String token) { + this.token = token; + } + + String getHost() { + return host; + } + + void setHost(String host) { + this.host = host; + } + + int getPort() { + return port; + } + + void setPort(int port) { + this.port = port; + } + + boolean isHttps() { + return https; + } + + void setHttps(boolean https) { + this.https = https; + } + + String getPath() { + return path; + } + + void setPath(String path) { + this.path = path; + } + + int getPoolSize() { + return poolSize; + } + + void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + String getIndex() { + return index; + } + + void setIndex(String index) { + this.index = index; + } + + String getSource() { + return source; + } + + void setSource(String source) { + this.source = source; + } + + String getSourcetype() { + return sourcetype; + } + + void setSourcetype(String sourcetype) { + this.sourcetype = sourcetype; + } + + boolean isBatchMode() { + return batchMode; + } + + void setBatchMode(boolean batchMode) { + this.batchMode = batchMode; + } + + long getMaxBatchSizeBytes() { + return 
maxBatchSizeBytes;
+  }
+
+  void setMaxBatchSizeBytes(long maxBatchSizeBytes) {
+    this.maxBatchSizeBytes = maxBatchSizeBytes;
+  }
+
+  /**
+   * Set the batch size from the configured property String value. If the value
+   * cannot be parsed, the current setting is retained (1 MB by default).
+   *
+   * @param rawProperty An integer followed by KB, MB or GB, e.g. 512KB.
+   */
+  void setMaxBatchSizeBytes(String rawProperty) {
+    int multiplier;
+    int factor;
+
+    if (rawProperty.endsWith("KB")) {
+      multiplier = KB;
+    } else if (rawProperty.endsWith("MB")) {
+      multiplier = MB;
+    } else if (rawProperty.endsWith("GB")) {
+      multiplier = GB;
+    } else {
+      return;
+    }
+    try {
+      factor = Integer.parseInt(rawProperty.substring(0, rawProperty.length() - 2));
+    } catch (NumberFormatException e) {
+      return;
+    }
+    setMaxBatchSizeBytes(factor * multiplier);
+  }
+
+  long getMaxBatchSizeEvents() {
+    return maxBatchSizeEvents;
+  }
+
+  void setMaxBatchSizeEvents(long maxBatchSizeEvents) {
+    this.maxBatchSizeEvents = maxBatchSizeEvents;
+  }
+
+  long getMaxInactiveTimeBeforeBatchFlush() {
+    return maxInactiveTimeBeforeBatchFlush;
+  }
+
+  void setMaxInactiveTimeBeforeBatchFlush(long maxInactiveTimeBeforeBatchFlush) {
+    this.maxInactiveTimeBeforeBatchFlush = maxInactiveTimeBeforeBatchFlush;
+  }
+}
diff --git a/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECAppender.java b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECAppender.java
new file mode 100644
index 0000000..58ab92d
--- /dev/null
+++ b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECAppender.java
@@ -0,0 +1,199 @@
+package com.obsidiandynamics.log4jextras.splunk;
+
+
+import java.util.*;
+import java.util.function.*;
+
+import org.apache.log4j.*;
+import org.apache.log4j.spi.*;
+
+
+/**
+ * Log4j Appender for sending events to Splunk via the HEC endpoint.

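+ * The token, URL, index and source may alternatively be supplied via the
+ * log4jextras.splunk.token/.url/.index/.source system properties, which are
+ * read on the first append.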
+ *
+ * Adapted from https://github.com/damiendallimore/SplunkJavaLogging.
+ */
+public final class SplunkHECAppender extends AppenderSkeleton {
+  // connection settings
+  private HECTransportConfig config = new HECTransportConfig();
+
+  // queuing settings
+  private String maxQueueSize;
+  private boolean dropEventsOnQueueFull = true;
+
+  private volatile SplunkHECInput shi;
+
+  public SplunkHECAppender() {}
+
+  public SplunkHECAppender(Layout layout) {
+    this.layout = layout;
+  }
+
+  private static void setPropertyConditional(String key, Consumer<String> setter) {
+    final String value = System.getProperty(key);
+    if (value != null) setter.accept(value);
+  }
+
+  /**
+   * Log the message.
+   *
+   * @param event The log event.
+   */
+  @Override
+  protected void append(LoggingEvent event) {
+    try {
+      if (shi == null) {
+        setPropertyConditional("log4jextras.splunk.token", this::setToken);
+        setPropertyConditional("log4jextras.splunk.url", this::setUrl);
+        setPropertyConditional("log4jextras.splunk.index", this::setIndex);
+        setPropertyConditional("log4jextras.splunk.source", this::setSource);
+
+        synchronized (this) {
+          if (shi == null) {
+            shi = new SplunkHECInput(config);
+            if (maxQueueSize != null) shi.setMaxQueueSize(maxQueueSize);
+            shi.setDropEventsOnQueueFull(dropEventsOnQueueFull);
+          }
+        }
+      }
+    } catch (Exception e) {
+      errorHandler.error("Couldn't establish connection for SplunkHECAppender named '"
+          + this.name + "': " + Arrays.asList(e.getStackTrace()));
+      e.printStackTrace(System.err);
+      return;
+    }
+
+    String formatted = layout.format(event);
+
+    // send error stack traces to splunk
+    if (layout.ignoresThrowable()) {
+      final String[] s = event.getThrowableStrRep();
+      final StringBuilder stackTrace = new StringBuilder();
+      if (s != null) {
+        int len = s.length;
+        for (int i = 0; i < len; i++) {
+          stackTrace.append(Layout.LINE_SEP);
+          stackTrace.append(s[i]);
+        }
+      }
+      formatted += stackTrace.toString();
+    }
+
+    shi.streamEvent(formatted, event.getTimeStamp());
+  }
+
+  /**
+   * Clean up resources.
+ */ + @Override + synchronized public void close() { + closed = true; + if (shi != null) { + try { + shi.closeStream(); + shi = null; + } catch (Exception e) { + Thread.currentThread().interrupt(); + shi = null; + } + } + } + + @Override + public boolean requiresLayout() { + return true; + } + + public void setUrl(String url) { + config.setUrl(url); + } + + public String getToken() { + return config.getToken(); + } + + public void setToken(String token) { + config.setToken(token); + } + + public int getPoolSize() { + return config.getPoolSize(); + } + + public void setPoolSize(int poolSize) { + config.setPoolSize(poolSize); + } + + public String getIndex() { + return config.getIndex(); + } + + public void setIndex(String index) { + config.setIndex(index); + } + + public String getSource() { + return config.getSource(); + } + + public void setSource(String source) { + config.setSource(source); + } + + public String getSourcetype() { + return config.getSourcetype(); + } + + public void setSourcetype(String sourcetype) { + config.setSourcetype(sourcetype); + } + + public String getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(String maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public boolean isDropEventsOnQueueFull() { + return dropEventsOnQueueFull; + } + + public void setDropEventsOnQueueFull(boolean dropEventsOnQueueFull) { + this.dropEventsOnQueueFull = dropEventsOnQueueFull; + } + + public long getMaxBatchSizeEvents() { + return config.getMaxBatchSizeEvents(); + } + + public void setMaxBatchSizeEvents(long maxBatchSizeEvents) { + config.setMaxBatchSizeEvents(maxBatchSizeEvents); + } + + public long getMaxInactiveTimeBeforeBatchFlush() { + return config.getMaxInactiveTimeBeforeBatchFlush(); + } + + public void setMaxInactiveTimeBeforeBatchFlush(long maxInactiveTimeBeforeBatchFlush) { + config.setMaxInactiveTimeBeforeBatchFlush(maxInactiveTimeBeforeBatchFlush); + } + + public boolean isBatchMode() { + return config.isBatchMode(); + } + + public void setBatchMode(boolean batchMode) { + config.setBatchMode(batchMode); + } + + public String getMaxBatchSizeBytes() { + return String.valueOf(config.getMaxBatchSizeBytes()); + } + + public void setMaxBatchSizeBytes(String maxBatchSizeBytes) { + config.setMaxBatchSizeBytes(maxBatchSizeBytes); + } +} diff --git a/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECInput.java b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECInput.java new file mode 100644 index 0000000..63be8d0 --- /dev/null +++ b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkHECInput.java @@ -0,0 +1,264 @@ +package com.obsidiandynamics.log4jextras.splunk; + + +import java.net.*; +import java.util.*; + +import javax.net.ssl.*; + +import org.apache.http.*; +import org.apache.http.client.methods.*; +import org.apache.http.client.utils.*; +import org.apache.http.concurrent.*; +import org.apache.http.config.*; +import org.apache.http.conn.routing.*; +import org.apache.http.entity.*; +import org.apache.http.impl.nio.client.*; +import org.apache.http.impl.nio.conn.*; +import org.apache.http.impl.nio.reactor.*; +import org.apache.http.nio.conn.*; +import org.apache.http.nio.conn.ssl.*; +import org.apache.http.nio.reactor.*; +import org.apache.http.ssl.*; +import org.apache.http.util.*; + +/** + * Common HEC logic shared by all appenders/handlers.

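+ * Events are posted asynchronously. In batch mode they are buffered and flushed once a
+ * byte-size or event-count limit is reached, or after a period of inactivity; failed
+ * posts are queued for retry.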
+ *
+ * Adapted from https://github.com/damiendallimore/SplunkJavaLogging.
+ */
+final class SplunkHECInput extends SplunkInput {
+  private static final boolean DEBUG = false;
+
+  // connection props
+  private final HECTransportConfig config;
+
+  // batch buffer
+  private final List<String> batchBuffer;
+  private long currentBatchSizeBytes = 0;
+  private long lastEventReceivedTime;
+
+  private final CloseableHttpAsyncClient httpClient;
+  private final URI uri;
+
+  private final Object lock = new Object();
+
+  private static final HostnameVerifier HOSTNAME_VERIFIER = (s, sslSession) -> true;
+
+  SplunkHECInput(HECTransportConfig config) throws Exception {
+    this.config = config;
+
+    this.batchBuffer = new LinkedList<>();
+    this.lastEventReceivedTime = System.currentTimeMillis();
+
+    final Registry<SchemeIOSessionStrategy> sslSessionStrategy = RegistryBuilder
+        .<SchemeIOSessionStrategy>create()
+        .register("http", NoopIOSessionStrategy.INSTANCE)
+        .register("https",
+                  new SSLIOSessionStrategy(getSSLContext(), HOSTNAME_VERIFIER))
+        .build();
+
+    final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+    final PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager(ioReactor, sslSessionStrategy);
+    cm.setMaxTotal(config.getPoolSize());
+    cm.setDefaultMaxPerRoute(config.getPoolSize());
+
+    final HttpHost splunk = new HttpHost(config.getHost(), config.getPort());
+    cm.setMaxPerRoute(new HttpRoute(splunk), config.getPoolSize());
+
+    httpClient = HttpAsyncClients.custom().setConnectionManager(cm).build();
+
+    uri = new URIBuilder().setScheme(config.isHttps() ? "https" : "http")
+        .setHost(config.getHost()).setPort(config.getPort())
+        .setPath(config.getPath()).build();
+
+    openStream();
+
+    if (config.isBatchMode()) {
+      new BatchBufferActivityCheckerThread().start();
+    }
+  }
+
+  private final class BatchBufferActivityCheckerThread extends Thread {
+    BatchBufferActivityCheckerThread() {
+      super(BatchBufferActivityCheckerThread.class.getSimpleName());
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        String currentMessage = "";
+        try {
+          long currentTime = System.currentTimeMillis();
+          synchronized (lock) {
+            if ((currentTime - lastEventReceivedTime) >= config.getMaxInactiveTimeBeforeBatchFlush()) {
+              if (batchBuffer.size() > 0) {
+                currentMessage = rollOutBatchBuffer();
+                batchBuffer.clear();
+                currentBatchSizeBytes = 0;
+                hecPost(currentMessage);
+              }
+            }
+          }
+
+          Thread.sleep(1000);
+        } catch (Exception e) {
+          System.err.println("Splunk: handling batch: " + e);
+          e.printStackTrace(System.err);
+
+          synchronized (lock) {
+            // something went wrong, put message on the queue for retry
+            enqueueAndReopen(currentMessage);
+          }
+        }
+      }
+    }
+  }
+
+  private static SSLContext getSSLContext() {
+    try {
+      return SSLContexts.custom().loadTrustMaterial(null, (certificate, authType) -> true).build();
+    } catch (Exception e) {
+      System.err.println("Splunk: error constructing SSL context: " + e);
+      e.printStackTrace(System.err);
+      return null;
+    }
+  }
+
+  private void openStream() throws Exception {
+    httpClient.start();
+  }
+
+  void closeStream() {
+    try {
+      httpClient.close();
+    } catch (Exception e) {
+      System.err.println("Splunk: error closing stream: " + e);
+      e.printStackTrace(System.err);
+    }
+  }
+
+  private String escapeAndQuote(final String message) {
+    final String trimmed = message.substring(0, message.length() - 1);
+    return "\"" + trimmed.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
+  }
+
+  /**
+   * Send an event via stream.
+   *
+   * @param message The message to send.
+   * @param timestamp The time of the original event.
+   */
+  void streamEvent(String message, long timestamp) {
+    String currentMessage = "";
+    try {
+      final String escaped = escapeAndQuote(message);
+
+      // hand-building of JSON for speed
+      final StringBuilder json = new StringBuilder();
+      json.append("{\"")
+          .append("time\":").append(timestamp).append(",\"")
+          .append("event\":").append(escaped).append(",\"")
+          .append("index\":\"").append(config.getIndex())
+          .append("\",\"").append("source\":\"")
+          .append(config.getSource()).append("\",\"")
+          .append("sourcetype\":\"").append(config.getSourcetype())
+          .append("\"").append("}");
+
+      currentMessage = json.toString();
+
+      synchronized (lock) {
+        if (config.isBatchMode()) {
+          lastEventReceivedTime = timestamp;
+          currentBatchSizeBytes += currentMessage.length();
+          batchBuffer.add(currentMessage);
+          if (flushBuffer()) {
+            currentMessage = rollOutBatchBuffer();
+            batchBuffer.clear();
+            currentBatchSizeBytes = 0;
+            hecPost(currentMessage);
+          }
+        } else {
+          hecPost(currentMessage);
+        }
+
+        // flush the queue
+        while (queueContainsEvents()) {
+          final String messageOffQueue = dequeue();
+          currentMessage = messageOffQueue;
+          hecPost(currentMessage);
+        }
+      }
+    } catch (Exception e) {
+      System.err.println("Splunk: error streaming event: " + e);
+      e.printStackTrace(System.err);
+
+      synchronized (lock) {
+        // something went wrong, put message on the queue for retry
+        enqueueAndReopen(message);
+      }
+    }
+  }
+
+  private void enqueueAndReopen(String message) {
+    enqueue(message);
+    try {
+      closeStream();
+    } catch (Exception e) {
+      System.err.println("Splunk: error closing stream: " + e);
+      e.printStackTrace(System.err);
+    }
+
+    try {
+      openStream();
+    } catch (Exception e) {
+      System.err.println("Splunk: error opening stream: " + e);
+      e.printStackTrace(System.err);
+    }
+  }
+
+  private boolean flushBuffer() {
+    return (currentBatchSizeBytes >= config.getMaxBatchSizeBytes())
+        || (batchBuffer.size() >= config.getMaxBatchSizeEvents());
+  }
+
+  private String rollOutBatchBuffer() {
+    final StringBuilder sb = new StringBuilder();
+    for (String event : batchBuffer) {
+      sb.append(event);
+    }
+    return sb.toString();
+  }
+
+  private void hecPost(String payload) throws Exception {
+    final HttpPost post = new HttpPost(uri);
+    post.addHeader("Authorization", "Splunk " + config.getToken());
+
+    final StringEntity requestEntity = new StringEntity(payload, ContentType.APPLICATION_JSON);
+    if (DEBUG) System.out.println("Sending " + payload);
+    post.setEntity(requestEntity);
+    httpClient.execute(post, new FutureCallback<HttpResponse>() {
+      @Override public void completed(HttpResponse response) {
+        if (DEBUG) System.out.println("Completed " + payload);
+        if (response.getStatusLine().getStatusCode() >= 300) {
+          System.err.println("Splunk: error sending request '" + payload + "'");
+          System.err.println(response.getStatusLine());
+          try {
+            System.err.println(EntityUtils.toString(response.getEntity()));
+          } catch (Exception e) {
+            e.printStackTrace(System.err);
+          }
+        }
+      }
+
+      @Override public void failed(Exception ex) {
+        System.err.println("Splunk: error sending request '" + payload + "'");
+        ex.printStackTrace(System.err);
+      }
+
+      @Override public void cancelled() {}
+    });
+  }
+}
\ No newline at end of file
diff --git a/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkInput.java b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkInput.java
new file mode 100644
index 0000000..1d911d2
--- /dev/null
+++ b/splunk/src/main/java/com/obsidiandynamics/log4jextras/splunk/SplunkInput.java
@@ -0,0 +1,140 @@
+package com.obsidiandynamics.log4jextras.splunk;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Common base class for all Splunk Input types. Currently just has shared logic
+ * for queuing up events.

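+ * The retry queue is bounded by its total size in bytes. When it is full, it is
+ * either cleared to admit the newest event or the new event is discarded, depending
+ * on configuration.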
+ *
+ * Adapted from https://github.com/damiendallimore/SplunkJavaLogging.
+ */
+abstract class SplunkInput {
+  // data size multipliers
+  private static final int KB = 1024;
+  private static final int MB = KB * 1024;
+  private static final int GB = MB * 1024;
+
+  private long maxQueueSize = 500 * KB;
+
+  // If true, queue will get emptied when it fills up to accommodate new data.
+  private boolean dropEventsOnQueueFull = true;
+
+  // Using this collection structure to implement the FIFO retry queue.
+  private List<String> queue = new ArrayList<>();
+
+  private long currentQueueSizeInBytes = 0;
+
+  /**
+   * Add an event to the tail of the FIFO queue subject to there being
+   * capacity.
+   *
+   * @param event The event to enqueue.
+   */
+  protected void enqueue(String event) {
+    long eventSize = event.getBytes().length;
+
+    if (queueHasCapacity(eventSize)) {
+      queue.add(event);
+      currentQueueSizeInBytes += eventSize;
+    } else if (dropEventsOnQueueFull) {
+      queue.clear();
+      queue.add(event);
+      currentQueueSizeInBytes = eventSize;
+    } else {
+      // bummer, queue is full
+    }
+  }
+
+  /**
+   * True if the queue has capacity for adding an event of the given size.
+   *
+   * @param eventSize The size of the candidate event, in bytes.
+   * @return True if the event fits within the queue's remaining capacity.
+   */
+  private boolean queueHasCapacity(long eventSize) {
+    return (currentQueueSizeInBytes + eventSize) <= maxQueueSize;
+  }
+
+  /**
+   * True if there are pending events in the queue.
+   *
+   * @return True if the queue is non-empty.
+   */
+  protected boolean queueContainsEvents() {
+    return ! queue.isEmpty();
+  }
+
+  /**
+   * Remove an event from the head of the FIFO queue, or return null if there
+   * are no items in the queue.
+   *
+   * @return The dequeued event, or null if the queue is empty.
+   */
+  protected String dequeue() {
+    if (queueContainsEvents()) {
+      final String event = queue.remove(0);
+      currentQueueSizeInBytes -= event.getBytes().length;
+      if (currentQueueSizeInBytes < 0) {
+        currentQueueSizeInBytes = 0;
+      }
+      return event;
+    }
+    return null;
+  }
+
+  /**
+   * Set the queue size from the configured property String value. If the value
+   * cannot be parsed, the current setting is retained (500 KB by default).
+   *
+   * @param rawProperty An integer followed by KB, MB or GB, e.g. 500KB.
+   */
+  public void setMaxQueueSize(String rawProperty) {
+    int multiplier;
+    int factor;
+
+    if (rawProperty.endsWith("KB")) {
+      multiplier = KB;
+    } else if (rawProperty.endsWith("MB")) {
+      multiplier = MB;
+    } else if (rawProperty.endsWith("GB")) {
+      multiplier = GB;
+    } else {
+      return;
+    }
+    try {
+      factor = Integer.parseInt(rawProperty.substring(0, rawProperty.length() - 2));
+    } catch (NumberFormatException e) {
+      return;
+    }
+    setMaxQueueSize(factor * multiplier);
+  }
+
+  public long getMaxQueueSize() {
+    return maxQueueSize;
+  }
+
+  /**
+   * Max queue size in bytes.
+   *
+   * @param maxQueueSize The maximum queue size, in bytes.
+   */
+  public void setMaxQueueSize(long maxQueueSize) {
+    this.maxQueueSize = maxQueueSize;
+  }
+
+  public boolean isDropEventsOnQueueFull() {
+    return dropEventsOnQueueFull;
+  }
+
+  /**
+   * If true, the queue will get emptied when it fills up to accommodate new data.
+   *
+   * @param dropEventsOnQueueFull True to clear the queue and admit the newest
+   *        event when full; false to drop new events while the queue is full.
+   */
+  public void setDropEventsOnQueueFull(boolean dropEventsOnQueueFull) {
+    this.dropEventsOnQueueFull = dropEventsOnQueueFull;
+  }
+}
\ No newline at end of file
diff --git a/src/test/java/com/obsidiandynamics/log4jextras/TestAppender.java b/src/test/java/com/obsidiandynamics/log4jextras/TestAppender.java
new file mode 100644
index 0000000..1f661fc
--- /dev/null
+++ b/src/test/java/com/obsidiandynamics/log4jextras/TestAppender.java
@@ -0,0 +1,20 @@
+package com.obsidiandynamics.log4jextras;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import org.apache.log4j.Layout;
+import org.apache.log4j.WriterAppender;
+
+public final class TestAppender extends WriterAppender {
+  public static final ByteArrayOutputStream baos = new ByteArrayOutputStream(16 * 1024);
+  public static final OutputStreamWriter w = new OutputStreamWriter(baos);
+
+  public TestAppender() {
+    setWriter(w);
+  }
+
+  public TestAppender(Layout layout) {
+    setWriter(w);
+    setLayout(layout);
+  }
+}
\ No newline at end of file
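
For reference, a minimal log4j.properties wiring of the new layout and appender might look like the fragment below. This is an illustrative sketch rather than part of the patch: the URL, token and the requestId MDC key are placeholders, and each property name simply maps onto the corresponding bean setter defined above.

log4j.rootLogger=INFO, splunk
log4j.appender.splunk=com.obsidiandynamics.log4jextras.splunk.SplunkHECAppender
# placeholder endpoint and HEC token - substitute real values
log4j.appender.splunk.Url=https://splunk.example.com:8088
log4j.appender.splunk.Token=00000000-0000-0000-0000-000000000000
log4j.appender.splunk.Index=main
log4j.appender.splunk.BatchMode=true
log4j.appender.splunk.MaxBatchSizeBytes=512KB
log4j.appender.splunk.MaxBatchSizeEvents=100
log4j.appender.splunk.layout=com.obsidiandynamics.log4jextras.json.JsonLayout
log4j.appender.splunk.layout.MinimumLevelForSlowLogging=INFO
# requestId is a hypothetical MDC key, used purely for illustration
log4j.appender.splunk.layout.MdcFieldsToLog=requestId

With this configuration, delivery is deferred until 512KB or 100 events accumulate in the batch buffer, or until the inactivity flush (MaxInactiveTimeBeforeBatchFlush, 1000 ms by default) kicks in.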