Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
YANG-DB authored Oct 25, 2023
2 parents 61f96c8 + a5512f5 commit cd147d4
Show file tree
Hide file tree
Showing 92 changed files with 5,487 additions and 1,010 deletions.
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ repositories {
dependencies {
api "org.antlr:antlr4-runtime:4.7.1"
api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.20.0'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.common.setting;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.List;
Expand Down Expand Up @@ -36,7 +38,9 @@ public enum Key {
METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit");

@Getter private final String keyValue;

Expand All @@ -60,4 +64,9 @@ public static Optional<Key> of(String keyValue) {
public abstract <T> T getSettingValue(Key key);

public abstract List<?> getSettings();

/** Helper class */
public static boolean isSparkExecutionSessionEnabled(Settings settings) {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.datasource;

import java.util.Map;
import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -56,12 +57,19 @@ public interface DataSourceService {
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
* Updates {@link DataSource} corresponding to dataSourceMetadata (all fields needed).
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void updateDataSource(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSource} corresponding to the given name (only fields to be changed needed).
*
* @param dataSourceData
*/
void patchDataSource(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSource} corresponding to the DataSource name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ public DataSource getDataSource(String dataSourceName) {
@Override
public void updateDataSource(DataSourceMetadata dataSourceMetadata) {}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {}

@Override
public void deleteDataSource(String dataSourceName) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.CONNECTOR_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import java.io.IOException;
import java.util.Map;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class PatchDataSourceActionRequest extends ActionRequest {

@Getter private Map<String, Object> dataSourceData;

/** Constructor of UpdateDataSourceActionRequest from StreamInput. */
public PatchDataSourceActionRequest(StreamInput in) throws IOException {
super(in);
}

public PatchDataSourceActionRequest(Map<String, Object> dataSourceData) {
this.dataSourceData = dataSourceData;
}

@Override
public ActionRequestValidationException validate() {
if (this.dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError(
"Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME);
return exception;
} else if (this.dataSourceData.get(CONNECTOR_FIELD) != null) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError("Not allowed to update connector for datasource");
return exception;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class PatchDataSourceActionResponse extends ActionResponse {

@Getter private final String result;

public PatchDataSourceActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
Expand All @@ -32,18 +30,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.transport.TransportCreateDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportDeleteDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportGetDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;

Expand Down Expand Up @@ -98,6 +86,17 @@ public List<Route> routes() {
*/
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* PATCH datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionRequest]
* Response body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionResponse]
*/
new Route(PATCH, BASE_DATASOURCE_ACTION_URL),

/*
* DELETE datasources
* Request body: Ref
Expand All @@ -122,6 +121,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
return executeUpdateRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
case PATCH:
return executePatchRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
Expand Down Expand Up @@ -216,6 +217,34 @@ public void onFailure(Exception e) {
}));
}

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportPatchDataSourceAction.ACTION_TYPE,
new PatchDataSourceActionRequest(dataSourceData),
new ActionListener<>() {
@Override
public void onResponse(
PatchDataSourceActionResponse patchDataSourceActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
patchDataSourceActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

String dataSourceName = restRequest.param("dataSourceName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
package org.opensearch.sql.datasources.service;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.*;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
Expand Down Expand Up @@ -100,6 +96,19 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {
if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
DataSourceMetadata dataSourceMetadata =
getRawDataSourceMetadata((String) dataSourceData.get(NAME_FIELD));
replaceOldDatasourceMetadata(dataSourceData, dataSourceMetadata);
updateDataSource(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down Expand Up @@ -136,6 +145,35 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

/**
* Replaces the fields in the map of the given metadata.
*
* @param dataSourceData
* @param metadata {@link DataSourceMetadata}.
*/
private void replaceOldDatasourceMetadata(
Map<String, Object> dataSourceData, DataSourceMetadata metadata) {

for (String key : dataSourceData.keySet()) {
switch (key) {
// Name and connector should not be modified
case DESCRIPTION_FIELD:
metadata.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD));
break;
case ALLOWED_ROLES_FIELD:
metadata.setAllowedRoles((List<String>) dataSourceData.get(ALLOWED_ROLES_FIELD));
break;
case PROPERTIES_FIELD:
Map<String, String> properties = new HashMap<>(metadata.getProperties());
properties.putAll(((Map<String, String>) dataSourceData.get(PROPERTIES_FIELD)));
break;
case NAME_FIELD:
case CONNECTOR_FIELD:
break;
}
}
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down
Loading

0 comments on commit cd147d4

Please sign in to comment.