diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomJsonLayout.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomJsonLayout.java new file mode 100644 index 000000000..98715c2c2 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomJsonLayout.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.layout.AbstractStringLayout; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +@Plugin(name = "CustomJsonLayout", category = "Core", elementType = Layout.ELEMENT_TYPE, printObject = true) +public class CustomJsonLayout extends AbstractStringLayout { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String CLIENT_ID; + private static final String DOMAIN_NAME; + private static final String UNKNOWN = "UNKNOWN"; + + static { + String value = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", ""); + String[] parts = value.split(":"); + if (value.isEmpty() || parts.length != 2) { + CLIENT_ID = UNKNOWN; + DOMAIN_NAME = UNKNOWN; + } else { + CLIENT_ID = parts[0]; + DOMAIN_NAME = parts[1]; + } + } + + protected CustomJsonLayout(Charset charset) { + super(charset); + } + + @PluginFactory + public static CustomJsonLayout createLayout(@PluginAttribute(value = "charset", defaultString = "UTF-8") Charset charset) { + return new CustomJsonLayout(charset); + } + + @Override + public String toSerializable(LogEvent event) { + if (!(event.getMessage() instanceof OperationMessage)) { + return event.getMessage().getFormattedMessage() + System.lineSeparator(); + } + + Map logEventMap = new HashMap<>(); + logEventMap.put("timestamp", event.getTimeMillis()); + logEventMap.put("message", event.getMessage().getFormattedMessage()); + logEventMap.put("domainName", DOMAIN_NAME); + logEventMap.put("clientId", CLIENT_ID); + + if (event.getMessage().getParameters().length == 1) { + logEventMap.put("StatusCode", event.getMessage().getParameters()[0]); + } + + Throwable throwable = event.getThrown(); + if (throwable != null) { + logEventMap.put("Exception", throwable.getClass().getName()); + logEventMap.put("ExceptionMessage", throwable.getMessage()); + } + + return convertToJson(logEventMap) + System.lineSeparator(); + } + + private String convertToJson(Map logEventMap) { + try { + return OBJECT_MAPPER.writeValueAsString(logEventMap); + } catch (JsonProcessingException e) { + // Logging this error using System.err to avoid recursion + System.err.println("Error serializing log event to JSON: " + e.getMessage()); + return "{\"Error\":\"Error serializing log event\"}"; + } + } +} \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java new file mode 100644 index 000000000..7ded763fb --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import org.apache.logging.log4j.message.Message; + +/** + * Represents an operation message with optional status code for logging purposes. + */ +public final class OperationMessage implements Message { + private final String message; + private final Integer statusCode; + + /** + * Constructs an OperationMessage without a status code. + * + * @param message The message content. + */ + public OperationMessage(String message) { + this(message, null); + } + + /** + * Constructs an OperationMessage with a message and an optional status code. + * + * @param message The message content. + * @param statusCode An optional status code, can be null. + */ + public OperationMessage(String message, Integer statusCode) { + this.message = message; + this.statusCode = statusCode; + } + + @Override + public String getFormattedMessage() { + return message; + } + + @Override + public String getFormat() { + return message; + } + + @Override + public Object[] getParameters() { + // Always return an array, even if empty, to avoid null checks on the consuming side + return statusCode != null ? new Object[]{statusCode} : new Object[]{}; + } + + @Override + public Throwable getThrowable() { + // This implementation does not support Throwable directly + return null; + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpStatusCodeResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpStatusCodeResultPredicate.java index fa82e3655..ee2a23e3f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpStatusCodeResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpStatusCodeResultPredicate.java @@ -18,7 +18,6 @@ * @param result type (supposed to be HttpResponse for OS client) */ public class HttpStatusCodeResultPredicate implements CheckedPredicate { - private static final Logger LOG = Logger.getLogger(HttpStatusCodeResultPredicate.class.getName()); /** diff --git a/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomJsonLayoutTest.java b/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomJsonLayoutTest.java new file mode 100644 index 000000000..271f00a06 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomJsonLayoutTest.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.message.Message; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class CustomJsonLayoutTest { + @Test + public void testSuccessfulLogPlainMessage() { + LogEvent event = mock(LogEvent.class); + when(event.getMessage()).thenReturn(mock(Message.class)); + when(event.getMessage().getFormattedMessage()).thenReturn("Test message"); + + CustomJsonLayout layout = CustomJsonLayout.createLayout(StandardCharsets.UTF_8); + String result = layout.toSerializable(event); + assertNotNull(result); + assertEquals(result, "Test message" + System.lineSeparator()); + } + + @Test + public void testSuccessfulLogOperationMessage() throws NoSuchFieldException, IllegalAccessException { + Class classOfMap = System.getenv().getClass(); + Field field = classOfMap.getDeclaredField("m"); + field.setAccessible(true); + Map writeableEnvironmentVariables = (Map)field.get(System.getenv()); + writeableEnvironmentVariables.put("FLINT_CLUSTER_NAME", "1234567890:testDomain"); + + LogEvent event = mock(LogEvent.class); + when(event.getTimeMillis()).thenReturn(System.currentTimeMillis()); + when(event.getMessage()).thenReturn(mock(OperationMessage.class)); + when(event.getMessage().getFormattedMessage()).thenReturn("Test message"); + when(event.getMessage().getParameters()).thenReturn(new Object[] {new Integer(200)}); + + try { + CustomJsonLayout layout = CustomJsonLayout.createLayout(StandardCharsets.UTF_8); + + String result = layout.toSerializable(event); + assertNotNull(result); + assertTrue(result.contains("\"message\":\"Test message\"")); + assertTrue(result.contains("\"clientId\":\"1234567890\"")); + assertTrue(result.contains("\"domainName\":\"testDomain\"")); + assertTrue(result.contains("\"StatusCode\":200")); + } finally { + // since system environment is shared by other tests. Make sure to remove them before exiting. + writeableEnvironmentVariables.remove("FLINT_CLUSTER_NAME"); + } + } +} diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java index 2178c3c22..55f03a14b 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java @@ -7,7 +7,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; -import org.apache.spark.metrics.source.FlintMetricSource; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals;