Skip to content

Commit

Permalink
Passing IngestService to processor factory with processor params
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Nov 15, 2023
1 parent 675dd41 commit e670524
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
- Allowing pipeline processors to access index maping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -128,7 +129,8 @@ public IngestService(
ScriptService scriptService,
AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins,
Client client
Client client,
IndicesService indicesService
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -143,7 +145,8 @@ public IngestService(
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC),
this,
client,
threadPool.generic()::execute
threadPool.generic()::execute,
indicesService
)
);
this.threadPool = threadPool;
Expand Down
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.Scheduler;

Expand Down Expand Up @@ -156,6 +157,8 @@ class Parameters {
*/
public final Client client;

public final IndicesService indicesService;

public Parameters(
Environment env,
ScriptService scriptService,
Expand All @@ -165,7 +168,8 @@ public Parameters(
BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService,
Client client,
Consumer<Runnable> genericExecutor
Consumer<Runnable> genericExecutor,
IndicesService indicesService
) {
this.env = env;
this.scriptService = scriptService;
Expand All @@ -176,6 +180,7 @@ public Parameters(
this.ingestService = ingestService;
this.client = client;
this.genericExecutor = genericExecutor;
this.indicesService = indicesService;
}

}
Expand Down
20 changes: 11 additions & 9 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,15 +629,6 @@ protected Node(
metricsRegistry = metricsRegistryFactory.getMetricsRegistry();
resourcesToClose.add(tracer::close);
resourcesToClose.add(metricsRegistry::close);
final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client
);

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
Expand Down Expand Up @@ -823,6 +814,17 @@ protected Node(
recoverySettings
);

final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client,
indicesService
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down
15 changes: 10 additions & 5 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.script.MockScriptEngine;
import org.opensearch.script.Script;
Expand Down Expand Up @@ -149,7 +150,8 @@ public void testIngestPlugin() {
null,
null,
Collections.singletonList(DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
);
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
assertTrue(factories.containsKey("foo"));
Expand All @@ -167,7 +169,8 @@ public void testIngestPluginDuplicate() {
null,
null,
Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
)
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
Expand All @@ -182,7 +185,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
null,
null,
Collections.singletonList(DUMMY_PLUGIN),
client
client,
mock(IndicesService.class)
);
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
.source(emptyMap())
Expand Down Expand Up @@ -1485,7 +1489,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
null,
null,
Arrays.asList(testPlugin),
client
client,
mock(IndicesService.class)
);
ingestService.addIngestClusterStateListener(ingestClusterStateListener);

Expand Down Expand Up @@ -1702,7 +1707,7 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory>
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}), client);
}), client, mock(IndicesService.class));
}

private CompoundProcessor mockCompoundProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,8 @@ public void onFailure(final Exception e) {
scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList(),
client
client,
indicesService
),
transportShardBulkAction,
client,
Expand Down

0 comments on commit e670524

Please sign in to comment.