Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10782. Replace GSON with Jackson in EventQueue #7247

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@
*/
package org.apache.hadoop.hdds.server.events;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -26,19 +39,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import org.apache.hadoop.hdds.scm.net.NodeImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Simple async event processing utility.
* <p>
Expand All @@ -61,9 +61,10 @@ public class EventQueue implements EventPublisher, AutoCloseable {

private boolean isRunning = true;

private static final Gson TRACING_SERIALIZER = new GsonBuilder()
.setExclusionStrategies(new DatanodeDetailsGsonExclusionStrategy())
.create();
private static final ObjectMapper TRACING_SERIALIZER = new ObjectMapper()
.enable(SerializationFeature.INDENT_OUTPUT)
.addMixIn(NodeImpl.class, DatanodeDetailsJacksonMixin.class)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

private boolean isSilent = false;
private final String threadNamePrefix;
Expand All @@ -79,18 +80,9 @@ public EventQueue(String threadNamePrefix) {
// The field parent in DatanodeDetails class has the circular reference
// which will result in Gson infinite recursive parsing. We need to exclude
// this field when generating json string for DatanodeDetails object
static class DatanodeDetailsGsonExclusionStrategy
implements ExclusionStrategy {
@Override
public boolean shouldSkipField(FieldAttributes f) {
return f.getDeclaringClass() == NodeImpl.class
&& f.getName().equals("parent");
}

@Override
public boolean shouldSkipClass(Class<?> aClass) {
return false;
}
abstract static class DatanodeDetailsJacksonMixin {
@JsonIgnore
abstract InnerNode getParent();
}

/**
Expand Down Expand Up @@ -198,30 +190,32 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(

eventCount.incrementAndGet();
if (eventExecutorListMap != null) {

for (Map.Entry<EventExecutor, List<EventHandler>> executorAndHandlers :
eventExecutorListMap.entrySet()) {

for (EventHandler handler : executorAndHandlers.getValue()) {
queuedCount.incrementAndGet();
if (LOG.isTraceEnabled()) {
LOG.trace(
"Delivering [event={}] to executor/handler {}: <json>{}</json>",
event.getName(),
executorAndHandlers.getKey().getName(),
TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n"));
} else if (LOG.isDebugEnabled()) {
LOG.debug("Delivering [event={}] to executor/handler {}: {}",
event.getName(),
executorAndHandlers.getKey().getName(),
payload.getClass().getSimpleName());
try {
if (LOG.isTraceEnabled()) {
String jsonPayload = TRACING_SERIALIZER.writeValueAsString(payload);
LOG.trace(
"Delivering [event={}] to executor/handler {}: <json>{}</json>",
event.getName(),
executorAndHandlers.getKey().getName(),
jsonPayload.replaceAll("\n", "\\\\n"));
} else if (LOG.isDebugEnabled()) {
LOG.debug("Delivering [event={}] to executor/handler {}: {}",
event.getName(),
executorAndHandlers.getKey().getName(),
payload.getClass().getSimpleName());
}
} catch (JsonProcessingException e) {
LOG.error("Error serializing payload: {}", e.getMessage());
}
executorAndHandlers.getKey()
.onMessage(handler, payload, this);

}
}

} else {
if (!isSilent) {
LOG.warn("No event handler registered for event {}", event);
Expand Down