Skip to content

Commit

Permalink
Merge branch 'main' into implement-bloom-filter-query-pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Mar 8, 2024
2 parents 0d60837 + 8e810e5 commit 8c93f63
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
* CustomLogging provides a structured logging framework that supports various log levels
* and formats log messages as JSON. This new log format follows OTEL convention
* https://opentelemetry.io/docs/specs/semconv/general/logs/
*/
public class CustomLogging {
private static final Logger logger = LogManager.getLogger(CustomLogging.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final String CLIENT_ID;
private static final String DOMAIN_NAME;
private static final String UNKNOWN = "UNKNOWN";

/**
* Default severity level follows https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
*/
private static final Map<String, Integer> severityLevelMap = Map.of(
"TRACE", 1,
"DEBUG", 5,
"INFO", 9,
"WARN", 13,
"ERROR", 17,
"FATAL", 21
);
private static final BiConsumer<String, Throwable> defaultLogAction = logger::info;
private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();

static {
String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;

logLevelActions.put("DEBUG", logger::debug);
logLevelActions.put("INFO", logger::info);
logLevelActions.put("WARN", logger::warn);
logLevelActions.put("ERROR", logger::error);
logLevelActions.put("FATAL", logger::fatal);
}

private static int getSeverityNumber(String level) {
return severityLevelMap.getOrDefault(level, 0);
}

private static String convertToJson(Map<String, Object> logEventMap) {
try {
return OBJECT_MAPPER.writeValueAsString(logEventMap);
} catch (JsonProcessingException e) {
System.err.println("Error serializing log event to JSON: " + e.getMessage());
return "{\"Error\":\"Error serializing log event\"}";
}
}

/**
* Constructs a log event map containing log details such as timestamp, severity,
* message body, and custom attributes including domainName and clientId.
*
* @param level The severity level of the log.
* @param content The main content of the log message.
* @param throwable An optional Throwable associated with log messages for error levels.
* @return A map representation of the log event.
*/
protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
if (content == null) {
throw new IllegalArgumentException("Log message must not be null");
}

Map<String, Object> logEventMap = new LinkedHashMap<>();
Map<String, Object> body = new LinkedHashMap<>();
constructMessageBody(content, body);

Map<String, Object> attributes = new LinkedHashMap<>();
attributes.put("domainName", DOMAIN_NAME);
attributes.put("clientId", CLIENT_ID);

if (throwable != null) {
attributes.put("exception.type", throwable.getClass().getName());
attributes.put("exception.message", throwable.getMessage());
}

logEventMap.put("timestamp", System.currentTimeMillis());
logEventMap.put("severityText", level);
logEventMap.put("severityNumber", getSeverityNumber(level));
logEventMap.put("body", body);
logEventMap.put("attributes", attributes);

return logEventMap;
}

private static void constructMessageBody(Object content, Map<String, Object> body) {
if (content instanceof Message) {
Message message = (Message) content;
body.put("message", message.getFormattedMessage());
if (content instanceof OperationMessage && message.getParameters().length > 0) {
body.put("statusCode", message.getParameters()[0]);
}
} else {
body.put("message", content.toString());
}
}

/**
* Logs a message with the specified severity level, message content, and an optional throwable.
*
* @param level The severity level of the log.
* @param content The content of the log message.
* @param throwable An optional Throwable for logging errors or exceptions.
*/
private static void log(String level, Object content, Throwable throwable) {
Map<String, Object> logEventMap = constructLogEventMap(level, content, throwable);
String jsonMessage = convertToJson(logEventMap);
logLevelActions.getOrDefault(level, defaultLogAction).accept(jsonMessage, throwable);
}

// Public logging methods for various severity levels.

public static void logDebug(Object message) {
log("DEBUG", message, null);
}

public static void logInfo(Object message) {
log("INFO", message, null);
}

public static void logWarning(Object message) {
log("WARN", message, null);
}

public static void logWarning(Object message, Throwable e) {
log("WARN", message, e);
}

public static void logError(Object message) {
log("ERROR", message, null);
}

public static void logError(Object message, Throwable throwable) {
log("ERROR", message, throwable);
}

public static void logFatal(Object message) {
log("FATAL", message, null);
}

public static void logFatal(Object message, Throwable throwable) {
log("FATAL", message, throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.message.Message;

/**
* Represents an operation message with optional status code for logging purposes.
*/
public final class OperationMessage implements Message {
private final String message;
private final Integer statusCode;

/**
* Constructs an OperationMessage without a status code.
*
* @param message The message content.
*/
public OperationMessage(String message) {
this(message, null);
}

/**
* Constructs an OperationMessage with a message and an optional status code.
*
* @param message The message content.
* @param statusCode An optional status code, can be null.
*/
public OperationMessage(String message, Integer statusCode) {
this.message = message;
this.statusCode = statusCode;
}

@Override
public String getFormattedMessage() {
return message;
}

@Override
public String getFormat() {
return message;
}

@Override
public Object[] getParameters() {
// Always return an array, even if empty, to avoid null checks on the consuming side
return statusCode != null ? new Object[]{statusCode} : new Object[]{};
}

@Override
public Throwable getThrowable() {
// This implementation does not support Throwable directly
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void report(final SortedMap<String, Gauge> gauges,
final SortedMap<String, Histogram> histograms,
final SortedMap<String, Meter> meters,
final SortedMap<String, Timer> timers) {

if (builder.withDryRun) {
LOGGER.warn("** Reporter is running in 'DRY RUN' mode **");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.message.SimpleMessage;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class CustomLoggingTest {
private static final String testMsg = "Test message";

@Parameterized.Parameter(0)
public Object content;

@Parameterized.Parameter(1)
public String expectedMessage;

@Parameterized.Parameter(2)
public Object expectedStatusCode;

@Parameterized.Parameter(3)
public String severityLevel;

@Parameterized.Parameter(4)
public Throwable throwable;

@Parameterized.Parameters(name = "{index}: Test with content={0}, expectedMessage={1}, expectedStatusCode={2}, severityLevel={3}, throwable={4}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{testMsg, testMsg, null, "INFO", null},
{new SimpleMessage(testMsg), testMsg, null, "DEBUG", null},
{new OperationMessage(testMsg, 403), testMsg, 403, "ERROR", new RuntimeException("Test Exception")},
{testMsg, testMsg, null, "UNDEFINED_LEVEL", null}, // Test with an undefined severity level
{new SimpleMessage(testMsg), testMsg, null, "INFO", new Exception("New Exception")},
{testMsg, testMsg, null, "WARN", null},
{"", "", null, "INFO", null},
{new SimpleMessage(testMsg), testMsg, null, "FATAL", new Error("Critical Error")},
});
}

@Test
public void testConstructLogEventMap() {
Map<String, Object> logEventMap = CustomLogging.constructLogEventMap(severityLevel, content, throwable);

assertEquals("Incorrect severity text", severityLevel, logEventMap.get("severityText"));
assertTrue("Timestamp should be present and greater than 0", (Long)logEventMap.get("timestamp") > 0);
assertNotNull("Severity number should not be null", logEventMap.get("severityNumber"));

Map<String, Object> body = (Map<String, Object>) logEventMap.get("body");
assertEquals("Expected message value not found", expectedMessage, body.get("message"));
if (expectedStatusCode != null) {
assertEquals("Expected status code not found", expectedStatusCode, body.get("statusCode"));
} else {
assertNull("Status code should be null", body.get("statusCode"));
}

if (throwable != null) {
Map<String, Object> attributes = (Map<String, Object>) logEventMap.get("attributes");
assertEquals("Exception type should match", throwable.getClass().getName(), attributes.get("exception.type"));
assertEquals("Exception message should match", throwable.getMessage(), attributes.get("exception.message"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down

0 comments on commit 8c93f63

Please sign in to comment.