Skip to content

Commit

Permalink
[Feature/extensions] Get namedXContentRegistry from ExtensionsRunner (#…
Browse files Browse the repository at this point in the history
…725)

* Extend BaseExtension to simplify boilerplate

Signed-off-by: Daniel Widdis <[email protected]>

* Pass ExtensionsRunner and Extension to Rest Action

Signed-off-by: Daniel Widdis <[email protected]>

* Use XContentRegistry in Rest Action

Signed-off-by: Daniel Widdis <[email protected]>

* AD YAML config is handled in another PR

Signed-off-by: Daniel Widdis <[email protected]>

* Extension settings are now private with a getter

Signed-off-by: Daniel Widdis <[email protected]>

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Nov 18, 2022
1 parent bcd5a3e commit fa81e2e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 101 deletions.
56 changes: 26 additions & 30 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ad;

import static java.util.Collections.unmodifiableList;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.RestCreateDetectorAction;
import org.opensearch.ad.rest.RestGetDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.sdk.Extension;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.sdk.BaseExtension;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionSettings;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient;
import org.opensearch.threadpool.ThreadPool;

import com.google.common.collect.ImmutableList;

public class AnomalyDetectorExtension implements Extension {
public class AnomalyDetectorExtension extends BaseExtension {

private static final String EXTENSION_SETTINGS_PATH = "/ad-extension.yml";

private ExtensionSettings settings;

public AnomalyDetectorExtension() {
try {
this.settings = initializeSettings();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}

@Override
public ExtensionSettings getExtensionSettings() {
return this.settings;
super(EXTENSION_SETTINGS_PATH);
}

@Override
public List<ExtensionRestHandler> getExtensionRestHandlers() {
return List.of(new RestCreateDetectorAction(), new RestGetDetectorAction());
return List.of(new RestCreateDetectorAction(extensionsRunner, this), new RestGetDetectorAction());
}

@Override
Expand Down Expand Up @@ -90,22 +88,20 @@ public List<Setting<?>> getSettings() {
}

@Override
public Collection<Object> createComponents(SDKClient sdkClient, ClusterService clusterService, ThreadPool threadPool) {
return null;
}

private static ExtensionSettings initializeSettings() throws IOException {
ExtensionSettings settings = Extension.readSettingsFromYaml(EXTENSION_SETTINGS_PATH);
if (settings == null || settings.getHostAddress() == null || settings.getHostPort() == null) {
throw new IOException("Failed to initialize Extension settings. No port bound.");
}
return settings;
public List<NamedXContentRegistry.Entry> getNamedXContent() {
// Copied from AnomalyDetectorPlugin getNamedXContent
return ImmutableList.of(AnomalyDetector.XCONTENT_REGISTRY, AnomalyResult.XCONTENT_REGISTRY, DetectorInternalState.XCONTENT_REGISTRY
// Pending Job Scheduler Integration
// AnomalyDetectorJob.XCONTENT_REGISTRY
);
}

// TODO: replace or override client object on BaseExtension
// https://github.com/opensearch-project/opensearch-sdk-java/issues/160
public OpenSearchClient getClient() {
SDKClient sdkClient = new SDKClient();
OpenSearchClient client = sdkClient
.initializeClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort()));
.initializeClient(getExtensionSettings().getOpensearchAddress(), Integer.parseInt(getExtensionSettings().getOpensearchPort()));
return client;
}

Expand Down
95 changes: 24 additions & 71 deletions src/main/java/org/opensearch/ad/rest/RestCreateDetectorAction.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ad.rest;

import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTORS_INDEX_MAPPING_FILE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.CREATED;
import static org.opensearch.rest.RestStatus.NOT_FOUND;
import static org.opensearch.rest.RestStatus.OK;

import java.io.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
Expand All @@ -21,8 +31,6 @@
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -37,29 +45,25 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.extensions.rest.ExtensionRestRequest;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.InternalFilter;
import org.opensearch.search.aggregations.metrics.InternalSum;
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;
import org.opensearch.sdk.ExtensionsRunner;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import jakarta.json.stream.JsonParser;

public class RestCreateDetectorAction implements ExtensionRestHandler {
private final Logger logger = LogManager.getLogger(RestCreateDetectorAction.class);
private AnomalyDetectorExtension anomalyDetectorExtension = new AnomalyDetectorExtension();
private OpenSearchClient sdkClient = anomalyDetectorExtension.getClient();
private static final Logger logger = LogManager.getLogger(RestCreateDetectorAction.class);

private final OpenSearchClient sdkClient;
private final NamedXContentRegistry xContentRegistry;

public RestCreateDetectorAction(ExtensionsRunner runner, AnomalyDetectorExtension extension) {
this.xContentRegistry = runner.getNamedXContentRegistry().getRegistry();
this.sdkClient = extension.getClient();
}

@Override
public List<Route> routes() {
Expand Down Expand Up @@ -101,56 +105,6 @@ private IndexResponse indexAnomalyDetector(AnomalyDetector anomalyDetector) thro

}

public List<NamedXContentRegistry.Entry> getNamedXWriteables() {
List<NamedXContentRegistry.Entry> entries = new ArrayList<>();
entries.add(AnomalyDetector.XCONTENT_REGISTRY);
entries.add(AnomalyResult.XCONTENT_REGISTRY);
entries.add(DetectorInternalState.XCONTENT_REGISTRY);
entries
.add(
registerQuery(
new SearchPlugin.QuerySpec<>(NestedQueryBuilder.NAME, NestedQueryBuilder::new, NestedQueryBuilder::fromXContent)
)
);
entries
.add(registerQuery(new SearchPlugin.QuerySpec<>(BoolQueryBuilder.NAME, BoolQueryBuilder::new, BoolQueryBuilder::fromXContent)));
entries
.add(
registerAggregation(
new SearchPlugin.AggregationSpec(SumAggregationBuilder.NAME, SumAggregationBuilder::new, SumAggregationBuilder.PARSER)
.addResultReader(InternalSum::new)
)
);

entries
.add(
registerAggregation(
new SearchPlugin.AggregationSpec(
FilterAggregationBuilder.NAME,
FilterAggregationBuilder::new,
FilterAggregationBuilder::parse
).addResultReader(InternalFilter::new)
)
);
entries
.add(
registerQuery(new SearchPlugin.QuerySpec<>(RangeQueryBuilder.NAME, RangeQueryBuilder::new, RangeQueryBuilder::fromXContent))
);
return entries;

}

private NamedXContentRegistry.Entry registerQuery(SearchPlugin.QuerySpec<?> spec) {
return new NamedXContentRegistry.Entry(QueryBuilder.class, spec.getName(), (p, c) -> spec.getParser().fromXContent(p));
}

private NamedXContentRegistry.Entry registerAggregation(SearchPlugin.AggregationSpec spec) {
return new NamedXContentRegistry.Entry(BaseAggregationBuilder.class, spec.getName(), (p, c) -> {
String name = (String) c;
return spec.getParser().parse(p, name);
});
}

private CreateIndexRequest initAnomalyDetectorIndex() {
JsonpMapper mapper = sdkClient._transport().jsonpMapper();
JsonParser parser = null;
Expand Down Expand Up @@ -190,13 +144,12 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
);
}

NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(getNamedXWriteables());
XContentParser parser;
AnomalyDetector detector;
XContentBuilder builder = null;
CreateIndexRequest createIndexRequest;
try {
parser = request.contentParser(xContentRegistry);
parser = request.contentParser(this.xContentRegistry);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
detector = AnomalyDetector.parse(parser);
createIndexRequest = initAnomalyDetectorIndex();
Expand All @@ -211,7 +164,7 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
builder.field("seqNo", indexResponse.seqNo());
builder.field("primaryTerm", indexResponse.primaryTerm());
builder.field("detector", detector);
builder.field("status", RestStatus.CREATED);
builder.field("status", CREATED);
builder.endObject();
} catch (IOException e) {
e.printStackTrace();
Expand Down

0 comments on commit fa81e2e

Please sign in to comment.