From 98d5bf332903be8ae155a8ea5ad2e81816bfefd6 Mon Sep 17 00:00:00 2001 From: rfoltyns Date: Thu, 15 Sep 2022 22:34:51 +0100 Subject: [PATCH] Add DataStream support (#62) * Add DataStream to ElasticsearchOperationFactory * Add DataStreamItem - batch item * Add DataStreamBatchRequest - batch * Add ElasticsearchDataStreamAPI - builders and serializers * Add ElasticsearchDataStreamAPIPlugin - Log4j2 config --- log4j2-elasticsearch-core/README.md | 17 +- .../log4j2/elasticsearch/DataStream.java | 88 +++++ .../elasticsearch/DataStreamPlugin.java | 69 ++++ .../log4j2/elasticsearch/ILMPolicy.java | 20 +- .../log4j2/elasticsearch/ILMPolicyPlugin.java | 17 +- .../log4j2/elasticsearch/IndexTemplate.java | 3 +- .../json/jackson/LogEventDataStreamMixIn.java | 26 ++ .../json/jackson/LogEventTimestampMixIn.java | 30 ++ .../elasticsearch/DataStreamPluginTest.java | 67 ++++ .../log4j2/elasticsearch/DataStreamTest.java | 66 ++++ .../elasticsearch/ILMPolicyPluginTest.java | 29 +- .../log4j2/elasticsearch/ILMPolicyTest.java | 44 +++ log4j2-elasticsearch-hc/README.md | 27 ++ .../elasticsearch/hc/CheckDataStream.java | 78 +++++ .../elasticsearch/hc/CreateDataStream.java | 89 +++++ .../hc/DataStreamBatchRequest.java | 89 +++++ .../elasticsearch/hc/DataStreamItem.java | 52 +++ .../elasticsearch/hc/DataStreamItemMixIn.java | 43 +++ .../hc/DataStreamItemResultMixIn.java | 45 +++ .../elasticsearch/hc/DataStreamSetupOp.java | 61 ++++ .../hc/ElasticsearchDataStreamAPI.java | 102 ++++++ .../hc/ElasticsearchDataStreamAPIPlugin.java | 116 +++++++ .../hc/ElasticsearchOperationFactory.java | 14 +- .../log4j2/elasticsearch/hc/HCHttpPlugin.java | 3 + .../elasticsearch/hc/ILMPolicySetupOp.java | 32 +- .../log4j2/elasticsearch/hc/IndexRequest.java | 2 +- .../elasticsearch/hc/CheckDataStreamTest.java | 177 ++++++++++ .../hc/CreateDataStreamTest.java | 186 +++++++++++ .../hc/DataStreamBatchRequestTest.java | 314 ++++++++++++++++++ .../elasticsearch/hc/DataStreamItemTest.java | 187 +++++++++++ .../hc/DataStreamRequestTest.java | 171 ++++++++++ .../ElasticsearchDataStreamAPIPluginTest.java | 103 ++++++ .../hc/ElasticsearchDataStreamAPITest.java | 127 +++++++ .../hc/ElasticsearchOperationFactoryTest.java | 80 +++++ .../FallbackDataStreamBatchRequestTest.java | 29 ++ .../hc/JCToolsDataStreamBatchRequestTest.java | 29 ++ .../elasticsearch/hc/smoke/SmokeTest.java | 106 +++++- ...ponentTemplate-7-mappings-data-stream.json | 28 ++ ...composableIndexTemplate-7-data-stream.json | 12 + 39 files changed, 2734 insertions(+), 44 deletions(-) create mode 100644 log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStream.java create mode 100644 log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStreamPlugin.java create mode 100644 log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventDataStreamMixIn.java create mode 100644 log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventTimestampMixIn.java create mode 100644 log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamPluginTest.java create mode 100644 log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamTest.java create mode 100644 log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyTest.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStream.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStream.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequest.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItem.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemMixIn.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemResultMixIn.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamSetupOp.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPI.java create mode 100644 log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPlugin.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStreamTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStreamTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequestTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamRequestTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPluginTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPITest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/FallbackDataStreamBatchRequestTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/JCToolsDataStreamBatchRequestTest.java create mode 100644 log4j2-elasticsearch-hc/src/test/resources/componentTemplate-7-mappings-data-stream.json create mode 100644 log4j2-elasticsearch-hc/src/test/resources/composableIndexTemplate-7-data-stream.json diff --git a/log4j2-elasticsearch-core/README.md b/log4j2-elasticsearch-core/README.md index 4f437ceb..5ee4af99 100644 --- a/log4j2-elasticsearch-core/README.md +++ b/log4j2-elasticsearch-core/README.md @@ -216,6 +216,15 @@ NOTE: Be aware that template parsing errors on cluster side MAY NOT prevent plug ### Index lifecycle management Since 1.5, [ILM Policy](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/index-lifecycle-management.html) can be created during appender startup. Policy can be loaded from specified file or defined directly in the XML config: +| Config property | Type | Required | Default | Description | +|----------------------|-----------|-----------------------------------------------------------------------|---------|-----------------------------------------------------------------------------------------------------------------------| +| name | Attribute | yes | None | ILM Policy resource name | +| createBootstrapIndex | Attribute | no | `true` | If `true`, bootstrap index will be created and set as write index unless an index with the same `name` already exists | +| rolloverAlias | Attribute | yes, if `createBootstrapIndex` is `true` | None | Rollover alias | +| path | Attribute | yes, if document not defined with `sourceString` (see examples below) | None | Path to policy document. E.g. `classpath:ilm-policy.json` or `/ilm-policy.json` | + +If used with [DataStream](#data-stream), `createBootstrapIndex` MUST be false. + ```xml @@ -228,7 +237,7 @@ Since 1.5, [ILM Policy](https://www.elastic.co/guide/en/elasticsearch/reference/ ``` -or +or with `sourceString` ```xml @@ -252,6 +261,12 @@ NOTE: This feature is supported by [log4j2-elasticsearch-jest](https://github.co NOTE: Be aware that policy parsing errors on cluster side MAY NOT prevent plugin from loading - error is logged on client side and startup continues. +### Data streams support + +Since 1.6, [Data streams](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) are supported with `DataStream` setup operation in several modules. + +See submodules documentation to check support. + ### Message output There are numerous ways to generate JSON output: diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStream.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStream.java new file mode 100644 index 00000000..7e5c4ef8 --- /dev/null +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStream.java @@ -0,0 +1,88 @@ +package org.appenders.log4j2.elasticsearch; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public class DataStream implements OpSource { + + public static final String TYPE_NAME = "DataStream"; + private static final String EMPTY_SOURCE = ""; + + private final String name; + + /** + * @param name Data stream name + */ + public DataStream(final String name) { + this.name = name; + } + + /** + * @return Data stream name + */ + public String getName() { + return name; + } + + /** + * @return {@link #TYPE_NAME} + */ + @Override + public String getType() { + return TYPE_NAME; + } + + /** + * @return Empty String {@code ""} + */ + @Override + public String getSource() { + return EMPTY_SOURCE; + } + + public static class Builder { + + protected String name; + + public DataStream build() { + validate(); + return new DataStream(name); + } + + void validate() { + + if (name == null) { + throw new IllegalArgumentException("No name provided for " + DataStream.class.getSimpleName()); + } + + } + + /** + * @param name Data stream name + * @return this + */ + public Builder withName(String name) { + this.name = name; + return this; + } + + } + +} diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStreamPlugin.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStreamPlugin.java new file mode 100644 index 00000000..0b39ab4c --- /dev/null +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/DataStreamPlugin.java @@ -0,0 +1,69 @@ +package org.appenders.log4j2.elasticsearch; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.logging.log4j.core.config.ConfigurationException; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; + + +@Plugin(name = DataStreamPlugin.PLUGIN_NAME, category = Node.CATEGORY, elementType = "setupOperation", printObject = true) +public class DataStreamPlugin extends DataStream { + + public static final String PLUGIN_NAME = "DataStream"; + + /** + * @param name Data stream name + */ + protected DataStreamPlugin(final String name) { + super(name); + } + + @PluginBuilderFactory + public static DataStreamPlugin.Builder newBuilder() { + return new Builder(); + } + + public static class Builder implements org.apache.logging.log4j.core.util.Builder { + + @PluginBuilderAttribute + private String name; + + @Override + public DataStreamPlugin build() { + + if (name == null) { + throw new ConfigurationException("No name provided for " + PLUGIN_NAME); + } + + return new DataStreamPlugin(name); + } + + public Builder withName(String name) { + this.name = name; + return this; + } + + } + +} diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicy.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicy.java index cc2e4035..d26329ec 100644 --- a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicy.java +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicy.java @@ -29,16 +29,30 @@ public class ILMPolicy implements OpSource { private final String name; private final String rolloverAlias; + private final boolean createBootstrapIndex; private final String source; /** * @param name ILM policy name * @param rolloverAlias index rollover alias * @param source ILM policy document + * @deprecated As of 1.7, this method will be removed. Use {@link #ILMPolicy(String, String, boolean, String)} instead. */ - public ILMPolicy(String name, String rolloverAlias, String source) { + @Deprecated + public ILMPolicy(final String name, final String rolloverAlias, final String source) { + this(name, rolloverAlias, true, source); + } + + /** + * @param name ILM policy name + * @param rolloverAlias index rollover alias + * @param createBootstrapIndex should bootstrap index be created or not + * @param source ILM policy document + */ + public ILMPolicy(final String name, final String rolloverAlias, final boolean createBootstrapIndex, final String source) { this.name = name; this.rolloverAlias = rolloverAlias; + this.createBootstrapIndex = createBootstrapIndex; this.source = source; } @@ -64,6 +78,10 @@ public String getRolloverAlias() { return this.rolloverAlias; } + public boolean isCreateBootstrapIndex() { + return createBootstrapIndex; + } + /** * @return ILM policy document */ diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicyPlugin.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicyPlugin.java index 3bb5f19d..ff9437b6 100644 --- a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicyPlugin.java +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/ILMPolicyPlugin.java @@ -41,8 +41,8 @@ public class ILMPolicyPlugin extends ILMPolicy { /** * {@inheritDoc} */ - protected ILMPolicyPlugin(String policyName, String rolloverAlias, String source) { - super(policyName, rolloverAlias, source); + protected ILMPolicyPlugin(String policyName, String rolloverAlias, boolean createBootstrapIndex, String source) { + super(policyName, rolloverAlias, createBootstrapIndex, source); } @PluginBuilderFactory @@ -57,9 +57,11 @@ public static class Builder implements org.apache.logging.log4j.core.util.Builde private String name; @PluginAttribute("rolloverAlias") - @Required private String rolloverAlias; + @PluginAttribute("createBootstrapIndex") + private boolean createBootstrapIndex = true; + @PluginAttribute("path") private String path; @@ -72,7 +74,7 @@ public ILMPolicyPlugin build() { throw new ConfigurationException("No name provided for " + PLUGIN_NAME); } - if (rolloverAlias == null) { + if (createBootstrapIndex && rolloverAlias == null) { throw new ConfigurationException("No rolloverAlias provided for " + PLUGIN_NAME); } @@ -82,7 +84,7 @@ public ILMPolicyPlugin build() { throw new ConfigurationException("Either path or source must to be provided for " + PLUGIN_NAME); } - return new ILMPolicyPlugin(name, rolloverAlias, loadSource()); + return new ILMPolicyPlugin(name, rolloverAlias, createBootstrapIndex, loadSource()); } private String loadSource() { @@ -124,6 +126,11 @@ public Builder withRolloverAlias(String rolloverAlias) { return this; } + public Builder withCreateBootstrapIndex(boolean createBootstrapIndex) { + this.createBootstrapIndex = createBootstrapIndex; + return this; + } + /** * @param source ILM policy document. MAY contain placeholders resolvable by {@link ValueResolver} * @return this diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/IndexTemplate.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/IndexTemplate.java index f35858c5..5c168987 100644 --- a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/IndexTemplate.java +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/IndexTemplate.java @@ -23,6 +23,7 @@ /** * Index template definition. Supports both composable index templates and templates deprecated in 7.8. * Set {@link #apiVersion} to 8 to indicate that this template is composable + * Set {@link #apiVersion} to 7 to use legacy Index Template API * * See Composable index templates * and Deprecated index templates @@ -30,7 +31,7 @@ public class IndexTemplate implements OpSource { public static final String TYPE_NAME = "IndexTemplate"; - public static final int DEFAULT_API_VERSION = 7; + public static final int DEFAULT_API_VERSION = 8; private final int apiVersion; private final String name; diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventDataStreamMixIn.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventDataStreamMixIn.java new file mode 100644 index 00000000..91cc0ea3 --- /dev/null +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventDataStreamMixIn.java @@ -0,0 +1,26 @@ +package org.appenders.log4j2.elasticsearch.json.jackson; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public abstract class LogEventDataStreamMixIn extends ExtendedLogEventJacksonJsonMixIn implements LogEventTimestampMixIn { + +} + diff --git a/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventTimestampMixIn.java b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventTimestampMixIn.java new file mode 100644 index 00000000..7d22cef8 --- /dev/null +++ b/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventTimestampMixIn.java @@ -0,0 +1,30 @@ +package org.appenders.log4j2.elasticsearch.json.jackson; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.annotation.JsonProperty; + +public interface LogEventTimestampMixIn { + + @JsonProperty("@timestamp") + long getTimeMillis(); + +} diff --git a/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamPluginTest.java b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamPluginTest.java new file mode 100644 index 00000000..1de1a3a0 --- /dev/null +++ b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamPluginTest.java @@ -0,0 +1,67 @@ +package org.appenders.log4j2.elasticsearch; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.logging.log4j.core.config.ConfigurationException; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class DataStreamPluginTest { + + public static final String TEST_DATA_STREAM_NAME = "testDataStream"; + + private static DataStreamPlugin createTestDataStream(final String name) { + return DataStreamPlugin.newBuilder() + .withName(name) + .build(); + } + + @Test + public void buildsWithName() { + + // when + final DataStreamPlugin plugin = createTestDataStream(TEST_DATA_STREAM_NAME); + + // then + assertNotNull(plugin); + assertNotNull(plugin.getName()); + assertNotNull(plugin.getSource()); + assertEquals(DataStreamPlugin.TYPE_NAME, plugin.getType()); + + } + + @Test + public void builderThrowsWhenNameIsNotSet() { + + // when + final ConfigurationException exception = assertThrows(ConfigurationException.class, () -> createTestDataStream(null)); + + // then + assertThat(exception.getMessage(), containsString("No name provided for " + DataStream.class.getSimpleName())); + + } + +} diff --git a/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamTest.java b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamTest.java new file mode 100644 index 00000000..a15f6ff8 --- /dev/null +++ b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/DataStreamTest.java @@ -0,0 +1,66 @@ +package org.appenders.log4j2.elasticsearch; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class DataStreamTest { + + public static final String TEST_DATA_STREAM_NAME = "testDataStream"; + + private DataStream createTestDataStream(final String name) { + return new DataStream.Builder() + .withName(name) + .build(); + } + + @Test + public void buildsWithName() { + + // when + final DataStream plugin = createTestDataStream(TEST_DATA_STREAM_NAME); + + // then + assertNotNull(plugin); + assertNotNull(plugin.getName()); + assertNotNull(plugin.getSource()); + assertEquals(DataStream.TYPE_NAME, plugin.getType()); + + } + + @Test + public void builderThrowsWhenNameIsNotSet() { + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> createTestDataStream(null)); + + // then + assertThat(exception.getMessage(), containsString("No name provided for " + DataStream.class.getSimpleName())); + + } + +} diff --git a/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyPluginTest.java b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyPluginTest.java index 9ee5f037..f70017ff 100644 --- a/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyPluginTest.java +++ b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyPluginTest.java @@ -29,8 +29,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ILMPolicyPluginTest { @@ -48,12 +50,13 @@ public static ILMPolicyPlugin.Builder createTestILMPolicyPluginBuilder() { } @Test - public void startsWhenSetupCorrectlyWithNameAndPathAndRolloverAlias() { + public void buildsWhenSetupCorrectlyWithNameAndPathAndRolloverAlias() { // given final ILMPolicyPlugin.Builder builder = createTestILMPolicyPluginBuilder(); builder.withName(TEST_ILM_POLICY_NAME) .withPath(TEST_PATH) + .withCreateBootstrapIndex(true) .withRolloverAlias(TEST_ROLLOVER_ALIAS); // when @@ -65,15 +68,18 @@ public void startsWhenSetupCorrectlyWithNameAndPathAndRolloverAlias() { assertNotNull(ilmPolicyPlugin.getSource()); assertEquals(ILMPolicy.TYPE_NAME, ilmPolicyPlugin.getType()); assertEquals(TEST_ROLLOVER_ALIAS, ilmPolicyPlugin.getRolloverAlias()); + assertTrue(ilmPolicyPlugin.isCreateBootstrapIndex()); + } @Test - public void startsWhenSetupCorrectlyWithNameAndSourceAndRolloverAlias() { + public void buildsWhenSetupCorrectlyWithNameAndSourceAndRolloverAlias() { // given final ILMPolicyPlugin.Builder builder = createTestILMPolicyPluginBuilder(); builder.withName(TEST_ILM_POLICY_NAME) .withRolloverAlias(TEST_ROLLOVER_ALIAS) + .withCreateBootstrapIndex(false) .withPath(null) .withSource(TEST_SOURCE); @@ -84,6 +90,25 @@ public void startsWhenSetupCorrectlyWithNameAndSourceAndRolloverAlias() { assertNotNull(ilmPolicyPlugin); assertNotNull(ilmPolicyPlugin.getName()); assertNotNull(ilmPolicyPlugin.getSource()); + assertFalse(ilmPolicyPlugin.isCreateBootstrapIndex()); + + } + + @Test + public void createsBootstrapIndexByDefault() { + + // given + final ILMPolicyPlugin.Builder builder = ILMPolicyPlugin.newBuilder() + .withName(TEST_ILM_POLICY_NAME) + .withSource(TEST_SOURCE) + .withRolloverAlias(TEST_ROLLOVER_ALIAS); + + // when + final ILMPolicy policy = builder.build(); + + // then + assertTrue(policy.isCreateBootstrapIndex()); + } @Test diff --git a/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyTest.java b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyTest.java new file mode 100644 index 00000000..aa56c8ca --- /dev/null +++ b/log4j2-elasticsearch-core/src/test/java/org/appenders/log4j2/elasticsearch/ILMPolicyTest.java @@ -0,0 +1,44 @@ +package org.appenders.log4j2.elasticsearch; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ILMPolicyTest { + + private static final String TEST_ILM_POLICY_NAME = "test-ilm-policy"; + private static final String TEST_SOURCE = "{}"; + private static final String TEST_ROLLOVER_ALIAS = "test-rollover-alias"; + + @Test + public void deprecatedConstructorCreatesBootstrapIndex() { + + // when + final ILMPolicy policy = new ILMPolicy(TEST_ILM_POLICY_NAME, TEST_ROLLOVER_ALIAS, TEST_SOURCE); + + // then + assertTrue(policy.isCreateBootstrapIndex()); + + } + +} diff --git a/log4j2-elasticsearch-hc/README.md b/log4j2-elasticsearch-hc/README.md index 15165329..f70513f5 100644 --- a/log4j2-elasticsearch-hc/README.md +++ b/log4j2-elasticsearch-hc/README.md @@ -147,6 +147,33 @@ Configures builders and serializers for: |-------------|-----------|----------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | mappingType | Attribute | no | `null` since 1.6 | Name of index mapping type to use. Applicable to Elasticsearch <8.x. See [removal of types](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html). | +### ElasticsearchDataStream + +Since 1.6, [Data streams](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) are supported with `DataStream` setup operation. + +Configures `BatchOperations`-level builders and serializers for: +* [DataStreamBatchRequest](https://github.com/rfoltyns/log4j2-elasticsearch/blob/master/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequest.java) - `//_bulk` request (batch) +* [DataStreamItem](https://github.com/rfoltyns/log4j2-elasticsearch/blob/master/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItem.java) - document (batch item) + +If `ILMPolicy` is used, `ILMPolicy.createBootstrapIndex` MUST be set to `false`. This behaviour will be changed in future releases - bootstrap index will be created separately, similar to other setup operations. + +With `JacksonJsonLayout`, use [LogEventDataStreamMixIn](https://github.com/rfoltyns/log4j2-elasticsearch/blob/master/log4j2-elasticsearch-core/src/main/java/org/appenders/log4j2/elasticsearch/json/jackson/LogEventDataStreamMixIn.java) or equivalent to serialize `LogEvent.timeMillis` as `@timestamp`. + +```xml + + + + + + + + + + + +``` + ### Programmatic config See [programmatc config example](https://github.com/rfoltyns/log4j2-elasticsearch/blob/master/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/smoke/SmokeTest.java). diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStream.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStream.java new file mode 100644 index 00000000..95d63e1d --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStream.java @@ -0,0 +1,78 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.log4j2.elasticsearch.Result; +import org.appenders.log4j2.elasticsearch.SetupStep; + +import static org.appenders.core.logging.InternalLogging.getLogger; + +/** + * Checks if data stream exists + */ +public class CheckDataStream extends SetupStep { + + protected final String name; + + public CheckDataStream(String name) { + this.name = name; + } + + /** + * @param response client response + * @return {@link Result#SUCCESS} if data stream does NOT exist, + * {@link Result#SKIP} if data stream exists, + * {@link Result#FAILURE} otherwise + */ + @Override + public Result onResponse(Response response) { + + if (response.getResponseCode() == 404) { + + getLogger().info("{}: Data stream {} does not exist", + getClass().getSimpleName(), name); + + return Result.SUCCESS; + + } else if (response.getResponseCode() == 200) { + + getLogger().info("{}: Data stream {} already exists", + getClass().getSimpleName(), name); + + return Result.SKIP; + + } else { + + getLogger().error("{}: Unable to determine if {} data stream already exists", + getClass().getSimpleName(), name); + + return Result.FAILURE; + + } + + } + + @Override + public Request createRequest() { + return new GenericRequest("GET", "_data_stream/" + name, null); + } + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStream.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStream.java new file mode 100644 index 00000000..2b2ce178 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStream.java @@ -0,0 +1,89 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.appenders.log4j2.elasticsearch.Result; +import org.appenders.log4j2.elasticsearch.SetupContext; +import org.appenders.log4j2.elasticsearch.SetupStep; + +import static org.appenders.core.logging.InternalLogging.getLogger; + +/** + * Creates data stream + */ +public class CreateDataStream extends SetupStep { + + protected final String name; + protected final ItemSource itemSource; + + @SuppressWarnings("unchecked") + public CreateDataStream(final String name, final ItemSource itemSource) { + this.name = name; + this.itemSource = itemSource; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldProcess(SetupContext setupContext) { + + final boolean shouldExecute = Result.SUCCESS.equals(setupContext.getLatestResult()); + + if (!shouldExecute) { + getLogger().info("{}: Skipping data stream creation", + getClass().getSimpleName()); + } + + return shouldExecute; + + } + + /** + * @param response client response + * @return {@link Result#SUCCESS} if data stream was created, {@link Result#FAILURE} otherwise + */ + @Override + public Result onResponse(Response response) { + + if (response.getResponseCode() == 200) { + + getLogger().info("{}: Data stream {} created", + getClass().getSimpleName(), name); + + return Result.SUCCESS; + + } + + getLogger().error("{}: Unable to create data stream: {}", + getClass().getSimpleName(), response.getErrorMessage()); + + return Result.FAILURE; + + } + + @Override + public Request createRequest() { + return new GenericRequest("PUT", "_data_stream/" + name, itemSource); + } + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequest.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequest.java new file mode 100644 index 00000000..944e4b1c --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequest.java @@ -0,0 +1,89 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.Collection; + +/** + * {@link org.appenders.log4j2.elasticsearch.ByteBufItemSource}-backed /_bulk request. + * Allows to index multiple {@link IndexRequest} documents + * in a single request. + */ +public class DataStreamBatchRequest extends BatchRequest { + + private IndexRequest first; + + protected DataStreamBatchRequest(Builder builder) { + super(builder); + } + + /** + * Checks if all items in given collection are equal + * ({@link IndexRequest#index} and {@link IndexRequest#type} are the same for all elements) + * + * @param indexRequests collection of items to be checked + * @return {@link IndexRequest} first action in given collection if all items are equal, null otherwise + */ + IndexRequest uniformAction(Collection indexRequests) { + + for (IndexRequest indexRequest : indexRequests) { + + if (first == null) { + first = indexRequest; + continue; + } + + final boolean sameIndex = first.sameIndex(indexRequest); + + if (!sameIndex) { + // fail fast and serialize each item + throw new IllegalArgumentException("Items for different indices found: " + first.getIndex() + " != " + indexRequest.getIndex()); + } + + } + + return first; + + } + + @Override + public String getURI() { + return uniformAction(indexRequests).getIndex() + "/_bulk"; + } + + @Override + public String getHttpMethodName() { + return HTTP_METHOD_NAME; + } + + public static class Builder extends BatchRequest.Builder { + + @Override + public DataStreamBatchRequest build() { + + validate(); + + return new DataStreamBatchRequest(this); + + } + + } +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItem.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItem.java new file mode 100644 index 00000000..1da3bc75 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItem.java @@ -0,0 +1,52 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import io.netty.buffer.ByteBuf; +import org.appenders.log4j2.elasticsearch.ItemSource; + +/** + * {@link ItemSource} based document to be indexed. + * When it's no longer needed, {@link #completed()} MUST be called to release underlying resources. + */ +public class DataStreamItem extends IndexRequest { + + protected DataStreamItem(final Builder builder) { + super(builder); + } + + public static class Builder extends IndexRequest.Builder { + + public Builder(final ItemSource source) { + super(source); + } + + public DataStreamItem build() { + + validate(); + + return new DataStreamItem(this); + + } + + } + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemMixIn.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemMixIn.java new file mode 100644 index 00000000..e9a16c11 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemMixIn.java @@ -0,0 +1,43 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(value = "create") +@JsonTypeInfo(include = JsonTypeInfo.As.WRAPPER_OBJECT, use = JsonTypeInfo.Id.NAME) +@JsonIgnoreProperties({"id", "type", "index", "source", "uri", "httpMethodName"}) +public abstract class DataStreamItemMixIn { + + @JsonIgnore + abstract String getIndex(); + + @JsonIgnore + abstract String getType(); + + @JsonIgnore + abstract String getId(); + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemResultMixIn.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemResultMixIn.java new file mode 100644 index 00000000..b8fcd1a1 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemResultMixIn.java @@ -0,0 +1,45 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(value = "create") +@JsonTypeInfo(include = JsonTypeInfo.As.WRAPPER_OBJECT, use = JsonTypeInfo.Id.NAME) +@JsonIgnoreProperties(ignoreUnknown = true, value = {"_version", "result", "_shards", "_primary_term", "_seq_no"}) +public abstract class DataStreamItemResultMixIn { + + @JsonProperty("_id") + String id; + + @JsonProperty("_type") + String type; + + @JsonProperty("_index") + String index; + + @JsonProperty("error") + Error error; + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamSetupOp.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamSetupOp.java new file mode 100644 index 00000000..9dfbf207 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/DataStreamSetupOp.java @@ -0,0 +1,61 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.log4j2.elasticsearch.DataStream; +import org.appenders.log4j2.elasticsearch.EmptyItemSourceFactory; +import org.appenders.log4j2.elasticsearch.OpSource; +import org.appenders.log4j2.elasticsearch.Operation; +import org.appenders.log4j2.elasticsearch.OperationFactory; +import org.appenders.log4j2.elasticsearch.SetupStep; +import org.appenders.log4j2.elasticsearch.SkippingSetupStepChain; +import org.appenders.log4j2.elasticsearch.StepProcessor; + +import java.util.Arrays; + +public class DataStreamSetupOp implements OperationFactory { + + protected final StepProcessor> stepProcessor; + protected final EmptyItemSourceFactory itemSourceFactory; + + public DataStreamSetupOp(final StepProcessor> stepProcessor, final EmptyItemSourceFactory itemSourceFactory) { + this.stepProcessor = stepProcessor; + this.itemSourceFactory = itemSourceFactory; + } + + @SuppressWarnings("unchecked") + @Override + public Operation create(final T opSource) { + + final DataStream dataStream = (DataStream) opSource; + + final SetupStep checkDataStream = + new CheckDataStream(dataStream.getName()); + + final SetupStep createDataStream = new CreateDataStream( + dataStream.getName(), + itemSourceFactory.createEmptySource()); + + return new SkippingSetupStepChain<>(Arrays.asList(checkDataStream, createDataStream), stepProcessor); + + } + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPI.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPI.java new file mode 100644 index 00000000..6a0ad903 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPI.java @@ -0,0 +1,102 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MappingJsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.introspect.VisibilityChecker; +import org.appenders.log4j2.elasticsearch.Deserializer; +import org.appenders.log4j2.elasticsearch.ExtendedObjectMapper; +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.appenders.log4j2.elasticsearch.JacksonDeserializer; +import org.appenders.log4j2.elasticsearch.JacksonSerializer; +import org.appenders.log4j2.elasticsearch.Serializer; + +public class ElasticsearchDataStreamAPI implements ClientAPIFactory { + + private final Serializer itemSerializer; + private final Deserializer resultDeserializer; + + public ElasticsearchDataStreamAPI() { + this.itemSerializer = createItemSerializer(); + this.resultDeserializer = createResultDeserializer(); + } + + /** + * @param itemSerializer index request metadata serializer + * @param resultDeserializer batch response deserializer + */ + public ElasticsearchDataStreamAPI( + final Serializer itemSerializer, + final Deserializer resultDeserializer + ) { + this.itemSerializer = itemSerializer; + this.resultDeserializer = resultDeserializer; + } + + @Override + public IndexRequest.Builder itemBuilder(final String target, final ItemSource payload) { + return new DataStreamItem.Builder(payload) + .index(target); + } + + @Override + public BatchRequest.Builder batchBuilder() { + return new DataStreamBatchRequest.Builder() + .withItemSerializer(itemSerializer) + .withResultDeserializer(resultDeserializer); + } + + protected Serializer createItemSerializer() { + + final ObjectWriter objectWriter = new ExtendedObjectMapper(new MappingJsonFactory()) + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + .addMixIn(IndexRequest.class, DataStreamItemMixIn.class) + .writerFor(IndexRequest.class); + + return new JacksonSerializer<>(objectWriter); + + } + + @SuppressWarnings("DuplicatedCode") + protected Deserializer createResultDeserializer() { + + final ObjectReader objectReader = new ObjectMapper() + .setVisibility(VisibilityChecker.Std.defaultInstance().with(JsonAutoDetect.Visibility.ANY)) + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + .configure(SerializationFeature.CLOSE_CLOSEABLE, false) + .configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .addMixIn(BatchResult.class, BatchResultMixIn.class) + .addMixIn(Error.class, ErrorMixIn.class) + .addMixIn(BatchItemResult.class, DataStreamItemResultMixIn.class) + .readerFor(BatchResult.class); + + return new JacksonDeserializer<>(objectReader); + + } + +} diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPlugin.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPlugin.java new file mode 100644 index 00000000..94b26f40 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPlugin.java @@ -0,0 +1,116 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MappingJsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.introspect.VisibilityChecker; +import org.apache.logging.log4j.core.config.ConfigurationException; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.appenders.log4j2.elasticsearch.Deserializer; +import org.appenders.log4j2.elasticsearch.ExtendedObjectMapper; +import org.appenders.log4j2.elasticsearch.JacksonDeserializer; +import org.appenders.log4j2.elasticsearch.JacksonSerializer; +import org.appenders.log4j2.elasticsearch.Serializer; + +@Plugin(name = "ElasticsearchDataStream", category = Node.CATEGORY, elementType = "clientAPIFactory", printObject = true) +public class ElasticsearchDataStreamAPIPlugin extends ElasticsearchDataStreamAPI { + + private ElasticsearchDataStreamAPIPlugin( + final Serializer serializer, + final Deserializer deserializer + ) { + super(serializer, deserializer); + } + + @PluginBuilderFactory + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder implements org.apache.logging.log4j.core.util.Builder { + + private Serializer itemSerializer = createItemSerializer(); + private Deserializer resultDeserializer = createResultDeserializer(); + + @Override + public ElasticsearchDataStreamAPIPlugin build() + { + if (itemSerializer == null) { + throw new ConfigurationException("itemSerializer cannot be null"); + } + + if (resultDeserializer == null) { + throw new ConfigurationException("resultDeserializer cannot be null"); + } + + return new ElasticsearchDataStreamAPIPlugin(itemSerializer, resultDeserializer); + } + + /** + * @return index request metadata serializer + */ + protected Serializer createItemSerializer() { + final ObjectWriter objectWriter = new ExtendedObjectMapper(new MappingJsonFactory()) + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + .addMixIn(IndexRequest.class, DataStreamItemMixIn.class) + .writerFor(IndexRequest.class); + return new JacksonSerializer<>(objectWriter); + } + + /** + * @return batch response deserializer + */ + @SuppressWarnings("DuplicatedCode") + protected Deserializer createResultDeserializer() { + return new JacksonDeserializer<>(new ObjectMapper() + .setVisibility(VisibilityChecker.Std.defaultInstance().with(JsonAutoDetect.Visibility.ANY)) + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + .configure(SerializationFeature.CLOSE_CLOSEABLE, false) + .configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .addMixIn(BatchResult.class, BatchResultMixIn.class) + .addMixIn(Error.class, ErrorMixIn.class) + .addMixIn(BatchItemResult.class, DataStreamItemResultMixIn.class) + .readerFor(BatchResult.class)); + } + + public Builder withItemSerializer(final Serializer itemSerializer) { + this.itemSerializer = itemSerializer; + return this; + } + + public Builder withResultDeserializer(final Deserializer resultDeserializer) { + this.resultDeserializer = resultDeserializer; + return this; + } + + } + +} + diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactory.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactory.java index 26e5bf8f..57b9ac94 100644 --- a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactory.java +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactory.java @@ -22,6 +22,7 @@ import org.appenders.log4j2.elasticsearch.ByteBufItemSourceFactoryPlugin; import org.appenders.log4j2.elasticsearch.ComponentTemplate; +import org.appenders.log4j2.elasticsearch.DataStream; import org.appenders.log4j2.elasticsearch.EmptyItemSourceFactory; import org.appenders.log4j2.elasticsearch.ILMPolicy; import org.appenders.log4j2.elasticsearch.IndexTemplate; @@ -43,19 +44,20 @@ public class ElasticsearchOperationFactory extends OperationFactoryDispatcher im private final EmptyItemSourceFactory itemSourceFactory; public ElasticsearchOperationFactory( - StepProcessor> stepProcessor, - ValueResolver valueResolver, - EmptyItemSourceFactory itemSourceFactory) { + final StepProcessor> stepProcessor, + final ValueResolver valueResolver, + final EmptyItemSourceFactory itemSourceFactory) { super(); this.itemSourceFactory = itemSourceFactory; register(ComponentTemplate.TYPE_NAME, new ComponentTemplateSetupOp(stepProcessor, valueResolver, this.itemSourceFactory)); register(IndexTemplate.TYPE_NAME, new IndexTemplateSetupOp(stepProcessor, valueResolver, this.itemSourceFactory)); register(ILMPolicy.TYPE_NAME, new ILMPolicySetupOp(stepProcessor, valueResolver, this.itemSourceFactory)); + register(DataStream.TYPE_NAME, new DataStreamSetupOp(stepProcessor, this.itemSourceFactory)); } public ElasticsearchOperationFactory( - StepProcessor> stepProcessor, - ValueResolver valueResolver) { + final StepProcessor> stepProcessor, + final ValueResolver valueResolver) { this(stepProcessor, valueResolver, createSetupOpsItemSourceFactory()); } @@ -68,7 +70,6 @@ private static PooledItemSourceFactory createSetupOpsItemSourceFactory() { .build(); } - @Override public void start() { @@ -104,4 +105,5 @@ public boolean isStarted() { public boolean isStopped() { return state == State.STOPPED; } + } diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/HCHttpPlugin.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/HCHttpPlugin.java index 155ca9da..a38606ca 100644 --- a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/HCHttpPlugin.java +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/HCHttpPlugin.java @@ -145,7 +145,9 @@ ValueResolver getValueResolver() { // fallback to no-op return ValueResolver.NO_OP; + } + protected ElasticsearchOperationFactory createOperationFactory(HttpClientProvider clientProvider) { final ObjectReader objectReader = new ObjectMapper() @@ -163,6 +165,7 @@ protected ElasticsearchOperationFactory createOperationFactory(HttpClientProvide return new ElasticsearchOperationFactory( new SyncStepProcessor(clientProvider, objectReader), valueResolver); + } protected HttpClientFactory.Builder createHttpClientFactoryBuilder() { diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ILMPolicySetupOp.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ILMPolicySetupOp.java index 190d34af..bfb7e12f 100644 --- a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ILMPolicySetupOp.java +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/ILMPolicySetupOp.java @@ -30,7 +30,9 @@ import org.appenders.log4j2.elasticsearch.StepProcessor; import org.appenders.log4j2.elasticsearch.ValueResolver; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import static org.appenders.log4j2.elasticsearch.hc.CreateBootstrapIndex.BOOTSTRAP_TEMPLATE; @@ -52,22 +54,32 @@ public ILMPolicySetupOp(StepProcessor> stepProcesso @Override public Operation create(T opSource) { - ILMPolicy ilmPolicy = (ILMPolicy) opSource; + final ILMPolicy ilmPolicy = (ILMPolicy) opSource; + final List> setupSteps = new ArrayList<>(); - SetupStep checkBootstrapIndex = - new CheckBootstrapIndex(ilmPolicy.getRolloverAlias()); + if (ilmPolicy.isCreateBootstrapIndex()) { - String bootstrapIndexRequestBody = String.format(BOOTSTRAP_TEMPLATE, ilmPolicy.getRolloverAlias()); - SetupStep createBootstrapIndex = new CreateBootstrapIndex( - ilmPolicy.getRolloverAlias(), - writer.write(itemSourceFactory.createEmptySource(), bootstrapIndexRequestBody.getBytes())); + final SetupStep checkBootstrapIndex = + new CheckBootstrapIndex(ilmPolicy.getRolloverAlias()); - String ilmPolicyRequestBody = valueResolver.resolve(ilmPolicy.getSource()); - SetupStep updateIlmPolicy = new PutILMPolicy( + final String bootstrapIndexRequestBody = String.format(BOOTSTRAP_TEMPLATE, ilmPolicy.getRolloverAlias()); + final SetupStep createBootstrapIndex = new CreateBootstrapIndex( + ilmPolicy.getRolloverAlias(), + writer.write(itemSourceFactory.createEmptySource(), bootstrapIndexRequestBody.getBytes())); + + setupSteps.add(checkBootstrapIndex); + setupSteps.add(createBootstrapIndex); + + } + + final String ilmPolicyRequestBody = valueResolver.resolve(ilmPolicy.getSource()); + final SetupStep updateIlmPolicy = new PutILMPolicy( ilmPolicy.getName(), writer.write(itemSourceFactory.createEmptySource(), ilmPolicyRequestBody.getBytes())); - return new SkippingSetupStepChain<>(Arrays.asList(checkBootstrapIndex, createBootstrapIndex, updateIlmPolicy), stepProcessor); + setupSteps.add(updateIlmPolicy); + + return new SkippingSetupStepChain<>(setupSteps, stepProcessor); } diff --git a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/IndexRequest.java b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/IndexRequest.java index b191f535..41dc6bfd 100644 --- a/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/IndexRequest.java +++ b/log4j2-elasticsearch-hc/src/main/java/org/appenders/log4j2/elasticsearch/hc/IndexRequest.java @@ -85,7 +85,7 @@ public static class Builder { private String index; private String type; - public Builder(ItemSource source) { + public Builder(final ItemSource source) { this.source = source; } diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStreamTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStreamTest.java new file mode 100644 index 00000000..4cd216a9 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CheckDataStreamTest.java @@ -0,0 +1,177 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.core.logging.Logger; +import org.appenders.log4j2.elasticsearch.Result; +import org.appenders.log4j2.elasticsearch.SetupContext; +import org.appenders.log4j2.elasticsearch.SetupStepTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.appenders.core.logging.InternalLogging.setLogger; +import static org.appenders.core.logging.InternalLoggingTest.mockTestLogger; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CheckDataStreamTest { + + public static final String TEST_NAME = "testDataStream"; + + @AfterEach + public void tearDown() { + setLogger(null); + } + + @Test + public void doesNotExecuteOnFailure() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + final SetupContext setupContext = SetupStepTest.createTestSetupContext(Result.FAILURE); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertFalse(result); + + } + + @Test + public void executesOnSuccess() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + final SetupContext setupContext = SetupStepTest.createTestSetupContext(Result.SUCCESS); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertTrue(result); + + } + + @Test + public void executesOnSkip() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + final SetupContext setupContext = SetupStepTest.createTestSetupContext(Result.SKIP); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertTrue(result); + + } + + @Test + public void onResponseLogsOnSuccess() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + + final Response response = mock(Response.class); + when(response.getResponseCode()).thenReturn(404); + + final Logger logger = mockTestLogger(); + + // when + final Result result = setupStep.onResponse(response); + + // then + assertEquals(Result.SUCCESS, result); + verify(logger).info( + "{}: Data stream {} does not exist", + CheckDataStream.class.getSimpleName(), + TEST_NAME); + + } + + @Test + public void onResponseCode200LogsAndSkips() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + + final Response response = mock(Response.class); + when(response.getResponseCode()).thenReturn(200); + + final Logger logger = mockTestLogger(); + + // when + final Result result = setupStep.onResponse(response); + + // then + assertEquals(Result.SKIP, result); + verify(logger).info( + "{}: Data stream {} already exists", + CheckDataStream.class.getSimpleName(), + TEST_NAME); + + } + + @Test + public void onResponseCodeZeroLogsAndFails() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + + final Response response = mock(Response.class); + when(response.getResponseCode()).thenReturn(0); + + final Logger logger = mockTestLogger(); + + // when + final Result result = setupStep.onResponse(response); + + // then + assertEquals(Result.FAILURE, result); + verify(logger).error( + "{}: Unable to determine if {} data stream already exists", + CheckDataStream.class.getSimpleName(), + TEST_NAME); + + } + + @Test + public void createsGenericRequest() { + + // given + final CheckDataStream setupStep = new CheckDataStream(TEST_NAME); + + // when + final Request request = setupStep.createRequest(); + + // then + assertEquals("GET", request.getHttpMethodName()); + assertEquals("_data_stream/" + TEST_NAME, request.getURI()); + + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStreamTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStreamTest.java new file mode 100644 index 00000000..cfc09421 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/CreateDataStreamTest.java @@ -0,0 +1,186 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import io.netty.buffer.ByteBuf; +import org.appenders.core.logging.Logger; +import org.appenders.log4j2.elasticsearch.Result; +import org.appenders.log4j2.elasticsearch.SetupContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; + +import static org.appenders.core.logging.InternalLogging.setLogger; +import static org.appenders.core.logging.InternalLoggingTest.mockTestLogger; +import static org.appenders.log4j2.elasticsearch.ByteBufItemSourceTest.createTestItemSource; +import static org.appenders.log4j2.elasticsearch.SetupStepTest.createTestSetupContext; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CreateDataStreamTest { + + public static final String TEST_NAME = "testDataStreamName"; + + @AfterEach + public void tearDown() { + setLogger(null); + } + + @Test + public void doesNotExecuteOnFailure() { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + final SetupContext setupContext = createTestSetupContext(Result.FAILURE); + + final Logger logger = mockTestLogger(); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertFalse(result); + verify(logger).info("{}: Skipping data stream creation", + CreateDataStream.class.getSimpleName()); + + } + + @Test + public void executesOnSuccess() { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + final SetupContext setupContext = createTestSetupContext(Result.SUCCESS); + + final Logger logger = mockTestLogger(); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertTrue(result); + verify(logger, never()).info("{}: Skipping data stream creation", + CreateDataStream.class.getSimpleName()); + + } + + @Test + public void doesNotExecuteOnSkip() { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + final SetupContext setupContext = createTestSetupContext(Result.SKIP); + + final Logger logger = mockTestLogger(); + + // when + final boolean result = setupStep.shouldProcess(setupContext); + + // then + assertFalse(result); + verify(logger).info("{}: Skipping data stream creation", + CreateDataStream.class.getSimpleName()); + + } + + @Test + public void onResponseLogsOnSuccess() { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + + final Response jestResult = mock(Response.class); + when(jestResult.getResponseCode()).thenReturn(200); + + final Logger logger = mockTestLogger(); + + // when + final Result result = setupStep.onResponse(jestResult); + + // then + assertEquals(Result.SUCCESS, result); + verify(logger).info( + "{}: Data stream {} created", + CreateDataStream.class.getSimpleName(), + TEST_NAME); + + } + + @Test + public void onResponseLogsOnNonSuccess() { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + + final Response jestResult = mock(Response.class); + when(jestResult.getResponseCode()).thenReturn(400); + String error = "test data stream creation error"; + when(jestResult.getErrorMessage()).thenReturn(error); + + final Logger logger = mockTestLogger(); + + // when + final Result result = setupStep.onResponse(jestResult); + + // then + assertEquals(Result.FAILURE, result); + verify(logger).error( + "{}: Unable to create data stream: {}", + CreateDataStream.class.getSimpleName(), + error); + + } + + @Test + public void createsGenericRequest() throws Exception { + + // given + final CreateDataStream setupStep = new CreateDataStream( + TEST_NAME, + createTestItemSource()); + + // when + final Request request = setupStep.createRequest(); + + // then + assertEquals("PUT", request.getHttpMethodName()); + assertEquals("_data_stream/" + TEST_NAME, request.getURI()); + assertEquals("", ((ByteBuf)request.serialize().getSource()).toString(StandardCharsets.UTF_8)); + + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequestTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequestTest.java new file mode 100644 index 00000000..86695afd --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamBatchRequestTest.java @@ -0,0 +1,314 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import org.appenders.log4j2.elasticsearch.ByteBufItemSource; +import org.appenders.log4j2.elasticsearch.Deserializer; +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.appenders.log4j2.elasticsearch.JacksonSerializer; +import org.appenders.log4j2.elasticsearch.Serializer; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.appenders.log4j2.elasticsearch.ByteBufItemSourceTest.createTestItemSource; +import static org.appenders.log4j2.elasticsearch.hc.DataStreamItemTest.createDataStreamItemRequestBuilder; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public abstract class DataStreamBatchRequestTest { + + @Test + public void builderBuildsSuccessfully() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + // when + BatchRequest batchRequest = builder.build(); + + // then + assertNotNull(batchRequest); + + } + + @Test + public void uriIsBasedOnBatchItems() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + // when + BatchRequest batchRequest = builder.build(); + + // then + assertNotNull(batchRequest); + + } + + @Test + public void builderFailsWhenBufferIsNull() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + builder.withBuffer(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("buffer cannot be null")); + + } + + @Test + public void builderFailsWhenSerializerIsNull() { + + // given + final BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + builder.withItemSerializer(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("itemSerializer cannot be null")); + + } + + @Test + public void builderFailsWhenResultDeserializerIsNull() { + + // given + final BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + builder.withResultDeserializer(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("resultDeserializer cannot be null")); + + } + + @Test + public void builderFailsWhenObjectWriterIsNull() { + + // given + final BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> builder.withObjectWriter(null)); + + // then + assertThat(exception.getMessage(), containsString("objectWriter cannot be null")); + + } + + @Test + public void builderCanStoreAction() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + builder.withItemSerializer(null); + + // when + builder.add(mock(DataStreamItem.class)); + + // then + assertEquals(1, builder.items.size()); + + } + + @Test + public void builderCanStoreMultipleActionsAtOnce() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + Collection actions = new ArrayList<>(); + int expectedSize = new Random().nextInt(100) + 1; + for (int i = 0; i < expectedSize; i++) { + actions.add(mock(DataStreamItem.class)); + } + + // when + builder.add(actions); + + // then + assertEquals(expectedSize, builder.items.size()); + + } + + @Test + public void throwsOnItemsWithDifferentIndicesInSameBatch() throws Exception { + + // given + final Serializer serializer = spy(new JacksonSerializer<>(new ObjectMapper().writerFor(IndexRequest.class))); + + final ItemSource source1 = createTestItemSource(); + final String index1 = UUID.randomUUID().toString(); + final IndexRequest action1 = createDataStreamItemRequestBuilder(source1) + .index(index1) + .build(); + + final ItemSource source2 = createTestItemSource(); + final String index2 = UUID.randomUUID().toString(); + + final IndexRequest action2 = createDataStreamItemRequestBuilder(source2) + .index(index2) + .build(); + + @SuppressWarnings("unchecked") + final BatchRequest request = new DataStreamBatchRequest.Builder() + .withItemSerializer(serializer) + .withResultDeserializer(mock(Deserializer.class)) + .withBuffer(createTestItemSource()) + .add(action1) + .add(action2) + .build(); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, request::serialize); + + // then + assertThat(exception.getMessage(), containsString("Items for different indices found")); + assertThat(exception.getMessage(), containsString(index1)); + assertThat(exception.getMessage(), containsString(index2)); + + } + + @Test + public void canSerializeOnceIfAllItemsAreTheSame() throws Exception { + + // given + final Serializer serializer = spy(new JacksonSerializer<>(new ObjectMapper().writerFor(IndexRequest.class))); + + final ItemSource source1 = createTestItemSource(); + final String index = UUID.randomUUID().toString(); + final String mappingType = UUID.randomUUID().toString(); + final IndexRequest action1 = createDataStreamItemRequestBuilder(source1) + .index(index) + .type(mappingType) + .build(); + + final ItemSource source2 = createTestItemSource(); + final IndexRequest action2 = createDataStreamItemRequestBuilder(source2) + .index(index) + .type(mappingType) + .build(); + + @SuppressWarnings("unchecked") + final BatchRequest batchRequest = new DataStreamBatchRequest.Builder() + .withItemSerializer(serializer) + .withResultDeserializer(mock(Deserializer.class)) + .withBuffer(createTestItemSource()) + .add(action1) + .add(action2) + .build(); + + // when + batchRequest.serialize(); + + // then + final ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + verify(serializer, times(1)).writeAsBytes(captor.capture()); + + final List allValues = captor.getAllValues(); + assertEquals(1, allValues.size()); + assertEquals(index, allValues.get(0).getIndex()); + + assertEquals(index + "/_bulk", batchRequest.getURI()); + assertEquals("POST", batchRequest.getHttpMethodName()); + + } + + @Test + public void callingCompletedReleasesItemSource() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + ByteBufItemSource buffer = mock(ByteBufItemSource.class); + builder.withBuffer(buffer); + + BatchRequest request = builder.build(); + + // when + request.completed(); + + // then + verify(buffer).release(); + + } + + @Test + public void callingCompletedReleasesActions() { + + // given + BatchRequest.Builder builder = createDefaultTestObjectBuilder(); + + ByteBufItemSource buffer = mock(ByteBufItemSource.class); + builder.withBuffer(buffer); + + Collection actions = new ArrayList<>(); + IndexRequest indexRequest = spy(createDataStreamItemRequestBuilder(mock(ByteBufItemSource.class)) + .build()); + actions.add(indexRequest); + builder.add(actions); + + BatchRequest request = builder.build(); + + // when + request.completed(); + + // then + verify(indexRequest).completed(); + + } + + public static BatchRequest.Builder createDefaultTestObjectBuilder() { + //noinspection unchecked + return new DataStreamBatchRequest.Builder() + .withItemSerializer(mock(Serializer.class)) + .withResultDeserializer(mock(Deserializer.class)) + .withBuffer(mock(ByteBufItemSource.class)); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemTest.java new file mode 100644 index 00000000..2dac620d --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamItemTest.java @@ -0,0 +1,187 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.log4j2.elasticsearch.ByteBufItemSource; +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class DataStreamItemTest { + + @Test + public void builderFailsWhenSourceIsNull() { + + // given + IndexRequest.Builder builder = createDataStreamItemRequestBuilder(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("source cannot be null")); + + } + + @Test + public void builderFailsWhenIndexIsNull() { + + // given + IndexRequest.Builder builder = createDataStreamItemRequestBuilder() + .index(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("index cannot be null")); + + } + + @Test + public void builderBuildsWhenMappingTypeIsNull() { + + // given + IndexRequest.Builder builder = createDataStreamItemRequestBuilder() + .type(null); + + // when + final IndexRequest request = builder.build(); + + // then + assertNotNull(request); + + } + + @Test + public void builderSetsAllFields() { + + // given + String expectedIndex = UUID.randomUUID().toString(); + ByteBufItemSource expectedItemSource = mock(ByteBufItemSource.class); + + // when + IndexRequest.Builder builder = createDataStreamItemRequestBuilder( + expectedItemSource, expectedIndex); + + IndexRequest request = builder.build(); + + // then + assertEquals(expectedIndex, request.getIndex()); + assertEquals(expectedItemSource, request.getSource()); + + } + + @Test + public void requestTypesAreSameIfBothAreNull() { + + // given + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertTrue(result); + + } + + @Test + public void requestTypesAreSameIfBothAreEqual() { + + // given + final String type = UUID.randomUUID().toString(); + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertTrue(result); + + } + + @Test + public void requestTypesAreNotSameIfOneIsNullAndOtherIsNot() { + + // given + final String type = UUID.randomUUID().toString(); + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertFalse(result); + + } + + public static IndexRequest.Builder createDataStreamItemRequestBuilder() { + return createDataStreamItemRequestBuilder(mock(ByteBufItemSource.class)); + + } + + public static IndexRequest.Builder createDataStreamItemRequestBuilder( + ItemSource expectedItemSource + ) { + return createDataStreamItemRequestBuilder( + expectedItemSource, + UUID.randomUUID().toString() + ); + } + + private static IndexRequest.Builder createDataStreamItemRequestBuilder( + ItemSource expectedItemSource, + String expectedIndex + ) { + return new DataStreamItem.Builder(expectedItemSource) + .index(expectedIndex); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamRequestTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamRequestTest.java new file mode 100644 index 00000000..41bddf2e --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/DataStreamRequestTest.java @@ -0,0 +1,171 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.appenders.log4j2.elasticsearch.ByteBufItemSource; +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class DataStreamRequestTest { + + @Test + public void builderFailsWhenSourceIsNull() { + + // given + IndexRequest.Builder builder = createDataStreamItemRequestBuilder(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("source cannot be null")); + + } + + @Test + public void builderFailsWhenIndexIsNull() { + + // given + IndexRequest.Builder builder = createDataStreamItemRequestBuilder() + .index(null); + + // when + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("index cannot be null")); + + } + + @Test + public void builderSetsAllFields() { + + // given + String expectedIndex = UUID.randomUUID().toString(); + ByteBufItemSource expectedItemSource = mock(ByteBufItemSource.class); + + // when + IndexRequest.Builder builder = createDataStreamItemRequestBuilder( + expectedItemSource, expectedIndex); + + IndexRequest request = builder.build(); + + // then + assertEquals(expectedIndex, request.getIndex()); + assertEquals(expectedItemSource, request.getSource()); + + } + + @Test + public void requestTypesAreSameIfBothAreNull() { + + // given + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertTrue(result); + + } + + @Test + public void requestTypesAreSameIfBothAreEqual() { + + // given + final String type = UUID.randomUUID().toString(); + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertTrue(result); + + } + + @Test + public void requestTypesAreNotSameIfOneIsNullAndOtherIsNot() { + + // given + final String type = UUID.randomUUID().toString(); + final IndexRequest request1 = createDataStreamItemRequestBuilder() + .type(null) + .build(); + + final IndexRequest request2 = createDataStreamItemRequestBuilder() + .type(type) + .build(); + + // when + final boolean result = request1.sameType(request2); + + // then + assertFalse(result); + + } + + public static IndexRequest.Builder createDataStreamItemRequestBuilder() { + return createDataStreamItemRequestBuilder(mock(ByteBufItemSource.class)); + + } + + public static IndexRequest.Builder createDataStreamItemRequestBuilder( + ItemSource expectedItemSource + ) { + return createDataStreamItemRequestBuilder( + expectedItemSource, + UUID.randomUUID().toString() + ); + } + + private static IndexRequest.Builder createDataStreamItemRequestBuilder( + ItemSource expectedItemSource, + String expectedIndex + ) { + return new DataStreamItem.Builder(expectedItemSource) + .index(expectedIndex); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPluginTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPluginTest.java new file mode 100644 index 00000000..fd3df7c1 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPIPluginTest.java @@ -0,0 +1,103 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.logging.log4j.core.config.ConfigurationException; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ElasticsearchDataStreamAPIPluginTest extends ElasticsearchBulkAPITest { + + public static final String TEST_PAYLOAD_STRING = "{}"; + public static final String TEST_NAME = UUID.randomUUID().toString(); + + @Test + public void defaultBuilderBuildsSuccessfully() { + + // given + final ElasticsearchDataStreamAPIPlugin.Builder builder = ElasticsearchDataStreamAPIPlugin.newBuilder(); + + // when + final ElasticsearchDataStreamAPIPlugin plugin = builder.build(); + + // then + assertNotNull(plugin); + + } + + @Test + public void defaultBuilderDoesNotSetMappingType() { + + // given + final ElasticsearchDataStreamAPIPlugin.Builder builder = ElasticsearchDataStreamAPIPlugin.newBuilder(); + + final ElasticsearchDataStreamAPIPlugin plugin = builder.build(); + + // when + final IndexRequest indexRequest = plugin.itemBuilder(TEST_NAME, createTestItemSource(TEST_PAYLOAD_STRING)).build(); + + // then + assertNull(indexRequest.id); + assertNull(indexRequest.type); + assertNotNull(indexRequest.index); + + } + + + @Test + public void builderThrowsWhenItemSerializerIsNull() { + + // given + final ElasticsearchDataStreamAPIPlugin.Builder builder = ElasticsearchDataStreamAPIPlugin.newBuilder(); + builder.withItemSerializer(null); + + // when + final ConfigurationException exception = assertThrows(ConfigurationException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("itemSerializer cannot be null")); + + } + + @Test + public void builderThrowsWhenResultDeserializerIsNull() { + + // given + final ElasticsearchDataStreamAPIPlugin.Builder builder = ElasticsearchDataStreamAPIPlugin.newBuilder(); + builder.withResultDeserializer(null); + + // when + final ConfigurationException exception = assertThrows(ConfigurationException.class, builder::build); + + // then + assertThat(exception.getMessage(), containsString("resultDeserializer cannot be null")); + + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPITest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPITest.java new file mode 100644 index 00000000..9283ed6a --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchDataStreamAPITest.java @@ -0,0 +1,127 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import org.appenders.log4j2.elasticsearch.ByteBufItemSourceTest; +import org.appenders.log4j2.elasticsearch.Deserializer; +import org.appenders.log4j2.elasticsearch.IndexNamePluginTest; +import org.appenders.log4j2.elasticsearch.ItemSource; +import org.appenders.log4j2.elasticsearch.Serializer; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public class ElasticsearchDataStreamAPITest { + + @Test + public void createsDataStreamItemRequestBuilder() { + + // given + final String payloadString = UUID.randomUUID().toString(); + final ItemSource payload = createTestItemSource(payloadString); + + final ElasticsearchDataStreamAPI builder = new ElasticsearchDataStreamAPI(); + + final String target = IndexNamePluginTest.TEST_INDEX_NAME; + + // when + final IndexRequest.Builder requestBuilder = builder.itemBuilder(target, payload); + final IndexRequest request = requestBuilder.build(); + + // then + assertNotNull(request); + assertNull(request.id); + assertNotNull(request.index); + assertNull(request.type); + assertEquals(payload, request.source); + + } + + @Test + public void createsBatchRequestBuilder() throws Exception { + + // given + final Serializer serializer = spy(ElasticsearchDataStreamAPIPlugin.newBuilder().createItemSerializer()); + final Deserializer deserializer = mock(Deserializer.class); + + final String mappingType = UUID.randomUUID().toString(); + final ElasticsearchDataStreamAPI builder = new ElasticsearchDataStreamAPI(serializer, deserializer); + + final ItemSource batchBuffer = createDefaultTestBatchBuffer(); + + final String target = IndexNamePluginTest.TEST_INDEX_NAME; + + final String payloadString = UUID.randomUUID().toString(); + final ItemSource payload = createTestItemSource(payloadString); + final IndexRequest.Builder indexRequestBuilder = builder.itemBuilder(target, payload); + final IndexRequest indexRequest = indexRequestBuilder.build(); + + // when + final BatchRequest.Builder requestBuilder = builder.batchBuilder(); + requestBuilder.withBuffer(batchBuffer); + requestBuilder.add(indexRequest); + + final BatchRequest request = requestBuilder.build(); + final ItemSource serialized = request.serialize(); + + // then + verify(serializer).writeAsBytes(eq(indexRequest)); + + final ByteBuf source = (ByteBuf) serialized.getSource(); + final String batchString = source.toString(StandardCharsets.UTF_8); + + assertThat(batchString, containsString("{\"create\":{}}")); + assertThat(batchString, not(containsString(target))); + assertThat(batchString, not(containsString(mappingType))); + assertThat(batchString, containsString(payloadString)); + + } + + ItemSource createTestItemSource(final String payloadString) { + + final CompositeByteBuf buffer = ByteBufItemSourceTest.createDefaultTestByteBuf(); + buffer.writeBytes(payloadString.getBytes(StandardCharsets.UTF_8)); + + return ByteBufItemSourceTest.createTestItemSource(buffer, itemSource -> {}); + + } + + private ItemSource createDefaultTestBatchBuffer() { + final CompositeByteBuf batchByteBuf = ByteBufItemSourceTest.createDefaultTestByteBuf(); + return ByteBufItemSourceTest.createTestItemSource(batchByteBuf, itemSource -> {}); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactoryTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactoryTest.java index fea52449..c19af3ce 100644 --- a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactoryTest.java +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/ElasticsearchOperationFactoryTest.java @@ -24,6 +24,8 @@ import org.appenders.log4j2.elasticsearch.ByteBufItemSourceTest; import org.appenders.log4j2.elasticsearch.ComponentTemplate; import org.appenders.log4j2.elasticsearch.ComponentTemplateTest; +import org.appenders.log4j2.elasticsearch.DataStream; +import org.appenders.log4j2.elasticsearch.DataStreamPlugin; import org.appenders.log4j2.elasticsearch.EmptyItemSourceFactory; import org.appenders.log4j2.elasticsearch.ILMPolicy; import org.appenders.log4j2.elasticsearch.ILMPolicyPluginTest; @@ -46,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -152,6 +155,8 @@ public void supportsILMPolicy() throws Exception { result.execute(); // then + assertEquals(3, stepProcessor.requests.size()); + SetupStep checkBootstrapIndex = stepProcessor.requests.get(0); assertTrue(checkBootstrapIndex instanceof CheckBootstrapIndex); assertEquals(expectedRolloverAlias, checkBootstrapIndex.createRequest().getURI()); @@ -177,6 +182,81 @@ public void supportsILMPolicy() throws Exception { } + @Test + public void supportsNonBootstrappingILMPolicy() throws Exception { + + // given + final String expectedName = UUID.randomUUID().toString(); + final String unresolvedSource = String.format("%s${unresolved}%s", "test", "ilmPolicy"); + final ILMPolicy ilmPolicy = ILMPolicyPluginTest.createTestILMPolicyPluginBuilder() + .withName(expectedName) + .withCreateBootstrapIndex(false) + .withSource(unresolvedSource) + .withPath(null) + .build(); + + final CapturingStepProcessor stepProcessor = new CapturingStepProcessor(); + + final String expectedResolvedValue = UUID.randomUUID().toString(); + final ValueResolver valueResolver = mock(ValueResolver.class); + when(valueResolver.resolve(unresolvedSource)).thenReturn(String.format("%s%s%s", "test", expectedResolvedValue, "ilmPolicy")); + + final ElasticsearchOperationFactory ops = createDefaultTestOperationsFactoryDispatcher(stepProcessor, valueResolver, ByteBufItemSourceTest::createTestItemSource); + + // when + final Operation result = ops.create(ilmPolicy); + result.execute(); + + // then + assertEquals(1, stepProcessor.requests.size()); + final SetupStep putIlmPolicy = stepProcessor.requests.get(0); + assertTrue(putIlmPolicy instanceof PutILMPolicy); + + assertEquals(expectedName, ((PutILMPolicy) putIlmPolicy).name); + final ByteBufItemSource source = (ByteBufItemSource) ((PutILMPolicy) putIlmPolicy).source; + assertEquals(String.format("%s%s%s", "test", expectedResolvedValue, "ilmPolicy"), source.getSource().toString(StandardCharsets.UTF_8)); + assertEquals("_ilm/policy/" + expectedName, putIlmPolicy.createRequest().getURI()); + + } + + @Test + public void supportsDataStream() throws Exception { + + // given + final String expectedName = UUID.randomUUID().toString(); + final DataStream dataStream = DataStreamPlugin.newBuilder() + .withName(expectedName) + .build(); + + + final CapturingStepProcessor stepProcessor = new CapturingStepProcessor(); + + final ValueResolver valueResolver = mock(ValueResolver.class); + + final EmptyItemSourceFactory testItemSource = ByteBufItemSourceTest::createTestItemSource; + final ElasticsearchOperationFactory ops = createDefaultTestOperationsFactoryDispatcher(stepProcessor, valueResolver, testItemSource); + + + // when + Operation result = ops.create(dataStream); + result.execute(); + + // then + assertEquals(2, stepProcessor.requests.size()); + final SetupStep checkDataStream = stepProcessor.requests.get(0); + assertTrue(checkDataStream instanceof CheckDataStream); + assertNull(checkDataStream.createRequest().serialize()); + + assertEquals("_data_stream/" + expectedName, checkDataStream.createRequest().getURI()); + final SetupStep createDataStream = stepProcessor.requests.get(1); + assertTrue(createDataStream instanceof CreateDataStream); + + final ByteBufItemSource source = (ByteBufItemSource) ((CreateDataStream) createDataStream).itemSource; + assertEquals("", source.getSource().toString(StandardCharsets.UTF_8)); + assertEquals(expectedName, ((CreateDataStream) createDataStream).name); + assertEquals("_data_stream/" + expectedName, createDataStream.createRequest().getURI()); + + } @Test public void lifecycleStartStartItemSourceFactoryOnlyOnce() { diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/FallbackDataStreamBatchRequestTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/FallbackDataStreamBatchRequestTest.java new file mode 100644 index 00000000..281bc794 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/FallbackDataStreamBatchRequestTest.java @@ -0,0 +1,29 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public class FallbackDataStreamBatchRequestTest extends DataStreamBatchRequestTest { + + static { + System.setProperty("appenders." + BatchRequest.class.getSimpleName() + ".jctools.enabled", "false"); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/JCToolsDataStreamBatchRequestTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/JCToolsDataStreamBatchRequestTest.java new file mode 100644 index 00000000..c8b628fb --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/JCToolsDataStreamBatchRequestTest.java @@ -0,0 +1,29 @@ +package org.appenders.log4j2.elasticsearch.hc; + +/*- + * #%L + * log4j2-elasticsearch + * %% + * Copyright (C) 2022 Rafal Foltynski + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public class JCToolsDataStreamBatchRequestTest extends DataStreamBatchRequestTest { + + static { + System.setProperty("appenders." + BatchRequest.class.getSimpleName() + ".jctools.enabled", "true"); + } + +} diff --git a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/smoke/SmokeTest.java b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/smoke/SmokeTest.java index 39af6d5e..17cbd2d3 100644 --- a/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/smoke/SmokeTest.java +++ b/log4j2-elasticsearch-hc/src/test/java/org/appenders/log4j2/elasticsearch/hc/smoke/SmokeTest.java @@ -36,11 +36,13 @@ import org.appenders.log4j2.elasticsearch.AsyncBatchDelivery; import org.appenders.log4j2.elasticsearch.Auth; import org.appenders.log4j2.elasticsearch.BatchDelivery; +import org.appenders.log4j2.elasticsearch.BatchOperations; import org.appenders.log4j2.elasticsearch.ByteBufBoundedSizeLimitPolicy; import org.appenders.log4j2.elasticsearch.ByteBufPooledObjectOps; import org.appenders.log4j2.elasticsearch.CertInfo; import org.appenders.log4j2.elasticsearch.ComponentTemplate; import org.appenders.log4j2.elasticsearch.Credentials; +import org.appenders.log4j2.elasticsearch.DataStream; import org.appenders.log4j2.elasticsearch.ElasticsearchAppender; import org.appenders.log4j2.elasticsearch.ExampleJacksonModule; import org.appenders.log4j2.elasticsearch.ILMPolicy; @@ -52,7 +54,7 @@ import org.appenders.log4j2.elasticsearch.OpSource; import org.appenders.log4j2.elasticsearch.PooledItemSourceFactory; import org.appenders.log4j2.elasticsearch.ResourceUtil; -import org.appenders.log4j2.elasticsearch.RollingIndexNamePlugin; +import org.appenders.log4j2.elasticsearch.SimpleIndexName; import org.appenders.log4j2.elasticsearch.UnlimitedResizePolicy; import org.appenders.log4j2.elasticsearch.VirtualProperty; import org.appenders.log4j2.elasticsearch.backoff.BatchLimitBackoffPolicy; @@ -64,6 +66,8 @@ import org.appenders.log4j2.elasticsearch.hc.BatchResultMixIn; import org.appenders.log4j2.elasticsearch.hc.ClientProviderPoliciesRegistry; import org.appenders.log4j2.elasticsearch.hc.ClientProviderPolicy; +import org.appenders.log4j2.elasticsearch.hc.ElasticsearchBulkAPI; +import org.appenders.log4j2.elasticsearch.hc.ElasticsearchDataStreamAPI; import org.appenders.log4j2.elasticsearch.hc.ElasticsearchOperationFactory; import org.appenders.log4j2.elasticsearch.hc.Error; import org.appenders.log4j2.elasticsearch.hc.ErrorMixIn; @@ -78,6 +82,7 @@ import org.appenders.log4j2.elasticsearch.hc.discovery.ElasticsearchNodesQuery; import org.appenders.log4j2.elasticsearch.hc.discovery.ServiceDiscoveryFactory; import org.appenders.log4j2.elasticsearch.hc.discovery.ServiceDiscoveryRequest; +import org.appenders.log4j2.elasticsearch.json.jackson.LogEventDataStreamMixIn; import org.appenders.log4j2.elasticsearch.smoke.SmokeTestBase; import org.appenders.log4j2.elasticsearch.smoke.TestConfig; import org.appenders.log4j2.elasticsearch.util.SplitUtil; @@ -96,21 +101,28 @@ public class SmokeTest extends SmokeTestBase { + static final String MODULE_NAME = "log4j2-elasticsearch-hc"; + @BeforeEach public void beforeEach() { super.beforeEach(); configure(); } - protected void configure() { - addSecurityConfig(getConfig()) + protected TestConfig configure() { + + final boolean dataStreamsEnabled = Boolean.parseBoolean(System.getProperty("smokeTest.datastreams.enabled", "false")); + final String indexName = resolveIndexName(dataStreamsEnabled); + + return addSecurityConfig(getConfig()) .add("serverList", System.getProperty("smokeTest.serverList", "localhost:9200")) .add("batchSize", getInt("smokeTest.batchSize", 10000)) .add("initialItemPoolSize", getInt("smokeTest.initialItemPoolSize", 40000)) .add("initialItemBufferSizeInBytes", getInt("smokeTest.initialItemBufferSizeInBytes", 1024)) .add("initialBatchPoolSize", getInt("smokeTest.initialBatchPoolSize", 4)) - .add("indexName", System.getProperty("smokeTest.indexName", "log4j2-elasticsearch-hc")) .add("ecs.enabled", Boolean.parseBoolean(System.getProperty("smokeTest.ecs.enabled", "false"))) + .add("datastreams.enabled", Boolean.parseBoolean(System.getProperty("smokeTest.datastreams.enabled", "false"))) + .add("indexName", indexName) .add("servicediscovery.enabled", Boolean.parseBoolean(System.getProperty("smokeTest.servicediscovery.enabled", "true"))) .add("servicediscovery.nodesFilter", System.getProperty("smokeTest.servicediscovery.nodesFilter", ElasticsearchNodesQuery.DEFAULT_NODES_FILTER)) .add("chroniclemap.sequenceId", 1) @@ -119,9 +131,9 @@ protected void configure() { private TestConfig addSecurityConfig(TestConfig target) { return target.add("pemCertInfo.keyPath", System.getProperty("pemCertInfo.keyPath")) - .add("pemCertInfo.keyPassphrase", System.getProperty("pemCertInfo.keyPassphrase")) - .add("pemCertInfo.clientCertPath", System.getProperty("pemCertInfo.clientCertPath")) - .add("pemCertInfo.caPath", System.getProperty("pemCertInfo.caPath")); + .add("pemCertInfo.keyPassphrase", System.getProperty("pemCertInfo.keyPassphrase")) + .add("pemCertInfo.clientCertPath", System.getProperty("pemCertInfo.clientCertPath")) + .add("pemCertInfo.caPath", System.getProperty("pemCertInfo.caPath")); } @Override @@ -131,8 +143,9 @@ public ElasticsearchAppender.Builder createElasticsearchAppenderBuilder(boolean final int initialItemPoolSize = getConfig().getProperty("initialItemPoolSize", Integer.class); final int initialItemBufferSizeInBytes = getConfig().getProperty("initialItemBufferSizeInBytes", Integer.class); final int initialBatchPoolSize = getConfig().getProperty("initialBatchPoolSize", Integer.class); - final String indexName = getConfig().getProperty("indexName", String.class); final boolean ecsEnabled = getConfig().getProperty("ecs.enabled", Boolean.class); + final boolean dataStreamsEnabled = getConfig().getProperty("datastreams.enabled", Boolean.class); + final String indexName = getConfig().getProperty("indexName", String.class); final boolean serviceDiscoveryEnabled = getConfig().getProperty("servicediscovery.enabled", Boolean.class); final String version = getConfig().getProperty("api.version", String.class); @@ -160,7 +173,7 @@ public ElasticsearchAppender.Builder createElasticsearchAppenderBuilder(boolean HttpClientProvider clientProvider = new HttpClientProvider(httpConfig); HCHttp.Builder httpObjectFactoryBuilder = new HCHttp.Builder() - .withBatchOperations(new HCBatchOperations(pooledItemSourceFactory, mappingType(VersionUtil.parse(version)))) + .withBatchOperations(batchOperations(pooledItemSourceFactory, VersionUtil.parse(version), dataStreamsEnabled)) .withClientProvider(clientProvider) .withBackoffPolicy(new BatchLimitBackoffPolicy<>(8)); @@ -199,14 +212,13 @@ public ElasticsearchAppender.Builder createElasticsearchAppenderBuilder(boolean .withClientObjectFactory(httpObjectFactoryBuilder.build()) .withBatchSize(batchSize) .withDeliveryInterval(1000) - .withSetupOpSources(setupOpSources(VersionUtil.parse(version), indexName, ecsEnabled)) + .withSetupOpSources(setupOpSources(VersionUtil.parse(version), indexName, ecsEnabled, dataStreamsEnabled)) .withFailoverPolicy(resolveFailoverPolicy()) .withShutdownDelayMillis(10000) .build(); - final IndexNameFormatter indexNameFormatter = new RollingIndexNamePlugin.Builder() + IndexNameFormatter indexNameFormatter = new SimpleIndexName.Builder<>() .withIndexName(indexName) - .withPattern("yyyy-MM-dd-HH") .build(); JacksonJsonLayout.Builder layoutBuilder = JacksonJsonLayout.newBuilder() @@ -225,6 +237,13 @@ public ElasticsearchAppender.Builder createElasticsearchAppenderBuilder(boolean .build()); } + if (dataStreamsEnabled) { + layoutBuilder.withMixins(new JacksonMixIn.Builder() + .withMixInClass(LogEventDataStreamMixIn.class.getName()) + .withTargetClass(LogEvent.class.getName()) + .build()); + } + if (buffered) { PooledItemSourceFactory sourceFactoryConfig = new PooledItemSourceFactory.Builder() .withPoolName("itemPool") @@ -248,6 +267,16 @@ public ElasticsearchAppender.Builder createElasticsearchAppenderBuilder(boolean .withIgnoreExceptions(false); } + private BatchOperations batchOperations(final PooledItemSourceFactory pooledItemSourceFactory, + final Version version, + final boolean dataStreamsEnabled) { + if (dataStreamsEnabled) { + return new HCBatchOperations(pooledItemSourceFactory, new ElasticsearchDataStreamAPI()); + } else { + return new HCBatchOperations(pooledItemSourceFactory, new ElasticsearchBulkAPI(mappingType(version))); + } + } + private ServiceDiscoveryRequest serviceDiscoveryQuery(String nodesFilter) { final boolean secure = getConfig().getProperty("secure", Boolean.class); @@ -313,7 +342,7 @@ private Auth getAuth() { .build(); } - private OpSource[] setupOpSources(final Version version, final String indexName, boolean ecsEnabled) { + private OpSource[] setupOpSources(final Version version, final String indexName, boolean ecsEnabled, final boolean dataStreamsEnabled) { final ArrayList result = new ArrayList<>(); @@ -330,18 +359,19 @@ private OpSource[] setupOpSources(final Version version, final String indexName, result.add(new ComponentTemplate.Builder() .withName(indexName + "-mappings") - .withPath(ecsEnabled ? "classpath:componentTemplate-7-mappings-ecs.json": "classpath:componentTemplate-7-mappings.json") + .withPath(resolveIndexTemplatePath(ecsEnabled, dataStreamsEnabled)) .build()); result.add(new IndexTemplate.Builder() .withApiVersion(8) - .withName(indexName + "-composed-index-template") - .withPath("classpath:composableIndexTemplate-7.json") + .withName(indexName) + .withPath(indexTemplatePath()) .build()); + } else { result.add(new IndexTemplate.Builder() .withApiVersion(7) - .withName(indexName + "-index-template") + .withName(indexName) .withPath("classpath:indexTemplate-" + version.major() + ".json") .build()); } @@ -350,12 +380,34 @@ private OpSource[] setupOpSources(final Version version, final String indexName, result.add(new ILMPolicy( indexName + "-ilm-policy", indexName, + !dataStreamsEnabled, ResourceUtil.loadResource("classpath:ilmPolicy-7.json"))); } + if (dataStreamsEnabled) { + // Optional, ES will create one if it's missing + result.add(new DataStream.Builder() + .withName(indexName) + .build()); + } + return result.toArray(new OpSource[0]); } + private String resolveIndexTemplatePath(final boolean ecsEnabled, final boolean dataStreamsEnabled) { + + if (ecsEnabled) { + return "classpath:componentTemplate-7-mappings-ecs.json"; + } + + if (dataStreamsEnabled) { + return "classpath:componentTemplate-7-mappings-data-stream.json"; + } + + return "classpath:componentTemplate-7-mappings.json"; + + } + private String mappingType(final Version version) { if (version.lowerThan("7.0.0")) { @@ -370,4 +422,24 @@ private String mappingType(final Version version) { } + private String resolveIndexName(boolean dataStreamsEnabled) { + + String indexName = System.getProperty("smokeTest.indexName"); + + if (MODULE_NAME.equals(indexName) || indexName == null) { + final String suffix = (dataStreamsEnabled ? "-data-stream" : "-index"); + indexName = MODULE_NAME + suffix; + } + + System.setProperty("smokeTest.indexName", indexName); + + return indexName; + + } + + private String indexTemplatePath() { + final boolean dsEnabled = getConfig().getProperty("datastreams.enabled", Boolean.class); + return String.format("classpath:composableIndexTemplate-7%s.json", dsEnabled ? "-data-stream" : ""); + } + } diff --git a/log4j2-elasticsearch-hc/src/test/resources/componentTemplate-7-mappings-data-stream.json b/log4j2-elasticsearch-hc/src/test/resources/componentTemplate-7-mappings-data-stream.json new file mode 100644 index 00000000..a24538b8 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/resources/componentTemplate-7-mappings-data-stream.json @@ -0,0 +1,28 @@ +{ + "template": { + "mappings": { + "properties": { + "loggerName": { + "type": "keyword", + "index": false + }, + "message": { + "type": "keyword", + "index": false + }, + "@timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "thread": { + "type": "keyword", + "index": false + }, + "level": { + "type": "keyword", + "index": false + } + } + } + } +} diff --git a/log4j2-elasticsearch-hc/src/test/resources/composableIndexTemplate-7-data-stream.json b/log4j2-elasticsearch-hc/src/test/resources/composableIndexTemplate-7-data-stream.json new file mode 100644 index 00000000..d6692e45 --- /dev/null +++ b/log4j2-elasticsearch-hc/src/test/resources/composableIndexTemplate-7-data-stream.json @@ -0,0 +1,12 @@ +{ + "index_patterns": [ + "${sys:smokeTest.indexName}*" + ], + "data_stream": {}, + "priority": 500, + "composed_of": [ + "${sys:smokeTest.indexName}-settings", + "${sys:smokeTest.indexName}-settings-ilm", + "${sys:smokeTest.indexName}-mappings" + ] +}