Skip to content

Commit

Permalink
Refactor flint log format
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 1, 2024
1 parent faa0d07 commit 5cf4ac2
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.AbstractStringLayout;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

@Plugin(name = "CustomJsonLayout", category = "Core", elementType = Layout.ELEMENT_TYPE, printObject = true)
public class CustomJsonLayout extends AbstractStringLayout {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String CLIENT_ID;
private static final String DOMAIN_NAME;
private static final String UNKNOWN = "UNKNOWN";

static {
String value = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", "");
String[] parts = value.split(":");
if (value.isEmpty() || parts.length != 2) {
CLIENT_ID = UNKNOWN;
DOMAIN_NAME = UNKNOWN;
} else {
CLIENT_ID = parts[0];
DOMAIN_NAME = parts[1];
}
}

protected CustomJsonLayout(Charset charset) {
super(charset);
}

@PluginFactory
public static CustomJsonLayout createLayout(@PluginAttribute(value = "charset", defaultString = "UTF-8") Charset charset) {
return new CustomJsonLayout(charset);
}

@Override
public String toSerializable(LogEvent event) {
if (!(event.getMessage() instanceof OperationMessage)) {
return event.getMessage().getFormattedMessage() + System.lineSeparator();
}

Map<String, Object> logEventMap = new HashMap<>();
logEventMap.put("timestamp", event.getTimeMillis());
logEventMap.put("message", event.getMessage().getFormattedMessage());
logEventMap.put("domainName", DOMAIN_NAME);
logEventMap.put("clientId", CLIENT_ID);

if (event.getMessage().getParameters().length == 1) {
logEventMap.put("StatusCode", event.getMessage().getParameters()[0]);
}

Throwable throwable = event.getThrown();
if (throwable != null) {
logEventMap.put("Exception", throwable.getClass().getName());
logEventMap.put("ExceptionMessage", throwable.getMessage());
}

return convertToJson(logEventMap) + System.lineSeparator();
}

private String convertToJson(Map<String, Object> logEventMap) {
try {
return OBJECT_MAPPER.writeValueAsString(logEventMap);
} catch (JsonProcessingException e) {
// Logging this error using System.err to avoid recursion
System.err.println("Error serializing log event to JSON: " + e.getMessage());
return "{\"Error\":\"Error serializing log event\"}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.message.Message;

/**
* Represents an operation message with optional status code for logging purposes.
*/
public final class OperationMessage implements Message {
private final String message;
private final Integer statusCode;

/**
* Constructs an OperationMessage without a status code.
*
* @param message The message content.
*/
public OperationMessage(String message) {
this(message, null);
}

/**
* Constructs an OperationMessage with a message and an optional status code.
*
* @param message The message content.
* @param statusCode An optional status code, can be null.
*/
public OperationMessage(String message, Integer statusCode) {
this.message = message;
this.statusCode = statusCode;
}

@Override
public String getFormattedMessage() {
return message;
}

@Override
public String getFormat() {
return message;
}

@Override
public Object[] getParameters() {
// Always return an array, even if empty, to avoid null checks on the consuming side
return statusCode != null ? new Object[]{statusCode} : new Object[]{};
}

@Override
public Throwable getThrowable() {
// This implementation does not support Throwable directly
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpStatusCodeResultPredicate<T> implements CheckedPredicate<T> {

private static final Logger LOG = Logger.getLogger(HttpStatusCodeResultPredicate.class.getName());

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.message.Message;
import org.junit.Test;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class CustomJsonLayoutTest {
@Test
public void testSuccessfulLogPlainMessage() {
LogEvent event = mock(LogEvent.class);
when(event.getMessage()).thenReturn(mock(Message.class));
when(event.getMessage().getFormattedMessage()).thenReturn("Test message");

CustomJsonLayout layout = CustomJsonLayout.createLayout(StandardCharsets.UTF_8);
String result = layout.toSerializable(event);
assertNotNull(result);
assertEquals(result, "Test message" + System.lineSeparator());
}

@Test
public void testSuccessfulLogOperationMessage() throws NoSuchFieldException, IllegalAccessException {
Class<?> classOfMap = System.getenv().getClass();
Field field = classOfMap.getDeclaredField("m");
field.setAccessible(true);
Map<String, String> writeableEnvironmentVariables = (Map<String, String>)field.get(System.getenv());
writeableEnvironmentVariables.put("FLINT_CLUSTER_NAME", "1234567890:testDomain");

LogEvent event = mock(LogEvent.class);
when(event.getTimeMillis()).thenReturn(System.currentTimeMillis());
when(event.getMessage()).thenReturn(mock(OperationMessage.class));
when(event.getMessage().getFormattedMessage()).thenReturn("Test message");
when(event.getMessage().getParameters()).thenReturn(new Object[] {new Integer(200)});

try {
CustomJsonLayout layout = CustomJsonLayout.createLayout(StandardCharsets.UTF_8);

String result = layout.toSerializable(event);
assertNotNull(result);
assertTrue(result.contains("\"message\":\"Test message\""));
assertTrue(result.contains("\"clientId\":\"1234567890\""));
assertTrue(result.contains("\"domainName\":\"testDomain\""));
assertTrue(result.contains("\"StatusCode\":200"));
} finally {
// since system environment is shared by other tests. Make sure to remove them before exiting.
writeableEnvironmentVariables.remove("FLINT_CLUSTER_NAME");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down

0 comments on commit 5cf4ac2

Please sign in to comment.