Skip to content

Commit

Permalink
resolve code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: yuye-aws <[email protected]>
  • Loading branch information
yuye-aws committed Mar 8, 2024
1 parent ac40983 commit 30e75a1
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationFactory;
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner;
import org.opensearch.neuralsearch.processor.factory.DocumentChunkingProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.RerankProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.SparseEncodingProcessorFactory;
Expand Down Expand Up @@ -117,7 +118,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
TextImageEmbeddingProcessor.TYPE,
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService()),
DocumentChunkingProcessor.TYPE,
new DocumentChunkingProcessor.Factory(
new DocumentChunkingProcessorFactory(
parameters.env,
parameters.ingestService.getClusterService(),
parameters.indicesService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.neuralsearch.processor.chunker.ChunkerFactory;
import org.opensearch.neuralsearch.processor.chunker.FieldChunker;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;
import static org.opensearch.ingest.ConfigurationUtils.readMap;
import static org.opensearch.neuralsearch.processor.chunker.ChunkerFactory.DELIMITER_ALGORITHM;
import static org.opensearch.neuralsearch.processor.chunker.ChunkerFactory.FIXED_LENGTH_ALGORITHM;

Expand All @@ -47,9 +45,9 @@ public final class DocumentChunkingProcessor extends AbstractProcessor {

private static final int DEFAULT_MAX_CHUNK_LIMIT = -1;

private int current_chunk_count = 0;
private int currentChunkCount = 0;

private int max_chunk_limit = DEFAULT_MAX_CHUNK_LIMIT;
private int maxChunkLimit = DEFAULT_MAX_CHUNK_LIMIT;
private final Set<String> supportedChunkers = ChunkerFactory.getAllChunkers();

private String chunkerType;
Expand Down Expand Up @@ -122,11 +120,17 @@ private void validateAndParseAlgorithmMap(Map<String, Object> algorithmMap) {
this.chunkerParameters = (Map<String, Object>) algorithmValue;
chunker.validateParameters(chunkerParameters);
if (((Map<String, Object>) algorithmValue).containsKey(MAX_CHUNK_LIMIT_FIELD)) {
int max_chunk_limit = ((Number) ((Map<String, Object>) algorithmValue).get(MAX_CHUNK_LIMIT_FIELD)).intValue();
if (max_chunk_limit <= 0) {
Object maxChunkLimitObject = ((Map<String, Object>) algorithmValue).get(MAX_CHUNK_LIMIT_FIELD);
if (!(maxChunkLimitObject instanceof Number)) {
throw new IllegalArgumentException(
"Parameter [" + MAX_CHUNK_LIMIT_FIELD + "] cannot be cast to [" + Number.class.getName() + "]"
);
}
int maxChunkLimit = ((Number) maxChunkLimitObject).intValue();
if (maxChunkLimit <= 0 && maxChunkLimit != DEFAULT_MAX_CHUNK_LIMIT) {
throw new IllegalArgumentException("Parameter [" + MAX_CHUNK_LIMIT_FIELD + "] must be a positive integer");
}
this.max_chunk_limit = max_chunk_limit;
this.maxChunkLimit = maxChunkLimit;
}
}
}
Expand All @@ -148,13 +152,13 @@ private boolean isListString(Object value) {
private List<String> chunkString(String content) {
FieldChunker chunker = ChunkerFactory.create(chunkerType, analysisRegistry);
List<String> result = chunker.chunk(content, chunkerParameters);
current_chunk_count += result.size();
if (max_chunk_limit != DEFAULT_MAX_CHUNK_LIMIT && current_chunk_count > max_chunk_limit) {
currentChunkCount += result.size();
if (maxChunkLimit != DEFAULT_MAX_CHUNK_LIMIT && currentChunkCount > maxChunkLimit) {
throw new IllegalArgumentException(
"Unable to create the processor as the number of chunks ["
+ current_chunk_count
+ currentChunkCount
+ "] exceeds the maximum chunk limit ["
+ max_chunk_limit
+ maxChunkLimit
+ "]"
);
}
Expand Down Expand Up @@ -189,7 +193,7 @@ private List<String> chunkLeafType(Object value) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) {
validateFieldsValue(ingestDocument);
current_chunk_count = 0;
currentChunkCount = 0;
if (Objects.equals(chunkerType, FIXED_LENGTH_ALGORITHM)) {
// add maxTokenCount setting from index metadata to chunker parameters
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
Expand Down Expand Up @@ -283,51 +287,4 @@ private void chunkMapType(Map<String, Object> sourceAndMetadataMap, Map<String,
}
}
}

/**
* Factory for chunking ingest processor for ingestion pipeline. Instantiates processor based on user provided input.
*/
public static class Factory implements Processor.Factory {

private final Environment environment;

private final ClusterService clusterService;

private final IndicesService indicesService;

private final AnalysisRegistry analysisRegistry;

public Factory(
Environment environment,
ClusterService clusterService,
IndicesService indicesService,
AnalysisRegistry analysisRegistry
) {
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
}

@Override
public DocumentChunkingProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
Map<String, Object> fieldMap = readMap(TYPE, processorTag, config, FIELD_MAP_FIELD);
Map<String, Object> algorithmMap = readMap(TYPE, processorTag, config, ALGORITHM_FIELD);
return new DocumentChunkingProcessor(
processorTag,
description,
fieldMap,
algorithmMap,
environment,
clusterService,
indicesService,
analysisRegistry
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,5 @@ public List<String> chunk(String content, Map<String, Object> parameters) {
chunkResult.add(content.substring(start));
}
return chunkResult;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void validateParameters(Map<String, Object> parameters) {
public List<String> chunk(String content, Map<String, Object> parameters) {
// prior to chunking, parameters have been validated
int tokenLimit = DEFAULT_TOKEN_LIMIT;
BigDecimal overlap_rate = new BigDecimal(String.valueOf(DEFAULT_OVERLAP_RATE));
BigDecimal overlap_rate = DEFAULT_OVERLAP_RATE;
int maxTokenCount = DEFAULT_MAX_TOKEN_COUNT;

String tokenizer = DEFAULT_TOKENIZER;
Expand All @@ -148,7 +148,6 @@ public List<String> chunk(String content, Map<String, Object> parameters) {
BigDecimal overlapTokenNumberBigDecimal = overlap_rate.multiply(new BigDecimal(String.valueOf(tokenLimit)))
.setScale(0, RoundingMode.DOWN);
int overlapTokenNumber = overlapTokenNumberBigDecimal.intValue();
;
// overlapTokenNumber must be smaller than the token limit
overlapTokenNumber = Math.min(overlapTokenNumber, tokenLimit - 1);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.processor.factory;

import java.util.Map;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.Processor;
import org.opensearch.neuralsearch.processor.DocumentChunkingProcessor;
import static org.opensearch.neuralsearch.processor.DocumentChunkingProcessor.TYPE;
import static org.opensearch.neuralsearch.processor.DocumentChunkingProcessor.FIELD_MAP_FIELD;
import static org.opensearch.neuralsearch.processor.DocumentChunkingProcessor.ALGORITHM_FIELD;
import static org.opensearch.ingest.ConfigurationUtils.readMap;

/**
* Factory for chunking ingest processor for ingestion pipeline.
* Instantiates processor based on user provided input.
*/
public class DocumentChunkingProcessorFactory implements Processor.Factory {

private final Environment environment;

private final ClusterService clusterService;

private final IndicesService indicesService;

private final AnalysisRegistry analysisRegistry;

public DocumentChunkingProcessorFactory(
Environment environment,
ClusterService clusterService,
IndicesService indicesService,
AnalysisRegistry analysisRegistry
) {
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
}

@Override
public DocumentChunkingProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
Map<String, Object> fieldMap = readMap(TYPE, processorTag, config, FIELD_MAP_FIELD);
Map<String, Object> algorithmMap = readMap(TYPE, processorTag, config, ALGORITHM_FIELD);
return new DocumentChunkingProcessor(
processorTag,
description,
fieldMap,
algorithmMap,
environment,
clusterService,
indicesService,
analysisRegistry
);
}
}
Loading

0 comments on commit 30e75a1

Please sign in to comment.