diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java new file mode 100644 index 000000000..8908e763b --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.Message; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * CustomLogging provides a structured logging framework that supports various log levels + * and formats log messages as JSON. This new log format follows OTEL convention + * https://opentelemetry.io/docs/specs/semconv/general/logs/ + */ +public class CustomLogging { + private static final Logger logger = LogManager.getLogger(CustomLogging.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String CLIENT_ID; + private static final String DOMAIN_NAME; + private static final String UNKNOWN = "UNKNOWN"; + + /** + * Default severity level follows https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber + */ + private static final Map severityLevelMap = Map.of( + "TRACE", 1, + "DEBUG", 5, + "INFO", 9, + "WARN", 13, + "ERROR", 17, + "FATAL", 21 + ); + private static final BiConsumer defaultLogAction = logger::info; + private static final Map> logLevelActions = new HashMap<>(); + + static { + String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":"); + CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN; + DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN; + + logLevelActions.put("DEBUG", logger::debug); + logLevelActions.put("INFO", logger::info); + logLevelActions.put("WARN", logger::warn); + logLevelActions.put("ERROR", logger::error); + logLevelActions.put("FATAL", logger::fatal); + } + + private static int getSeverityNumber(String level) { + return severityLevelMap.getOrDefault(level, 0); + } + + private static String convertToJson(Map logEventMap) { + try { + return OBJECT_MAPPER.writeValueAsString(logEventMap); + } catch (JsonProcessingException e) { + System.err.println("Error serializing log event to JSON: " + e.getMessage()); + return "{\"Error\":\"Error serializing log event\"}"; + } + } + + /** + * Constructs a log event map containing log details such as timestamp, severity, + * message body, and custom attributes including domainName and clientId. + * + * @param level The severity level of the log. + * @param content The main content of the log message. + * @param throwable An optional Throwable associated with log messages for error levels. + * @return A map representation of the log event. + */ + protected static Map constructLogEventMap(String level, Object content, Throwable throwable) { + if (content == null) { + throw new IllegalArgumentException("Log message must not be null"); + } + + Map logEventMap = new LinkedHashMap<>(); + Map body = new LinkedHashMap<>(); + constructMessageBody(content, body); + + Map attributes = new LinkedHashMap<>(); + attributes.put("domainName", DOMAIN_NAME); + attributes.put("clientId", CLIENT_ID); + + if (throwable != null) { + attributes.put("exception.type", throwable.getClass().getName()); + attributes.put("exception.message", throwable.getMessage()); + } + + logEventMap.put("timestamp", System.currentTimeMillis()); + logEventMap.put("severityText", level); + logEventMap.put("severityNumber", getSeverityNumber(level)); + logEventMap.put("body", body); + logEventMap.put("attributes", attributes); + + return logEventMap; + } + + private static void constructMessageBody(Object content, Map body) { + if (content instanceof Message) { + Message message = (Message) content; + body.put("message", message.getFormattedMessage()); + if (content instanceof OperationMessage && message.getParameters().length > 0) { + body.put("statusCode", message.getParameters()[0]); + } + } else { + body.put("message", content.toString()); + } + } + + /** + * Logs a message with the specified severity level, message content, and an optional throwable. + * + * @param level The severity level of the log. + * @param content The content of the log message. + * @param throwable An optional Throwable for logging errors or exceptions. + */ + private static void log(String level, Object content, Throwable throwable) { + Map logEventMap = constructLogEventMap(level, content, throwable); + String jsonMessage = convertToJson(logEventMap); + logLevelActions.getOrDefault(level, defaultLogAction).accept(jsonMessage, throwable); + } + + // Public logging methods for various severity levels. + + public static void logDebug(Object message) { + log("DEBUG", message, null); + } + + public static void logInfo(Object message) { + log("INFO", message, null); + } + + public static void logWarning(Object message) { + log("WARN", message, null); + } + + public static void logWarning(Object message, Throwable e) { + log("WARN", message, e); + } + + public static void logError(Object message) { + log("ERROR", message, null); + } + + public static void logError(Object message, Throwable throwable) { + log("ERROR", message, throwable); + } + + public static void logFatal(Object message) { + log("FATAL", message, null); + } + + public static void logFatal(Object message, Throwable throwable) { + log("FATAL", message, throwable); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java new file mode 100644 index 000000000..7ded763fb --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/OperationMessage.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import org.apache.logging.log4j.message.Message; + +/** + * Represents an operation message with optional status code for logging purposes. + */ +public final class OperationMessage implements Message { + private final String message; + private final Integer statusCode; + + /** + * Constructs an OperationMessage without a status code. + * + * @param message The message content. + */ + public OperationMessage(String message) { + this(message, null); + } + + /** + * Constructs an OperationMessage with a message and an optional status code. + * + * @param message The message content. + * @param statusCode An optional status code, can be null. + */ + public OperationMessage(String message, Integer statusCode) { + this.message = message; + this.statusCode = statusCode; + } + + @Override + public String getFormattedMessage() { + return message; + } + + @Override + public String getFormat() { + return message; + } + + @Override + public Object[] getParameters() { + // Always return an array, even if empty, to avoid null checks on the consuming side + return statusCode != null ? new Object[]{statusCode} : new Object[]{}; + } + + @Override + public Throwable getThrowable() { + // This implementation does not support Throwable directly + return null; + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index e16eb0021..f4d456899 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -137,7 +137,6 @@ public void report(final SortedMap gauges, final SortedMap histograms, final SortedMap meters, final SortedMap timers) { - if (builder.withDryRun) { LOGGER.warn("** Reporter is running in 'DRY RUN' mode **"); } diff --git a/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomLoggingTest.java b/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomLoggingTest.java new file mode 100644 index 000000000..e5cda05b8 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/logging/CustomLoggingTest.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class CustomLoggingTest { + private static final String testMsg = "Test message"; + + @Parameterized.Parameter(0) + public Object content; + + @Parameterized.Parameter(1) + public String expectedMessage; + + @Parameterized.Parameter(2) + public Object expectedStatusCode; + + @Parameterized.Parameter(3) + public String severityLevel; + + @Parameterized.Parameter(4) + public Throwable throwable; + + @Parameterized.Parameters(name = "{index}: Test with content={0}, expectedMessage={1}, expectedStatusCode={2}, severityLevel={3}, throwable={4}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {testMsg, testMsg, null, "INFO", null}, + {new SimpleMessage(testMsg), testMsg, null, "DEBUG", null}, + {new OperationMessage(testMsg, 403), testMsg, 403, "ERROR", new RuntimeException("Test Exception")}, + {testMsg, testMsg, null, "UNDEFINED_LEVEL", null}, // Test with an undefined severity level + {new SimpleMessage(testMsg), testMsg, null, "INFO", new Exception("New Exception")}, + {testMsg, testMsg, null, "WARN", null}, + {"", "", null, "INFO", null}, + {new SimpleMessage(testMsg), testMsg, null, "FATAL", new Error("Critical Error")}, + }); + } + + @Test + public void testConstructLogEventMap() { + Map logEventMap = CustomLogging.constructLogEventMap(severityLevel, content, throwable); + + assertEquals("Incorrect severity text", severityLevel, logEventMap.get("severityText")); + assertTrue("Timestamp should be present and greater than 0", (Long)logEventMap.get("timestamp") > 0); + assertNotNull("Severity number should not be null", logEventMap.get("severityNumber")); + + Map body = (Map) logEventMap.get("body"); + assertEquals("Expected message value not found", expectedMessage, body.get("message")); + if (expectedStatusCode != null) { + assertEquals("Expected status code not found", expectedStatusCode, body.get("statusCode")); + } else { + assertNull("Status code should be null", body.get("statusCode")); + } + + if (throwable != null) { + Map attributes = (Map) logEventMap.get("attributes"); + assertEquals("Exception type should match", throwable.getClass().getName(), attributes.get("exception.type")); + assertEquals("Exception message should match", throwable.getMessage(), attributes.get("exception.message")); + } + } +} diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java index 2178c3c22..55f03a14b 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/reporter/DimensionUtilsTest.java @@ -7,7 +7,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; -import org.apache.spark.metrics.source.FlintMetricSource; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals;