Skip to content

Commit

Permalink
implement chunk count wrapper for max chunk limit
Browse files Browse the repository at this point in the history
Signed-off-by: yuye-aws <[email protected]>
  • Loading branch information
yuye-aws committed Mar 8, 2024
1 parent 2983226 commit 1579324
Showing 1 changed file with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public final class DocumentChunkingProcessor extends AbstractProcessor {

private static final int DEFAULT_MAX_CHUNK_LIMIT = -1;

private int currentChunkCount = 0;

private int maxChunkLimit = DEFAULT_MAX_CHUNK_LIMIT;

private String chunkerType;
Expand All @@ -67,6 +65,19 @@ public final class DocumentChunkingProcessor extends AbstractProcessor {

private final Environment environment;

/**
 * Mutable holder for the running number of chunks produced during chunking.
 * <p>
 * Users may set the {@code max_chunk_limit} parameter to restrict how many strings the chunking
 * results may contain. One wrapper instance is handed through the chunking calls so that the
 * count accumulates across all output fields instead of being tracked per field.
 */
static class ChunkCountWrapper {
    /** Number of chunks counted so far. */
    private int chunkCount;

    /**
     * Creates a wrapper that starts counting from the given value.
     *
     * @param initialCount the number of chunks already counted when the wrapper is created
     */
    protected ChunkCountWrapper(int initialCount) {
        chunkCount = initialCount;
    }
}

public DocumentChunkingProcessor(
String tag,
String description,
Expand Down Expand Up @@ -149,14 +160,14 @@ private boolean isListString(Object value) {
return true;
}

private List<String> chunkString(String content) {
private List<String> chunkString(String content, ChunkCountWrapper chunkCountWrapper) {
FieldChunker chunker = ChunkerFactory.create(chunkerType, analysisRegistry);
List<String> result = chunker.chunk(content, chunkerParameters);
currentChunkCount += result.size();
if (maxChunkLimit != DEFAULT_MAX_CHUNK_LIMIT && currentChunkCount > maxChunkLimit) {
chunkCountWrapper.chunkCount += result.size();
if (maxChunkLimit != DEFAULT_MAX_CHUNK_LIMIT && chunkCountWrapper.chunkCount > maxChunkLimit) {
throw new IllegalArgumentException(
"Unable to create the processor as the number of chunks ["
+ currentChunkCount
+ chunkCountWrapper.chunkCount
+ "] exceeds the maximum chunk limit ["
+ maxChunkLimit
+ "]"
Expand All @@ -165,23 +176,23 @@ private List<String> chunkString(String content) {
return result;
}

/**
 * Chunks every string in {@code contentList} with {@code chunkString} and concatenates the
 * per-string results, in input order, into a single flat list.
 *
 * NOTE(review): this span appears to be a rendered diff, so each changed line is shown twice:
 * the earlier form, followed by the form that additionally takes/passes chunkCountWrapper.
 *
 * @param contentList the strings to chunk
 * @param chunkCountWrapper (two-argument form only) forwarded unchanged to {@code chunkString}
 * @return all chunks of all input strings, flattened into one list
 */
private List<String> chunkList(List<String> contentList) {
private List<String> chunkList(List<String> contentList, ChunkCountWrapper chunkCountWrapper) {
// flatten the List<List<String>> output to List<String>
List<String> result = new ArrayList<>();
for (String content : contentList) {
result.addAll(chunkString(content));
result.addAll(chunkString(content, chunkCountWrapper));
}
return result;
}

/**
 * Chunks a leaf value. A {@code String} is chunked directly with {@code chunkString}; a value
 * accepted by {@code isListString} is cast to {@code List<String>} and chunked with
 * {@code chunkList}. Any other value is left unchunked.
 *
 * NOTE(review): this span appears to be a rendered diff, so each changed line is shown twice:
 * the earlier form, followed by the form that additionally takes/passes chunkCountWrapper.
 *
 * @param value the leaf object to chunk
 * @param chunkCountWrapper (two-argument form only) forwarded unchanged to the chunking helpers
 * @return the chunked strings, or {@code null} when {@code value} matches neither branch
 */
@SuppressWarnings("unchecked")
private List<String> chunkLeafType(Object value) {
private List<String> chunkLeafType(Object value, ChunkCountWrapper chunkCountWrapper) {
// leaf type is either String or List<String>
List<String> chunkedResult = null;
if (value instanceof String) {
chunkedResult = chunkString(value.toString());
chunkedResult = chunkString(value.toString(), chunkCountWrapper);
} else if (isListString(value)) {
// the unchecked cast relies on the isListString check in the condition above
chunkedResult = chunkList((List<String>) value);
chunkedResult = chunkList((List<String>) value, chunkCountWrapper);
}
return chunkedResult;
}
Expand All @@ -193,7 +204,7 @@ private List<String> chunkLeafType(Object value) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
validateFieldsValue(ingestDocument);
currentChunkCount = 0;
ChunkCountWrapper chunkCountWrapper = new ChunkCountWrapper(0);
if (Objects.equals(chunkerType, FIXED_LENGTH_ALGORITHM)) {
// add maxTokenCount setting from index metadata to chunker parameters
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
Expand All @@ -209,7 +220,7 @@ public IngestDocument execute(IngestDocument ingestDocument) {
}

Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
chunkMapType(sourceAndMetadataMap, fieldMap);
chunkMapType(sourceAndMetadataMap, fieldMap, chunkCountWrapper);
sourceAndMetadataMap.forEach(ingestDocument::setFieldValue);
return ingestDocument;
}
Expand Down Expand Up @@ -259,7 +270,7 @@ private void validateListTypeValue(String sourceKey, Object sourceValue, int max
}

@SuppressWarnings("unchecked")
private void chunkMapType(Map<String, Object> sourceAndMetadataMap, Map<String, Object> fieldMap) {
private void chunkMapType(Map<String, Object> sourceAndMetadataMap, Map<String, Object> fieldMap, ChunkCountWrapper chunkCountWrapper) {
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
Expand All @@ -270,16 +281,16 @@ private void chunkMapType(Map<String, Object> sourceAndMetadataMap, Map<String,
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
chunkMapType((Map<String, Object>) source, (Map<String, Object>) targetKey);
chunkMapType((Map<String, Object>) source, (Map<String, Object>) targetKey, chunkCountWrapper);
}
}
} else if (sourceObject instanceof Map) {
chunkMapType((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey);
chunkMapType((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey, chunkCountWrapper);
}
} else {
// chunk the object when target key is a string
Object chunkObject = sourceAndMetadataMap.get(originalKey);
List<String> chunkedResult = chunkLeafType(chunkObject);
List<String> chunkedResult = chunkLeafType(chunkObject, chunkCountWrapper);
if (chunkedResult != null) {
sourceAndMetadataMap.put(String.valueOf(targetKey), chunkedResult);
}
Expand Down

0 comments on commit 1579324

Please sign in to comment.