Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jan 5, 2024
1 parent 04867e8 commit 220aff1
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,32 @@
*/
@PublicApi(since = "2.8.0")
public interface ThreadContextStatePropagator {
/**
* Returns the list of transient headers that needs to be propagated from current context to new thread context.
*
* @param source current context transient headers
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
Map<String, Object> transients(Map<String, Object> source);

/**
* Returns the list of transient headers that needs to be propagated from current context to new thread context.
*
* @param source current context transient headers
* @param isSystemContext if the propagation is for system context.
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext);
default Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
return transients(source);
};

/**
* Returns the list of request headers that needs to be propagated from current context to request.
*
* @param source current context headers
* @return the list of request headers that needs to be propagated from current context to request
*/
Map<String, String> headers(Map<String, Object> source);

/**
* Returns the list of request headers that needs to be propagated from current context to request.
Expand All @@ -36,5 +54,7 @@ public interface ThreadContextStatePropagator {
* @param isSystemContext if the propagation is for system context.
* @return the list of request headers that needs to be propagated from current context to request
*/
Map<String, String> headers(Map<String, Object> source, boolean isSystemContext);
default Map<String, String> headers(Map<String, Object> source, boolean isSystemContext) {
return headers(source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator {
@Override
public Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();

if (source.containsKey(TASK_ID)) {
Expand All @@ -32,7 +32,7 @@ public Map<String, Object> transients(Map<String, Object> source, boolean isSyst
}

@Override
public Map<String, String> headers(Map<String, Object> source, boolean isSystemContext) {
public Map<String, String> headers(Map<String, Object> source) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -50,9 +51,9 @@ public void put(String key, Span span) {
}

@Override
public Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
if (isSystemContext == false && source.containsKey(CURRENT_SPAN)) {
if (source.containsKey(CURRENT_SPAN)) {
final SpanReference current = (SpanReference) source.get(CURRENT_SPAN);
if (current != null) {
transients.put(CURRENT_SPAN, new SpanReference(current.getSpan()));
Expand All @@ -62,7 +63,16 @@ public Map<String, Object> transients(Map<String, Object> source, boolean isSyst
}

@Override
public Map<String, String> headers(Map<String, Object> source, boolean isSystemContext) {
public Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
if (isSystemContext == true) {
return Collections.emptyMap();
} else {
return transients(source);
}
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> headers = new HashMap<>();

if (source.containsKey(CURRENT_SPAN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,23 +787,42 @@ public void testSerializeSystemContext() throws IOException {

private ThreadContextStatePropagator createDummyPropagator(final String key) {
return new ThreadContextStatePropagator() {

@Override
public Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
public Map<String, Object> transients(Map<String, Object> source) {
Map<String, Object> transients = new HashMap<>();
if (isSystemContext == false && source.containsKey(key)) {
if (source.containsKey(key)) {
transients.put(key, source.get(key));
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source, boolean isSystemContext) {
public Map<String, Object> transients(Map<String, Object> source, boolean isSystemContext) {
if (isSystemContext == true) {
return Collections.emptyMap();
} else {
return transients(source);
}
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
Map<String, String> headers = new HashMap<>();
if (isSystemContext == false && source.containsKey(key)) {
if (source.containsKey(key)) {
headers.put(key, (String) source.get(key));
}
return headers;
}

@Override
public Map<String, String> headers(Map<String, Object> source, boolean isSystemContext) {
if (isSystemContext == true) {
return Collections.emptyMap();
} else {
return headers(source);
}
}
};
}

Expand Down

0 comments on commit 220aff1

Please sign in to comment.