Skip to content

Commit

Permalink
Change thread queue to 100 and fix headers parsing bug (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#265)

change thread queue to 100 and fix headers bug

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz authored and dbwiddis committed Dec 15, 2023
1 parent fe43586 commit 1b37073
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
settings,
PROVISION_THREAD_POOL,
OpenSearchExecutors.allocatedProcessors(settings),
10,
100,
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Objects;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
import static org.opensearch.flowframework.util.ParseUtils.parseStringToObjectMap;
import static org.opensearch.flowframework.util.ParseUtils.parseStringToStringMap;

/**
Expand Down Expand Up @@ -93,7 +95,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
} else {
for (Map<?, ?> map : (Map<?, ?>[]) e.getValue()) {
buildStringToStringMap(xContentBuilder, map);
buildStringToObjectMap(xContentBuilder, map);
}
}
xContentBuilder.endArray();
Expand Down Expand Up @@ -150,9 +152,9 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
}
userInputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
} else {
List<Map<String, String>> mapList = new ArrayList<>();
List<Map<String, Object>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(parseStringToStringMap(parser));
mapList.add(parseStringToObjectMap(parser));
}
userInputs.put(inputFieldName, mapList.toArray(new Map[0]));
}
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ public static void buildStringToStringMap(XContentBuilder xContentBuilder, Map<?
xContentBuilder.endObject();
}

/**
* Builds an XContent object representing a map of String keys to Object values.
*
* @param xContentBuilder An XContent builder whose position is at the start of the map object to build
* @param map A map as key-value String to Object.
* @throws IOException on a build failure
*/
public static void buildStringToObjectMap(XContentBuilder xContentBuilder, Map<?, ?> map) throws IOException {
xContentBuilder.startObject();
for (Entry<?, ?> e : map.entrySet()) {
if (e.getValue() instanceof String) {
xContentBuilder.field((String) e.getKey(), (String) e.getValue());
} else {
xContentBuilder.field((String) e.getKey(), e.getValue());
}
}
xContentBuilder.endObject();
}

/**
* Builds an XContent object representing a LLMSpec.
*
Expand Down Expand Up @@ -117,6 +136,30 @@ public static Map<String, String> parseStringToStringMap(XContentParser parser)
return map;
}

/**
* Parses an XContent object representing a map of String keys to Object values.
* The Object value here can either be a string or a map
* @param parser An XContent parser whose position is at the start of the map object to parse
* @return A map as identified by the key-value pairs in the XContent
* @throws IOException on a parse failure
*/
public static Map<String, Object> parseStringToObjectMap(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Map<String, Object> map = new HashMap<>();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
// If the current token is a START_OBJECT, parse it as Map<String, String>
map.put(fieldName, parseStringToStringMap(parser));
} else {
// Otherwise, parse it as a string
map.put(fieldName, parser.text());
}
}
return map;
}

/**
* Parse content parser to {@link java.time.Instant}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public void testToInstantWithNotValue() throws IOException {
assertNull(instant);
}

public void testBuildAndParseStringToStringMap() throws IOException {
Map<String, String> stringMap = Map.ofEntries(Map.entry("one", "two"));
XContentBuilder builder = XContentFactory.jsonBuilder();
ParseUtils.buildStringToStringMap(builder, stringMap);
XContentParser parser = this.createParser(builder);
parser.nextToken();
Map<String, String> parsedMap = ParseUtils.parseStringToStringMap(parser);
assertEquals(stringMap.get("one"), parsedMap.get("one"));
}

public void testGetInputsFromPreviousSteps() {
WorkflowData currentNodeInputs = new WorkflowData(
Map.ofEntries(Map.entry("content1", 1), Map.entry("param1", 2), Map.entry("content3", "${{step1.output1}}")),
Expand Down

0 comments on commit 1b37073

Please sign in to comment.