Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor flint log format #263

Merged
merged 3 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* CustomLogging provides a structured logging framework that supports various log levels
* and formats log messages as JSON. This new log format follows OTEL convention
* https://opentelemetry.io/docs/specs/semconv/general/logs/
*/
public class CustomLogging {
private static final Logger logger = LogManager.getLogger(CustomLogging.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String CLIENT_ID;
private static final String DOMAIN_NAME;
private static final String UNKNOWN = "UNKNOWN";

/**
* Default severity level follows https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
*/
private static final Map<String, Integer> severityLevelMap = Map.of(
"TRACE", 1,
"DEBUG", 5,
"INFO", 9,
"WARN", 13,
"ERROR", 17,
"FATAL", 21
);
private static final BiConsumer<String, Throwable> defaultLogAction = logger::info;
private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();

static {
String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;

logLevelActions.put("DEBUG", logger::debug);
logLevelActions.put("INFO", logger::info);
logLevelActions.put("WARN", logger::warn);
logLevelActions.put("ERROR", logger::error);
logLevelActions.put("FATAL", logger::fatal);
}

private static int getSeverityNumber(String level) {
return severityLevelMap.getOrDefault(level, 0);
}

private static String convertToJson(Map<String, Object> logEventMap) {
try {
return OBJECT_MAPPER.writeValueAsString(logEventMap);
} catch (JsonProcessingException e) {
System.err.println("Error serializing log event to JSON: " + e.getMessage());
return "{\"Error\":\"Error serializing log event\"}";
}
}

/**
* Constructs a log event map containing log details such as timestamp, severity,
* message body, and custom attributes including domainName and clientId.
*
* @param level The severity level of the log.
* @param content The main content of the log message.
* @param throwable An optional Throwable associated with log messages for error levels.
* @return A map representation of the log event.
*/
protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
if (content == null) {
throw new IllegalArgumentException("Log message must not be null");
}

Map<String, Object> logEventMap = new LinkedHashMap<>();
Map<String, Object> body = new LinkedHashMap<>();
constructMessageBody(content, body);

Map<String, Object> attributes = new LinkedHashMap<>();
attributes.put("domainName", DOMAIN_NAME);
attributes.put("clientId", CLIENT_ID);

if (throwable != null) {
attributes.put("exception.type", throwable.getClass().getName());
attributes.put("exception.message", throwable.getMessage());
}

logEventMap.put("timestamp", System.currentTimeMillis());
logEventMap.put("severityText", level);
logEventMap.put("severityNumber", getSeverityNumber(level));
logEventMap.put("body", body);
logEventMap.put("attributes", attributes);

return logEventMap;
}

private static void constructMessageBody(Object content, Map<String, Object> body) {
if (content instanceof Message) {
Message message = (Message) content;
body.put("message", message.getFormattedMessage());
if (content instanceof OperationMessage && message.getParameters().length > 0) {
body.put("statusCode", message.getParameters()[0]);
}
} else {
body.put("message", content.toString());
}
}

/**
* Logs a message with the specified severity level, message content, and an optional throwable.
*
* @param level The severity level of the log.
* @param content The content of the log message.
* @param throwable An optional Throwable for logging errors or exceptions.
*/
private static void log(String level, Object content, Throwable throwable) {
Map<String, Object> logEventMap = constructLogEventMap(level, content, throwable);
String jsonMessage = convertToJson(logEventMap);
logLevelActions.getOrDefault(level, defaultLogAction).accept(jsonMessage, throwable);
}

// Public logging methods for various severity levels.

public static void logDebug(Object message) {
log("DEBUG", message, null);
}

public static void logInfo(Object message) {
log("INFO", message, null);
}

public static void logWarning(Object message) {
log("WARN", message, null);
}

public static void logWarning(Object message, Throwable e) {
log("WARN", message, e);
}

public static void logError(Object message) {
log("ERROR", message, null);
}

public static void logError(Object message, Throwable throwable) {
log("ERROR", message, throwable);
}

public static void logFatal(Object message) {
log("FATAL", message, null);
}

public static void logFatal(Object message, Throwable throwable) {
log("FATAL", message, throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.message.Message;

/**
* Represents an operation message with optional status code for logging purposes.
*/
public final class OperationMessage implements Message {
private final String message;
private final Integer statusCode;

/**
* Constructs an OperationMessage without a status code.
*
* @param message The message content.
*/
public OperationMessage(String message) {
this(message, null);
}

/**
* Constructs an OperationMessage with a message and an optional status code.
*
* @param message The message content.
* @param statusCode An optional status code, can be null.
*/
public OperationMessage(String message, Integer statusCode) {
this.message = message;
this.statusCode = statusCode;
}

@Override
public String getFormattedMessage() {
return message;
}

@Override
public String getFormat() {
return message;
}

@Override
public Object[] getParameters() {
// Always return an array, even if empty, to avoid null checks on the consuming side
return statusCode != null ? new Object[]{statusCode} : new Object[]{};
}

@Override
public Throwable getThrowable() {
// This implementation does not support Throwable directly
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void report(final SortedMap<String, Gauge> gauges,
final SortedMap<String, Histogram> histograms,
final SortedMap<String, Meter> meters,
final SortedMap<String, Timer> timers) {

if (builder.withDryRun) {
LOGGER.warn("** Reporter is running in 'DRY RUN' mode **");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.message.SimpleMessage;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class CustomLoggingTest {
private static final String testMsg = "Test message";

@Parameterized.Parameter(0)
public Object content;

@Parameterized.Parameter(1)
public String expectedMessage;

@Parameterized.Parameter(2)
public Object expectedStatusCode;

@Parameterized.Parameter(3)
public String severityLevel;

@Parameterized.Parameter(4)
public Throwable throwable;

@Parameterized.Parameters(name = "{index}: Test with content={0}, expectedMessage={1}, expectedStatusCode={2}, severityLevel={3}, throwable={4}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{testMsg, testMsg, null, "INFO", null},
{new SimpleMessage(testMsg), testMsg, null, "DEBUG", null},
{new OperationMessage(testMsg, 403), testMsg, 403, "ERROR", new RuntimeException("Test Exception")},
{testMsg, testMsg, null, "UNDEFINED_LEVEL", null}, // Test with an undefined severity level
{new SimpleMessage(testMsg), testMsg, null, "INFO", new Exception("New Exception")},
{testMsg, testMsg, null, "WARN", null},
{"", "", null, "INFO", null},
{new SimpleMessage(testMsg), testMsg, null, "FATAL", new Error("Critical Error")},
});
}

@Test
public void testConstructLogEventMap() {
Map<String, Object> logEventMap = CustomLogging.constructLogEventMap(severityLevel, content, throwable);

assertEquals("Incorrect severity text", severityLevel, logEventMap.get("severityText"));
assertTrue("Timestamp should be present and greater than 0", (Long)logEventMap.get("timestamp") > 0);
assertNotNull("Severity number should not be null", logEventMap.get("severityNumber"));

Map<String, Object> body = (Map<String, Object>) logEventMap.get("body");
assertEquals("Expected message value not found", expectedMessage, body.get("message"));
if (expectedStatusCode != null) {
assertEquals("Expected status code not found", expectedStatusCode, body.get("statusCode"));
} else {
assertNull("Status code should be null", body.get("statusCode"));
}

if (throwable != null) {
Map<String, Object> attributes = (Map<String, Object>) logEventMap.get("attributes");
assertEquals("Exception type should match", throwable.getClass().getName(), attributes.get("exception.type"));
assertEquals("Exception message should match", throwable.getMessage(), attributes.get("exception.message"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down
Loading